From: liuyacan Date: Sun, 9 May 2021 03:50:40 +0000 (+0000) Subject: session: support half-close connection X-Git-Tag: v21.10-rc0~85 X-Git-Url: https://gerrit.fd.io/r/gitweb?a=commitdiff_plain;h=534468e9f768ae7465ef722520dadfd916cdc9fb;p=vpp.git session: support half-close connection Some app(e.g. Envoy) may call shutdown() instead of close() when draining connection. Type: improvement Signed-off-by: liuyacan Change-Id: I9543b9ca3caa87b10b134fd1fc4019124e41e4d2 --- diff --git a/src/vcl/ldp.c b/src/vcl/ldp.c index 64a4e7c77db..f27f6ba8e97 100644 --- a/src/vcl/ldp.c +++ b/src/vcl/ldp.c @@ -2219,6 +2219,8 @@ shutdown (int fd, int how) if (flags == SHUT_RDWR) rv = close (fd); + else if (flags == SHUT_WR) + rv = vls_shutdown (vlsh); } else { diff --git a/src/vcl/vcl_locked.c b/src/vcl/vcl_locked.c index 757c0fc45a7..69f492b8694 100644 --- a/src/vcl/vcl_locked.c +++ b/src/vcl/vcl_locked.c @@ -1313,6 +1313,24 @@ vls_close (vls_handle_t vlsh) return rv; } +int +vls_shutdown (vls_handle_t vlsh) +{ + vcl_locked_session_t *vls; + int rv; + + vls_mt_detect (); + if (!(vls = vls_get_w_dlock (vlsh))) + return VPPCOM_EBADFD; + + vls_mt_guard (vls, VLS_MT_OP_SPOOL); + rv = vppcom_session_shutdown (vls_to_sh (vls)); + vls_mt_unguard (); + vls_get_and_unlock (vlsh); + + return rv; +} + vls_handle_t vls_epoll_create (void) { diff --git a/src/vcl/vcl_locked.h b/src/vcl/vcl_locked.h index 11b71eee4af..3adcf62bc77 100644 --- a/src/vcl/vcl_locked.h +++ b/src/vcl/vcl_locked.h @@ -26,6 +26,7 @@ typedef int vls_handle_t; vls_handle_t vls_create (uint8_t proto, uint8_t is_nonblocking); +int vls_shutdown (vls_handle_t vlsh); int vls_close (vls_handle_t vlsh); int vls_bind (vls_handle_t vlsh, vppcom_endpt_t * ep); int vls_listen (vls_handle_t vlsh, int q_len); diff --git a/src/vcl/vcl_private.h b/src/vcl/vcl_private.h index 6060ef82357..956f077b880 100644 --- a/src/vcl/vcl_private.h +++ b/src/vcl/vcl_private.h @@ -137,6 +137,7 @@ typedef enum vcl_session_flags_ VCL_SESSION_F_IS_VEP = 1 << 1, VCL_SESSION_F_IS_VEP_SESSION = 1 << 2, VCL_SESSION_F_HAS_RX_EVT = 1 << 3, + VCL_SESSION_F_SHUTDOWN = 1 << 4, } __clib_packed vcl_session_flags_t; typedef struct vcl_session_ diff --git a/src/vcl/vppcom.c b/src/vcl/vppcom.c index 96a207b741f..0713a7b2d33 100644 --- a/src/vcl/vppcom.c +++ b/src/vcl/vppcom.c @@ -259,6 +259,23 @@ vcl_send_session_unlisten (vcl_worker_t * wrk, vcl_session_t * s) app_send_ctrl_evt_to_vpp (mq, app_evt); } +static void +vcl_send_session_shutdown (vcl_worker_t *wrk, vcl_session_t *s) +{ + app_session_evt_t _app_evt, *app_evt = &_app_evt; + session_shutdown_msg_t *mp; + svm_msg_q_t *mq; + + /* Send to thread that owns the session */ + mq = s->vpp_evt_q; + app_alloc_ctrl_evt_to_vpp (mq, app_evt, SESSION_CTRL_EVT_SHUTDOWN); + mp = (session_shutdown_msg_t *) app_evt->evt->data; + memset (mp, 0, sizeof (*mp)); + mp->client_index = wrk->api_client_handle; + mp->handle = s->vpp_handle; + app_send_ctrl_evt_to_vpp (mq, app_evt); +} + static void vcl_send_session_disconnect (vcl_worker_t * wrk, vcl_session_t * s) { @@ -789,6 +806,42 @@ vcl_session_disconnected_handler (vcl_worker_t * wrk, return session; } +int +vppcom_session_shutdown (uint32_t session_handle) +{ + vcl_worker_t *wrk = vcl_worker_get_current (); + vcl_session_t *session; + vcl_session_state_t state; + u64 vpp_handle; + + session = vcl_session_get_w_handle (wrk, session_handle); + if (PREDICT_FALSE (!session)) + return VPPCOM_EBADFD; + + vpp_handle = session->vpp_handle; + state = session->session_state; + + VDBG (1, "session %u [0x%llx] state 0x%x (%s)", session->session_index, + vpp_handle, state, vppcom_session_state_str (state)); + + if (PREDICT_FALSE (state == VCL_STATE_LISTEN)) + { + VDBG (0, "ERROR: Cannot shutdown a listen socket!"); + return VPPCOM_EBADFD; + } + + if (PREDICT_TRUE (state == VCL_STATE_READY)) + { + VDBG (1, "session %u [0x%llx]: sending shutdown...", + session->session_index, vpp_handle); + + vcl_send_session_shutdown (wrk, session); + session->flags |= VCL_SESSION_F_SHUTDOWN; + } + + return VPPCOM_OK; +} + static int vppcom_session_disconnect (u32 session_handle) { @@ -2101,7 +2154,8 @@ vppcom_session_write_inline (vcl_worker_t * wrk, vcl_session_t * s, void *buf, return VPPCOM_EBADFD; } - if (PREDICT_FALSE (!vcl_session_is_open (s))) + if (PREDICT_FALSE (!vcl_session_is_open (s) || + s->flags & VCL_SESSION_F_SHUTDOWN)) { VDBG (1, "session %u [0x%llx]: is not open! state 0x%x (%s)", s->session_index, s->vpp_handle, s->session_state, diff --git a/src/vcl/vppcom.h b/src/vcl/vppcom.h index ae4888566c7..72e5d46bc8f 100644 --- a/src/vcl/vppcom.h +++ b/src/vcl/vppcom.h @@ -172,6 +172,7 @@ extern int vppcom_app_create (const char *app_name); extern void vppcom_app_destroy (void); extern int vppcom_session_create (uint8_t proto, uint8_t is_nonblocking); +extern int vppcom_session_shutdown (uint32_t session_handle); extern int vppcom_session_close (uint32_t session_handle); extern int vppcom_session_bind (uint32_t session_handle, vppcom_endpt_t * ep); extern int vppcom_session_listen (uint32_t session_handle, uint32_t q_len); diff --git a/src/vnet/session/application.c b/src/vnet/session/application.c index 8abec7797ec..83106ef172e 100644 --- a/src/vnet/session/application.c +++ b/src/vnet/session/application.c @@ -1384,6 +1384,27 @@ vnet_unlisten (vnet_unlisten_args_t * a) return app_worker_stop_listen (app_wrk, al); } +int +vnet_shutdown_session (vnet_shutdown_args_t *a) +{ + app_worker_t *app_wrk; + session_t *s; + + s = session_get_from_handle_if_valid (a->handle); + if (!s) + return SESSION_E_NOSESSION; + + app_wrk = app_worker_get (s->app_wrk_index); + if (app_wrk->app_index != a->app_index) + return SESSION_E_OWNER; + + /* We're peeking into another's thread pool. Make sure */ + ASSERT (s->session_index == session_index_from_handle (a->handle)); + + session_half_close (s); + return 0; +} + int vnet_disconnect_session (vnet_disconnect_args_t * a) { diff --git a/src/vnet/session/application_interface.h b/src/vnet/session/application_interface.h index 961561547d7..d3bfb3b03dd 100644 --- a/src/vnet/session/application_interface.h +++ b/src/vnet/session/application_interface.h @@ -141,6 +141,12 @@ typedef struct _vnet_connect_args u32 api_context; } vnet_connect_args_t; +typedef struct _vnet_shutdown_args_t +{ + session_handle_t handle; + u32 app_index; +} vnet_shutdown_args_t; + typedef struct _vnet_disconnect_args_t { session_handle_t handle; @@ -266,6 +272,7 @@ int vnet_application_detach (vnet_app_detach_args_t * a); int vnet_listen (vnet_listen_args_t * a); int vnet_connect (vnet_connect_args_t * a); int vnet_unlisten (vnet_unlisten_args_t * a); +int vnet_shutdown_session (vnet_shutdown_args_t *a); int vnet_disconnect_session (vnet_disconnect_args_t * a); int vnet_app_add_cert_key_pair (vnet_app_add_cert_key_pair_args_t * a); @@ -426,6 +433,13 @@ typedef struct session_connected_msg_ transport_endpoint_t lcl; } __clib_packed session_connected_msg_t; +typedef struct session_shutdown_msg_ +{ + u32 client_index; + u32 context; + session_handle_t handle; +} __clib_packed session_shutdown_msg_t; + typedef struct session_disconnect_msg_ { u32 client_index; diff --git a/src/vnet/session/session.c b/src/vnet/session/session.c index 1fac5ed2bb9..eba9f64ca22 100644 --- a/src/vnet/session/session.c +++ b/src/vnet/session/session.c @@ -97,9 +97,10 @@ session_send_io_evt_to_thread_custom (void *data, u32 thread_index, int session_send_ctrl_evt_to_thread (session_t * s, session_evt_type_t evt_type) { - /* only events supported are disconnect and reset */ - ASSERT (evt_type == SESSION_CTRL_EVT_CLOSE - || evt_type == SESSION_CTRL_EVT_RESET); + /* only events supported are disconnect, shutdown and reset */ + ASSERT (evt_type == SESSION_CTRL_EVT_CLOSE || + evt_type == SESSION_CTRL_EVT_HALF_CLOSE || + evt_type == SESSION_CTRL_EVT_RESET); return session_send_evt_to_thread (s, 0, s->thread_index, evt_type); } @@ -1087,7 +1088,9 @@ session_transport_closed_notify (transport_connection_t * tc) return; /* Transport thinks that app requested close but it actually didn't. - * Can happen for tcp if fin and rst are received in close succession. */ + * Can happen for tcp: + * 1)if fin and rst are received in close succession. + * 2)if app shutdown the connection. */ if (s->session_state == SESSION_STATE_READY) { session_transport_closing_notify (tc); @@ -1398,6 +1401,20 @@ session_stop_listen (session_t * s) return 0; } +/** + * Initialize session half-closing procedure. + * + * Note that half-closing will not change the state of the session. + */ +void +session_half_close (session_t *s) +{ + if (!s) + return; + + session_program_transport_ctrl_evt (s, SESSION_CTRL_EVT_HALF_CLOSE); +} + /** * Initialize session closing procedure. * @@ -1438,6 +1455,24 @@ session_reset (session_t * s) session_program_transport_ctrl_evt (s, SESSION_CTRL_EVT_RESET); } +/** + * Notify transport the session can be half-disconnected. + * + * Must be called from the session's thread. + */ +void +session_transport_half_close (session_t *s) +{ + /* Only READY session can be half-closed */ + if (s->session_state != SESSION_STATE_READY) + { + return; + } + + transport_half_close (session_get_transport_proto (s), s->connection_index, + s->thread_index); +} + /** * Notify transport the session can be disconnected. This should eventually * result in a delete notification that allows us to cleanup session state. diff --git a/src/vnet/session/session.h b/src/vnet/session/session.h index 1a59d7df403..17a870dfcf0 100644 --- a/src/vnet/session/session.h +++ b/src/vnet/session/session.h @@ -455,8 +455,10 @@ session_clone_safe (u32 session_index, u32 thread_index) int session_open (u32 app_index, session_endpoint_t * tep, u32 opaque); int session_listen (session_t * s, session_endpoint_cfg_t * sep); int session_stop_listen (session_t * s); +void session_half_close (session_t *s); void session_close (session_t * s); void session_reset (session_t * s); +void session_transport_half_close (session_t *s); void session_transport_close (session_t * s); void session_transport_reset (session_t * s); void session_transport_cleanup (session_t * s); diff --git a/src/vnet/session/session_node.c b/src/vnet/session/session_node.c index d30df33a5e7..b68ff53dd7a 100644 --- a/src/vnet/session/session_node.c +++ b/src/vnet/session/session_node.c @@ -194,6 +194,22 @@ session_mq_connect_uri_handler (void *data) } } +static void +session_mq_shutdown_handler (void *data) +{ + session_shutdown_msg_t *mp = (session_shutdown_msg_t *) data; + vnet_shutdown_args_t _a, *a = &_a; + application_t *app; + + app = application_lookup (mp->client_index); + if (!app) + return; + + a->app_index = app->app_index; + a->handle = mp->handle; + vnet_shutdown_session (a); +} + static void session_mq_disconnect_handler (void *data) { @@ -1287,6 +1303,12 @@ session_event_dispatch_ctrl (session_worker_t * wrk, session_evt_elt_t * elt) fp = e->rpc_args.fp; (*fp) (e->rpc_args.arg); break; + case SESSION_CTRL_EVT_HALF_CLOSE: + s = session_get_from_handle_if_valid (e->session_handle); + if (PREDICT_FALSE (!s)) + break; + session_transport_half_close (s); + break; case SESSION_CTRL_EVT_CLOSE: s = session_get_from_handle_if_valid (e->session_handle); if (PREDICT_FALSE (!s)) @@ -1314,6 +1336,9 @@ session_event_dispatch_ctrl (session_worker_t * wrk, session_evt_elt_t * elt) case SESSION_CTRL_EVT_CONNECT_URI: session_mq_connect_uri_handler (session_evt_ctrl_data (wrk, elt)); break; + case SESSION_CTRL_EVT_SHUTDOWN: + session_mq_shutdown_handler (session_evt_ctrl_data (wrk, elt)); + break; case SESSION_CTRL_EVT_DISCONNECT: session_mq_disconnect_handler (session_evt_ctrl_data (wrk, elt)); break; diff --git a/src/vnet/session/session_types.h b/src/vnet/session/session_types.h index 2c3db5f18b5..821aac9394d 100644 --- a/src/vnet/session/session_types.h +++ b/src/vnet/session/session_types.h @@ -331,6 +331,7 @@ typedef enum SESSION_IO_EVT_BUILTIN_RX, SESSION_IO_EVT_BUILTIN_TX, SESSION_CTRL_EVT_RPC, + SESSION_CTRL_EVT_HALF_CLOSE, SESSION_CTRL_EVT_CLOSE, SESSION_CTRL_EVT_RESET, SESSION_CTRL_EVT_BOUND, @@ -344,6 +345,7 @@ typedef enum SESSION_CTRL_EVT_REQ_WORKER_UPDATE, SESSION_CTRL_EVT_WORKER_UPDATE, SESSION_CTRL_EVT_WORKER_UPDATE_REPLY, + SESSION_CTRL_EVT_SHUTDOWN, SESSION_CTRL_EVT_DISCONNECT, SESSION_CTRL_EVT_CONNECT, SESSION_CTRL_EVT_CONNECT_URI, @@ -371,6 +373,7 @@ typedef enum _ (CONNECT, connect) \ _ (CONNECT_URI, connect_uri) \ _ (CONNECTED, connected) \ + _ (SHUTDOWN, shutdown) \ _ (DISCONNECT, disconnect) \ _ (DISCONNECTED, disconnected) \ _ (DISCONNECTED_REPLY, disconnected_reply) \ diff --git a/src/vnet/session/transport.c b/src/vnet/session/transport.c index 18ae3ca5188..2c88a4c4931 100644 --- a/src/vnet/session/transport.c +++ b/src/vnet/session/transport.c @@ -317,6 +317,13 @@ transport_connect (transport_proto_t tp, transport_endpoint_cfg_t * tep) return tp_vfts[tp].connect (tep); } +void +transport_half_close (transport_proto_t tp, u32 conn_index, u8 thread_index) +{ + if (tp_vfts[tp].half_close) + tp_vfts[tp].half_close (conn_index, thread_index); +} + void transport_close (transport_proto_t tp, u32 conn_index, u8 thread_index) { diff --git a/src/vnet/session/transport.h b/src/vnet/session/transport.h index 67583d23be0..447552c539e 100644 --- a/src/vnet/session/transport.h +++ b/src/vnet/session/transport.h @@ -74,6 +74,7 @@ typedef struct _transport_proto_vft u32 (*start_listen) (u32 session_index, transport_endpoint_t * lcl); u32 (*stop_listen) (u32 conn_index); int (*connect) (transport_endpoint_cfg_t * rmt); + void (*half_close) (u32 conn_index, u32 thread_index); void (*close) (u32 conn_index, u32 thread_index); void (*reset) (u32 conn_index, u32 thread_index); void (*cleanup) (u32 conn_index, u32 thread_index); @@ -134,6 +135,8 @@ do { \ } while (0) int transport_connect (transport_proto_t tp, transport_endpoint_cfg_t * tep); +void transport_half_close (transport_proto_t tp, u32 conn_index, + u8 thread_index); void transport_close (transport_proto_t tp, u32 conn_index, u8 thread_index); void transport_reset (transport_proto_t tp, u32 conn_index, u8 thread_index); u32 transport_start_listen (transport_proto_t tp, u32 session_index, diff --git a/src/vnet/tcp/tcp.c b/src/vnet/tcp/tcp.c index 90b832cd73d..16bf9451709 100644 --- a/src/vnet/tcp/tcp.c +++ b/src/vnet/tcp/tcp.c @@ -354,7 +354,6 @@ tcp_program_cleanup (tcp_worker_ctx_t * wrk, tcp_connection_t * tc) * 2) TIME_WAIT (active close) whereby after 2MSL the 2MSL timer triggers * and cleanup is called. * - * N.B. Half-close connections are not supported */ void tcp_connection_close (tcp_connection_t * tc) @@ -425,6 +424,30 @@ tcp_connection_close (tcp_connection_t * tc) } } +static void +tcp_session_half_close (u32 conn_index, u32 thread_index) +{ + tcp_worker_ctx_t *wrk; + tcp_connection_t *tc; + + tc = tcp_connection_get (conn_index, thread_index); + wrk = tcp_get_worker (tc->c_thread_index); + + /* If the connection is not in ESTABLISHED state, ignore it */ + if (tc->state != TCP_STATE_ESTABLISHED) + return; + if (!transport_max_tx_dequeue (&tc->connection)) + tcp_send_fin (tc); + else + tc->flags |= TCP_CONN_FINPNDG; + tcp_connection_set_state (tc, TCP_STATE_FIN_WAIT_1); + /* Set a timer in case the peer stops responding. Otherwise the + * connection will be stuck here forever. */ + ASSERT (tc->timers[TCP_TIMER_WAITCLOSE] == TCP_TIMER_HANDLE_INVALID); + tcp_timer_set (&wrk->timer_wheel, tc, TCP_TIMER_WAITCLOSE, + tcp_cfg.finwait1_time); +} + static void tcp_session_close (u32 conn_index, u32 thread_index) { @@ -1316,6 +1339,7 @@ const static transport_proto_vft_t tcp_proto = { .get_half_open = tcp_half_open_session_get_transport, .attribute = tcp_session_attribute, .connect = tcp_session_open, + .half_close = tcp_session_half_close, .close = tcp_session_close, .cleanup = tcp_session_cleanup, .cleanup_ho = tcp_session_cleanup_ho,