Skip to content

Commit

Permalink
issue: 632455 Handle correctly msg_flag returned by recvmmesg() (e.g.…
Browse files Browse the repository at this point in the history
… MSG_TRUNC)

Change-Id: Ifa6a0ace5f400978d6a475cb01f73adf655a93c8
Signed-off-by: Ophir Munk <[email protected]>
  • Loading branch information
OphirMunk committed Oct 26, 2015
1 parent 35f9263 commit 0ad1980
Show file tree
Hide file tree
Showing 7 changed files with 33 additions and 29 deletions.
2 changes: 1 addition & 1 deletion src/vma/sock/sock-redirect.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1031,7 +1031,7 @@ int recvmmsg(int __fd, struct mmsghdr *__mmsghdr, unsigned int __vlen, int __fla
if (p_socket_object) {
int ret = 0;
for (unsigned int i=0; i<__vlen; i++) {
int flags = __flags;
int flags = __flags;
ret = p_socket_object->rx(RX_RECVMSG, __mmsghdr[i].msg_hdr.msg_iov, __mmsghdr[i].msg_hdr.msg_iovlen, &flags,
(__SOCKADDR_ARG)__mmsghdr[i].msg_hdr.msg_name, (socklen_t*)&__mmsghdr[i].msg_hdr.msg_namelen, &__mmsghdr[i].msg_hdr);
if (ret < 0){
Expand Down
2 changes: 1 addition & 1 deletion src/vma/sock/socket_fd_api.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ ssize_t socket_fd_api::rx_os(const rx_call_t call_type, iovec* p_iov,
case RX_RECVMSG: {
__log_info_func("calling os receive with orig recvmsg");
return orig_os_api.recvmsg(m_fd, __msg, *p_flags);
}
}
}
return (ssize_t) -1;
}
Expand Down
5 changes: 3 additions & 2 deletions src/vma/sock/sockinfo.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -324,10 +324,11 @@ void sockinfo::save_stats_tx_os(int bytes)
}
}

size_t sockinfo::handle_msg_trunc(size_t total_rx, size_t payload_size, int* p_flags)
size_t sockinfo::handle_msg_trunc(size_t total_rx, size_t payload_size, int in_flags, int* p_out_flags)
{
NOT_IN_USE(payload_size);
*p_flags &= ~MSG_TRUNC; //don't handle msg_trunc
NOT_IN_USE(in_flags);
*p_out_flags &= ~MSG_TRUNC; //don't handle msg_trunc
return total_rx;
}

Expand Down
12 changes: 6 additions & 6 deletions src/vma/sock/sockinfo.h
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ class sockinfo : public socket_fd_api, public pkt_rcvr_sink, public pkt_sndr_sou
virtual int zero_copy_rx (iovec *p_iov, mem_buf_desc_t *pdesc, int *p_flags) = 0;
int register_callback(vma_recv_callback_t callback, void *context);

virtual size_t handle_msg_trunc(size_t total_rx, size_t payload_size, int* p_flags);
virtual size_t handle_msg_trunc(size_t total_rx, size_t payload_size, int in_flags, int* p_out_flags);

bool attach_receiver(flow_tuple_with_local_if &flow_key);
bool detach_receiver(flow_tuple_with_local_if &flow_key);
Expand Down Expand Up @@ -223,14 +223,14 @@ class sockinfo : public socket_fd_api, public pkt_rcvr_sink, public pkt_sndr_sou

