Skip to content

Commit

Permalink
support iceberg sql extensions
Browse files Browse the repository at this point in the history
  • Loading branch information
melin committed Aug 30, 2024
1 parent e26e83a commit 9846212
Show file tree
Hide file tree
Showing 7 changed files with 276 additions and 1 deletion.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>33.1.0-jre</version>
<version>33.2.0-jre</version>
<scope>provided</scope>
</dependency>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,5 +25,17 @@ enum class AlterActionType : Serializable {
ATTACH_PARTITION,
TRUNCATE_PARTITION,
REFRESH_MV,

// Iceberg SQL Extensions
CREATE_TAG,
CREATE_BRANCH,
DROP_TAG,
DROP_BRANCH,
ADD_PARTITION_FIELD,
DROP_PARTITION_FIELD,
REPLACE_PARTITION_FIELD,
SET_WRITE_DISTRIBUTION_AND_ORDERING,
SET_IDENTIFIER_FIELDS,
DROP_IDENTIFIER_FIELDS,
UNKOWN
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package io.github.melin.superior.common.relational.alter

import io.github.melin.superior.common.AlterActionType

/**
 * Alter action that carries the name of a tag to create.
 *
 * [alterType] is initialised to [AlterActionType.CREATE_TAG].
 *
 * @property tagName name of the tag.
 */
data class AlterCreateTagAction(val tagName: String) : AlterAction() {
override var alterType: AlterActionType = AlterActionType.CREATE_TAG
}

/**
 * Alter action that carries the name of a tag to drop.
 *
 * [alterType] is initialised to [AlterActionType.DROP_TAG].
 *
 * @property tagName name of the tag.
 */
data class AlterDropTagAction(val tagName: String) : AlterAction() {
override var alterType: AlterActionType = AlterActionType.DROP_TAG
}

/**
 * Alter action that carries the name of a branch to create.
 *
 * [alterType] is initialised to [AlterActionType.CREATE_BRANCH].
 *
 * @property branchName name of the branch.
 */
data class AlterCreateBranchAction(val branchName: String) : AlterAction() {
override var alterType: AlterActionType = AlterActionType.CREATE_BRANCH
}

/**
 * Alter action that carries the name of a branch to drop.
 *
 * [alterType] is initialised to [AlterActionType.DROP_BRANCH].
 *
 * @property branchName name of the branch.
 */
data class AlterDropBranchAction(val branchName: String) : AlterAction() {
override var alterType: AlterActionType = AlterActionType.DROP_BRANCH
}

/**
 * Alter action that carries the list of identifier fields to set.
 *
 * [alterType] is initialised to [AlterActionType.SET_IDENTIFIER_FIELDS].
 *
 * @property fields field names, in the order they were supplied.
 */
data class AlterSetIdentifierFieldsAction(val fields: List<String>) : AlterAction() {
override var alterType: AlterActionType = AlterActionType.SET_IDENTIFIER_FIELDS
}

/**
 * Alter action that carries the list of identifier fields to drop.
 *
 * [alterType] is initialised to [AlterActionType.DROP_IDENTIFIER_FIELDS].
 *
 * @property fields field names, in the order they were supplied.
 */
data class AlterDropIdentifierFieldsAction(val fields: List<String>) : AlterAction() {
override var alterType: AlterActionType = AlterActionType.DROP_IDENTIFIER_FIELDS
}
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ BIGINT: 'BIGINT';
BINARY: 'BINARY';
BOOLEAN: 'BOOLEAN';
BOTH: 'BOTH';
BRANCH: 'BRANCH';
BUCKET: 'BUCKET';
BUCKETS: 'BUCKETS';
BY: 'BY';
Expand Down Expand Up @@ -178,6 +179,7 @@ DIRECTORY: 'DIRECTORY';
DISTCP: 'DISTCP';
DISTINCT: 'DISTINCT';
DISTRIBUTE: 'DISTRIBUTE';
DISTRIBUTED: 'DISTRIBUTED';
DIV: 'DIV';
DOUBLE: 'DOUBLE';
DROP: 'DROP';
Expand All @@ -196,6 +198,7 @@ EXTERNAL: 'EXTERNAL';
EXTRACT: 'EXTRACT';
FALSE: 'FALSE';
FETCH: 'FETCH';
FIELD: 'FIELD';
FIELDS: 'FIELDS';
FILTER: 'FILTER';
FILE: 'FILE';
Expand Down Expand Up @@ -260,6 +263,7 @@ LOCK: 'LOCK';
LOCKS: 'LOCKS';
LOGICAL: 'LOGICAL';
LONG: 'LONG';
LOCALLY: 'LOCALLY';
MACRO: 'MACRO';
MAP: 'MAP';
MATCHED: 'MATCHED';
Expand Down Expand Up @@ -293,6 +297,7 @@ OPTION: 'OPTION';
OPTIONS: 'OPTIONS';
OR: 'OR';
ORDER: 'ORDER';
ORDERED: 'ORDERED';
OUT: 'OUT';
OUTER: 'OUTER';
OUTPUTFORMAT: 'OUTPUTFORMAT';
Expand Down Expand Up @@ -332,6 +337,8 @@ REPLACE: 'REPLACE';
RESET: 'RESET';
RESPECT: 'RESPECT';
RESTRICT: 'RESTRICT';
RETAIN: 'RETAIN';
RETENTION: 'RETENTION';
REVOKE: 'REVOKE';
RIGHT: 'RIGHT';
RLIKE: 'RLIKE' | 'REGEXP';
Expand All @@ -341,6 +348,8 @@ ROLLBACK: 'ROLLBACK';
ROLLUP: 'ROLLUP';
ROW: 'ROW';
ROWS: 'ROWS';
SNAPSHOT: 'SNAPSHOT';
SNAPSHOTS: 'SNAPSHOTS';
SECOND: 'SECOND';
SECONDS: 'SECONDS';
SCHEMA: 'SCHEMA';
Expand Down Expand Up @@ -379,6 +388,7 @@ SYSTEM_VERSION: 'SYSTEM_VERSION';
TABLE: 'TABLE';
TABLES: 'TABLES';
TABLESAMPLE: 'TABLESAMPLE';
TAG: 'TAG';
TARGET: 'TARGET';
TBLPROPERTIES: 'TBLPROPERTIES';
TEMPORARY: 'TEMPORARY' | 'TEMP';
Expand Down Expand Up @@ -412,6 +422,7 @@ UNLOCK: 'UNLOCK';
UNPIVOT: 'UNPIVOT';
UNSET: 'UNSET';
UPDATE: 'UPDATE';
UNORDERED: 'UNORDERED';
USE: 'USE';
USER: 'USER';
USING: 'USING';
Expand All @@ -428,6 +439,7 @@ WHERE: 'WHERE';
WINDOW: 'WINDOW';
WITH: 'WITH';
WITHIN: 'WITHIN';
WRITE: 'WRITE';
YEAR: 'YEAR';
YEARS: 'YEARS';
ZONE: 'ZONE';
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,18 @@ statement
| SYNC dtType=(DATABASE|TABLE) FROM source=multipartIdentifier
(SET OWNER principal=identifier)? #syncTableMeta

// iceberg sql extensions
| ALTER TABLE multipartIdentifier ADD PARTITION FIELD transform (AS name=identifier)? #addPartitionField
| ALTER TABLE multipartIdentifier DROP PARTITION FIELD transform #dropPartitionField
| ALTER TABLE multipartIdentifier REPLACE PARTITION FIELD transform WITH transform (AS name=identifier)? #replacePartitionField
| ALTER TABLE multipartIdentifier WRITE writeSpec #setWriteDistributionAndOrdering
| ALTER TABLE multipartIdentifier SET IDENTIFIER_KW FIELDS fieldList #setIdentifierFields
| ALTER TABLE multipartIdentifier DROP IDENTIFIER_KW FIELDS fieldList #dropIdentifierFields
| ALTER TABLE multipartIdentifier createReplaceBranchClause #createOrReplaceBranch
| ALTER TABLE multipartIdentifier createReplaceTagClause #createOrReplaceTag
| ALTER TABLE multipartIdentifier DROP BRANCH (IF EXISTS)? identifier #dropBranch
| ALTER TABLE multipartIdentifier DROP TAG (IF EXISTS)? identifier #dropTag

| unsupportedHiveNativeCommands .*? #failNativeCommand
;

Expand Down Expand Up @@ -335,6 +347,78 @@ exportTableClauses
(SINGLE single = booleanValue))*
;

