diff --git a/app/tests/port_tests.c b/app/tests/port_tests.c index 33cd305e4..aa855af6f 100644 --- a/app/tests/port_tests.c +++ b/app/tests/port_tests.c @@ -291,6 +291,248 @@ static int ping_pong_thread(void *arg) return __LINE__; } +static const char *kStatusPortNames[] = { + "status0", + "status1", +}; +static const char *kRacePortName = "racer_port"; + +static const port_packet_t kRepeat = + {{'R', 'E', 'P', 'E', 'A', 'T', 0, 0}}; +static const port_packet_t kQuit = + {{'Q', 'U', 'I', 'T', 0, 0, 0, 0}}; + +event_t race_evt; + +static int race_thread(void *arg) +{ + port_t r_port; + int tid = (int)arg; + + printf("thread %d: connecting to control port\n", tid); + status_t st = port_open("race_ctl", NULL, &r_port); + if (st < 0) { + printf("thread %d: could not open control port, status = %d\n", tid, st); + return __LINE__; + } + + printf("thread %d: creating status port\n", tid); + port_t w_port; + st = port_create(kStatusPortNames[tid], PORT_MODE_UNICAST, &w_port); + if (st < 0) { + printf("thread %d: could not create status port, status = %d\n", tid, st); + port_close(r_port); + return __LINE__; + } + + // Loop is meant to coordinate a port_create() race. + // The event triggers the race. + // The thread sleeps briefly then cleans up + // The thread reports its claim to its status port. + // It then waits for a repeat or quit message. + int ret = -1; + while (ret < 0) { + LTRACEF_LEVEL(1, "thread %d: waiting at the starting line\n", tid); + if (event_wait_timeout(&race_evt, INFINITE_TIME) != NO_ERROR) { + ret = __LINE__; + break; + } + + port_t race_port; + while(true) { + st = port_create(kRacePortName, PORT_MODE_UNICAST, &race_port); + if (st != ERR_BUSY) + break; + thread_sleep(25); + } // EINTR all over again . . . + LTRACEF_LEVEL(1, "thread %d: sampling chronochip (%x)\n", tid, race_port); + if (st == ERR_ALREADY_EXISTS) { + // lost the race to create the port. + } else if (st < 0) { + LTRACEF_LEVEL(1, "thread %d: could not open port, status = %d\n", tid, st); + ret = __LINE__; + break; + } else { // Dispose of it now. + thread_sleep(25); + port_close(race_port); + port_destroy(race_port); + } + + // Now send the stale pointer address as a status. + port_packet_t claimed_port = {{0}}; + int len = sizeof(claimed_port.value); + if (sizeof(race_port) < (size_t)len) + len = sizeof(race_port); + for (int i = 0; i < len; ++i) { + claimed_port.value[i] = 0xff & ((int)race_port) >> (i * 8); + } + LTRACEF_LEVEL(1, "thread %d: reporting status\n", tid); + st = port_write(w_port, &claimed_port, 1); + if (st < 0) { + printf("thread %d: could not write port, status = %d\n", tid, st); + ret = __LINE__; + break; + } + + LTRACEF_LEVEL(1, "thread %d: awaiting instructions\n", tid); + port_result_t pr; + st = port_read(r_port, INFINITE_TIME, &pr); + if (st == ERR_CANCELLED) { + printf("thread %d: could not read port, status = %d (CANCELLED)\n", tid, st); + ret = __LINE__; + break; + } else if (st < 0) { + printf("thread %d: could not read port, status = %d\n", tid, st); + ret = __LINE__; + break; + } + if (memcmp(pr.packet.value, kQuit.value, sizeof(pr.packet.value)) == 0) { + ret = 0; + break; + } + if (memcmp(pr.packet.value, kRepeat.value, sizeof(pr.packet.value)) == 0) { + continue; + } + printf("thread %d: got a weird message from the control port\n", tid); + ret = __LINE__; + } + thread_sleep((1+tid) * 5); // Make console output orderly. + printf("thread %d: shutting down (ret=%d)\n", tid, ret); + + port_close(r_port); + port_close(w_port); + port_destroy(w_port); + return ret; +} + + +int two_threads_race(void) +{ + printf("two_threads_race test . . .\n"); + // Used to tell the threads what to do. + port_t w_port; + status_t st = port_create("race_ctl", PORT_MODE_BROADCAST, &w_port); + if (st < 0) { + printf("could not create port, status = %d\n", st); + return __LINE__; + } + + event_init(&race_evt, false, 0); + + thread_t *t1 = thread_create( + "rt0", &race_thread, (void *)0, DEFAULT_PRIORITY, DEFAULT_STACK_SIZE); + thread_t *t2 = thread_create( + "rt1", &race_thread, (void *)1, DEFAULT_PRIORITY, DEFAULT_STACK_SIZE); + thread_set_real_time(t1); + thread_set_real_time(t2); + thread_resume(t1); + thread_resume(t2); + + // wait for each status port to be created so we can + // track behavior. + port_t r_port0, r_port1; + printf("control: connecting to thread 0 . . .\n"); + while (true) { + status_t st = port_open(kStatusPortNames[0], NULL, &r_port0); + if (st == NO_ERROR) { + break; + } else if (st == ERR_NOT_FOUND) { + thread_sleep(100); + } else { + printf("could not open port, status = %d\n", st); + // XXX: clean up... + break; + } + } + printf("control: connecting to thread 1 . . .\n"); + while (true) { + status_t st = port_open(kStatusPortNames[1], NULL, &r_port1); + if (st == NO_ERROR) { + break; + } else if (st == ERR_NOT_FOUND) { + thread_sleep(100); + } else { + printf("could not open port, status = %d\n", st); + return __LINE__; + } + } + + // control port says: 0 "REPEAT, or 1 "QUIT" + int ret = 0; + int count = 0; + while (ret == 0) { + LTRACEF_LEVEL(1, "Go!\n"); + printf("."); + event_signal(&race_evt, false); + port_result_t pr0, pr1; + LTRACEF_LEVEL(1, "Collecting status from thread 0 . . .\n"); + st = port_read(r_port0, INFINITE_TIME, &pr0); + if (st < 0) { + printf("could not read port, status = %d\n", st); + ret = __LINE__; + } + LTRACEF_LEVEL(1, "Collecting status from thread 1 . . .\n"); + st = port_read(r_port1, INFINITE_TIME, &pr1); + if (st < 0) { + printf("could not read port, status = %d\n", st); + ret = __LINE__; + } + LTRACEF_LEVEL(1, "Checking responses . . .\n"); + if (memcmp(pr0.packet.value, pr1.packet.value, sizeof(pr0.packet.value)) != 0) { + printf("Race detected on iteration %d!\n", count); + ret = __LINE__; + } + event_unsignal(&race_evt); + int repeat = (ret == 0 && count++ < 99 ? 1 : 0); + LTRACEF_LEVEL(1, "Telling threads to %s\n", (repeat ? "repeat" : "quit")); + st = port_write(w_port, (repeat ? &kRepeat : &kQuit), 1); + if (st < 0) { + printf("could not write port, status = %d\n", st); + ret = __LINE__; + } + if (!repeat) { + break; + } + } + printf("\n%d passes completed with result %d\n", count, ret); + + st = port_close(r_port0); + if (st < 0) { + printf("could not close port, status = %d\n", st); + ret = __LINE__; + } + + st = port_close(r_port1); + if (st < 0) { + printf("could not close port, status = %d\n", st); + ret = __LINE__; + } + + st = port_close(w_port); + if (st < 0) { + printf("could not close port, status = %d\n", st); + ret = __LINE__; + } + + int retcode = -1; + thread_join(t1, &retcode, INFINITE_TIME); + if (retcode) + ret = retcode; + + thread_join(t2, &retcode, INFINITE_TIME); + if (retcode) + ret = retcode; + + st = port_destroy(w_port); + if (st < 0) { + printf("could not destroy port, status = %d\n", st); + ret = __LINE__; + } + + printf("two_thread_race: %d\n", ret); + return ret; +} + int two_threads_basic(void) { @@ -738,6 +980,7 @@ int port_tests(void) while (count--) { RUN_TEST(single_thread_basic); RUN_TEST(two_threads_basic); + RUN_TEST(two_threads_race); RUN_TEST(group_basic); RUN_TEST(group_dynamic); } diff --git a/kernel/port.c b/kernel/port.c index be124ecc8..c2e9cbb54 100644 --- a/kernel/port.c +++ b/kernel/port.c @@ -45,6 +45,7 @@ #define READPORT_MAGIC (0x70727472) // 'prtr' #define PORTGROUP_MAGIC (0x70727467) // 'prtg' +#define PORTHOLD_MAGIC (0x70727467) // 'prth' #define PORT_BUFF_SIZE 8 #define PORT_BUFF_SIZE_BIG 64 @@ -148,16 +149,24 @@ status_t port_create(const char *name, port_mode_t mode, port_t *port) return ERR_INVALID_ARGS; } - if (strlen(name) >= PORT_NAME_LEN) + if (strnlen(name, PORT_NAME_LEN) >= PORT_NAME_LEN) return ERR_INVALID_ARGS; + + // Add a stack-allocated port to the list until we can + // replace it with a heap-allocated port. + write_port_t stack_wp = { .magic = PORTHOLD_MAGIC }; + // We waste a few cycles here with a throwaway copy. + strlcpy(stack_wp.name, name, sizeof(stack_wp.name)); + // lookup for existing port, return that if found. write_port_t *wp = NULL; THREAD_LOCK(state1); list_for_every_entry(&write_port_list, wp, write_port_t, node) { if (strcmp(wp->name, name) == 0) { - // can't return closed ports. - if (wp->magic == WRITEPORT_MAGIC_X) + // can't return closed or partial ports. + if (wp->magic == WRITEPORT_MAGIC_X || + wp->magic == PORTHOLD_MAGIC) wp = NULL; THREAD_UNLOCK(state1); if (wp) { @@ -168,12 +177,17 @@ status_t port_create(const char *name, port_mode_t mode, port_t *port) } } } + list_add_tail(&write_port_list, &stack_wp.node); THREAD_UNLOCK(state1); // not found, create the write port and the circular buffer. wp = calloc(1, sizeof(write_port_t)); - if (!wp) + if (!wp) { + THREAD_LOCK(state2); + list_delete(&stack_wp.node); + THREAD_UNLOCK(state2); return ERR_NO_MEMORY; + } wp->magic = WRITEPORT_MAGIC_W; wp->mode = mode; @@ -184,13 +198,18 @@ status_t port_create(const char *name, port_mode_t mode, port_t *port) wp->buf = make_buf(size); if (!wp->buf) { free(wp); + THREAD_LOCK(state2); + list_delete(&stack_wp.node); + THREAD_UNLOCK(state2); return ERR_NO_MEMORY; } - // todo: race condtion! a port with the same name could have been created - // by another thread at is point. + // Avoid a name collision by swapping the temporary placeholder out of the + // list for the actual port. THREAD_LOCK(state2); + // Let's reserve a stack allocated entry then swap it for the allocated one. list_add_tail(&write_port_list, &wp->node); + list_delete(&stack_wp.node); THREAD_UNLOCK(state2); *port = (void *)wp; @@ -226,7 +245,8 @@ status_t port_open(const char *name, void *ctx, port_t *port) THREAD_LOCK(state); write_port_t *wp = NULL; list_for_every_entry(&write_port_list, wp, write_port_t, node) { - if (strcmp(wp->name, name) == 0) { + if (strcmp(wp->name, name) == 0 && + wp->magic != PORTHOLD_MAGIC) { // found; add read port to write port list. rp->wport = wp; if (wp->buf) {