support spark sql drop partition
Signed-off-by: fphantam <[email protected]>
F-PHantam committed Nov 6, 2023
1 parent e299795 commit 5cae5db
Showing 9 changed files with 281 additions and 23 deletions.
@@ -193,6 +193,22 @@ public long getLastedVersionTimestampUptoTime(String tableId, String partitionDe
return partitionInfoDao.getLastedVersionTimestampUptoTime(tableId, partitionDesc, utcMills);
}

public List<String> deleteMetaPartitionInfo(String tableId, String partitionDesc) {
List<DataFileOp> fileOps = new ArrayList<>();
List<String> deleteFilePathList = new ArrayList<>();
deleteSinglePartitionMetaInfo(tableId, partitionDesc, fileOps, deleteFilePathList);
return deleteFilePathList;
}

public void deleteSinglePartitionMetaInfo(String tableId, String partitionDesc,
List<DataFileOp> fileOps, List<String> deleteFilePathList) {
List<PartitionInfo> singlePartitionAllVersionList = getOnePartitionVersions(tableId, partitionDesc);
Set<Uuid> snapshotList = new HashSet<>();
getSnapshotAndFilePathInfo(tableId, partitionDesc, fileOps, deleteFilePathList, singlePartitionAllVersionList, snapshotList);
partitionInfoDao.deleteByTableIdAndPartitionDesc(tableId, partitionDesc);
dataCommitInfoDao.deleteByTableIdPartitionDescCommitList(tableId, partitionDesc, snapshotList.stream().collect(Collectors.toList()));
}


public List<String> getDeleteFilePath(String tableId, String partitionDesc, long utcMills) {
List<DataFileOp> fileOps = new ArrayList<>();
@@ -210,14 +226,19 @@ public List<String> getDeleteFilePath(String tableId, String partitionDesc, long
public void deleteSinglePartitionMetaInfo(String tableId, String partitionDesc, long utcMills,
List<DataFileOp> fileOps, List<String> deleteFilePathList) {
List<PartitionInfo> filterPartitionInfo = getFilterPartitionInfo(tableId, partitionDesc, utcMills);
-    List<Uuid> snapshotList = new ArrayList<>();
+    Set<Uuid> snapshotList = new HashSet<>();
+    getSnapshotAndFilePathInfo(tableId, partitionDesc, fileOps, deleteFilePathList, filterPartitionInfo, snapshotList);
+    partitionInfoDao.deletePreviousVersionPartition(tableId, partitionDesc, utcMills);
+    dataCommitInfoDao.deleteByTableIdPartitionDescCommitList(tableId, partitionDesc, snapshotList.stream().collect(Collectors.toList()));
+ }
+
+ private void getSnapshotAndFilePathInfo(String tableId, String partitionDesc, List<DataFileOp> fileOps, List<String> deleteFilePathList,
+                                         List<PartitionInfo> filterPartitionInfo, Set<Uuid> snapshotList) {
filterPartitionInfo.forEach(p -> snapshotList.addAll(p.getSnapshotList()));
List<DataCommitInfo> filterDataCommitInfo =
-        dataCommitInfoDao.selectByTableIdPartitionDescCommitList(tableId, partitionDesc, snapshotList);
+        dataCommitInfoDao.selectByTableIdPartitionDescCommitList(tableId, partitionDesc, snapshotList.stream().collect(Collectors.toList()));
filterDataCommitInfo.forEach(dataCommitInfo -> fileOps.addAll(dataCommitInfo.getFileOpsList()));
fileOps.forEach(fileOp -> deleteFilePathList.add(fileOp.getPath()));
-    partitionInfoDao.deletePreviousVersionPartition(tableId, partitionDesc, utcMills);
-    dataCommitInfoDao.deleteByTableIdPartitionDescCommitList(tableId, partitionDesc, snapshotList);
}

public List<PartitionInfo> getFilterPartitionInfo(String tableId, String partitionDesc, long utcMills) {
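The new deleteMetaPartitionInfo differs from the existing utcMills-based cleanup above: it only removes the partition's metadata (its partition info rows and the data commit records they reference) and returns the orphaned data file paths, leaving physical deletion to the caller, as the Flink and Spark call sites later in this commit do. A minimal caller sketch of that two-step contract (illustrative only; DropPartitionCleanupSketch is hypothetical, and it assumes the surrounding metadata class is LakeSoul's DBManager and that Hadoop's FileSystem API is available):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import java.io.IOException;
import java.util.List;

public class DropPartitionCleanupSketch {
    // Drops a single partition's metadata, then removes the data files it referenced.
    public static void dropPartitionAndFiles(DBManager dbManager, String tableId, String partitionDesc)
            throws IOException {
        // Step 1: delete the partition's metadata rows and collect the orphaned file paths.
        List<String> deleteFilePaths = dbManager.deleteMetaPartitionInfo(tableId, partitionDesc);
        // Step 2: physically remove each returned file (or directory) from storage.
        for (String filePath : deleteFilePaths) {
            Path path = new Path(filePath);
            FileSystem fs = path.getFileSystem(new Configuration());
            fs.delete(path, true);
        }
    }
}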
@@ -332,13 +332,9 @@ public List<CatalogPartitionSpec> listPartitions(ObjectPath tablePath, CatalogPa
}
TableInfo tableInfo =
dbManager.getTableInfoByNameAndNamespace(tablePath.getObjectName(), tablePath.getDatabaseName());
-    List<PartitionInfo> allPartitionInfo = dbManager.getAllPartitionInfo(tableInfo.getTableId());
-    HashSet<String> partitions = new HashSet<>(100);
-    for (PartitionInfo pif : allPartitionInfo) {
-        partitions.add(pif.getPartitionDesc());
-    }
+    List<String> tableAllPartitionDesc = dbManager.getTableAllPartitionDesc(tableInfo.getTableId());
ArrayList<CatalogPartitionSpec> al = new ArrayList<>(100);
-    for (String item : partitions) {
+    for (String item : tableAllPartitionDesc) {
if (null == item || "".equals(item)) {
throw new CatalogException("partition not exist");
} else {
@@ -421,7 +417,22 @@ public void createPartition(ObjectPath tablePath, CatalogPartitionSpec catalogPa
@Override
public void dropPartition(ObjectPath tablePath, CatalogPartitionSpec catalogPartitionSpec, boolean ignoreIfExists)
throws CatalogException {
-    throw new CatalogException("not supported now");
+
+    TableInfo tableInfo =
+            dbManager.getTableInfoByNameAndNamespace(tablePath.getObjectName(), tablePath.getDatabaseName());
+    if (tableInfo == null) {
+        throw new CatalogException(tablePath + " does not exist");
+    }
+    String partitionDesc = DBUtil.formatPartitionDesc(catalogPartitionSpec.getPartitionSpec());
+    List<String> deleteFilePath = dbManager.deleteMetaPartitionInfo(tableInfo.getTableId(), partitionDesc);
+    deleteFilePath.forEach(filePath -> {
+        Path path = new Path(filePath);
+        try {
+            path.getFileSystem().delete(path, true);
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    });
}

@Override
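With dropPartition implemented, an ALTER TABLE ... DROP PARTITION statement issued through Flink SQL can now reach this catalog instead of failing with "not supported now". A small usage sketch (illustrative; it assumes the LakeSoul catalog has been registered under the name lakesoul and that user_info is a LakeSoul table partitioned by `date`, as in the test case that follows):

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class FlinkDropPartitionSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());
        // Catalog name is an assumption; route the DDL through the LakeSoul catalog.
        tEnv.executeSql("USE CATALOG lakesoul");
        // Removes the partition's metadata and deletes its data files via the catalog's dropPartition.
        tEnv.executeSql("ALTER TABLE user_info DROP PARTITION (`date`='1995-10-01')");
    }
}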
@@ -36,6 +36,23 @@ public void dropView() throws ExecutionException, InterruptedException {
tEnv.executeSql("SHOW VIEWS");
}

@Test
public void dropTablePartition() throws ExecutionException, InterruptedException {
TableEnvironment tEnv = TestUtils.createTableEnv(BATCH_TYPE);
createDropPartitionTable(tEnv);
tEnv.executeSql(
"INSERT INTO user_info VALUES" +
"(1, 'Bob', 90, TO_DATE('1995-10-01'))," +
"(2, 'Alice', 80, TO_DATE('1995-10-01')), " +
// "(3, 'Jack', 75, TO_DATE('1995-10-15'))," +
// "(3, 'Amy', 95, TO_DATE('1995-10-10')), " +
// "(5, 'Tom', 75, TO_DATE('1995-10-01'))," +
"(4, 'Mike', 70, TO_DATE('1995-10-02'))")
.await();
tEnv.executeSql("select * from user_info").print();
tEnv.executeSql("alter table user_info drop partition `date`='1995-10-01'");
}

@Test
public void alterTableNotSupported() throws ExecutionException, InterruptedException {
TableEnvironment tEnv = TestUtils.createTableEnv(BATCH_TYPE);
@@ -98,10 +115,25 @@ private void createLakeSoulSourceTableUser(TableEnvironment tEnvs) throws Execut
tEnvs.executeSql(createUserSql);
}


private void createLakeSoulSourceTableViewUser(TableEnvironment tEnvs) throws ExecutionException, InterruptedException {
String createUserSql = "create view if not exists user_info_view as select * from user_info";
tEnvs.executeSql("DROP view if exists user_info_view");
tEnvs.executeSql(createUserSql);
}

private void createDropPartitionTable(TableEnvironment tEnvs) {
String createUserSql = "create table user_info (" +
" order_id INT PRIMARY KEY NOT ENFORCED, " +
" name STRING, " +
" score FLOAT, " +
" `date` DATE" +
") PARTITIONED BY (`date`) WITH (" +
" 'format'='lakesoul'," +
" 'hashBucketNum'='2'," +
// " 'path'='" + getTempDirUri("/lakeSource/user") +
" 'path'='" + getTempDirUri("/Users/dudongfeng/work/zehy/lakesoul") +
"' )";
tEnvs.executeSql("DROP TABLE if exists user_info");
tEnvs.executeSql(createUserSql);
}
}
@@ -235,8 +235,8 @@ object SparkMetaVersion {
dbManager.deletePartitionInfoByTableId(table_id)
}

-  def dropPartitionInfoByRangeId(table_id: String, range_value: String): Unit = {
-    dbManager.deletePartitionInfoByTableAndPartition(table_id, range_value)
+  def dropPartitionInfoByRangeId(table_id: String, range_value: String): List[String] = {
+    dbManager.deleteMetaPartitionInfo(table_id, range_value).asScala.toList
}

def deleteShortTableName(short_table_name: String, table_name: String): Unit = {
@@ -4,8 +4,12 @@

package org.apache.spark.sql.lakesoul.catalog

import com.dmetasoul.lakesoul.meta.{PartitionInfoScala, SparkMetaVersion}
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.analysis.ResolvePartitionSpec
import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogUtils}
import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
import org.apache.spark.sql.connector.catalog.TableCapability._
import org.apache.spark.sql.connector.catalog._
import org.apache.spark.sql.connector.expressions.{FieldReference, IdentityTransform, Transform}
@@ -20,6 +24,8 @@ import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap
import org.apache.spark.sql.{AnalysisException, DataFrame, SaveMode, SparkSession}

import java.util
import java.util.concurrent.ConcurrentHashMap
import scala.collection.JavaConverters._
import scala.collection.mutable

@@ -29,7 +35,7 @@ case class LakeSoulTableV2(spark: SparkSession,
tableIdentifier: Option[String] = None,
userDefinedFileIndex: Option[LakeSoulFileIndexV2] = None,
var mergeOperatorInfo: Option[Map[String, String]] = None)
-  extends Table with SupportsWrite with SupportsRead {
+  extends Table with SupportsWrite with SupportsRead with SupportsPartitionManagement {

val path: Path = SparkUtil.makeQualifiedTablePath(path_orig)

@@ -65,6 +71,9 @@ case class LakeSoulTableV2(spark: SparkSession,

private lazy val snapshot: Snapshot = snapshotManagement.snapshot

private val mapTablePartitionSpec: util.Map[InternalRow, PartitionInfoScala] =
new ConcurrentHashMap[InternalRow, PartitionInfoScala]()

override def schema(): StructType =
StructType(snapshot.getTableInfo.data_schema ++ snapshot.getTableInfo.range_partition_schema)

@@ -141,6 +150,74 @@ case class LakeSoulTableV2(spark: SparkSession,
SparkUtil.createRelation(partitionPredicates, snapshotManagement, spark)
}

override def partitionSchema(): StructType = {
snapshot.getTableInfo.range_partition_schema
}

override def createPartition(ident: InternalRow, properties: util.Map[String, String]): Unit = {
throw new UnsupportedOperationException(
"Cannot create partition: partitions are created implicitly when inserting new rows into iceberg tables")
}

override def dropPartition(ident: InternalRow): Boolean = {
if (mapTablePartitionSpec.containsKey(ident)) {
val partitionInfoScala = mapTablePartitionSpec.get(ident)
val deleteFilePaths = SparkMetaVersion.dropPartitionInfoByRangeId(partitionInfoScala.table_id, partitionInfoScala.range_value)
if (null != deleteFilePaths && deleteFilePaths.length > 0) {
val sessionHadoopConf = SparkSession.active.sessionState.newHadoopConf()
val fs = path.getFileSystem(sessionHadoopConf)
for (item <- deleteFilePaths) {
fs.delete(new Path(item), true)
}
}
mapTablePartitionSpec.remove(ident)
true
} else {
false
}
}

override def replacePartitionMetadata(ident: InternalRow, properties: util.Map[String, String]): Unit = {
throw new UnsupportedOperationException(
"Cannot replace partition metadata: iceberg partitions do not support metadata")
}

override def loadPartitionMetadata(ident: InternalRow): util.Map[String, String] = {
throw new UnsupportedOperationException(
"Cannot load partition metadata: iceberg partitions do not support metadata")
}

override def listPartitionIdentifiers(names: Array[String], ident: InternalRow): Array[InternalRow] = {
assert(names.length == ident.numFields,
s"Number of partition names (${names.length}) must be equal to " +
s"the number of partition values (${ident.numFields}).")
val schema = partitionSchema
assert(names.forall(fieldName => schema.fieldNames.contains(fieldName)),
s"Some partition names ${names.mkString("[", ", ", "]")} don't belong to " +
s"the partition schema '${schema.sql}'.")
val indexes = names.map(schema.fieldIndex)
val dataTypes = names.map(schema(_).dataType)
val currentRow = new GenericInternalRow(new Array[Any](names.length))
val partitionInfoArray = snapshot.getPartitionInfoArray

partitionInfoArray.foreach(partition => {
val range_value = partition.range_value
val map = range_value.split(",").map(p => {
val strings = p.split("=")
strings(0) -> strings(1)
}).toMap
val row = ResolvePartitionSpec.convertToPartIdent(map, schema)
mapTablePartitionSpec.put(row, partition)
})

val ss = mapTablePartitionSpec.keySet().asScala.filter { key =>
for (i <- 0 until names.length) {
currentRow.values(i) = key.get(indexes(i), dataTypes(i))
}
currentRow == ident
}.toArray
ss
}

}

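On the Spark side, implementing SupportsPartitionManagement is what lets Spark SQL's ALTER TABLE ... DROP PARTITION be planned against the v2 table: Spark checks partition existence first (which goes through listPartitionIdentifiers above and fills mapTablePartitionSpec) and then calls dropPartition, which drops the metadata via SparkMetaVersion and deletes the returned data files. A usage sketch (illustrative; the catalog name lakesoul, the catalog class setting, and the table default.user_info are assumptions, not part of this commit):

import org.apache.spark.sql.SparkSession;

public class SparkSqlDropPartitionSketch {
    public static void main(String[] args) {
        // Catalog name and class below are assumptions for illustration.
        SparkSession spark = SparkSession.builder()
                .appName("lakesoul-drop-partition")
                .config("spark.sql.catalog.lakesoul", "org.apache.spark.sql.lakesoul.catalog.LakeSoulCatalog")
                .getOrCreate();
        // Resolved through SupportsPartitionManagement: listPartitionIdentifiers, then dropPartition.
        spark.sql("ALTER TABLE lakesoul.default.user_info DROP PARTITION (`date`='1995-10-01')");
        spark.stop();
    }
}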
@@ -43,8 +43,8 @@ case class LakeSoulUnsupportedOperationsCheck(spark: SparkSession)
case a: AlterTableAddPartitionCommand =>
fail(operation = "ALTER TABLE ADD PARTITION", a.tableName)

-      case a: AlterTableDropPartitionCommand =>
-        fail(operation = "ALTER TABLE DROP PARTITION", a.tableName)
+      // case a: AlterTableDropPartitionCommand =>
+      //   fail(operation = "ALTER TABLE DROP PARTITION", a.tableName)

case a: AlterTableSerDePropertiesCommand =>
fail(operation = "ALTER TABLE table SET SERDEPROPERTIES", a.tableName)
@@ -122,11 +122,6 @@ abstract class NotSupportedDDLBase extends QueryTest
"can not alter partitions")
}

test("ALTER TABLE DROP PARTITION") {
assertUnsupported(s"ALTER TABLE $partitionedTableName DROP PARTITION (p1=2)",
"can not alter partitions")
}

test("ALTER TABLE RECOVER PARTITIONS") {
assertUnsupported(s"ALTER TABLE $partitionedTableName RECOVER PARTITIONS")
assertUnsupported(s"MSCK REPAIR TABLE $partitionedTableName")