// Tag creation clause. Two forms are accepted:
//   [CREATE OR] REPLACE TAG <identifier> <tagOptions>
//   CREATE TAG [IF NOT EXISTS] <identifier> <tagOptions>
// IF NOT EXISTS is only accepted on the plain CREATE form.
createReplaceTagClause
: (CREATE OR)? REPLACE TAG identifier tagOptions
| CREATE TAG (IF NOT EXISTS)? identifier tagOptions
;

// Branch creation clause. Two forms are accepted:
//   [CREATE OR] REPLACE BRANCH <identifier> <branchOptions>
//   CREATE BRANCH [IF NOT EXISTS] <identifier> <branchOptions>
// IF NOT EXISTS is only accepted on the plain CREATE form.
createReplaceBranchClause
: (CREATE OR)? REPLACE BRANCH identifier branchOptions
| CREATE BRANCH (IF NOT EXISTS)? identifier branchOptions
;

// Optional tag settings, in fixed order: `AS OF VERSION <snapshotId>` then a `refRetain` clause.
// Both parts are optional, so this rule can match empty input.
tagOptions
: (AS OF VERSION snapshotId)? (refRetain)?
;

// Optional branch settings, in fixed order: `AS OF VERSION <snapshotId>`, a `refRetain` clause,
// then a `snapshotRetention` clause. All parts are optional, so this rule can match empty input.
branchOptions
: (AS OF VERSION snapshotId)? (refRetain)? (snapshotRetention)?
;

