Skip to content

Commit

Permalink
prov/opx: Fix Coverity scan "waiting while holding lock" errors.
Browse files Browse the repository at this point in the history
Remove busy-waiting/sleeping while the SHM FIFO has not yet been created/initialized.
Cast the result of CPU_ISSET() to a uint64_t to fix a Coverity warning about
potentially left-shifting by more than 31 bits. Whitespace/formatting fixups.

Signed-off-by: Ben Lynam <[email protected]>
  • Loading branch information
belynam authored and j-xiong committed Apr 1, 2024
1 parent 4c20ae3 commit b60d462
Show file tree
Hide file tree
Showing 4 changed files with 172 additions and 182 deletions.
182 changes: 82 additions & 100 deletions prov/opx/include/opx_shm.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/*
* Copyright (c) 2016-2018 Intel Corporation. All rights reserved.
* Copyright (c) 2021-2023 Cornelis Networks.
* Copyright (c) 2021-2024 Cornelis Networks.
*
* This software is available to you under a choice of one of two
* licenses. You may choose to be licensed under the terms of the GNU
Expand Down Expand Up @@ -59,15 +59,17 @@
#include <limits.h>
#include <signal.h>

#ifdef OPX_DAOS_SUPPORT
#ifdef OPX_DAOS
#define OPX_SHM_MAX_CONN_NUM 0xffff
#else
/* FI_OPX_MAX_HFIS * 256 */
#define OPX_SHM_MAX_CONN_NUM 0x1000
#define OPX_SHM_MAX_CONN_NUM (0x1000)
#define OPX_SHM_MAX_CONN_MASK (OPX_SHM_MAX_CONN_NUM - 1)
#endif
static_assert((OPX_SHM_MAX_CONN_NUM & OPX_SHM_MAX_CONN_MASK) == 0,
"OPX_SHM_MAX_CONN_NUM must be a power of 2!");

#define OPX_SHM_SEGMENT_NAME_MAX_LENGTH (512)
#define OPX_SHM_TX_CONNECT_MAX_WAIT (5000) // 5 seconds
#define OPX_SHM_SEGMENT_NAME_PREFIX "/opx.shm."
#define OPX_SHM_FILE_NAME_PREFIX_FORMAT "%s-%02hhX.%d"

Expand All @@ -80,14 +82,14 @@ struct opx_shm_connection {
void *segment_ptr;
size_t segment_size;
bool inuse;
char segment_key[OPX_SHM_SEGMENT_NAME_MAX_LENGTH];
char segment_key[OPX_SHM_SEGMENT_NAME_MAX_LENGTH];
};

struct opx_shm_tx {
struct opx_shm_tx *next; // for signal handler
struct fi_provider *prov;
struct opx_shm_fifo_segment *fifo_segment[OPX_SHM_MAX_CONN_NUM];
struct opx_shm_connection connection[OPX_SHM_MAX_CONN_NUM];
struct fi_provider *prov;
struct opx_shm_tx *next; // for signal handler
uint32_t rank;
uint32_t rank_inst;
};
Expand All @@ -98,12 +100,12 @@ struct opx_shm_resynch {
};

struct opx_shm_rx {
struct opx_shm_rx *next; // for signal handler
struct fi_provider *prov;
struct opx_shm_fifo_segment *fifo_segment;
void *segment_ptr;
size_t segment_size;
char segment_key[OPX_SHM_SEGMENT_NAME_MAX_LENGTH];
struct fi_provider *prov;
struct opx_shm_rx *next; // for signal handler
struct opx_shm_resynch resynch_connection[OPX_SHM_MAX_CONN_NUM];
};

Expand All @@ -112,42 +114,42 @@ extern struct opx_shm_rx *shm_rx_head;

