-
Notifications
You must be signed in to change notification settings - Fork 2
/
task_scheduler.py
159 lines (141 loc) · 4.92 KB
/
task_scheduler.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
@author: Tal Golan
"""
## Manage HPC jobs with sqlite
#
#
## A simple example (uncomment the code below to try it out)
#
## We start a new database.
## Here, we define that if a job runs for more than 6 hours it is considered lost and is restarted.
#
# import numpy as np
# sch=TaskScheduler(db_path='/home/tal/scheduler.db',max_job_time_in_seconds=6*3600)
#
# for gamma in [-1e3,1,1e5]:
# for category in ['cats','dogs']:
# job_id={'gamma':gamma,'category':category} # any JSONable type will do (e.g. numbers, strings, dictionaries, lists...)
# success=sch.start_job(job_id)
# if not success:
# # the job already started / completed. skip to the next loop
# continue
#
# # computation comes here
# results=np.random.uniform(size=(4,2))
#
# # we are done. let's set the 'done' flag.
# sch.job_done(job_id,results=results) # saving results is optional. they are converted to string.
#
## if we are done, read the table to pandas:
# DF=sch.to_pandas()
# print(DF)
import sqlite3
import json, os, time, math
import pandas as pd
class TaskScheduler:
    """Coordinate HPC jobs across processes via a shared SQLite database.

    Each job is identified by any JSON-serializable ``job_id``.  A worker
    claims a job with :meth:`start_job` (atomic thanks to the UNIQUE
    constraint on ``job_id``) and marks it finished with :meth:`job_done`.
    An incomplete job older than ``max_job_time_in_seconds`` is treated as
    lost and may be re-claimed.
    """

    def __init__(self, db_path="scheduler.db", max_job_time_in_seconds=12 * 3600):
        """Open (or create on first use) the scheduler database.

        Args:
            db_path: path of the SQLite database file shared by all workers.
            max_job_time_in_seconds: incomplete jobs started longer ago than
                this are considered lost and deleted on the next start_job.
        """
        self.db_path = db_path
        self.max_job_time_in_seconds = max_job_time_in_seconds
        if not os.path.isfile(self.db_path):
            conn = sqlite3.connect(self.db_path)
            try:
                # `with conn` commits on success and rolls back on error.
                with conn:
                    conn.execute(
                        """CREATE TABLE jobs
                        (job_id TEXT UNIQUE, is_completed BOOLEAN, time_started INTEGER, results TEXT)"""
                    )
            finally:
                conn.close()

    def _execute_sqlite(self, query, parameters=None):
        """Run a single write statement; return True on success, False on error.

        A UNIQUE-constraint violation on jobs.job_id is an expected outcome
        (the job was already registered by another worker), so it is
        silenced; every other database error is printed exactly once.
        (The previous version printed errors twice and also printed the
        duplicate-job_id message it meant to suppress.)
        """
        if parameters is None:
            parameters = ()
        conn = sqlite3.connect(self.db_path)
        try:
            with conn:  # commit on success, rollback on error
                conn.execute(query, parameters)
            return True
        except sqlite3.Error as e:
            if str(e) != "UNIQUE constraint failed: jobs.job_id":
                print("sqlite error:", str(e))
            return False
        finally:
            conn.close()

    def _sqlite_fetchone(self, query, parameters=None):
        """Run a read query; return the first column of the first row.

        Returns None when no row matches or on a database error.  (The
        previous version indexed ``fetchone()[0]`` unconditionally, turning
        an empty result into a spurious TypeError report.)
        """
        if parameters is None:
            parameters = ()
        conn = sqlite3.connect(self.db_path)
        try:
            row = conn.execute(query, parameters).fetchone()
            return row[0] if row is not None else None
        except sqlite3.Error as e:
            print("sqlite error:", str(e))
            return None
        finally:
            conn.close()

    def start_job(self, job_id):
        """Atomically claim a job.

        Args:
            job_id: any JSON-serializable identifier for the job.

        Returns:
            True if this caller now owns the job; False if it was already
            started or completed (a message is printed saying which).
        """
        job_id_json = json.dumps(job_id)
        time_started = math.ceil(time.time())
        # Drop a stale, incomplete instance of this job (presumed crashed),
        # so it can be restarted below.
        self._execute_sqlite(
            "DELETE FROM jobs WHERE (job_id = ? AND time_started < ? AND is_completed = 0)",
            (job_id_json, time_started - self.max_job_time_in_seconds),
        )
        # The UNIQUE constraint on job_id makes the insert an atomic claim:
        # exactly one concurrent worker succeeds.
        success = self._execute_sqlite(
            "INSERT INTO jobs (job_id, is_completed, time_started) VALUES (?, 0, ?)",
            (job_id_json, time_started),
        )
        if not success:
            # Claim failed -> the job row already exists; report its state.
            is_completed = self._sqlite_fetchone(
                "SELECT is_completed FROM jobs WHERE job_id = ?", (job_id_json,)
            )
            if is_completed:
                print("{} job already completed.".format(job_id_json))
            else:
                print("{} job already started.".format(job_id_json))
        return success

    def job_done(self, job_id, results=None):
        """Mark a job as completed, optionally storing ``str(results)``.

        NOTE(review): INSERT OR REPLACE rewrites the whole row, so
        ``time_started`` (and ``results`` when not supplied) are reset to
        NULL — kept as-is since only incomplete rows use ``time_started``.
        """
        if results is None:
            self._execute_sqlite(
                "INSERT OR REPLACE INTO jobs (job_id,is_completed) VALUES(?, 1)",
                (json.dumps(job_id),),
            )
        else:
            self._execute_sqlite(
                "INSERT OR REPLACE INTO jobs (job_id,is_completed,results) VALUES(?, 1,?)",
                (json.dumps(job_id), str(results)),
            )

    def delete_job(self, job_id):
        """Remove the row for ``job_id`` (completed or not), if present."""
        self._execute_sqlite(
            "DELETE FROM jobs WHERE (job_id = ? )", (json.dumps(job_id),)
        )

    def delete_all_running_jobs(self):
        """Remove every job that has not been marked completed."""
        self._execute_sqlite("DELETE FROM jobs WHERE (is_completed = 0 )")

    def to_pandas(self):
        """Return the whole jobs table as a pandas DataFrame.

        The job_id column is decoded from JSON back into Python objects
        when possible; raw strings are left in place otherwise.  Returns
        None if the table cannot be read.
        """
        conn = sqlite3.connect(self.db_path)
        try:
            df = pd.read_sql_query("SELECT * FROM jobs", conn)
        except Exception as e:
            print("sqlite error:", str(e))
            return None
        finally:
            conn.close()
        try:
            df["job_id"] = [json.loads(s) for s in df["job_id"]]
        except (TypeError, ValueError, KeyError):
            # Best-effort decode only; keep the raw JSON strings on failure.
            pass
        return df