Skip to content

Commit

Permalink
Address memory consumption related to AIO reads
Browse files Browse the repository at this point in the history
Add backpressure mechanism for AIO reads in io_uring module.

In some edge cases where storage backend can't keep up with client
reads, io_uring / samba will allow queueing up reads beyond what
is reasonable leading to OOM errors in some extreme edge cases.

This commit keeps a counter of outstanding read requests per-TCON
and switches to pread(2) once the queue depth limit is reached.
This effectively applies backpressure to client and prevents excessive
memory consumption. A total count of synchronous reads performed is
logged when TCON is disconnected if log level for the VFS module is
sufficiently high (3 or higher).

Fix memory leak related to AIO memory pool for some failure scenarios
for AIO reads.

The memory pool for AIO buffers is allocated under a memory context
that persists for the duration of the SMB session. When chunks of
pool are assigned out for a particular AIO request, a separate
small allocation (io_link) is performed with a pointer to the data
buffer and a talloc destructor set such that when the io_link is
freed the buffer is returned to the pool.

Initially these were performed as two separate steps (assigning out
buffer from pool and setting up io_link) which allowed for situations
to arise in which the request could error out before the io_link was
set resulting in the buffer never being returned to the memory pool.

This commit revises the workflow and API for the memory pool such
that the chunk is always assinged with an associated io_link allocated
under the memory context of the initial SMB2 read request. Once the
request is successful and we're getting ready to respond to client
then we reparent the io_link to the context of the response. This
ensures that the buffer is always returned when a limited-life
talloc chunk is freed.
  • Loading branch information
anodos325 committed Nov 1, 2024
1 parent b22bfc8 commit e0a3e15
Show file tree
Hide file tree
Showing 6 changed files with 131 additions and 58 deletions.
19 changes: 15 additions & 4 deletions source3/lib/truenas_mempool.c
Original file line number Diff line number Diff line change
Expand Up @@ -52,19 +52,19 @@ static int io_buffer_destroy(struct io_pool_link *lnk)
return 0;
}

bool link_io_buffer_blob(TALLOC_CTX *mem_ctx, DATA_BLOB *buf)
static struct io_pool_link *link_io_buffer_blob(TALLOC_CTX *mem_ctx, DATA_BLOB *buf)
{
struct io_pool_link *lnk = NULL;

SMB_ASSERT(buf->data != NULL);

lnk = talloc_zero(mem_ctx, struct io_pool_link);
if (lnk == NULL) {
return false;
return lnk;
}
lnk->to_free = buf->data;
talloc_set_destructor(lnk, io_buffer_destroy);
return true;
return lnk;
}

static bool link_io_buffer(TALLOC_CTX *mem_ctx)
Expand Down Expand Up @@ -123,6 +123,7 @@ static bool init_io_pool(struct smbd_server_connection *sconn)
if (sconn->io_memory_pool == NULL) {
return false;
}
talloc_set_name(sconn->io_memory_pool, "TrueNAS Memory Pool");
}

if (io_buffer_timer == NULL) {
Expand All @@ -139,10 +140,13 @@ static bool init_io_pool(struct smbd_server_connection *sconn)
}

