Skip to content

Commit

Permalink
[Enhancement] avoid sync listPartitionNames when query iceberg & mv (
Browse files Browse the repository at this point in the history
…#53168)

Signed-off-by: yanz <[email protected]>
(cherry picked from commit ca1c066)
  • Loading branch information
dirtysalt authored and mergify[bot] committed Nov 28, 2024
1 parent 8ca0663 commit 20a46a8
Show file tree
Hide file tree
Showing 32 changed files with 561 additions and 117 deletions.
42 changes: 21 additions & 21 deletions fe/fe-core/src/main/java/com/starrocks/catalog/IcebergTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -71,9 +71,9 @@ public class IcebergTable extends Table {

private String catalogName;
@SerializedName(value = "dn")
protected String remoteDbName;
protected String catalogDBName;
@SerializedName(value = "tn")
protected String remoteTableName;
protected String catalogTableName;
@SerializedName(value = "rn")
private String resourceName;
@SerializedName(value = "prop")
Expand All @@ -88,14 +88,14 @@ public IcebergTable() {
super(TableType.ICEBERG);
}

public IcebergTable(long id, String srTableName, String catalogName, String resourceName, String remoteDbName,
String remoteTableName, String comment, List<Column> schema,
public IcebergTable(long id, String srTableName, String catalogName, String resourceName, String catalogDBName,
String catalogTableName, String comment, List<Column> schema,
org.apache.iceberg.Table nativeTable, Map<String, String> icebergProperties) {
super(id, srTableName, TableType.ICEBERG, schema);
this.catalogName = catalogName;
this.resourceName = resourceName;
this.remoteDbName = remoteDbName;
this.remoteTableName = remoteTableName;
this.catalogDBName = catalogDBName;
this.catalogTableName = catalogTableName;
this.comment = comment;
this.nativeTable = nativeTable;
this.icebergProperties = icebergProperties;
Expand All @@ -113,19 +113,19 @@ public String getResourceName() {

@Override
public String getCatalogDBName() {
return remoteDbName;
return catalogDBName;
}

@Override
public String getCatalogTableName() {
return remoteTableName;
return catalogTableName;
}

@Override
public String getUUID() {
if (CatalogMgr.isExternalCatalog(catalogName)) {
String uuid = ((BaseTable) getNativeTable()).operations().current().uuid();
return String.join(".", catalogName, remoteDbName, remoteTableName,
return String.join(".", catalogName, catalogDBName, catalogTableName,
uuid == null ? "" : uuid);
} else {
return Long.toString(id);
Expand Down Expand Up @@ -295,10 +295,10 @@ public org.apache.iceberg.Table getNativeTable() {
// For compatibility with the resource iceberg table. native table is lazy. Prevent failure during fe restarting.
if (nativeTable == null) {
IcebergTable resourceMappingTable = (IcebergTable) GlobalStateMgr.getCurrentState().getMetadataMgr()
.getTable(getCatalogName(), remoteDbName, remoteTableName);
.getTable(getCatalogName(), catalogDBName, catalogTableName);
if (resourceMappingTable == null) {
throw new StarRocksConnectorException("Can't find table %s.%s.%s",
getCatalogName(), remoteDbName, remoteTableName);
getCatalogName(), catalogDBName, catalogTableName);
}
nativeTable = resourceMappingTable.getNativeTable();
}
Expand Down Expand Up @@ -350,7 +350,7 @@ public TTableDescriptor toThrift(List<DescriptorTable.ReferencedPartitionInfo> p
}

TTableDescriptor tTableDescriptor = new TTableDescriptor(id, TTableType.ICEBERG_TABLE,
fullSchema.size(), 0, remoteTableName, remoteDbName);
fullSchema.size(), 0, catalogTableName, catalogDBName);
tTableDescriptor.setIcebergTable(tIcebergTable);
return tTableDescriptor;
}
Expand Down Expand Up @@ -379,7 +379,7 @@ public boolean isTemporal() {

@Override
public int hashCode() {
return com.google.common.base.Objects.hashCode(getCatalogName(), remoteDbName, getTableIdentifier());
return com.google.common.base.Objects.hashCode(getCatalogName(), catalogDBName, getTableIdentifier());
}

@Override
Expand All @@ -392,7 +392,7 @@ public boolean equals(Object other) {
String catalogName = getCatalogName();
String tableIdentifier = getTableIdentifier();
return Objects.equal(catalogName, otherTable.getCatalogName()) &&
Objects.equal(remoteDbName, otherTable.remoteDbName) &&
Objects.equal(catalogDBName, otherTable.catalogDBName) &&
Objects.equal(tableIdentifier, otherTable.getTableIdentifier());
}

Expand All @@ -405,8 +405,8 @@ public static class Builder {
private String srTableName;
private String catalogName;
private String resourceName;
private String remoteDbName;
private String remoteTableName;
private String catalogDBName;
private String catalogTableName;

private String comment;
private List<Column> fullSchema;
Expand Down Expand Up @@ -441,13 +441,13 @@ public Builder setResourceName(String resourceName) {
return this;
}

public Builder setRemoteDbName(String remoteDbName) {
this.remoteDbName = remoteDbName;
public Builder setCatalogDBName(String catalogDbName) {
this.catalogDBName = catalogDbName;
return this;
}

public Builder setRemoteTableName(String remoteTableName) {
this.remoteTableName = remoteTableName;
public Builder setCatalogTableName(String catalogTableName) {
this.catalogTableName = catalogTableName;
return this;
}

Expand All @@ -467,7 +467,7 @@ public Builder setNativeTable(org.apache.iceberg.Table nativeTable) {
}

public IcebergTable build() {
return new IcebergTable(id, srTableName, catalogName, resourceName, remoteDbName, remoteTableName,
return new IcebergTable(id, srTableName, catalogName, resourceName, catalogDBName, catalogTableName,
comment, fullSchema, nativeTable, icebergProperties);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -866,7 +866,9 @@ public Set<String> getUpdatedPartitionNamesOfExternalTable(Table baseTable, bool
return result;
}

return ConnectorPartitionTraits.build(this, baseTable).getUpdatedPartitionNames(
ConnectorPartitionTraits traits = ConnectorPartitionTraits.build(this, baseTable);
traits.setQueryMVRewrite(isQueryRewrite);
return traits.getUpdatedPartitionNames(
this.getBaseTableInfos(),
this.refreshScheme.getAsyncRefreshContext());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import com.google.common.collect.Lists;
import com.starrocks.catalog.Table;
import com.starrocks.common.AnalysisException;
import com.starrocks.connector.ConnectorMetadatRequestContext;
import com.starrocks.connector.exception.StarRocksConnectorException;
import com.starrocks.server.GlobalStateMgr;
import org.apache.logging.log4j.LogManager;
Expand Down Expand Up @@ -55,7 +56,7 @@ public ProcResult fetchResult() throws AnalysisException {
try {
List<String> partitionNames = GlobalStateMgr.getCurrentState().getMetadataMgr()
.listPartitionNames(hmsTable.getCatalogName(), hmsTable.getCatalogDBName(),
hmsTable.getCatalogTableName());
hmsTable.getCatalogTableName(), ConnectorMetadatRequestContext.DEFAULT);
for (String partitionName : partitionNames) {
result.addRow(Lists.newArrayList(partitionName));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
public class ConnectorMetadatRequestContext {
public static ConnectorMetadatRequestContext DEFAULT = new ConnectorMetadatRequestContext();
TableVersionRange tableVersionRange = TableVersionRange.empty();
boolean queryMVRewrite = false;

public void setTableVersionRange(TableVersionRange value) {
tableVersionRange = value;
Expand All @@ -25,5 +26,17 @@ public void setTableVersionRange(TableVersionRange value) {
public TableVersionRange getTableVersionRange() {
return tableVersionRange;
}

public void setQueryMVRewrite(boolean v) {
queryMVRewrite = v;
}

public boolean isQueryMVRewrite() {
return queryMVRewrite;
}

public long getSnapshotId() {
return tableVersionRange.end().isPresent() ? tableVersionRange.end().get() : -1;
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@ public abstract class ConnectorPartitionTraits {

protected Table table;

protected boolean queryMVRewrite = false;

public static boolean isSupported(Table.TableType tableType) {
return TRAITS_TABLE.containsKey(tableType);
}
Expand Down Expand Up @@ -222,4 +224,12 @@ public abstract Set<String> getUpdatedPartitionNames(List<BaseTableInfo> baseTab
* inconsistency between the two systems, so we add extraSeconds
*/
public abstract LocalDateTime getTableLastUpdateTime(int extraSeconds);

public void setQueryMVRewrite(boolean value) {
queryMVRewrite = value;
}

public boolean isQueryMVRewrite() {
return queryMVRewrite;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,17 @@
import com.google.common.cache.LoadingCache;
import com.google.common.collect.Lists;
import com.starrocks.catalog.Database;
import com.starrocks.catalog.IcebergTable;
import com.starrocks.common.Config;
import com.starrocks.common.MetaNotFoundException;
import com.starrocks.common.Pair;
import com.starrocks.connector.ConnectorMetadatRequestContext;
import com.starrocks.connector.ConnectorViewDefinition;
import com.starrocks.connector.PlanMode;
import com.starrocks.connector.exception.StarRocksConnectorException;
import com.starrocks.mysql.MysqlCommand;
import com.starrocks.qe.ConnectContext;
import com.starrocks.qe.SessionVariable;
import org.apache.iceberg.BaseTable;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DeleteFile;
Expand Down Expand Up @@ -87,13 +90,13 @@ public CachingIcebergCatalog(String catalogName, IcebergCatalog delegate, Iceber
enableCache ? DEFAULT_CACHE_NUM : NEVER_CACHE).build();
this.partitionCache = newCacheBuilder(icebergProperties.getIcebergMetaCacheTtlSec(),
enableCache ? DEFAULT_CACHE_NUM : NEVER_CACHE).build(
CacheLoader.asyncReloading(new CacheLoader<>() {
@Override
public Map<String, Partition> load(IcebergTableName key) throws Exception {
// use default executor service.
return delegate.getPartitions(key.dbName, key.tableName, key.snapshotId, null);
}
}, executorService));
CacheLoader.from(key -> {
Table nativeTable = getTable(key.dbName, key.tableName);
IcebergTable icebergTable =
IcebergTable.builder().setCatalogDBName(key.dbName).setCatalogTableName(key.tableName)
.setNativeTable(nativeTable).build();
return delegate.getPartitions(icebergTable, key.snapshotId, null);
}));
this.dataFileCache = enableCache ?
newCacheBuilder(
icebergProperties.getIcebergMetaCacheTtlSec(), icebergProperties.getIcebergManifestCacheMaxNum()).build()
Expand All @@ -115,12 +118,12 @@ public List<String> listAllDatabases() {
return delegate.listAllDatabases();
}

public void createDb(String dbName, Map<String, String> properties) {
delegate.createDb(dbName, properties);
public void createDB(String dbName, Map<String, String> properties) {
delegate.createDB(dbName, properties);
}

public void dropDb(String dbName) throws MetaNotFoundException {
delegate.dropDb(dbName);
public void dropDB(String dbName) throws MetaNotFoundException {
delegate.dropDB(dbName);
databases.invalidate(dbName);
}

Expand Down Expand Up @@ -174,7 +177,7 @@ public boolean createTable(String dbName,
@Override
public boolean dropTable(String dbName, String tableName, boolean purge) {
boolean dropped = delegate.dropTable(dbName, tableName, purge);
tables.invalidate(new IcebergTableName(dbName, tableName));
invalidateCache(new IcebergTableName(dbName, tableName));
return dropped;
}

Expand All @@ -198,12 +201,33 @@ public View getView(String dbName, String viewName) {
}

@Override
public Map<String, Partition> getPartitions(String dbName, String tableName, long snapshotId,
public Map<String, Partition> getPartitions(IcebergTable icebergTable, long snapshotId,
ExecutorService executorService) {
IcebergTableName key = new IcebergTableName(dbName, tableName, snapshotId);
IcebergTableName key =
new IcebergTableName(icebergTable.getCatalogDBName(), icebergTable.getCatalogTableName(), snapshotId);
return partitionCache.getUnchecked(key);
}

@Override
public List<String> listPartitionNames(IcebergTable icebergTable, ConnectorMetadatRequestContext requestContext,
ExecutorService executorService) {
SessionVariable sv = ConnectContext.getSessionVariableOrDefault();
// optimization for query mv rewrite, we can optionally return null to bypass it.
// if we don't have cache right now, which means it probably takes time to load it during query,
// so we can do load in background while return null to bypass this synchronous process.
if (requestContext.isQueryMVRewrite() && sv.isEnableConnectorAsyncListPartitions()) {
long snapshotId = requestContext.getSnapshotId();
IcebergTableName key =
new IcebergTableName(icebergTable.getCatalogDBName(), icebergTable.getCatalogTableName(), snapshotId);
Map<String, Partition> cacheValue = partitionCache.getIfPresent(key);
if (cacheValue == null) {
backgroundExecutor.submit(() -> partitionCache.refresh(key));
return null;
}
}
return IcebergCatalog.super.listPartitionNames(icebergTable, requestContext, executorService);
}

@Override
public void deleteUncommittedDataFiles(List<String> fileLocations) {
delegate.deleteUncommittedDataFiles(fileLocations);
Expand Down Expand Up @@ -307,13 +331,24 @@ public void refreshCatalog() {
}
}

public void invalidateCacheWithoutTable(IcebergTableName icebergTableName) {
partitionCache.invalidate(icebergTableName);
@Override
public void invalidatePartitionCache(String dbName, String tableName) {
// will invalidate all snapshots of this table
IcebergTableName key = new IcebergTableName(dbName, tableName);
partitionCache.invalidate(key);
}

@Override
public void invalidateCache(String dbName, String tableName) {
IcebergTableName key = new IcebergTableName(dbName, tableName);
invalidateCache(key);

}

public void invalidateCache(IcebergTableName icebergTableName) {
tables.invalidate(icebergTableName);
partitionCache.invalidate(icebergTableName);
private void invalidateCache(IcebergTableName key) {
tables.invalidate(key);
// will invalidate all snapshots of this table
partitionCache.invalidate(key);
}

@Override
Expand All @@ -340,10 +375,15 @@ private CacheBuilder<Object, Object> newCacheBuilder(long expiresAfterWriteSec,
public static class IcebergTableName {
private final String dbName;
private final String tableName;
// if as cache key for `getTable`, ignoreSnapshotId = true
// otherwise it's false
private boolean ignoreSnapshotId = false;
// -1 mean it's an empty table without any snapshot.
private long snapshotId = -1;

public IcebergTableName(String dbName, String tableName) {
this(dbName, tableName, -1);
this.ignoreSnapshotId = true;
}

public IcebergTableName(String dbName, String tableName, long snapshotId) {
Expand All @@ -362,7 +402,7 @@ public boolean equals(Object o) {
}
IcebergTableName that = (IcebergTableName) o;
return dbName.equalsIgnoreCase(that.dbName) && tableName.equalsIgnoreCase(that.tableName) &&
(snapshotId == -1 || snapshotId == that.snapshotId);
(ignoreSnapshotId || snapshotId == that.snapshotId);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,8 @@ public static IcebergTable toIcebergTable(Table nativeTbl, String catalogName, S
.setSrTableName(remoteTableName)
.setCatalogName(catalogName)
.setResourceName(toResourceName(catalogName, "iceberg"))
.setRemoteDbName(remoteDbName)
.setRemoteTableName(remoteTableName)
.setCatalogDBName(remoteDbName)
.setCatalogTableName(remoteTableName)
.setComment(nativeTbl.properties().getOrDefault("common", ""))
.setNativeTable(nativeTbl)
.setFullSchema(toFullSchemas(nativeTbl.schema()))
Expand Down
Loading

0 comments on commit 20a46a8

Please sign in to comment.