// Snapshot identifier used after `AS OF VERSION`; accepts anything the `number` rule accepts.
snapshotId
: number
;

// Time unit keyword used by `refRetain` and `maxSnapshotAge`: DAYS, HOURS or MINUTES.
timeUnit
: DAYS
| HOURS
| MINUTES
;

// `WITH SNAPSHOT RETENTION` followed by a minimum snapshot count, a maximum snapshot age,
// or both (count first, then age).
snapshotRetention
: WITH SNAPSHOT RETENTION minSnapshotsToKeep
| WITH SNAPSHOT RETENTION maxSnapshotAge
| WITH SNAPSHOT RETENTION minSnapshotsToKeep maxSnapshotAge
;

// `RETAIN <number> <timeUnit>` clause used in tag and branch options.
refRetain
: RETAIN number timeUnit
;

// `<number> <timeUnit>` pair used inside `snapshotRetention`.
maxSnapshotAge
: number timeUnit
;

// `<number> SNAPSHOTS` count used inside `snapshotRetention`.
minSnapshotsToKeep
: number SNAPSHOTS
;

// Body of `ALTER TABLE ... WRITE`: any number of distribution and ordering specs, in any order.
// NOTE(review): the `*` also permits zero specs and repeated specs; confirm that is intended.
writeSpec
: (writeDistributionSpec | writeOrderingSpec)*
;

// Write distribution: the fixed keyword sequence `DISTRIBUTED BY PARTITION`.
writeDistributionSpec
: DISTRIBUTED BY PARTITION
;

// Write ordering: either `[LOCALLY] ORDERED BY <orderExpr>` or the single keyword `UNORDERED`.
writeOrderingSpec
: LOCALLY? ORDERED BY orderExpr
| UNORDERED
;

// Comma-separated list of one or more order fields, optionally wrapped in parentheses.
// Every field is collected into the `fields` list label in both alternatives.
orderExpr
: fields+=orderField (',' fields+=orderField)*
| '(' fields+=orderField (',' fields+=orderField)* ')'
;

// One ordering term: a transform, an optional ASC/DESC direction (label `direction`), and an
// optional `NULLS FIRST|LAST` placement (label `nullOrder`).
orderField
: transform direction=(ASC | DESC)? (NULLS nullOrder=(FIRST | LAST))?
;

// Comma-separated list of one or more multipart identifiers, collected into the `fields` label.
fieldList
: fields+=multipartIdentifier (',' fields+=multipartIdentifier)*
;

