session: support half-close connection 61/32261/4
authorliuyacan <liuyacan@corp.netease.com>
Sun, 9 May 2021 03:50:40 +0000 (03:50 +0000)
committerFlorin Coras <florin.coras@gmail.com>
Wed, 12 May 2021 04:45:07 +0000 (04:45 +0000)
Some app(e.g. Envoy) may call shutdown() instead of close() when
draining connection.

Type: improvement

Signed-off-by: liuyacan <liuyacan@corp.netease.com>
Change-Id: I9543b9ca3caa87b10b134fd1fc4019124e41e4d2

15 files changed:
src/vcl/ldp.c
src/vcl/vcl_locked.c
src/vcl/vcl_locked.h
src/vcl/vcl_private.h
src/vcl/vppcom.c
src/vcl/vppcom.h
src/vnet/session/application.c
src/vnet/session/application_interface.h
src/vnet/session/session.c
src/vnet/session/session.h
src/vnet/session/session_node.c
src/vnet/session/session_types.h
src/vnet/session/transport.c
src/vnet/session/transport.h
src/vnet/tcp/tcp.c

index 64a4e7c..f27f6ba 100644 (file)
@@ -2219,6 +2219,8 @@ shutdown (int fd, int how)
 
       if (flags == SHUT_RDWR)
        rv = close (fd);
+      else if (flags == SHUT_WR)
+       rv = vls_shutdown (vlsh);
     }
   else
     {
index 757c0fc..69f492b 100644 (file)
@@ -1313,6 +1313,24 @@ vls_close (vls_handle_t vlsh)
   return rv;
 }
 
+int
+vls_shutdown (vls_handle_t vlsh)
+{
+  vcl_locked_session_t *vls;
+  int rv;
+
+  vls_mt_detect ();
+  if (!(vls = vls_get_w_dlock (vlsh)))
+    return VPPCOM_EBADFD;
+
+  vls_mt_guard (vls, VLS_MT_OP_SPOOL);
+  rv = vppcom_session_shutdown (vls_to_sh (vls));
+  vls_mt_unguard ();
+  vls_get_and_unlock (vlsh);
+
+  return rv;
+}
+
 vls_handle_t
 vls_epoll_create (void)
 {
index 11b71ee..3adcf62 100644 (file)
@@ -26,6 +26,7 @@
 typedef int vls_handle_t;
 
 vls_handle_t vls_create (uint8_t proto, uint8_t is_nonblocking);
+int vls_shutdown (vls_handle_t vlsh);
 int vls_close (vls_handle_t vlsh);
 int vls_bind (vls_handle_t vlsh, vppcom_endpt_t * ep);
 int vls_listen (vls_handle_t vlsh, int q_len);
index 6060ef8..956f077 100644 (file)
@@ -137,6 +137,7 @@ typedef enum vcl_session_flags_
   VCL_SESSION_F_IS_VEP = 1 << 1,
   VCL_SESSION_F_IS_VEP_SESSION = 1 << 2,
   VCL_SESSION_F_HAS_RX_EVT = 1 << 3,
+  VCL_SESSION_F_SHUTDOWN = 1 << 4,
 } __clib_packed vcl_session_flags_t;
 
 typedef struct vcl_session_
index 96a207b..0713a7b 100644 (file)
@@ -259,6 +259,23 @@ vcl_send_session_unlisten (vcl_worker_t * wrk, vcl_session_t * s)
   app_send_ctrl_evt_to_vpp (mq, app_evt);
 }
 
