Rebase LI-Iceberg changes on top of Apache Iceberg 1.0.0 release (#131)
* Rebase LI-Iceberg changes on top of Apache Iceberg 1.0.0 release

* Hive Catalog: Add a hive catalog that does not override existing Hive metadata (#10)

Add a custom Hive catalog that does not override existing Hive metadata

Fail early with a proper exception if the metadata file does not exist

Simplify CustomHiveCatalog (#22)

* Shading: Add an iceberg-runtime shaded module (#12)

* ORC: Add test for reading files without Iceberg IDs (#16)

* Hive Metadata Scan: Support reading tables with only Hive metadata (#23, #24, #25, #26)

- Support for non-string partition columns (#24)
- Support for Hive tables without avro.schema.literal (#25)
- Hive Metadata Scan: Notify ScanEvent listeners on planning (#35)
- Hive Metadata Scan: Do not use table snapshot summary for estimating statistics (#37)
- Hive Metadata Scan: Return empty statistics (#49)
- Hive Metadata Scan: Do not throw an exception on dangling partitions; log a warning message (#50)
- Hive Metadata Scan: Fix pushdown of non-partition predicates within NOT (#51)

Co-authored-by: Ratandeep Ratti <[email protected]>
Co-authored-by: Kuai Yu <[email protected]>
Co-authored-by: Walaa Eldin Moustafa <[email protected]>

* Row-level filtering: Allow table scans to pass a row-level filter for ORC files

- ORC: Support NameMapping with row-level filtering (#53)

* Hive: Make predicate pushdown dynamic based on the Hive version

* Hive: Fix uppercase bug and determine catalog from table properties (#38)

* Hive: Return lowercase field name from IcebergRecordStructField
* Hive: Determine catalog from table property

* Hive: Fix schema not forwarded to SerDe on MR jobs (#45) (#47)

* Hive: Use Hive table location in HiveIcebergSplit
* Hive: Fix schema not passed to Serde
* Hive: Refactor tests for tables with unqualified location URI

Co-authored-by: Shardul Mahadik <[email protected]>

* Hive Metadata Scan: Support case insensitive name mapping (#52)

* Hive Metadata Scan: Merge Hive and Avro schemas to fix datatype inconsistencies (#57)

Hive Metadata Scan: Fix Hive primitive to Avro logical type conversion (#58)

Hive Metadata Scan: Fix support for Hive timestamp type (#61)

Co-authored-by: Raymond Zhang <[email protected]>
Co-authored-by: Shardul Mahadik <[email protected]>

Fix HasDuplicateLowercaseColumnNames's visit method to use a new visitor instance every time (#67)

* Fix HasDuplicateLowercaseColumnNames's visit method to use a new visitor instance every time

* Trigger CI

(cherry picked from commit b90e838)

* Stop using serdeToFileFormat to unblock formats other than Avro or Orc (#64)

* Stop using serdeToFileFormat to unblock formats other than Avro or Orc

* Fix style check

* Do not delete metadata location when HMS has been successfully updated (#68)

(cherry picked from commit 766407e)

* Support reading Avro complex union types (#73)

Co-authored-by: Wenye Zhang <[email protected]>

* [#2039] Support default value semantic for AVRO (#75)

(cherry picked from commit c18f4c4)

* Support Hive non-string partition columns (#78)

* Support non-string Hive partition column types in LegacyHiveTableScan

* Evaluate the partition filter expression to filter non-string partition columns (see the sketch below)
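A minimal sketch of that evaluation step, assuming the Hive partition values have already been converted from strings to their declared Iceberg types; Evaluator and Expressions are standard Iceberg expression classes, while the partition schema, field IDs, and values below are illustrative only (GenericRecord, from the iceberg-data module, simply stands in for the partition tuple).

import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.expressions.Evaluator;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.types.Types;

public class PartitionFilterSketch {
  public static void main(String[] args) {
    // Partition schema with a non-string (integer) partition column.
    Types.StructType partitionType = Types.StructType.of(
        Types.NestedField.required(1, "datepartition", Types.StringType.get()),
        Types.NestedField.required(2, "hour", Types.IntegerType.get()));

    // Filter on the non-string column.
    Expression filter = Expressions.equal("hour", 12);
    Evaluator evaluator = new Evaluator(partitionType, filter);

    // A partition tuple parsed from Hive metadata and converted to Iceberg types.
    GenericRecord partition = GenericRecord.create(partitionType);
    partition.setField("datepartition", "2022-12-17");
    partition.setField("hour", 12);

    // true: this partition passes the filter, so its files stay in the scan.
    System.out.println(evaluator.eval(partition));
  }
}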

* Support default value read for ORC format in spark (#76)

* Support default value read for ORC format in spark

* Refactor common code for ReadBuilder for both non-vectorized and vectorized read

* Fix code style issue

* Add special handling of ROW_POSITION metadata column

* Add corner case check for partition field

* Use BaseDataReader.convertConstant to convert constants, and expand its functionality to support nested-type constants such as array/map/struct

* Support nested type default value for vectorized read

* Support deeply nested type default value for vectorized read

* Support reading ORC complex union types (#74)

* Support reading ORC complex union types

* Add more tests

* Support union in VectorizedSparkOrcReaders and improve tests

* Support union in VectorizedSparkOrcReaders and improve tests - continued

* Fix checkstyle

Co-authored-by: Wenye Zhang <[email protected]>

* Support avro.schema.literal/hive union types in Hive legacy table to Iceberg conversion (#80)

* Fix ORC schema visitors to support reading ORC files with deeply nested union type schema (#81)

* Fix ORC schema visitors to support reading ORC files with deeply nested union type schema

* Added test for vectorized read

* Disable avro validation for default values

Co-authored-by: Shenoda Guirguis <[email protected]>

* Fix spark avro reader reading union schema data (#83)

* Fix spark avro reader to read correctly structured nested data values

* Make sure field-id mapping is correctly maintained given arbitrary nested schema that contains union

* Avro: Change union read schema from Hive to Trino style (#84)

* [LI] Avro: Refactor union-to-struct schema - Part 1: changes to support reading Avro (see the sketch below)
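For orientation, a sketch of the struct shape an Avro union<int, string> is read as under this change, assuming the Trino-style layout of a tag column plus one optional column per branch; the field IDs, names, and nullability below are placeholders, not values taken from the implementation.

import org.apache.iceberg.types.Types;

public class UnionReadSchemaSketch {
  public static void main(String[] args) {
    // union<int, string> read as a struct: "tag" records which branch is set,
    // and each branch becomes its own optional column.
    Types.StructType unionAsStruct = Types.StructType.of(
        Types.NestedField.required(100, "tag", Types.IntegerType.get()),
        Types.NestedField.optional(101, "field0", Types.IntegerType.get()),
        Types.NestedField.optional(102, "field1", Types.StringType.get()));

    System.out.println(unionAsStruct);
  }
}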

* ORC: Change union read schema from Hive to Trino style (#85)

* [LI] ORC: Refactor union-to-struct schema - Part 2: changes to support reading ORC

* Change Hive type to Iceberg type conversion for union

* Reorder Hive table properties to align with the avro.schema.literal placement contract (#86)

* [#2039] Support default value semantic for AVRO

(cherry picked from commit c18f4c4)

* Revert commits 2c59857 and f362aed (#88)

Co-authored-by: Shenoda Guirguis <[email protected]>

* Logically patch PR 2328 onto HiveMetadataPreservingTableOperations

* Support timestamp as partition type (#91)

* Support timestamp in partition types

* Address comment

* Separate classes under hive legacy package to new hivelink module (#87)

* Separate classes under legacy to a new hiveberg module

* Fix build

* remove hiveberg dependency in iceberg-spark2 module

* Revert "remove hiveberg dependency in iceberg-spark2 module"

This reverts commit 2e8b743.

* Rename hiveberg module to hivelink

Co-authored-by: Wenye Zhang <[email protected]>

* [LI] Align default value validation with Avro semantics for nullable (nested) fields (#92)

* Align default value validation with Avro semantics for nullable (nested) fields

* Allow setting null as default value for nested fields in record default

* [LI][Spark][Avro] Read Avro union using decoder instead of directly returning value (#94)

* [LI][Spark] Read Avro union using decoder instead of directly returning value

* Add a comment for the schema

* Improve the logging when the deserialized index is invalid for reading the symbol from an enum (#96)

* Move custom hive catalog to hivelink-core (#99)

* Handle non-nullable union of single type for Avro (#98)

* Handle non-nullable union of single type

Co-authored-by: Wenye Zhang <[email protected]>

* Handle null default in nested type default value situations (#100)

* Move 'Hive Metadata Scan: Support case insensitive name mapping' (PR 52) to hivelink-core (#102)

* Remove activeSparkSession (#103)

* Disable default value preserving (#106)

* Disable default value preserving

* [LI][Avro] Do not reorder elements inside an Avro union schema (#93)

* Handle single-type union properly in AvroSchemaVisitor for deeply nested schemas (#107)

* Handle non-nullable union of single type for ORC Spark non-vectorized reader (#104)

* Handle single type union for non-vectorized reader

* [Avro] Retain the type of field while copying the default values. (#109)

* Retain the type of field while copying the default values.

* [Hivelink] Refactor support for Hive non-string partition columns to get rid of Iceberg-oss code changes (#110)

* [Hivelink] Refactor support for Hive non-string partition columns to get rid of Iceberg-oss code changes

* Release automation overhaul: Sonatype Nexus, Shipkit and GH Actions (#101)

* Add scm and developer info (#111)

* [Core] Fix and refactor schema parser (#112)

* [Core] Fix/Refactor SchemaParser to fix multiple bugs

* Enhance the UT for testing required fields with default values (#113)

* Enhance the UT for testing required fields with default values

* Addressed review comments

* Addressed review comment

* Support single type union for the ORC vectorized reader (#114)

* Support single type union for the ORC vectorized reader

* Support single type union for the ORC vectorized reader

Co-authored-by: Yiqiang Ding <[email protected]>

* Refactor HMS code upon cherry-pick

* Check for schema corruption and fix it on commit (#117)

* Check for schema corruption and fix it on commit

* ORC: Handle queries where select and filter only use default value columns (#118)

* ORC: Handle queries where select and filter only use default value columns

* Set ORC columns and fix case-sensitivity issue with schema check (#119)

* Hive: Return null for currentSnapshot() (#121)

* Hive: Return null for currentSnapshot()

* Handle snapshots()

* Fix MergeHiveSchemaWithAvro to make it copy full Avro schema attributes (#120)

* Fix MergeHiveSchemaWithAvro to make it copy full Avro schema attributes

* Add logic to derive partition column id from partition.column.ids property (#122)

* Add logic to derive partition column IDs from the partition.column.ids property (see the sketch below)
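A minimal illustration of that derivation, assuming partition.column.ids stores a comma-separated list of field IDs in partition-column order; the property format and the helper below are hypothetical, not the exact implementation.

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class PartitionColumnIdsSketch {
  // Hypothetical helper: read partition column field IDs from a table property.
  static List<Integer> partitionColumnIds(Map<String, String> tableProperties) {
    List<Integer> ids = new ArrayList<>();
    String value = tableProperties.get("partition.column.ids");
    if (value == null || value.isEmpty()) {
      return ids; // caller falls back to assigning fresh IDs
    }
    for (String id : value.split(",")) {
      ids.add(Integer.parseInt(id.trim()));
    }
    return ids;
  }

  public static void main(String[] args) {
    // Prints [1000, 1001]
    System.out.println(partitionColumnIds(Map.of("partition.column.ids", "1000, 1001")));
  }
}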

* Do not push down filter to ORC for union type schema (#123)

* Bug fix: MergeHiveSchemaWithAvro should retain Avro properties for list and map when they are nullable (#125)

* Bug fix: MergeHiveSchemaWithAvro should retain Avro properties for list and map when they are nullable

* LinkedIn rebase draft

* Refactor hivelink 1

* Make all hivelink module tests pass

* Make spark 2.4 module work

* Fix mr module

* Make spark 3.1 module work

* Fix TestSparkMetadataColumns

* Minor fix for spark 2.4

* Update default spark version to 3.1

* Update Java CI to only run Spark 2.4 and 3.1

* Minor fix HiveTableOperations

* Adapt github CI to 0.14.x branch

* Fix mr module checkstyle

* Fix checkstyle for orc module

* Fix spark2.4 checkstyle

* Refactor catalog loading logic using CatalogUtil

* Minor change to CI/release

Co-authored-by: Shardul Mahadik <[email protected]>
Co-authored-by: Ratandeep Ratti <[email protected]>
Co-authored-by: Shardul Mahadik <[email protected]>
Co-authored-by: Kuai Yu <[email protected]>
Co-authored-by: Walaa Eldin Moustafa <[email protected]>
Co-authored-by: Sushant Raikar <[email protected]>
Co-authored-by: ZihanLi58 <[email protected]>
Co-authored-by: Wenye Zhang <[email protected]>
Co-authored-by: Wenye Zhang <[email protected]>
Co-authored-by: Shenoda Guirguis <[email protected]>
Co-authored-by: Shenoda Guirguis <[email protected]>
Co-authored-by: Shenoda Guirguis <[email protected]>
Co-authored-by: Lei Sun <[email protected]>
Co-authored-by: Jiefan <[email protected]>
Co-authored-by: yiqiangin <[email protected]>
Co-authored-by: Malini Mahalakshmi Venkatachari <[email protected]>
Co-authored-by: Yiqiang Ding <[email protected]>
Co-authored-by: Yiqiang Ding <[email protected]>
Co-authored-by: Jack Moseley <[email protected]>

* Add Flink 1.14 artifacts for release

Co-authored-by: Shardul Mahadik <[email protected]>
Co-authored-by: Ratandeep Ratti <[email protected]>
Co-authored-by: Shardul Mahadik <[email protected]>
Co-authored-by: Kuai Yu <[email protected]>
Co-authored-by: Walaa Eldin Moustafa <[email protected]>
Co-authored-by: Sushant Raikar <[email protected]>
Co-authored-by: ZihanLi58 <[email protected]>
Co-authored-by: Wenye Zhang <[email protected]>
Co-authored-by: Wenye Zhang <[email protected]>
Co-authored-by: Shenoda Guirguis <[email protected]>
Co-authored-by: Shenoda Guirguis <[email protected]>
Co-authored-by: Shenoda Guirguis <[email protected]>
Co-authored-by: Lei Sun <[email protected]>
Co-authored-by: Jiefan <[email protected]>
Co-authored-by: yiqiangin <[email protected]>
Co-authored-by: Malini Mahalakshmi Venkatachari <[email protected]>
Co-authored-by: Yiqiang Ding <[email protected]>
Co-authored-by: Yiqiang Ding <[email protected]>
Co-authored-by: Jack Moseley <[email protected]>
20 people authored Dec 17, 2022
1 parent e2bb9ad commit 1a5b94e
Showing 124 changed files with 13,979 additions and 367 deletions.
78 changes: 53 additions & 25 deletions .github/workflows/java-ci.yml
@@ -21,11 +21,11 @@ name: "Java CI"
on:
push:
branches:
- 'master'
- '0.**'
tags:
- 'apache-iceberg-**'
- 'li-1.0.x'
tags-ignore: [v*] # release tags are autogenerated after a successful CI, no need to run CI against them
pull_request:
branches:
- 'li-1.0.x'
paths-ignore:
- '.github/workflows/python-ci.yml'
- '.github/workflows/spark-ci.yml'
@@ -53,28 +53,28 @@ jobs:
runs-on: ubuntu-20.04
strategy:
matrix:
jvm: [8, 11]
jvm: [ 8, 11 ]
env:
SPARK_LOCAL_IP: localhost
steps:
- uses: actions/checkout@v3
- uses: actions/setup-java@v3
with:
distribution: zulu
java-version: ${{ matrix.jvm }}
- uses: actions/cache@v3
with:
path: ~/.gradle/caches
key: ${{ runner.os }}-gradle-${{ hashFiles('**/*.gradle') }}
restore-keys: ${{ runner.os }}-gradle
- run: echo -e "$(ip addr show eth0 | grep "inet\b" | awk '{print $2}' | cut -d/ -f1)\t$(hostname -f) $(hostname -s)" | sudo tee -a /etc/hosts
- run: ./gradlew check -DsparkVersions= -DhiveVersions= -DflinkVersions= -Pquick=true -x javadoc
- uses: actions/upload-artifact@v3
if: failure()
with:
name: test logs
path: |
**/build/testlogs
- uses: actions/checkout@v3
- uses: actions/setup-java@v3
with:
distribution: zulu
java-version: ${{ matrix.jvm }}
- uses: actions/cache@v3
with:
path: ~/.gradle/caches
key: ${{ runner.os }}-gradle-${{ hashFiles('**/*.gradle') }}
restore-keys: ${{ runner.os }}-gradle
- run: echo -e "$(ip addr show eth0 | grep "inet\b" | awk '{print $2}' | cut -d/ -f1)\t$(hostname -f) $(hostname -s)" | sudo tee -a /etc/hosts
- run: ./gradlew check -DsparkVersions= -DhiveVersions= -DflinkVersions= -Pquick=true -x javadoc
- uses: actions/upload-artifact@v3
if: failure()
with:
name: test logs
path: |
**/build/testlogs
build-checks:
runs-on: ubuntu-20.04
@@ -84,7 +84,7 @@ jobs:
with:
distribution: zulu
java-version: 8
- run: ./gradlew -DflinkVersions=1.13,1.14,1.15 -DsparkVersions=2.4,3.0,3.1,3.2,3.3 -DhiveVersions=2,3 build -x test -x javadoc -x integrationTest
- run: ./gradlew -DflinkVersions=1.13,1.14,1.15 -DsparkVersions=2.4,3.1 -DhiveVersions=2,3 build -x test -x javadoc -x integrationTest

build-javadoc:
runs-on: ubuntu-20.04
@@ -94,4 +94,32 @@ jobs:
with:
distribution: zulu
java-version: 8
- run: ./gradlew -Pquick=true javadoc
- run: ./gradlew -P=true javadoc

release:
if: ${{ github.event_name == 'push' }}
runs-on: ubuntu-20.04
steps:
- uses: actions/checkout@v3
with:
fetch-depth: '0' # https://github.com/shipkit/shipkit-changelog#fetch-depth-on-ci
- uses: actions/setup-java@v3
with:
distribution: zulu
java-version: 8
- run: echo -e "$(ip addr show eth0 | grep "inet\b" | awk '{print $2}' | cut -d/ -f1)\t$(hostname -f) $(hostname -s)" | sudo tee -a /etc/hosts
- run: ./gradlew build -DflinkVersions=1.14 -DsparkVersions=2.4,3.1 -DhiveVersions= -Pquick=true build -x javadoc
- name: Perform release
# Release job, only for pushes to the main development branch
if: ${{ github.event_name == 'push'
&& github.ref == 'refs/heads/li-1.0.x'
&& github.repository == 'linkedin/iceberg'
&& !contains(toJSON(github.event.commits.*.message), '[skip release]') }}

run: ./gradlew -DflinkVersions=1.14 -DsparkVersions=2.4,3.1 -DhiveVersions= githubRelease publishToSonatype closeAndReleaseStagingRepository
env:
GITHUB_TOKEN: ${{secrets.GITHUB_TOKEN}}
SONATYPE_USER: ${{secrets.SONATYPE_USER}}
SONATYPE_PWD: ${{secrets.SONATYPE_PWD}}
PGP_KEY: ${{secrets.PGP_KEY}}
PGP_PWD: ${{secrets.PGP_PWD}}
6 changes: 3 additions & 3 deletions .github/workflows/spark-ci.yml
@@ -21,8 +21,7 @@ name: "Spark CI"
on:
push:
branches:
- 'master'
- '0.**'
- 'li-1.0.x'
tags:
- 'apache-iceberg-**'
pull_request:
@@ -83,7 +82,7 @@ jobs:
strategy:
matrix:
jvm: [8, 11]
spark: ['3.0', '3.1', '3.2', '3.3']
spark: ['3.1']
env:
SPARK_LOCAL_IP: localhost
steps:
@@ -107,6 +106,7 @@ jobs:
**/build/testlogs
spark-3x-scala-2-13-tests:
if: ${{ false }}
runs-on: ubuntu-20.04
strategy:
matrix:
12 changes: 10 additions & 2 deletions api/src/main/java/org/apache/iceberg/types/PruneColumns.java
@@ -68,11 +68,19 @@ public Type struct(Types.StructType struct, List<Type> fieldResults) {
if (field.isOptional()) {
selectedFields.add(
Types.NestedField.optional(
field.fieldId(), field.name(), projectedType, field.doc()));
field.fieldId(),
field.name(),
projectedType,
field.getDefaultValue(),
field.doc()));
} else {
selectedFields.add(
Types.NestedField.required(
field.fieldId(), field.name(), projectedType, field.doc()));
field.fieldId(),
field.name(),
projectedType,
field.getDefaultValue(),
field.doc()));
}
}
}
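A small check of what this PruneColumns change buys, assuming the public TypeUtil.select projection path routes through this visitor and using the default-value overloads added to Types.NestedField later in this commit; the schema, field IDs, and default below are illustrative only.

import java.util.Collections;
import org.apache.iceberg.Schema;
import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.types.Types;

public class ProjectionDefaultSketch {
  public static void main(String[] args) {
    // "settings" is a struct field whose default is a map from field names to values,
    // matching the validation rules added in Types.NestedField in this commit.
    Types.StructType settingsType = Types.StructType.of(
        Types.NestedField.optional(3, "retries", Types.IntegerType.get()),
        Types.NestedField.optional(4, "timeout", Types.LongType.get()));
    Schema schema = new Schema(
        Types.NestedField.optional(
            1, "settings", settingsType, Collections.singletonMap("retries", 0), null),
        Types.NestedField.required(2, "name", Types.StringType.get()));

    // Select only settings.retries: "settings" is rebuilt with a pruned type,
    // and this change carries its default value through instead of dropping it.
    Schema projected = TypeUtil.select(schema, Collections.singleton(3));
    System.out.println(projected.findField("settings").getDefaultValue()); // {retries=0}
  }
}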
117 changes: 106 additions & 11 deletions api/src/main/java/org/apache/iceberg/types/Types.java
@@ -412,42 +412,124 @@ public int hashCode() {

public static class NestedField implements Serializable {
public static NestedField optional(int id, String name, Type type) {
return new NestedField(true, id, name, type, null);
return new NestedField(true, id, name, type, null, null);
}

public static NestedField optional(int id, String name, Type type, String doc) {
return new NestedField(true, id, name, type, doc);
return new NestedField(true, id, name, type, null, doc);
}

public static NestedField optional(
int id, String name, Type type, Object defaultValue, String doc) {
return new NestedField(true, id, name, type, defaultValue, doc);
}

public static NestedField required(int id, String name, Type type) {
return new NestedField(false, id, name, type, null);
return new NestedField(false, id, name, type, null, null);
}

public static NestedField required(int id, String name, Type type, String doc) {
return new NestedField(false, id, name, type, doc);
return new NestedField(false, id, name, type, null, doc);
}

public static NestedField required(
int id, String name, Type type, Object defaultValue, String doc) {
return new NestedField(false, id, name, type, defaultValue, doc);
}

public static NestedField of(int id, boolean isOptional, String name, Type type) {
return new NestedField(isOptional, id, name, type, null);
return new NestedField(isOptional, id, name, type, null, null);
}

public static NestedField of(int id, boolean isOptional, String name, Type type, String doc) {
return new NestedField(isOptional, id, name, type, doc);
return new NestedField(isOptional, id, name, type, null, doc);
}

public static NestedField of(
int id, boolean isOptional, String name, Type type, Object defaultValue, String doc) {
return new NestedField(isOptional, id, name, type, defaultValue, doc);
}

private static void validateDefaultValue(Object defaultValue, Type type) {
if (defaultValue == null) {
return;
}
switch (type.typeId()) {
case STRUCT:
Preconditions.checkArgument(
defaultValue instanceof Map,
"defaultValue should be a Map from fields names to values, for StructType");
Map<String, Object> defaultStruct = (Map<String, Object>) defaultValue;
if (defaultStruct.isEmpty()) {
return;
}
for (NestedField field : type.asStructType().fields()) {
validateDefaultValue(
defaultStruct.getOrDefault(field.name(), field.getDefaultValue()), field.type());
}
break;

case LIST:
Preconditions.checkArgument(
defaultValue instanceof List,
"defaultValue should be an List of Objects, for ListType");
List<Object> defaultList = (List<Object>) defaultValue;
if (defaultList.size() == 0) {
return;
}
defaultList.forEach(
dv -> NestedField.validateDefaultValue(dv, type.asListType().elementField.type));
break;

case MAP:
Preconditions.checkArgument(
defaultValue instanceof Map, "defaultValue should be an instance of Map for MapType");
Map<Object, Object> defaultMap = (Map<Object, Object>) defaultValue;
if (defaultMap.isEmpty()) {
return;
}
for (Map.Entry<Object, Object> e : defaultMap.entrySet()) {
NestedField.validateDefaultValue(e.getKey(), type.asMapType().keyField.type);
NestedField.validateDefaultValue(e.getValue(), type.asMapType().valueField.type);
}
break;

case FIXED:
case BINARY:
Preconditions.checkArgument(
defaultValue instanceof byte[],
"defaultValue should be an instance of byte[] for TypeId.%s, but defaultValue.class = %s",
type.typeId().name(),
defaultValue.getClass().getCanonicalName());
break;

default:
Preconditions.checkArgument(
type.typeId().javaClass().isInstance(defaultValue),
"defaultValue should be and instance of %s for TypeId.%s, but defaultValue.class = %s",
type.typeId().javaClass(),
type.typeId().name(),
defaultValue.getClass().getCanonicalName());
}
}

private final boolean isOptional;
private final int id;
private final String name;
private final Type type;
private final Object defaultValue;
private final String doc;

private NestedField(boolean isOptional, int id, String name, Type type, String doc) {
private NestedField(
boolean isOptional, int id, String name, Type type, Object defaultValue, String doc) {
Preconditions.checkNotNull(name, "Name cannot be null");
Preconditions.checkNotNull(type, "Type cannot be null");
validateDefaultValue(defaultValue, type);
this.isOptional = isOptional;
this.id = id;
this.name = name;
this.type = type;
this.defaultValue = defaultValue;
this.doc = doc;
}

@@ -459,7 +541,7 @@ public NestedField asOptional() {
if (isOptional) {
return this;
}
return new NestedField(true, id, name, type, doc);
return new NestedField(true, id, name, type, defaultValue, doc);
}

public boolean isRequired() {
@@ -470,7 +552,15 @@ public NestedField asRequired() {
if (!isOptional) {
return this;
}
return new NestedField(false, id, name, type, doc);
return new NestedField(false, id, name, type, defaultValue, doc);
}

public boolean hasDefaultValue() {
return defaultValue != null;
}

public Object getDefaultValue() {
return defaultValue;
}

public int fieldId() {
@@ -492,6 +582,7 @@ public String doc() {
@Override
public String toString() {
return String.format("%d: %s: %s %s", id, name, isOptional ? "optional" : "required", type)
+ (hasDefaultValue() ? ", default value: " + defaultValue + ", " : "")
+ (doc != null ? " (" + doc + ")" : "");
}

@@ -510,6 +601,9 @@ public boolean equals(Object o) {
return false;
} else if (!name.equals(that.name)) {
return false;
} else if (!Objects.equals(defaultValue, that.defaultValue)
&& !Arrays.equals((byte[]) defaultValue, (byte[]) that.defaultValue)) {
return false;
} else if (!Objects.equals(doc, that.doc)) {
return false;
}
@@ -518,7 +612,9 @@ public boolean equals(Object o) {

@Override
public int hashCode() {
return Objects.hash(NestedField.class, id, isOptional, name, type);
return hasDefaultValue()
? Objects.hash(NestedField.class, id, isOptional, name, type, defaultValue)
: Objects.hash(NestedField.class, id, isOptional, name, type);
}
}

@@ -736,7 +832,6 @@ public boolean equals(Object o) {
} else if (!(o instanceof ListType)) {
return false;
}

ListType listType = (ListType) o;
return elementField.equals(listType.elementField);
}
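To round out the Types.NestedField changes above, a short usage sketch; the field IDs, names, and defaults are made up, but the overloads, hasDefaultValue(), and getDefaultValue() are the ones added in this diff, and validateDefaultValue() rejects a default whose Java type does not match the field type.

import org.apache.iceberg.types.Types;

public class DefaultValueSketch {
  public static void main(String[] args) {
    // Optional int field with a default of 0.
    Types.NestedField withDefault =
        Types.NestedField.optional(1, "retries", Types.IntegerType.get(), 0, "retry count");

    System.out.println(withDefault.hasDefaultValue()); // true
    System.out.println(withDefault.getDefaultValue()); // 0
    System.out.println(withDefault);                   // includes "default value: 0"

    // A mismatched default fails validation in the constructor; e.g. this
    // throws IllegalArgumentException because 42 is not a CharSequence:
    // Types.NestedField.optional(2, "name", Types.StringType.get(), 42, null);
  }
}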
(Diffs for the remaining changed files are not shown.)
