[HUDI-7346] Remove usage of org.apache.hadoop.hbase.util.Bytes (#10574)
yihua authored Jan 29, 2024
1 parent 565e7c5 commit e9389ff
Showing 31 changed files with 221 additions and 105 deletions.
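The change is mechanical throughout the commit: calls to HBase's org.apache.hadoop.hbase.util.Bytes (Bytes.toBytes, Bytes.toString) and direct new String(bytes, StandardCharsets.UTF_8) conversions become Hudi's own StringUtils.getUTF8Bytes and StringUtils.fromUTF8Bytes, so these modules no longer need the HBase utility class for plain UTF-8 conversion. A minimal sketch of what the two helpers presumably reduce to (the real org.apache.hudi.common.util.StringUtils may differ in null handling and extra overloads):

import java.nio.charset.StandardCharsets;

public final class StringUtilsSketch {

  // Stand-in for StringUtils.getUTF8Bytes: encode a String as UTF-8 bytes,
  // covering the former Bytes.toBytes(String) call sites.
  public static byte[] getUTF8Bytes(String str) {
    return str.getBytes(StandardCharsets.UTF_8);
  }

  // Stand-in for StringUtils.fromUTF8Bytes: decode UTF-8 bytes to a String,
  // covering the former Bytes.toString(byte[]) call sites.
  public static String fromUTF8Bytes(byte[] bytes) {
    return bytes == null ? null : new String(bytes, StandardCharsets.UTF_8);
  }
}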
@@ -46,7 +46,6 @@
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.Collections;
@@ -55,6 +54,7 @@
import java.util.Map;

import static org.apache.hudi.common.table.HoodieTableMetaClient.METAFOLDER_NAME;
import static org.apache.hudi.common.util.StringUtils.fromUTF8Bytes;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -285,6 +285,6 @@ private String getFileContent(String fileToReadStr) throws IOException {
byte[] data = new byte[(int) fileToRead.length()];
fis.read(data);
fis.close();
return new String(data, StandardCharsets.UTF_8);
return fromUTF8Bytes(data);
}
}
@@ -61,7 +61,6 @@
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.spark.Partitioner;
import org.apache.spark.SparkConf;
@@ -96,6 +95,8 @@
import static org.apache.hadoop.hbase.security.SecurityConstants.REGIONSERVER_KRB_PRINCIPAL;
import static org.apache.hadoop.hbase.security.User.HBASE_SECURITY_AUTHORIZATION_CONF_KEY;
import static org.apache.hadoop.hbase.security.User.HBASE_SECURITY_CONF_KEY;
import static org.apache.hudi.common.util.StringUtils.fromUTF8Bytes;
import static org.apache.hudi.common.util.StringUtils.getUTF8Bytes;

/**
* Hoodie Index implementation backed by HBase.
@@ -107,10 +108,10 @@ public class SparkHoodieHBaseIndex extends HoodieIndex<Object, Object> {
public static final String DEFAULT_SPARK_DYNAMIC_ALLOCATION_MAX_EXECUTORS_CONFIG_NAME =
"spark.dynamicAllocation.maxExecutors";

private static final byte[] SYSTEM_COLUMN_FAMILY = Bytes.toBytes("_s");
private static final byte[] COMMIT_TS_COLUMN = Bytes.toBytes("commit_ts");
private static final byte[] FILE_NAME_COLUMN = Bytes.toBytes("file_name");
private static final byte[] PARTITION_PATH_COLUMN = Bytes.toBytes("partition_path");
private static final byte[] SYSTEM_COLUMN_FAMILY = getUTF8Bytes("_s");
private static final byte[] COMMIT_TS_COLUMN = getUTF8Bytes("commit_ts");
private static final byte[] FILE_NAME_COLUMN = getUTF8Bytes("file_name");
private static final byte[] PARTITION_PATH_COLUMN = getUTF8Bytes("partition_path");

private static final Logger LOG = LoggerFactory.getLogger(SparkHoodieHBaseIndex.class);
private static Connection hbaseConnection = null;
@@ -217,7 +218,7 @@ public void close() {
}

private Get generateStatement(String key) throws IOException {
return new Get(Bytes.toBytes(getHBaseKey(key))).readVersions(1).addColumn(SYSTEM_COLUMN_FAMILY, COMMIT_TS_COLUMN)
return new Get(getUTF8Bytes(getHBaseKey(key))).readVersions(1).addColumn(SYSTEM_COLUMN_FAMILY, COMMIT_TS_COLUMN)
.addColumn(SYSTEM_COLUMN_FAMILY, FILE_NAME_COLUMN).addColumn(SYSTEM_COLUMN_FAMILY, PARTITION_PATH_COLUMN);
}

@@ -272,10 +273,10 @@ private <R> Function2<Integer, Iterator<HoodieRecord<R>>, Iterator<HoodieRecord<
taggedRecords.add(currentRecord);
continue;
}
String keyFromResult = Bytes.toString(result.getRow());
String commitTs = Bytes.toString(result.getValue(SYSTEM_COLUMN_FAMILY, COMMIT_TS_COLUMN));
String fileId = Bytes.toString(result.getValue(SYSTEM_COLUMN_FAMILY, FILE_NAME_COLUMN));
String partitionPath = Bytes.toString(result.getValue(SYSTEM_COLUMN_FAMILY, PARTITION_PATH_COLUMN));
String keyFromResult = fromUTF8Bytes(result.getRow());
String commitTs = fromUTF8Bytes(result.getValue(SYSTEM_COLUMN_FAMILY, COMMIT_TS_COLUMN));
String fileId = fromUTF8Bytes(result.getValue(SYSTEM_COLUMN_FAMILY, FILE_NAME_COLUMN));
String partitionPath = fromUTF8Bytes(result.getValue(SYSTEM_COLUMN_FAMILY, PARTITION_PATH_COLUMN));
if (!HoodieIndexUtils.checkIfValidCommit(completedCommitsTimeline, commitTs)) {
// if commit is invalid, treat this as a new taggedRecord
taggedRecords.add(currentRecord);
@@ -369,14 +370,14 @@ private Function2<Integer, Iterator<WriteStatus>, Iterator<WriteStatus>> updateL
// This is an update, no need to update index
continue;
}
Put put = new Put(Bytes.toBytes(getHBaseKey(recordDelegate.getRecordKey())));
put.addColumn(SYSTEM_COLUMN_FAMILY, COMMIT_TS_COLUMN, Bytes.toBytes(loc.get().getInstantTime()));
put.addColumn(SYSTEM_COLUMN_FAMILY, FILE_NAME_COLUMN, Bytes.toBytes(loc.get().getFileId()));
put.addColumn(SYSTEM_COLUMN_FAMILY, PARTITION_PATH_COLUMN, Bytes.toBytes(recordDelegate.getPartitionPath()));
Put put = new Put(getUTF8Bytes(getHBaseKey(recordDelegate.getRecordKey())));
put.addColumn(SYSTEM_COLUMN_FAMILY, COMMIT_TS_COLUMN, getUTF8Bytes(loc.get().getInstantTime()));
put.addColumn(SYSTEM_COLUMN_FAMILY, FILE_NAME_COLUMN, getUTF8Bytes(loc.get().getFileId()));
put.addColumn(SYSTEM_COLUMN_FAMILY, PARTITION_PATH_COLUMN, getUTF8Bytes(recordDelegate.getPartitionPath()));
mutations.add(put);
} else {
// Delete existing index for a deleted record
Delete delete = new Delete(Bytes.toBytes(getHBaseKey(recordDelegate.getRecordKey())));
Delete delete = new Delete(getUTF8Bytes(getHBaseKey(recordDelegate.getRecordKey())));
mutations.add(delete);
}
}
@@ -616,7 +617,7 @@ public boolean rollbackCommit(String instantTime) {
while (scannerIterator.hasNext()) {
Result result = scannerIterator.next();
currentVersionResults.add(result);
statements.add(generateStatement(Bytes.toString(result.getRow()), 0L, rollbackTime - 1));
statements.add(generateStatement(fromUTF8Bytes(result.getRow()), 0L, rollbackTime - 1));

if (scannerIterator.hasNext() && statements.size() < multiGetBatchSize) {
continue;
@@ -56,7 +56,6 @@
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.spark.api.java.JavaRDD;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
@@ -84,6 +83,7 @@
import static org.apache.hadoop.hbase.HConstants.ZOOKEEPER_CLIENT_PORT;
import static org.apache.hadoop.hbase.HConstants.ZOOKEEPER_QUORUM;
import static org.apache.hadoop.hbase.HConstants.ZOOKEEPER_ZNODE_PARENT;
import static org.apache.hudi.common.util.StringUtils.getUTF8Bytes;
import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
@@ -124,7 +124,7 @@ public static void init() throws Exception {
utility = new HBaseTestingUtility(hbaseConfig);
utility.startMiniCluster();
hbaseConfig = utility.getConnection().getConfiguration();
utility.createTable(TableName.valueOf(TABLE_NAME), Bytes.toBytes("_s"),2);
utility.createTable(TableName.valueOf(TABLE_NAME), getUTF8Bytes("_s"), 2);
}

@AfterAll
@@ -35,9 +35,9 @@

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;

import static org.apache.hudi.common.util.StringUtils.fromUTF8Bytes;
import static org.apache.hudi.common.util.StringUtils.getUTF8Bytes;


@@ -81,7 +81,7 @@ private Schema getSchema(byte[] schemaBytes) {
if (schemaCache.containsKey(schemaByteBuffer)) {
return schemaCache.get(schemaByteBuffer);
} else {
String schema = new String(schemaBytes, StandardCharsets.UTF_8);
String schema = fromUTF8Bytes(schemaBytes);
Schema parsedSchema = new Schema.Parser().parse(schema);
schemaCache.put(schemaByteBuffer, parsedSchema);
return parsedSchema;
@@ -53,7 +53,6 @@
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.hbase.util.Bytes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -100,7 +99,7 @@ public class HFileBootstrapIndex extends BootstrapIndex {

// Additional Metadata written to HFiles.
public static final String INDEX_INFO_KEY_STRING = "INDEX_INFO";
public static final byte[] INDEX_INFO_KEY = Bytes.toBytes(INDEX_INFO_KEY_STRING);
public static final byte[] INDEX_INFO_KEY = getUTF8Bytes(INDEX_INFO_KEY_STRING);

private final boolean isPresent;

@@ -513,11 +512,11 @@ private <T> List<T> getAllKeys(HFileScanner scanner, Function<String, T> convert
@Override
public List<BootstrapFileMapping> getSourceFileMappingForPartition(String partition) {
try (HFileScanner scanner = partitionIndexReader().getScanner(true, false)) {
KeyValue keyValue = new KeyValue(Bytes.toBytes(getPartitionKey(partition)), new byte[0], new byte[0],
KeyValue keyValue = new KeyValue(getUTF8Bytes(getPartitionKey(partition)), new byte[0], new byte[0],
HConstants.LATEST_TIMESTAMP, KeyValue.Type.Put, new byte[0]);
if (scanner.seekTo(keyValue) == 0) {
ByteBuffer readValue = scanner.getValue();
byte[] valBytes = Bytes.toBytes(readValue);
byte[] valBytes = IOUtils.toBytes(readValue);
HoodieBootstrapPartitionMetadata metadata =
TimelineMetadataUtils.deserializeAvroMetadata(valBytes, HoodieBootstrapPartitionMetadata.class);
return metadata.getFileIdToBootstrapFile().entrySet().stream()
@@ -546,11 +545,11 @@ public Map<HoodieFileGroupId, BootstrapFileMapping> getSourceFileMappingForFileI
Collections.sort(fileGroupIds);
try (HFileScanner scanner = fileIdIndexReader().getScanner(true, false)) {
for (HoodieFileGroupId fileGroupId : fileGroupIds) {
KeyValue keyValue = new KeyValue(Bytes.toBytes(getFileGroupKey(fileGroupId)), new byte[0], new byte[0],
KeyValue keyValue = new KeyValue(getUTF8Bytes(getFileGroupKey(fileGroupId)), new byte[0], new byte[0],
HConstants.LATEST_TIMESTAMP, KeyValue.Type.Put, new byte[0]);
if (scanner.seekTo(keyValue) == 0) {
ByteBuffer readValue = scanner.getValue();
byte[] valBytes = Bytes.toBytes(readValue);
byte[] valBytes = IOUtils.toBytes(readValue);
HoodieBootstrapFilePartitionInfo fileInfo = TimelineMetadataUtils.deserializeAvroMetadata(valBytes,
HoodieBootstrapFilePartitionInfo.class);
BootstrapFileMapping mapping = new BootstrapFileMapping(bootstrapBasePath,
@@ -639,7 +638,7 @@ private void writeNextPartition(String partitionPath, String bootstrapPartitionP
Option<byte[]> bytes = TimelineMetadataUtils.serializeAvroMetadata(bootstrapPartitionMetadata, HoodieBootstrapPartitionMetadata.class);
if (bytes.isPresent()) {
indexByPartitionWriter
.append(new KeyValue(Bytes.toBytes(getPartitionKey(partitionPath)), new byte[0], new byte[0],
.append(new KeyValue(getUTF8Bytes(getPartitionKey(partitionPath)), new byte[0], new byte[0],
HConstants.LATEST_TIMESTAMP, KeyValue.Type.Put, bytes.get()));
numPartitionKeysAdded++;
}
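The bootstrap index hunks above also read HFile scanner values as a ByteBuffer; the copy previously done with Bytes.toBytes(ByteBuffer) now goes through IOUtils.toBytes(readValue). A sketch of that copy, assuming it only materializes the buffer's remaining bytes and leaves the caller's position untouched (the actual org.apache.hudi.io.util.IOUtils may behave differently):

import java.nio.ByteBuffer;

public final class ByteBufferCopySketch {

  // Copy the remaining bytes of a ByteBuffer into a fresh array; duplicate()
  // keeps the original buffer's position and limit unchanged.
  public static byte[] toBytes(ByteBuffer buffer) {
    ByteBuffer view = buffer.duplicate();
    byte[] bytes = new byte[view.remaining()];
    view.get(bytes);
    return bytes;
  }
}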
@@ -36,7 +36,6 @@

import java.io.IOException;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
@@ -48,6 +47,7 @@

import static org.apache.hudi.common.table.timeline.MetadataConversionUtils.convertCommitMetadataToJsonBytes;
import static org.apache.hudi.common.table.timeline.TimelineMetadataUtils.deserializeCommitMetadata;
import static org.apache.hudi.common.util.StringUtils.fromUTF8Bytes;

/**
* All the metadata that gets stored along with a commit.
@@ -249,9 +249,8 @@ public static <T> T fromJsonString(String jsonStr, Class<T> clazz) throws Except
// TODO: refactor this method to avoid doing the json tree walking (HUDI-4822).
public static Option<Pair<String, List<String>>> getFileSliceForFileGroupFromDeltaCommit(byte[] bytes, HoodieFileGroupId fileGroupId) {
try {
String jsonStr = new String(
convertCommitMetadataToJsonBytes(deserializeCommitMetadata(bytes), org.apache.hudi.avro.model.HoodieCommitMetadata.class),
StandardCharsets.UTF_8);
String jsonStr = fromUTF8Bytes(
convertCommitMetadataToJsonBytes(deserializeCommitMetadata(bytes), org.apache.hudi.avro.model.HoodieCommitMetadata.class));
if (jsonStr.isEmpty()) {
return Option.empty();
}
@@ -517,9 +516,8 @@ public static <T> T fromBytes(byte[] bytes, Class<T> clazz) throws IOException {
return clazz.newInstance();
}
return fromJsonString(
new String(
convertCommitMetadataToJsonBytes(deserializeCommitMetadata(bytes), org.apache.hudi.avro.model.HoodieCommitMetadata.class),
StandardCharsets.UTF_8),
fromUTF8Bytes(
convertCommitMetadataToJsonBytes(deserializeCommitMetadata(bytes), org.apache.hudi.avro.model.HoodieCommitMetadata.class)),
clazz);
} catch (Exception e) {
throw new IOException("unable to read commit metadata for bytes length: " + bytes.length, e);
@@ -31,11 +31,11 @@
import java.io.IOException;
import java.io.Serializable;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

import static org.apache.hudi.common.util.StringUtils.fromUTF8Bytes;
import static org.apache.hudi.common.util.StringUtils.getUTF8Bytes;

/**
@@ -159,7 +159,7 @@ public byte[] toBytes() throws IOException {

public static HoodieConsistentHashingMetadata fromBytes(byte[] bytes) throws IOException {
try {
return fromJsonString(new String(bytes, StandardCharsets.UTF_8), HoodieConsistentHashingMetadata.class);
return fromJsonString(fromUTF8Bytes(bytes), HoodieConsistentHashingMetadata.class);
} catch (Exception e) {
throw new IOException("unable to read hashing metadata", e);
}
@@ -25,14 +25,14 @@
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static org.apache.hudi.common.table.timeline.MetadataConversionUtils.convertCommitMetadataToJsonBytes;
import static org.apache.hudi.common.table.timeline.TimelineMetadataUtils.deserializeReplaceCommitMetadata;
import static org.apache.hudi.common.util.StringUtils.fromUTF8Bytes;

/**
* All the metadata that gets stored along with a commit.
@@ -117,9 +117,8 @@ public static <T> T fromBytes(byte[] bytes, Class<T> clazz) throws IOException {
return clazz.newInstance();
}
return fromJsonString(
new String(
convertCommitMetadataToJsonBytes(deserializeReplaceCommitMetadata(bytes), org.apache.hudi.avro.model.HoodieReplaceCommitMetadata.class),
StandardCharsets.UTF_8),
fromUTF8Bytes(
convertCommitMetadataToJsonBytes(deserializeReplaceCommitMetadata(bytes), org.apache.hudi.avro.model.HoodieReplaceCommitMetadata.class)),
clazz);
} catch (Exception e) {
throw new IOException("unable to read commit metadata for bytes length: " + bytes.length, e);
@@ -30,10 +30,11 @@

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Properties;

import static org.apache.hudi.common.util.StringUtils.fromUTF8Bytes;

/**
* Provides support for seamlessly applying changes captured via Debezium for PostgresDB.
* <p>
@@ -141,7 +142,7 @@ private boolean containsBytesToastedValues(IndexedRecord incomingRecord, Schema.
|| (field.schema().getType() == Schema.Type.UNION && field.schema().getTypes().stream().anyMatch(s -> s.getType() == Schema.Type.BYTES)))
// Check length first as an optimization
&& ((ByteBuffer) ((GenericData.Record) incomingRecord).get(field.name())).array().length == DEBEZIUM_TOASTED_VALUE.length()
&& DEBEZIUM_TOASTED_VALUE.equals(new String(((ByteBuffer) ((GenericData.Record) incomingRecord).get(field.name())).array(), StandardCharsets.UTF_8)));
&& DEBEZIUM_TOASTED_VALUE.equals(fromUTF8Bytes(((ByteBuffer) ((GenericData.Record) incomingRecord).get(field.name())).array())));
}
}

@@ -69,7 +69,6 @@

import java.io.IOException;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -84,6 +83,7 @@

import static org.apache.hudi.common.util.ConfigUtils.containsConfigProperty;
import static org.apache.hudi.common.util.ConfigUtils.getStringWithAltKeys;
import static org.apache.hudi.common.util.StringUtils.getUTF8Bytes;

/**
* <code>HoodieTableMetaClient</code> allows to access meta-data about a hoodie table It returns meta-data about
@@ -212,8 +212,7 @@ public void buildFunctionalIndexDefinition(String indexMetaPath,
functionalIndexMetadata = Option.of(new HoodieFunctionalIndexMetadata(Collections.singletonMap(indexName, functionalIndexDefinition)));
}
try {
//fs.mkdirs(new Path(indexMetaPath).getParent());
FileIOUtils.createFileInPath(fs, new Path(indexMetaPath), Option.of(functionalIndexMetadata.get().toJson().getBytes(StandardCharsets.UTF_8)));
FileIOUtils.createFileInPath(fs, new Path(indexMetaPath), Option.of(getUTF8Bytes(functionalIndexMetadata.get().toJson())));
} catch (IOException e) {
throw new HoodieIOException("Could not write functional index metadata at path: " + indexMetaPath, e);
}
@@ -241,7 +240,7 @@ public void updateFunctionalIndexMetadata(HoodieFunctionalIndexMetadata newFunct
this.functionalIndexMetadata = Option.of(newFunctionalIndexMetadata);
try {
// update the index metadata file as well
FileIOUtils.createFileInPath(fs, new Path(indexMetaPath), Option.of(functionalIndexMetadata.get().toJson().getBytes(StandardCharsets.UTF_8)));
FileIOUtils.createFileInPath(fs, new Path(indexMetaPath), Option.of(getUTF8Bytes(functionalIndexMetadata.get().toJson())));
} catch (IOException e) {
throw new HoodieIOException("Could not write functional index metadata at path: " + indexMetaPath, e);
}
@@ -40,6 +40,7 @@
import org.apache.hudi.hadoop.fs.SchemeAwareFSDataInputStream;
import org.apache.hudi.hadoop.fs.TimedFSDataInputStream;
import org.apache.hudi.internal.schema.InternalSchema;
import org.apache.hudi.io.util.IOUtils;
import org.apache.hudi.storage.StorageSchemes;

import org.apache.avro.Schema;
@@ -49,7 +50,6 @@
import org.apache.hadoop.fs.FSInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.util.Bytes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -332,7 +332,7 @@ private long scanForNextAvailableBlockOffset() throws IOException {
} catch (EOFException e) {
eof = true;
}
long pos = Bytes.indexOf(dataBuf, HoodieLogFormat.MAGIC);
long pos = IOUtils.indexOf(dataBuf, HoodieLogFormat.MAGIC);
if (pos >= 0) {
return currentPos + pos;
}
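The log reader hunk above scans a buffered chunk for the next HoodieLogFormat.MAGIC marker, and Bytes.indexOf is swapped for IOUtils.indexOf, used here as a plain byte-array substring search. A sketch with that contract, returning the first offset of the target inside the array or -1 when it is absent (assumed behavior; the real org.apache.hudi.io.util.IOUtils may be implemented differently):

public final class ByteSearchSketch {

  // Naive first-occurrence search of target inside array, mirroring how
  // scanForNextAvailableBlockOffset looks for the magic marker in dataBuf.
  public static int indexOf(byte[] array, byte[] target) {
    if (target.length == 0) {
      return 0;
    }
    outer:
    for (int i = 0; i <= array.length - target.length; i++) {
      for (int j = 0; j < target.length; j++) {
        if (array[i + j] != target[j]) {
          continue outer;
        }
      }
      return i;
    }
    return -1;
  }
}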