forked from redhat-qe-security/SC-tests
-
Notifications
You must be signed in to change notification settings - Fork 0
/
fixtures.py
211 lines (166 loc) · 6.65 KB
/
fixtures.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
import http.server
import ssl
import subprocess as subp
import sys
import threading
from time import sleep
import pexpect
import pytest
import python_freeipa as pipa
from SCAutolib.src import read_config, LIB_CERTS, LIB_KEYS
from SCAutolib.src.authselect import Authselect
from SCAutolib.src.utils import (run_cmd, check_output, edit_config_,
restart_service, backup_, restore_file_)
from SCAutolib.src.virt_card import VirtCard
import python_freeipa as pipa
class User:
ROOT_PASSWD = read_config("root_passwd")
USERNAME_LOCAL = None
PASSWD_LOCAL = None
PIN_LOCAL = None
def su_login_local_with_sc(self):
with Authselect(required=False):
with VirtCard(self.USERNAME_LOCAL, insert=True):
cmd = f'su - {self.USERNAME_LOCAL} -c "su - ' \
f'{self.USERNAME_LOCAL} -c whoami"'
output = run_cmd(cmd, passwd=self.PIN_LOCAL, pin=True)
check_output(output, expect=self.USERNAME_LOCAL,
zero_rc=True, check_rc=True)
def su_login_local_with_passwd(self):
with Authselect(required=False):
with VirtCard(self.USERNAME_LOCAL, insert=True):
cmd = f'su - {self.USERNAME_LOCAL} -c "su - ' \
f'{self.USERNAME_LOCAL} -c whoami"'
output = run_cmd(cmd, passwd=self.PASSWD_LOCAL, pin=False)
check_output(output, expect=self.USERNAME_LOCAL,
zero_rc=True, check_rc=True)
class LocalUser(User):
def __init__(self):
self.USERNAME_LOCAL = read_config("local_user.name")
self.PASSWD_LOCAL = read_config("local_user.passwd")
self.PIN_LOCAL = read_config("local_user.pin")
class IPAUser(User):
def __init__(self):
self.USERNAME = read_config("ipa_user.name")
self.PASSWD = read_config("ipa_user.passwd")
self.PIN = read_config("ipa_user.pin")
@pytest.fixture()
def edit_config(file_path, target, restore, restart):
"""Used for editing given configuration file. Arguments are based through
the pytest.mark.parametrize decorator"""
destination_path = backup_(file_path)
if type(target) == dict:
target = [target]
for t in target:
edit_config_(file_path, t["section"], t["key"], t["val"])
for service in restart:
restart_service(service)
yield
if restore:
restore_file_(destination_path, file_path)
for service in restart:
restart_service(service)
def local_user():
return LocalUser()
def ipa_user_():
return IPAUser()
@pytest.fixture(name="user")
def user_indirect():
"""Returns an object of local user"""
return local_user()
@pytest.fixture()
def ipa_user():
"""Returns an object of IPA user"""
return ipa_user_()
@pytest.fixture()
def backup(file_path, restore, restart):
assert type(file_path) == str
assert type(restore) == bool
assert (type(restart) == list) or (type(restart) == str)
target = backup_(file_path)
if type(restart) == str:
restart = [restart]
for service in restart:
restart_service(service)
yield
if restore:
restore_file_(target, file_path)
for service in restart:
restart_service(service)
@pytest.fixture(scope="function")
def user_shell():
"""Creates shell with some local user as a starting point for test."""
shell = pexpect.spawn("/usr/bin/sh -c 'su base-user'", encoding="utf-8")
shell.logfile = sys.stdout
return shell
@pytest.fixture(scope="function")
def root_shell():
"""Creates root shell."""
shell = pexpect.spawn("/usr/bin/sh ", encoding="utf-8")
shell.logfile = sys.stdout
return shell
@pytest.fixture
def ipa_meta_client():
"""Ready-to-user admin Meta Client for IPA server."""
hostname, passwd = read_config("ipa_server_hostname", "ipa_server_admin_passwd")
client = pipa.ClientMeta(hostname, verify_ssl=False)
client.login("admin", passwd)
return client
def _https_server(principal, ca, ipa_meta_client, *args, **kwargs):
server_address = ("127.0.0.1", 8888)
key = f"{LIB_KEYS}/key-{principal}.pem"
csr = f"{LIB_CERTS}/{principal}.csr"
cert_path = f"{LIB_CERTS}/cert-{principal}.pem"
subp.check_output(["openssl", "req", "-new", "-days", "365",
"-nodes", "-newkey", "rsa:4096", "-keyout", key,
"-out", csr, "-subj", f"/CN={principal}"],
encoding='utf-8')
with open(csr, "r") as f:
csr_content = f.read()
ca_cert = None
if ca == "ipa":
ca_cert = "/etc/ipa/ca.crt"
resp = ipa_meta_client.cert_request(a_csr=csr_content, o_principal=principal)
cert = resp["result"]["certificate"]
begin = "-----BEGIN CERTIFICATE-----"
end = "-----END CERTIFICATE-----"
cert = f"{begin}\n{cert}\n{end}"
with open(cert_path, "w") as f:
f.write(cert)
else:
raise Exception("Other then IPA CA is not implemented yet")
httpd = http.server.HTTPServer(server_address, http.server.SimpleHTTPRequestHandler)
httpd.socket = ssl.wrap_socket(httpd.socket,
server_side=True,
certfile=cert_path,
keyfile=key,
ca_certs=ca_cert,
ssl_version=ssl.PROTOCOL_TLSv1_2,
cert_reqs=ssl.CERT_REQUIRED,
do_handshake_on_connect=True)
httpd.serve_forever()
@pytest.fixture
def https_server(principal, ca, ipa_meta_client):
"""Start HTTPS server """
try:
ipa_meta_client.user_add(principal, "https", "server", principal,
o_userpassword='redhat')
except pipa.exceptions.DuplicateEntry:
pass
server_t = threading.Thread(name='daemon_server',
args=(principal, ca, ipa_meta_client,),
daemon=True,
target=_https_server)
server_t.start()
sleep(5)
yield principal
server_t.join(timeout=1)
resp = ipa_meta_client.cert_find(user=principal)["result"]
assert len(resp) == 1, "Only one certificate should be matched. " \
f"Number of matched certs is {len(resp)}"
resp = resp[0]
assert resp["status"].lower() == "valid", "Certificate is not valid"
assert not resp["revoked"], "Certificate is revoked"
cert_base64 = resp["certificate"]
ipa_meta_client.user_remove_cert(a_uid=principal,
o_usercertificate=cert_base64)