From b0f662fe93f1db0098f7b50306c2f084644788b1 Mon Sep 17 00:00:00 2001 From: Florin Coras Date: Thu, 27 Dec 2018 14:51:46 -0800 Subject: [PATCH] vcl/ldp: add write msg function and fine tuning Allows app to push data. Additionally, ensure reset/close replies are not sent unless vcl closes the session. Change-Id: Icbbf933cf57b55cfbcc7b802af0f83919a066f65 Signed-off-by: Florin Coras --- src/vcl/ldp.c | 52 ++++++----------------------------------- src/vcl/vppcom.c | 41 ++++++++++++++++---------------- src/vcl/vppcom.h | 2 ++ src/vnet/sctp/sctp.c | 39 ++++++++++++++++++------------- src/vnet/session/session.c | 21 +++++++++++++---- src/vnet/session/session_node.c | 16 +++++++++++-- src/vnet/tcp/tcp.c | 8 ++++--- src/vnet/tcp/tcp_input.c | 3 ++- test/test_sctp.py | 6 ++--- 9 files changed, 94 insertions(+), 94 deletions(-) diff --git a/src/vcl/ldp.c b/src/vcl/ldp.c index d538770eadb..0d4fe43dd7d 100644 --- a/src/vcl/ldp.c +++ b/src/vcl/ldp.c @@ -391,7 +391,7 @@ close (int fd) errno = -rv; rv = -1; } - if (refcnt == 1) + if (refcnt <= 1) ldp_fd_free_w_sid (sid); } else @@ -511,7 +511,7 @@ write (int fd, const void *buf, size_t nbytes) LDBG (2, "fd %d (0x%x): calling vppcom_session_write(): sid %u (0x%x), " "buf %p, nbytes %u", fd, fd, sid, sid, buf, nbytes); - size = vppcom_session_write (sid, (void *) buf, nbytes); + size = vppcom_session_write_msg (sid, (void *) buf, nbytes); if (size < 0) { errno = -size; @@ -533,7 +533,6 @@ write (int fd, const void *buf, size_t nbytes) ssize_t writev (int fd, const struct iovec * iov, int iovcnt) { - const char *func_str; ssize_t size = 0, total = 0; u32 sid = ldp_sid_from_fd (fd); int i, rv = 0; @@ -547,33 +546,19 @@ writev (int fd, const struct iovec * iov, int iovcnt) if (sid != INVALID_SESSION_ID) { - func_str = "vppcom_session_write"; do { for (i = 0; i < iovcnt; ++i) { - if (LDP_DEBUG > 4) - printf ("%s:%d: LDP<%d>: fd %d (0x%x): calling %s() [%d]: " - "sid %u (0x%x), buf %p, nbytes %ld, total %ld", - __func__, __LINE__, getpid (), fd, fd, func_str, - i, sid, sid, iov[i].iov_base, iov[i].iov_len, total); - - rv = vppcom_session_write (sid, iov[i].iov_base, - iov[i].iov_len); + rv = vppcom_session_write_msg (sid, iov[i].iov_base, + iov[i].iov_len); if (rv < 0) break; else { total += rv; if (rv < iov[i].iov_len) - { - if (LDP_DEBUG > 4) - printf ("%s:%d: LDP<%d>: fd %d (0x%x): " - "rv (%d) < iov[%d].iov_len (%ld)", - __func__, __LINE__, getpid (), fd, fd, - rv, i, iov[i].iov_len); - break; - } + break; } } } @@ -589,32 +574,9 @@ writev (int fd, const struct iovec * iov, int iovcnt) } else { - func_str = "libc_writev"; - - if (LDP_DEBUG > 4) - printf ("%s:%d: LDP<%d>: fd %d (0x%x): calling %s(): " - "iov %p, iovcnt %d\n", __func__, __LINE__, getpid (), - fd, fd, func_str, iov, iovcnt); - size = libc_writev (fd, iov, iovcnt); } - if (LDP_DEBUG > 4) - { - if (size < 0) - { - int errno_val = errno; - perror (func_str); - fprintf (stderr, - "%s:%d: LDP<%d>: ERROR: fd %d (0x%x): %s() failed! " - "rv %ld, errno = %d\n", __func__, __LINE__, getpid (), fd, - fd, func_str, size, errno_val); - errno = errno_val; - } - else - printf ("%s:%d: LDP<%d>: fd %d (0x%x): returning %ld\n", - __func__, __LINE__, getpid (), fd, fd, size); - } return size; } @@ -3077,7 +3039,7 @@ ldp_epoll_pwait (int epfd, struct epoll_event *events, int maxevents, return -1; } - time_to_wait = ((timeout >= 0) ? (double) timeout : 0); + time_to_wait = ((timeout >= 0) ? (double) timeout / 1000 : 0); time_out = clib_time_now (&ldpw->clib_time) + time_to_wait; func_str = "vppcom_session_attr[GET_LIBC_EPFD]"; @@ -3128,7 +3090,7 @@ ldp_epoll_pwait (int epfd, struct epoll_event *events, int maxevents, epfd, epfd, func_str, libc_epfd, libc_epfd, events, maxevents, sigmask); - rv = libc_epoll_pwait (libc_epfd, events, maxevents, 1, sigmask); + rv = libc_epoll_pwait (libc_epfd, events, maxevents, 0, sigmask); if (rv != 0) goto done; } diff --git a/src/vcl/vppcom.c b/src/vcl/vppcom.c index 86bb21413a4..a94df846402 100644 --- a/src/vcl/vppcom.c +++ b/src/vcl/vppcom.c @@ -410,7 +410,7 @@ vcl_flag_accepted_session (vcl_session_t * session, u64 handle, u32 flags) accepted_msg = &session->accept_evts_fifo[i]; if (accepted_msg->accepted_msg.handle == handle) { - accepted_msg->flags = flags; + accepted_msg->flags |= flags; return 1; } } @@ -431,8 +431,6 @@ vcl_session_reset_handler (vcl_worker_t * wrk, VDBG (0, "request to reset unknown handle 0x%llx", reset_msg->handle); return VCL_INVALID_SESSION_INDEX; } - if (session->session_state >= STATE_VPP_CLOSING) - return sid; /* Caught a reset before actually accepting the session */ if (session->session_state == STATE_LISTEN) @@ -446,8 +444,6 @@ vcl_session_reset_handler (vcl_worker_t * wrk, session->session_state = STATE_DISCONNECT; VDBG (0, "reset session %u [0x%llx]", sid, reset_msg->handle); - vcl_send_session_reset_reply (vcl_session_vpp_evt_q (wrk, session), - wrk->my_client_index, reset_msg->handle, 0); return sid; } @@ -509,7 +505,8 @@ vcl_session_accepted (vcl_worker_t * wrk, session_accepted_msg_t * msg) session = vcl_session_get_w_vpp_handle (wrk, msg->handle); if (PREDICT_FALSE (session != 0)) - VWRN ("session handle overlap %lu!", msg->handle); + VWRN ("session overlap handle %lu state %u!", msg->handle, + session->session_state); session = vcl_session_table_lookup_listener (wrk, msg->listener_handle); if (!session) @@ -1119,6 +1116,12 @@ vppcom_session_close (uint32_t session_handle) getpid (), vpp_handle, session_handle, rv, vppcom_retval_str (rv)); } + else if (state == STATE_DISCONNECT) + { + svm_msg_q_t *mq = vcl_session_vpp_evt_q (wrk, session); + vcl_send_session_reset_reply (mq, wrk->my_client_index, + session->vpp_handle, 0); + } } cleanup: @@ -1378,19 +1381,10 @@ handle: */ if (accept_flags) { - svm_msg_q_t *mq = vcl_session_vpp_evt_q (wrk, client_session); if (accept_flags & VCL_ACCEPTED_F_CLOSED) - { - client_session->session_state = STATE_DISCONNECT; - vcl_send_session_disconnected_reply (mq, wrk->my_client_index, - client_session->vpp_handle, 0); - } + client_session->session_state = STATE_VPP_CLOSING; else if (accept_flags & VCL_ACCEPTED_F_RESET) - { - client_session->session_state = STATE_DISCONNECT; - vcl_send_session_reset_reply (mq, wrk->my_client_index, - client_session->vpp_handle, 0); - } + client_session->session_state = STATE_DISCONNECT; } return vcl_session_handle (client_session); } @@ -1524,9 +1518,8 @@ vppcom_session_read_internal (uint32_t session_handle, void *buf, int n, session_state_t state = s->session_state; rv = ((state & STATE_DISCONNECT) ? VPPCOM_ECONNRESET : VPPCOM_ENOTCONN); - VDBG (0, "VCL<%d>: vpp handle 0x%llx, sid %u: %s session is not open! " - "state 0x%x (%s), returning %d (%s)", - getpid (), s->vpp_handle, session_handle, state, + VDBG (0, "session handle %u[0x%llx] is not open! state 0x%x (%s)," + " returning %d (%s)", session_handle, s->vpp_handle, state, vppcom_session_state_str (state), rv, vppcom_retval_str (rv)); return rv; } @@ -1831,6 +1824,14 @@ vppcom_session_write (uint32_t session_handle, void *buf, size_t n) 0 /* is_flush */ ); } +int +vppcom_session_write_msg (uint32_t session_handle, void *buf, size_t n) +{ + return vppcom_session_write_inline (session_handle, buf, n, + 1 /* is_flush */ ); +} + + static vcl_session_t * vcl_ct_session_get_from_fifo (vcl_worker_t * wrk, svm_fifo_t * f, u8 type) { diff --git a/src/vcl/vppcom.h b/src/vcl/vppcom.h index 30ab7c4a56a..00527f4ecfb 100644 --- a/src/vcl/vppcom.h +++ b/src/vcl/vppcom.h @@ -239,6 +239,8 @@ extern int vppcom_session_connect (uint32_t session_handle, extern int vppcom_session_read (uint32_t session_handle, void *buf, size_t n); extern int vppcom_session_write (uint32_t session_handle, void *buf, size_t n); +extern int vppcom_session_write_msg (uint32_t session_handle, void *buf, + size_t n); extern int vppcom_select (unsigned long n_bits, unsigned long *read_map, diff --git a/src/vnet/sctp/sctp.c b/src/vnet/sctp/sctp.c index 482f81a1e2a..10ec7700a2c 100644 --- a/src/vnet/sctp/sctp.c +++ b/src/vnet/sctp/sctp.c @@ -23,6 +23,7 @@ sctp_connection_bind (u32 session_index, transport_endpoint_t * tep) sctp_main_t *tm = &sctp_main; sctp_connection_t *listener; void *iface_ip; + u32 mtu = 1460; pool_get (tm->listener_pool, listener); clib_memset (listener, 0, sizeof (*listener)); @@ -43,11 +44,13 @@ sctp_connection_bind (u32 session_index, transport_endpoint_t * tep) ip_copy (&listener->sub_conn[SCTP_PRIMARY_PATH_IDX].connection.lcl_ip, &tep->ip, tep->is_ip4); - u32 mtu = tep->is_ip4 ? vnet_sw_interface_get_mtu (vnet_get_main (), - tep->sw_if_index, - VNET_MTU_IP4) : - vnet_sw_interface_get_mtu (vnet_get_main (), tep->sw_if_index, - VNET_MTU_IP6); + if (tep->sw_if_index != ENDPOINT_INVALID_INDEX) + mtu = tep->is_ip4 ? vnet_sw_interface_get_mtu (vnet_get_main (), + tep->sw_if_index, + VNET_MTU_IP4) : + vnet_sw_interface_get_mtu (vnet_get_main (), tep->sw_if_index, + VNET_MTU_IP6); + listener->sub_conn[SCTP_PRIMARY_PATH_IDX].PMTU = mtu; listener->sub_conn[SCTP_PRIMARY_PATH_IDX].connection.is_ip4 = tep->is_ip4; listener->sub_conn[SCTP_PRIMARY_PATH_IDX].connection.proto = @@ -192,12 +195,13 @@ format_sctp_connection_id (u8 * s, va_list * args) u8 i; for (i = 0; i < MAX_SCTP_CONNECTIONS; i++) { + if (i > 0 && sctp_conn->sub_conn[i].state == SCTP_SUBCONN_STATE_DOWN) + continue; if (sctp_conn->sub_conn[i].connection.is_ip4) { - s = format (s, "%U[#%d][%s] %U:%d->%U:%d", - s, + s = format (s, "[#%d][%s] %U:%d->%U:%d", sctp_conn->sub_conn[i].connection.thread_index, - "T", + "S", format_ip4_address, &sctp_conn->sub_conn[i].connection.lcl_ip.ip4, clib_net_to_host_u16 (sctp_conn->sub_conn[i]. @@ -209,10 +213,9 @@ format_sctp_connection_id (u8 * s, va_list * args) } else { - s = format (s, "%U[#%d][%s] %U:%d->%U:%d", - s, + s = format (s, "[#%d][%s] %U:%d->%U:%d", sctp_conn->sub_conn[i].connection.thread_index, - "T", + "S", format_ip6_address, &sctp_conn->sub_conn[i].connection.lcl_ip.ip6, clib_net_to_host_u16 (sctp_conn->sub_conn[i]. @@ -238,6 +241,8 @@ format_sctp_connection (u8 * s, va_list * args) if (verbose) { s = format (s, "%-15U", format_sctp_state, sctp_conn->state); + if (verbose > 1) + s = format (s, "\n"); } return s; @@ -458,6 +463,7 @@ sctp_connection_open (transport_endpoint_cfg_t * rmt) ip46_address_t lcl_addr; u16 lcl_port; uword thread_id; + u32 mtu = 1460; int rv; u8 idx = SCTP_PRIMARY_PATH_IDX; @@ -484,11 +490,12 @@ sctp_connection_open (transport_endpoint_cfg_t * rmt) clib_spinlock_lock_if_init (&tm->half_open_lock); sctp_conn = sctp_half_open_connection_new (thread_id); - u32 mtu = rmt->is_ip4 ? vnet_sw_interface_get_mtu (vnet_get_main (), - rmt->peer.sw_if_index, - VNET_MTU_IP4) : - vnet_sw_interface_get_mtu (vnet_get_main (), rmt->peer.sw_if_index, - VNET_MTU_IP6); + if (rmt->peer.sw_if_index != ENDPOINT_INVALID_INDEX) + mtu = rmt->is_ip4 ? vnet_sw_interface_get_mtu (vnet_get_main (), + rmt->peer.sw_if_index, + VNET_MTU_IP4) : + vnet_sw_interface_get_mtu (vnet_get_main (), rmt->peer.sw_if_index, + VNET_MTU_IP6); sctp_conn->sub_conn[idx].PMTU = mtu; transport_connection_t *trans_conn = &sctp_conn->sub_conn[idx].connection; diff --git a/src/vnet/session/session.c b/src/vnet/session/session.c index cbe0dd76430..d30254e5fc0 100644 --- a/src/vnet/session/session.c +++ b/src/vnet/session/session.c @@ -234,6 +234,7 @@ session_alloc_for_connection (transport_connection_t * tc) s = session_alloc (thread_index); s->session_type = session_type_from_proto_and_ip (tc->proto, tc->is_ip4); s->enqueue_epoch = (u64) ~ 0; + s->session_state = SESSION_STATE_CLOSED; /* Attach transport to session and vice versa */ s->connection_index = tc->c_index; @@ -773,6 +774,7 @@ stream_session_accept_notify (transport_connection_t * tc) app_wrk = app_worker_get_if_valid (s->app_wrk_index); if (!app_wrk) return -1; + s->session_state = SESSION_STATE_ACCEPTING; app = application_get (app_wrk->app_index); return app->cb_fns.session_accept_callback (s); } @@ -824,6 +826,7 @@ session_transport_delete_notify (transport_connection_t * tc) switch (s->session_state) { + case SESSION_STATE_ACCEPTING: case SESSION_STATE_TRANSPORT_CLOSING: /* If transport finishes or times out before we get a reply * from the app, mark transport as closed and wait for reply @@ -844,11 +847,13 @@ session_transport_delete_notify (transport_connection_t * tc) s->session_state = SESSION_STATE_TRANSPORT_CLOSED; session_program_transport_close (s); break; + case SESSION_STATE_TRANSPORT_CLOSED: + break; case SESSION_STATE_CLOSED: - case SESSION_STATE_ACCEPTING: session_delete (s); break; default: + clib_warning ("session state %u", s->session_state); session_delete (s); break; } @@ -869,7 +874,16 @@ session_transport_closed_notify (transport_connection_t * tc) if (!(s = session_get_if_valid (tc->s_index, tc->thread_index))) return; - s->session_state = SESSION_STATE_CLOSED; + + /* If app close has not been received or has not yet resulted in + * a transport close, only mark the session transport as closed */ + if (s->session_state <= SESSION_STATE_CLOSING) + { + session_lookup_del_session (s); + s->session_state = SESSION_STATE_TRANSPORT_CLOSED; + } + else + s->session_state = SESSION_STATE_CLOSED; } /** @@ -913,7 +927,6 @@ stream_session_accept (transport_connection_t * tc, u32 listener_index, s->app_wrk_index = app_wrk->wrk_index; s->listener_index = listener_index; - s->session_state = SESSION_STATE_ACCEPTING; /* Shoulder-tap the server */ if (notify) @@ -1141,7 +1154,7 @@ void session_transport_close (stream_session_t * s) { /* If transport is already closed, just free the session */ - if (s->session_state == SESSION_STATE_TRANSPORT_CLOSED) + if (s->session_state >= SESSION_STATE_TRANSPORT_CLOSED) { session_free_w_fifos (s); return; diff --git a/src/vnet/session/session_node.c b/src/vnet/session/session_node.c index 4323ed83cf8..45018daf45a 100644 --- a/src/vnet/session/session_node.c +++ b/src/vnet/session/session_node.c @@ -29,6 +29,7 @@ session_mq_accepted_reply_handler (void *data) { session_accepted_reply_msg_t *mp = (session_accepted_reply_msg_t *) data; vnet_disconnect_args_t _a = { 0 }, *a = &_a; + stream_session_state_t old_state; app_worker_t *app_wrk; local_session_t *ls; stream_session_t *s; @@ -64,18 +65,29 @@ session_mq_accepted_reply_handler (void *data) else { s = session_get_from_handle_if_valid (mp->handle); - /* Closed while waiting for app to reply */ - if (!s || s->session_state > SESSION_STATE_READY) + if (!s) return; + app_wrk = app_worker_get (s->app_wrk_index); if (app_wrk->app_index != mp->context) { clib_warning ("app doesn't own session"); return; } + + old_state = s->session_state; s->session_state = SESSION_STATE_READY; if (!svm_fifo_is_empty (s->server_rx_fifo)) app_worker_lock_and_send_event (app_wrk, s, FIFO_EVENT_APP_RX); + + /* Closed while waiting for app to reply. Resend disconnect */ + if (old_state >= SESSION_STATE_TRANSPORT_CLOSING) + { + application_t *app = application_get (app_wrk->app_index); + app->cb_fns.session_disconnect_callback (s); + s->session_state = old_state; + return; + } } } diff --git a/src/vnet/tcp/tcp.c b/src/vnet/tcp/tcp.c index 564f200bc00..7f6a087b2f3 100644 --- a/src/vnet/tcp/tcp.c +++ b/src/vnet/tcp/tcp.c @@ -284,7 +284,8 @@ tcp_connection_reset (tcp_connection_t * tc) break; case TCP_STATE_SYN_SENT: session_stream_connect_notify (&tc->connection, 1 /* fail */ ); - tcp_connection_cleanup (tc); + tcp_connection_set_state (tc, TCP_STATE_CLOSED); + tcp_timer_set (tc, TCP_TIMER_WAITCLOSE, TCP_CLEANUP_TIME); break; case TCP_STATE_ESTABLISHED: tcp_connection_timers_reset (tc); @@ -307,6 +308,7 @@ tcp_connection_reset (tcp_connection_t * tc) tcp_connection_set_state (tc, TCP_STATE_CLOSED); break; case TCP_STATE_CLOSED: + case TCP_STATE_TIME_WAIT: break; default: TCP_DBG ("reset state: %u", tc->state); @@ -1293,9 +1295,9 @@ tcp_timer_waitclose_handler (u32 conn_index) * is closed. We haven't sent everything but we did try. */ tcp_cong_recovery_off (tc); tcp_send_fin (tc); - rto = clib_max (tc->rto >> tc->rto_boff, 1); + rto = clib_max ((tc->rto >> tc->rto_boff) * TCP_TO_TIMER_TICK, 1); tcp_timer_set (tc, TCP_TIMER_WAITCLOSE, - clib_min (rto * TCP_TO_TIMER_TICK, TCP_2MSL_TIME)); + clib_min (rto, TCP_2MSL_TIME)); session_transport_closed_notify (&tc->connection); } else diff --git a/src/vnet/tcp/tcp_input.c b/src/vnet/tcp/tcp_input.c index c3ce2eb1ae1..fb80d7f5759 100644 --- a/src/vnet/tcp/tcp_input.c +++ b/src/vnet/tcp/tcp_input.c @@ -2671,7 +2671,8 @@ tcp46_rcv_process_inline (vlib_main_t * vm, vlib_node_runtime_t * node, is_ip4); if (tmp->state != tc0->state) { - clib_warning ("state changed"); + if (tc0->state != TCP_STATE_CLOSED) + clib_warning ("state changed"); goto drop; } } diff --git a/test/test_sctp.py b/test/test_sctp.py index e4f0bd97416..f30feda111a 100644 --- a/test/test_sctp.py +++ b/test/test_sctp.py @@ -62,13 +62,13 @@ class TestSCTP(VppTestCase): # Start builtin server and client uri = "sctp://" + self.loop0.local_ip4 + "/1234" - error = self.vapi.cli("test echo server appns 0 fifo-size 4 uri " + - uri) + error = self.vapi.cli("test echo server appns 0 fifo-size 4 " + + "no-echo uri " + uri) if error: self.logger.critical(error) self.assertEqual(error.find("failed"), -1) - error = self.vapi.cli("test echo client mbytes 10" + + error = self.vapi.cli("test echo client mbytes 10 no-return " + " appns 1" + " fifo-size 4" + " no-output test-bytes syn-timeout 3" + -- 2.16.6