bool io_pool_alloc_blob(struct connection_struct *conn,
TALLOC_CTX *mem_ctx,
size_t buflen,
DATA_BLOB *out)
DATA_BLOB *out,
struct io_pool_link **lnk_out)
{
DATA_BLOB buf = { 0 };
struct io_pool_link *lnk = NULL;

if (!init_io_pool(conn->sconn)) {
return false;
Expand All @@ -153,7 +157,14 @@ bool io_pool_alloc_blob(struct connection_struct *conn,
return false;
}

lnk = link_io_buffer_blob(mem_ctx, &buf);
if (lnk == NULL) {
data_blob_free(&buf);
return false;
}

*out = buf;
*lnk_out = lnk;
alloc_cnt += 1;
return true;
}
Expand Down
30 changes: 10 additions & 20 deletions source3/lib/truenas_mempool.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,37 +17,27 @@
* along with this program; if not, see <http://www.gnu.org/licenses/>.
*/


/**
* @brief Link the specified data blob to the specified memory context
* This is done so that when the specified context is freed
* the buffer associated the the specified DATA_BLOB is also
* freed. This may be used in lieu of talloc_steal of the buffer,
* and is required when memory pool is in use in order to ensure
* that memory is released to the pool c.f. documentation for
* talloc_pool(). A DATA_BLOB must be linked to no more than
* one talloc context.
*
* @param[in] ctx The talloc context for the link
* @param[in] buf Buffer to link to the context
*
* @return true on success false on failure.
*/
bool link_io_buffer_blob(TALLOC_CTX *mem_ctx, DATA_BLOB *buf);
struct io_pool_link;

/**
* @brief Allocate a DATA_BLOB with a buffer size specified by buflen
* using memory in the io_memory_pool.
* using memory in the io_memory_pool. The buffer will be freed
* when lnk_out is freed. lnk_out is allocated under the specified
* mem_ctx.
*
* @param[in] conn The current tree connection
* @param[in] mem_ctx Memory context under which to free buffer.
* @param[in] buflen size of buffer to allocate
* @param[buf] out New DATA_BLOB with buffer
* @param[out] buf New DATA_BLOB with buffer
* @param[out] lnk_out Autofree linkage for data blob buffer
*
* @return true on success false on failure.
*/
bool io_pool_alloc_blob(struct connection_struct *conn,
TALLOC_CTX *mem_ctx,
size_t buflen,
DATA_BLOB *out);
DATA_BLOB *out,
struct io_pool_link **lnk_out);

void *_io_pool_calloc_size(struct connection_struct *conn, size_t size,
const char *name, const char *location);
Expand Down
110 changes: 90 additions & 20 deletions source3/modules/vfs_io_uring.c
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ struct open_how;
#define IO_URING_ASYNC_FSYNC 0x04

#define VFS_URING_WRITEQ_DEFAULT 10
#define VFS_URING_READQ_DEFAULT 20

static int vfs_io_uring_debug_level = DBGC_VFS;

Expand All @@ -61,6 +62,13 @@ static int vfs_io_uring_debug_level = DBGC_VFS;

struct vfs_io_uring_request;

struct vfs_io_uring_queue_config {
int queue_sz;
uint op_cnt;
uint sync_cnt;
uint async_cnt;
};

