forked from openedx-unsupported/configuration
-
Notifications
You must be signed in to change notification settings - Fork 0
/
s3_acl.py
193 lines (168 loc) · 6.89 KB
/
s3_acl.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
#!/usr/bin/python3
"""
Get current ACL of all objects in given S3 bucket or set them to private or revert back.
Script supports 3 operations
1- getacl
2- setaclprivate
3- revertacl
It takes 1 optional parameter:
exclude (optional): key prefix to skip; may be given multiple times to filter out several prefixes
It saves current ACL in a file named bucketname.txt for updating or reverting purposes.
python s3_acl.py --bucketname <name-of-bucket> --operation getacl --exclude <prefix_to_avoid>
You should assume the appropriate AWS role before running this script.
"""
import boto3
from botocore.exceptions import ClientError
import backoff
import sys
import json
import click
import logging
# Module-wide settings: number of attempts for each retried S3 call, and the
# AWS region passed to every S3 client this script creates.
MAX_TRIES = 5
region = "us-east-1"

# Logging: this module's logger records INFO and above.
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)

# One line per record: timestamp, logger name, level, message.
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')

# Messages are written to s3_acl.log in the current working directory.
filehandler = logging.FileHandler('s3_acl.log')
filehandler.setLevel(logging.INFO)
filehandler.setFormatter(formatter)

# Attach the file handler so the logger actually emits to the file.
logger.addHandler(filehandler)
# S3 error codes that retrying cannot fix; backing off on them only delays
# the caller, which already handles these codes explicitly.
_NON_RETRYABLE_S3_ERRORS = frozenset({'NoSuchKey', 'NoSuchBucket', 'AccessDenied'})


def _is_permanent_s3_error(error):
    """Return True when *error* carries an S3 error code that a retry cannot fix."""
    response = getattr(error, 'response', None) or {}
    return response.get('Error', {}).get('Code') in _NON_RETRYABLE_S3_ERRORS


class S3BotoWrapper:
    """Thin wrapper around a boto3 S3 client that retries failed calls.

    Every method forwards its arguments unchanged to the underlying boto3
    client and returns the client's response. A ``ClientError`` is retried
    with exponential backoff up to ``MAX_TRIES`` attempts, except for error
    codes listed in ``_NON_RETRYABLE_S3_ERRORS``, which are re-raised at once.
    """

    def __init__(self, **kwargs):
        """Create the S3 client; ``kwargs`` are passed to ``boto3.client``."""
        self.client = boto3.client("s3", **kwargs)

    @backoff.on_exception(backoff.expo,
                          ClientError,
                          max_tries=MAX_TRIES,
                          giveup=_is_permanent_s3_error)
    def get_object(self, *args, **kwargs):
        """List objects in a bucket (despite the name, calls ``list_objects_v2``)."""
        return self.client.list_objects_v2(*args, **kwargs)

    @backoff.on_exception(backoff.expo,
                          ClientError,
                          max_tries=MAX_TRIES,
                          giveup=_is_permanent_s3_error)
    def get_acl(self, *args, **kwargs):
        """Fetch the ACL of one object via ``get_object_acl``."""
        return self.client.get_object_acl(*args, **kwargs)

    @backoff.on_exception(backoff.expo,
                          ClientError,
                          max_tries=MAX_TRIES,
                          giveup=_is_permanent_s3_error)
    def put_acl(self, *args, **kwargs):
        """Set the ACL of one object via ``put_object_acl``."""
        return self.client.put_object_acl(*args, **kwargs)
