From 5cae5db4a9f04cffc2142ac5bb1db72b71b054d9 Mon Sep 17 00:00:00 2001
From: fphantam
Date: Mon, 6 Nov 2023 10:44:39 +0800
Subject: [PATCH] support spark sql drop partition

Signed-off-by: fphantam
---
 .../dmetasoul/lakesoul/meta/DBManager.java    |  46 ++++-
 .../lakesoul/metadata/LakeSoulCatalog.java    |  33 +++-
 .../lakesoul/test/flinkSource/DDLSuite.java   |  33 +++-
 .../lakesoul/meta/SparkMetaVersion.scala      |   4 +-
 .../lakesoul/catalog/LakeSoulTableV2.scala    | 102 ++++++++++-
 .../LakeSoulUnsupportedOperationsCheck.scala  |   3 -
 .../sql/lakesoul/NotSupportedDDLSuite.scala   |   5 -
 .../sql/lakesoul/SQLDropPartitionSuite.scala  | 123 +++++++++++++
 rust/lakesoul-metadata/src/lib.rs             |   2 +-
 9 files changed, 327 insertions(+), 24 deletions(-)
 create mode 100644 lakesoul-spark/src/test/scala/org/apache/spark/sql/lakesoul/SQLDropPartitionSuite.scala

diff --git a/lakesoul-common/src/main/java/com/dmetasoul/lakesoul/meta/DBManager.java b/lakesoul-common/src/main/java/com/dmetasoul/lakesoul/meta/DBManager.java
index d42bb88d4..c2e9385e7 100644
--- a/lakesoul-common/src/main/java/com/dmetasoul/lakesoul/meta/DBManager.java
+++ b/lakesoul-common/src/main/java/com/dmetasoul/lakesoul/meta/DBManager.java
@@ -193,6 +193,34 @@ public long getLastedVersionTimestampUptoTime(String tableId, String partitionDe
         return partitionInfoDao.getLastedVersionTimestampUptoTime(tableId, partitionDesc, utcMills);
     }
 
