HIVE-28601: Leverage configurable getPartitions API in HMS to decrease memory footprint in HS2
Araika committed Jan 16, 2025
1 parent a057684 commit c2b7308
Showing 17 changed files with 633 additions and 27 deletions.
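
In outline, the change replaces calls like db.getPartitions(table) — which materialize every field of every Partition in HS2 memory — with HMS getPartitionsWithSpecs requests that project only the fields each caller needs. Below is a minimal sketch of the recurring pattern, assuming a live Hive client and a partitioned table; the class and method names are hypothetical, while the API calls mirror the call sites in this commit.

import java.util.Arrays;
import java.util.List;

import org.apache.hadoop.hive.metastore.api.GetPartitionsRequest;
import org.apache.hadoop.hive.metastore.api.GetProjectionsSpec;
import org.apache.hadoop.hive.metastore.client.builder.GetPartitionProjectionsSpecBuilder;
import org.apache.hadoop.hive.ql.metadata.Hive;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.metadata.Partition;
import org.apache.hadoop.hive.ql.metadata.Table;

// Hypothetical helper illustrating the pattern; not a class from this commit.
public final class ProjectedPartitionFetchSketch {
  static List<Partition> partitionsWithLocationOnly(Hive db, Table table) throws HiveException {
    // Ask HMS to populate only sd.location on each returned Partition,
    // instead of the full object graph (parameters, columns, skew info, ...).
    GetProjectionsSpec projectionsSpec = new GetPartitionProjectionsSpecBuilder()
        .addProjectFieldList(Arrays.asList("sd.location")).build();
    // A null filter spec means all partitions of the table are requested.
    GetPartitionsRequest request = new GetPartitionsRequest(
        table.getDbName(), table.getTableName(), projectionsSpec, null);
    request.setCatName(table.getCatName());
    try {
      return db.getPartitionsWithSpecs(table, request);
    } catch (Exception e) {
      throw new HiveException(e);
    }
  }
}

Each call site below projects a different field list: "sd.location" where only partition paths are needed, "values" where only partition values or existence matter.
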
@@ -22,6 +22,9 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.api.GetPartitionsRequest;
import org.apache.hadoop.hive.metastore.api.GetProjectionsSpec;
import org.apache.hadoop.hive.metastore.client.builder.GetPartitionProjectionsSpecBuilder;
import org.apache.hadoop.hive.ql.metadata.Hive;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.metadata.Partition;
@@ -33,6 +36,7 @@
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/**
@@ -89,7 +93,18 @@ List<Path> getLocations(Hive db, Partition partition, Table table) throws HiveEx
List<Path> locations = new ArrayList<Path>();
if (table.isPartitioned()) {
if (partition == null) {
for (Partition currPartition : db.getPartitions(table)) {
List<Partition> partitions;
GetProjectionsSpec getProjectionsSpec = new GetPartitionProjectionsSpecBuilder()
.addProjectFieldList(Arrays.asList("sd.location")).build();
GetPartitionsRequest request = new GetPartitionsRequest(table.getDbName(), table.getTableName(), getProjectionsSpec, null);
request.setCatName(table.getCatName());
try {
partitions = db.getPartitionsWithSpecs(table, request);
} catch (Exception e) {
throw new HiveException(e);
}

for (Partition currPartition : partitions) {
if (currPartition.getLocation() != null) {
locations.add(new Path(currPartition.getLocation()));
}
@@ -27,6 +27,8 @@

import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.GetPartitionsRequest;
import org.apache.hadoop.hive.ql.ErrorMsg;
import org.apache.hadoop.hive.ql.exec.FunctionRegistry;
import org.apache.hadoop.hive.ql.hooks.WriteEntity;
@@ -119,6 +121,27 @@ public static List<Partition> getPartitions(Hive db, Table table, Map<String, St
return partitions;
}

public static List<Partition> getPartitionsWithSpecs(Hive db, Table table, GetPartitionsRequest request,
boolean throwException) throws SemanticException {
List<Partition> partitions = null;
try {
partitions = db.getPartitionsWithSpecs(table, request);
} catch (Exception e) {
throw new SemanticException(toMessage(ErrorMsg.INVALID_PARTITION, request.getFilterSpec())
+ " for the following partition keys: " + tablePartitionColNames(table), e);
}
if (partitions.isEmpty() && throwException) {
throw new SemanticException(toMessage(ErrorMsg.INVALID_PARTITION, request.getFilterSpec())
+ " for the following partition keys: " + tablePartitionColNames(table));
}
return partitions;
}

private static String tablePartitionColNames(Table table) {
List<FieldSchema> partCols = table.getPartCols();
// Join the partition column names with "/", e.g. "year/month/day".
return String.join("/", partCols.stream().map(FieldSchema::getName).toArray(String[]::new));
}

private static String toMessage(ErrorMsg message, Object detail) {
return detail == null ? message.getMsg() : message.getMsg(detail.toString());
}
@@ -18,11 +18,18 @@

package org.apache.hadoop.hive.ql.ddl.table.partition.exchange;

import java.util.Arrays;
import java.util.List;
import java.util.Map;

import org.apache.commons.collections.CollectionUtils;
import org.apache.hadoop.hive.common.TableName;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.GetPartitionsFilterSpec;
import org.apache.hadoop.hive.metastore.api.GetPartitionsRequest;
import org.apache.hadoop.hive.metastore.api.GetProjectionsSpec;
import org.apache.hadoop.hive.metastore.api.PartitionFilterMode;
import org.apache.hadoop.hive.metastore.client.builder.GetPartitionProjectionsSpecBuilder;
import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
import org.apache.hadoop.hive.ql.ErrorMsg;
import org.apache.hadoop.hive.ql.QueryState;
@@ -77,9 +84,20 @@ protected void analyzeCommand(TableName tableName, Map<String, String> partition
if (AcidUtils.isTransactionalTable(sourceTable) || AcidUtils.isTransactionalTable(destTable)) {
throw new SemanticException(ErrorMsg.EXCHANGE_PARTITION_NOT_ALLOWED_WITH_TRANSACTIONAL_TABLES.getMsg());
}
List<String> sourceProjectFilters = MetaStoreUtils.getPvals(sourceTable.getPartCols(), partitionSpecs);

// check if source partition exists
PartitionUtils.getPartitions(db, sourceTable, partitionSpecs, true);
GetPartitionsFilterSpec sourcePartitionsFilterSpec = new GetPartitionsFilterSpec();
sourcePartitionsFilterSpec.setFilters(sourceProjectFilters);
sourcePartitionsFilterSpec.setFilterMode(PartitionFilterMode.BY_VALUES);

GetProjectionsSpec getProjectionsSpec = new GetPartitionProjectionsSpecBuilder()
.addProjectFieldList(Arrays.asList("values")).build();

GetPartitionsRequest request = new GetPartitionsRequest(sourceTable.getDbName(), sourceTable.getTableName(),
getProjectionsSpec, sourcePartitionsFilterSpec);
request.setCatName(sourceTable.getCatName());
PartitionUtils.getPartitionsWithSpecs(db, sourceTable, request, true);

// Verify that the partitions specified are continuous
// If a subpartition value is specified without specifying a partition's value then we throw an exception
@@ -88,13 +106,23 @@
throw new SemanticException(ErrorMsg.PARTITION_VALUE_NOT_CONTINUOUS.getMsg(partitionSpecs.toString()));
}

List<String> destProjectFilters = MetaStoreUtils.getPvals(destTable.getPartCols(), partitionSpecs);

// check if dest partition exists
GetPartitionsFilterSpec getDestPartitionsFilterSpec = new GetPartitionsFilterSpec();
getDestPartitionsFilterSpec.setFilters(destProjectFilters);
getDestPartitionsFilterSpec.setFilterMode(PartitionFilterMode.BY_VALUES);

List<Partition> destPartitions = null;
GetPartitionsRequest destRequest = new GetPartitionsRequest(destTable.getDbName(), destTable.getTableName(),
getProjectionsSpec, getDestPartitionsFilterSpec);
destRequest.setCatName(destTable.getCatName());
try {
destPartitions = PartitionUtils.getPartitions(db, destTable, partitionSpecs, true);
destPartitions = PartitionUtils.getPartitionsWithSpecs(db, destTable, destRequest, true);
} catch (SemanticException ex) {
// We expect a SemanticException to be thrown here, since this partition should not be present.
}
if (destPartitions != null) {
if (CollectionUtils.isNotEmpty(destPartitions)) {
// If any destination partition is present, throw a SemanticException.
throw new SemanticException(ErrorMsg.PARTITION_EXISTS.getMsg(destPartitions.toString()));
}
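
Where a specific partition spec must be checked, the request also carries a GetPartitionsFilterSpec in BY_VALUES mode, as in the exchange-partition analyzer above. A condensed sketch under the same assumptions — the helper class name is hypothetical, and the PartitionUtils import path is assumed from this commit's context:

import java.util.Arrays;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.hive.metastore.api.GetPartitionsFilterSpec;
import org.apache.hadoop.hive.metastore.api.GetPartitionsRequest;
import org.apache.hadoop.hive.metastore.api.GetProjectionsSpec;
import org.apache.hadoop.hive.metastore.api.PartitionFilterMode;
import org.apache.hadoop.hive.metastore.client.builder.GetPartitionProjectionsSpecBuilder;
import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
import org.apache.hadoop.hive.ql.ddl.table.partition.PartitionUtils;
import org.apache.hadoop.hive.ql.metadata.Hive;
import org.apache.hadoop.hive.ql.metadata.Partition;
import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.hadoop.hive.ql.parse.SemanticException;

// Hypothetical helper mirroring the analyzer above; not part of the commit.
final class PartitionExistenceCheckSketch {
  static List<Partition> requireExisting(Hive db, Table table, Map<String, String> partitionSpecs)
      throws SemanticException {
    // Resolve the partition values for the given spec, in partition-key order.
    List<String> partValues = MetaStoreUtils.getPvals(table.getPartCols(), partitionSpecs);

    GetPartitionsFilterSpec filterSpec = new GetPartitionsFilterSpec();
    filterSpec.setFilters(partValues);
    filterSpec.setFilterMode(PartitionFilterMode.BY_VALUES);

    // Project only the partition values; the check needs existence, not full metadata.
    GetProjectionsSpec projections = new GetPartitionProjectionsSpecBuilder()
        .addProjectFieldList(Arrays.asList("values")).build();

    GetPartitionsRequest request = new GetPartitionsRequest(
        table.getDbName(), table.getTableName(), projections, filterSpec);
    request.setCatName(table.getCatName());
    // throwException = true: raise a SemanticException if nothing matches.
    return PartitionUtils.getPartitionsWithSpecs(db, table, request, true);
  }
}
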
@@ -18,12 +18,16 @@

package org.apache.hadoop.hive.ql.ddl.view.create;

import java.util.Arrays;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.GetPartitionsRequest;
import org.apache.hadoop.hive.metastore.api.GetProjectionsSpec;
import org.apache.hadoop.hive.metastore.client.builder.GetPartitionProjectionsSpecBuilder;
import org.apache.hadoop.hive.ql.ErrorMsg;
import org.apache.hadoop.hive.ql.QueryState;
import org.apache.hadoop.hive.ql.exec.TableScanOperator;
@@ -37,6 +41,7 @@
import org.apache.hadoop.hive.ql.parse.SemanticAnalyzerFactory;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.plan.PlanUtils;
import org.apache.thrift.TException;

/**
* Abstract ancestor of analyzers that can create a view.
@@ -112,9 +117,14 @@ protected void validateReplaceWithPartitions(String viewName, Table oldView, Lis

String partitionViewErrorMsg = "The following view has partitions and cannot be replaced: " + viewName;
List<Partition> partitions = null;
GetProjectionsSpec getProjectionsSpec = new GetPartitionProjectionsSpecBuilder()
.addProjectFieldList(Arrays.asList("values")).build();

GetPartitionsRequest request = new GetPartitionsRequest(oldView.getDbName(), oldView.getTableName(), getProjectionsSpec, null);
request.setCatName(oldView.getCatName());
try {
partitions = db.getPartitions(oldView);
} catch (HiveException e) {
partitions = db.getPartitionsWithSpecs(oldView, request);
} catch (HiveException | TException e) {
throw new SemanticException(ErrorMsg.REPLACE_VIEW_WITH_PARTITION.getMsg(partitionViewErrorMsg));
}

@@ -27,7 +27,10 @@
import org.apache.hadoop.hive.conf.Constants;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.hadoop.hive.metastore.api.GetPartitionsRequest;
import org.apache.hadoop.hive.metastore.api.GetProjectionsSpec;
import org.apache.hadoop.hive.metastore.api.NotificationEvent;
import org.apache.hadoop.hive.metastore.client.builder.GetPartitionProjectionsSpecBuilder;
import org.apache.hadoop.hive.metastore.messaging.event.filters.DatabaseAndTableFilter;
import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
import org.apache.hadoop.hive.metastore.messaging.MessageDeserializer;
@@ -44,6 +47,7 @@
import org.apache.hadoop.hive.ql.parse.repl.DumpType;
import org.apache.hadoop.hive.ql.parse.repl.dump.Utils;
import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -397,7 +401,17 @@ private static ArrayList<String> getListing(String dbName, String tableName, Hiv
// If the table is partitioned, we need to check the partition listings as well.
if (table.isPartitioned()) {
List<Partition> partitions = hiveDb.getPartitions(table);
GetProjectionsSpec getProjectionsSpec = new GetPartitionProjectionsSpecBuilder()
.addProjectFieldList(Arrays.asList("sd.location")).build();
GetPartitionsRequest request = new GetPartitionsRequest(table.getDbName(), table.getTableName(),
getProjectionsSpec, null);
request.setCatName(table.getCatName());
List<Partition> partitions;
try {
partitions = hiveDb.getPartitionsWithSpecs(table, request);
} catch (TException e) {
throw new HiveException(e);
}
for (Partition part : partitions) {
Path partPath = part.getDataLocation();
// Build listing for the partition only if it doesn't lie within the table location, else it would have been
@@ -23,7 +23,10 @@
import org.apache.hadoop.hive.common.FileUtils;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.TableType;
import org.apache.hadoop.hive.metastore.api.GetPartitionsRequest;
import org.apache.hadoop.hive.metastore.api.GetProjectionsSpec;
import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
import org.apache.hadoop.hive.metastore.client.builder.GetPartitionProjectionsSpecBuilder;
import org.apache.hadoop.hive.metastore.utils.StringUtils;
import org.apache.hadoop.hive.ql.ErrorMsg;
import org.apache.hadoop.hive.ql.exec.repl.util.FileList;
@@ -36,13 +39,15 @@
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.parse.repl.PathBuilder;
import org.apache.hadoop.hive.ql.parse.repl.dump.Utils;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -97,15 +102,20 @@ void dataLocationDump(Table table, FileList fileList, HashMap<String, Boolean> s
}
if (table.isPartitioned()) {
List<Partition> partitions;
GetProjectionsSpec projectionSpec = new GetPartitionProjectionsSpecBuilder()
.addProjectFieldList(Arrays.asList("sd.location")).build();
GetPartitionsRequest request = new GetPartitionsRequest(table.getDbName(), table.getTableName(),
projectionSpec, null);
request.setCatName(table.getCatName());
try {
partitions = Hive.get(hiveConf).getPartitions(table);
} catch (HiveException e) {
partitions = Hive.get(hiveConf).getPartitionsWithSpecs(table, request);
} catch (HiveException | TException e) {
if (e.getCause() instanceof NoSuchObjectException) {
// If the table is dropped while the dump is in progress, just skip the partition data location dump
LOG.debug(e.getMessage());
return;
}
throw e;
throw new HiveException(e);
}

for (Partition partition : partitions) {
@@ -25,6 +25,9 @@
import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.hadoop.hive.metastore.api.GetPartitionsRequest;
import org.apache.hadoop.hive.metastore.api.GetProjectionsSpec;
import org.apache.hadoop.hive.metastore.api.InvalidOperationException;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.client.builder.GetPartitionProjectionsSpecBuilder;
import org.apache.hadoop.hive.ql.ddl.DDLWork;
import org.apache.hadoop.hive.ql.ddl.table.partition.add.AlterTableAddPartitionDesc;
import org.apache.hadoop.hive.ql.ddl.table.partition.drop.AlterTableDropPartitionDesc;
@@ -53,6 +56,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@@ -137,7 +141,16 @@ public TaskTracker tasks() throws Exception {
if (tablesToBootstrap.stream().anyMatch(table.getTableName()::equalsIgnoreCase)) {
Hive hiveDb = Hive.get(context.hiveConf);
// Collect the non-existing partitions to drop.
List<Partition> partitions = hiveDb.getPartitions(table);
GetProjectionsSpec getProjectionsSpec = new GetPartitionProjectionsSpecBuilder()
.addProjectFieldList(Arrays.asList("values")).build();
GetPartitionsRequest request = new GetPartitionsRequest(table.getDbName(), table.getTableName(),
getProjectionsSpec, null);
List<Partition> partitions;
try {
partitions = hiveDb.getPartitionsWithSpecs(table, request);
} catch (Exception e) {
throw new HiveException(e);
}
List<String> newParts = event.partitions(tableDesc);
for (Partition part : partitions) {
if (!newParts.contains(part.getName())) {