inline int dequeue_packet(iovec *p_iov, ssize_t sz_iov,
sockaddr_in *__from, socklen_t *__fromlen,
int *p_flags)
int in_flags, int *p_out_flags)
{
mem_buf_desc_t *pdesc;
int total_rx = 0;
uint32_t nbytes, pos;
bool relase_buff = true;

bool is_peek = *p_flags & MSG_PEEK;
bool is_peek = in_flags & MSG_PEEK;
int rx_pkt_ready_list_idx = 1;
int rx_pkt_ready_offset = m_rx_pkt_ready_offset;

Expand All @@ -242,9 +242,9 @@ class sockinfo : public socket_fd_api, public pkt_rcvr_sink, public pkt_sndr_sou
if (__from && __fromlen)
fetch_peer_info(&pdesc->path.rx.src, __from, __fromlen);

if (*p_flags & MSG_VMA_ZCOPY) {
if (in_flags & MSG_VMA_ZCOPY) {
relase_buff = false;
total_rx = zero_copy_rx(p_iov, pdesc, p_flags);
total_rx = zero_copy_rx(p_iov, pdesc, p_out_flags);
if (unlikely(total_rx < 0))
return -1;
m_rx_pkt_ready_offset = 0;
Expand Down Expand Up @@ -290,7 +290,7 @@ class sockinfo : public socket_fd_api, public pkt_rcvr_sink, public pkt_sndr_sou
save_stats_rx_offload(total_rx);
}

total_rx = handle_msg_trunc(total_rx, payload_size, p_flags);
total_rx = handle_msg_trunc(total_rx, payload_size, in_flags, p_out_flags);

return total_rx;
}
Expand Down
12 changes: 7 additions & 5 deletions src/vma/sock/sockinfo_tcp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1321,7 +1321,9 @@ ssize_t sockinfo_tcp::rx(const rx_call_t call_type, iovec* p_iov, ssize_t sz_iov
int poll_count = 0;
int bytes_to_tcp_recved;
size_t total_iov_sz = 1;
bool block_this_run = m_b_blocking && !(*p_flags & MSG_DONTWAIT);
int out_flags = 0;
int in_flags = *p_flags;
bool block_this_run = m_b_blocking && !(in_flags & MSG_DONTWAIT);

m_loops_timer.start();

Expand All @@ -1330,7 +1332,7 @@ ssize_t sockinfo_tcp::rx(const rx_call_t call_type, iovec* p_iov, ssize_t sz_iov
#ifdef VMA_TIME_MEASURE
INC_GO_TO_OS_RX_COUNT;
#endif
ret = socket_fd_api::rx_os(call_type, p_iov, sz_iov, p_flags, __from, __fromlen, __msg);
ret = socket_fd_api::rx_os(call_type, p_iov, sz_iov, &in_flags, __from, __fromlen, __msg);
save_stats_rx_os(ret);
return ret;
}
Expand All @@ -1339,7 +1341,7 @@ ssize_t sockinfo_tcp::rx(const rx_call_t call_type, iovec* p_iov, ssize_t sz_iov
TAKE_T_RX_START;
#endif

if (unlikely((*p_flags & MSG_WAITALL) && !(*p_flags & MSG_PEEK))) {
if (unlikely((in_flags & MSG_WAITALL) && !(in_flags & MSG_PEEK))) {
total_iov_sz = 0;
for (int i = 0; i < sz_iov; i++) {
total_iov_sz += p_iov[i].iov_len;
Expand Down Expand Up @@ -1384,15 +1386,15 @@ ssize_t sockinfo_tcp::rx(const rx_call_t call_type, iovec* p_iov, ssize_t sz_iov
}
si_tcp_logfunc("something in rx queues: %d %p", m_n_rx_pkt_ready_list_count, m_rx_pkt_ready_list.front());

total_rx = dequeue_packet(p_iov, sz_iov, (sockaddr_in *)__from, __fromlen, p_flags);
total_rx = dequeue_packet(p_iov, sz_iov, (sockaddr_in *)__from, __fromlen, in_flags, &out_flags);


/*
* RCVBUFF Accounting: Going 'out' of the internal buffer: if some bytes are not tcp_recved yet - do that.
* The packet might not be 'acked' (tcp_recved)
*
*/
if (!(*p_flags & (MSG_PEEK | MSG_VMA_ZCOPY))) {
if (!(in_flags & (MSG_PEEK | MSG_VMA_ZCOPY))) {
m_rcvbuff_current -= total_rx;


Expand Down
27 changes: 14 additions & 13 deletions src/vma/sock/sockinfo_udp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1122,6 +1122,8 @@ ssize_t sockinfo_udp::rx(const rx_call_t call_type, iovec* p_iov,ssize_t sz_iov,
{
int ret;
uint64_t poll_sn;
int out_flags = 0;
int in_flags = *p_flags;

si_udp_logfunc("");

Expand Down Expand Up @@ -1169,7 +1171,7 @@ ssize_t sockinfo_udp::rx(const rx_call_t call_type, iovec* p_iov,ssize_t sz_iov,
if (m_n_rx_pkt_ready_list_count > 0) {
// Found a ready packet in the list
if (__msg) handle_cmsg(__msg);
ret = dequeue_packet(p_iov, sz_iov, (sockaddr_in *)__from, __fromlen, p_flags);
ret = dequeue_packet(p_iov, sz_iov, (sockaddr_in *)__from, __fromlen, in_flags, &out_flags);
goto out;
}
m_lock_rcv.unlock();
Expand All @@ -1180,15 +1182,15 @@ ssize_t sockinfo_udp::rx(const rx_call_t call_type, iovec* p_iov,ssize_t sz_iov,
* We (probably) do not have a ready packet.
* Wait for RX to become ready.
*/
rx_wait_ret = rx_wait(m_b_blocking && !(*p_flags & MSG_DONTWAIT));
rx_wait_ret = rx_wait(m_b_blocking && !(in_flags & MSG_DONTWAIT));

m_lock_rcv.lock();

if (likely(rx_wait_ret == 0)) {
// Got 0, means we might have a ready packet
if (m_n_rx_pkt_ready_list_count > 0) {
if (__msg) handle_cmsg(__msg);
ret = dequeue_packet(p_iov, sz_iov, (sockaddr_in *)__from, __fromlen, p_flags);
ret = dequeue_packet(p_iov, sz_iov, (sockaddr_in *)__from, __fromlen, in_flags, &out_flags);
goto out;
} else {
m_lock_rcv.unlock();
Expand All @@ -1205,7 +1207,7 @@ ssize_t sockinfo_udp::rx(const rx_call_t call_type, iovec* p_iov,ssize_t sz_iov,
* If we got here, either the socket is not offloaded or rx_wait() returned 1.
*/
os:
if (*p_flags & MSG_VMA_ZCOPY_FORCE) {
if (in_flags & MSG_VMA_ZCOPY_FORCE) {
errno = EIO;
ret = -1;
goto out;
Expand All @@ -1215,8 +1217,8 @@ ssize_t sockinfo_udp::rx(const rx_call_t call_type, iovec* p_iov,ssize_t sz_iov,
INC_GO_TO_OS_RX_COUNT;
#endif

*p_flags &= ~MSG_VMA_ZCOPY;
ret = socket_fd_api::rx_os(call_type, p_iov, sz_iov, p_flags, __from, __fromlen, __msg);
in_flags &= ~MSG_VMA_ZCOPY;
ret = socket_fd_api::rx_os(call_type, p_iov, sz_iov, &in_flags, __from, __fromlen, __msg);
save_stats_rx_os(ret);
if (ret > 0) {
// This will cause the next non-blocked read to check the OS again.
Expand All @@ -1228,7 +1230,7 @@ ssize_t sockinfo_udp::rx(const rx_call_t call_type, iovec* p_iov,ssize_t sz_iov,
m_lock_rcv.unlock();

if (__msg)
__msg->msg_flags |= (*p_flags) & MSG_TRUNC;
__msg->msg_flags |= out_flags & MSG_TRUNC;

if (ret < 0) {
#ifdef VMA_TIME_MEASURE
Expand Down Expand Up @@ -2252,16 +2254,15 @@ int sockinfo_udp::zero_copy_rx(iovec *p_iov, mem_buf_desc_t *p_desc, int *p_flag
return total_rx;
}

size_t sockinfo_udp::handle_msg_trunc(size_t total_rx, size_t payload_size, int* p_flags)
size_t sockinfo_udp::handle_msg_trunc(size_t total_rx, size_t payload_size, int in_flags, int* p_out_flags)
{
if (payload_size > total_rx) {
m_rx_ready_byte_count -= (payload_size-total_rx);
m_p_socket_stats->n_rx_ready_byte_count -= (payload_size-total_rx);
if (*p_flags & MSG_TRUNC) return payload_size;
else *p_flags |= MSG_TRUNC;
} else {
*p_flags &= ~MSG_TRUNC;
}
*p_out_flags |= MSG_TRUNC;
if (in_flags & MSG_TRUNC)
return payload_size;
}

return total_rx;
}
Expand Down
2 changes: 1 addition & 1 deletion src/vma/sock/sockinfo_udp.h
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ class sockinfo_udp : public sockinfo

virtual void post_deqeue (bool release_buff);
virtual int zero_copy_rx (iovec *p_iov, mem_buf_desc_t *pdesc, int *p_flags);
virtual size_t handle_msg_trunc(size_t total_rx, size_t payload_size, int* p_flags);
virtual size_t handle_msg_trunc(size_t total_rx, size_t payload_size, int in_flags, int* p_out_flags);

inline void handle_ip_pktinfo(struct cmsg_state *cm_state);
inline void handle_recv_timestamping(struct cmsg_state *cm_state);
Expand Down

0 comments on commit 0ad1980

Please sign in to comment.