-
Notifications
You must be signed in to change notification settings - Fork 751
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[GOBBLIN-2159] Adding support for partition level copy in Iceberg distcp #4058
Changes from 22 commits
02ae2fc
981357c
7cd9353
82d10d3
c43d3e1
0cf7638
63bb9aa
6e1cf6b
065cde3
a13220d
e1d812f
4364044
66d81a3
24b4823
d8356e1
4dcc88b
46bd976
e1e6f57
b6163ba
6c73a25
9c35733
cdc863a
1dbe929
383ed91
942ad8d
6a4cf78
2adaa8b
c948854
a55ee61
1afc37a
bb35070
eeb8d25
675e8bb
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -21,6 +21,7 @@ | |
|
||
import org.apache.hadoop.conf.Configuration; | ||
import org.apache.iceberg.CatalogUtil; | ||
import org.apache.iceberg.Table; | ||
import org.apache.iceberg.TableOperations; | ||
import org.apache.iceberg.catalog.Catalog; | ||
import org.apache.iceberg.catalog.TableIdentifier; | ||
|
@@ -43,7 +44,10 @@ protected BaseIcebergCatalog(String catalogName, Class<? extends Catalog> compan | |
@Override | ||
public IcebergTable openTable(String dbName, String tableName) { | ||
TableIdentifier tableId = TableIdentifier.of(dbName, tableName); | ||
return new IcebergTable(tableId, calcDatasetDescriptorName(tableId), getDatasetDescriptorPlatform(), createTableOperations(tableId), this.getCatalogUri()); | ||
return new IcebergTable(tableId, calcDatasetDescriptorName(tableId), getDatasetDescriptorPlatform(), | ||
createTableOperations(tableId), | ||
this.getCatalogUri(), | ||
loadTableInstance(tableId)); | ||
} | ||
|
||
protected Catalog createCompanionCatalog(Map<String, String> properties, Configuration configuration) { | ||
|
@@ -67,4 +71,6 @@ protected String getDatasetDescriptorPlatform() { | |
} | ||
|
||
protected abstract TableOperations createTableOperations(TableIdentifier tableId); | ||
|
||
protected abstract Table loadTableInstance(TableIdentifier tableId); | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. For good measure you could also make — I'm tempted to re-situate the exception as … There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. As discussed, not throwing here; instead, catching NoSuchTableException in BaseIcebergCatalog::openTable and throwing IcebergTable.TableNotFoundException from there. |
||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,167 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one or more | ||
* contributor license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright ownership. | ||
* The ASF licenses this file to You under the Apache License, Version 2.0 | ||
* (the "License"); you may not use this file except in compliance with | ||
* the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package org.apache.gobblin.data.management.copy.iceberg; | ||
|
||
import java.io.IOException; | ||
import java.time.Duration; | ||
import java.util.List; | ||
import java.util.Optional; | ||
import java.util.Properties; | ||
import java.util.concurrent.TimeUnit; | ||
import java.util.concurrent.ExecutionException; | ||
|
||
import org.apache.iceberg.DataFile; | ||
import org.apache.iceberg.catalog.TableIdentifier; | ||
import org.apache.iceberg.util.SerializationUtil; | ||
|
||
import com.github.rholder.retry.Attempt; | ||
import com.github.rholder.retry.RetryException; | ||
import com.github.rholder.retry.RetryListener; | ||
import com.github.rholder.retry.Retryer; | ||
import com.google.common.collect.ImmutableMap; | ||
import com.typesafe.config.Config; | ||
import com.typesafe.config.ConfigFactory; | ||
|
||
import lombok.extern.slf4j.Slf4j; | ||
|
||
import org.apache.gobblin.commit.CommitStep; | ||
import org.apache.gobblin.util.retry.RetryerFactory; | ||
|
||
import static org.apache.gobblin.util.retry.RetryerFactory.RETRY_INTERVAL_MS; | ||
import static org.apache.gobblin.util.retry.RetryerFactory.RETRY_TIMES; | ||
import static org.apache.gobblin.util.retry.RetryerFactory.RETRY_TYPE; | ||
import static org.apache.gobblin.util.retry.RetryerFactory.RetryType; | ||
|
||
/** | ||
* Commit step for overwriting partitions in an Iceberg table. | ||
* <p> | ||
* This class implements the {@link CommitStep} interface and provides functionality to overwrite | ||
* partitions in the destination Iceberg table using serialized data files. | ||
* </p> | ||
*/ | ||
@Slf4j | ||
public class IcebergOverwritePartitionsStep implements CommitStep { | ||
private final String destTableIdStr; | ||
private final Properties properties; | ||
private final byte[] serializedDataFiles; | ||
private final String partitionColName; | ||
private final String partitionValue; | ||
public static final String OVERWRITE_PARTITIONS_RETRYER_CONFIG_PREFIX = IcebergDatasetFinder.ICEBERG_DATASET_PREFIX + | ||
".catalog.overwrite.partitions.retries"; | ||
private static final Config RETRYER_FALLBACK_CONFIG = ConfigFactory.parseMap(ImmutableMap.of( | ||
RETRY_INTERVAL_MS, TimeUnit.SECONDS.toMillis(3L), | ||
RETRY_TIMES, 3, | ||
RETRY_TYPE, RetryType.FIXED_ATTEMPT.name())); | ||
|
||
/** | ||
* Constructs an {@code IcebergReplacePartitionsStep} with the specified parameters. | ||
* | ||
* @param destTableIdStr the identifier of the destination table as a string | ||
* @param serializedDataFiles [from List<DataFiles>] the serialized data files to be used for replacing partitions | ||
* @param properties the properties containing configuration | ||
*/ | ||
public IcebergOverwritePartitionsStep(String destTableIdStr, String partitionColName, String partitionValue, byte[] serializedDataFiles, Properties properties) { | ||
this.destTableIdStr = destTableIdStr; | ||
this.partitionColName = partitionColName; | ||
this.partitionValue = partitionValue; | ||
this.serializedDataFiles = serializedDataFiles; | ||
this.properties = properties; | ||
} | ||
|
||
@Override | ||
public boolean isCompleted() { | ||
return false; | ||
} | ||
|
||
/** | ||
* Executes the partition replacement in the destination Iceberg table. | ||
* Also, have retry mechanism as done in {@link IcebergRegisterStep#execute()} | ||
* | ||
* @throws IOException if an I/O error occurs during execution | ||
*/ | ||
@Override | ||
public void execute() throws IOException { | ||
Blazer-007 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
// In IcebergRegisterStep::execute we validated if dest table metadata prior to starting the generate copy entities | ||
// is similar to table metadata while committing metadata in IcebergRegisterStep but that check in here will lead | ||
// to failure most of the time here as it is possible that the table metadata has changed (maybe data has been | ||
// written to newer partitions or other cases as well) between the time of generating copy entities | ||
// and committing metadata. Hence, we are not doing that check here. | ||
// Incase data has been written to the partition we are trying to overwrite, the overwrite step will remove the data | ||
// and copy only data that has been collected in the copy entities. | ||
Blazer-007 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
IcebergTable destTable = createDestinationCatalog().openTable(TableIdentifier.parse(this.destTableIdStr)); | ||
List<DataFile> dataFiles = SerializationUtil.deserializeFromBytes(this.serializedDataFiles); | ||
try { | ||
log.info("~{}~ Starting partition overwrite - partition: {}; value: {}; numDataFiles: {}; path[0]: {}", | ||
this.destTableIdStr, | ||
this.partitionColName, | ||
this.partitionValue, | ||
dataFiles.size(), | ||
dataFiles.get(0).path() | ||
); | ||
Retryer<Void> overwritePartitionsRetryer = createOverwritePartitionsRetryer(); | ||
overwritePartitionsRetryer.call(() -> { | ||
destTable.overwritePartition(dataFiles, this.partitionColName, this.partitionValue); | ||
return null; | ||
}); | ||
log.info("~{}~ Successful partition overwrite - partition: {}; value: {}", | ||
this.destTableIdStr, | ||
this.partitionColName, | ||
this.partitionValue | ||
); | ||
} catch (ExecutionException executionException) { | ||
String msg = String.format("Failed to overwrite partitions for destination iceberg table : {%s}", this.destTableIdStr); | ||
Blazer-007 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
log.error(msg, executionException); | ||
throw new RuntimeException(msg, executionException.getCause()); | ||
} catch (RetryException retryException) { | ||
String interruptedNote = Thread.currentThread().isInterrupted() ? "... then interrupted" : ""; | ||
String msg = String.format("Failed to overwrite partition for destination table : {%s} : (retried %d times) %s ", | ||
Blazer-007 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
this.destTableIdStr, | ||
retryException.getNumberOfFailedAttempts(), | ||
interruptedNote); | ||
Throwable informativeException = retryException.getLastFailedAttempt().hasException() | ||
? retryException.getLastFailedAttempt().getExceptionCause() | ||
: retryException; | ||
log.error(msg, informativeException); | ||
throw new RuntimeException(msg, informativeException); | ||
} | ||
} | ||
|
||
protected IcebergCatalog createDestinationCatalog() throws IOException { | ||
return IcebergDatasetFinder.createIcebergCatalog(this.properties, IcebergDatasetFinder.CatalogLocation.DESTINATION); | ||
} | ||
|
||
private Retryer<Void> createOverwritePartitionsRetryer() { | ||
Config config = ConfigFactory.parseProperties(this.properties); | ||
Config retryerOverridesConfig = config.hasPath(IcebergOverwritePartitionsStep.OVERWRITE_PARTITIONS_RETRYER_CONFIG_PREFIX) | ||
? config.getConfig(IcebergOverwritePartitionsStep.OVERWRITE_PARTITIONS_RETRYER_CONFIG_PREFIX) | ||
: ConfigFactory.empty(); | ||
|
||
return RetryerFactory.newInstance(retryerOverridesConfig.withFallback(RETRYER_FALLBACK_CONFIG), Optional.of(new RetryListener() { | ||
@Override | ||
public <V> void onRetry(Attempt<V> attempt) { | ||
if (attempt.hasException()) { | ||
String msg = String.format("Exception caught while overwriting partitions for destination table : {%s} : [attempt: %d; %s after start]", | ||
Blazer-007 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
destTableIdStr, | ||
attempt.getAttemptNumber(), | ||
Duration.ofMillis(attempt.getDelaySinceFirstAttempt()).toString()); | ||
log.warn(msg, attempt.getExceptionCause()); | ||
} | ||
} | ||
})); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I noticed a test now validating a throw of
org.apache.iceberg.exceptions.NoSuchTableException
. Does that arise from this call to loadTableInstance
? Wherever it is, let's catch it, so that exceptions from
org.apache.iceberg
don't bleed through. Locate where, and re-wrap with (our own) IcebergTable.TableNotFoundException
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, it arises from
loadTableInstance
, or more specifically from catalog.loadTable(tableIdentifier)
, and I don't believe it will be easy to catch it and rethrow as IcebergTable.TableNotFoundException
, because this exception arises before the IcebergTable instance is even created — as opposed to catalog.newTableOps(tableIdentifier), which doesn't throw this exception while initializing but rather throws when first used inside IcebergTable. Please let me know your thoughts on whether we really want to throw
IcebergTable.TableNotFoundException
in the catalog only. There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's OK to raise the exception at creation time rather than upon first use. In fact, that's arguably even better.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
e.g.