diff --git a/polaris-core/src/main/java/org/apache/polaris/core/persistence/TransactionWorkspaceMetaStoreManager.java b/polaris-core/src/main/java/org/apache/polaris/core/persistence/TransactionWorkspaceMetaStoreManager.java new file mode 100644 index 000000000..4d7d656ad --- /dev/null +++ b/polaris-core/src/main/java/org/apache/polaris/core/persistence/TransactionWorkspaceMetaStoreManager.java @@ -0,0 +1,372 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.polaris.core.persistence; + +import com.google.common.collect.ImmutableList; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import org.apache.polaris.core.PolarisCallContext; +import org.apache.polaris.core.entity.PolarisBaseEntity; +import org.apache.polaris.core.entity.PolarisEntity; +import org.apache.polaris.core.entity.PolarisEntityCore; +import org.apache.polaris.core.entity.PolarisEntityId; +import org.apache.polaris.core.entity.PolarisEntitySubType; +import org.apache.polaris.core.entity.PolarisEntityType; +import org.apache.polaris.core.entity.PolarisPrivilege; +import org.apache.polaris.core.storage.PolarisStorageActions; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; + +/** + * Wraps an existing impl of PolarisMetaStoreManager and delegates expected "read" operations + * through to the wrapped instance while throwing errors on unexpected operations or enqueuing + * expected write operations into a collection to be committed as a single atomic unit. + * + *

Note that as long as the server-side multi-commit transaction semantics are effectively only + * SERIALIZABLE isolation (i.e. if we can resolve all UpdateRequirements "statically" before the set + * of commits and translate these into an atomic collection of compare-and-swap operations to apply + * the transaction), this workspace should also reject readEntity/loadEntity operations to avoid + * implying that any reads from this transaction workspace include writes performed into this + * transaction workspace that haven't yet been committed. + * + *

Not thread-safe; instances should only be used within a single request context and should not + * be reused between requests. + */ +public class TransactionWorkspaceMetaStoreManager implements PolarisMetaStoreManager { + private final PolarisMetaStoreManager delegate; + + // TODO: If we want to support the semantic of opening a transaction in which multiple + // reads and writes occur on the same entities, where the reads are expected to see the writes + // within the transaction workspace that haven't actually been committed, we can augment this + // class by allowing these pendingUpdates to represent the latest state of the entity if we + // also increment entityVersion. We'd need to store both a "latest view" of all updated entities + // to serve reads within the same transaction while also storing the ordered list of + // pendingUpdates that ultimately need to be applied in order within the real MetaStoreManager. + private final List pendingUpdates = new ArrayList<>(); + + public TransactionWorkspaceMetaStoreManager(PolarisMetaStoreManager delegate) { + this.delegate = delegate; + } + + public List getPendingUpdates() { + return ImmutableList.copyOf(pendingUpdates); + } + + @Override + public BaseResult bootstrapPolarisService(@NotNull PolarisCallContext callCtx) { + callCtx + .getDiagServices() + .fail("illegal_method_in_transaction_workspace", "bootstrapPolarisService"); + return null; + } + + @Override + public BaseResult purge(@NotNull PolarisCallContext callCtx) { + callCtx.getDiagServices().fail("illegal_method_in_transaction_workspace", "purge"); + return null; + } + + @Override + public PolarisMetaStoreManager.EntityResult readEntityByName( + @NotNull PolarisCallContext callCtx, + @Nullable List catalogPath, + @NotNull PolarisEntityType entityType, + @NotNull PolarisEntitySubType entitySubType, + @NotNull String name) { + callCtx.getDiagServices().fail("illegal_method_in_transaction_workspace", "readEntityByName"); + return null; + } + + @Override + 
public ListEntitiesResult listEntities( + @NotNull PolarisCallContext callCtx, + @Nullable List catalogPath, + @NotNull PolarisEntityType entityType, + @NotNull PolarisEntitySubType entitySubType) { + callCtx.getDiagServices().fail("illegal_method_in_transaction_workspace", "listEntities"); + return null; + } + + @Override + public GenerateEntityIdResult generateNewEntityId(@NotNull PolarisCallContext callCtx) { + callCtx + .getDiagServices() + .fail("illegal_method_in_transaction_workspace", "generateNewEntityId"); + return null; + } + + @Override + public CreatePrincipalResult createPrincipal( + @NotNull PolarisCallContext callCtx, @NotNull PolarisBaseEntity principal) { + callCtx.getDiagServices().fail("illegal_method_in_transaction_workspace", "createPrincipal"); + return null; + } + + @Override + public PrincipalSecretsResult loadPrincipalSecrets( + @NotNull PolarisCallContext callCtx, @NotNull String clientId) { + callCtx + .getDiagServices() + .fail("illegal_method_in_transaction_workspace", "loadPrincipalSecrets"); + return null; + } + + @Override + public PrincipalSecretsResult rotatePrincipalSecrets( + @NotNull PolarisCallContext callCtx, + @NotNull String clientId, + long principalId, + @NotNull String mainSecret, + boolean reset) { + callCtx + .getDiagServices() + .fail("illegal_method_in_transaction_workspace", "rotatePrincipalSecrets"); + return null; + } + + @Override + public CreateCatalogResult createCatalog( + @NotNull PolarisCallContext callCtx, + @NotNull PolarisBaseEntity catalog, + @NotNull List principalRoles) { + callCtx.getDiagServices().fail("illegal_method_in_transaction_workspace", "createCatalog"); + return null; + } + + @Override + public EntityResult createEntityIfNotExists( + @NotNull PolarisCallContext callCtx, + @Nullable List catalogPath, + @NotNull PolarisBaseEntity entity) { + callCtx + .getDiagServices() + .fail("illegal_method_in_transaction_workspace", "createEntityIfNotExists"); + return null; + } + + @Override + public 
EntitiesResult createEntitiesIfNotExist( + @NotNull PolarisCallContext callCtx, + @Nullable List catalogPath, + @NotNull List entities) { + callCtx + .getDiagServices() + .fail("illegal_method_in_transaction_workspace", "createEntitiesIfNotExist"); + return null; + } + + @Override + public EntityResult updateEntityPropertiesIfNotChanged( + @NotNull PolarisCallContext callCtx, + @Nullable List catalogPath, + @NotNull PolarisBaseEntity entity) { + pendingUpdates.add(new EntityWithPath(catalogPath, entity)); + return new EntityResult(entity); + } + + @Override + public EntitiesResult updateEntitiesPropertiesIfNotChanged( + @NotNull PolarisCallContext callCtx, @NotNull List entities) { + callCtx + .getDiagServices() + .fail("illegal_method_in_transaction_workspace", "updateEntitiesPropertiesIfNotChanged"); + return null; + } + + @Override + public EntityResult renameEntity( + @NotNull PolarisCallContext callCtx, + @Nullable List catalogPath, + @NotNull PolarisEntityCore entityToRename, + @Nullable List newCatalogPath, + @NotNull PolarisEntity renamedEntity) { + callCtx.getDiagServices().fail("illegal_method_in_transaction_workspace", "renameEntity"); + return null; + } + + @Override + public DropEntityResult dropEntityIfExists( + @NotNull PolarisCallContext callCtx, + @Nullable List catalogPath, + @NotNull PolarisEntityCore entityToDrop, + @Nullable Map cleanupProperties, + boolean cleanup) { + callCtx.getDiagServices().fail("illegal_method_in_transaction_workspace", "dropEntityIfExists"); + return null; + } + + @Override + public PrivilegeResult grantUsageOnRoleToGrantee( + @NotNull PolarisCallContext callCtx, + @Nullable PolarisEntityCore catalog, + @NotNull PolarisEntityCore role, + @NotNull PolarisEntityCore grantee) { + callCtx + .getDiagServices() + .fail("illegal_method_in_transaction_workspace", "grantUsageOnRoleToGrantee"); + return null; + } + + @Override + public PrivilegeResult revokeUsageOnRoleFromGrantee( + @NotNull PolarisCallContext callCtx, + @Nullable 
PolarisEntityCore catalog, + @NotNull PolarisEntityCore role, + @NotNull PolarisEntityCore grantee) { + callCtx + .getDiagServices() + .fail("illegal_method_in_transaction_workspace", "revokeUsageOnRoleFromGrantee"); + return null; + } + + @Override + public PrivilegeResult grantPrivilegeOnSecurableToRole( + @NotNull PolarisCallContext callCtx, + @NotNull PolarisEntityCore grantee, + @Nullable List catalogPath, + @NotNull PolarisEntityCore securable, + @NotNull PolarisPrivilege privilege) { + callCtx + .getDiagServices() + .fail("illegal_method_in_transaction_workspace", "grantPrivilegeOnSecurableToRole"); + return null; + } + + @Override + public PrivilegeResult revokePrivilegeOnSecurableFromRole( + @NotNull PolarisCallContext callCtx, + @NotNull PolarisEntityCore grantee, + @Nullable List catalogPath, + @NotNull PolarisEntityCore securable, + @NotNull PolarisPrivilege privilege) { + callCtx + .getDiagServices() + .fail("illegal_method_in_transaction_workspace", "revokePrivilegeOnSecurableFromRole"); + return null; + } + + @Override + public LoadGrantsResult loadGrantsOnSecurable( + @NotNull PolarisCallContext callCtx, long securableCatalogId, long securableId) { + callCtx + .getDiagServices() + .fail("illegal_method_in_transaction_workspace", "loadGrantsOnSecurable"); + return null; + } + + @Override + public LoadGrantsResult loadGrantsToGrantee( + PolarisCallContext callCtx, long granteeCatalogId, long granteeId) { + callCtx + .getDiagServices() + .fail("illegal_method_in_transaction_workspace", "loadGrantsToGrantee"); + return null; + } + + @Override + public ChangeTrackingResult loadEntitiesChangeTracking( + @NotNull PolarisCallContext callCtx, @NotNull List entityIds) { + callCtx + .getDiagServices() + .fail("illegal_method_in_transaction_workspace", "loadEntitiesChangeTracking"); + return null; + } + + @Override + public EntityResult loadEntity( + @NotNull PolarisCallContext callCtx, long entityCatalogId, long entityId) { + 
callCtx.getDiagServices().fail("illegal_method_in_transaction_workspace", "loadEntity"); + return null; + } + + @Override + public EntitiesResult loadTasks( + @NotNull PolarisCallContext callCtx, String executorId, int limit) { + callCtx.getDiagServices().fail("illegal_method_in_transaction_workspace", "loadTasks"); + return null; + } + + @Override + public ScopedCredentialsResult getSubscopedCredsForEntity( + @NotNull PolarisCallContext callCtx, + long catalogId, + long entityId, + boolean allowListOperation, + @NotNull Set allowedReadLocations, + @NotNull Set allowedWriteLocations) { + return delegate.getSubscopedCredsForEntity( + callCtx, + catalogId, + entityId, + allowListOperation, + allowedReadLocations, + allowedWriteLocations); + } + + @Override + public ValidateAccessResult validateAccessToLocations( + @NotNull PolarisCallContext callCtx, + long catalogId, + long entityId, + @NotNull Set actions, + @NotNull Set locations) { + callCtx + .getDiagServices() + .fail("illegal_method_in_transaction_workspace", "validateAccessToLocations"); + return null; + } + + @Override + public PolarisMetaStoreManager.CachedEntryResult loadCachedEntryById( + @NotNull PolarisCallContext callCtx, long entityCatalogId, long entityId) { + callCtx + .getDiagServices() + .fail("illegal_method_in_transaction_workspace", "loadCachedEntryById"); + return null; + } + + @Override + public PolarisMetaStoreManager.CachedEntryResult loadCachedEntryByName( + @NotNull PolarisCallContext callCtx, + long entityCatalogId, + long parentId, + @NotNull PolarisEntityType entityType, + @NotNull String entityName) { + callCtx + .getDiagServices() + .fail("illegal_method_in_transaction_workspace", "loadCachedEntryByName"); + return null; + } + + @Override + public PolarisMetaStoreManager.CachedEntryResult refreshCachedEntity( + @NotNull PolarisCallContext callCtx, + int entityVersion, + int entityGrantRecordsVersion, + @NotNull PolarisEntityType entityType, + long entityCatalogId, + long entityId) { 
+ callCtx + .getDiagServices() + .fail("illegal_method_in_transaction_workspace", "refreshCachedEntity"); + return null; + } +} diff --git a/polaris-service/src/main/java/org/apache/polaris/service/catalog/BasePolarisCatalog.java b/polaris-service/src/main/java/org/apache/polaris/service/catalog/BasePolarisCatalog.java index 5bd51be36..0df8717ff 100644 --- a/polaris-service/src/main/java/org/apache/polaris/service/catalog/BasePolarisCatalog.java +++ b/polaris-service/src/main/java/org/apache/polaris/service/catalog/BasePolarisCatalog.java @@ -171,6 +171,7 @@ public class BasePolarisCatalog extends BaseMetastoreViewCatalog private Map catalogProperties; private Map tableDefaultProperties; private final FileIOFactory fileIOFactory; + private PolarisMetaStoreManager metaStoreManager; /** * @param entityManager provides handle to underlying PolarisMetaStoreManager with which to @@ -197,6 +198,7 @@ public BasePolarisCatalog( this.catalogId = catalogEntity.getId(); this.catalogName = catalogEntity.getName(); this.fileIOFactory = fileIOFactory; + this.metaStoreManager = entityManager.getMetaStoreManager(); } @Override @@ -275,6 +277,10 @@ public void initialize(String name, Map properties) { } } + public void setMetaStoreManager(PolarisMetaStoreManager newMetaStoreManager) { + this.metaStoreManager = newMetaStoreManager; + } + @Override protected Map properties() { return catalogProperties == null ? 
ImmutableMap.of() : catalogProperties; @@ -489,11 +495,7 @@ private void createNamespaceInternal( NamespaceEntity entity = new NamespaceEntity.Builder(namespace) .setCatalogId(getCatalogId()) - .setId( - entityManager - .getMetaStoreManager() - .generateNewEntityId(getCurrentPolarisContext()) - .getId()) + .setId(getMetaStoreManager().generateNewEntityId(getCurrentPolarisContext()).getId()) .setParentId(resolvedParent.getRawLeafEntity().getId()) .setProperties(metadata) .setCreateTimestamp(System.currentTimeMillis()) @@ -513,8 +515,7 @@ private void createNamespaceInternal( } PolarisEntity returnedEntity = PolarisEntity.of( - entityManager - .getMetaStoreManager() + getMetaStoreManager() .createEntityIfNotExists( getCurrentPolarisContext(), PolarisEntity.toCoreList(resolvedParent.getRawFullPath()), @@ -610,8 +611,7 @@ public boolean dropNamespace(Namespace namespace) throws NamespaceNotEmptyExcept // drop if exists and is empty PolarisCallContext polarisCallContext = callContext.getPolarisCallContext(); PolarisMetaStoreManager.DropEntityResult dropEntityResult = - entityManager - .getMetaStoreManager() + getMetaStoreManager() .dropEntityIfExists( getCurrentPolarisContext(), PolarisEntity.toCoreList(catalogPath), @@ -663,8 +663,7 @@ public boolean setProperties(Namespace namespace, Map properties List parentPath = resolvedEntities.getRawFullPath(); PolarisEntity returnedEntity = Optional.ofNullable( - entityManager - .getMetaStoreManager() + getMetaStoreManager() .updateEntityPropertiesIfNotChanged( getCurrentPolarisContext(), PolarisEntity.toCoreList(parentPath), @@ -696,8 +695,7 @@ public boolean removeProperties(Namespace namespace, Set properties) List parentPath = resolvedEntities.getRawFullPath(); PolarisEntity returnedEntity = Optional.ofNullable( - entityManager - .getMetaStoreManager() + getMetaStoreManager() .updateEntityPropertiesIfNotChanged( getCurrentPolarisContext(), PolarisEntity.toCoreList(parentPath), @@ -743,8 +741,7 @@ public List 
listNamespaces(Namespace namespace) throws NoSuchNamespac List catalogPath = resolvedEntities.getRawFullPath(); List entities = PolarisEntity.toNameAndIdList( - entityManager - .getMetaStoreManager() + getMetaStoreManager() .listEntities( getCurrentPolarisContext(), PolarisEntity.toCoreList(catalogPath), @@ -885,7 +882,7 @@ private Map refreshCredentials( entityManager .getCredentialCache() .getOrGenerateSubScopeCreds( - entityManager.getMetaStoreManager(), + getMetaStoreManager(), callContext.getPolarisCallContext(), entity, allowList, @@ -977,7 +974,7 @@ private void validateLocationsForTableLike( // implementation, then the validation should go through that API instead as follows: // // PolarisMetaStoreManager.ValidateAccessResult validateResult = - // entityManager.getMetaStoreManager().validateAccessToLocations( + // getMetaStoreManager().validateAccessToLocations( // getCurrentPolarisContext(), // storageInfoHolderEntity.getCatalogId(), // storageInfoHolderEntity.getId(), @@ -1036,8 +1033,7 @@ private void validateNoLocationOverlap( private void validateNoLocationOverlap( String location, List parentPath, String name) { PolarisMetaStoreManager.ListEntitiesResult siblingNamespacesResult = - entityManager - .getMetaStoreManager() + getMetaStoreManager() .listEntities( callContext.getPolarisCallContext(), parentPath.stream().map(PolarisEntity::toCore).collect(Collectors.toList()), @@ -1060,8 +1056,7 @@ private void validateNoLocationOverlap( .map( ns -> { PolarisMetaStoreManager.ListEntitiesResult siblingTablesResult = - entityManager - .getMetaStoreManager() + getMetaStoreManager() .listEntities( callContext.getPolarisCallContext(), parentPath.stream() @@ -1335,10 +1330,7 @@ public void doCommit(TableMetadata base, TableMetadata metadata) { .setSubType(PolarisEntitySubType.TABLE) .setBaseLocation(metadata.location()) .setId( - entityManager - .getMetaStoreManager() - .generateNewEntityId(getCurrentPolarisContext()) - .getId()) + 
getMetaStoreManager().generateNewEntityId(getCurrentPolarisContext()).getId()) .build(); } else { existingLocation = entity.getMetadataLocation(); @@ -1539,10 +1531,7 @@ public void doCommit(ViewMetadata base, ViewMetadata metadata) { .setCatalogId(getCatalogId()) .setSubType(PolarisEntitySubType.VIEW) .setId( - entityManager - .getMetaStoreManager() - .generateNewEntityId(getCurrentPolarisContext()) - .getId()) + getMetaStoreManager().generateNewEntityId(getCurrentPolarisContext()).getId()) .build(); } else { existingLocation = entity.getMetadataLocation(); @@ -1610,6 +1599,10 @@ private PolarisCallContext getCurrentPolarisContext() { return callContext.getPolarisCallContext(); } + private PolarisMetaStoreManager getMetaStoreManager() { + return metaStoreManager; + } + @VisibleForTesting long getCatalogId() { // TODO: Properly handle initialization @@ -1660,8 +1653,7 @@ private void renameTableLike( // rename the entity now PolarisMetaStoreManager.EntityResult returnedEntityResult = - entityManager - .getMetaStoreManager() + getMetaStoreManager() .renameEntity( getCurrentPolarisContext(), PolarisEntity.toCoreList(catalogPath), @@ -1724,8 +1716,7 @@ private void renameTableLike( .addKeyValue("toEntity.getTableIdentifier()", toEntity.getTableIdentifier()) .addKeyValue("returnedEntity.getTableIdentifier()", returnedEntity.getTableIdentifier()) .log("Returned entity identifier doesn't match toEntity identifier"); - entityManager - .getMetaStoreManager() + getMetaStoreManager() .updateEntityPropertiesIfNotChanged( getCurrentPolarisContext(), PolarisEntity.toCoreList(newCatalogPath), @@ -1771,8 +1762,7 @@ private void createTableLike( PolarisEntity returnedEntity = PolarisEntity.of( - entityManager - .getMetaStoreManager() + getMetaStoreManager() .createEntityIfNotExists( getCurrentPolarisContext(), PolarisEntity.toCoreList(catalogPath), entity)); LOGGER.debug("Created TableLike entity {} with TableIdentifier {}", entity, identifier); @@ -1801,8 +1791,7 @@ private void 
updateTableLike(TableIdentifier identifier, PolarisEntity entity) { List catalogPath = resolvedEntities.getRawParentPath(); PolarisEntity returnedEntity = Optional.ofNullable( - entityManager - .getMetaStoreManager() + getMetaStoreManager() .updateEntityPropertiesIfNotChanged( getCurrentPolarisContext(), PolarisEntity.toCoreList(catalogPath), entity) .getEntity()) @@ -1851,8 +1840,7 @@ private void updateTableLike(TableIdentifier identifier, PolarisEntity entity) { } } - return entityManager - .getMetaStoreManager() + return getMetaStoreManager() .dropEntityIfExists( getCurrentPolarisContext(), PolarisEntity.toCoreList(catalogPath), @@ -1895,10 +1883,7 @@ private boolean sendNotificationForTableLike( .setCatalogId(getCatalogId()) .setSubType(PolarisEntitySubType.TABLE) .setId( - entityManager - .getMetaStoreManager() - .generateNewEntityId(getCurrentPolarisContext()) - .getId()) + getMetaStoreManager().generateNewEntityId(getCurrentPolarisContext()).getId()) .setLastNotificationTimestamp(request.getPayload().getTimestamp()) .build(); } else { @@ -1984,8 +1969,7 @@ private List listTableLike(PolarisEntitySubType subType, Namesp List catalogPath = resolvedEntities.getRawFullPath(); List entities = PolarisEntity.toNameAndIdList( - entityManager - .getMetaStoreManager() + getMetaStoreManager() .listEntities( getCurrentPolarisContext(), PolarisEntity.toCoreList(catalogPath), diff --git a/polaris-service/src/main/java/org/apache/polaris/service/catalog/IcebergCatalogAdapter.java b/polaris-service/src/main/java/org/apache/polaris/service/catalog/IcebergCatalogAdapter.java index 53cc053b8..5488f820a 100644 --- a/polaris-service/src/main/java/org/apache/polaris/service/catalog/IcebergCatalogAdapter.java +++ b/polaris-service/src/main/java/org/apache/polaris/service/catalog/IcebergCatalogAdapter.java @@ -27,11 +27,8 @@ import java.net.URLEncoder; import java.nio.charset.Charset; import java.util.EnumSet; -import java.util.List; import java.util.Map; import 
java.util.Optional; -import java.util.stream.Collectors; -import org.apache.iceberg.UpdateRequirement; import org.apache.iceberg.catalog.Catalog; import org.apache.iceberg.catalog.Namespace; import org.apache.iceberg.catalog.TableIdentifier; @@ -47,7 +44,6 @@ import org.apache.iceberg.rest.requests.RenameTableRequest; import org.apache.iceberg.rest.requests.ReportMetricsRequest; import org.apache.iceberg.rest.requests.UpdateNamespacePropertiesRequest; -import org.apache.iceberg.rest.requests.UpdateTableRequest; import org.apache.iceberg.rest.responses.ConfigResponse; import org.apache.polaris.core.auth.AuthenticatedPolarisPrincipal; import org.apache.polaris.core.auth.PolarisAuthorizer; @@ -308,7 +304,7 @@ public Response updateTable( Namespace ns = decodeNamespace(namespace); TableIdentifier tableIdentifier = TableIdentifier.of(ns, RESTUtil.decodeString(table)); - if (isCreate(commitTableRequest)) { + if (PolarisCatalogHandlerWrapper.isCreate(commitTableRequest)) { return Response.ok( newHandlerWrapper(securityContext, prefix) .updateTableForStagedCreate(tableIdentifier, commitTableRequest)) @@ -321,27 +317,6 @@ public Response updateTable( } } - /** - * TODO: Make the helper in org.apache.iceberg.rest.CatalogHandlers public instead of needing to - * copy/pastehere. 
- */ - private static boolean isCreate(UpdateTableRequest request) { - boolean isCreate = - request.requirements().stream() - .anyMatch(UpdateRequirement.AssertTableDoesNotExist.class::isInstance); - - if (isCreate) { - List invalidRequirements = - request.requirements().stream() - .filter(req -> !(req instanceof UpdateRequirement.AssertTableDoesNotExist)) - .collect(Collectors.toList()); - Preconditions.checkArgument( - invalidRequirements.isEmpty(), "Invalid create requirements: %s", invalidRequirements); - } - - return isCreate; - } - @Override public Response createView( String prefix, diff --git a/polaris-service/src/main/java/org/apache/polaris/service/catalog/PolarisCatalogHandlerWrapper.java b/polaris-service/src/main/java/org/apache/polaris/service/catalog/PolarisCatalogHandlerWrapper.java index 78e3935d8..29aa328c7 100644 --- a/polaris-service/src/main/java/org/apache/polaris/service/catalog/PolarisCatalogHandlerWrapper.java +++ b/polaris-service/src/main/java/org/apache/polaris/service/catalog/PolarisCatalogHandlerWrapper.java @@ -18,6 +18,7 @@ */ package org.apache.polaris.service.catalog; +import com.google.common.base.Preconditions; import com.google.common.collect.Maps; import java.io.Closeable; import java.io.IOException; @@ -31,6 +32,7 @@ import java.util.Optional; import java.util.Set; import java.util.function.Supplier; +import java.util.stream.Collectors; import org.apache.iceberg.BaseMetadataTable; import org.apache.iceberg.BaseTable; import org.apache.iceberg.MetadataUpdate; @@ -38,8 +40,7 @@ import org.apache.iceberg.SortOrder; import org.apache.iceberg.Table; import org.apache.iceberg.TableMetadata; -import org.apache.iceberg.Transaction; -import org.apache.iceberg.Transactions; +import org.apache.iceberg.TableOperations; import org.apache.iceberg.UpdateRequirement; import org.apache.iceberg.catalog.Catalog; import org.apache.iceberg.catalog.Namespace; @@ -48,6 +49,7 @@ import org.apache.iceberg.catalog.ViewCatalog; import 
org.apache.iceberg.exceptions.AlreadyExistsException; import org.apache.iceberg.exceptions.BadRequestException; +import org.apache.iceberg.exceptions.CommitFailedException; import org.apache.iceberg.exceptions.ForbiddenException; import org.apache.iceberg.exceptions.NoSuchNamespaceException; import org.apache.iceberg.exceptions.NoSuchTableException; @@ -68,6 +70,7 @@ import org.apache.iceberg.rest.responses.LoadTableResponse; import org.apache.iceberg.rest.responses.LoadViewResponse; import org.apache.iceberg.rest.responses.UpdateNamespacePropertiesResponse; +import org.apache.polaris.core.PolarisConfiguration; import org.apache.polaris.core.auth.AuthenticatedPolarisPrincipal; import org.apache.polaris.core.auth.PolarisAuthorizableOperation; import org.apache.polaris.core.auth.PolarisAuthorizer; @@ -77,7 +80,9 @@ import org.apache.polaris.core.entity.PolarisEntitySubType; import org.apache.polaris.core.entity.PolarisEntityType; import org.apache.polaris.core.persistence.PolarisEntityManager; +import org.apache.polaris.core.persistence.PolarisMetaStoreManager; import org.apache.polaris.core.persistence.PolarisResolvedPathWrapper; +import org.apache.polaris.core.persistence.TransactionWorkspaceMetaStoreManager; import org.apache.polaris.core.persistence.resolver.PolarisResolutionManifest; import org.apache.polaris.core.persistence.resolver.ResolverPath; import org.apache.polaris.core.persistence.resolver.ResolverStatus; @@ -136,6 +141,27 @@ public PolarisCatalogHandlerWrapper( this.catalogFactory = catalogFactory; } + /** + * TODO: Make the helper in org.apache.iceberg.rest.CatalogHandlers public instead of needing to + * copy/paste here. 
+ */ + public static boolean isCreate(UpdateTableRequest request) { + boolean isCreate = + request.requirements().stream() + .anyMatch(UpdateRequirement.AssertTableDoesNotExist.class::isInstance); + + if (isCreate) { + List invalidRequirements = + request.requirements().stream() + .filter(req -> !(req instanceof UpdateRequirement.AssertTableDoesNotExist)) + .collect(Collectors.toList()); + Preconditions.checkArgument( + invalidRequirements.isEmpty(), "Invalid create requirements: %s", invalidRequirements); + } + + return isCreate; + } + private void initializeCatalog() { this.baseCatalog = catalogFactory.createCallContextCatalog( @@ -954,25 +980,91 @@ public void commitTransaction(CommitTransactionRequest commitTransactionRequest) throw new BadRequestException("Cannot update table on external catalogs."); } - // TODO: Implement this properly - List transactions = - commitTransactionRequest.tableChanges().stream() - .map( - change -> { - Table table = baseCatalog.loadTable(change.identifier()); - if (!(table instanceof BaseTable)) { - throw new IllegalStateException( - "Cannot wrap catalog that does not produce BaseTable"); - } - Transaction transaction = - Transactions.newTransaction( - change.identifier().toString(), ((BaseTable) table).operations()); - CatalogHandlers.updateTable(baseCatalog, change.identifier(), change); - return transaction; - }) - .toList(); + if (!(baseCatalog instanceof BasePolarisCatalog)) { + throw new BadRequestException( + "Unsupported operation: commitTransaction with baseCatalog type: %s", + baseCatalog.getClass().getName()); + } - transactions.forEach(Transaction::commitTransaction); + // Swap in TransactionWorkspaceMetaStoreManager for all mutations made by this baseCatalog to + // only go into an in-memory collection that we can commit as a single atomic unit after all + // validations. 
+ TransactionWorkspaceMetaStoreManager transactionMetaStoreManager = + new TransactionWorkspaceMetaStoreManager(entityManager.getMetaStoreManager()); + ((BasePolarisCatalog) baseCatalog).setMetaStoreManager(transactionMetaStoreManager); + + commitTransactionRequest.tableChanges().stream() + .forEach( + change -> { + Table table = baseCatalog.loadTable(change.identifier()); + if (!(table instanceof BaseTable)) { + throw new IllegalStateException( + "Cannot wrap catalog that does not produce BaseTable"); + } + if (isCreate(change)) { + throw new BadRequestException( + "Unsupported operation: commitTransaction with updateForStagedCreate: %s", + change); + } + + TableOperations tableOps = ((BaseTable) table).operations(); + TableMetadata currentMetadata = tableOps.current(); + + // Validate requirements; any CommitFailedExceptions will fail the overall request + change.requirements().forEach(requirement -> requirement.validate(currentMetadata)); + + // Apply changes + TableMetadata.Builder metadataBuilder = TableMetadata.buildFrom(currentMetadata); + change.updates().stream() + .forEach( + singleUpdate -> { + // Note: If location-overlap checking is refactored to be atomic, we could + // support validation within a single multi-table transaction as well, but + // will need to update the TransactionWorkspaceMetaStoreManager to better + // expose the concept of being able to read uncommitted updates. 
+ if (singleUpdate instanceof MetadataUpdate.SetLocation) { + if (!currentMetadata + .location() + .equals(((MetadataUpdate.SetLocation) singleUpdate).location()) + && !callContext + .getPolarisCallContext() + .getConfigurationStore() + .getConfiguration( + callContext.getPolarisCallContext(), + PolarisConfiguration.ALLOW_NAMESPACE_LOCATION_OVERLAP)) { + throw new BadRequestException( + "Unsupported operation: commitTransaction containing SetLocation" + + " for table '%s' and new location '%s'", + change.identifier(), + ((MetadataUpdate.SetLocation) singleUpdate).location()); + } + } + + // Apply updates to builder + singleUpdate.applyTo(metadataBuilder); + }); + + // Commit into transaction workspace we swapped the baseCatalog to use + TableMetadata updatedMetadata = metadataBuilder.build(); + if (!updatedMetadata.changes().isEmpty()) { + tableOps.commit(currentMetadata, updatedMetadata); + } + }); + + // Commit the collected updates in a single atomic operation + List pendingUpdates = + transactionMetaStoreManager.getPendingUpdates(); + PolarisMetaStoreManager.EntitiesResult result = + entityManager + .getMetaStoreManager() + .updateEntitiesPropertiesIfNotChanged( + callContext.getPolarisCallContext(), pendingUpdates); + if (!result.isSuccess()) { + // TODO: Retries and server-side cleanup on failure + throw new CommitFailedException( + "Transaction commit failed with status: %s, extraInfo: %s", + result.getReturnStatus(), result.getExtraInformation()); + } } public ListTablesResponse listViews(Namespace namespace) { diff --git a/polaris-service/src/test/java/org/apache/polaris/service/catalog/PolarisRestCatalogIntegrationTest.java b/polaris-service/src/test/java/org/apache/polaris/service/catalog/PolarisRestCatalogIntegrationTest.java index 5c858ccf9..7b937fdce 100644 --- a/polaris-service/src/test/java/org/apache/polaris/service/catalog/PolarisRestCatalogIntegrationTest.java +++ 
b/polaris-service/src/test/java/org/apache/polaris/service/catalog/PolarisRestCatalogIntegrationTest.java @@ -18,7 +18,6 @@ */ package org.apache.polaris.service.catalog; -import static org.apache.iceberg.types.Types.NestedField.required; import static org.apache.polaris.service.context.DefaultContextResolver.REALM_PROPERTY_KEY; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; @@ -37,14 +36,22 @@ import java.util.Optional; import java.util.UUID; import org.apache.iceberg.BaseTable; +import org.apache.iceberg.BaseTransaction; import org.apache.iceberg.CatalogProperties; +import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Schema; import org.apache.iceberg.Table; +import org.apache.iceberg.Transaction; +import org.apache.iceberg.UpdatePartitionSpec; +import org.apache.iceberg.UpdateSchema; import org.apache.iceberg.catalog.CatalogTests; import org.apache.iceberg.catalog.Namespace; import org.apache.iceberg.catalog.SessionCatalog; +import org.apache.iceberg.catalog.TableCommit; import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.exceptions.CommitFailedException; import org.apache.iceberg.exceptions.ForbiddenException; +import org.apache.iceberg.expressions.Expressions; import org.apache.iceberg.rest.HTTPClient; import org.apache.iceberg.rest.RESTCatalog; import org.apache.iceberg.rest.auth.OAuth2Properties; @@ -116,7 +123,6 @@ public class PolarisRestCatalogIntegrationTest extends CatalogTests ConfigOverride.config( "server.adminConnectors[0].port", "0")); // Bind to random port to support parallelism - protected static final Schema SCHEMA = new Schema(required(4, "data", Types.StringType.get())); protected static final String VIEW_QUERY = "select * from ns1.layer1_table"; private RESTCatalog restCatalog; @@ -769,4 +775,191 @@ public void testSendNotificationInternalCatalog() { .returns("Cannot update internal catalog via notifications", 
ErrorResponse::message); } } + + // Test copied from iceberg/core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java + // TODO: If TestRESTCatalog can be refactored to be more usable as a shared base test class, + // just inherit these test cases from that instead of copying them here. + @Test + public void diffAgainstSingleTable() { + Namespace namespace = Namespace.of("namespace"); + TableIdentifier identifier = TableIdentifier.of(namespace, "multipleDiffsAgainstSingleTable"); + + if (requiresNamespaceCreate()) { + catalog().createNamespace(namespace); + } + + Table table = catalog().buildTable(identifier, SCHEMA).create(); + Transaction transaction = table.newTransaction(); + + UpdateSchema updateSchema = + transaction.updateSchema().addColumn("new_col", Types.LongType.get()); + Schema expectedSchema = updateSchema.apply(); + updateSchema.commit(); + + UpdatePartitionSpec updateSpec = + transaction.updateSpec().addField("shard", Expressions.bucket("id", 16)); + PartitionSpec expectedSpec = updateSpec.apply(); + updateSpec.commit(); + + TableCommit tableCommit = + TableCommit.create( + identifier, + ((BaseTransaction) transaction).startMetadata(), + ((BaseTransaction) transaction).currentMetadata()); + + restCatalog.commitTransaction(tableCommit); + + Table loaded = catalog().loadTable(identifier); + assertThat(loaded.schema().asStruct()).isEqualTo(expectedSchema.asStruct()); + assertThat(loaded.spec().fields()).isEqualTo(expectedSpec.fields()); + } + + // Test copied from iceberg/core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java + // TODO: If TestRESTCatalog can be refactored to be more usable as a shared base test class, + // just inherit these test cases from that instead of copying them here. 
+ @Test + public void multipleDiffsAgainstMultipleTables() { + Namespace namespace = Namespace.of("multiDiffNamespace"); + TableIdentifier identifier1 = TableIdentifier.of(namespace, "multiDiffTable1"); + TableIdentifier identifier2 = TableIdentifier.of(namespace, "multiDiffTable2"); + + if (requiresNamespaceCreate()) { + catalog().createNamespace(namespace); + } + + Table table1 = catalog().buildTable(identifier1, SCHEMA).create(); + Table table2 = catalog().buildTable(identifier2, SCHEMA).create(); + Transaction t1Transaction = table1.newTransaction(); + Transaction t2Transaction = table2.newTransaction(); + + UpdateSchema updateSchema = + t1Transaction.updateSchema().addColumn("new_col", Types.LongType.get()); + Schema expectedSchema = updateSchema.apply(); + updateSchema.commit(); + + UpdateSchema updateSchema2 = + t2Transaction.updateSchema().addColumn("new_col2", Types.LongType.get()); + Schema expectedSchema2 = updateSchema2.apply(); + updateSchema2.commit(); + + TableCommit tableCommit1 = + TableCommit.create( + identifier1, + ((BaseTransaction) t1Transaction).startMetadata(), + ((BaseTransaction) t1Transaction).currentMetadata()); + + TableCommit tableCommit2 = + TableCommit.create( + identifier2, + ((BaseTransaction) t2Transaction).startMetadata(), + ((BaseTransaction) t2Transaction).currentMetadata()); + + restCatalog.commitTransaction(tableCommit1, tableCommit2); + + assertThat(catalog().loadTable(identifier1).schema().asStruct()) + .isEqualTo(expectedSchema.asStruct()); + + assertThat(catalog().loadTable(identifier2).schema().asStruct()) + .isEqualTo(expectedSchema2.asStruct()); + } + + // Test copied from iceberg/core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java + // TODO: If TestRESTCatalog can be refactored to be more usable as a shared base test class, + // just inherit these test cases from that instead of copying them here. 
+ @Test + public void multipleDiffsAgainstMultipleTablesLastFails() { + Namespace namespace = Namespace.of("multiDiffNamespace"); + TableIdentifier identifier1 = TableIdentifier.of(namespace, "multiDiffTable1"); + TableIdentifier identifier2 = TableIdentifier.of(namespace, "multiDiffTable2"); + + if (requiresNamespaceCreate()) { + catalog().createNamespace(namespace); + } + + catalog().createTable(identifier1, SCHEMA); + catalog().createTable(identifier2, SCHEMA); + + Table table1 = catalog().loadTable(identifier1); + Table table2 = catalog().loadTable(identifier2); + Schema originalSchemaOne = table1.schema(); + + Transaction t1Transaction = catalog().loadTable(identifier1).newTransaction(); + t1Transaction.updateSchema().addColumn("new_col1", Types.LongType.get()).commit(); + + Transaction t2Transaction = catalog().loadTable(identifier2).newTransaction(); + t2Transaction.updateSchema().renameColumn("data", "new-column").commit(); + + // delete the colum that is being renamed in the above TX to cause a conflict + table2.updateSchema().deleteColumn("data").commit(); + Schema updatedSchemaTwo = table2.schema(); + + TableCommit tableCommit1 = + TableCommit.create( + identifier1, + ((BaseTransaction) t1Transaction).startMetadata(), + ((BaseTransaction) t1Transaction).currentMetadata()); + + TableCommit tableCommit2 = + TableCommit.create( + identifier2, + ((BaseTransaction) t2Transaction).startMetadata(), + ((BaseTransaction) t2Transaction).currentMetadata()); + + assertThatThrownBy(() -> restCatalog.commitTransaction(tableCommit1, tableCommit2)) + .isInstanceOf(CommitFailedException.class) + .hasMessageContaining("Requirement failed: current schema changed: expected id 0 != 1"); + + Schema schema1 = catalog().loadTable(identifier1).schema(); + assertThat(schema1.asStruct()).isEqualTo(originalSchemaOne.asStruct()); + + Schema schema2 = catalog().loadTable(identifier2).schema(); + assertThat(schema2.asStruct()).isEqualTo(updatedSchemaTwo.asStruct()); + 
assertThat(schema2.findField("data")).isNull(); + assertThat(schema2.findField("new-column")).isNull(); + assertThat(schema2.columns()).hasSize(1); + } + + @Test + public void testMultipleConflictingCommitsToSingleTableInTransaction() { + Namespace namespace = Namespace.of("ns1"); + TableIdentifier identifier = + TableIdentifier.of(namespace, "multipleConflictingCommitsAgainstSingleTable"); + + if (requiresNamespaceCreate()) { + catalog().createNamespace(namespace); + } + + // Start two independent transactions on the same base table. + Table table = catalog().buildTable(identifier, SCHEMA).create(); + Schema originalSchema = catalog().loadTable(identifier).schema(); + Transaction transaction1 = table.newTransaction(); + Transaction transaction2 = table.newTransaction(); + + transaction1.updateSchema().renameColumn("data", "new-column1").commit(); + transaction2.updateSchema().renameColumn("data", "new-column2").commit(); + + TableCommit tableCommit1 = + TableCommit.create( + identifier, + ((BaseTransaction) transaction1).startMetadata(), + ((BaseTransaction) transaction1).currentMetadata()); + TableCommit tableCommit2 = + TableCommit.create( + identifier, + ((BaseTransaction) transaction2).startMetadata(), + ((BaseTransaction) transaction2).currentMetadata()); + + // "Initial" commit requirements will succeed for both commits being based on the original + // table but should fail the entire transaction on the second commit. + assertThatThrownBy(() -> restCatalog.commitTransaction(tableCommit1, tableCommit2)) + .isInstanceOf(CommitFailedException.class); + + // If an implementation validates all UpdateRequirements up-front, then it might pass + // tests where the UpdateRequirement fails up-front without being atomic. Here we can + // catch such scenarios where update requirements appear to be fine up-front but will + // fail when trying to commit the second update, and verify that nothing was actually + // committed in the end. 
+ Schema latestCommittedSchema = catalog().loadTable(identifier).schema(); + assertThat(latestCommittedSchema.asStruct()).isEqualTo(originalSchema.asStruct()); + } }