Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make wrappers work with sporadic apps #5410

Merged
merged 5 commits into from
Oct 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
104 changes: 98 additions & 6 deletions api/boinc_api.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,8 @@
#include <cstdio>
#include <cstdarg>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <errno.h>
#include <unistd.h>
#include <sys/time.h>
Expand Down Expand Up @@ -144,6 +146,10 @@
// CPPFLAGS=-DGETRUSAGE_IN_TIMER_THREAD
#endif

// Anything shared between the worker and timer thread
// must be declared volatile to ensure that writes in one thread
// are seen immediately by the other.

const char* api_version = "API_VERSION_" PACKAGE_VERSION;
static APP_INIT_DATA aid;
static FILE_LOCK file_lock;
Expand All @@ -155,7 +161,7 @@
static volatile bool ready_to_checkpoint = false;
static volatile int in_critical_section = 0;
static volatile double last_wu_cpu_time;
static volatile bool standalone = false;
static volatile bool standalone = true;
static volatile double initial_wu_cpu_time;
static volatile bool have_new_trickle_up = false;
static volatile bool have_trickle_down = true;
Expand Down Expand Up @@ -194,7 +200,9 @@
bool send_remote_desktop_addr = false;
int app_min_checkpoint_period = 0;
// min checkpoint period requested by app
SPORADIC_AC_STATE ac_state;
static volatile SPORADIC_AC_STATE ac_state;
static volatile int ac_fd, ca_fd;
static volatile bool do_sporadic_files;

#define TIMER_PERIOD 0.1
// Sleep interval for timer thread;
Expand Down Expand Up @@ -518,6 +526,57 @@
return false;
}

// called once/sec in timer thread.
// Copy sporadic app messages to/from files (for wrappers)
//
static void sporadic_files() {
static time_t last_ac_mod_time = 0;
static SPORADIC_CA_STATE last_ca_state = CA_NONE;
char buf[256];

// if C->A state has changed, write to file
//
if (last_ca_state != boinc_status.ca_state) {
sprintf(buf, "%d\n", boinc_status.ca_state);
lseek(ca_fd, 0, SEEK_SET);

Check warning on line 541 in api/boinc_api.cpp

View check run for this annotation

Codecov / codecov/patch

api/boinc_api.cpp#L539-L541

Added lines #L539 - L541 were not covered by tests
if (write(ca_fd, buf, sizeof(buf))) {}
// one way to avoid warnings
last_ca_state = boinc_status.ca_state;

Check warning on line 544 in api/boinc_api.cpp

View check run for this annotation

Codecov / codecov/patch

api/boinc_api.cpp#L544

Added line #L544 was not covered by tests
}

// check if app has updated file with A->C state
//
struct stat sbuf;
int ret = fstat(ac_fd, &sbuf);

Check warning on line 550 in api/boinc_api.cpp

View check run for this annotation

Codecov / codecov/patch

api/boinc_api.cpp#L550

Added line #L550 was not covered by tests
if (!ret) {
#ifdef _WIN32
time_t t = sbuf.st_mtime;
#elif defined(__APPLE__)
time_t t = sbuf.st_mtimespec.tv_sec;
#else
time_t t = sbuf.st_mtim.tv_sec;

Check warning on line 557 in api/boinc_api.cpp

View check run for this annotation

Codecov / codecov/patch

api/boinc_api.cpp#L557

Added line #L557 was not covered by tests
#endif
if (t != last_ac_mod_time) {
lseek(ac_fd, 0, SEEK_SET);
int nc = read(ac_fd, buf, sizeof(buf));

Check warning on line 561 in api/boinc_api.cpp

View check run for this annotation

Codecov / codecov/patch

api/boinc_api.cpp#L560-L561

Added lines #L560 - L561 were not covered by tests
if (nc>0) {
int val;
buf[nc] = 0;
int n = sscanf(buf, "%d", &val);

Check warning on line 565 in api/boinc_api.cpp

View check run for this annotation

Codecov / codecov/patch

api/boinc_api.cpp#L564-L565

Added lines #L564 - L565 were not covered by tests
if (n == 1) {
ac_state = (SPORADIC_AC_STATE)val;

Check warning on line 567 in api/boinc_api.cpp

View check run for this annotation

Codecov / codecov/patch

api/boinc_api.cpp#L567

Added line #L567 was not covered by tests
} else {
ac_state = AC_NONE;
fprintf(stderr, "API: error parsing AC state: %s\n", buf);

Check warning on line 570 in api/boinc_api.cpp

View check run for this annotation

Codecov / codecov/patch

api/boinc_api.cpp#L569-L570

Added lines #L569 - L570 were not covered by tests
}
last_ac_mod_time = t;

Check warning on line 572 in api/boinc_api.cpp

View check run for this annotation

Codecov / codecov/patch

api/boinc_api.cpp#L572

Added line #L572 was not covered by tests
} else {
fprintf(stderr, "API: error reading AC state: %d\n", nc);

Check warning on line 574 in api/boinc_api.cpp

View check run for this annotation

Codecov / codecov/patch

api/boinc_api.cpp#L574

Added line #L574 was not covered by tests
}
}
}
}

