-
Notifications
You must be signed in to change notification settings - Fork 0
/
obfarm.py
191 lines (158 loc) · 5.16 KB
/
obfarm.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
# -*- coding: utf-8 -*-
# owner: niyuhang.nyh
import copy
import os
import sys
import time
import oss2
import json
import requests
from enum import Enum
# Key/value pairs that main() passes to set_output(). monitor_tasks() and the
# script entry point store "success" here (1 on success, -1 on failure).
OUTPUT = {}
# Key prefix under which get_task_res() and get_task_stage_output() look up
# "<pipeline id>.json" and "<pipeline id>.output".
RESULT_FILE_KEY = "farm/results/"
# Key template with a single "{}" placeholder; not referenced by the code
# visible in this file.
TASK_QUEUE_FILE_KEY = "farm/jobs/{}.json"
def _range(start, last):
def to_str(pos):
if pos is None:
return ''
else:
return str(pos)
return to_str(start) + '-' + to_str(last)
def _make_range_string(range):
if range is None:
return ''
start = range[0]
last = range[1]
if start is None and last is None:
return ''
return 'bytes=' + _range(start, last)
class OssProxy:
    """Minimal read-only HTTP client for objects addressed as ``<endpoint>/<key>``.

    Requests are sent with plain ``requests`` calls; no signing or
    authentication is performed.
    """

    def __init__(self, endpoint="", timeout=10):
        """Remember the base URL and the per-request timeout.

        Args:
            endpoint: Base URL that object keys are appended to.
            timeout: Seconds handed to ``requests`` as the request timeout so
                a stalled connection cannot block the caller indefinitely.
        """
        self.endpoint = endpoint
        self.timeout = timeout

    @staticmethod
    def _normalize_range(_range):
        """Return ``_range`` as a ``(start, last)`` tuple, or ``None`` if unset.

        A bare start offset means "from this position to the end". A tuple or
        list is taken as ``(start, last)`` and is NOT wrapped a second time;
        wrapping it again used to yield the malformed header
        ``bytes=(N, None)-``.
        """
        if _range is None:
            return None
        if isinstance(_range, (tuple, list)):
            # Pad so that short sequences read as open-ended ranges.
            bounds = tuple(_range) + (None, None)
            return bounds[0], bounds[1]
        return _range, None

    def get_object(self, key, _range=None):
        """GET ``<endpoint>/<key>`` and return the response body as text.

        Example URL:
        https://farm-ce.oss-cn-heyuan.aliyuncs.com/farm/results/6622943107.output

        Args:
            key: Object key appended to the endpoint.
            _range: Optional start offset, or a ``(start, last)`` pair, sent
                as an HTTP ``Range`` header.

        Returns:
            ``response.text``. The HTTP status is not checked, so the body of
            an error response is returned as-is. The response headers are
            also printed to stdout.
        """
        url = "{}/{}".format(self.endpoint, key)
        headers = {}
        range_header = _make_range_string(self._normalize_range(_range))
        if range_header:
            headers["range"] = range_header
        res = requests.get(url, headers=headers, timeout=self.timeout)
        result = res.text
        print(res.headers)
        return result

    def get_object_meta(self, key):
        """Send a HEAD request for ``<endpoint>/<key>`` and return the response headers."""
        url = "{}/{}".format(self.endpoint, key)
        headers = {}
        res = requests.head(url, headers=headers, timeout=self.timeout)
        return res.headers
class TaskStatusEnum(Enum):
    """Integer status codes found in the "status" field of a task result.

    monitor_tasks() treats any value <= ``fail`` as a failed run and any
    value >= ``success`` as a successful run; values in between keep it
    polling.
    """
    # Non-negative values below ``success``: the task has not finished yet.
    submitting = 0
    pending = 1
    running = 2
    stopping = 3
    success = 4
    # Negative values: the task ended without succeeding.
    fail = -1
    kill = -2
    timeout = -3
    submit_task_fail = -4
