forked from iobush/aws-s3-bruteforce
-
Notifications
You must be signed in to change notification settings - Fork 0
/
check_bucket.py
executable file
·130 lines (114 loc) · 5.1 KB
/
check_bucket.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
#!/usr/bin/env python
import random, requests, time, re
from constants import base_url
from logger import *
# S3 error codes that check_s3_bucket treats as "this bucket does not exist"
# when they appear inside a <Code> element of the response body.
no_bucket_responses = ["NoSuchBucket", "InvalidBucketName"]
# S3 error codes that check_s3_bucket treats as "the bucket exists but is not
# publicly accessible" when they appear inside a <Code> element of the
# response body.
denied_responses = ["AccessDenied", "AllAccessDisabled"]
#S3 Connector
from boto.s3.connection import S3Connection
# Human-readable description for each ACL permission name; check_acl looks up
# grant.permission here when it builds an issue record.
explained = {
    'READ':         'readable',
    'WRITE':        'writable',
    'READ_ACP':     'permissions readable',
    'WRITE_ACP':    'permissions writeable',
    'FULL_CONTROL': 'Full Control',
}
# Grantee group URIs that check_acl reports on, mapped to the display name
# stored in the "grantee" field of each issue record. Grants to any other
# grantee are ignored by check_acl.
groups_to_check = {
    'http://acs.amazonaws.com/groups/global/AllUsers':
        'Everyone',
    'http://acs.amazonaws.com/groups/global/AuthenticatedUsers':
        'Authenticated AWS users',
}
def check_s3_bucket(bucket_name, access_key, secret_key, output_file, redirect=False):
    """Probe one S3 bucket name and log what was learned about it.

    The bucket URL is fetched with get_bucket() and the response body is
    classified by the S3 error codes it contains:

    * a code from ``no_bucket_responses``: the bucket does not exist and
      nothing is logged;
    * a code from ``denied_responses``: the bucket exists but is not public.
      For ``AccessDenied``, and only when both keys are supplied, the bucket
      is checked again with those credentials through boto;
    * neither: the bucket is recorded as existing and public, and as
      non-empty when the body contains a ``<Key>`` element.

    Args:
        bucket_name: Name of the bucket to test.
        access_key: AWS access key for the authenticated re-check; a falsy
            value skips that check.
        secret_key: AWS secret key for the authenticated re-check; a falsy
            value skips that check.
        output_file: Passed through unchanged to log_bucket_found().
        redirect: True when this call is following an S3 redirect. The bucket
            is then requested at ``https://<bucket_name>.s3.amazonaws.com``
            instead of under ``base_url``, and no further redirect is
            followed.

    Returns:
        None. Results are reported through log_bucket_found().
    """
    bucket_result = {
        "name": bucket_name,
        "url": "{base_url}{bucket_name}".format(base_url=base_url, bucket_name=bucket_name),
        "exists": False,
        "public": None,
        "authenticated_access": False,
        "empty": None,
        "error": False,
        "redirected": False,
    }

    if redirect:
        bucket_result["redirected"] = True
        bucket_result["url"] = "https://{bucket_name}.s3.amazonaws.com".format(bucket_name=bucket_name)

    request = get_bucket(url=bucket_result["url"])
    if request is None:
        # get_bucket() returns None when the HTTP request itself failed, so
        # nothing is known about this bucket and there is nothing to log.
        return

    response_text = request.text

    # Follow a redirect at most once: a call that is already the result of a
    # redirect never recurses again, which keeps the recursion bounded.
    if not redirect and ("<Endpoint>" in response_text or "PermanentRedirect" in response_text):
        endpoint = re.search("<Endpoint>(.+?)</Endpoint>", response_text)
        # Without an <Endpoint> element there is nowhere to go (the marker
        # text may simply occur inside a listing), so fall through and
        # classify the body that was actually received.
        if endpoint is not None:
            return check_s3_bucket(
                bucket_name=endpoint.group(1).replace(".s3.amazonaws.com", ""),
                access_key=access_key,
                secret_key=secret_key,
                output_file=output_file,
                redirect=True,
            )

    #Check to see if the bucket does not exist
    for no_bucket_response in no_bucket_responses:
        if "<Code>{message}</Code>".format(message=no_bucket_response) in response_text:
            bucket_result["error"] = no_bucket_response
            #Non-existent buckets are deliberately not logged
            return

    for denied_response in denied_responses:
        if "<Code>{message}</Code>".format(message=denied_response) in response_text:
            bucket_result["exists"] = True
            bucket_result["public"] = False
            bucket_result["error"] = denied_response
            if denied_response == "AccessDenied" and access_key and secret_key:
                _check_authenticated_access(bucket_name, access_key, secret_key, bucket_result)
            #Denied response seen so break from the check
            break
    else:
        #No denied code was found: the bucket exists and is public, just seeing if it is empty
        bucket_result["exists"] = True
        bucket_result["public"] = True
        bucket_result["empty"] = "<Key>" not in response_text

    #Log the final result for the bucket
    log_bucket_found(bucket_result=bucket_result, output_file=output_file)


def _check_authenticated_access(bucket_name, access_key, secret_key, bucket_result):
    """Re-check a denied bucket with AWS credentials, updating bucket_result in place.

    When check_acl() reports at least one issue, ``authenticated_access`` is
    set to True, a banner is printed, and ``empty`` is set to False if the
    bucket lists at least one key. Any failure along the way leaves
    bucket_result unchanged from that point on.
    """
    try:
        conn = S3Connection(access_key, secret_key)
        bucket = conn.get_bucket(bucket_name)
        if not check_acl(bucket):
            return
        bucket_result["authenticated_access"] = True
        # Single-argument print(...) behaves the same on Python 2 and 3.
        print(
            "\n"
            "************************************************************************************\n"
            "AUTHENTICATED ACCESS - %s\n"
            "************************************************************************************\n"
            % (bucket_result["url"])
        )
        # One key is enough to know the bucket is not empty; stop there
        # instead of paging through every object in a large bucket.
        for _key in bucket.list():
            bucket_result["empty"] = False
            break
    except Exception:
        # Best effort: the supplied credentials may simply lack access to
        # this bucket or its ACL. The anonymous result is still logged.
        pass
def get_bucket(url):
    """Fetch url with an HTTP GET and return the response, or None on failure.

    Args:
        url: Full URL of the bucket to request.

    Returns:
        The requests response object, or None if the request raised.
    """
    try:
        # verify=False turns off TLS certificate verification.
        # NOTE(review): presumably needed because bucket names containing
        # dots do not match the S3 wildcard certificate -- confirm.
        return requests.get(url, verify=False)
    except Exception:
        # Best effort: any request failure is reported as None. Unlike a bare
        # "except:", this lets KeyboardInterrupt and SystemExit propagate so
        # a long scan can still be stopped.
        return None
def check_acl(bucket):
    """Return the bucket's ACL grants made to the groups in groups_to_check.

    Args:
        bucket: Object exposing get_acl(); the returned policy's ``acl.grants``
            entries are read for ``type``, ``uri`` and ``permission``.

    Returns:
        A list with one dict per matching grant, holding the raw
        ``permission``, its description from ``explained``, and the
        ``grantee`` display name from ``groups_to_check``. The list is empty
        when no grant targets one of those groups.
    """
    policy = bucket.get_acl()
    return [
        {
            "permission": entry.permission,
            "explained": explained[entry.permission],
            "grantee": groups_to_check[entry.uri],
        }
        for entry in policy.acl.grants
        # Only grants to a group listed in groups_to_check are reported.
        if entry.type == 'Group' and entry.uri in groups_to_check
    ]