Add more comments
Signed-off-by: Chen Dai <[email protected]>
dai-chen committed Dec 20, 2024
1 parent e65870c commit 9a0d213
Showing 1 changed file with 50 additions and 8 deletions.
@@ -17,6 +17,27 @@ import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.util.resourceToString
import org.apache.spark.sql.flint.FlintDataSourceV2.FLINT_DATASOURCE

/**
* A sanity test suite for verifying the Observability integration dashboards with Flint MVs. The
* integration_name is used to load the corresponding SQL statements from the resource folder.
*
* Example:
* {{{
* test("create aggregated materialized view for {integration_name} integration") {
* withIntegration("{integration_name}") { integration =>
* integration
* .createSourceTable("catalog.default.{integration_name}_test")
* .createMaterializedView(
* s"""
* |SELECT ...
* |FROM ...
* |GROUP BY ...
* |""".stripMargin)
* .assertIndexData(...)
* }
* }
* }}}
*/
class FlintSparkMaterializedViewIntegrationsITSuite extends FlintSparkSuite with Matchers {

test("create aggregated materialized view for VPC flow integration") {
@@ -209,24 +230,44 @@ class FlintSparkMaterializedViewIntegrationsITSuite extends FlintSparkSuite with
}
}

/**
* Executes a block of code within the context of a specific integration test. This method sets
* up the required checkpoint directory, passes an `IntegrationHelper` instance to the code
* block to facilitate actions like creating source tables, materialized views, and asserting
* results, and drops the source table created by the block afterwards.
*
* @param name
* the name of the integration (e.g., "waf", "cloud_trail")
* @param codeBlock
* the block of code to execute with the integration setup
*/
private def withIntegration(name: String)(codeBlock: IntegrationHelper => Unit): Unit = {
withTempDir { checkpointDir =>
- val tableName = s"$catalogName.default.${name}_test"

- withTable(tableName) {
- codeBlock(new IntegrationHelper(name, tableName, checkpointDir))
- }
val integration = new IntegrationHelper(name, checkpointDir)
try {
codeBlock(integration)
} finally {
sql(s"DROP TABLE ${integration.tableName}")
}
}
}
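
// Illustrative usage of the fixture above (a hypothetical "waf" example mirroring the
// class-level Scaladoc); the finally block drops the source table even if an assertion
// inside the code block fails:
//
// withIntegration("waf") { integration =>
// integration
// .createSourceTable("catalog.default.waf_test")
// .createMaterializedView("SELECT ... FROM ... GROUP BY ...")
// .assertIndexData(...)
// }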

- private class IntegrationHelper(
- integrationName: String,
- tableName: String,
- checkpointDir: File) {
/**
* A helper class to facilitate actions like creating source tables, materialized views, and
* asserting results.
*
* @param integrationName
* the name of the integration (e.g., "waf", "cloud_trail")
* @param checkpointDir
* the directory for Spark Streaming checkpointing
*/
private class IntegrationHelper(integrationName: String, checkpointDir: File) {
// Set by createSourceTable() and read by withIntegration() to drop the source table afterwards
var tableName: String = _
private var mvName: String = _
private var mvQuery: String = _

def createSourceTable(tableName: String): IntegrationHelper = {
this.tableName = tableName
val sqlTemplate = resourceToString(s"aws-logs/$integrationName.sql").mkString
val sqlStatements =
sqlTemplate
@@ -255,6 +296,7 @@ class FlintSparkMaterializedViewIntegrationsITSuite extends FlintSparkSuite with
|)
|""".stripMargin)

// Wait for all data to be processed
val job = spark.streams.active
.find(_.name == getFlintIndexName(mvName))
.getOrElse(fail(s"Streaming job not found for integration: $integrationName"))
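// A sketch of the waiting step (assuming the standard Spark StreamingQuery API, not
// necessarily the exact code in this file): job.processAllAvailable() blocks until all
// currently available source data has been processed, after which index data can be asserted.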