+    /**
+     * Deletes the metadata of one partition and returns the paths of the data files that its
+     * commits referenced. The files themselves are not removed here.
+     *
+     * @param tableId       id of the table that owns the partition
+     * @param partitionDesc description of the partition to delete
+     * @return paths collected from the file operations of the deleted commits
+     */
+    public List deleteMetaPartitionInfo(String tableId, String partitionDesc) {
+        List fileOps = new ArrayList<>();
+        List deleteFilePathList = new ArrayList<>();
+        deleteSinglePartitionMetaInfo(tableId, partitionDesc, fileOps, deleteFilePathList);
+        return deleteFilePathList;
+    }
+
+    /**
+     * Fills {@code fileOps} and {@code deleteFilePathList} from the versions returned by
+     * {@code getOnePartitionVersions}, then deletes the partition rows and their commit rows.
+     */
+    public void deleteSinglePartitionMetaInfo(String tableId, String partitionDesc,
+                                              List fileOps, List deleteFilePathList) {
+        List singlePartitionAllVersionList = getOnePartitionVersions(tableId, partitionDesc);
+        Set snapshotList = new HashSet<>();
+        getSnapshotAndFilePathInfo(tableId, partitionDesc, fileOps, deleteFilePathList, singlePartitionAllVersionList, snapshotList);
+        partitionInfoDao.deleteByTableIdAndPartitionDesc(tableId, partitionDesc);
+        dataCommitInfoDao.deleteByTableIdPartitionDescCommitList(tableId, partitionDesc, new ArrayList<>(snapshotList));
+    }
+
     public List getDeleteFilePath(String tableId, String partitionDesc, long utcMills) {
         List fileOps = new ArrayList<>();
 
@@ -210,14 +238,24 @@ public List getDeleteFilePath(String tableId, String partitionDesc, long
     public void deleteSinglePartitionMetaInfo(String tableId, String partitionDesc, long utcMills,
                                               List fileOps, List deleteFilePathList) {
         List filterPartitionInfo = getFilterPartitionInfo(tableId, partitionDesc, utcMills);
-        List snapshotList = new ArrayList<>();
+        Set snapshotList = new HashSet<>();
+        getSnapshotAndFilePathInfo(tableId, partitionDesc, fileOps, deleteFilePathList, filterPartitionInfo, snapshotList);
+        partitionInfoDao.deletePreviousVersionPartition(tableId, partitionDesc, utcMills);
+        dataCommitInfoDao.deleteByTableIdPartitionDescCommitList(tableId, partitionDesc, new ArrayList<>(snapshotList));
+    }
+
+    /**
+     * Gathers the distinct snapshot ids of the given partition versions into {@code snapshotList},
+     * appends the file operations of the matching commits to {@code fileOps} and appends the path
+     * of every entry in {@code fileOps} to {@code deleteFilePathList}. Nothing is deleted here.
+     */
+    private void getSnapshotAndFilePathInfo(String tableId, String partitionDesc, List fileOps, List deleteFilePathList,
+                                            List filterPartitionInfo, Set snapshotList) {
         filterPartitionInfo.forEach(p -> snapshotList.addAll(p.getSnapshotList()));
         List filterDataCommitInfo =
-                dataCommitInfoDao.selectByTableIdPartitionDescCommitList(tableId, partitionDesc, snapshotList);
+                dataCommitInfoDao.selectByTableIdPartitionDescCommitList(tableId, partitionDesc, new ArrayList<>(snapshotList));
         filterDataCommitInfo.forEach(dataCommitInfo -> fileOps.addAll(dataCommitInfo.getFileOpsList()));
         fileOps.forEach(fileOp -> deleteFilePathList.add(fileOp.getPath()));
-        partitionInfoDao.deletePreviousVersionPartition(tableId, partitionDesc, utcMills);
-        dataCommitInfoDao.deleteByTableIdPartitionDescCommitList(tableId, partitionDesc, snapshotList);
     }
 
     public List getFilterPartitionInfo(String tableId, String partitionDesc, long utcMills) {
diff --git a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/metadata/LakeSoulCatalog.java b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/metadata/LakeSoulCatalog.java
index 402dfc7cb..17216fefe 100644
--- a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/metadata/LakeSoulCatalog.java
+++ b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/metadata/LakeSoulCatalog.java
@@ -332,13 +332,9 @@ public List listPartitions(ObjectPath tablePath, CatalogPa
         }
         TableInfo tableInfo =
                 dbManager.getTableInfoByNameAndNamespace(tablePath.getObjectName(), tablePath.getDatabaseName());
-        List allPartitionInfo = dbManager.getAllPartitionInfo(tableInfo.getTableId());
-        HashSet partitions = new HashSet<>(100);
-        for (PartitionInfo pif : allPartitionInfo) {
-            partitions.add(pif.getPartitionDesc());
-        }
+        List tableAllPartitionDesc = dbManager.getTableAllPartitionDesc(tableInfo.getTableId());
         ArrayList al = new ArrayList<>(100);
-        for (String item : partitions) {
+        for (String item : tableAllPartitionDesc) {
             if (null == item || "".equals(item)) {
                 throw new CatalogException("partition not exist");
             } else {
@@ -421,7 +417,30 @@ public void createPartition(ObjectPath tablePath, CatalogPartitionSpec catalogPa
     @Override
     public void dropPartition(ObjectPath tablePath, CatalogPartitionSpec catalogPartitionSpec, boolean ignoreIfExists)
             throws CatalogException {
-        throw new CatalogException("not supported now");
+
+        TableInfo tableInfo =
+                dbManager.getTableInfoByNameAndNamespace(tablePath.getObjectName(), tablePath.getDatabaseName());
+        if (tableInfo == null) {
+            throw new CatalogException(tablePath + " does not exist");
+        }
+        String partitionDesc = DBUtil.formatPartitionDesc(catalogPartitionSpec.getPartitionSpec());
+        // Honor the ignore flag: a missing partition is an error unless the caller opted out.
+        if (!dbManager.getTableAllPartitionDesc(tableInfo.getTableId()).contains(partitionDesc)) {
+            if (ignoreIfExists) {
+                return;
+            }
+            throw new CatalogException("Partition " + partitionDesc + " of " + tablePath + " does not exist");
+        }
+        // Remove the metadata first; the returned paths are the data files it referenced.
+        List deleteFilePath = dbManager.deleteMetaPartitionInfo(tableInfo.getTableId(), partitionDesc);
+        deleteFilePath.forEach(filePath -> {
+            Path path = new Path(filePath);
+            try {
+                path.getFileSystem().delete(path, true);
+            } catch (IOException e) {
+                throw new CatalogException("Failed to delete " + filePath + " of partition " + partitionDesc, e);
+            }
+        });
     }
 
     @Override
diff --git a/lakesoul-flink/src/test/java/org/apache/flink/lakesoul/test/flinkSource/DDLSuite.java b/lakesoul-flink/src/test/java/org/apache/flink/lakesoul/test/flinkSource/DDLSuite.java
index 40cd339d7..27d3828c9 100644
--- a/lakesoul-flink/src/test/java/org/apache/flink/lakesoul/test/flinkSource/DDLSuite.java
+++ b/lakesoul-flink/src/test/java/org/apache/flink/lakesoul/test/flinkSource/DDLSuite.java
@@ -36,6 +36,22 @@ public void dropView() throws ExecutionException, InterruptedException {
         tEnv.executeSql("SHOW VIEWS");
     }
 
+    @Test
+    public void dropTablePartition() throws ExecutionException, InterruptedException {
+        TableEnvironment tEnv = TestUtils.createTableEnv(BATCH_TYPE);
+        createDropPartitionTable(tEnv);
+        tEnv.executeSql(
+                "INSERT INTO user_info VALUES" +
+                        "(1, 'Bob', 90, TO_DATE('1995-10-01'))," +
+                        "(2, 'Alice', 80, TO_DATE('1995-10-01')), " +
+                        "(4, 'Mike', 70, TO_DATE('1995-10-02'))")
+                .await();
+        tEnv.executeSql("select * from user_info").print();
+        tEnv.executeSql("alter table user_info drop partition (`date`='1995-10-01')");
+        // Printed for inspection: only the row of partition 1995-10-02 should remain.
+        tEnv.executeSql("select * from user_info").print();
+    }
+
     @Test
     public void alterTableNotSupported() throws ExecutionException, InterruptedException {
         TableEnvironment tEnv = TestUtils.createTableEnv(BATCH_TYPE);
@@ -98,10 +114,25 @@ private void createLakeSoulSourceTableUser(TableEnvironment tEnvs) throws Execut
         tEnvs.executeSql(createUserSql);
     }
 
-
     private void createLakeSoulSourceTableViewUser(TableEnvironment tEnvs) throws ExecutionException, InterruptedException {
         String createUserSql = "create view if not exists user_info_view as select * from user_info";
         tEnvs.executeSql("DROP view if exists user_info_view");
         tEnvs.executeSql(createUserSql);
     }
+
+    /** Recreates the {@code user_info} table, partitioned by {@code date}, in the test temp dir. */
+    private void createDropPartitionTable(TableEnvironment tEnvs) {
+        String createUserSql = "create table user_info (" +
+                " order_id INT PRIMARY KEY NOT ENFORCED, " +
+                " name STRING, " +
+                " score FLOAT, " +
+                " `date` DATE" +
+                ") PARTITIONED BY (`date`) WITH (" +
+                " 'format'='lakesoul'," +
+                " 'hashBucketNum'='2'," +
+                " 'path'='" + getTempDirUri("/lakeSource/user") +
+                "' )";
+        tEnvs.executeSql("DROP TABLE if exists user_info");
+        tEnvs.executeSql(createUserSql);
+    }
 }
diff --git a/lakesoul-spark/src/main/scala/com/dmetasoul/lakesoul/meta/SparkMetaVersion.scala b/lakesoul-spark/src/main/scala/com/dmetasoul/lakesoul/meta/SparkMetaVersion.scala
index 939640dbd..22e7f27e7 100644
--- a/lakesoul-spark/src/main/scala/com/dmetasoul/lakesoul/meta/SparkMetaVersion.scala
+++ b/lakesoul-spark/src/main/scala/com/dmetasoul/lakesoul/meta/SparkMetaVersion.scala
@@ -235,8 +235,8 @@ object SparkMetaVersion {
     dbManager.deletePartitionInfoByTableId(table_id)
   }
 
-  def dropPartitionInfoByRangeId(table_id: String, range_value: String): Unit = {
-    dbManager.deletePartitionInfoByTableAndPartition(table_id, range_value)
+  def dropPartitionInfoByRangeId(table_id: String, range_value: String): List[String] = {
+    dbManager.deleteMetaPartitionInfo(table_id, range_value).asScala.toList
   }
 
   def deleteShortTableName(short_table_name: String, table_name: String): Unit = {
diff --git a/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/catalog/LakeSoulTableV2.scala b/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/catalog/LakeSoulTableV2.scala
index a420f5c49..baddeef5b 100644
--- a/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/catalog/LakeSoulTableV2.scala
+++ b/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/catalog/LakeSoulTableV2.scala
@@ -4,8 +4,12 @@
 
 package org.apache.spark.sql.lakesoul.catalog
 
+import com.dmetasoul.lakesoul.meta.{PartitionInfoScala, SparkMetaVersion}
 import org.apache.hadoop.fs.Path
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.analysis.ResolvePartitionSpec
 import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogUtils}
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
 import org.apache.spark.sql.connector.catalog.TableCapability._
 import org.apache.spark.sql.connector.catalog._
 import org.apache.spark.sql.connector.expressions.{FieldReference, IdentityTransform, Transform}
@@ -20,6 +24,8 @@ import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
 import org.apache.spark.sql.{AnalysisException, DataFrame, SaveMode, SparkSession}
 
+import java.util
+import java.util.concurrent.ConcurrentHashMap
 import scala.collection.JavaConverters._
 import scala.collection.mutable
 
@@ -29,7 +35,7 @@ case class LakeSoulTableV2(spark: SparkSession,
                            tableIdentifier: Option[String] = None,
                            userDefinedFileIndex: Option[LakeSoulFileIndexV2] = None,
                            var mergeOperatorInfo: Option[Map[String, String]] = None)
-  extends Table with SupportsWrite with SupportsRead {
+  extends Table with SupportsWrite with SupportsRead with SupportsPartitionManagement {
 
   val path: Path = SparkUtil.makeQualifiedTablePath(path_orig)
 
@@ -65,6 +71,11 @@ case class LakeSoulTableV2(spark: SparkSession,
 
   private lazy val snapshot: Snapshot = snapshotManagement.snapshot
 
+  // Partition identifier -> partition info, filled from the snapshot so that dropPartition
+  // can translate an identifier row back into the partition's table_id and range_value.
+  private val mapTablePartitionSpec: util.Map[InternalRow, PartitionInfoScala] =
+    new ConcurrentHashMap[InternalRow, PartitionInfoScala]()
+
   override def schema(): StructType =
     StructType(snapshot.getTableInfo.data_schema ++ snapshot.getTableInfo.range_partition_schema)
 
@@ -141,6 +152,95 @@ case class LakeSoulTableV2(spark: SparkSession,
     SparkUtil.createRelation(partitionPredicates, snapshotManagement, spark)
   }
 
+  /** Partition columns of this table, taken from the snapshot's table info. */
+  override def partitionSchema(): StructType = {
+    snapshot.getTableInfo.range_partition_schema
+  }
+
+  override def createPartition(ident: InternalRow, properties: util.Map[String, String]): Unit = {
+    throw new UnsupportedOperationException(
+      "Cannot create partition: partitions are created implicitly when inserting new rows into LakeSoul tables")
+  }
+
+  /**
+   * Drops the partition identified by `ident`: removes its metadata, then deletes the data
+   * files that the metadata layer returned for it.
+   *
+   * @return true if the partition was found and dropped, false if it is unknown
+   */
+  override def dropPartition(ident: InternalRow): Boolean = {
+    // The cache is normally filled by listPartitionIdentifiers; fill it here too so that a
+    // direct call does not report an existing partition as missing.
+    if (!mapTablePartitionSpec.containsKey(ident)) {
+      cachePartitionIdentifiers()
+    }
+    val partitionInfoScala = mapTablePartitionSpec.get(ident)
+    if (partitionInfoScala == null) {
+      false
+    } else {
+      val deleteFilePaths =
+        SparkMetaVersion.dropPartitionInfoByRangeId(partitionInfoScala.table_id, partitionInfoScala.range_value)
+      if (deleteFilePaths.nonEmpty) {
+        val sessionHadoopConf = SparkSession.active.sessionState.newHadoopConf()
+        val fs = path.getFileSystem(sessionHadoopConf)
+        deleteFilePaths.foreach(item => fs.delete(new Path(item), true))
+      }
+      mapTablePartitionSpec.remove(ident)
+      true
+    }
+  }
+
+  override def replacePartitionMetadata(ident: InternalRow, properties: util.Map[String, String]): Unit = {
+    throw new UnsupportedOperationException(
+      "Cannot replace partition metadata: LakeSoul partitions do not support metadata")
+  }
+
+  override def loadPartitionMetadata(ident: InternalRow): util.Map[String, String] = {
+    throw new UnsupportedOperationException(
+      "Cannot load partition metadata: LakeSoul partitions do not support metadata")
+  }
+
+  /**
+   * Lists the identifiers of all partitions whose values for the columns `names` equal `ident`.
+   */
+  override def listPartitionIdentifiers(names: Array[String], ident: InternalRow): Array[InternalRow] = {
+    assert(names.length == ident.numFields,
+      s"Number of partition names (${names.length}) must be equal to " +
+        s"the number of partition values (${ident.numFields}).")
+    val schema = partitionSchema
+    assert(names.forall(fieldName => schema.fieldNames.contains(fieldName)),
+      s"Some partition names ${names.mkString("[", ", ", "]")} don't belong to " +
+        s"the partition schema '${schema.sql}'.")
+    val indexes = names.map(schema.fieldIndex)
+    val dataTypes = names.map(schema(_).dataType)
+    val currentRow = new GenericInternalRow(new Array[Any](names.length))
+    cachePartitionIdentifiers()
+
+    // Project each cached identifier onto the requested columns and keep those equal to `ident`.
+    mapTablePartitionSpec.keySet().asScala.filter { key =>
+      for (i <- names.indices) {
+        currentRow.values(i) = key.get(indexes(i), dataTypes(i))
+      }
+      currentRow == ident
+    }.toArray
+  }
+
+  /**
+   * Adds one cache entry per partition of the current snapshot, keyed by the identifier row
+   * built from its `name=value,...` range_value.
+   */
+  private def cachePartitionIdentifiers(): Unit = {
+    val schema = partitionSchema
+    snapshot.getPartitionInfoArray.foreach(partition => {
+      // Split on the first '=' only, so a value that itself contains '=' stays intact.
+      val pairs = partition.range_value.split(",").map(_.split("=", 2))
+      // Skip descriptions that are not `name=value` lists instead of failing on them.
+      if (pairs.forall(_.length == 2)) {
+        val spec = pairs.map(pair => pair(0) -> pair(1)).toMap
+        mapTablePartitionSpec.put(ResolvePartitionSpec.convertToPartIdent(spec, schema), partition)
+      }
+    })
+  }
 }
 
 
diff --git a/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/rules/LakeSoulUnsupportedOperationsCheck.scala b/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/rules/LakeSoulUnsupportedOperationsCheck.scala
index 269ef3286..8bcb8b9b7 100644
--- a/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/rules/LakeSoulUnsupportedOperationsCheck.scala
+++ b/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/rules/LakeSoulUnsupportedOperationsCheck.scala
@@ -43,8 +43,5 @@ case class LakeSoulUnsupportedOperationsCheck(spark: SparkSession)
       case a: AlterTableAddPartitionCommand =>
         fail(operation = "ALTER TABLE ADD PARTITION", a.tableName)
 
-      case a: AlterTableDropPartitionCommand =>
-        fail(operation = "ALTER TABLE DROP PARTITION", a.tableName)
-
       case a: AlterTableSerDePropertiesCommand =>
         fail(operation = "ALTER TABLE table SET SERDEPROPERTIES", a.tableName)
diff --git a/lakesoul-spark/src/test/scala/org/apache/spark/sql/lakesoul/NotSupportedDDLSuite.scala b/lakesoul-spark/src/test/scala/org/apache/spark/sql/lakesoul/NotSupportedDDLSuite.scala
index ecb132d8d..a4d9fd493 100644
--- a/lakesoul-spark/src/test/scala/org/apache/spark/sql/lakesoul/NotSupportedDDLSuite.scala
+++ b/lakesoul-spark/src/test/scala/org/apache/spark/sql/lakesoul/NotSupportedDDLSuite.scala
@@ -122,11 +122,6 @@ abstract class NotSupportedDDLBase extends QueryTest
       "can not alter partitions")
   }
 
-  test("ALTER TABLE DROP PARTITION") {
-    assertUnsupported(s"ALTER TABLE $partitionedTableName DROP PARTITION (p1=2)",
-      "can not alter partitions")
-  }
-
   test("ALTER TABLE RECOVER PARTITIONS") {
     assertUnsupported(s"ALTER TABLE $partitionedTableName RECOVER PARTITIONS")
     assertUnsupported(s"MSCK REPAIR TABLE $partitionedTableName")
diff --git a/lakesoul-spark/src/test/scala/org/apache/spark/sql/lakesoul/SQLDropPartitionSuite.scala b/lakesoul-spark/src/test/scala/org/apache/spark/sql/lakesoul/SQLDropPartitionSuite.scala
new file mode 100644
index 000000000..3a7b807a8
--- /dev/null
+++ b/lakesoul-spark/src/test/scala/org/apache/spark/sql/lakesoul/SQLDropPartitionSuite.scala
@@ -0,0 +1,123 @@
+// SPDX-FileCopyrightText: 2023 LakeSoul Contributors
+//
+// SPDX-License-Identifier: Apache-2.0
+
+package org.apache.spark.sql.lakesoul
+
+import com.dmetasoul.lakesoul.tables.LakeSoulTable
+import org.apache.spark.sql.{AnalysisException, DataFrame, QueryTest}
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.functions.col
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.lakesoul.sources.LakeSoulSourceUtils
+import org.apache.spark.sql.lakesoul.test.{LakeSoulSQLCommandTest, LakeSoulTestUtils}
+import org.apache.spark.sql.test.SharedSparkSession
+import org.junit.runner.RunWith
+import org.scalatestplus.junit.JUnitRunner
+
+import scala.util.control.NonFatal
+
+@RunWith(classOf[JUnitRunner])
+class SQLDropPartitionSuite extends SQLDropPartitionBase with LakeSoulSQLCommandTest
+
+/** Exercises `ALTER TABLE ... DROP PARTITION` on single- and multi-level partitioned tables. */
+abstract class SQLDropPartitionBase extends QueryTest
+  with SharedSparkSession
+  with LakeSoulTestUtils {
+
+  import testImplicits._
+
+  val format = "lakesoul"
+
+  val singlePartitionedTableName = "singlePartitionedLakeSoulTbl"
+
+  val multiPartitionedTableName = "multiPartitionedLakeSoulTbl"
+
+  protected override def beforeEach(): Unit = {
+    super.beforeEach()
+    try {
+      println(SQLConf.get.getConf(SQLConf.DEFAULT_CATALOG))
+      sql(
+        s"""
+           |CREATE TABLE $singlePartitionedTableName (a INT, b STRING, p INT)
+           |USING $format
+           |PARTITIONED BY (p)
+          """.stripMargin)
+
+      sql(s"INSERT INTO $singlePartitionedTableName values (1, 'A', 1), (2, 'B', 2)")
+
+      sql(
+        s"""
+           |CREATE TABLE $multiPartitionedTableName (a INT, b STRING, p1 INT, p2 INT)
+           |USING $format
+           |PARTITIONED BY (p1, p2)
+          """.stripMargin)
+
+      sql(s"INSERT INTO $multiPartitionedTableName values (1, 'A', 1, 1), (2, 'B', 1, 2)")
+    } catch {
+      case NonFatal(e) =>
+        afterAll()
+        throw e
+    }
+  }
+
+  protected override def afterEach(): Unit = {
+    try {
+      val location = Seq(singlePartitionedTableName, multiPartitionedTableName).map(tbl => {
+        try {
+          LakeSoulSourceUtils.getLakeSoulPathByTableIdentifier(
+            TableIdentifier(tbl, Some("default"))
+          )
+        } catch {
+          case _: Exception => None
+        }
+      })
+
+      location.foreach(loc => {
+        if (loc.isDefined) {
+          try {
+            LakeSoulTable.forPath(loc.get).dropTable()
+          } catch {
+            case e: Exception => {
+              println(e.getMessage)
+            }
+          }
+        }
+      })
+    } finally {
+      super.afterEach()
+    }
+  }
+
+  protected def verifyTable(tableName: String, expected: DataFrame, colNames: Seq[String]): Unit = {
+    checkAnswer(spark.table(tableName).select(colNames.map(col): _*), expected)
+    waitForTasksToFinish()
+  }
+
+  test("ALTER TABLE DROP SINGLE PARTITION") {
+    sql(s"ALTER TABLE $singlePartitionedTableName DROP PARTITION (p=1)")
+    val df = Seq((2, "B", 2)).toDF("a", "b", "p")
+    verifyTable(singlePartitionedTableName, df, Seq("a", "b", "p"))
+  }
+
+  test("ALTER TABLE DROP SINGLE PARTITION WHEN PARTITION NAME NOT EXISTS") {
+    val e = intercept[AnalysisException] {
+      sql(s"ALTER TABLE $singlePartitionedTableName DROP PARTITION (p=3)")
+    }
+    assert(e.getMessage.contains("The following partitions not found"))
+  }
+
+  test("ALTER TABLE DROP MULTI-LEVEL PARTITION") {
+    sql(s"ALTER TABLE $multiPartitionedTableName DROP PARTITION (p1=1, p2=1)")
+    val df = Seq((2, "B", 1, 2)).toDF("a", "b", "p1", "p2")
+    verifyTable(multiPartitionedTableName, df, Seq("a", "b", "p1", "p2"))
+  }
+
+  test("ALTER TABLE DROP SINGLE-LEVEL PARTITION WHEN TABLE WITH MULTI-LEVEL PARTITION") {
+    val e = intercept[AnalysisException] {
+      sql(s"ALTER TABLE $multiPartitionedTableName DROP PARTITION (p1=1)")
+    }
+    assert(e.getMessage.contains("Partition spec is invalid"))
+    assert(e.getMessage.contains("The spec (p1) must match the partition spec (p1, p2)"))
+  }
+}
diff --git a/rust/lakesoul-metadata/src/lib.rs b/rust/lakesoul-metadata/src/lib.rs
index 4696a33b9..1937bfd05 100644
--- a/rust/lakesoul-metadata/src/lib.rs
+++ b/rust/lakesoul-metadata/src/lib.rs
@@ -1214,7 +1214,7 @@ pub fn execute_update(
 
             let statement = format!(
                 "delete from data_commit_info
-                where table_id = $1::TEXT and partition_desc = $2::TEXT and commit_id in commit_id in ({}) ", uuid_str_list);
+                where table_id = $1::TEXT and partition_desc = $2::TEXT and commit_id in ({}) ", uuid_str_list);
 
             runtime.block_on(async{
                 let statement = client.prepare(&statement).await?;