Skip to content

Commit

Permalink
Add IT for CloudTrail integration
Browse files Browse the repository at this point in the history
Signed-off-by: Chen Dai <[email protected]>
  • Loading branch information
dai-chen committed Dec 16, 2024
1 parent 988a098 commit 4c695a9
Show file tree
Hide file tree
Showing 2 changed files with 249 additions and 7 deletions.
181 changes: 181 additions & 0 deletions integ-test/src/integration/resources/aws-logs/cloud_trail.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,181 @@
CREATE TABLE {table_name} (
eventVersion STRING,
userIdentity STRUCT<
type:STRING,
principalId:STRING,
arn:STRING,
accountId:STRING,
invokedBy:STRING,
accessKeyId:STRING,
userName:STRING,
sessionContext:STRUCT<
attributes:STRUCT<
mfaAuthenticated:STRING,
creationDate:STRING
>,
sessionIssuer:STRUCT<
type:STRING,
principalId:STRING,
arn:STRING,
accountId:STRING,
userName:STRING
>,
ec2RoleDelivery:STRING,
webIdFederationData:MAP<STRING,STRING>
>
>,
eventTime STRING,
eventSource STRING,
eventName STRING,
awsRegion STRING,
sourceIPAddress STRING,
userAgent STRING,
errorCode STRING,
errorMessage STRING,
requestParameters STRING,
responseElements STRING,
additionalEventData STRING,
requestId STRING,
eventId STRING,
resources ARRAY<STRUCT<
arn:STRING,
accountId:STRING,
type:STRING
>>,
eventType STRING,
apiVersion STRING,
readOnly STRING,
recipientAccountId STRING,
serviceEventDetails STRING,
sharedEventId STRING,
vpcEndpointId STRING,
eventCategory STRING,
tlsDetails STRUCT<
tlsVersion:STRING,
cipherSuite:STRING,
clientProvidedHostHeader:STRING
>
)
USING json
OPTIONS (
recursivefilelookup='true',
multiline 'true'
);

