diff --git a/integ-test/src/integration/resources/aws-logs/cloud_trail.sql b/integ-test/src/integration/resources/aws-logs/cloud_trail.sql index 41a11e8e0..d3c509196 100644 --- a/integ-test/src/integration/resources/aws-logs/cloud_trail.sql +++ b/integ-test/src/integration/resources/aws-logs/cloud_trail.sql @@ -63,119 +63,119 @@ OPTIONS ( ); INSERT INTO {table_name} VALUES - ( - '1.08', - NAMED_STRUCT( - 'type', 'IAMUser', - 'principalId', 'AWS123456789012', - 'arn', 'arn:aws:iam::123456789012:user/ExampleUser', - 'accountId', '123456789012', - 'invokedBy', null, - 'accessKeyId', 'AKIA1234567890', - 'userName', 'ExampleUser', - 'sessionContext', NAMED_STRUCT( - 'attributes', NAMED_STRUCT( - 'mfaAuthenticated', 'true', - 'creationDate', '2023-11-01T05:00:00Z' - ), - 'sessionIssuer', NAMED_STRUCT( - 'type', 'Role', - 'principalId', 'ARO123456789012', - 'arn', 'arn:aws:iam::123456789012:role/MyRole', - 'accountId', '123456789012', - 'userName', 'MyRole' - ), - 'ec2RoleDelivery', 'true', - 'webIdFederationData', MAP() - ) - ), - '2023-11-01T05:00:00Z', - 'sts.amazonaws.com', - 'AssumeRole', - 'us-east-1', - '198.51.100.45', - 'AWS CLI', - null, - null, - null, - null, - null, - 'request-id-1', - 'event-id-1', - ARRAY(NAMED_STRUCT( - 'arn', 'arn:aws:iam::123456789012:role/MyRole', - 'accountId', '123456789012', - 'type', 'AWS::IAM::Role' - )), - 'AwsApiCall', - '2015-03-31', - 'true', - '123456789012', - null, - null, - null, - 'Management', - NAMED_STRUCT( - 'tlsVersion', 'TLSv1.2', - 'cipherSuite', 'ECDHE-RSA-AES128-GCM-SHA256', - 'clientProvidedHostHeader', null +( + '1.08', + NAMED_STRUCT( + 'type', 'IAMUser', + 'principalId', 'AWS123456789012', + 'arn', 'arn:aws:iam::123456789012:user/ExampleUser', + 'accountId', '123456789012', + 'invokedBy', null, + 'accessKeyId', 'AKIA1234567890', + 'userName', 'ExampleUser', + 'sessionContext', NAMED_STRUCT( + 'attributes', NAMED_STRUCT( + 'mfaAuthenticated', 'true', + 'creationDate', '2023-11-01T05:00:00Z' + ), + 
'sessionIssuer', NAMED_STRUCT( + 'type', 'Role', + 'principalId', 'ARO123456789012', + 'arn', 'arn:aws:iam::123456789012:role/MyRole', + 'accountId', '123456789012', + 'userName', 'MyRole' + ), + 'ec2RoleDelivery', 'true', + 'webIdFederationData', MAP() ) ), - ( - '1.08', - NAMED_STRUCT( - 'type', 'IAMUser', - 'principalId', 'AWS123456789012', - 'arn', 'arn:aws:iam::123456789012:user/ExampleUser', - 'accountId', '123456789012', - 'invokedBy', null, - 'accessKeyId', 'AKIA1234567890', - 'userName', 'ExampleUser', - 'sessionContext', NAMED_STRUCT( - 'attributes', NAMED_STRUCT( - 'mfaAuthenticated', 'true', - 'creationDate', '2023-11-01T05:06:00Z' - ), - 'sessionIssuer', NAMED_STRUCT( - 'type', 'Role', - 'principalId', 'ARO123456789012', - 'arn', 'arn:aws:iam::123456789012:role/MyRole', - 'accountId', '123456789012', - 'userName', 'MyRole' - ), - 'ec2RoleDelivery', 'true', - 'webIdFederationData', MAP() - ) - ), - '2023-11-01T05:06:00Z', - 'sts.amazonaws.com', - 'AssumeRole', - 'us-east-1', - '198.51.100.45', - 'AWS CLI', - null, - null, - null, - null, - null, - 'request-id-2', - 'event-id-2', - ARRAY(NAMED_STRUCT( - 'arn', 'arn:aws:iam::123456789012:role/MyRole', - 'accountId', '123456789012', - 'type', 'AWS::IAM::Role' - )), - 'AwsApiCall', - '2015-03-31', - 'true', - '123456789012', - null, - null, - null, - 'Management', - NAMED_STRUCT( - 'tlsVersion', 'TLSv1.2', - 'cipherSuite', 'ECDHE-RSA-AES128-GCM-SHA256', - 'clientProvidedHostHeader', null + '2023-11-01T05:00:00Z', + 'sts.amazonaws.com', + 'AssumeRole', + 'us-east-1', + '198.51.100.45', + 'AWS CLI', + null, + null, + null, + null, + null, + 'request-id-1', + 'event-id-1', + ARRAY(NAMED_STRUCT( + 'arn', 'arn:aws:iam::123456789012:role/MyRole', + 'accountId', '123456789012', + 'type', 'AWS::IAM::Role' + )), + 'AwsApiCall', + '2015-03-31', + 'true', + '123456789012', + null, + null, + null, + 'Management', + NAMED_STRUCT( + 'tlsVersion', 'TLSv1.2', + 'cipherSuite', 'ECDHE-RSA-AES128-GCM-SHA256', + 
'clientProvidedHostHeader', null + ) +), +( + '1.08', + NAMED_STRUCT( + 'type', 'IAMUser', + 'principalId', 'AWS123456789012', + 'arn', 'arn:aws:iam::123456789012:user/ExampleUser', + 'accountId', '123456789012', + 'invokedBy', null, + 'accessKeyId', 'AKIA1234567890', + 'userName', 'ExampleUser', + 'sessionContext', NAMED_STRUCT( + 'attributes', NAMED_STRUCT( + 'mfaAuthenticated', 'true', + 'creationDate', '2023-11-01T05:06:00Z' + ), + 'sessionIssuer', NAMED_STRUCT( + 'type', 'Role', + 'principalId', 'ARO123456789012', + 'arn', 'arn:aws:iam::123456789012:role/MyRole', + 'accountId', '123456789012', + 'userName', 'MyRole' + ), + 'ec2RoleDelivery', 'true', + 'webIdFederationData', MAP() ) - ); + ), + '2023-11-01T05:06:00Z', + 'sts.amazonaws.com', + 'AssumeRole', + 'us-east-1', + '198.51.100.45', + 'AWS CLI', + null, + null, + null, + null, + null, + 'request-id-2', + 'event-id-2', + ARRAY(NAMED_STRUCT( + 'arn', 'arn:aws:iam::123456789012:role/MyRole', + 'accountId', '123456789012', + 'type', 'AWS::IAM::Role' + )), + 'AwsApiCall', + '2015-03-31', + 'true', + '123456789012', + null, + null, + null, + 'Management', + NAMED_STRUCT( + 'tlsVersion', 'TLSv1.2', + 'cipherSuite', 'ECDHE-RSA-AES128-GCM-SHA256', + 'clientProvidedHostHeader', null + ) +); diff --git a/integ-test/src/integration/resources/aws-logs/waf.sql b/integ-test/src/integration/resources/aws-logs/waf.sql new file mode 100644 index 000000000..10da3d87c --- /dev/null +++ b/integ-test/src/integration/resources/aws-logs/waf.sql @@ -0,0 +1,91 @@ +CREATE TABLE {table_name} ( + timestamp STRING, + webaclId STRING, + action STRING, + formatVersion INT, + httpRequest STRUCT< + clientIp: STRING, + country: STRING, + headers: ARRAY<STRUCT<name: STRING, value: STRING>>, + uri: STRING, + args: STRING, + httpVersion: STRING, + httpMethod: STRING, + requestId: STRING + >, + httpSourceId STRING, + httpSourceName STRING, + requestBodySize INT, + requestBodySizeInspectedByWAF INT, + terminatingRuleId STRING, + terminatingRuleType STRING, + ruleGroupList
ARRAY<STRUCT<ruleId: STRING, ruleAction: STRING>>, + rateBasedRuleList ARRAY<STRUCT<ruleId: STRING>>, + nonTerminatingMatchingRules ARRAY<STRUCT<ruleId: STRING>> +) +USING json +OPTIONS ( + recursivefilelookup = 'true' +); + +INSERT INTO {table_name} VALUES +( + 1698814800000, -- 2023-11-01T05:00:00Z + 'webacl-12345', + 'ALLOW', + 1, + NAMED_STRUCT( + 'clientIp', '192.0.2.1', + 'country', 'US', + 'headers', ARRAY(NAMED_STRUCT('name', 'User-Agent', 'value', 'Mozilla/5.0')), + 'uri', '/index.html', + 'args', 'query=example', + 'httpVersion', 'HTTP/1.1', + 'httpMethod', 'GET', + 'requestId', 'req-1' + ), + 'source-1', + 'http-source', + 500, + 450, + 'rule-1', + 'REGULAR', + ARRAY(NAMED_STRUCT('ruleId', 'group-rule-1', 'ruleAction', 'ALLOW')), + ARRAY(), + ARRAY() +), +( + 1698815400000, -- 2023-11-01T05:10:00Z + 'webacl-67890', + 'BLOCK', + 1, + NAMED_STRUCT( + 'clientIp', '192.0.2.2', + 'country', 'CA', + 'headers', ARRAY(NAMED_STRUCT('name', 'Referer', 'value', 'example.com')), + 'uri', '/login.html', + 'args', '', + 'httpVersion', 'HTTP/2', + 'httpMethod', 'POST', + 'requestId', 'req-2' + ), + 'source-2', + 'http-source', + 750, + 600, + 'rule-2', + 'RATE_BASED', + ARRAY(NAMED_STRUCT('ruleId', 'group-rule-2', 'ruleAction', 'BLOCK')), + ARRAY(), + ARRAY() +); diff --git a/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewIntegrationsITSuite.scala b/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewIntegrationsITSuite.scala index 1c9e3a867..d1273dae7 100644 --- a/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewIntegrationsITSuite.scala +++ b/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewIntegrationsITSuite.scala @@ -19,7 +19,7 @@ import org.apache.spark.sql.flint.FlintDataSourceV2.FLINT_DATASOURCE class FlintSparkMaterializedViewIntegrationsITSuite extends FlintSparkSuite with Matchers { - test("create materialized view for VPC flow integration") { + test("create aggregated materialized view for VPC flow
integration") { withIntegration("vpc_flow") { integration => integration .createSourceTable(s"$catalogName.default.vpc_low_test") @@ -83,7 +83,7 @@ class FlintSparkMaterializedViewIntegrationsITSuite extends FlintSparkSuite with } } - test("create materialized view for CloudTrail integration") { + test("create aggregated materialized view for CloudTrail integration") { withIntegration("cloud_trail") { integration => integration .createSourceTable(s"$catalogName.default.cloud_trail_test") @@ -146,6 +146,69 @@ class FlintSparkMaterializedViewIntegrationsITSuite extends FlintSparkSuite with } } + test("create aggregated materialized view for WAF integration") { + withIntegration("waf") { integration => + integration + .createSourceTable(s"$catalogName.default.waf_test") + .createMaterializedView(s""" + |SELECT + | TUMBLE(`@timestamp`, '5 Minute').start AS `start_time`, + | webaclId AS `aws.waf.webaclId`, + | action AS `aws.waf.action`, + | `httpRequest.clientIp` AS `aws.waf.httpRequest.clientIp`, + | `httpRequest.country` AS `aws.waf.httpRequest.country`, + | `httpRequest.uri` AS `aws.waf.httpRequest.uri`, + | `httpRequest.httpMethod` AS `aws.waf.httpRequest.httpMethod`, + | httpSourceId AS `aws.waf.httpSourceId`, + | terminatingRuleId AS `aws.waf.terminatingRuleId`, + | terminatingRuleType AS `aws.waf.RuleType`, + | `ruleGroupList.ruleId` AS `aws.waf.ruleGroupList.ruleId`, + | COUNT(*) AS `aws.waf.event_count` + |FROM ( + | SELECT + | CAST(FROM_UNIXTIME(`timestamp`/1000) AS TIMESTAMP) AS `@timestamp`, + | webaclId, + | action, + | httpRequest.clientIp AS `httpRequest.clientIp`, + | httpRequest.country AS `httpRequest.country`, + | httpRequest.uri AS `httpRequest.uri`, + | httpRequest.httpMethod AS `httpRequest.httpMethod`, + | httpSourceId, + | terminatingRuleId, + | terminatingRuleType, + | ruleGroupList.ruleId AS `ruleGroupList.ruleId` + | FROM + | $catalogName.default.waf_test + |) + |GROUP BY + | TUMBLE(`@timestamp`, '5 Minute'), + | webaclId, + | action, + | 
`httpRequest.clientIp`, + | `httpRequest.country`, + | `httpRequest.uri`, + | `httpRequest.httpMethod`, + | httpSourceId, + | terminatingRuleId, + | terminatingRuleType, + | `ruleGroupList.ruleId` + |""".stripMargin) + .assertIndexData(Row( + timestampFromUTC("2023-11-01T05:00:00Z"), + "webacl-12345", + "ALLOW", + "192.0.2.1", + "US", + "/index.html", + "GET", + "source-1", + "rule-1", + "REGULAR", + Array("group-rule-1"), + 1)) + } + } + private def withIntegration(name: String)(codeBlock: IntegrationHelper => Unit): Unit = { withTempDir { checkpointDir => val tableName = s"$catalogName.default.${name}_test"