#ifndef _WIN32
// For multithread apps on Unix, the main process executes the following.
//
Expand Down Expand Up @@ -687,6 +746,7 @@
}
}

standalone = false;

Check warning on line 749 in api/boinc_api.cpp

View check run for this annotation

Codecov / codecov/patch

api/boinc_api.cpp#L749

Added line #L749 was not covered by tests
retval = boinc_parse_init_data_file();
if (retval) {
standalone = true;
Expand Down Expand Up @@ -770,8 +830,10 @@
boinc_msg_prefix(buf, sizeof(buf)), status
);
finishing = true;
boinc_sleep(2.0); // let the timer thread send final messages
boinc_disable_timer_thread = true; // then disable it
if (!standalone) {
boinc_sleep(2.0); // let the timer thread send final messages
boinc_disable_timer_thread = true; // then disable it

Check warning on line 835 in api/boinc_api.cpp

View check run for this annotation

Codecov / codecov/patch

api/boinc_api.cpp#L834-L835

Added lines #L834 - L835 were not covered by tests
}

if (options.main_program) {
FILE* f = fopen(BOINC_FINISH_CALLED_FILE, "w");
Expand Down Expand Up @@ -882,6 +944,28 @@
return 0;
}

int boinc_sporadic_dir(const char* dir) {

Check warning on line 947 in api/boinc_api.cpp

View check run for this annotation

Codecov / codecov/patch

api/boinc_api.cpp#L947

Added line #L947 was not covered by tests
char buf[MAXPATHLEN];

do_sporadic_files = true;
sprintf(buf, "%s/ac", dir);
ac_fd = open(buf, O_CREAT|O_RDONLY, 0666);

Check warning on line 952 in api/boinc_api.cpp

View check run for this annotation

Codecov / codecov/patch

api/boinc_api.cpp#L950-L952

Added lines #L950 - L952 were not covered by tests
if (ac_fd < 0) {
fprintf(stderr, "can't open sporadic file %s\n", buf);
do_sporadic_files = false;

Check warning on line 955 in api/boinc_api.cpp

View check run for this annotation

Codecov / codecov/patch

api/boinc_api.cpp#L954-L955

Added lines #L954 - L955 were not covered by tests
}
sprintf(buf, "%s/ca", dir);
ca_fd = open(buf, O_CREAT|O_WRONLY, 0666);

Check warning on line 958 in api/boinc_api.cpp

View check run for this annotation

Codecov / codecov/patch

api/boinc_api.cpp#L957-L958

Added lines #L957 - L958 were not covered by tests
if (ca_fd < 0) {
fprintf(stderr, "can't open sporadic file %s\n", buf);
do_sporadic_files = false;

Check warning on line 961 in api/boinc_api.cpp

View check run for this annotation

Codecov / codecov/patch

api/boinc_api.cpp#L960-L961

Added lines #L960 - L961 were not covered by tests
}
if (!do_sporadic_files) return ERR_FOPEN;
boinc_status.ca_state = CA_DONT_COMPUTE;
ac_state = AC_NONE;
return 0;

Check warning on line 966 in api/boinc_api.cpp

View check run for this annotation

Codecov / codecov/patch

api/boinc_api.cpp#L964-L966

Added lines #L964 - L966 were not covered by tests
}

