Skip to content

Commit

Permalink
Add IT to test version tracking
Browse files Browse the repository at this point in the history
Signed-off-by: Chen Dai <[email protected]>
  • Loading branch information
dai-chen committed Jun 17, 2024
1 parent a6f1d8e commit fc4ef8d
Showing 1 changed file with 41 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex.getSkippingIn
import org.scalatest.matchers.should.Matchers

import org.apache.spark.sql.{ExplainSuiteHelper, Row}
import org.apache.spark.sql.execution.streaming.{StreamExecution, StreamingQueryWrapper}
import org.apache.spark.sql.flint.config.FlintSparkConf.OPTIMIZER_RULE_COVERING_INDEX_ENABLED

class FlintSparkIcebergMetadataITSuite
Expand Down Expand Up @@ -58,6 +59,46 @@ class FlintSparkIcebergMetadataITSuite
awaitStreamingComplete(job.get.id.toString)
}

/**
* Committed offset logged after the V4 insert:
* {"version":1,"snapshot_id":5955838147460350333,"position":1,"scan_all_files":false}
*
* Committed offset logged after the V5 insert:
* {"version":1,"snapshot_id":7393048775934035924,"position":1,"scan_all_files":false}
*
* The logged snapshot IDs can be verified against the Iceberg history metadata table:
* | made_current_at | snapshot_id | parent_id | is_current_ancestor |
* |---------------------:|--------------------:|--------------------:|:--------------------|
* | 2024-06-17 15:30:... | 1552180198061740807 | null | true |
* | 2024-06-17 15:30:... | 4538411166290920590 | 1552180198061740807 | true |
* | 2024-06-17 15:30:... | 8681526725408438495 | 4538411166290920590 | true |
* | 2024-06-17 15:30:... | 5955838147460350333 | 8681526725408438495 | true |
* | 2024-06-17 15:30:... | 7393048775934035924 | 5955838147460350333 | true |
*/
test("version tracking") {
  // Locate the streaming job that refreshes the Flint index under test; fail with a
  // descriptive message instead of a bare NoSuchElementException if it is not running.
  val job = spark.streams.active
    .find(_.name == testFlintIndex)
    .getOrElse(fail(s"No active streaming job found for index $testFlintIndex"))

  // Unwrap to the underlying StreamExecution, which exposes the committed offsets.
  val query = job match {
    case wrapper: StreamingQueryWrapper => wrapper.streamingQuery
    case other: StreamExecution => other
    case unknown =>
      fail(s"Unexpected streaming query type: ${unknown.getClass.getName}")
  }

  // JSON of the first committed source offset. Per the sample output documented above
  // this test, it carries the Iceberg snapshot_id the stream has processed up to.
  def committedOffsetJson: String =
    query.committedOffsets.headOption
      .map { case (_, offset) => offset.json() }
      .getOrElse(fail("Streaming query has not committed any offset yet"))

  // Appends one row to the Iceberg table, waits for the streaming job to process it,
  // then logs and returns the committed offset JSON.
  def insertAndAwait(timestamp: String): String = {
    sql(
      s"INSERT INTO $testIcebergTable VALUES (TIMESTAMP '$timestamp', '200', 'Accept')")
    awaitStreamingComplete(job.id.toString)
    val offsetJson = committedOffsetJson
    logInfo(s"Snapshot ID: $offsetJson")
    offsetJson
  }

  // v4
  val v4Offset = insertAndAwait("2023-10-04 00:01:00")

  // v5
  val v5Offset = insertAndAwait("2023-10-05 00:01:00")

  // Each insert is expected to advance the committed offset (see the differing
  // snapshot_id values in the sample output documented above this test).
  v5Offset should not be v4Offset

  // Print the Iceberg history metadata table for manual cross-checking of snapshot IDs.
  sql(s"SELECT * FROM $testIcebergTable.history").show
}

test("data expiration") {
flint.queryIndex(testFlintIndex).count() shouldBe 3

Expand Down

0 comments on commit fc4ef8d

Please sign in to comment.