-
Notifications
You must be signed in to change notification settings - Fork 0
/
jobfork.c
162 lines (135 loc) · 3.89 KB
/
jobfork.c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
#include "jobfork.h"
#include <ctype.h>
#ifdef CMD_MPI
#include <mpi.h>
#else
#ifdef CMD_OMP
#include <omp.h>
#endif
#endif
int main(int argc, char *argv[]) {
char *cmd;
int ncmd, ret;
CHILD_INFO ci;
if (argc != 2) {
fprintf(stderr, "Usage: %s job_list\n"
"joblist: a text file with each line being a job (command)\n",
argv[0]);
return ERR_ARG;
}
term = 0;
#ifdef CMD_MPI
int myrank, tasknum;
MPI_Init(&argc, &argv);
MPI_Comm_rank(MPI_COMM_WORLD, &myrank);
MPI_Comm_size(MPI_COMM_WORLD, &tasknum);
if (tasknum < 2) {
MSG_ERR("unable to distribute jobs with %d MPI tasks.\n", tasknum);
MPI_Finalize();
return ERR_OTHER;
}
if (myrank == 0) { /* manager */
#endif
if ((ret = read_jobs(argv[1]))) return ret;
ncmd = cstat.num;
printf("%d jobs are found in the list file.\n", ncmd);
ret = snprintf(cstat.fname_rst, CMD_BUF, "%s.rst", argv[1]);
if (ret < 0 || ret >= CMD_BUF) {
MSG_ERR("the filename is too long to create the restart file.\n");
return ERR_STRING;
}
cstat.status = calloc(ncmd, sizeof(char));
if (!(cstat.status)) {
MSG_ERR("failed to allocate memory for recording the job status.\n");
return ERR_MEMORY;
}
#ifdef CMD_MPI
printf("Parallelising jobs with MPI: %d tasks, %d workers.\n",
tasknum, tasknum - 1);
}
MPI_Bcast(&cstat.len, 1, MPI_INT, 0, MPI_COMM_WORLD);
MPI_Bcast(&cstat.num, 1, MPI_INT, 0, MPI_COMM_WORLD);
MPI_Barrier(MPI_COMM_WORLD);
#endif
#ifdef CMD_MPI
if (myrank == 0) { /* signal handling only by manager */
#endif
if (atexit(save_jobs)) {
MSG_ERR("unable to register exit functions.\n"
"Restart file may not be created.\n");
}
struct sigaction sa;
sa.sa_handler = &terminate;
sa.sa_flags = SA_RESTART;
sigfillset(&sa.sa_mask);
if (sigaction(SIGHUP, &sa, NULL) == -1 ||
sigaction(SIGINT, &sa, NULL) == -1 ||
sigaction(SIGTERM, &sa, NULL) == -1 ||
sigaction(SIGQUIT, &sa, NULL) == -1) {
MSG_ERR("unable to catch signals.\n"
"Restart file may not be created.\n");
}
#ifdef CMD_MPI
/* jobs distributed by manager */
int *nsent;
nsent = calloc(tasknum - 1, sizeof(int));
if (!nsent) {
MSG_ERR("failed to allocate memory for the manager.\n");
MPI_Abort(MPI_COMM_WORLD, ERR_MEMORY);
return ERR_MEMORY;
}
mpi_manager(tasknum - 1, nsent);
free(nsent);
}
else { /* workers */
cmd = calloc(cstat.len, sizeof(char));
if (!cmd) {
MSG_ERR("failed to allocate memory for the workers.\n");
MPI_Abort(MPI_COMM_WORLD, ERR_MEMORY);
return ERR_MEMORY;
}
mpi_worker(cmd, &ci);
free(cmd);
}
MPI_Finalize();
#else
#ifdef CMD_OMP
printf("Parallelising jobs with OpenMP: %d threads.\n",
omp_get_max_threads());
int id = 0;
char line[CMD_BUF];
#pragma omp parallel for private(ci,line,cmd) firstprivate(id) schedule(dynamic)
for (int i = 0; i < ncmd; i++) {
cmd = cstat.cmd + i * cstat.len;
printf("-> Allocating command to thread %d (job index: %d):\n %s\n",
omp_get_thread_num(), id, cmd);
cstat.status[i] = JOB_START;
if ((ret = create_child(cmd, &ci))) {
MSG_ERR("failed to execute command `%s'.\n", cmd);
cstat.status[i] = JOB_FAIL;
}
else {
memset(line, 0, CMD_BUF);
while (fgets(line, CMD_BUF, ci.out) != NULL) {
MSG_CHILD_STDOUT(omp_get_thread_num(), id, line);
memset(line, 0, CMD_BUF);
}
while (fgets(line, CMD_BUF, ci.err) != NULL) {
MSG_CHILD_STDERR(omp_get_thread_num(), id, line);
memset(line, 0, CMD_BUF);
}
if ((ret = close_child(&ci))) {
MSG_ERR("unable to finish command `%s'.\n", cmd);
cstat.status[i] = JOB_FAIL;
}
else cstat.status[i] = JOB_DONE;
}
id++;
}
#endif
#endif
return 0;
}
/*
 * Handler that main() installs for SIGHUP, SIGINT, SIGTERM and SIGQUIT:
 * it writes the restart file through save_jobs() and then returns.
 *
 * NOTE(review): save_jobs() is presumably stdio-based and therefore not
 * async-signal-safe.  Because the handler returns (and SA_RESTART is set),
 * execution resumes after the signal unless save_jobs() itself exits —
 * confirm this is the intended behaviour.
 */
void terminate(int sig)
{
  (void) sig; /* every caught signal triggers the same action */
  save_jobs();
}