unsupportedHiveNativeCommands
: kw1=CREATE kw2=ROLE
| kw1=DROP kw2=ROLE
Expand Down Expand Up @@ -1454,6 +1538,7 @@ ansiNonReserved
| EXTENDED
| EXTERNAL
| EXTRACT
| FIELD
| FIELDS
| FILEFORMAT
| FIRST
Expand Down Expand Up @@ -1700,6 +1785,7 @@ nonReserved
| BINARY_HEX
| BOOLEAN
| BOTH
| BRANCH
| BUCKET
| BUCKETS
| BY
Expand Down Expand Up @@ -1762,6 +1848,7 @@ nonReserved
| DIRECTORY
| DISTINCT
| DISTRIBUTE
| DISTRIBUTED
| DIV
| DOUBLE
| DROP
Expand All @@ -1780,6 +1867,7 @@ nonReserved
| FALSE
| FETCH
| FILTER
| FIELD
| FIELDS
| FILEFORMAT
| FIRST
Expand Down Expand Up @@ -1834,6 +1922,7 @@ nonReserved
| LOCKS
| LOGICAL
| LONG
| LOCALLY
| MACRO
| MAP
| MATCHED
Expand Down Expand Up @@ -1864,6 +1953,7 @@ nonReserved
| OPTIONS
| OR
| ORDER
| ORDERED
| OUT
| OUTER
| OUTPUTFORMAT
Expand Down Expand Up @@ -1902,6 +1992,8 @@ nonReserved
| RESET
| RESPECT
| RESTRICT
| RETAIN
| RETENTION
| REVOKE
| RLIKE
| ROLE
Expand All @@ -1910,6 +2002,8 @@ nonReserved
| ROLLUP
| ROW
| ROWS
| SNAPSHOT
| SNAPSHOTS
| SCHEMA
| SCHEMAS
| SECOND
Expand Down Expand Up @@ -1943,6 +2037,7 @@ nonReserved
| TABLE
| TABLES
| TABLESAMPLE
| TAG
| TARGET
| TBLPROPERTIES
| TEMPORARY
Expand Down Expand Up @@ -1977,6 +2072,7 @@ nonReserved
| UPDATE
| USE
| USER
| UNORDERED
| VALUES
| VARCHAR
| VERSION
Expand All @@ -1990,6 +2086,7 @@ nonReserved
| WINDOW
| WITH
| WITHIN
| WRITE
| YEAR
| YEARS
| ZONE
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -861,6 +861,69 @@ class SparkSqlAntlr4Visitor(val splitSql: Boolean = false, val command: String?)
val functionId = parseTableName(ctx.identifierReference())
return DropFunction(FunctionId(functionId.schemaName, functionId.tableName))
}

// -----------------------------------iceberg sql start -------------------------------------

/**
 * Builds the statement for an `ALTER TABLE ... ADD PARTITION FIELD ...` parse node.
 *
 * Only the target table is read from [ctx]; the transform and the optional alias are not
 * carried into the resulting action, which records just the `ADD_PARTITION_FIELD` type.
 */
override fun visitAddPartitionField(ctx: SparkSqlParser.AddPartitionFieldContext): Statement =
    AlterTable(parseTableName(ctx.multipartIdentifier()), AlterTableAction(ADD_PARTITION_FIELD))

/**
 * Builds the statement for an `ALTER TABLE ... DROP PARTITION FIELD ...` parse node.
 *
 * Only the target table is read from [ctx]; the transform is not carried into the resulting
 * action, which records just the `DROP_PARTITION_FIELD` type.
 */
override fun visitDropPartitionField(ctx: SparkSqlParser.DropPartitionFieldContext): Statement =
    AlterTable(parseTableName(ctx.multipartIdentifier()), AlterTableAction(DROP_PARTITION_FIELD))

/**
 * Builds the statement for an `ALTER TABLE ... REPLACE PARTITION FIELD ... WITH ...` parse node.
 *
 * Only the target table is read from [ctx]; neither transform nor the optional alias is carried
 * into the resulting action, which records just the `REPLACE_PARTITION_FIELD` type.
 */
override fun visitReplacePartitionField(ctx: SparkSqlParser.ReplacePartitionFieldContext): Statement =
    AlterTable(parseTableName(ctx.multipartIdentifier()), AlterTableAction(REPLACE_PARTITION_FIELD))

/**
 * Builds the statement for an `ALTER TABLE ... WRITE <writeSpec>` parse node.
 *
 * Only the target table is read from [ctx]; the distribution and ordering details of the write
 * spec are not carried into the resulting action, which records just the
 * `SET_WRITE_DISTRIBUTION_AND_ORDERING` type.
 */