def request(method, url, params=None, payload=None, timeout=10, data=None, without_check_status=False):
    """Send an HTTP request and return the response, exiting on failure by default.

    Args:
        method: HTTP method name passed to ``requests.request``.
        url: Target URL.
        params: Query parameters; ``None`` is treated as an empty dict.
        payload: Object sent as the JSON body.
        timeout: Request timeout in seconds.
        data: Raw/form body passed through as ``data``.
        without_check_status: When true, the status code is not checked and
            a failed request does not terminate the process.

    Returns:
        The ``requests`` response, or ``None`` when the request raised and
        ``without_check_status`` is true.

    Raises:
        SystemExit: With code 1 when status checking is enabled and either
            the status code is >= 300 or the request raised.
    """
    params = params or {}
    try:
        response = requests.request(
            method,
            url,
            params=params,
            json=payload,
            data=data,
            timeout=timeout,
        )
        if not without_check_status and response.status_code >= 300:
            try:
                msg = response.json()["msg"]
            except (ValueError, KeyError, TypeError):
                # Body is not JSON, or is JSON without a "msg" entry: fall
                # back to the raw text. Only these are caught so Ctrl-C and
                # SystemExit are not swallowed here.
                msg = response.text
            print("[ERROR] 错误信息:{}".format(msg))
            # SystemExit is not an Exception subclass, so the handler below
            # does not intercept it.
            sys.exit(1)
        return response
    except Exception:
        import traceback
        traceback.print_exc()
        print("请求失败,出现异常,请联系管理人员处理")
        if not without_check_status:
            sys.exit(1)
        return None
def monitor_tasks(oss_proxy: OssProxy, github_pipeline_id, timeout):
    """Echo a task's output to stdout until the task finishes or time runs out.

    Each round reads the task result, fetches the output that has not been
    printed yet, prints it, and then evaluates the task status. The outcome
    is also stored in ``OUTPUT["success"]`` (1 or -1).

    Args:
        oss_proxy: Proxy used to read the result and output objects.
        github_pipeline_id: Identifier used to build the object keys.
        timeout: Maximum number of seconds to keep polling (converted with
            ``int``).

    Returns:
        True when the task status is >= ``TaskStatusEnum.success``; False
        when it is <= ``TaskStatusEnum.fail`` or the deadline passed first.
    """
    end_time = time.time() + int(timeout)
    # Offset of the first piece of output that has not been printed yet.
    end = 0
    print("{}OUTPUT{}".format("-" * 20, "-" * 20))
    while time.time() <= end_time:
        # Check whether the task has finished.
        task_data = get_task_res(oss_proxy, github_pipeline_id)
        output = get_task_stage_output(oss_proxy, github_pipeline_id, end)
        if output:
            # NOTE(review): the offset is advanced by len() of whatever was
            # returned; for text this is a character count, which equals the
            # byte count only for single-byte content - confirm.
            end += len(output)
            if isinstance(output, bytes):
                # A chunk may end in the middle of a multi-byte character;
                # do not let that abort the monitor.
                need_print_output = output.decode("utf-8", errors="replace")
            else:
                need_print_output = output
            if need_print_output.strip():
                print(need_print_output, end="")
        time.sleep(1)
        if task_data is not None:
            task_status = int(task_data["status"])
            if task_status <= TaskStatusEnum.fail.value:
                print(TaskStatusEnum(task_status))
                OUTPUT.update({"success": -1})
                return False
            elif task_status >= TaskStatusEnum.success.value:
                print(TaskStatusEnum(task_status))
                OUTPUT.update({"success": 1})
                return True
        time.sleep(5)
    # Deadline reached without a final status: report it as a failure
    # instead of returning None with OUTPUT left untouched.
    print(TaskStatusEnum.timeout)
    OUTPUT.update({"success": -1})
    return False
