From fc4ef8dc62e1c9cfa6793df930b134920c811643 Mon Sep 17 00:00:00 2001 From: Chen Dai Date: Mon, 17 Jun 2024 15:33:54 -0700 Subject: [PATCH] Add IT to test version tracking Signed-off-by: Chen Dai --- .../FlintSparkIcebergMetadataITSuite.scala | 41 +++++++++++++++++++ 1 file changed, 41 insertions(+) diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/iceberg/FlintSparkIcebergMetadataITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/iceberg/FlintSparkIcebergMetadataITSuite.scala index 5a6120294..4972ff21d 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/iceberg/FlintSparkIcebergMetadataITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/iceberg/FlintSparkIcebergMetadataITSuite.scala @@ -10,6 +10,7 @@ import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex.getSkippingIn import org.scalatest.matchers.should.Matchers import org.apache.spark.sql.{ExplainSuiteHelper, Row} +import org.apache.spark.sql.execution.streaming.{StreamExecution, StreamingQueryWrapper} import org.apache.spark.sql.flint.config.FlintSparkConf.OPTIMIZER_RULE_COVERING_INDEX_ENABLED class FlintSparkIcebergMetadataITSuite @@ -58,6 +59,46 @@ class FlintSparkIcebergMetadataITSuite awaitStreamingComplete(job.get.id.toString) } + /** + * Logging after V4 Snapshot ID: + * {"version":1,"snapshot_id":5955838147460350333,"position":1,"scan_all_files":false} + * + * Logging after V5 Snapshot ID: + * {"version":1,"snapshot_id":7393048775934035924,"position":1,"scan_all_files":false} + * + * Verify if correct in Iceberg history metadata table + * | made_current_at | snapshot_id | parent_id | is_current_ancestor | + * |---------------------:|--------------------:|--------------------:|:--------------------| + * | 2024-06-17 15:30:... | 1552180198061740807 | null | true | + * | 2024-06-17 15:30:... | 4538411166290920590 | 1552180198061740807 | true | + * | 2024-06-17 15:30:... | 8681526725408438495 | 4538411166290920590 | true | + * | 2024-06-17 15:30:... | 5955838147460350333 | 8681526725408438495 | true | + * | 2024-06-17 15:30:... | 7393048775934035924 | 5955838147460350333 | true | + */ + test("version tracking") { + val job = spark.streams.active.find(_.name == testFlintIndex) + val query = job.get match { + case wrapper: StreamingQueryWrapper => wrapper.streamingQuery + case other: StreamExecution => other + } + + def snapshotId: String = query.committedOffsets.headOption.get._2.json() + + // v4 + sql( + s"INSERT INTO $testIcebergTable VALUES (TIMESTAMP '2023-10-04 00:01:00', '200', 'Accept')") + awaitStreamingComplete(job.get.id.toString) + logInfo(s"Snapshot ID: $snapshotId") + + // v5 + sql( + s"INSERT INTO $testIcebergTable VALUES (TIMESTAMP '2023-10-05 00:01:00', '200', 'Accept')") + awaitStreamingComplete(job.get.id.toString) + logInfo(s"Snapshot ID: $snapshotId") + + sql(s"SELECT * FROM $testIcebergTable.history").show + } + test("data expiration") { flint.queryIndex(testFlintIndex).count() shouldBe 3