// called from the timer thread if we need to exit,
// e.g. quit message from client, or client has gone away
//
Expand Down Expand Up @@ -993,6 +1077,10 @@
snprintf(buf, sizeof(buf), "<bytes_received>%f</bytes_received>\n", _bytes_received);
safe_strcat(msg_buf, buf);
}
if (ac_state) {
sprintf(buf, "<sporadic_ac>%d</sporadic_ac>\n", ac_state);
strlcat(msg_buf, buf, sizeof(msg_buf));

Check warning on line 1082 in api/boinc_api.cpp

View check run for this annotation

Codecov / codecov/patch

api/boinc_api.cpp#L1081-L1082

Added lines #L1081 - L1082 were not covered by tests
}
#ifdef MSGS_FROM_FILE
if (fout) {
fputs(msg_buf, fout);
Expand Down Expand Up @@ -1058,7 +1146,7 @@
suspend_or_resume_descendants(false);
}
// if called from worker thread, sleep until suspension is over
// if called from time thread, don't need to do anything;
// if called from timer thread, don't need to do anything;
// suspension is done by signal handler in worker thread
//
if (called_from_worker) {
Expand Down Expand Up @@ -1365,6 +1453,10 @@
app_client_shm->shm->graphics_reply.send_msg(buf);
send_remote_desktop_addr = false;
}

if (do_sporadic_files) {
sporadic_files();

Check warning on line 1458 in api/boinc_api.cpp

View check run for this annotation

Codecov / codecov/patch

api/boinc_api.cpp#L1458

Added line #L1458 was not covered by tests
}
}

#ifdef _WIN32
Expand Down Expand Up @@ -1489,7 +1581,7 @@