def get_task_res(oss_proxy: OssProxy, github_pipeline_id):
    """Fetch and parse the task result stored at ``farm/results/<id>.json``.

    Args:
        oss_proxy: Proxy used to read the object.
        github_pipeline_id: Identifier used to build the object key.

    Returns:
        The decoded JSON value, or ``None`` when the object cannot be fetched
        or its body is not valid JSON.
    """
    result_key = RESULT_FILE_KEY + "{}.json".format(github_pipeline_id)
    try:
        origin_task_data = oss_proxy.get_object(result_key)
        return json.loads(origin_task_data)
    except Exception:
        # Best effort: any fetch or parse failure means "no result available".
        # KeyboardInterrupt and SystemExit are deliberately not caught, so
        # the polling loop can still be interrupted.
        return None
def get_task_stage_output(oss_proxy: OssProxy, github_pipeline_id, start):
    """Fetch the task output stored at ``farm/results/<id>.output`` from ``start`` on.

    Args:
        oss_proxy: Proxy used to read the object and its headers.
        github_pipeline_id: Identifier used to build the object key.
        start: Offset to start reading from; a falsy value reads from the
            beginning without checking the object size first.

    Returns:
        The fetched data as ``bytes``. ``b""`` is returned when nothing lies
        beyond ``start`` or when the object cannot be read.
    """
    output_key = RESULT_FILE_KEY + "{}.output".format(github_pipeline_id)
    try:
        if start:
            output_meta = oss_proxy.get_object_meta(output_key)
            filesize = int(output_meta["Content-Length"])
            if start >= filesize:
                # Nothing new past the offset: fetch nothing, rather than
                # rewinding and re-reading the last byte on every poll.
                return b""
        # Pass the bare offset: get_object() turns a scalar into the
        # open-ended range (start, None) itself.
        output = oss_proxy.get_object(output_key, _range=start)
    except Exception:
        # Best effort: an unreadable object counts as "no output".
        return b""
    if isinstance(output, str):
        # The failure path returns bytes and the caller decodes the result,
        # so normalise text to bytes as well.
        # NOTE(review): assumes UTF-8 content - confirm.
        output = output.encode("utf-8")
    return output
def main(pipeline_id, timeout):
    """Monitor one pipeline's task, publish ``OUTPUT``, and exit with 1 on failure.

    Args:
        pipeline_id: Identifier forwarded to ``monitor_tasks``.
        timeout: Polling time limit forwarded to ``monitor_tasks``.
    """
    for banner in ("create a new task", "working...."):
        print(banner)
    proxy = OssProxy("https://farm-ce.oss-cn-heyuan.aliyuncs.com")
    succeeded = monitor_tasks(proxy, pipeline_id, timeout)
    # OUTPUT is published whatever the outcome was.
    set_output(OUTPUT)
    if succeeded:
        return
    exit(1)
def set_output(output):
    """Append ``output`` as ``name=value`` lines to the file named by ``$GITHUB_OUTPUT``.

    The file is written directly instead of through a shell ``echo``, so
    quotes, ``$`` and backticks in a value are stored literally. Each pair
    goes on its own line, which is how GitHub Actions separates outputs.

    Args:
        output: Mapping of output names to values. Values are formatted with
            ``str.format`` and are expected to be single-line.

    Nothing is written when ``output`` is empty or ``GITHUB_OUTPUT`` is not
    set in the environment.
    """
    output_path = os.environ.get("GITHUB_OUTPUT")
    if not output_path or not output:
        return
    lines = ["{}={}\n".format(key, value) for key, value in output.items()]
    with open(output_path, "a", encoding="utf-8") as output_file:
        output_file.writelines(lines)
if __name__ == "__main__":
    # Usage: obfarm.py <pipeline_id> <timeout_seconds>
    print(sys.argv)
    if len(sys.argv) < 3:
        print("缺失相关参数")
        OUTPUT.update({"success": -1})
        # Publish the failure before exiting; otherwise the value stored in
        # OUTPUT above is discarded.
        set_output(OUTPUT)
        sys.exit(1)
    main(sys.argv[1], sys.argv[2])