[GOBBLIN-2159] Adding support for partition level copy in Iceberg distcp #4058

Merged · 33 commits · Oct 23, 2024

Changes from 5 commits

Commits
02ae2fc
initial changes for iceberg distcp partition copy
Blazer-007 Sep 12, 2024
981357c
added datetime filter predicate with unit tests
Blazer-007 Sep 16, 2024
7cd9353
changing string.class to object.class
Blazer-007 Sep 17, 2024
82d10d3
updated replace partition to use serialized data files
Blazer-007 Sep 19, 2024
c43d3e1
some code cleanup
Blazer-007 Sep 20, 2024
0cf7638
added unit test
Blazer-007 Sep 20, 2024
63bb9aa
added replace partition unit test
Blazer-007 Sep 20, 2024
6e1cf6b
refactored and added more test
Blazer-007 Sep 21, 2024
065cde3
added javadoc
Blazer-007 Sep 21, 2024
a13220d
removed extra lines
Blazer-007 Sep 21, 2024
e1d812f
some minor changes
Blazer-007 Sep 21, 2024
4364044
added retry and tests for replace partitions commit step
Blazer-007 Sep 22, 2024
66d81a3
minor test changes
Blazer-007 Sep 22, 2024
24b4823
added metadata validator
Blazer-007 Sep 23, 2024
d8356e1
removed validator class for now
Blazer-007 Sep 24, 2024
4dcc88b
addressed comments and removed some classes for now
Blazer-007 Sep 27, 2024
46bd976
fixing checkstyle bugs and disabling newly added tests to find root c…
Blazer-007 Sep 27, 2024
e1e6f57
addressed pr comments and added few extra logs
Blazer-007 Oct 8, 2024
b6163ba
refactored classes
Blazer-007 Oct 17, 2024
6c73a25
removed extra import statements
Blazer-007 Oct 17, 2024
9c35733
enabled the tests
Blazer-007 Oct 17, 2024
cdc863a
fixed iceberg table tests
Blazer-007 Oct 17, 2024
1dbe929
some refactoring
Blazer-007 Oct 22, 2024
383ed91
refactored tests as per review comments
Blazer-007 Oct 22, 2024
942ad8d
throw tablenotfoundexception in place of nosuchtableexception
Blazer-007 Oct 22, 2024
6a4cf78
fixed throwing proper exception
Blazer-007 Oct 23, 2024
2adaa8b
removed unused imports
Blazer-007 Oct 23, 2024
c948854
replaced runtime exception with ioexception
Blazer-007 Oct 23, 2024
a55ee61
added check to avoid printing same log line
Blazer-007 Oct 23, 2024
1afc37a
fixed import order
Blazer-007 Oct 23, 2024
bb35070
added catch for CheckedExceptionFunction.WrappedIOException wrapper
Blazer-007 Oct 23, 2024
eeb8d25
fixed compile issue
Blazer-007 Oct 23, 2024
675e8bb
removing unwanted logging
Blazer-007 Oct 23, 2024
@@ -26,6 +26,7 @@
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.gobblin.dataset.DatasetConstants;
import org.apache.iceberg.exceptions.NoSuchTableException;

/**
* Base implementation of {@link IcebergCatalog} to access {@link IcebergTable} and the
@@ -44,10 +45,12 @@ protected BaseIcebergCatalog(String catalogName, Class<? extends Catalog> compan
@Override
public IcebergTable openTable(String dbName, String tableName) throws IcebergTable.TableNotFoundException {
TableIdentifier tableId = TableIdentifier.of(dbName, tableName);
return new IcebergTable(tableId, calcDatasetDescriptorName(tableId), getDatasetDescriptorPlatform(),
createTableOperations(tableId),
this.getCatalogUri(),
loadTableInstance(tableId));
try {
return new IcebergTable(tableId, calcDatasetDescriptorName(tableId), getDatasetDescriptorPlatform(),
createTableOperations(tableId), this.getCatalogUri(), loadTableInstance(tableId));
} catch (NoSuchTableException ex) {
throw new IcebergTable.TableNotFoundException(tableId);
}
}

protected Catalog createCompanionCatalog(Map<String, String> properties, Configuration configuration) {
@@ -72,5 +75,5 @@ protected String getDatasetDescriptorPlatform() {

protected abstract TableOperations createTableOperations(TableIdentifier tableId);

protected abstract Table loadTableInstance(TableIdentifier tableId) throws IcebergTable.TableNotFoundException;
protected abstract Table loadTableInstance(TableIdentifier tableId);

Contributor:
for good measure you could also make IcebergTable.TableNotFoundException a declared/checked exception here.

I'm tempted to re-situate the exception as IcebergCatalog.TableNotFoundException, but I don't want two classes w/ the same semantics - and renaming public interfaces is probably too late... so I'll make peace with the current name

Contributor Author:
As discussed, not throwing here; instead catching NoSuchTableException in BaseIcebergCatalog::openTable and throwing IcebergTable.TableNotFoundException from there.

}
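
A hypothetical caller-side sketch of what this refactor buys (the catalog variable and log call are assumed for illustration, not taken from the PR): every IcebergCatalog backend now surfaces a missing table as the checked IcebergTable.TableNotFoundException instead of Iceberg's unchecked NoSuchTableException, so call sites handle one exception type uniformly.

// Hypothetical usage; `catalog` is any IcebergCatalog implementation.
try {
  IcebergTable table = catalog.openTable("db_name", "table_name");
  // ... proceed with snapshot/partition discovery on `table`
} catch (IcebergTable.TableNotFoundException e) {
  // Checked at compile time, independent of which catalog backend threw.
  log.warn("Source table not found; skipping dataset", e);
}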
@@ -64,11 +64,7 @@ public boolean tableAlreadyExists(IcebergTable icebergTable) {
}

@Override
protected Table loadTableInstance(TableIdentifier tableId) throws IcebergTable.TableNotFoundException {
try {
return hc.loadTable(tableId);
} catch (Exception e) {
throw new IcebergTable.TableNotFoundException(tableId);
}
protected Table loadTableInstance(TableIdentifier tableId) {
return hc.loadTable(tableId);
}
}
@@ -25,7 +25,9 @@
import java.util.Optional;
import java.util.Properties;
import java.util.UUID;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;

import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
@@ -53,6 +55,7 @@
import org.apache.gobblin.data.management.copy.CopyableFile;
import org.apache.gobblin.data.management.copy.entities.PostPublishStep;
import org.apache.gobblin.data.management.copy.CopyableDataset;
import org.apache.gobblin.util.function.CheckedExceptionFunction;
import org.apache.gobblin.util.measurement.GrowthMilestoneTracker;
import org.apache.gobblin.data.management.copy.iceberg.predicates.IcebergMatchesAnyPropNamePartitionFilterPredicate;
import org.apache.gobblin.data.management.copy.iceberg.predicates.IcebergPartitionFilterPredicateUtil;
@@ -187,19 +190,18 @@ private Path addUUIDToPath(String filePathStr) {
return new Path(fileDir, newFileName);
}

private Map<Path, FileStatus> calcSrcFileStatusByDestFilePath(Map<Path, DataFile> destDataFileBySrcPath) {
private Map<Path, FileStatus> calcSrcFileStatusByDestFilePath(Map<Path, DataFile> destDataFileBySrcPath)
throws IOException {
Function<Path, FileStatus> getFileStatus = CheckedExceptionFunction.wrapToTunneled(this.sourceFs::getFileStatus);
Map<Path, FileStatus> srcFileStatusByDestFilePath = Maps.newHashMap();
destDataFileBySrcPath.forEach((srcPath, destDataFile) -> {
FileStatus srcFileStatus;
try {
srcFileStatus = this.sourceFs.getFileStatus(srcPath);
} catch (IOException e) {
String errMsg = String.format("~%s~ Failed to get file status for path : %s", this.getFileSetId(), srcPath);
log.error(errMsg);
throw new RuntimeException(errMsg, e);
}
srcFileStatusByDestFilePath.put(new Path(destDataFile.path().toString()), srcFileStatus);
});
try {
srcFileStatusByDestFilePath = destDataFileBySrcPath.entrySet()
.stream()
.collect(Collectors.toMap(entry -> new Path(entry.getValue().path().toString()),
entry -> getFileStatus.apply(entry.getKey())));
} catch (CheckedExceptionFunction.WrappedIOException wrapper) {
wrapper.rethrowWrapped();
}
return srcFileStatusByDestFilePath;
}

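The stream rewrite above relies on exception tunneling: the lambda's checked IOException is wrapped in an unchecked carrier so it can cross the Stream API, then unwrapped and rethrown at the method boundary via rethrowWrapped(). A minimal self-contained sketch of the same idiom, using the JDK's UncheckedIOException as the carrier in place of Gobblin's CheckedExceptionFunction.WrappedIOException (the lookup function and paths are invented for illustration):

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

public class TunnelingSketch {

  @FunctionalInterface
  interface IOFunction<T, R> {
    R apply(T t) throws IOException;
  }

  // Analogous to CheckedExceptionFunction.wrapToTunneled: adapt a lambda that
  // throws a checked IOException into a plain java.util.function.Function.
  static <T, R> Function<T, R> wrapToTunneled(IOFunction<T, R> f) {
    return t -> {
      try {
        return f.apply(t);
      } catch (IOException e) {
        throw new UncheckedIOException(e);  // tunnel through the stream
      }
    };
  }

  // Mirrors the shape of calcSrcFileStatusByDestFilePath: build a map inside a
  // stream while still declaring `throws IOException` to the caller.
  static Map<String, Long> sizeByPath(List<String> paths, IOFunction<String, Long> statFn)
      throws IOException {
    Function<String, Long> tunneled = wrapToTunneled(statFn);
    try {
      return paths.stream().collect(Collectors.toMap(p -> p, tunneled));
    } catch (UncheckedIOException wrapper) {
      throw wrapper.getCause();  // like WrappedIOException.rethrowWrapped()
    }
  }
}
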
@@ -56,6 +56,7 @@

import org.apache.gobblin.dataset.DatasetConstants;
import org.apache.gobblin.dataset.DatasetDescriptor;
import org.apache.gobblin.util.measurement.GrowthMilestoneTracker;

import static org.apache.gobblin.data.management.copy.iceberg.IcebergSnapshotInfo.ManifestFileInfo;

@@ -237,31 +238,35 @@ protected void registerIcebergTable(TableMetadata srcMetadata, TableMetadata dst
* @throws RuntimeException if error occurred while reading the manifest file
*/
public List<DataFile> getPartitionSpecificDataFiles(Predicate<StructLike> icebergPartitionFilterPredicate)
throws TableNotFoundException {
throws IOException {
TableMetadata tableMetadata = accessTableMetadata();
Snapshot currentSnapshot = tableMetadata.currentSnapshot();
long currentSnapshotId = currentSnapshot.snapshotId();
List<DataFile> knownDataFiles = new ArrayList<>();
log.info("~{}~ for snapshot '{}' - '{}' total known iceberg datafiles", tableId, currentSnapshotId,
knownDataFiles.size());
GrowthMilestoneTracker growthMilestoneTracker = new GrowthMilestoneTracker();
//TODO: Add support for deleteManifests as well later
// Currently supporting dataManifests only
List<ManifestFile> dataManifestFiles = currentSnapshot.dataManifests(this.tableOps.io());
for (ManifestFile manifestFile : dataManifestFiles) {
if (growthMilestoneTracker.isAnotherMilestone(knownDataFiles.size())) {
log.info("~{}~ for snapshot '{}' - before manifest-file '{}' '{}' total known iceberg datafiles", tableId,
currentSnapshotId,
manifestFile.path(),
knownDataFiles.size()
);
}

Contributor:
I agree this makes more sense here, given the synchronous reading of every manifest file happens within this method, rather than in the style of the Iterator<IcebergSnapshotInfo> returned by IcebergTable::getIncrementalSnapshotInfosIterator.

That said, I doubt we should still log tracked growth, as this very same list is later transformed in IcebergPartitionDataset::calcDestDataFileBySrcPath. All the network calls are in this method, rather than over there, so the in-process transformation into CopyEntities should be quite fast. Maybe just log once at the end of calcDestDataFileBySrcPath.

Contributor Author:
Yes, that seems a valid approach; let me remove GrowthMilestoneTracker from that function.

try (ManifestReader<DataFile> manifestReader = ManifestFiles.read(manifestFile, this.tableOps.io());
CloseableIterator<DataFile> dataFiles = manifestReader.iterator()) {
dataFiles.forEachRemaining(dataFile -> {
if (icebergPartitionFilterPredicate.test(dataFile.partition())) {
knownDataFiles.add(dataFile.copy());
}
});
log.info("~{}~ for snapshot '{}' - '{}' total known iceberg datafiles", tableId, currentSnapshotId,
knownDataFiles.size());
} catch (IOException e) {
String errMsg = String.format("~%s~ for snapshot '%d' - Failed to read manifest file: %s", tableId,
currentSnapshotId, manifestFile.path());
log.error(errMsg, e);
throw new RuntimeException(errMsg, e);
throw new IOException(errMsg, e);
}
}
return knownDataFiles;
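
For context, the icebergPartitionFilterPredicate applied to each DataFile's partition above is just a Predicate<StructLike>. A hand-rolled sketch of such a predicate (the column position and value are assumed for illustration; the PR's real implementation is IcebergMatchesAnyPropNamePartitionFilterPredicate):

import java.util.function.Predicate;
import org.apache.iceberg.StructLike;

// Hypothetical filter: keep data files whose identity-partition value at
// position 0 (e.g. a 'testPartition' column) equals "testValue".
Predicate<StructLike> partitionFilter =
    partition -> "testValue".equals(partition.get(0, Object.class));

Reading the value as Object.class rather than String.class matches commit 7cd9353 and avoids committing to a particular Java representation of the partition value.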
@@ -93,7 +93,7 @@ public class IcebergDatasetTest {
private static final String MANIFEST_PATH_0 = ROOT_PATH + "metadata/manifest.a";
private static final String MANIFEST_DATA_PATH_0A = ROOT_PATH + "data/p0/a";
private static final String MANIFEST_DATA_PATH_0B = ROOT_PATH + "data/p0/b";
private static final String REGISTER_COMMIT_STEP = "org.apache.gobblin.data.management.copy.iceberg.IcebergRegisterStep";
private static final String REGISTER_COMMIT_STEP = IcebergRegisterStep.class.getName();
private static final MockIcebergTable.SnapshotPaths SNAPSHOT_PATHS_0 =
new MockIcebergTable.SnapshotPaths(Optional.of(METADATA_PATH), MANIFEST_LIST_PATH_0, Arrays.asList(
new IcebergSnapshotInfo.ManifestFileInfo(MANIFEST_PATH_0,
@@ -23,7 +23,10 @@
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;

@@ -73,11 +76,9 @@ public class IcebergPartitionDatasetTest {
private static final String DEST_WRITE_LOCATION = DEST_TEST_DB + "/" + DEST_TEST_TABLE + "/data";
private static final String TEST_ICEBERG_PARTITION_COLUMN_NAME = "testPartition";
private static final String TEST_ICEBERG_PARTITION_COLUMN_VALUE = "testValue";
private static final String OVERWRITE_COMMIT_STEP = "org.apache.gobblin.data.management.copy.iceberg.IcebergOverwritePartitionsStep";
private static final String OVERWRITE_COMMIT_STEP = IcebergOverwritePartitionsStep.class.getName();
private final Properties copyConfigProperties = new Properties();
private final Properties properties = new Properties();
private static final List<String> srcFilePaths = new ArrayList<>();

private static final URI SRC_FS_URI;
private static final URI DEST_FS_URI;

@@ -121,15 +122,16 @@ public void setUp() throws Exception {

@AfterMethod
public void cleanUp() {
srcFilePaths.clear();
icebergPartitionFilterPredicateUtil.close();
}

@Test
public void testGenerateCopyEntities() throws IOException {
List<String> srcFilePaths = new ArrayList<>();
srcFilePaths.add(SRC_WRITE_LOCATION + "/file1.orc");
List<DataFile> mockSrcDataFiles = createDataFileMocks();
Mockito.when(srcIcebergTable.getPartitionSpecificDataFiles(Mockito.any())).thenReturn(mockSrcDataFiles);
Map<String, DataFile> mockDataFilesBySrcPath = createDataFileMocksBySrcPath(srcFilePaths);
Mockito.when(srcIcebergTable.getPartitionSpecificDataFiles(Mockito.any())).thenReturn(
new ArrayList<>(mockDataFilesBySrcPath.values()));

icebergPartitionDataset = new TestIcebergPartitionDataset(srcIcebergTable, destIcebergTable, properties, sourceFs,
true);
@@ -140,7 +142,7 @@ public void testGenerateCopyEntities() throws IOException {

Collection<CopyEntity> copyEntities = icebergPartitionDataset.generateCopyEntities(targetFs, copyConfiguration);

verifyCopyEntities(copyEntities, 2, true);
verifyCopyEntities(copyEntities, new ArrayList<>(mockDataFilesBySrcPath.keySet()), true);
}

@Test
Expand All @@ -154,19 +156,21 @@ public void testGenerateCopyEntitiesWithEmptyDataFiles() throws IOException {
Mockito.mock(CopyConfiguration.class));

// Since No data files are present, no copy entities should be generated
verifyCopyEntities(copyEntities, 0, true);
verifyCopyEntities(copyEntities, Collections.emptyList(), true);
}

@Test
public void testMultipleCopyEntitiesGenerated() throws IOException {
List<String> srcFilePaths = new ArrayList<>();
srcFilePaths.add(SRC_WRITE_LOCATION + "/file1.orc");
srcFilePaths.add(SRC_WRITE_LOCATION + "/file2.orc");
srcFilePaths.add(SRC_WRITE_LOCATION + "/file3.orc");
srcFilePaths.add(SRC_WRITE_LOCATION + "/file4.orc");
srcFilePaths.add(SRC_WRITE_LOCATION + "/file5.orc");

List<DataFile> mockSrcDataFiles = createDataFileMocks();
Mockito.when(srcIcebergTable.getPartitionSpecificDataFiles(Mockito.any())).thenReturn(mockSrcDataFiles);
Map<String, DataFile> mockDataFilesBySrcPath = createDataFileMocksBySrcPath(srcFilePaths);
Mockito.when(srcIcebergTable.getPartitionSpecificDataFiles(Mockito.any())).thenReturn(
new ArrayList<>(mockDataFilesBySrcPath.values()));

icebergPartitionDataset = new TestIcebergPartitionDataset(srcIcebergTable, destIcebergTable, properties, sourceFs,
true);
@@ -177,17 +181,19 @@ public void testMultipleCopyEntitiesGenerated() throws IOException {

Collection<CopyEntity> copyEntities = icebergPartitionDataset.generateCopyEntities(targetFs, copyConfiguration);

verifyCopyEntities(copyEntities, 6, true);
verifyCopyEntities(copyEntities, new ArrayList<>(mockDataFilesBySrcPath.keySet()), true);
}

@Test
public void testWithDifferentSrcAndDestTableWriteLocation() throws IOException {
List<String> srcFilePaths = new ArrayList<>();
srcFilePaths.add(SRC_WRITE_LOCATION + "/randomFile--Name.orc");
Mockito.when(srcTableMetadata.property(TableProperties.WRITE_DATA_LOCATION, "")).thenReturn(SRC_WRITE_LOCATION);
Mockito.when(destTableMetadata.property(TableProperties.WRITE_DATA_LOCATION, "")).thenReturn(DEST_WRITE_LOCATION);

List<DataFile> mockSrcDataFiles = createDataFileMocks();
Mockito.when(srcIcebergTable.getPartitionSpecificDataFiles(Mockito.any())).thenReturn(mockSrcDataFiles);
Map<String, DataFile> mockDataFilesBySrcPath = createDataFileMocksBySrcPath(srcFilePaths);
Mockito.when(srcIcebergTable.getPartitionSpecificDataFiles(Mockito.any())).thenReturn(
new ArrayList<>(mockDataFilesBySrcPath.values()));

icebergPartitionDataset = new TestIcebergPartitionDataset(srcIcebergTable, destIcebergTable, properties, sourceFs,
true);
@@ -199,7 +205,7 @@ public void testWithDifferentSrcAndDestTableWriteLocation() throws IOException {
List<CopyEntity> copyEntities =
(List<CopyEntity>) icebergPartitionDataset.generateCopyEntities(targetFs, copyConfiguration);

verifyCopyEntities(copyEntities, 2, false);
verifyCopyEntities(copyEntities, new ArrayList<>(mockDataFilesBySrcPath.keySet()), false);
}

private static void setupSrcFileSystem() throws IOException {
Expand All @@ -224,23 +230,23 @@ private static void setupDestFileSystem() throws IOException {
Mockito.when(targetFs.getFileStatus(any(Path.class))).thenThrow(new FileNotFoundException());
}

private static List<DataFile> createDataFileMocks() throws IOException {
List<DataFile> dataFiles = new ArrayList<>();
private static Map<String, DataFile> createDataFileMocksBySrcPath(List<String> srcFilePaths) throws IOException {

Contributor:
I really like how returning this Map allows you to be so succinct at every point of use:

Map<String, DataFile> mockDataFilesBySrcPath = createDataFileMocksBySrcPath(srcFilePaths);
Mockito.when(srcIcebergTable.getPartitionSpecificDataFiles(Mockito.any())).
    thenReturn(new ArrayList<>(mockDataFilesBySrcPath.values()));

... // (above just a `.values()` and simply a `.keySet()` below)

verifyCopyEntities(copyEntities, new ArrayList<>(mockDataFilesBySrcPath.keySet()), false);

nice work!

Map<String, DataFile> dataFileMocksBySrcPath = new HashMap<>();
for (String srcFilePath : srcFilePaths) {
DataFile dataFile = Mockito.mock(DataFile.class);
Path dataFilePath = new Path(srcFilePath);
Path qualifiedPath = sourceFs.makeQualified(dataFilePath);
String qualifiedPath = sourceFs.makeQualified(dataFilePath).toString();
Mockito.when(dataFile.path()).thenReturn(dataFilePath.toString());
Mockito.when(sourceFs.getFileStatus(Mockito.eq(dataFilePath))).thenReturn(
IcebergDatasetTest.MockFileSystemBuilder.createEmptyFileStatus(qualifiedPath.toString()));
dataFiles.add(dataFile);
IcebergDatasetTest.MockFileSystemBuilder.createEmptyFileStatus(qualifiedPath));
dataFileMocksBySrcPath.put(qualifiedPath, dataFile);
}
return dataFiles;
return dataFileMocksBySrcPath;
}

private static void verifyCopyEntities(Collection<CopyEntity> copyEntities, int expectedCopyEntitiesSize,
private static void verifyCopyEntities(Collection<CopyEntity> copyEntities, List<String> expectedSrcFilePaths,
boolean sameSrcAndDestWriteLocation) {
Assert.assertEquals(copyEntities.size(), expectedCopyEntitiesSize);
List<String> actualSrcFilePaths = new ArrayList<>();
String srcWriteLocationStart = SRC_FS_URI + SRC_WRITE_LOCATION;
String destWriteLocationStart = DEST_FS_URI + (sameSrcAndDestWriteLocation ? SRC_WRITE_LOCATION : DEST_WRITE_LOCATION);
String srcErrorMsg = String.format("Source Location should start with %s", srcWriteLocationStart);
@@ -249,6 +255,7 @@ private static void verifyCopyEntities(Collection<CopyEntity> copyEntities, int
String json = copyEntity.toString();
if (IcebergDatasetTest.isCopyableFile(json)) {
String originFilepath = IcebergDatasetTest.CopyEntityDeserializer.getOriginFilePathAsStringFromJson(json);
actualSrcFilePaths.add(originFilepath);
String destFilepath = IcebergDatasetTest.CopyEntityDeserializer.getDestinationFilePathAsStringFromJson(json);
Assert.assertTrue(originFilepath.startsWith(srcWriteLocationStart), srcErrorMsg);
Assert.assertTrue(destFilepath.startsWith(destWriteLocationStart), destErrorMsg);
Expand All @@ -261,6 +268,9 @@ private static void verifyCopyEntities(Collection<CopyEntity> copyEntities, int
IcebergDatasetTest.verifyPostPublishStep(json, OVERWRITE_COMMIT_STEP);
}
}
Assert.assertEquals(actualSrcFilePaths.size(), expectedSrcFilePaths.size(),
"Set" + actualSrcFilePaths + " vs Set" + expectedSrcFilePaths);
Assert.assertEqualsNoOrder(actualSrcFilePaths.toArray(), expectedSrcFilePaths.toArray());
}