+static void
+vcl_send_session_shutdown (vcl_worker_t *wrk, vcl_session_t *s)
+{
+  app_session_evt_t _app_evt, *app_evt = &_app_evt;
+  session_shutdown_msg_t *mp;
+  svm_msg_q_t *mq;
+
+  /* Send to thread that owns the session */
+  mq = s->vpp_evt_q;
+  app_alloc_ctrl_evt_to_vpp (mq, app_evt, SESSION_CTRL_EVT_SHUTDOWN);
+  mp = (session_shutdown_msg_t *) app_evt->evt->data;
+  memset (mp, 0, sizeof (*mp));
+  mp->client_index = wrk->api_client_handle;
+  mp->handle = s->vpp_handle;
+  app_send_ctrl_evt_to_vpp (mq, app_evt);
+}
+
 static void
 vcl_send_session_disconnect (vcl_worker_t * wrk, vcl_session_t * s)
 {
@@ -789,6 +806,42 @@ vcl_session_disconnected_handler (vcl_worker_t * wrk,
   return session;
 }
 
+int
+vppcom_session_shutdown (uint32_t session_handle)
+{
+  vcl_worker_t *wrk = vcl_worker_get_current ();
+  vcl_session_t *session;
+  vcl_session_state_t state;
+  u64 vpp_handle;
+
+  session = vcl_session_get_w_handle (wrk, session_handle);
+  if (PREDICT_FALSE (!session))
+    return VPPCOM_EBADFD;
+
+  vpp_handle = session->vpp_handle;
+  state = session->session_state;
+
+  VDBG (1, "session %u [0x%llx] state 0x%x (%s)", session->session_index,
+       vpp_handle, state, vppcom_session_state_str (state));
+
+  if (PREDICT_FALSE (state == VCL_STATE_LISTEN))
+    {
+      VDBG (0, "ERROR: Cannot shutdown a listen socket!");
+      return VPPCOM_EBADFD;
+    }
+
+  if (PREDICT_TRUE (state == VCL_STATE_READY))
+    {
+      VDBG (1, "session %u [0x%llx]: sending shutdown...",
+           session->session_index, vpp_handle);
+
+      vcl_send_session_shutdown (wrk, session);
+      session->flags |= VCL_SESSION_F_SHUTDOWN;
+    }
+
+  return VPPCOM_OK;
+}
+
 static int
 vppcom_session_disconnect (u32 session_handle)
 {
@@ -2101,7 +2154,8 @@ vppcom_session_write_inline (vcl_worker_t * wrk, vcl_session_t * s, void *buf,
       return VPPCOM_EBADFD;
     }
 
-  if (PREDICT_FALSE (!vcl_session_is_open (s)))
+  if (PREDICT_FALSE (!vcl_session_is_open (s) ||
+                    s->flags & VCL_SESSION_F_SHUTDOWN))
     {
       VDBG (1, "session %u [0x%llx]: is not open! state 0x%x (%s)",
            s->session_index, s->vpp_handle, s->session_state,
index ae48885..72e5d46 100644 (file)
@@ -172,6 +172,7 @@ extern int vppcom_app_create (const char *app_name);
 extern void vppcom_app_destroy (void);
 
 extern int vppcom_session_create (uint8_t proto, uint8_t is_nonblocking);
+extern int vppcom_session_shutdown (uint32_t session_handle);
 extern int vppcom_session_close (uint32_t session_handle);
 extern int vppcom_session_bind (uint32_t session_handle, vppcom_endpt_t * ep);
 extern int vppcom_session_listen (uint32_t session_handle, uint32_t q_len);
index 8abec77..83106ef 100644 (file)
@@ -1384,6 +1384,27 @@ vnet_unlisten (vnet_unlisten_args_t * a)
   return app_worker_stop_listen (app_wrk, al);
 }
 
+int
+vnet_shutdown_session (vnet_shutdown_args_t *a)
+{
+  app_worker_t *app_wrk;
+  session_t *s;
+
+  s = session_get_from_handle_if_valid (a->handle);
+  if (!s)
+    return SESSION_E_NOSESSION;
+
+  app_wrk = app_worker_get (s->app_wrk_index);
+  if (app_wrk->app_index != a->app_index)
+    return SESSION_E_OWNER;
+
+  /* We're peeking into another's thread pool. Make sure */
+  ASSERT (s->session_index == session_index_from_handle (a->handle));
+
+  session_half_close (s);
+  return 0;
+}
+
 int
 vnet_disconnect_session (vnet_disconnect_args_t * a)
 {
index 9615615..d3bfb3b 100644 (file)
@@ -141,6 +141,12 @@ typedef struct _vnet_connect_args
   u32 api_context;
 } vnet_connect_args_t;
 
+typedef struct _vnet_shutdown_args_t
+{
+  session_handle_t handle;
+  u32 app_index;
+} vnet_shutdown_args_t;
+
 typedef struct _vnet_disconnect_args_t
 {
   session_handle_t handle;
@@ -266,6 +272,7 @@ int vnet_application_detach (vnet_app_detach_args_t * a);
 int vnet_listen (vnet_listen_args_t * a);
 int vnet_connect (vnet_connect_args_t * a);
 int vnet_unlisten (vnet_unlisten_args_t * a);
+int vnet_shutdown_session (vnet_shutdown_args_t *a);
 int vnet_disconnect_session (vnet_disconnect_args_t * a);
 
 int vnet_app_add_cert_key_pair (vnet_app_add_cert_key_pair_args_t * a);
@@ -426,6 +433,13 @@ typedef struct session_connected_msg_
   transport_endpoint_t lcl;
 } __clib_packed session_connected_msg_t;
 
+typedef struct session_shutdown_msg_
+{
+  u32 client_index;
+  u32 context;
+  session_handle_t handle;
+} __clib_packed session_shutdown_msg_t;
+
 typedef struct session_disconnect_msg_
 {
   u32 client_index;
index 1fac5ed..eba9f64 100644 (file)
@@ -97,9 +97,10 @@ session_send_io_evt_to_thread_custom (void *data, u32 thread_index,
 int
 session_send_ctrl_evt_to_thread (session_t * s, session_evt_type_t evt_type)
 {
-  /* only events supported are disconnect and reset */
-  ASSERT (evt_type == SESSION_CTRL_EVT_CLOSE
-         || evt_type == SESSION_CTRL_EVT_RESET);
+  /* only events supported are disconnect, shutdown and reset */
+  ASSERT (evt_type == SESSION_CTRL_EVT_CLOSE ||
+         evt_type == SESSION_CTRL_EVT_HALF_CLOSE ||
+         evt_type == SESSION_CTRL_EVT_RESET);
   return session_send_evt_to_thread (s, 0, s->thread_index, evt_type);
 }
 
@@ -1087,7 +1088,9 @@ session_transport_closed_notify (transport_connection_t * tc)
     return;
 
   /* Transport thinks that app requested close but it actually didn't.
-   * Can happen for tcp if fin and rst are received in close succession. */
+   * Can happen for tcp:
+   * 1)if fin and rst are received in close succession.
+   * 2)if app shutdown the connection.  */
   if (s->session_state == SESSION_STATE_READY)
     {
       session_transport_closing_notify (tc);
@@ -1398,6 +1401,20 @@ session_stop_listen (session_t * s)
   return 0;
 }
 
+/**
+ * Initialize session half-closing procedure.
+ *
+ * Note that half-closing will not change the state of the session.
+ */
+void
+session_half_close (session_t *s)
+{
+  if (!s)
+    return;
+
+  session_program_transport_ctrl_evt (s, SESSION_CTRL_EVT_HALF_CLOSE);
+}
+
 /**
  * Initialize session closing procedure.
  *
@@ -1438,6 +1455,24 @@ session_reset (session_t * s)
   session_program_transport_ctrl_evt (s, SESSION_CTRL_EVT_RESET);
 }
 
+/**
+ * Notify transport the session can be half-disconnected.
+ *
+ * Must be called from the session's thread.
+ */
+void
+session_transport_half_close (session_t *s)
+{
+  /* Only READY session can be half-closed */
+  if (s->session_state != SESSION_STATE_READY)
+    {
+      return;
+    }
+
+  transport_half_close (session_get_transport_proto (s), s->connection_index,
+                       s->thread_index);
+}
+
 /**
  * Notify transport the session can be disconnected. This should eventually
  * result in a delete notification that allows us to cleanup session state.
index 1a59d7d..17a870d 100644 (file)
@@ -455,8 +455,10 @@ session_clone_safe (u32 session_index, u32 thread_index)
 int session_open (u32 app_index, session_endpoint_t * tep, u32 opaque);
 int session_listen (session_t * s, session_endpoint_cfg_t * sep);
 int session_stop_listen (session_t * s);
+void session_half_close (session_t *s);
 void session_close (session_t * s);
 void session_reset (session_t * s);
+void session_transport_half_close (session_t *s);
 void session_transport_close (session_t * s);
 void session_transport_reset (session_t * s);
 void session_transport_cleanup (session_t * s);
index d30df33..b68ff53 100644 (file)
@@ -194,6 +194,22 @@ session_mq_connect_uri_handler (void *data)
     }
 }
 
+static void
+session_mq_shutdown_handler (void *data)
+{
+  session_shutdown_msg_t *mp = (session_shutdown_msg_t *) data;
+  vnet_shutdown_args_t _a, *a = &_a;
+  application_t *app;
+
+  app = application_lookup (mp->client_index);
+  if (!app)
+    return;
+
+  a->app_index = app->app_index;
+  a->handle = mp->handle;
+  vnet_shutdown_session (a);
+}
+
 static void
 session_mq_disconnect_handler (void *data)
 {
@@ -1287,6 +1303,12 @@ session_event_dispatch_ctrl (session_worker_t * wrk, session_evt_elt_t * elt)
       fp = e->rpc_args.fp;
       (*fp) (e->rpc_args.arg);
       break;
+    case SESSION_CTRL_EVT_HALF_CLOSE:
+      s = session_get_from_handle_if_valid (e->session_handle);
+      if (PREDICT_FALSE (!s))
+       break;
+      session_transport_half_close (s);
+      break;
     case SESSION_CTRL_EVT_CLOSE:
       s = session_get_from_handle_if_valid (e->session_handle);
       if (PREDICT_FALSE (!s))
@@ -1314,6 +1336,9 @@ session_event_dispatch_ctrl (session_worker_t * wrk, session_evt_elt_t * elt)
     case SESSION_CTRL_EVT_CONNECT_URI:
       session_mq_connect_uri_handler (session_evt_ctrl_data (wrk, elt));
       break;
+    case SESSION_CTRL_EVT_SHUTDOWN:
+      session_mq_shutdown_handler (session_evt_ctrl_data (wrk, elt));
+      break;
     case SESSION_CTRL_EVT_DISCONNECT:
       session_mq_disconnect_handler (session_evt_ctrl_data (wrk, elt));
       break;
index 2c3db5f..821aac9 100644 (file)
@@ -331,6 +331,7 @@ typedef enum
   SESSION_IO_EVT_BUILTIN_RX,
   SESSION_IO_EVT_BUILTIN_TX,
   SESSION_CTRL_EVT_RPC,
+  SESSION_CTRL_EVT_HALF_CLOSE,
   SESSION_CTRL_EVT_CLOSE,
   SESSION_CTRL_EVT_RESET,
   SESSION_CTRL_EVT_BOUND,
@@ -344,6 +345,7 @@ typedef enum
   SESSION_CTRL_EVT_REQ_WORKER_UPDATE,
   SESSION_CTRL_EVT_WORKER_UPDATE,
   SESSION_CTRL_EVT_WORKER_UPDATE_REPLY,
+  SESSION_CTRL_EVT_SHUTDOWN,
   SESSION_CTRL_EVT_DISCONNECT,
   SESSION_CTRL_EVT_CONNECT,
   SESSION_CTRL_EVT_CONNECT_URI,
@@ -371,6 +373,7 @@ typedef enum
   _ (CONNECT, connect)                                                        \
   _ (CONNECT_URI, connect_uri)                                                \
   _ (CONNECTED, connected)                                                    \
+  _ (SHUTDOWN, shutdown)                                                      \
   _ (DISCONNECT, disconnect)                                                  \
   _ (DISCONNECTED, disconnected)                                              \
   _ (DISCONNECTED_REPLY, disconnected_reply)                                  \
index 18ae3ca..2c88a4c 100644 (file)
@@ -317,6 +317,13 @@ transport_connect (transport_proto_t tp, transport_endpoint_cfg_t * tep)
   return tp_vfts[tp].connect (tep);
 }
 
+void
+transport_half_close (transport_proto_t tp, u32 conn_index, u8 thread_index)
+{
+  if (tp_vfts[tp].half_close)
+    tp_vfts[tp].half_close (conn_index, thread_index);
+}
+
 void
 transport_close (transport_proto_t tp, u32 conn_index, u8 thread_index)
 {
index 67583d2..447552c 100644 (file)
@@ -74,6 +74,7 @@ typedef struct _transport_proto_vft
   u32 (*start_listen) (u32 session_index, transport_endpoint_t * lcl);
   u32 (*stop_listen) (u32 conn_index);
   int (*connect) (transport_endpoint_cfg_t * rmt);
+  void (*half_close) (u32 conn_index, u32 thread_index);
   void (*close) (u32 conn_index, u32 thread_index);
   void (*reset) (u32 conn_index, u32 thread_index);
   void (*cleanup) (u32 conn_index, u32 thread_index);
@@ -134,6 +135,8 @@ do {                                                                \
 } while (0)
 
 int transport_connect (transport_proto_t tp, transport_endpoint_cfg_t * tep);
+void transport_half_close (transport_proto_t tp, u32 conn_index,
+                          u8 thread_index);
 void transport_close (transport_proto_t tp, u32 conn_index, u8 thread_index);
 void transport_reset (transport_proto_t tp, u32 conn_index, u8 thread_index);
 u32 transport_start_listen (transport_proto_t tp, u32 session_index,
index 90b832c..16bf945 100644 (file)
@@ -354,7 +354,6 @@ tcp_program_cleanup (tcp_worker_ctx_t * wrk, tcp_connection_t * tc)
  * 2) TIME_WAIT (active close) whereby after 2MSL the 2MSL timer triggers
  * and cleanup is called.
  *
- * N.B. Half-close connections are not supported
  */
 void
 tcp_connection_close (tcp_connection_t * tc)
@@ -425,6 +424,30 @@ tcp_connection_close (tcp_connection_t * tc)
     }
 }
 
+static void
+tcp_session_half_close (u32 conn_index, u32 thread_index)
+{
+  tcp_worker_ctx_t *wrk;
+  tcp_connection_t *tc;
+
+  tc = tcp_connection_get (conn_index, thread_index);
+  wrk = tcp_get_worker (tc->c_thread_index);
+
+  /* If the connection is not in ESTABLISHED state, ignore it */
+  if (tc->state != TCP_STATE_ESTABLISHED)
+    return;
+  if (!transport_max_tx_dequeue (&tc->connection))
+    tcp_send_fin (tc);
+  else
+    tc->flags |= TCP_CONN_FINPNDG;
+  tcp_connection_set_state (tc, TCP_STATE_FIN_WAIT_1);
+  /* Set a timer in case the peer stops responding. Otherwise the
+   * connection will be stuck here forever. */
+  ASSERT (tc->timers[TCP_TIMER_WAITCLOSE] == TCP_TIMER_HANDLE_INVALID);
+  tcp_timer_set (&wrk->timer_wheel, tc, TCP_TIMER_WAITCLOSE,
+                tcp_cfg.finwait1_time);
+}
+
 static void
 tcp_session_close (u32 conn_index, u32 thread_index)
 {
@@ -1316,6 +1339,7 @@ const static transport_proto_vft_t tcp_proto = {
   .get_half_open = tcp_half_open_session_get_transport,
   .attribute = tcp_session_attribute,
   .connect = tcp_session_open,
+  .half_close = tcp_session_half_close,
   .close = tcp_session_close,
   .cleanup = tcp_session_cleanup,
   .cleanup_ho = tcp_session_cleanup_ho,