struct vfs_io_uring_config {
struct io_uring uring;
struct tevent_fd *fde;
Expand All @@ -71,10 +79,8 @@ struct vfs_io_uring_config {
int async_ops;
struct vfs_io_uring_request *queue;
struct vfs_io_uring_request *pending;
int uring_write_queue_sz;
uint uring_write_op_cnt;
uint sync_write_cnt;
uint async_write_cnt;
struct vfs_io_uring_queue_config writeq;
struct vfs_io_uring_queue_config readq;
};

struct vfs_io_uring_request {
Expand Down Expand Up @@ -249,18 +255,29 @@ static int vfs_io_uring_connect(vfs_handle_struct *handle, const char *service,
config->async_ops |= IO_URING_ASYNC_WRITE;
}

config->uring_write_queue_sz = lp_parm_int(SNUM(handle->conn),
config->writeq.queue_sz = lp_parm_int(SNUM(handle->conn),
"io_uring",
"write_queue_sz",
VFS_URING_WRITEQ_DEFAULT);
if (config->uring_write_queue_sz <= 0) {
if (config->writeq.queue_sz <= 0) {
DBG_ERR("%d: write_queue_sz parameter must be greater than 0. "
"setting to default of %d\n",
config->uring_write_queue_sz,
config->writeq.queue_sz,
VFS_URING_WRITEQ_DEFAULT);
config->uring_write_queue_sz = VFS_URING_WRITEQ_DEFAULT;
config->writeq.queue_sz = VFS_URING_WRITEQ_DEFAULT;
}

config->readq.queue_sz = lp_parm_int(SNUM(handle->conn),
"io_uring",
"read_queue_sz",
VFS_URING_READQ_DEFAULT);
if (config->readq.queue_sz <= 0) {
DBG_ERR("%d: read_queue_sz parameter must be greater than 0. "
"setting to default of %d\n",
config->readq.queue_sz,
VFS_URING_READQ_DEFAULT);
config->readq.queue_sz = VFS_URING_READQ_DEFAULT;
}

ret = io_uring_queue_init(num_entries, &config->uring, flags);
if (ret < 0) {
Expand Down Expand Up @@ -475,6 +492,7 @@ struct vfs_io_uring_pread_state {
struct iovec iov;
size_t nread;
struct vfs_io_uring_request ur;
bool is_sync_read;
};

static void vfs_io_uring_pread_submit(struct vfs_io_uring_pread_state *state);
Expand Down Expand Up @@ -510,6 +528,34 @@ static struct tevent_req *vfs_io_uring_pread_send(struct vfs_handle_struct *hand
if (req == NULL) {
return NULL;
}

/*
* Apply backpressure to client by performing synchronous read
*/
if (config->readq.op_cnt > config->readq.queue_sz) {
ssize_t nread;
ok = sys_valid_io_range(offset, n);
if (!ok) {
tevent_req_error(req, EINVAL);
return tevent_req_post(req, ev);
}

config->readq.sync_cnt++;
state->is_sync_read = true;

nread = sys_pread_full(fsp_get_io_fd(fsp), data, n, offset);
if (nread == -1) {
DBG_ERR("%s: read from file failed with error: %s\n",
fsp_str_dbg(fsp), strerror(errno));
tevent_req_error(req, errno);
return tevent_req_post(req, ev);
}

state->nread = nread;
tevent_req_done(req);
return tevent_req_post(req, ev);
}

if (config->async_ops & IO_URING_ASYNC_READ) {
state->ur.sqe_flags |= IOSQE_ASYNC;
}
Expand All @@ -518,6 +564,11 @@ static struct tevent_req *vfs_io_uring_pread_send(struct vfs_handle_struct *hand
state->ur.completion_fn = vfs_io_uring_pread_completion;
state->ur.destructor_fn = vfs_io_uring_pread_destructor;

// Increment because at this point we'll hit the receive_fn
// which decrements the op_cnt
config->readq.op_cnt++;
config->readq.async_cnt++;

SMBPROFILE_BYTES_ASYNC_START(syscall_asys_pread, profile_p,
state->ur.profile_bytes, n);
SMBPROFILE_BYTES_ASYNC_SET_IDLE(state->ur.profile_bytes);
Expand Down Expand Up @@ -613,10 +664,26 @@ static ssize_t vfs_io_uring_pread_recv(struct tevent_req *req,
req, struct vfs_io_uring_pread_state);
ssize_t ret;

if (state->is_sync_read) {
if (tevent_req_is_unix_error(req, &vfs_aio_state->error)) {
tevent_req_received(req);
return -1;
}
vfs_aio_state->error = 0;
ret = state->nread;

tevent_req_received(req);
return ret;
}

SMBPROFILE_BYTES_ASYNC_END(state->ur.profile_bytes);
vfs_aio_state->duration = nsec_time_diff(&state->ur.end_time,
&state->ur.start_time);

SMB_ASSERT(state->ur.config != NULL);
SMB_ASSERT(state->ur.config->readq.op_cnt > 0);
state->ur.config->readq.op_cnt--;

if (tevent_req_is_unix_error(req, &vfs_aio_state->error)) {
tevent_req_received(req);
return -1;
Expand Down Expand Up @@ -676,16 +743,16 @@ static struct tevent_req *vfs_io_uring_pwrite_send(struct vfs_handle_struct *han
* Apply backpressure to client by performing synchronous write
*
*/
if (config->uring_write_op_cnt > config->uring_write_queue_sz) {
if (config->writeq.op_cnt > config->writeq.queue_sz) {
ok = sys_valid_io_range(offset, n);
if (!ok) {
tevent_req_error(req, EINVAL);
return tevent_req_post(req, ev);
}

config->sync_write_cnt++;
config->writeq.sync_cnt++;
state->is_sync_write = true;
state->nwritten = pwrite(fsp_get_io_fd(fsp), data, n, offset);
state->nwritten = sys_pwrite_full(fsp_get_io_fd(fsp), data, n, offset);
if (state->nwritten == -1) {
DBG_ERR("%s: write to file failed with error: %s\n",
fsp_str_dbg(fsp), strerror(errno));
Expand All @@ -707,8 +774,8 @@ static struct tevent_req *vfs_io_uring_pwrite_send(struct vfs_handle_struct *han

// Increment because at this point we'll hit the receive_fn
// which decrements the op_cnt
config->uring_write_op_cnt++;
config->async_write_cnt++;
config->writeq.op_cnt++;
config->writeq.async_cnt++;

SMBPROFILE_BYTES_ASYNC_START(syscall_asys_pwrite, profile_p,
state->ur.profile_bytes, n);
Expand Down Expand Up @@ -822,8 +889,8 @@ static ssize_t vfs_io_uring_pwrite_recv(struct tevent_req *req,
&state->ur.start_time);

SMB_ASSERT(state->ur.config != NULL);
SMB_ASSERT(state->ur.config->uring_write_op_cnt > 0);
state->ur.config->uring_write_op_cnt--;
SMB_ASSERT(state->ur.config->writeq.op_cnt > 0);
state->ur.config->writeq.op_cnt--;

if (tevent_req_is_unix_error(req, &vfs_aio_state->error)) {
tevent_req_received(req);
Expand Down Expand Up @@ -949,11 +1016,14 @@ static void vfs_io_uring_disconnect(vfs_handle_struct *handle)
struct vfs_io_uring_config,
smb_panic(__location__));

if (config->sync_write_cnt) {
DBG_NOTICE("Performed %u synchronous writes and "
"%u async writes.\n",
config->sync_write_cnt,
config->async_write_cnt);
// optional logging for performance team to check whether
// we hit sync fallback during torture run.
if (config->writeq.sync_cnt || config->readq.sync_cnt) {
DBG_NOTICE("Performed %u synchronous writes and %u async "
"writes, and %u synchronous reads and %u async "
"reads.\n",
config->writeq.sync_cnt, config->writeq.async_cnt,
config->readq.sync_cnt, config->readq.async_cnt);
}
}

Expand Down
2 changes: 2 additions & 0 deletions source3/smbd/proto.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,12 +57,14 @@ bool srv_init_signing(struct smbXsrv_connection *conn);
/* The following definitions come from smbd/aio.c */

struct aio_extra;
struct io_pool_link;
bool aio_write_through_requested(struct aio_extra *aio_ex);
NTSTATUS schedule_smb2_aio_read(connection_struct *conn,
struct smb_request *smbreq,
files_struct *fsp,
TALLOC_CTX *ctx,
DATA_BLOB *preadbuf,
struct io_pool_link *lnk,
off_t startpos,
size_t smb_maxcnt);
NTSTATUS schedule_aio_smb2_write(connection_struct *conn,
Expand Down
3 changes: 2 additions & 1 deletion source3/smbd/smb2_aio.c
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,7 @@ NTSTATUS schedule_smb2_aio_read(connection_struct *conn,
files_struct *fsp,
TALLOC_CTX *ctx,
DATA_BLOB *preadbuf,
struct io_pool_link *lnk,
off_t startpos,
size_t smb_maxcnt)
{
Expand Down Expand Up @@ -356,7 +357,7 @@ NTSTATUS schedule_smb2_aio_read(connection_struct *conn,

/* Create the out buffer. */

if (!io_pool_alloc_blob(conn, smb_maxcnt, preadbuf)) {
if (!io_pool_alloc_blob(conn, ctx, smb_maxcnt, preadbuf, &lnk)) {
return NT_STATUS_NO_MEMORY;
}

Expand Down
Loading

0 comments on commit e0a3e15

Please sign in to comment.