
[Enhancement] add catalog properties to control get stats from metadata (#51882)

Signed-off-by: Youngwb <[email protected]>
Youngwb authored Oct 16, 2024
1 parent 47314a6 commit 24dfaaf
Showing 27 changed files with 209 additions and 49 deletions.
fe/fe-core/src/main/java/com/starrocks/connector/ConnectorProperties.java
@@ -0,0 +1,43 @@
+// Copyright 2021-present StarRocks, Inc. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.starrocks.connector;
+
+import org.apache.iceberg.util.PropertyUtil;
+
+import java.util.Map;
+
+public class ConnectorProperties {
+    public static final String ENABLE_GET_STATS_FROM_EXTERNAL_METADATA = "enable_get_stats_from_external_metadata";
+
+    private final ConnectorType connectorType;
+    private final Map<String, String> properties;
+
+    public ConnectorProperties(ConnectorType connectorType) {
+        this.connectorType = connectorType;
+        this.properties = Map.of();
+    }
+
+    public ConnectorProperties(ConnectorType connectorType, Map<String, String> properties) {
+        this.connectorType = connectorType;
+        this.properties = properties;
+    }
+
+    public boolean enableGetTableStatsFromExternalMetadata() {
+        // For Iceberg and DeltaLake, we don't get table stats from metadata by default.
+        boolean defaultValue = connectorType != ConnectorType.ICEBERG && connectorType != ConnectorType.DELTALAKE;
+        return PropertyUtil.propertyAsBoolean(properties, ConnectorProperties.ENABLE_GET_STATS_FROM_EXTERNAL_METADATA,
+                defaultValue);
+    }
+}
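For reference, a minimal sketch of how these defaults resolve in practice. The usage below is hypothetical and not part of this commit; `catalogProps` is an illustrative map:

```java
// Hypothetical usage of ConnectorProperties -- illustrates the defaults above.
Map<String, String> catalogProps =
        Map.of(ConnectorProperties.ENABLE_GET_STATS_FROM_EXTERNAL_METADATA, "true");

// Hive defaults to true: table stats come from the external metastore.
new ConnectorProperties(ConnectorType.HIVE)
        .enableGetTableStatsFromExternalMetadata();              // true

// Iceberg and DeltaLake default to false.
new ConnectorProperties(ConnectorType.DELTALAKE)
        .enableGetTableStatsFromExternalMetadata();              // false

// The catalog property overrides the per-connector default.
new ConnectorProperties(ConnectorType.DELTALAKE, catalogProps)
        .enableGetTableStatsFromExternalMetadata();              // true
```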
fe/fe-core/src/main/java/com/starrocks/connector/delta/DeltaLakeMetadata.java
@@ -26,6 +26,7 @@
 import com.starrocks.common.profile.Timer;
 import com.starrocks.common.profile.Tracers;
 import com.starrocks.connector.ConnectorMetadata;
+import com.starrocks.connector.ConnectorProperties;
 import com.starrocks.connector.GetRemoteFilesParams;
 import com.starrocks.connector.HdfsEnvironment;
 import com.starrocks.connector.MetastoreType;
@@ -34,6 +35,7 @@
 import com.starrocks.connector.RemoteFileInfoSource;
 import com.starrocks.connector.TableVersionRange;
 import com.starrocks.connector.exception.StarRocksConnectorException;
+import com.starrocks.connector.statistics.StatisticsUtils;
 import com.starrocks.credential.CloudConfiguration;
 import com.starrocks.qe.ConnectContext;
 import com.starrocks.sql.common.ErrorType;
@@ -77,13 +79,15 @@ public class DeltaLakeMetadata implements ConnectorMetadata {
     private final Map<PredicateSearchKey, List<FileScanTask>> splitTasks = new ConcurrentHashMap<>();
     private final Set<PredicateSearchKey> scannedTables = new HashSet<>();
     private final DeltaStatisticProvider statisticProvider = new DeltaStatisticProvider();
+    private final ConnectorProperties properties;
 
     public DeltaLakeMetadata(HdfsEnvironment hdfsEnvironment, String catalogName, DeltaMetastoreOperations deltaOps,
-                             DeltaLakeCacheUpdateProcessor cacheUpdateProcessor) {
+                             DeltaLakeCacheUpdateProcessor cacheUpdateProcessor, ConnectorProperties properties) {
         this.hdfsEnvironment = hdfsEnvironment;
         this.catalogName = catalogName;
         this.deltaOps = deltaOps;
         this.cacheUpdateProcessor = cacheUpdateProcessor;
+        this.properties = properties;
     }
 
     public String getCatalogName() {
@@ -143,6 +147,10 @@ public RemoteFileInfoSource getRemoteFilesAsync(Table table, GetRemoteFilesParams
     public Statistics getTableStatistics(OptimizerContext session, Table table, Map<ColumnRefOperator, Column> columns,
                                          List<PartitionKey> partitionKeys, ScalarOperator predicate, long limit,
                                          TableVersionRange versionRange) {
+        if (!properties.enableGetTableStatsFromExternalMetadata()) {
+            return StatisticsUtils.buildDefaultStatistics(columns.keySet());
+        }
+
         DeltaLakeTable deltaLakeTable = (DeltaLakeTable) table;
         SnapshotImpl snapshot = (SnapshotImpl) deltaLakeTable.getDeltaSnapshot();
         String dbName = deltaLakeTable.getDbName();
@@ -161,12 +169,12 @@ public Statistics getTableStatistics(OptimizerContext session, Table table, Map<ColumnRefOperator, Column> columns,
                     dbName, table, predicate);
         }
 
-        if (session.getSessionVariable().enableDeltaLakeColumnStatistics()) {
-            try (Timer ignored = Tracers.watchScope(EXTERNAL, "DELTA_LAKE.getTableStatistics" + key)) {
+        String traceLabel = session.getSessionVariable().enableDeltaLakeColumnStatistics() ?
+                "DELTA_LAKE.getTableStatistics" + key : "DELTA_LAKE.getCardinalityStats" + key;
+        try (Timer ignored = Tracers.watchScope(EXTERNAL, traceLabel)) {
+            if (session.getSessionVariable().enableDeltaLakeColumnStatistics()) {
                 return statisticProvider.getTableStatistics(schema, key, columns);
-            }
-        } else {
-            try (Timer ignored = Tracers.watchScope(EXTERNAL, "DELTA_LAKE.getCardinalityStats" + key)) {
+            } else {
                 return statisticProvider.getCardinalityStats(schema, key, columns);
             }
         }
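When the new property is off, getTableStatistics now short-circuits before touching the Delta log. The body of StatisticsUtils.buildDefaultStatistics is not shown in this commit; the following is a plausible sketch only, assuming the optimizer's Statistics builder API and unknown-column placeholders:

```java
// Plausible sketch -- the real StatisticsUtils.buildDefaultStatistics is not in this diff.
public static Statistics buildDefaultStatistics(Set<ColumnRefOperator> columns) {
    Statistics.Builder builder = Statistics.builder();
    builder.setOutputRowCount(1);  // assumed placeholder row count
    for (ColumnRefOperator column : columns) {
        // Unknown per-column stats push the optimizer onto its default estimates.
        builder.addColumnStatistic(column, ColumnStatistic.unknown());
    }
    return builder.build();
}
```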
@@ -310,22 +318,17 @@ private void collectDeltaLakePlanFiles(PredicateSearchKey key, Table table, ScalarOperator
 
         List<FileScanTask> files = Lists.newArrayList();
         boolean enableCollectColumnStats = enableCollectColumnStatistics(connectContext);
+        String traceLabel = enableCollectColumnStats ? "DELTA_LAKE.updateDeltaLakeFileStats" :
+                "DELTA_LAKE.updateDeltaLakeCardinality";
 
         Iterator<Pair<FileScanTask, DeltaLakeAddFileStatsSerDe>> iterator =
                 buildFileScanTaskIterator(table, operator, enableCollectColumnStats);
         while (iterator.hasNext()) {
             Pair<FileScanTask, DeltaLakeAddFileStatsSerDe> pair = iterator.next();
             files.add(pair.first);
 
-            if (enableCollectColumnStats) {
-                try (Timer ignored = Tracers.watchScope(EXTERNAL, "DELTA_LAKE.updateDeltaLakeFileStats")) {
-                    statisticProvider.updateFileStats(deltaLakeTable, key, pair.first, pair.second,
-                            nonPartitionPrimitiveColumns, partitionPrimitiveColumns);
-                }
-            } else {
-                try (Timer ignored = Tracers.watchScope(EXTERNAL, "DELTA_LAKE.updateDeltaLakeCardinality")) {
-                    statisticProvider.updateFileStats(deltaLakeTable, key, pair.first, pair.second,
-                            nonPartitionPrimitiveColumns, partitionPrimitiveColumns);
-                }
+            try (Timer ignored = Tracers.watchScope(EXTERNAL, traceLabel)) {
+                statisticProvider.updateFileStats(deltaLakeTable, key, pair.first, pair.second,
+                        nonPartitionPrimitiveColumns, partitionPrimitiveColumns);
             }
         }
         splitTasks.put(key, files);
fe/fe-core/src/main/java/com/starrocks/connector/delta/DeltaLakeMetadataFactory.java
@@ -14,6 +14,8 @@
 
 package com.starrocks.connector.delta;
 
+import com.starrocks.connector.ConnectorProperties;
+import com.starrocks.connector.ConnectorType;
 import com.starrocks.connector.HdfsEnvironment;
 import com.starrocks.connector.MetastoreType;
 import com.starrocks.connector.hive.CachingHiveMetastoreConf;
@@ -30,6 +32,7 @@ public class DeltaLakeMetadataFactory {
     protected final IDeltaLakeMetastore metastore;
     protected final long perQueryMetastoreMaxNum;
     private final HdfsEnvironment hdfsEnvironment;
+    protected final ConnectorProperties connectorProperties;
     protected final MetastoreType metastoreType;
 
     public DeltaLakeMetadataFactory(String catalogName, IDeltaLakeMetastore metastore, CachingHiveMetastoreConf hmsConf,
@@ -39,6 +42,7 @@ public DeltaLakeMetadataFactory(String catalogName, IDeltaLakeMetastore
         this.metastore = metastore;
         this.perQueryMetastoreMaxNum = hmsConf.getPerQueryCacheMaxNum();
         this.hdfsEnvironment = hdfsEnvironment;
+        this.connectorProperties = new ConnectorProperties(ConnectorType.DELTALAKE, properties);
         if (properties.containsKey(HIVE_METASTORE_URIS)) {
             this.hdfsEnvironment.getConfiguration().set(MetastoreConf.ConfVars.THRIFT_URIS.getHiveName(),
                     properties.get(HIVE_METASTORE_URIS));
@@ -57,7 +61,7 @@ public DeltaLakeMetadata create() {
 
         Optional<DeltaLakeCacheUpdateProcessor> cacheUpdateProcessor = getCacheUpdateProcessor();
         return new DeltaLakeMetadata(hdfsEnvironment, catalogName, metastoreOperations,
-                cacheUpdateProcessor.orElse(null));
+                cacheUpdateProcessor.orElse(null), connectorProperties);
     }
 
     public synchronized Optional<DeltaLakeCacheUpdateProcessor> getCacheUpdateProcessor() {
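The factory is the single point where raw catalog properties become the typed ConnectorProperties handed to each metadata instance, so the switch is configured once per catalog. A condensed, illustrative view of that flow (the property map and value below are hypothetical):

```java
// Illustrative wiring only; mirrors the factory changes above.
Map<String, String> catalogProps =
        Map.of("enable_get_stats_from_external_metadata", "true");  // set at catalog creation

ConnectorProperties connectorProps =
        new ConnectorProperties(ConnectorType.DELTALAKE, catalogProps);

// DeltaLakeMetadataFactory.create() passes connectorProps into every
// DeltaLakeMetadata it builds, so each query's getTableStatistics()
// consults the same catalog-level switch.
```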
fe/fe-core/src/main/java/com/starrocks/connector/hive/HiveConnector.java
@@ -67,7 +67,8 @@ private HiveMetadataFactory createMetadataFactory(HdfsEnvironment
                 internalMgr.isSearchRecursive(),
                 internalMgr.enableHmsEventsIncrementalSync(),
                 hdfsEnvironment,
-                internalMgr.getMetastoreType()
+                internalMgr.getMetastoreType(),
+                properties
         );
     }
 
fe/fe-core/src/main/java/com/starrocks/connector/hive/HiveMetadata.java
@@ -32,6 +32,7 @@
 import com.starrocks.common.profile.Timer;
 import com.starrocks.common.profile.Tracers;
 import com.starrocks.connector.ConnectorMetadata;
+import com.starrocks.connector.ConnectorProperties;
 import com.starrocks.connector.GetRemoteFilesParams;
 import com.starrocks.connector.HdfsEnvironment;
 import com.starrocks.connector.PartitionInfo;
@@ -41,6 +42,7 @@
 import com.starrocks.connector.TableVersionRange;
 import com.starrocks.connector.exception.StarRocksConnectorException;
 import com.starrocks.connector.hive.PartitionUpdate.UpdateMode;
+import com.starrocks.connector.statistics.StatisticsUtils;
 import com.starrocks.credential.CloudConfiguration;
 import com.starrocks.qe.ConnectContext;
 import com.starrocks.qe.SessionVariable;
@@ -87,6 +89,7 @@ public class HiveMetadata implements ConnectorMetadata {
     private final Optional<HiveCacheUpdateProcessor> cacheUpdateProcessor;
     private Executor updateExecutor;
     private Executor refreshOthersFeExecutor;
+    private final ConnectorProperties properties;
 
     public HiveMetadata(String catalogName,
                         HdfsEnvironment hdfsEnvironment,
@@ -95,7 +98,8 @@ public HiveMetadata(String catalogName,
                         HiveStatisticsProvider statisticsProvider,
                         Optional<HiveCacheUpdateProcessor> cacheUpdateProcessor,
                         Executor updateExecutor,
-                        Executor refreshOthersFeExecutor) {
+                        Executor refreshOthersFeExecutor,
+                        ConnectorProperties properties) {
         this.catalogName = catalogName;
         this.hdfsEnvironment = hdfsEnvironment;
         this.hmsOps = hmsOps;
@@ -104,6 +108,7 @@ public HiveMetadata(String catalogName,
         this.cacheUpdateProcessor = cacheUpdateProcessor;
         this.updateExecutor = updateExecutor;
         this.refreshOthersFeExecutor = refreshOthersFeExecutor;
+        this.properties = properties;
     }
 
     @Override
@@ -314,6 +319,10 @@ public Statistics getTableStatistics(OptimizerContext session,
                                         ScalarOperator predicate,
                                         long limit,
                                         TableVersionRange version) {
+        if (!properties.enableGetTableStatsFromExternalMetadata()) {
+            return StatisticsUtils.buildDefaultStatistics(columns.keySet());
+        }
+
         Statistics statistics = null;
         List<ColumnRefOperator> columnRefOperators = Lists.newArrayList(columns.keySet());
         try {
fe/fe-core/src/main/java/com/starrocks/connector/hive/HiveMetadataFactory.java
@@ -16,11 +16,14 @@
 
 import com.starrocks.connector.CachingRemoteFileConf;
 import com.starrocks.connector.CachingRemoteFileIO;
+import com.starrocks.connector.ConnectorProperties;
+import com.starrocks.connector.ConnectorType;
 import com.starrocks.connector.HdfsEnvironment;
 import com.starrocks.connector.MetastoreType;
 import com.starrocks.connector.RemoteFileIO;
 import com.starrocks.connector.RemoteFileOperations;
 
+import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
@@ -41,6 +44,7 @@ public class HiveMetadataFactory {
     private final boolean enableHmsEventsIncrementalSync;
     private final HdfsEnvironment hdfsEnvironment;
     private final MetastoreType metastoreType;
+    private final ConnectorProperties connectorProperties;
 
     public HiveMetadataFactory(String catalogName,
                                IHiveMetastore metastore,
@@ -54,7 +58,8 @@ public HiveMetadataFactory(String catalogName,
                                boolean isRecursive,
                                boolean enableHmsEventsIncrementalSync,
                                HdfsEnvironment hdfsEnvironment,
-                               MetastoreType metastoreType) {
+                               MetastoreType metastoreType,
+                               Map<String, String> properties) {
         this.catalogName = catalogName;
         this.metastore = metastore;
         this.remoteFileIO = remoteFileIO;
@@ -68,6 +73,7 @@
         this.enableHmsEventsIncrementalSync = enableHmsEventsIncrementalSync;
         this.hdfsEnvironment = hdfsEnvironment;
         this.metastoreType = metastoreType;
+        this.connectorProperties = new ConnectorProperties(ConnectorType.HIVE, properties);
     }
 
     public HiveMetadata create() {
@@ -86,7 +92,8 @@
 
         Optional<HiveCacheUpdateProcessor> cacheUpdateProcessor = getCacheUpdateProcessor();
         return new HiveMetadata(catalogName, hdfsEnvironment, hiveMetastoreOperations, remoteFileOperations,
-                statisticsProvider, cacheUpdateProcessor, updateStatisticsExecutor, refreshOthersFeExecutor);
+                statisticsProvider, cacheUpdateProcessor, updateStatisticsExecutor, refreshOthersFeExecutor,
+                connectorProperties);
     }
 
     public synchronized Optional<HiveCacheUpdateProcessor> getCacheUpdateProcessor() {
fe/fe-core/src/main/java/com/starrocks/connector/hudi/HudiConnector.java
@@ -31,12 +31,13 @@
 public class HudiConnector implements Connector {
     public static final String HIVE_METASTORE_URIS = "hive.metastore.uris";
     public static final List<String> SUPPORTED_METASTORE_TYPE = Lists.newArrayList("hive", "glue", "dlf");
+    private final Map<String, String> properties;
     private final String catalogName;
     private final HudiConnectorInternalMgr internalMgr;
     private final HudiMetadataFactory metadataFactory;
 
     public HudiConnector(ConnectorContext context) {
-        Map<String, String> properties = context.getProperties();
+        this.properties = context.getProperties();
         CloudConfiguration cloudConfiguration = CloudConfigurationFactory.buildCloudConfigurationForStorage(properties);
         HdfsEnvironment hdfsEnvironment = new HdfsEnvironment(cloudConfiguration);
         this.catalogName = context.getCatalogName();
@@ -62,7 +63,8 @@ private HudiMetadataFactory createMetadataFactory(HdfsEnvironment
                 internalMgr.getPullRemoteFileExecutor(),
                 internalMgr.isSearchRecursive(),
                 hdfsEnvironment,
-                internalMgr.getMetastoreType()
+                internalMgr.getMetastoreType(),
+                properties
         );
     }
 
fe/fe-core/src/main/java/com/starrocks/connector/hudi/HudiMetadata.java
@@ -24,6 +24,7 @@
 import com.starrocks.catalog.Table;
 import com.starrocks.common.DdlException;
 import com.starrocks.connector.ConnectorMetadata;
+import com.starrocks.connector.ConnectorProperties;
 import com.starrocks.connector.GetRemoteFilesParams;
 import com.starrocks.connector.HdfsEnvironment;
 import com.starrocks.connector.RemoteFileInfo;
@@ -35,6 +36,7 @@
 import com.starrocks.connector.hive.HiveMetastoreOperations;
 import com.starrocks.connector.hive.HiveStatisticsProvider;
 import com.starrocks.connector.hive.Partition;
+import com.starrocks.connector.statistics.StatisticsUtils;
 import com.starrocks.credential.CloudConfiguration;
 import com.starrocks.qe.SessionVariable;
 import com.starrocks.server.GlobalStateMgr;
@@ -64,19 +66,22 @@ public class HudiMetadata implements ConnectorMetadata {
     private final RemoteFileOperations fileOps;
     private final HiveStatisticsProvider statisticsProvider;
     private final Optional<HiveCacheUpdateProcessor> cacheUpdateProcessor;
+    private final ConnectorProperties properties;
 
     public HudiMetadata(String catalogName,
                         HdfsEnvironment hdfsEnvironment,
                         HiveMetastoreOperations hmsOps,
                         RemoteFileOperations fileOperations,
                         HiveStatisticsProvider statisticsProvider,
-                        Optional<HiveCacheUpdateProcessor> cacheUpdateProcessor) {
+                        Optional<HiveCacheUpdateProcessor> cacheUpdateProcessor,
+                        ConnectorProperties properties) {
         this.catalogName = catalogName;
         this.hdfsEnvironment = hdfsEnvironment;
         this.hmsOps = hmsOps;
         this.fileOps = fileOperations;
         this.statisticsProvider = statisticsProvider;
         this.cacheUpdateProcessor = cacheUpdateProcessor;
+        this.properties = properties;
     }
 
     @Override
@@ -187,6 +192,10 @@ public Statistics getTableStatistics(OptimizerContext session,
                                         Map<ColumnRefOperator, Column> columns,
                                         List<PartitionKey> partitionKeys,
                                         ScalarOperator predicate, long limit, TableVersionRange version) {
+        if (!properties.enableGetTableStatsFromExternalMetadata()) {
+            return StatisticsUtils.buildDefaultStatistics(columns.keySet());
+        }
+
         Statistics statistics = null;
         List<ColumnRefOperator> columnRefOperators = Lists.newArrayList(columns.keySet());
         try {
(The remaining 19 of the 27 changed files are not shown here.)
