Skip to content

Commit

Permalink
aws - ses - identity has-statement filter (cloud-custodian#8640)
Browse files Browse the repository at this point in the history
  • Loading branch information
ahlgreno authored Jun 20, 2023
1 parent 32d8cc6 commit 145c11f
Show file tree
Hide file tree
Showing 4 changed files with 168 additions and 1 deletion.
56 changes: 55 additions & 1 deletion c7n/resources/ses.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
# Copyright The Cloud Custodian Authors.
# SPDX-License-Identifier: Apache-2.0
import json

from c7n.actions import BaseAction
import c7n.filters.policystatement as polstmt_filter
from c7n.manager import resources
from c7n.query import DescribeSource, QueryResourceManager, TypeInfo
from c7n.utils import local_session, type_schema
from c7n.utils import local_session, type_schema, format_string_values
from c7n.tags import universal_augment


Expand Down Expand Up @@ -88,3 +91,54 @@ class resource_type(TypeInfo):
permission_prefix = 'ses'
arn_service = 'ses'
cfn_type = 'AWS::SES::EmailIdentity'


@SESEmailIdentity.filter_registry.register('has-statement')
class HasStatementFilter(polstmt_filter.HasStatementFilter):

def __init__(self, data, manager=None):
super().__init__(data, manager)
self.policy_attribute = 'Policies'

def get_std_format_args(self, email_identity):
return {
'account_id': self.manager.config.account_id,
'region': self.manager.config.region,
'email_identity_name': email_identity['IdentityName'],
}

def process_resource(self, email_identity):
policies = email_identity.get(self.policy_attribute)
if not policies:
return None

for policy in policies.values():
p = json.loads(policy)

required = list(self.data.get('statement_ids', []))
statements = p.get('Statement', [])
for s in list(statements):
if s.get('Sid') in required:
required.remove(s['Sid'])

required_statements = format_string_values(list(self.data.get('statements', [])),
**self.get_std_format_args(email_identity))

for required_statement in required_statements:
for statement in statements:
found = 0
for key, value in required_statement.items():
if key in ['Action', 'NotAction']:
if key in statement and self.action_resource_case_insensitive(value) \
== self.action_resource_case_insensitive(statement[key]):
found += 1
else:
if key in statement and value == statement[key]:
found += 1
if found and found == len(required_statement):
required_statements.remove(required_statement)
break

if (self.data.get('statement_ids', []) and not required) or \
(self.data.get('statements', []) and not required_statements):
return email_identity
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
{
"status_code": 200,
"data": {
"ResponseMetadata": {},
"IdentityType": "EMAIL_ADDRESS",
"FeedbackForwardingStatus": true,
"VerifiedForSendingStatus": false,
"DkimAttributes": {
"SigningEnabled": false,
"Status": "NOT_STARTED",
"SigningAttributesOrigin": "AWS_SES",
"NextSigningKeyLength": "RSA_1024_BIT"
},
"MailFromAttributes": {
"BehaviorOnMxFailure": "USE_DEFAULT_VALUE"
},
"Policies": {
"Policy2": "{\"Version\":\"2012-10-17\",\"Statement\":[{\"Sid\":\"AllowStatement\",\"Effect\":\"Allow\",\"Principal\":\"*\",\"Action\":\"ses:GetEmailIdentity\",\"Resource\":\"arn:aws:ses:us-west-2:644160558196:identity/[email protected]\",\"Condition\":{}}]}",
"PolicyTestHasStatement": "{\"Version\":\"2012-10-17\",\"Statement\":[{\"Sid\":\"stmt1686693628910\",\"Effect\":\"Deny\",\"Principal\":{\"AWS\":\"*\"},\"Action\":\"ses:SendEmail\",\"Resource\":\"arn:aws:ses:us-west-2:644160558196:identity/[email protected]\",\"Condition\":{\"StringNotEquals\":{\"ses:FromAddress\":\"*test*\"}}},{\"Sid\":\"DenyStatement\",\"Effect\":\"Deny\",\"Principal\":\"*\",\"Action\":\"ses:SendEmail\",\"Resource\":\"arn:aws:ses:us-west-2:644160558196:identity/[email protected]\",\"Condition\":{\"StringNotLike\":{\"ses:FromAddress\":\"*test*\"}}}]}"
},
"Tags": [],
"VerificationStatus": "PENDING"
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
{
"status_code": 200,
"data": {
"ResponseMetadata": {},
"EmailIdentities": [
{
"IdentityType": "EMAIL_ADDRESS",
"IdentityName": "[email protected]",
"SendingEnabled": false,
"VerificationStatus": "PENDING"
}
]
}
}
75 changes: 75 additions & 0 deletions tests/test_ses.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,3 +56,78 @@ def test_ses_email_identity_query(self):
)
resources = p.run()
self.assertEqual(len(resources), 1)

def test_ses_email_identity_has_statement_definition(self):
session_factory = self.replay_flight_data("test_ses_email_identity_has_statement")
p = self.load_policy(
{
"name": "test_ses_email_identity_has_statement_definition",
"resource": "ses-email-identity",
"filters": [
{
"type": "has-statement",
"statements": [
{
"Effect": "Deny",
"Action": "ses:SendEmail",
"Principal": {"AWS": "*"},
"Condition":
{"StringNotEquals": {"ses:FromAddress": "*test*"}},
"Resource": "arn:aws:ses:us-west-2:644160558196:identity/[email protected]"
}
]
}
],
}, session_factory=session_factory,
config={'region': 'us-west-2'},
)
resources = p.run()
self.assertEqual(1, len(resources))
self.assertEqual(resources[0]["IdentityName"], "[email protected]")

def test_ses_email_identity_has_statement_star_definition(self):
session_factory = self.replay_flight_data("test_ses_email_identity_has_statement")
p = self.load_policy(
{
"name": "test_ses_email_identity_has_statement_star_definition",
"resource": "ses-email-identity",
"filters": [
{
"type": "has-statement",
"statements": [
{
"Effect": "Deny",
"Action": "ses:SendEmail",
"Principal": "*",
"Condition":
{"StringNotLike": {"ses:FromAddress": "*test*"}},
"Resource": "arn:aws:ses:us-west-2:644160558196:identity/[email protected]"
}
]
}
],
}, session_factory=session_factory,
config={'region': 'us-west-2'},
)
resources = p.run()
self.assertEqual(1, len(resources))
self.assertEqual(resources[0]["IdentityName"], "[email protected]")

def test_ses_email_identity_has_statement_id(self):
session_factory = self.replay_flight_data("test_ses_email_identity_has_statement")
p = self.load_policy(
{
"name": "test_ses_email_identity_has_statement_id",
"resource": "ses-email-identity",
"filters": [
{
"type": "has-statement",
"statement_ids": ["AllowStatement"]
}
],
}, session_factory=session_factory,
config={'region': 'us-west-2'},
)
resources = p.run()
self.assertEqual(1, len(resources))
self.assertEqual(resources[0]["IdentityName"], "[email protected]")

0 comments on commit 145c11f

Please sign in to comment.