Skip to content

Commit

Permalink
clang format
Browse files Browse the repository at this point in the history
  • Loading branch information
dblitt committed Dec 2, 2024
1 parent 2c017ea commit ccfd878
Showing 1 changed file with 29 additions and 21 deletions.
50 changes: 29 additions & 21 deletions batch_job/src/batch_queue_flux.c
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,8 @@ static batch_queue_id_t batch_queue_flux_submit(struct batch_queue *q, struct ba
return -1;
}
char buffer[BUFSIZ];
while (fread(buffer, sizeof(char), sizeof(buffer) / sizeof(char), archive_remove_pipe) > 0) {}
while (fread(buffer, sizeof(char), sizeof(buffer) / sizeof(char), archive_remove_pipe) > 0) {
}
pclose(archive_remove_pipe);

// Only enable the stage-in option if we have files in the archive
Expand All @@ -92,13 +93,14 @@ static batch_queue_id_t batch_queue_flux_submit(struct batch_queue *q, struct ba
if (!archive_create_pipe) {
return -1;
}
while (fread(buffer, sizeof(char), sizeof(buffer) / sizeof(char), archive_create_pipe) > 0) {}
while (fread(buffer, sizeof(char), sizeof(buffer) / sizeof(char), archive_create_pipe) > 0) {
}
int archive_create_status = WEXITSTATUS(pclose(archive_create_pipe));
if (archive_create_status != EXIT_SUCCESS) {
debug(D_BATCH, "flux failed to create archive with file %s", bf->outer_name);
return -1;
}

free(dirc);
free(basec);
free(command);
Expand All @@ -119,13 +121,13 @@ static batch_queue_id_t batch_queue_flux_submit(struct batch_queue *q, struct ba
jx_export(bt->envlist);
}

char *submit_command = string_format("flux submit %s --flags=waitable --nodes=1 --cores=%"PRId64" --gpus-per-node=%"PRId64" sh -c 'cd $FLUX_JOB_TMPDIR && %s' | flux job id --to=dec", flux_stage_in ? "-o stage-in" : "", cores, gpus, bt->command);
char *submit_command = string_format("flux submit %s --flags=waitable --nodes=1 --cores=%" PRId64 " --gpus-per-node=%" PRId64 " sh -c 'cd $FLUX_JOB_TMPDIR && %s' | flux job id --to=dec", flux_stage_in ? "-o stage-in" : "", cores, gpus, bt->command);
FILE *submit_pipe = popen(submit_command, "r");
free(submit_command);

uint64_t flux_job_id;
while (fgets(buffer, sizeof(buffer), submit_pipe)) {
if (sscanf(buffer, "%"PRIu64, &flux_job_id) == 1) {
if (sscanf(buffer, "%" PRIu64, &flux_job_id) == 1) {
batch_queue_id_t job_id = count++;
struct batch_job_info *info = calloc(1, sizeof(*info));
info->submitted = time(0);
Expand All @@ -138,20 +140,21 @@ static batch_queue_id_t batch_queue_flux_submit(struct batch_queue *q, struct ba

pclose(submit_pipe);

debug(D_BATCH, "created job_id %"PRId64" with flux_job_id %"PRIu64, job_id, flux_job_id);
debug(D_BATCH, "created job_id %" PRId64 " with flux_job_id %" PRIu64, job_id, flux_job_id);
return job_id;
}
}

return -1;
}

static void fill_batch_job_info(struct batch_job_info *info_out, uint64_t flux_job_id) {
static void fill_batch_job_info(struct batch_job_info *info_out, uint64_t flux_job_id)
{
if (!info_out) {
return;
}

char *command = string_format("flux jobs --json %"PRIu64" 2> /dev/null", flux_job_id);
char *command = string_format("flux jobs --json %" PRIu64 " 2> /dev/null", flux_job_id);
FILE *pipe = popen(command, "r");
free(command);
command = NULL;
Expand Down Expand Up @@ -194,7 +197,7 @@ static batch_queue_id_t batch_queue_flux_wait_jobid(struct batch_queue *q, struc

char *wait_command;
if (wait_flux_job_id != 0) {
wait_command = string_format("timeout %ds flux job wait %"PRIu64" 2>&1", timeout, wait_flux_job_id);
wait_command = string_format("timeout %ds flux job wait %" PRIu64 " 2>&1", timeout, wait_flux_job_id);
} else {
wait_command = string_format("timeout %ds flux job wait 2>&1", timeout);
}
Expand All @@ -206,7 +209,8 @@ static batch_queue_id_t batch_queue_flux_wait_jobid(struct batch_queue *q, struc
}

char wait_output[BUFSIZ];
while (fread(wait_output, sizeof(char), sizeof(wait_output) / sizeof(char), wait_pipe) > 0) {}
while (fread(wait_output, sizeof(char), sizeof(wait_output) / sizeof(char), wait_pipe) > 0) {
}
string_chomp(wait_output);
int wait_status = WEXITSTATUS(pclose(wait_pipe));

Expand All @@ -229,7 +233,7 @@ static batch_queue_id_t batch_queue_flux_wait_jobid(struct batch_queue *q, struc
char convert_output[BUFSIZ];
uint64_t flux_job_id;
while (fgets(convert_output, sizeof(convert_output), convert_pipe)) {
if (sscanf(convert_output, "%"PRIu64, &flux_job_id) == 1) {
if (sscanf(convert_output, "%" PRIu64, &flux_job_id) == 1) {
struct flux_job_info *job_info = itable_lookup(flux_job_info_table, flux_job_id);
if (job_info) {
pclose(convert_pipe);
Expand All @@ -243,7 +247,8 @@ static batch_queue_id_t batch_queue_flux_wait_jobid(struct batch_queue *q, struc
}
}

static batch_queue_id_t batch_queue_flux_wait(struct batch_queue *q, struct batch_job_info *info_out, time_t stoptime) {
static batch_queue_id_t batch_queue_flux_wait(struct batch_queue *q, struct batch_job_info *info_out, time_t stoptime)
{
return batch_queue_flux_wait_jobid(q, info_out, stoptime, 0);
}

Expand All @@ -254,15 +259,16 @@ static int batch_queue_flux_remove(struct batch_queue *q, batch_queue_id_t jobid
return 0;
}

char *kill_command = string_format("flux job kill %"PRIu64" 2>&1", info->flux_job_id);
char *kill_command = string_format("flux job kill %" PRIu64 " 2>&1", info->flux_job_id);
FILE *kill_pipe = popen(kill_command, "r");
free(kill_command);
if (!kill_pipe) {
return 0;
}

char buffer[BUFSIZ];
while (fread(buffer, sizeof(char), sizeof(buffer) / sizeof(char), kill_pipe) > 0) {}
while (fread(buffer, sizeof(char), sizeof(buffer) / sizeof(char), kill_pipe) > 0) {
}

int kill_status = WEXITSTATUS(pclose(kill_pipe));
if (kill_status == EXIT_SUCCESS) {
Expand All @@ -275,14 +281,15 @@ static int batch_queue_flux_remove(struct batch_queue *q, batch_queue_id_t jobid
}

// Wait timed out, so kill it for real
kill_command = string_format("flux job kill -s SIGKILL %"PRIu64" 2>&1", info->flux_job_id);
kill_command = string_format("flux job kill -s SIGKILL %" PRIu64 " 2>&1", info->flux_job_id);
kill_pipe = popen(kill_command, "r");
free(kill_command);
if (!kill_pipe) {
return 0;
}

while (fread(buffer, sizeof(char), sizeof(buffer) / sizeof(char), kill_pipe) > 0) {}
while (fread(buffer, sizeof(char), sizeof(buffer) / sizeof(char), kill_pipe) > 0) {
}
pclose(kill_pipe);

// Wait on job, then return
Expand All @@ -307,11 +314,11 @@ static int batch_queue_flux_create(struct batch_queue *q)
}

char buffer[BUFSIZ];
while (fread(buffer, sizeof(char), sizeof(buffer) / sizeof(char), uptime_pipe) > 0) {}
while (fread(buffer, sizeof(char), sizeof(buffer) / sizeof(char), uptime_pipe) > 0) {
}

int uptime_status = WEXITSTATUS(pclose(uptime_pipe));
if (uptime_status != EXIT_SUCCESS)
{
if (uptime_status != EXIT_SUCCESS) {
debug(D_BATCH, "batch_queue_flux_create failed: not connected to flux environment");
return -1;
}
Expand All @@ -322,12 +329,13 @@ static int batch_queue_flux_create(struct batch_queue *q)
return 0;
}

static int batch_queue_flux_free (struct batch_queue *Q)
static int batch_queue_flux_free(struct batch_queue *Q)
{
if (flux_job_info_table) {
struct flux_job_info *info;
uint64_t flux_job_id;
ITABLE_ITERATE(flux_job_info_table, flux_job_id, info) {
ITABLE_ITERATE(flux_job_info_table, flux_job_id, info)
{
delete_flux_job_info(info);
}
itable_delete(flux_job_info_table);
Expand Down

0 comments on commit ccfd878

Please sign in to comment.