Skip to content

Commit

Permalink
Merge branch 'main' into honahx_table_metadata_v3_parse_and_test_2
Browse files Browse the repository at this point in the history
  • Loading branch information
HonahX committed Jan 10, 2025
2 parents 4d5c87d + a100e6a commit 8f667de
Show file tree
Hide file tree
Showing 25 changed files with 3,242 additions and 183 deletions.
2 changes: 1 addition & 1 deletion .asf.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -52,11 +52,11 @@ github:
- marton-bod
- samarthjain
- SreeramGarlapati
- samredai
- gaborkaszab
- bitsondatadev
- ajantha-bhat
- jbonofre
- manuzhang
ghp_branch: gh-pages
ghp_path: /

Expand Down
25 changes: 19 additions & 6 deletions api/src/main/java/org/apache/iceberg/actions/RewriteTablePath.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,10 @@
* <ol>
* <li>The name of the latest metadata.json rewritten to staging location. After the files are
* copied, this will be the root of the copied table.
* <li>A list of all files added to the table between startVersion and endVersion, including their
* original and target paths under the target prefix. This list covers both original and
* rewritten files, allowing for copying to the target paths to form the copied table.
* <li>A 'copy-plan'. This is a list of all files added to the table between startVersion and
* endVersion, including their original and target paths under the target prefix. This list
* covers both original and rewritten files, allowing for copying a functioning version of the
* source table to the target prefix.
* </ol>
*/
public interface RewriteTablePath extends Action<RewriteTablePath, RewriteTablePath.Result> {
Expand Down Expand Up @@ -91,9 +92,21 @@ interface Result {
String stagingLocation();

/**
* Path to a comma-separated list of source and target paths for all files added to the table
* between startVersion and endVersion, including original data files and metadata files
* rewritten to staging.
* Result file list location. This file contains a listing of all files added to the table
* between startVersion and endVersion, comma-separated. <br>
* For each file, it will include the source path (either the original path in the table, or in
* the staging location if rewritten), and the target path (under the new prefix).
*
* <p>Example file content:
*
* <pre><code>
* sourcepath/datafile1.parquet,targetpath/datafile1.parquet
* sourcepath/datafile2.parquet,targetpath/datafile2.parquet
* stagingpath/manifest.avro,targetpath/manifest.avro
* </code></pre>
*
* <br>
* This allows for copying a functioning version of the table to the target prefix.
*/
String fileListLocation();

Expand Down
57 changes: 57 additions & 0 deletions api/src/test/java/org/apache/iceberg/util/RandomUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,14 @@
import java.math.BigDecimal;
import java.math.BigInteger;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.function.Supplier;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;

Expand Down Expand Up @@ -228,4 +235,54 @@ private static BigInteger randomUnscaled(int precision, Random random) {

return new BigInteger(sb.toString());
}

/**
 * Builds a random-length list of generated elements for the given list type.
 *
 * <p>The list holds between 0 and 19 elements. When the list type declares its elements as
 * optional, each slot is filled with {@code null} roughly 5% of the time instead of a generated
 * value.
 *
 * @param random source of randomness for the list length and the null decision
 * @param list list type, consulted only for whether elements are optional
 * @param elementResult supplier invoked once for every non-null element
 * @return a new list of generated elements, possibly empty
 */
public static List<Object> generateList(
    Random random, Types.ListType list, Supplier<Object> elementResult) {
  int size = random.nextInt(20);
  List<Object> elements = Lists.newArrayListWithExpectedSize(size);

  for (int index = 0; index < size; index++) {
    // optional elements become null when the 1-in-20 draw hits; the draw is only made
    // for optional element types, so required lists consume no extra random values
    boolean useNull = list.isElementOptional() && random.nextInt(20) == 1;
    elements.add(useNull ? null : elementResult.get());
  }

  return elements;
}

/**
 * Builds a random-size, insertion-ordered map of generated keys and values for the given map
 * type.
 *
 * <p>The map holds between 0 and 19 entries. Keys are regenerated until they are unique. When the
 * key type is the string type, each generated key is converted with {@code toString()}. When the
 * map type declares its values as optional, a value is replaced with {@code null} roughly 5% of
 * the time.
 *
 * @param random source of randomness for the entry count and the null decision
 * @param map map type, consulted for its key type and for whether values are optional
 * @param keyResult supplier of candidate keys; called again whenever a duplicate is produced
 * @param valueResult supplier invoked once for every non-null value
 * @return a new map of generated entries, possibly empty
 */
public static Map<Object, Object> generateMap(
    Random random, Types.MapType map, Supplier<Object> keyResult, Supplier<Object> valueResult) {
  int entryCount = random.nextInt(20);

  // string-typed keys are normalized through toString(); other keys are used as supplied
  Supplier<Object> nextKey =
      map.keyType() == Types.StringType.get() ? () -> keyResult.get().toString() : keyResult;

  Map<Object, Object> entries = Maps.newLinkedHashMap();
  for (int added = 0; added < entryCount; added++) {
    // draw keys until one is not already present, so no entry is overwritten
    Object candidate;
    do {
      candidate = nextKey.get();
    } while (entries.containsKey(candidate));

    // optional values become null when the 1-in-20 draw hits
    boolean useNull = map.isValueOptional() && random.nextInt(20) == 1;
    entries.put(candidate, useNull ? null : valueResult.get());
  }

  return entries;
}
}
36 changes: 25 additions & 11 deletions core/src/main/java/org/apache/iceberg/FastAppend.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,23 +29,22 @@
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.util.DataFileSet;

/** {@link AppendFiles Append} implementation that adds a new manifest file for the write. */
/** {@link AppendFiles Append} implementation that adds new manifest files for writes. */
class FastAppend extends SnapshotProducer<AppendFiles> implements AppendFiles {
private final String tableName;
private final PartitionSpec spec;
private final SnapshotSummary.Builder summaryBuilder = SnapshotSummary.builder();
private final DataFileSet newFiles = DataFileSet.create();
private final Map<Integer, DataFileSet> newDataFilesBySpec = Maps.newHashMap();
private final List<ManifestFile> appendManifests = Lists.newArrayList();
private final List<ManifestFile> rewrittenAppendManifests = Lists.newArrayList();
private List<ManifestFile> newManifests = null;
private final List<ManifestFile> newManifests = Lists.newArrayList();
private boolean hasNewFiles = false;

FastAppend(String tableName, TableOperations ops) {
super(ops);
this.tableName = tableName;
this.spec = ops().current().spec();
}

@Override
Expand Down Expand Up @@ -78,14 +77,28 @@ protected Map<String, String> summary() {
@Override
public FastAppend appendFile(DataFile file) {
Preconditions.checkNotNull(file, "Invalid data file: null");
if (newFiles.add(file)) {
PartitionSpec spec = spec(file.specId());
Preconditions.checkArgument(
spec != null,
"Cannot find partition spec %s for data file: %s",
file.specId(),
file.location());

DataFileSet dataFiles =
newDataFilesBySpec.computeIfAbsent(spec.specId(), ignored -> DataFileSet.create());

if (dataFiles.add(file)) {
this.hasNewFiles = true;
summaryBuilder.addedFile(spec, file);
}

return this;
}

private PartitionSpec spec(int specId) {
return ops().current().spec(specId);
}

@Override
public FastAppend toBranch(String branch) {
targetBranch(branch);
Expand Down Expand Up @@ -176,7 +189,7 @@ protected void cleanUncommitted(Set<ManifestFile> committed) {
}
}
if (hasDeletes) {
this.newManifests = null;
this.newManifests.clear();
}
}

Expand All @@ -200,13 +213,14 @@ protected boolean cleanupAfterCommit() {
}

private List<ManifestFile> writeNewManifests() throws IOException {
if (hasNewFiles && newManifests != null) {
if (hasNewFiles && !newManifests.isEmpty()) {
newManifests.forEach(file -> deleteFile(file.path()));
newManifests = null;
newManifests.clear();
}

if (newManifests == null && !newFiles.isEmpty()) {
this.newManifests = writeDataManifests(newFiles, spec);
if (newManifests.isEmpty() && !newDataFilesBySpec.isEmpty()) {
newDataFilesBySpec.forEach(
(specId, dataFiles) -> newManifests.addAll(writeDataManifests(dataFiles, spec(specId))));
hasNewFiles = false;
}

Expand Down
Loading

0 comments on commit 8f667de

Please sign in to comment.