forked from opensearch-project/opensearch-build
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtest_cluster.py
128 lines (100 loc) · 3.43 KB
/
test_cluster.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
# SPDX-License-Identifier: Apache-2.0
#
# The OpenSearch Contributors require contributions made to
# this file be licensed under the Apache-2.0 license or a
# compatible open source license.
import abc
import os
from contextlib import contextmanager
from test_workflow.test_recorder.test_result_data import TestResultData
class TestCluster(abc.ABC):
    """
    Abstract base class for all types of test clusters.

    A cluster consists of one main ``service`` plus its ``dependencies``.
    Subclasses supply both, along with the ``port`` the cluster listens on.
    Use ``create`` to run a cluster for the duration of a ``with`` block.
    """

    def __init__(
        self,
        work_dir,
        component_name,
        component_test_config,
        security_enabled,
        additional_cluster_config,
        save_logs
    ):
        """
        Record the cluster settings. Nothing is started here.

        :param work_dir: parent directory; the cluster uses its "local-test-cluster" subdirectory.
        :param component_name: recorded with every saved termination result.
        :param component_test_config: recorded with every saved termination result.
        :param security_enabled: stored on the instance; not read by this base class.
        :param additional_cluster_config: stored on the instance; not read by this base class.
        :param save_logs: object whose ``save_test_result_data`` receives each termination result.
        """
        self.work_dir = os.path.join(work_dir, "local-test-cluster")
        self.component_name = component_name
        self.component_test_config = component_test_config
        self.security_enabled = security_enabled
        self.additional_cluster_config = additional_cluster_config
        self.save_logs = save_logs

        # Populated by start(): the main service followed by its dependencies.
        self.all_services = []
        # Result of terminating the main service; stays None until terminate() obtains one.
        self.termination_result = None

    @classmethod
    @contextmanager
    def create(cls, *args, **kwargs):
        """
        Context manager that builds a cluster, starts it and always terminates it on exit.

        All positional and keyword arguments are forwarded unchanged to the constructor.

        Yields an ``(endpoint, port)`` tuple once every service has been started and waited for.
        ``terminate`` runs when the ``with`` block exits, including when ``start`` or the block
        body raises, so the caller never has to clean up explicitly. Note that ``terminate``
        may itself raise ClusterServiceNotInitializedException.
        """
        cluster = cls(*args, **kwargs)
        try:
            cluster.start()
            yield cluster.endpoint(), cluster.port()
        finally:
            cluster.terminate()

    def start(self):
        """
        Create the work directory, start every service, then wait for each of them.
        """
        os.makedirs(self.work_dir, exist_ok=True)

        # Unpacking (rather than list concatenation) accepts any iterable of dependencies.
        self.all_services = [self.service, *self.dependencies]

        # Every service is started before any of them is waited on.
        for service in self.all_services:
            service.start()

        for service in self.all_services:
            service.wait_for_service()

    def terminate(self):
        """
        Terminate the main service and every dependency, saving each termination result.

        :raises ClusterServiceNotInitializedException: if there is no main service or
            terminating it produced no result.
        """
        if self.service:
            self.termination_result = self.service.terminate()

        for dependency in self.dependencies:
            dependency_result = dependency.terminate()
            # The main service's result is checked for being falsy below; apply the same
            # guard here so a dependency that yields no result (presumably one that never
            # started - confirm against the service implementation) does not raise
            # AttributeError and hide the real failure.
            if dependency_result:
                self.__save_test_result_data(dependency_result)

        if not self.termination_result:
            raise ClusterServiceNotInitializedException()

        self.__save_test_result_data(self.termination_result)

    def __save_test_result_data(self, termination_result):
        """
        Wrap a termination result in a TestResultData and pass it to ``save_logs``.

        :param termination_result: object exposing ``return_code``, ``stdout_data``,
            ``stderr_data`` and ``log_files``.
        """
        test_result_data = TestResultData(
            self.component_name,
            self.component_test_config,
            termination_result.return_code,
            termination_result.stdout_data,
            termination_result.stderr_data,
            termination_result.log_files
        )
        self.save_logs.save_test_result_data(test_result_data)

    def endpoint(self):
        """
        Get the host name of this cluster; always "localhost" in this base class.
        """
        return "localhost"

    @abc.abstractmethod
    def port(self):
        """
        Get the port that this cluster is listening on.
        """
        pass

    # abc.abstractproperty is deprecated since Python 3.3; stacking property on
    # abstractmethod is the supported equivalent.
    @property
    @abc.abstractmethod
    def service(self):
        """
        The main service running in this cluster.
        """
        pass

    @property
    @abc.abstractmethod
    def dependencies(self):
        """
        The dependencies running in this cluster.
        """
        pass
class ClusterCreationException(Exception):
    """
    Signals that a cluster could not be created.
    """
class ClusterServiceNotInitializedException(Exception):
    """
    Signals that the cluster's service has not been initialized.
    """

    def __init__(self):
        # The message is fixed; callers construct this exception without arguments.
        message = "Service is not initialized"
        super().__init__(message)