diff --git a/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewIntegrationsITSuite.scala b/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewIntegrationsITSuite.scala
index d1273dae7..c355aaa97 100644
--- a/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewIntegrationsITSuite.scala
+++ b/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewIntegrationsITSuite.scala
@@ -17,6 +17,27 @@ import org.apache.spark.sql.Row
 import org.apache.spark.sql.catalyst.util.resourceToString
 import org.apache.spark.sql.flint.FlintDataSourceV2.FLINT_DATASOURCE
 
+/**
+ * A sanity test suite for verifying the Observability Integration dashboard with Flint MV. The
+ * integration_name is used to load the corresponding SQL statement from the resource folder.
+ *
+ * Example:
+ * {{{
+ * test("create aggregated materialized view for {integration_name} integration") {
+ *   withIntegration("{integration_name}") { integration =>
+ *     integration
+ *       .createSourceTable("catalog.default.{integration_name}_test")
+ *       .createMaterializedView(
+ *         s"""
+ *            |SELECT ...
+ *            |FROM ...
+ *            |GROUP BY ...
+ *            |""".stripMargin)
+ *       .assertIndexData(...)
+ *   }
+ * }
+ * }}}
+ */
 class FlintSparkMaterializedViewIntegrationsITSuite extends FlintSparkSuite with Matchers {
 
   test("create aggregated materialized view for VPC flow integration") {
@@ -209,24 +230,44 @@ class FlintSparkMaterializedViewIntegrationsITSuite extends FlintSparkSuite with
     }
   }
 
+  /**
+   * Executes a block of code within the context of a specific integration test. This method sets
+   * up the required temporary directory and passes an `IntegrationHelper` instance to the code
+   * block to facilitate actions like creating source tables, materialized views, and asserting
+   * results.
+   *
+   * @param name
+   *   the name of the integration (e.g., "waf", "cloud_trail")
+   * @param codeBlock
+   *   the block of code to execute with the integration setup
+   */
   private def withIntegration(name: String)(codeBlock: IntegrationHelper => Unit): Unit = {
     withTempDir { checkpointDir =>
-      val tableName = s"$catalogName.default.${name}_test"
-
-      withTable(tableName) {
-        codeBlock(new IntegrationHelper(name, tableName, checkpointDir))
+      val integration = new IntegrationHelper(name, checkpointDir)
+      try {
+        codeBlock(integration)
+      } finally {
+        sql(s"DROP TABLE ${integration.tableName}")
       }
     }
   }
 
-  private class IntegrationHelper(
-      integrationName: String,
-      tableName: String,
-      checkpointDir: File) {
+  /**
+   * A helper class to facilitate actions like creating source tables, materialized views, and
+   * asserting results.
+   *
+   * @param integrationName
+   *   the name of the integration (e.g., "waf", "cloud_trail")
+   * @param checkpointDir
+   *   the directory for Spark Streaming checkpointing
+   */
+  private class IntegrationHelper(integrationName: String, checkpointDir: File) {
+    var tableName: String = _
     private var mvName: String = _
     private var mvQuery: String = _
 
     def createSourceTable(tableName: String): IntegrationHelper = {
+      this.tableName = tableName
       val sqlTemplate = resourceToString(s"aws-logs/$integrationName.sql").mkString
       val sqlStatements =
         sqlTemplate
@@ -255,6 +296,7 @@ class FlintSparkMaterializedViewIntegrationsITSuite extends FlintSparkSuite with
           |)
           |""".stripMargin)
 
+      // Wait for all data to be processed
       val job = spark.streams.active
         .find(_.name == getFlintIndexName(mvName))
         .getOrElse(fail(s"Streaming job not found for integration: $integrationName"))