def get_all_s3_keys(s3_bucket, region, exclude, s3_client=None):
    """Get a list of all keys in an S3 bucket.

    Args:
        s3_bucket: Name of the bucket to list.
        region: AWS region used when a client has to be created.
        exclude: Iterable of key prefixes; keys starting with any of them
            are left out of the result.
        s3_client: Optional object exposing ``get_object(**kwargs)`` that
            returns a ``list_objects_v2``-style response. When omitted, a
            single ``S3BotoWrapper`` is created and reused for every page.

    Returns:
        A list of object keys, excluding keys that end with ``/``
        (directory placeholders) and keys matching an excluded prefix.
    """
    if s3_client is None:
        s3_client = S3BotoWrapper(region_name=region)
    # str.startswith accepts a tuple of prefixes; an empty tuple matches nothing.
    excluded_prefixes = tuple(exclude or ())

    keys = []
    kwargs = {'Bucket': s3_bucket}
    while True:
        resp = s3_client.get_object(**kwargs)
        # 'Contents' is absent when the page (or the whole bucket) has no keys.
        for obj in resp.get('Contents', []):
            key = obj['Key']
            # Filter out directories, you can add more filters here if required.
            if key.endswith('/') or key.startswith(excluded_prefixes):
                continue
            keys.append(key)
        # The last page carries no continuation token.
        if 'NextContinuationToken' not in resp:
            break
        kwargs['ContinuationToken'] = resp['NextContinuationToken']
    return keys
def set_acl_private(acl_list, bucket_name, exclude, s3_client=None):
    """Set the ACL of every listed object to ``private``.

    Args:
        acl_list: List of single-entry dicts mapping an object key to its
            saved ACL, as produced by ``get_s3_acl``. Only the keys are used.
        bucket_name: Bucket that holds the objects.
        exclude: Iterable of key prefixes; matching keys are skipped.
        s3_client: Optional object exposing ``put_acl(**kwargs)``. When
            omitted, an ``S3BotoWrapper`` for the module-level region is used.

    A missing key is logged as a warning and skipped. Any other
    ``ClientError`` is logged and the process exits with status 1.
    """
    if s3_client is None:
        s3_client = S3BotoWrapper(region_name=region)
    # str.startswith accepts a tuple of prefixes; an empty tuple matches nothing.
    excluded_prefixes = tuple(exclude or ())

    for item in acl_list:
        for key in item:
            if key.startswith(excluded_prefixes):
                continue
            try:
                s3_client.put_acl(
                    ACL='private',
                    Bucket=bucket_name,
                    Key=key,
                )
            except ClientError as e:
                if e.response['Error']['Code'] == 'NoSuchKey':
                    logger.warning("No such key in S3: " + key)  # Will send the errors to the file
                else:
                    logger.error(f"Unexpected error :{e}")
                    sys.exit(1)
def revert_s3_acl(acl_list, bucket_name, exclude, s3_client=None):
    """Restore each listed object's ACL to its previously saved state.

    Args:
        acl_list: List of single-entry dicts mapping an object key to the
            saved ``get_object_acl`` response for that key.
        bucket_name: Bucket that holds the objects.
        exclude: Iterable of key prefixes; matching keys are skipped.
        s3_client: Optional object exposing ``put_acl(**kwargs)``. When
            omitted, an ``S3BotoWrapper`` for the module-level region is used.

    A missing key is logged as a warning and skipped. Any other
    ``ClientError`` is logged and the process exits with status 1.
    The entries of ``acl_list`` are not modified.
    """
    if s3_client is None:
        s3_client = S3BotoWrapper(region_name=region)
    # str.startswith accepts a tuple of prefixes; an empty tuple matches nothing.
    excluded_prefixes = tuple(exclude or ())

    for item in acl_list:
        for key, value in item.items():
            if key.startswith(excluded_prefixes):
                continue
            # AccessControlPolicy accepts only Owner and Grants. Copy just those
            # fields so response-only entries (ResponseMetadata, RequestCharged)
            # are not sent back, and the caller's data is left untouched.
            policy = {field: value[field] for field in ('Owner', 'Grants') if field in value}
            try:
                s3_client.put_acl(
                    AccessControlPolicy=policy,
                    Bucket=bucket_name,
                    Key=key,
                )
            except ClientError as e:
                if e.response['Error']['Code'] == 'NoSuchKey':
                    logger.warning("No such key in S3: " + key)  # Will send the errors to the file
                else:
                    logger.error(f"Unexpected error :{e}")
                    sys.exit(1)