// called in the worker thread.
// set up a handler for SIGALRM.
// If Android, we'll get signals from the time thread.
// If Android, we'll get signals from the timer thread.
// otherwise, set an interval timer to deliver signals
//
static int start_worker_signals() {
Expand Down
1 change: 1 addition & 0 deletions api/boinc_api.h
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ extern int boinc_finish_message(
);
extern void boinc_sporadic_set_ac_state(SPORADIC_AC_STATE);
extern SPORADIC_CA_STATE boinc_sporadic_get_ca_state();
extern int boinc_sporadic_dir(const char*);

/////////// API ENDS HERE

Expand Down
4 changes: 2 additions & 2 deletions client/app_control.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -738,8 +738,8 @@ void ACTIVE_TASK_SET::send_heartbeats() {
if (log_flags.heartbeat_debug) {
if (sent) {
msg_printf(atp->result->project, MSG_INFO,
"[heartbeat] Heartbeat sent to task %s",
atp->result->name
"[heartbeat] Heartbeat sent to task %s: %s",
atp->result->name, buf
);
} else {
msg_printf(atp->result->project, MSG_INFO,
Expand Down
3 changes: 3 additions & 0 deletions client/app_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,11 @@ void CLIENT_STATE::app_test_init() {
// can put other stuff here like
av->avg_ncpus = 1;
av->flops = 1e9;
#if 0
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this changed by intention?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes

av->gpu_ram = 1e7;
av->gpu_usage.rsc_type = PROC_TYPE_NVIDIA_GPU;
av->gpu_usage.usage = 1;
#endif
app_versions.push_back(av);

WORKUNIT *wu = new WORKUNIT;
Expand All @@ -77,6 +79,7 @@ void CLIENT_STATE::app_test_init() {
wu->rsc_fpops_bound = 1e12;
wu->rsc_memory_bound = 1e9;
wu->rsc_disk_bound = 1e9;
wu->command_line = "--sporadic";
workunits.push_back(wu);

RESULT *res = new RESULT;
Expand Down
10 changes: 6 additions & 4 deletions client/cpu_sched.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1626,10 +1626,12 @@ ACTIVE_TASK* CLIENT_STATE::get_task(RESULT* rp) {
ACTIVE_TASK *atp = lookup_active_task_by_result(rp);
if (!atp) {
atp = new ACTIVE_TASK;
int retval = atp->get_free_slot(rp);
if (retval) {
delete atp;
return NULL;
if (!rp->project->app_test) {
int retval = atp->get_free_slot(rp);
if (retval) {
delete atp;
return NULL;
}
}
atp->init(rp);
active_tasks.active_tasks.push_back(atp);
Expand Down
1 change: 1 addition & 0 deletions client/cs_cmdline.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ static void print_options(char* prog) {
" --abort_jobs_on_exit when client exits, abort and report jobs\n"
" --allow_remote_gui_rpc allow remote GUI RPC connections\n"
" --allow_multiple_clients allow >1 instances per host\n"
" --app_test F run a simulated job with the given app\n"
" --attach_project <URL> <key> attach to a project\n"
" --check_all_logins for idle detection, check remote logins too\n"
" --daemon run as daemon (Unix)\n"
Expand Down
1 change: 1 addition & 0 deletions client/cs_sporadic.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@
SPORADIC_RESOURCES sporadic_resources;

void SPORADIC_RESOURCES::print() {
if (!ncpus_used) return;
msg_printf(NULL, MSG_INFO, "Sporadic resources:");
msg_printf(NULL, MSG_INFO, " %f CPUs", ncpus_used);
msg_printf(NULL, MSG_INFO, " %f MB RAM", mem_used/MEGA);
Expand Down
77 changes: 68 additions & 9 deletions samples/sporadic/sporadic.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,29 @@
// when OK, compute for NCOMP secs
// suspend as needed
//
// computing is embedded in loop.
// computing is embedded in the loop.
// in a real app you'd want to use threads

// by default this uses the BOINC API for communicating sporadic state.
// --wrapped: use files instead (run under wrapper)

#define NWAIT 10
#define NCOMP 10

#include <sys/types.h>
#include <sys/stat.h>
#ifndef _WIN32
#include <unistd.h>
#endif
#include <fcntl.h>
#include <stdio.h>

#include "boinc_api.h"
#include "util.h"
#include "common_defs.h"

void boinc_sporadic_set_ac_state(SPORADIC_AC_STATE);
SPORADIC_CA_STATE boinc_sporadic_get_ca_state();
bool wrapped = false;
int ac_fd, ca_fd;

void compute_one_sec() {
double start = dtime();
Expand All @@ -31,22 +42,70 @@ void compute_one_sec() {
}
}

int main(int, char**) {
boinc_init();
void set_ac_state(SPORADIC_AC_STATE ac_state) {
static SPORADIC_AC_STATE last = AC_NONE;
if (wrapped) {
if (ac_state != last) {
char buf[256];
sprintf(buf, "%d\n", ac_state);
lseek(ac_fd, 0, SEEK_SET);
write(ac_fd, buf, strlen(buf));
}
last = ac_state;
} else {
boinc_sporadic_set_ac_state(ac_state);
}
}

SPORADIC_CA_STATE get_ca_state() {
if (wrapped) {
// could check mod time; don't bother
char buf[256];
lseek(ca_fd, 0, SEEK_SET);
read(ca_fd, buf, sizeof(buf));
int s;
int n = sscanf(buf, "%d", &s);
if (n==1) return (SPORADIC_CA_STATE)s;
fprintf(stderr, "can't read CA state\n");
exit(1);
} else {
return boinc_sporadic_get_ca_state();
}
}

int main(int argc, char** argv) {
SPORADIC_CA_STATE ca_state;
SPORADIC_AC_STATE ac_state;

for (int i=1; i<argc; i++) {
if (!strcmp(argv[i], "--wrapped")) {
wrapped = true;
}
}

if (wrapped) {
ca_fd = open("ca", O_RDONLY);
ac_fd = open("ac", O_WRONLY);
if (ca_fd<0 || ac_fd<0) {
fprintf(stderr, "can't open files\n");
exit(1);
}
} else {
boinc_init();
}

fprintf(stderr, "starting\n");
while (true) {
// wait for a bit
ac_state = AC_DONT_WANT_COMPUTE;
boinc_sporadic_set_ac_state(ac_state);
set_ac_state(ac_state);
for (int i=0; i<NWAIT; i++) {
fprintf(stderr, "sleep - don't want to compute\n");
boinc_sleep(1);
}
// wait until client says we can possibly compute
while (1) {
ca_state = boinc_sporadic_get_ca_state();
ca_state = get_ca_state();
if (ca_state != CA_COULD_COMPUTE) {
fprintf(stderr, "sleep - waiting for COULD_COMPUTE\n");
boinc_sleep(1);
Expand All @@ -56,11 +115,11 @@ int main(int, char**) {
}
// tell the client we want to compute
ac_state = AC_WANT_COMPUTE;
boinc_sporadic_set_ac_state(ac_state);
set_ac_state(ac_state);
int n = NCOMP;
while (true) {
// compute only if client says so
ca_state = boinc_sporadic_get_ca_state();
ca_state = get_ca_state();
fprintf(stderr, "CA state: %d\n", ca_state);
if (ca_state == CA_COMPUTING) {
fprintf(stderr, "computing 1 sec\n");
Expand Down
Loading
Loading