-
Notifications
You must be signed in to change notification settings - Fork 0
/
boot.py
executable file
·144 lines (112 loc) · 4.55 KB
/
boot.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
#!/usr/bin/env python
from datetime import datetime, timedelta
from urllib.parse import urlunparse
from jinja2 import Template
from cachetools.func import lru_cache
import logging
import release
import os.path
from cryptography.hazmat.primitives import serialization
from app import app
from data.model import ServiceKeyDoesNotExist
from data.model.release import set_region_release
from data.model.service_keys import get_service_key
from util.config.database import sync_database_with_config
from util.generatepresharedkey import generate_key
from _init import CONF_DIR
logger = logging.getLogger(__name__)
@lru_cache(maxsize=1)
def get_audience():
    """
    Returns the audience value used for this instance's service key and JWT proxy config.

    An explicitly configured JWTPROXY_AUDIENCE wins. Otherwise the audience is a URL
    built from PREFERRED_URL_SCHEME and SERVER_HOSTNAME, always carrying a port: the
    one embedded in the hostname if present, else 443 for https and 80 for anything
    else.
    """
    configured = app.config.get("JWTPROXY_AUDIENCE")
    if configured:
        return configured

    scheme = app.config.get("PREFERRED_URL_SCHEME")
    netloc = app.config.get("SERVER_HOSTNAME")

    if ":" not in netloc:
        # The hostname carries no port, so pick the scheme's conventional one.
        default_port = "443" if scheme == "https" else "80"
        netloc = netloc + ":" + default_port

    return urlunparse((scheme, netloc, "", "", "", ""))
def _verify_service_key():
    """
    Checks whether the service key recorded on disk for this instance is still usable.

    The key ID is read from INSTANCE_SERVICE_KEY_KID_LOCATION, looked up via
    get_service_key (approved or not), and the private key file at
    INSTANCE_SERVICE_KEY_LOCATION must exist.

    Returns the key ID if all checks pass, or None if the key ID file cannot be read,
    the key lookup raises ServiceKeyDoesNotExist, or the private key file is missing.
    """
    try:
        with open(app.config["INSTANCE_SERVICE_KEY_KID_LOCATION"]) as f:
            quay_key_id = f.read()
    except IOError:
        logger.exception("Could not load existing service key; creating a new one")
        return None

    try:
        get_service_key(quay_key_id, approved_only=False)
    except ServiceKeyDoesNotExist:
        logger.exception(
            "Could not find non-expired existing service key %s; creating a new one",
            quay_key_id,
        )
        return None

    # An explicit check rather than `assert`: assertions are stripped under `python -O`,
    # and a key ID without its private key file is just another "no usable key" case.
    if not os.path.exists(app.config["INSTANCE_SERVICE_KEY_LOCATION"]):
        logger.warning(
            "Private key file for existing service key %s is missing; creating a new one",
            quay_key_id,
        )
        return None

    return quay_key_id
def _create_instance_service_key():
    """
    Generates a new service key for this instance and writes it to disk.

    The key expires INSTANCE_SERVICE_KEY_EXPIRATION minutes from now (default 120).
    The key ID is written to INSTANCE_SERVICE_KEY_KID_LOCATION and the unencrypted,
    PEM-encoded private key to INSTANCE_SERVICE_KEY_LOCATION.

    Returns the ID of the generated key.
    """
    minutes_until_expiration = app.config.get("INSTANCE_SERVICE_KEY_EXPIRATION", 120)
    expiration = datetime.utcnow() + timedelta(minutes=minutes_until_expiration)
    quay_key, quay_key_id = generate_key(
        app.config["INSTANCE_SERVICE_KEY_SERVICE"], get_audience(), expiration_date=expiration
    )

    # Opening with "w"/"wb" already truncates the file, so no explicit truncate is needed.
    with open(app.config["INSTANCE_SERVICE_KEY_KID_LOCATION"], mode="w") as f:
        f.write(quay_key_id)

    # NOTE(review): the private key is written unencrypted with default file
    # permissions -- confirm the process umask / directory permissions are restrictive.
    with open(app.config["INSTANCE_SERVICE_KEY_LOCATION"], mode="wb") as f:
        f.write(
            quay_key.private_bytes(
                encoding=serialization.Encoding.PEM,
                format=serialization.PrivateFormat.TraditionalOpenSSL,
                encryption_algorithm=serialization.NoEncryption(),
            )
        )

    return quay_key_id


def _write_jwtproxy_config(quay_key_id):
    """
    Renders jwtproxy_conf.yaml.jnj from CONF_DIR into jwtproxy_conf.yaml for the given key ID.
    """
    audience = get_audience()
    registry = audience + "/keys"
    security_issuer = app.config.get("SECURITY_SCANNER_ISSUER_NAME", "security_scanner")

    with open(os.path.join(CONF_DIR, "jwtproxy_conf.yaml.jnj")) as f:
        template = Template(f.read())

    rendered = template.render(
        conf_dir=CONF_DIR,
        audience=audience,
        registry=registry,
        key_id=quay_key_id,
        security_issuer=security_issuer,
        service_key_location=app.config["INSTANCE_SERVICE_KEY_LOCATION"],
    )

    with open(os.path.join(CONF_DIR, "jwtproxy_conf.yaml"), "w") as f:
        f.write(rendered)


def setup_jwt_proxy():
    """
    Creates a service key for quay to use in the jwtproxy and generates the JWT proxy configuration.

    If the proxy configuration already exists and the current service key still
    verifies, nothing is done. When REGISTRY_STATE is "readonly" no new key is
    generated; the existing key must verify.

    Raises Exception if the registry is read-only and no valid service key is found.
    """
    if os.path.exists(os.path.join(CONF_DIR, "jwtproxy_conf.yaml")):
        # Proxy is already setup. Make sure the service key is still valid.
        if _verify_service_key() is not None:
            return

    if app.config.get("REGISTRY_STATE", "normal") == "readonly":
        # Ensure we have an existing key if in read-only mode.
        quay_key_id = _verify_service_key()
        if quay_key_id is None:
            raise Exception("No valid service key found for read-only registry.")
    else:
        # Generate the key for this Quay instance to use.
        quay_key_id = _create_instance_service_key()

    # Generate the JWT proxy configuration.
    _write_jwtproxy_config(quay_key_id)
def main():
    """
    Boot entry point: syncs the database with the config, sets up the JWT proxy, and
    records the deployed release when both a region and a git head are known.

    Raises Exception if the configuration reports that setup has not been completed.
    """
    setup_complete = app.config.get("SETUP_COMPLETE", False)
    if not setup_complete:
        message = "Your configuration bundle is either not mounted or setup has not been completed"
        raise Exception(message)

    sync_database_with_config(app.config)
    setup_jwt_proxy()

    # Nothing to record unless this deploy is identified by both region and commit.
    if not release.REGION or not release.GIT_HEAD:
        return

    set_region_release(release.SERVICE, release.REGION, release.GIT_HEAD)
if __name__ == "__main__":
main()