INSERT INTO {table_name} VALUES
(
'1.08',
NAMED_STRUCT(
'type', 'IAMUser',
'principalId', 'AWS123456789012',
'arn', 'arn:aws:iam::123456789012:user/ExampleUser',
'accountId', '123456789012',
'invokedBy', null,
'accessKeyId', 'AKIA1234567890',
'userName', 'ExampleUser',
'sessionContext', NAMED_STRUCT(
'attributes', NAMED_STRUCT(
'mfaAuthenticated', 'true',
'creationDate', '2023-11-01T05:00:00Z'
),
'sessionIssuer', NAMED_STRUCT(
'type', 'Role',
'principalId', 'ARO123456789012',
'arn', 'arn:aws:iam::123456789012:role/MyRole',
'accountId', '123456789012',
'userName', 'MyRole'
),
'ec2RoleDelivery', 'true',
'webIdFederationData', MAP()
)
),
'2023-11-01T05:00:00Z',
'sts.amazonaws.com',
'AssumeRole',
'us-east-1',
'198.51.100.45',
'AWS CLI',
null,
null,
null,
null,
null,
'request-id-1',
'event-id-1',
ARRAY(NAMED_STRUCT(
'arn', 'arn:aws:iam::123456789012:role/MyRole',
'accountId', '123456789012',
'type', 'AWS::IAM::Role'
)),
'AwsApiCall',
'2015-03-31',
'true',
'123456789012',
null,
null,
null,
'Management',
NAMED_STRUCT(
'tlsVersion', 'TLSv1.2',
'cipherSuite', 'ECDHE-RSA-AES128-GCM-SHA256',
'clientProvidedHostHeader', null
)
),
(
'1.08',
NAMED_STRUCT(
'type', 'IAMUser',
'principalId', 'AWS123456789012',
'arn', 'arn:aws:iam::123456789012:user/ExampleUser',
'accountId', '123456789012',
'invokedBy', null,
'accessKeyId', 'AKIA1234567890',
'userName', 'ExampleUser',
'sessionContext', NAMED_STRUCT(
'attributes', NAMED_STRUCT(
'mfaAuthenticated', 'true',
'creationDate', '2023-11-01T05:06:00Z'
),
'sessionIssuer', NAMED_STRUCT(
'type', 'Role',
'principalId', 'ARO123456789012',
'arn', 'arn:aws:iam::123456789012:role/MyRole',
'accountId', '123456789012',
'userName', 'MyRole'
),
'ec2RoleDelivery', 'true',
'webIdFederationData', MAP()
)
),
'2023-11-01T05:06:00Z',
'sts.amazonaws.com',
'AssumeRole',
'us-east-1',
'198.51.100.45',
'AWS CLI',
null,
null,
null,
null,
null,
'request-id-2',
'event-id-2',
ARRAY(NAMED_STRUCT(
'arn', 'arn:aws:iam::123456789012:role/MyRole',
'accountId', '123456789012',
'type', 'AWS::IAM::Role'
)),
'AwsApiCall',
'2015-03-31',
'true',
'123456789012',
null,
null,
null,
'Management',
NAMED_STRUCT(
'tlsVersion', 'TLSv1.2',
'cipherSuite', 'ECDHE-RSA-AES128-GCM-SHA256',
'clientProvidedHostHeader', null
)
);
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ class FlintSparkMaterializedViewIntegrationsITSuite extends FlintSparkSuite with
test("create materialized view for VPC flow integration") {
withIntegration("vpc_flow") { integration =>
integration
.createSourceTable(s"$catalogName.default.vpc_low_logs")
.createSourceTable(s"$catalogName.default.vpc_low_test")
.createMaterializedView(s"""
|SELECT
| TUMBLE(`@timestamp`, '5 Minute').start AS `start_time`,
Expand All @@ -41,7 +41,7 @@ class FlintSparkMaterializedViewIntegrationsITSuite extends FlintSparkSuite with
| protocol,
| CAST(FROM_UNIXTIME(start) AS TIMESTAMP) AS `@timestamp`
| FROM
| $catalogName.default.vpc_low_logs
| $catalogName.default.vpc_low_test
|)
|GROUP BY
| TUMBLE(`@timestamp`, '5 Minute'),
Expand Down Expand Up @@ -81,15 +81,75 @@ class FlintSparkMaterializedViewIntegrationsITSuite extends FlintSparkSuite with
}
}

test("create materialized view for CloudTrail integration") {
withIntegration("cloud_trail") { integration =>
integration
.createSourceTable(s"$catalogName.default.cloud_trail_test")
.createMaterializedView(s"""
|SELECT
| TUMBLE(`@timestamp`, '5 Minute').start AS `start_time`,
| `userIdentity.type` AS `aws.cloudtrail.userIdentity.type`,
| `userIdentity.accountId` AS `aws.cloudtrail.userIdentity.accountId`,
| `userIdentity.sessionContext.sessionIssuer.userName` AS `aws.cloudtrail.userIdentity.sessionContext.sessionIssuer.userName`,
| `userIdentity.sessionContext.sessionIssuer.arn` AS `aws.cloudtrail.userIdentity.sessionContext.sessionIssuer.arn`,
| `userIdentity.sessionContext.sessionIssuer.type` AS `aws.cloudtrail.userIdentity.sessionContext.sessionIssuer.type`,
| awsRegion AS `aws.cloudtrail.awsRegion`,
| sourceIPAddress AS `aws.cloudtrail.sourceIPAddress`,
| eventSource AS `aws.cloudtrail.eventSource`,
| eventName AS `aws.cloudtrail.eventName`,
| eventCategory AS `aws.cloudtrail.eventCategory`,
| COUNT(*) AS `aws.cloudtrail.event_count`
|FROM (
| SELECT
| CAST(eventTime AS TIMESTAMP) AS `@timestamp`,
| userIdentity.`type` AS `userIdentity.type`,
| userIdentity.`accountId` AS `userIdentity.accountId`,
| userIdentity.sessionContext.sessionIssuer.userName AS `userIdentity.sessionContext.sessionIssuer.userName`,
| userIdentity.sessionContext.sessionIssuer.arn AS `userIdentity.sessionContext.sessionIssuer.arn`,
| userIdentity.sessionContext.sessionIssuer.type AS `userIdentity.sessionContext.sessionIssuer.type`,
| awsRegion,
| sourceIPAddress,
| eventSource,
| eventName,
| eventCategory
| FROM
| $catalogName.default.cloud_trail_test
|)
|GROUP BY
| TUMBLE(`@timestamp`, '5 Minute'),
| `userIdentity.type`,
| `userIdentity.accountId`,
| `userIdentity.sessionContext.sessionIssuer.userName`,
| `userIdentity.sessionContext.sessionIssuer.arn`,
| `userIdentity.sessionContext.sessionIssuer.type`,
| awsRegion,
| sourceIPAddress,
| eventSource,
| eventName,
| eventCategory
|""".stripMargin)
.assertIndexData(Row(
Timestamp.valueOf("2023-10-31 22:00:00"),
"IAMUser",
"123456789012",
"MyRole",
"arn:aws:iam::123456789012:role/MyRole",
"Role",
"us-east-1",
"198.51.100.45",
"sts.amazonaws.com",
"AssumeRole",
"Management",
1))
}
}

private def withIntegration(name: String)(codeBlock: IntegrationHelper => Unit): Unit = {
withTempDir { checkpointDir =>
val tableName = s"$catalogName.default.${name}_test"

withTable(tableName) {
val integration = new IntegrationHelper(name, tableName, checkpointDir)
// integration.createSourceTable()

codeBlock(integration)
codeBlock(new IntegrationHelper(name, tableName, checkpointDir))
}
}
}
Expand Down Expand Up @@ -140,7 +200,8 @@ class FlintSparkMaterializedViewIntegrationsITSuite extends FlintSparkSuite with
}

def assertIndexData(expectedRows: Row*): Unit = {
val flintIndexName = spark.streams.active.find(_.name == getFlintIndexName(mvName)).get.name
val flintIndexName =
spark.streams.active.find(_.name == getFlintIndexName(mvName)).get.name
val actualRows = spark.read
.format(FLINT_DATASOURCE)
.options(openSearchOptions)
Expand Down

0 comments on commit 4c695a9

Please sign in to comment.