session: postpone ct cleanups 11/34411/4
authorFlorin Coras <fcoras@cisco.com>
Tue, 9 Nov 2021 16:38:24 +0000 (08:38 -0800)
committerFlorin Coras <florin.coras@gmail.com>
Tue, 9 Nov 2021 18:33:28 +0000 (18:33 +0000)
Add infra to postpone cleanups while tx events are not delivered.

Type: improvement

Signed-off-by: Florin Coras <fcoras@cisco.com>
Change-Id: I7611ac2442116f71a229569a7e274eb58eb84546

src/vnet/session/application_local.c

index 0abf03d..88536dc 100644 (file)
@@ -41,9 +41,16 @@ typedef struct ct_segments_
   ct_segment_t *segments;
 } ct_segments_ctx_t;
 
+typedef struct ct_cleanup_req_
+{
+  u32 ct_index;
+} ct_cleanup_req_t;
+
 typedef struct ct_main_
 {
   ct_connection_t **connections;       /**< Per-worker connection pools */
+  ct_cleanup_req_t **pending_cleanups;
+  u8 *heave_cleanups;
   u32 n_workers;                       /**< Number of vpp workers */
   u32 n_sessions;                      /**< Cumulative sessions counter */
   u32 *ho_reusable;                    /**< Vector of reusable ho indices */
@@ -864,31 +871,19 @@ global_scope:
 }
 
 static void
-ct_session_close (u32 ct_index, u32 thread_index)
+ct_session_postponed_cleanup (ct_connection_t *ct)
 {
-  ct_connection_t *ct, *peer_ct;
   app_worker_t *app_wrk;
   session_t *s;
 
-  ct = ct_connection_get (ct_index, thread_index);
   s = session_get (ct->c_s_index, ct->c_thread_index);
-  peer_ct = ct_connection_get (ct->peer_index, thread_index);
-  if (peer_ct)
-    {
-      peer_ct->peer_index = ~0;
-      /* Make sure session was allocated */
-      if (peer_ct->flags & CT_CONN_F_HALF_OPEN)
-       {
-         ct_session_connect_notify (s, SESSION_E_REFUSED);
-       }
-      else if (peer_ct->c_s_index != ~0)
-       session_transport_closing_notify (&peer_ct->connection);
-      else
-       ct_connection_free (peer_ct);
-    }
+  app_wrk = app_worker_get_if_valid (s->app_wrk_index);
 
   if (ct->flags & CT_CONN_F_CLIENT)
     {
+      if (app_wrk)
+       app_worker_cleanup_notify (app_wrk, s, SESSION_CLEANUP_TRANSPORT);
+
       /* Normal free for client session as the fifos are allocated through
        * the connects segment manager in a segment that's not shared with
        * the server */
@@ -901,7 +896,10 @@ ct_session_close (u32 ct_index, u32 thread_index)
        * segment manager cleanups and notifications */
       app_wrk = app_worker_get_if_valid (s->app_wrk_index);
       if (app_wrk)
-       app_worker_cleanup_notify (app_wrk, s, SESSION_CLEANUP_SESSION);
+       {
+         app_worker_cleanup_notify (app_wrk, s, SESSION_CLEANUP_TRANSPORT);
+         app_worker_cleanup_notify (app_wrk, s, SESSION_CLEANUP_SESSION);
+       }
 
       ct_session_dealloc_fifos (ct, s->rx_fifo, s->tx_fifo);
       session_free (s);
@@ -910,6 +908,93 @@ ct_session_close (u32 ct_index, u32 thread_index)
   ct_connection_free (ct);
 }
 
+static void
+ct_handle_cleanups (void *args)
+{
+  uword thread_index = pointer_to_uword (args);
+  const u32 max_cleanups = 100;
+  ct_main_t *cm = &ct_main;
+  ct_cleanup_req_t *req;
+  ct_connection_t *ct;
+  u32 n_to_handle = 0;
+  session_t *s;
+
+  cm->heave_cleanups[thread_index] = 0;
+  n_to_handle = clib_fifo_elts (cm->pending_cleanups[thread_index]);
+  n_to_handle = clib_min (n_to_handle, max_cleanups);
+
+  while (n_to_handle)
+    {
+      clib_fifo_sub2 (cm->pending_cleanups[thread_index], req);
+      ct = ct_connection_get (req->ct_index, thread_index);
+      s = session_get (ct->c_s_index, ct->c_thread_index);
+      if (!svm_fifo_has_event (s->tx_fifo))
+       ct_session_postponed_cleanup (ct);
+      else
+       clib_fifo_add1 (cm->pending_cleanups[thread_index], *req);
+      n_to_handle -= 1;
+    }
+
+  if (clib_fifo_elts (cm->pending_cleanups[thread_index]))
+    {
+      cm->heave_cleanups[thread_index] = 1;
+      session_send_rpc_evt_to_thread_force (
+       thread_index, ct_handle_cleanups,
+       uword_to_pointer (thread_index, void *));
+    }
+}
+
+static void
+ct_program_cleanup (ct_connection_t *ct)
+{
+  ct_main_t *cm = &ct_main;
+  ct_cleanup_req_t *req;
+  uword thread_index;
+
+  thread_index = ct->c_thread_index;
+  clib_fifo_add2 (cm->pending_cleanups[thread_index], req);
+  req->ct_index = ct->c_c_index;
+
+  if (cm->heave_cleanups[thread_index])
+    return;
+
+  cm->heave_cleanups[thread_index] = 1;
+  session_send_rpc_evt_to_thread_force (
+    thread_index, ct_handle_cleanups, uword_to_pointer (thread_index, void *));
+}
+
+static void
+ct_session_close (u32 ct_index, u32 thread_index)
+{
+  ct_connection_t *ct, *peer_ct;
+  session_t *s;
+
+  ct = ct_connection_get (ct_index, thread_index);
+  s = session_get (ct->c_s_index, ct->c_thread_index);
+  peer_ct = ct_connection_get (ct->peer_index, thread_index);
+  if (peer_ct)
+    {
+      peer_ct->peer_index = ~0;
+      /* Make sure session was allocated */
+      if (peer_ct->flags & CT_CONN_F_HALF_OPEN)
+       {
+         ct_session_connect_notify (s, SESSION_E_REFUSED);
+       }
+      else if (peer_ct->c_s_index != ~0)
+       session_transport_closing_notify (&peer_ct->connection);
+      else
+       {
+         /* should not happen */
+         clib_warning ("ct peer without session");
+         ct_connection_free (peer_ct);
+       }
+    }
+
+  /* Do not send closed notify to make sure pending tx events are
+   * still delivered and program cleanup */
+  ct_program_cleanup (ct);
+}
+
 static transport_connection_t *
 ct_session_get (u32 ct_index, u32 thread_index)
 {
@@ -1046,6 +1131,8 @@ ct_enable_disable (vlib_main_t * vm, u8 is_en)
 
   cm->n_workers = vlib_num_workers ();
   vec_validate (cm->connections, cm->n_workers);
+  vec_validate (cm->pending_cleanups, cm->n_workers);
+  vec_validate (cm->heave_cleanups, cm->n_workers);
   clib_spinlock_init (&cm->ho_reuseable_lock);
   clib_rwlock_init (&cm->app_segs_lock);
   return 0;