struct opx_shm_packet
{
ofi_atomic64_t sequence_;
uint32_t origin_rank;
uint32_t origin_rank_inst;
ofi_atomic64_t sequence_;
uint32_t origin_rank;
uint32_t origin_rank_inst;

// TODO: Figure out why using pad_next_cacheline causes a segfault due to alignment w/ movaps instruction
// but the other one below does not, even though in both cases the struct size is the
// same, and data starts at a 16-byte aligned offset into the struct.
// TODO: Figure out why using pad_next_cacheline causes a segfault due to alignment w/ movaps instruction
// but the other one below does not, even though in both cases the struct size is the
// same, and data starts at a 16-byte aligned offset into the struct.

// sizeof(opx_shm_packet) == 8320, data starts at offset 0x40 (64)
// uint8_t pad_next_cacheline[FI_OPX_CACHE_LINE_SIZE - sizeof(ofi_atomic64_t) - sizeof(uint32_t) - sizeof(uint32_t)];
// sizeof(opx_shm_packet) == 8320, data starts at offset 0x40 (64)
// uint8_t pad_next_cacheline[FI_OPX_CACHE_LINE_SIZE - sizeof(ofi_atomic64_t) - sizeof(uint32_t) - sizeof(uint32_t)];

// sizeof(opx_shm_packet) == 8320, data starts at offset 0x20 (32)
uint64_t pad;
// sizeof(opx_shm_packet) == 8320, data starts at offset 0x20 (32)
uint64_t pad;

uint8_t data[FI_OPX_SHM_PACKET_SIZE];
uint8_t data[FI_OPX_SHM_PACKET_SIZE];
}__attribute__((__aligned__(64)));

struct opx_shm_fifo {
ofi_atomic64_t enqueue_pos_;
uint8_t pad0_[FI_OPX_CACHE_LINE_SIZE - sizeof(ofi_atomic64_t)];
ofi_atomic64_t dequeue_pos_;
uint8_t pad1_[FI_OPX_CACHE_LINE_SIZE - sizeof(ofi_atomic64_t)];
struct opx_shm_packet buffer_[FI_OPX_SHM_FIFO_SIZE];
ofi_atomic64_t enqueue_pos_;
uint8_t pad0_[FI_OPX_CACHE_LINE_SIZE - sizeof(ofi_atomic64_t)];
ofi_atomic64_t dequeue_pos_;
uint8_t pad1_[FI_OPX_CACHE_LINE_SIZE - sizeof(ofi_atomic64_t)];
struct opx_shm_packet buffer_[FI_OPX_SHM_FIFO_SIZE];
} __attribute__((__aligned__(64)));

static_assert((offsetof(struct opx_shm_fifo, enqueue_pos_) & 0x3fUL) == 0,
"struct opx_shm_fifo->enqueue_pos_ needs to be 64-byte aligned!");
"struct opx_shm_fifo->enqueue_pos_ needs to be 64-byte aligned!");
static_assert((offsetof(struct opx_shm_fifo, dequeue_pos_) & 0x3fUL) == 0,
"struct opx_shm_fifo->dequeue_pos_ needs to be 64-byte aligned!");
"struct opx_shm_fifo->dequeue_pos_ needs to be 64-byte aligned!");
static_assert(offsetof(struct opx_shm_fifo, buffer_) == (FI_OPX_CACHE_LINE_SIZE * 2),
"struct opx_shm_fifo->buffer_ should be 2 cachelines into struct");
"struct opx_shm_fifo->buffer_ should be 2 cachelines into struct");

struct opx_shm_fifo_segment {
ofi_atomic64_t initialized_;
uint8_t pad1_[FI_OPX_CACHE_LINE_SIZE - sizeof(ofi_atomic64_t)];
struct opx_shm_fifo fifo;
ofi_atomic64_t initialized_;
uint8_t pad1_[FI_OPX_CACHE_LINE_SIZE - sizeof(ofi_atomic64_t)];
struct opx_shm_fifo fifo;
} __attribute__((__aligned__(64)));