override fun visitSetWriteDistributionAndOrdering(
    ctx: SparkSqlParser.SetWriteDistributionAndOrderingContext
): Statement =
    AlterTable(
        parseTableName(ctx.multipartIdentifier()),
        AlterTableAction(SET_WRITE_DISTRIBUTION_AND_ORDERING),
    )

/**
 * Builds the statement for an `ALTER TABLE ... SET IDENTIFIER_KW FIELDS <fieldList>` parse node.
 *
 * Each entry of the field list is recorded as its raw source text; no quote cleaning is applied
 * to the field names.
 */
override fun visitSetIdentifierFields(ctx: SparkSqlParser.SetIdentifierFieldsContext): Statement {
    val tableId = parseTableName(ctx.multipartIdentifier())
    // `map` already returns a new List, so the former trailing `toList()` was a redundant copy.
    val fields = ctx.fieldList().fields.map { it.text }
    return AlterTable(tableId, AlterSetIdentifierFieldsAction(fields))
}

/**
 * Builds the statement for an `ALTER TABLE ... DROP IDENTIFIER_KW FIELDS <fieldList>` parse node.
 *
 * Each entry of the field list is recorded as its raw source text; no quote cleaning is applied
 * to the field names.
 */
override fun visitDropIdentifierFields(ctx: SparkSqlParser.DropIdentifierFieldsContext): Statement {
    val tableId = parseTableName(ctx.multipartIdentifier())
    // `map` already returns a new List, so the former trailing `toList()` was a redundant copy.
    val fields = ctx.fieldList().fields.map { it.text }
    return AlterTable(tableId, AlterDropIdentifierFieldsAction(fields))
}

/**
 * Builds the statement for an `ALTER TABLE ... [CREATE [OR REPLACE] | REPLACE] TAG ...` parse node.
 *
 * The tag name is the clause's identifier text passed through `CommonUtils.cleanQuote`.
 * The same [AlterCreateTagAction] is produced for the CREATE and REPLACE forms; `IF NOT EXISTS`
 * and the tag options are not recorded.
 */
override fun visitCreateOrReplaceTag(ctx: SparkSqlParser.CreateOrReplaceTagContext): Statement =
    AlterTable(
        parseTableName(ctx.multipartIdentifier()),
        AlterCreateTagAction(CommonUtils.cleanQuote(ctx.createReplaceTagClause().identifier().text)),
    )

/**
 * Builds the statement for an `ALTER TABLE ... DROP TAG [IF EXISTS] <identifier>` parse node.
 *
 * The tag name is the identifier text passed through `CommonUtils.cleanQuote`; the presence of
 * `IF EXISTS` is not recorded.
 */
override fun visitDropTag(ctx: SparkSqlParser.DropTagContext): Statement =
    AlterTable(
        parseTableName(ctx.multipartIdentifier()),
        AlterDropTagAction(CommonUtils.cleanQuote(ctx.identifier().text)),
    )

/**
 * Builds the statement for an `ALTER TABLE ... [CREATE [OR REPLACE] | REPLACE] BRANCH ...` parse node.
 *
 * The branch name is the clause's identifier text passed through `CommonUtils.cleanQuote`.
 * The same [AlterCreateBranchAction] is produced for the CREATE and REPLACE forms; `IF NOT EXISTS`
 * and the branch options are not recorded.
 */
override fun visitCreateOrReplaceBranch(ctx: SparkSqlParser.CreateOrReplaceBranchContext): Statement =
    AlterTable(
        parseTableName(ctx.multipartIdentifier()),
        AlterCreateBranchAction(CommonUtils.cleanQuote(ctx.createReplaceBranchClause().identifier().text)),
    )

/**
 * Builds the statement for an `ALTER TABLE ... DROP BRANCH [IF EXISTS] <identifier>` parse node.
 *
 * The branch name is the identifier text passed through `CommonUtils.cleanQuote`; the presence
 * of `IF EXISTS` is not recorded.
 */
override fun visitDropBranch(ctx: SparkSqlParser.DropBranchContext): Statement =
    AlterTable(
        parseTableName(ctx.multipartIdentifier()),
        AlterDropBranchAction(CommonUtils.cleanQuote(ctx.identifier().text)),
    )

// -----------------------------------iceberg sql end -------------------------------------

// -----------------------------------cache-------------------------------------------------

override fun visitCacheTable(ctx: SparkSqlParser.CacheTableContext): Statement {
Expand Down
Loading

0 comments on commit 9846212

Please sign in to comment.