# create_jobs.py
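"""Poll Breedbase for uploaded field recordings and enqueue processing jobs.

The script logs into a Breedbase instance, fetches images via the BrAPI v2
/images endpoint, and treats each image description as a base64-encoded zip
archive. The archive is extracted, its audio (.wav/.mp4/.mp3), geonav log
(*log*.csv) and trait (*trait*.csv) files are uploaded to the trial's
"additional files" section, a BullMQ job pointing at the uploaded files is
enqueued, and the source image is deleted. The script runs in an endless
loop, waking every MINUTES minutes. Run directly: python create_jobs.py
"""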
from bullmq import Queue as bullQueue
import asyncio
import base64
import os
import requests
from zipfile import ZipFile
from requests_toolbelt import MultipartEncoder
from dotenv import dotenv_values

config = dotenv_values(".env")

FIELD_ID = 167  # Breedbase trial/field that receives the uploaded files
MINUTES = 30    # polling interval
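# Expected .env keys (values illustrative):
#   REDIS_HOST=localhost
#   REDIS_PORT=6379
#   REDIS_USER=default
#   REDIS_PASSWORD=...
#   REDIS_QUEUE=...
#   BREEDBASE_URL=https://demo.breedbase.org
#   BREEDBASE_USER=...
#   BREEDBASE_PASSWORD=...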
opts = {
    "connection": {
        "host": config['REDIS_HOST'],
        "port": config['REDIS_PORT'],
        "username": config['REDIS_USER'],
        "password": config['REDIS_PASSWORD'],
    },
}

queue = bullQueue(config['REDIS_QUEUE'], opts=opts)
login_url = f"{config['BREEDBASE_URL']}/ajax/user/login?username={config['BREEDBASE_USER']}&password={config['BREEDBASE_PASSWORD']}"
brapi_login_url = f"{config['BREEDBASE_URL']}/brapi/v2/token?username={config['BREEDBASE_USER']}&password={config['BREEDBASE_PASSWORD']}"
brapi_images_url = f"{config['BREEDBASE_URL']}/brapi/v2/images?pageSize=1000"
# Image deletion is a GET to images_delete_url + images_delete_queries.format(image_db_id)
images_delete_url = f"{config['BREEDBASE_URL']}/image/ajax/image_ajax_form.pl"
images_delete_queries = "?object_id={}&action=confirm_delete"
upload_additional_file_url = f"{config['BREEDBASE_URL']}/ajax/breeders/trial/{FIELD_ID}/upload_additional_file"


def upload_additional_file(session, tmp_out_dir, rel_file_path):
    """Upload one extracted file to the trial's "additional files" section.

    Returns a (file_id, download_url) tuple for the uploaded file.
    """
    with open(os.path.join(tmp_out_dir, rel_file_path), 'rb') as fh:
        m = MultipartEncoder(
            fields={'trial_upload_additional_file': (rel_file_path, fh)})
        res = session.post(upload_additional_file_url, data=m,
                           headers={'Content-Type': m.content_type})
    file_id = res.json()['file_id']
    return (file_id, f"{config['BREEDBASE_URL']}/breeders/phenotyping/download/{file_id}")
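# Example (illustrative; the session must be logged in first, and the
# directory and file name below are hypothetical):
#
#   with requests.Session() as s:
#       s.get(login_url)
#       file_id, url = upload_additional_file(s, "tmp/123/Output", "trait.csv")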


async def processor():
    with requests.Session() as s_brapi:
        with requests.Session() as s:
            s.get(login_url)  # cookie-based login for the upload/delete session
            res = s_brapi.get(brapi_login_url)
            # The BrAPI access token is fetched but never used below
            access_token = res.json().get("access_token")
            # Sample job with hardcoded demo.breedbase.org download URLs
            await queue.add("job_sample", {
                "audio_url": "https://demo.breedbase.org/breeders/phenotyping/download/274",
                "log_url": "https://demo.breedbase.org/breeders/phenotyping/download/273",
                "trait_url": "https://demo.breedbase.org/breeders/phenotyping/download/272",
                "field_id": 167}, {})
            with s_brapi.get(brapi_images_url) as res:
                images = res.json()['result']['data']
                for image_data in images:
                    # Only images whose description carries a payload are processed
                    if image_data.get('description'):
                        imgDbId = image_data['imageDbId']
                        print(imgDbId, "processing..")
                        # The description field holds a base64-encoded zip archive
                        description = bytes(image_data['description'], 'utf-8')
                        os.makedirs("tmp", exist_ok=True)
                        try:
                            with open("tmp/decode_outfile.zip", 'wb') as f:
                                f.write(base64.b64decode(description))
                            with ZipFile("tmp/decode_outfile.zip", 'r') as zObject:
                                zObject.extractall(path=f"tmp/{imgDbId}")
                        except Exception:
                            print(f"{imgDbId} skipping")
                            continue
                        OUTPUT_ZIP_DIR = f"tmp/{imgDbId}"
                        # Some archives nest their files in an Output/ folder
                        if os.path.exists(OUTPUT_ZIP_DIR + "/Output"):
                            OUTPUT_ZIP_DIR += "/Output"
                        # Upload extracted files to the Breedbase trial's additional files section
                        audio_url = ""
                        log_url = ""
                        trait_url = ""
                        for file in os.listdir(OUTPUT_ZIP_DIR):
                            if file.endswith((".wav", ".mp4", ".mp3")):
                                print("Audio", file)
                                audio_file_id, audio_url = upload_additional_file(
                                    s, OUTPUT_ZIP_DIR, file)
                            if file.endswith(".csv"):
                                if "log" in file:
                                    print("Geonav log", file)
                                    log_file_id, log_url = upload_additional_file(
                                        s, OUTPUT_ZIP_DIR, file)
                                if "trait" in file:
                                    print("Trait file", file)
                                    trait_file_id, trait_url = upload_additional_file(
                                        s, OUTPUT_ZIP_DIR, file)
                        job_data = {
                            'audio_url': audio_url,
                            'log_url': log_url,
                            'trait_url': trait_url,
                            'field_id': FIELD_ID,
                        }
                        await queue.add(f"job_{imgDbId}", job_data, {})
                        # Delete the processed image so it is not picked up again
                        print(f"{imgDbId} clearing...")
                        s.get(images_delete_url +
                              images_delete_queries.format(imgDbId))
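# The matching consumer is not part of this file. A minimal sketch, assuming
# the worker side also uses the Python bullmq package (names hypothetical):
#
#   from bullmq import Worker
#
#   async def process(job, job_token):
#       data = job.data  # {'audio_url', 'log_url', 'trait_url', 'field_id'}
#       ...              # download the files and run the processing pipeline
#
#   worker = Worker(config['REDIS_QUEUE'], process, opts)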


async def main():
    try:
        while True:
            print("Processor awake..")
            await processor()
            print("Processor asleep..")
            await asyncio.sleep(60 * MINUTES)
    except Exception as e:
        print(e)
        # Close when done adding jobs
        await queue.close()


if __name__ == "__main__":
    asyncio.run(main())