def get_s3_acl(s3_bucket, exclude):
    """Collect the current ACL of every object in a bucket.

    Keys are listed with ``get_all_s3_keys`` (honouring ``exclude``), then the
    ACL of each key is fetched. Returns a list of single-entry dicts, each
    mapping an object key to its ``get_acl`` response.

    A failure while listing keys, or an unexpected ``ClientError`` while
    reading an ACL, is logged and ends the process with status 1. Objects that
    are missing or not accessible are logged as warnings and skipped.
    """
    acl_client = S3BotoWrapper(region_name=region)

    try:
        object_keys = get_all_s3_keys(s3_bucket, region, exclude)
    except ClientError as e:
        logger.error(f"Unable to connect to AWS with error :{e}")
        sys.exit(1)

    collected = []
    for object_key in object_keys:
        try:
            acl_response = acl_client.get_acl(Bucket=s3_bucket, Key=object_key)
        except ClientError as e:
            error_code = e.response['Error']['Code']
            if error_code == 'AccessDenied':
                logger.warning("You Don't have permission to access this object: " + object_key)
            elif error_code == 'NoSuchKey':
                logger.warning("No such key in S3: " + object_key)  # Will send the errors to the file
            else:
                logger.error(f"Unexpected error :{e}")
                sys.exit(1)
        else:
            # Only successfully read ACLs make it into the result.
            collected.append({object_key: acl_response})
    return collected
def _load_saved_acl(path):
    """Read the ACL snapshot that the ``getacl`` operation wrote to *path*.

    Logs an error and exits with status 1 when the file cannot be opened or
    does not contain valid JSON.
    """
    try:
        with open(path) as in_file:
            return json.load(in_file)
    except OSError:
        logger.error("File not accessible")
        sys.exit(1)
    except ValueError as e:  # json.JSONDecodeError is a ValueError subclass
        logger.error(f"File {path} does not contain valid JSON: {e}")
        sys.exit(1)


@click.command()
@click.option('--bucketname', required=True, help='S3 bucket name')
@click.option('--operation', required=True, help='Operation name to perform i.e 1- getacl 2- setaclprivate 3- revertacl')
@click.option('--exclude', '-i', multiple=True, help='S3 objects name to avoid')
def controller(bucketname, operation, exclude):
    """Get, set to private, or revert the ACLs of the objects in an S3 bucket."""
    # The snapshot written by getacl is the input for setaclprivate/revertacl.
    file_to_write = bucketname + ".txt"
    if operation == 'getacl':
        objects_acl = get_s3_acl(bucketname, exclude)
        with open(file_to_write, 'w') as fout:
            json.dump(objects_acl, fout)
        logger.info("Task completed. Total numbers of objects read are: " + str(len(objects_acl)))
    elif operation == 'setaclprivate':
        # Only the file read is guarded, so an OSError raised while talking
        # to S3 is not misreported as "File not accessible".
        data = _load_saved_acl(file_to_write)
        set_acl_private(data, bucketname, exclude)
        logger.info("Task completed. ACL of " + bucketname + " objects set to private.")
    elif operation == 'revertacl':
        data = _load_saved_acl(file_to_write)
        revert_s3_acl(data, bucketname, exclude)
        logger.info("Task completed. ACL of " + bucketname + " objects reverted to given state")
    else:
        logger.error("Invalid Operation. Please enter valid operation. Operation supported are i.e 1- getacl "
                     "2- setaclprivate 3- revertacl ")  # Will send the errors to the file
        # An unsupported operation is a failure: exit non-zero so callers notice.
        sys.exit(1)
# Run the command-line entry point only when this file is executed as a script.
if __name__ == "__main__":
    controller()