static inline
Expand Down Expand Up @@ -232,11 +234,11 @@ ssize_t opx_shm_rx_init (struct opx_shm_rx *rx,
rx->segment_ptr = segment_ptr;
rx->segment_size = segment_size;

// TODO: MHEINZ we probably need a lock here.
rx->next = shm_rx_head; shm_rx_head = rx; // add to signal handler list.
rx->next = shm_rx_head;
shm_rx_head = rx; // add to signal handler list.

ofi_atomic_set64(&rx->fifo_segment->initialized_, 1);

close(segment_fd); /* safe to close now */

FI_LOG(prov, FI_LOG_INFO, FI_LOG_FABRIC,
Expand Down Expand Up @@ -282,8 +284,8 @@ ssize_t opx_shm_tx_init (struct opx_shm_tx *tx,
tx->rank = hfi_rank;
tx->rank_inst = hfi_rank_inst;

// TODO: MHEINZ we probably need a lock here.
tx->next = shm_tx_head; shm_tx_head = tx; // add to signal handler list.
tx->next = shm_tx_head;
shm_tx_head = tx; // add to signal handler list.

return FI_SUCCESS;
}
Expand All @@ -299,83 +301,57 @@ ssize_t opx_shm_tx_connect (struct opx_shm_tx *tx,
assert(segment_index < OPX_SHM_MAX_CONN_NUM);
int err = 0;

char segment_key[OPX_SHM_SEGMENT_NAME_MAX_LENGTH];
memset(segment_key, 0, OPX_SHM_SEGMENT_NAME_MAX_LENGTH);
void *segment_ptr = tx->connection[segment_index].segment_ptr;
if (segment_ptr == NULL) {
char segment_key[OPX_SHM_SEGMENT_NAME_MAX_LENGTH];
snprintf(segment_key, OPX_SHM_SEGMENT_NAME_MAX_LENGTH,
OPX_SHM_SEGMENT_NAME_PREFIX "%s.%d",
unique_job_key, rx_id);

snprintf(segment_key, OPX_SHM_SEGMENT_NAME_MAX_LENGTH,
OPX_SHM_SEGMENT_NAME_PREFIX "%s.%d",
unique_job_key, rx_id);
int segment_fd = shm_open(segment_key, O_RDWR, 0600);
if (segment_fd == -1) {
FI_DBG(tx->prov, FI_LOG_FABRIC,
"Unable to create shm object '%s'; errno = '%s'\n",
segment_key, strerror(errno));
return -FI_EAGAIN;
}

if (segment_index >= OPX_SHM_MAX_CONN_NUM) {
FI_LOG(tx->prov, FI_LOG_WARN, FI_LOG_FABRIC,
"Unable to create shm object '%s'; segment_index %u (rx %u) too large\n",
segment_key, segment_index, rx_id);
return -FI_E2BIG;
}
size_t segment_size = sizeof(struct opx_shm_fifo_segment) + 64;

int segment_fd;
unsigned loop = 0;
for (;;) {
segment_fd = shm_open(segment_key, O_RDWR, 0600);
if (segment_fd == -1) {
if (loop++ > OPX_SHM_TX_CONNECT_MAX_WAIT) {
FI_LOG(tx->prov, FI_LOG_WARN, FI_LOG_FABRIC,
"Unable to create shm object '%s'; errno = '%s'\n",
segment_key, strerror(errno));
return -FI_EAGAIN;
}
usleep(1000);
} else {
break;
segment_ptr = mmap(NULL, segment_size, PROT_READ | PROT_WRITE,
MAP_SHARED, segment_fd, 0);
if (segment_ptr == MAP_FAILED) {
FI_LOG(tx->prov, FI_LOG_WARN, FI_LOG_FABRIC,
"mmap failed: '%s'\n", strerror(errno));
err = errno;
goto error_return;
}
}

size_t segment_size = sizeof(struct opx_shm_fifo_segment) + 64;
close(segment_fd); /* safe to close now */

void *segment_ptr = mmap(NULL, segment_size, PROT_READ | PROT_WRITE,
MAP_SHARED, segment_fd, 0);
if (segment_ptr == MAP_FAILED) {
FI_LOG(tx->prov, FI_LOG_WARN, FI_LOG_FABRIC,
"mmap failed: '%s'\n", strerror(errno));
err = errno;
goto error_return;
tx->connection[segment_index].segment_ptr = segment_ptr;
tx->connection[segment_index].segment_size = segment_size;
tx->connection[segment_index].inuse = false;
strcpy(tx->connection[segment_index].segment_key, segment_key);
}

close(segment_fd); /* safe to close now */

/*
* Wait for completion of the initialization of the SHM segment before using
* it.
*/
loop = 0;
struct opx_shm_fifo_segment *fifo_segment =
(struct opx_shm_fifo_segment *)(((uintptr_t)segment_ptr + 64) & (~0x03Full));
for (;;) {
uint64_t init =
atomic_load_explicit(&fifo_segment->initialized_.val, memory_order_acquire);

if (init == 0) {
if (loop++ > OPX_SHM_TX_CONNECT_MAX_WAIT) {
FI_LOG(tx->prov, FI_LOG_WARN, FI_LOG_FABRIC,
"SHM object '%s' still initializing.\n",
segment_key);
return -FI_EAGAIN;
}
usleep(1000);
} else {
break;
}
uint64_t init = atomic_load_explicit(&fifo_segment->initialized_.val,
memory_order_acquire);
if (init == 0) {
FI_DBG(tx->prov, FI_LOG_FABRIC,
"SHM object '%s' still initializing.\n",
tx->connection[segment_index].segment_key);
return -FI_EAGAIN;
}

tx->connection[segment_index].segment_ptr = segment_ptr;
tx->connection[segment_index].segment_size = segment_size;
tx->connection[segment_index].inuse = false;
tx->fifo_segment[segment_index] = fifo_segment;
strcpy(tx->connection[segment_index].segment_key, segment_key);

FI_LOG(tx->prov, FI_LOG_INFO, FI_LOG_FABRIC,
"SHM connection to %u context passed. Segment (%s), %d, segment (%p) size %zu segment_index %u\n",
rx_id, segment_key, segment_fd, segment_ptr, segment_size, segment_index);
"SHM connection to %u context passed. Segment (%s), segment (%p) size %zu segment_index %u\n",
rx_id, tx->connection[segment_index].segment_key, segment_ptr,
tx->connection[segment_index].segment_size, segment_index);

return FI_SUCCESS;

Expand Down Expand Up @@ -436,19 +412,25 @@ static inline
void * opx_shm_tx_next (struct opx_shm_tx *tx, uint8_t peer_hfi_unit, uint8_t peer_rx_index,
uint64_t *pos, bool use_rank, unsigned rank, unsigned rank_inst, ssize_t *rc)
{
#ifdef OPX_DAOS
/* HFI Rank Support: Used HFI rank index instead of HFI index. */
unsigned segment_index = (!use_rank) ? OPX_SHM_SEGMENT_INDEX(peer_hfi_unit, peer_rx_index)
: opx_shm_daos_rank_index(rank, rank_inst);

#else
unsigned segment_index = OPX_SHM_SEGMENT_INDEX(peer_hfi_unit, peer_rx_index);
#endif
assert(segment_index < OPX_SHM_MAX_CONN_NUM);

#ifndef NDEBUG
if (segment_index >= OPX_SHM_MAX_CONN_NUM) {
*rc = -FI_EIO;
FI_LOG(tx->prov, FI_LOG_WARN, FI_LOG_FABRIC,
"SHM %u context exceeds maximum contexts supported.\n", segment_index);
return NULL;
}
#endif

if (tx->fifo_segment[segment_index] == NULL) {
if (OFI_UNLIKELY(tx->fifo_segment[segment_index] == NULL)) {
*rc = -FI_EIO;
FI_LOG(tx->prov, FI_LOG_WARN, FI_LOG_FABRIC,
"SHM %u context FIFO not initialized.\n", segment_index);
Expand Down
Loading

0 comments on commit b60d462

Please sign in to comment.