hsa: switch proxy to using first worker connects 08/41708/2
authorFlorin Coras <[email protected]>
Tue, 15 Oct 2024 02:43:42 +0000 (19:43 -0700)
committerFlorin Coras <[email protected]>
Tue, 15 Oct 2024 04:25:35 +0000 (00:25 -0400)
Type: improvement

Signed-off-by: Florin Coras <[email protected]>
Change-Id: I04821236ba8ab02525bd99a1ed4572dfcf5e5131

src/plugins/hs_apps/proxy.c
src/plugins/hs_apps/proxy.h

index c7e7b2a..6b0b246 100644 (file)
@@ -24,45 +24,78 @@ proxy_main_t proxy_main;
 
 #define TCP_MSS 1460
 
-typedef struct
+static void
+proxy_do_connect (vnet_connect_args_t *a)
 {
-  session_endpoint_cfg_t sep;
-  u32 app_index;
-  u32 api_context;
-} proxy_connect_args_t;
+  vnet_connect (a);
+  if (a->sep_ext.ext_cfg)
+    clib_mem_free (a->sep_ext.ext_cfg);
+}
 
 static void
-proxy_cb_fn (void *data, u32 data_len)
+proxy_handle_connects_rpc (void *args)
 {
-  proxy_connect_args_t *pa = (proxy_connect_args_t *) data;
-  vnet_connect_args_t a;
+  u32 thread_index = pointer_to_uword (args), n_connects = 0, n_pending;
+  proxy_worker_t *wrk;
+  u32 max_connects;
+
+  wrk = proxy_worker_get (thread_index);
+
+  clib_spinlock_lock (&wrk->pending_connects_lock);
+
+  n_pending = clib_fifo_elts (wrk->pending_connects);
+  max_connects = clib_min (32, n_pending);
+  vec_validate (wrk->burst_connects, max_connects);
+
+  while (n_connects < max_connects)
+    clib_fifo_sub1 (wrk->pending_connects, wrk->burst_connects[n_connects++]);
+
+  clib_spinlock_unlock (&wrk->pending_connects_lock);
+
+  /* Do connects without locking pending_connects */
+  n_connects = 0;
+  while (n_connects < max_connects)
+    {
+      proxy_do_connect (&wrk->burst_connects[n_connects]);
+      n_connects += 1;
+    }
 
-  clib_memset (&a, 0, sizeof (a));
-  a.api_context = pa->api_context;
-  a.app_index = pa->app_index;
-  clib_memcpy (&a.sep_ext, &pa->sep, sizeof (pa->sep));
-  vnet_connect (&a);
-  if (a.sep_ext.ext_cfg)
-    clib_mem_free (a.sep_ext.ext_cfg);
+  /* More work to do, program rpc */
+  if (max_connects < n_pending)
+    session_send_rpc_evt_to_thread_force (
+      transport_cl_thread (), proxy_handle_connects_rpc,
+      uword_to_pointer ((uword) thread_index, void *));
 }
 
 static void
-proxy_call_main_thread (vnet_connect_args_t * a)
+proxy_program_connect (vnet_connect_args_t *a)
 {
-  if (vlib_get_thread_index () == 0)
-    {
-      vnet_connect (a);
-      if (a->sep_ext.ext_cfg)
-       clib_mem_free (a->sep_ext.ext_cfg);
-    }
-  else
+  u32 connects_thread = transport_cl_thread (), thread_index, n_pending;
+  proxy_worker_t *wrk;
+
+  thread_index = vlib_get_thread_index ();
+
+  /* If already on first worker, handle request */
+  if (thread_index == connects_thread)
     {
-      proxy_connect_args_t args;
-      args.api_context = a->api_context;
-      args.app_index = a->app_index;
-      clib_memcpy (&args.sep, &a->sep_ext, sizeof (a->sep_ext));
-      vl_api_rpc_call_main_thread (proxy_cb_fn, (u8 *) & args, sizeof (args));
+      proxy_do_connect (a);
+      return;
     }
+
+  /* If not on first worker, queue request */
+  wrk = proxy_worker_get (thread_index);
+
+  clib_spinlock_lock (&wrk->pending_connects_lock);
+
+  clib_fifo_add1 (wrk->pending_connects, *a);
+  n_pending = vec_len (wrk->pending_connects);
+
+  clib_spinlock_unlock (&wrk->pending_connects_lock);
+
+  if (n_pending == 1)
+    session_send_rpc_evt_to_thread_force (
+      connects_thread, proxy_handle_connects_rpc,
+      uword_to_pointer ((uword) thread_index, void *));
 }
 
 static proxy_session_t *
@@ -415,7 +448,7 @@ proxy_rx_callback (session_t * s)
          a->sep_ext.ext_cfg->crypto.ckpair_index = pm->ckpair_index;
        }
 
-      proxy_call_main_thread (a);
+      proxy_program_connect (a);
     }
 
   return 0;
index 26f4de2..6636156 100644 (file)
@@ -41,8 +41,16 @@ typedef struct
   u32 po_thread_index;
 } proxy_session_t;
 
+typedef struct proxy_worker_
+{
+  clib_spinlock_t pending_connects_lock;
+  vnet_connect_args_t *pending_connects;
+  vnet_connect_args_t *burst_connects;
+} proxy_worker_t;
+
 typedef struct
 {
+  proxy_worker_t *workers;             /**< per-thread data */
   proxy_session_t *sessions;           /**< session pool, shared */
   clib_spinlock_t sessions_lock;       /**< lock for session pool */
   u8 **rx_buf;                         /**< intermediate rx buffers */
@@ -75,6 +83,13 @@ typedef struct
 
 extern proxy_main_t proxy_main;
 
+static inline proxy_worker_t *
+proxy_worker_get (u32 thread_index)
+{
+  proxy_main_t *pm = &proxy_main;
+  return vec_elt_at_index (pm->workers, thread_index);
+}
+
 #endif /* __included_proxy_h__ */
 
 /*