hsa: refactor proxy to minimize lock usage 23/41723/8
authorFlorin Coras <[email protected]>
Fri, 18 Oct 2024 07:46:57 +0000 (00:46 -0700)
committerDave Barach <[email protected]>
Sat, 19 Oct 2024 21:08:27 +0000 (21:08 +0000)
Use per worker context to minimize proxy session lock usage for io
events.

Type: improvement

Signed-off-by: Florin Coras <[email protected]>
Change-Id: Ia0ea204a8b09f72300fd40745b299246d5d0ddb7

src/plugins/hs_apps/proxy.c
src/plugins/hs_apps/proxy.h

index 6b0b246..a277679 100644 (file)
@@ -24,9 +24,35 @@ proxy_main_t proxy_main;
 
 #define TCP_MSS 1460
 
+static proxy_session_side_ctx_t *
+proxy_session_side_ctx_alloc (proxy_worker_t *wrk)
+{
+  proxy_session_side_ctx_t *ctx;
+
+  pool_get_zero (wrk->ctx_pool, ctx);
+  ctx->sc_index = ctx - wrk->ctx_pool;
+  ctx->ps_index = ~0;
+
+  return ctx;
+}
+
+static void
+proxy_session_side_ctx_free (proxy_worker_t *wrk,
+                            proxy_session_side_ctx_t *ctx)
+{
+  pool_put (wrk->ctx_pool, ctx);
+}
+
+static proxy_session_side_ctx_t *
+proxy_session_side_ctx_get (proxy_worker_t *wrk, u32 ctx_index)
+{
+  return pool_elt_at_index (wrk->ctx_pool, ctx_index);
+}
+
 static void
 proxy_do_connect (vnet_connect_args_t *a)
 {
+  ASSERT (session_vlib_thread_is_cl_thread ());
   vnet_connect (a);
   if (a->sep_ext.ext_cfg)
     clib_mem_free (a->sep_ext.ext_cfg);
@@ -88,7 +114,7 @@ proxy_program_connect (vnet_connect_args_t *a)
   clib_spinlock_lock (&wrk->pending_connects_lock);
 
   clib_fifo_add1 (wrk->pending_connects, *a);
-  n_pending = vec_len (wrk->pending_connects);
+  n_pending = clib_fifo_elts (wrk->pending_connects);
 
   clib_spinlock_unlock (&wrk->pending_connects_lock);
 
@@ -118,16 +144,6 @@ proxy_session_get (u32 ps_index)
   return pool_elt_at_index (pm->sessions, ps_index);
 }
 
-static inline proxy_session_t *
-proxy_session_get_if_valid (u32 ps_index)
-{
-  proxy_main_t *pm = &proxy_main;
-
-  if (pool_is_free_index (pm->sessions, ps_index))
-    return 0;
-  return pool_elt_at_index (pm->sessions, ps_index);
-}
-
 static void
 proxy_session_free (proxy_session_t *ps)
 {
@@ -148,7 +164,7 @@ proxy_session_postponed_free_rpc (void *arg)
   clib_spinlock_lock_if_init (&pm->sessions_lock);
 
   ps = proxy_session_get (ps_index);
-  segment_manager_dealloc_fifos (ps->server_rx_fifo, ps->server_tx_fifo);
+  segment_manager_dealloc_fifos (ps->po.rx_fifo, ps->po.tx_fifo);
   proxy_session_free (ps);
 
   clib_spinlock_unlock_if_init (&pm->sessions_lock);
@@ -159,84 +175,145 @@ proxy_session_postponed_free_rpc (void *arg)
 static void
 proxy_session_postponed_free (proxy_session_t *ps)
 {
-  session_send_rpc_evt_to_thread (ps->po_thread_index,
+  /* Passive open session handle has been invalidated so we don't have thread
+   * index at this point */
+  session_send_rpc_evt_to_thread (ps->po.rx_fifo->master_thread_index,
                                  proxy_session_postponed_free_rpc,
                                  uword_to_pointer (ps->ps_index, void *));
 }
 
+static void
+proxy_session_close_po (proxy_session_t *ps)
+{
+  vnet_disconnect_args_t _a = {}, *a = &_a;
+  proxy_main_t *pm = &proxy_main;
+
+  ASSERT (!vlib_num_workers () ||
+         CLIB_SPINLOCK_IS_LOCKED (&pm->sessions_lock));
+
+  a->handle = ps->po.session_handle;
+  a->app_index = pm->server_app_index;
+  vnet_disconnect_session (a);
+
+  ps->po_disconnected = 1;
+}
+
+static void
+proxy_session_close_ao (proxy_session_t *ps)
+{
+  vnet_disconnect_args_t _a = {}, *a = &_a;
+  proxy_main_t *pm = &proxy_main;
+
+  ASSERT (!vlib_num_workers () ||
+         CLIB_SPINLOCK_IS_LOCKED (&pm->sessions_lock));
+
+  a->handle = ps->ao.session_handle;
+  a->app_index = pm->active_open_app_index;
+  vnet_disconnect_session (a);
+
+  ps->ao_disconnected = 1;
+}
+
 static void
 proxy_try_close_session (session_t * s, int is_active_open)
 {
   proxy_main_t *pm = &proxy_main;
-  proxy_session_t *ps = 0;
-  vnet_disconnect_args_t _a, *a = &_a;
+  proxy_session_side_ctx_t *sc;
+  proxy_session_t *ps;
+  proxy_worker_t *wrk;
+
+  wrk = proxy_worker_get (s->thread_index);
+  sc = proxy_session_side_ctx_get (wrk, s->opaque);
 
   clib_spinlock_lock_if_init (&pm->sessions_lock);
 
-  ps = proxy_session_get (s->opaque);
+  ps = proxy_session_get (sc->ps_index);
 
   if (is_active_open)
     {
-      a->handle = ps->vpp_active_open_handle;
-      a->app_index = pm->active_open_app_index;
-      vnet_disconnect_session (a);
-      ps->ao_disconnected = 1;
+      proxy_session_close_ao (ps);
 
       if (!ps->po_disconnected)
        {
-         ASSERT (ps->vpp_server_handle != SESSION_INVALID_HANDLE);
-         a->handle = ps->vpp_server_handle;
-         a->app_index = pm->server_app_index;
-         vnet_disconnect_session (a);
-         ps->po_disconnected = 1;
+         ASSERT (ps->po.session_handle != SESSION_INVALID_HANDLE);
+         proxy_session_close_po (ps);
        }
     }
   else
     {
-      a->handle = ps->vpp_server_handle;
-      a->app_index = pm->server_app_index;
-      vnet_disconnect_session (a);
-      ps->po_disconnected = 1;
+      proxy_session_close_po (ps);
 
       if (!ps->ao_disconnected && !ps->active_open_establishing)
        {
          /* Proxy session closed before active open */
-         if (ps->vpp_active_open_handle != SESSION_INVALID_HANDLE)
-           {
-             a->handle = ps->vpp_active_open_handle;
-             a->app_index = pm->active_open_app_index;
-             vnet_disconnect_session (a);
-           }
+         if (ps->ao.session_handle != SESSION_INVALID_HANDLE)
+           proxy_session_close_ao (ps);
          ps->ao_disconnected = 1;
        }
     }
   clib_spinlock_unlock_if_init (&pm->sessions_lock);
 }
 
+static void
+proxy_try_side_ctx_cleanup (session_t *s)
+{
+  proxy_main_t *pm = &proxy_main;
+  proxy_session_t *ps;
+  proxy_session_side_ctx_t *sc;
+  proxy_worker_t *wrk;
+
+  wrk = proxy_worker_get (s->thread_index);
+  sc = proxy_session_side_ctx_get (wrk, s->opaque);
+  if (sc->state == PROXY_SC_S_CREATED)
+    return;
+
+  clib_spinlock_lock_if_init (&pm->sessions_lock);
+
+  ps = proxy_session_get (sc->ps_index);
+
+  if (!ps->po_disconnected)
+    proxy_session_close_po (ps);
+
+  if (!ps->ao_disconnected)
+    proxy_session_close_ao (ps);
+
+  clib_spinlock_unlock_if_init (&pm->sessions_lock);
+}
+
 static void
 proxy_try_delete_session (session_t * s, u8 is_active_open)
 {
   proxy_main_t *pm = &proxy_main;
   proxy_session_t *ps = 0;
+  proxy_session_side_ctx_t *sc;
+  proxy_worker_t *wrk;
+  u32 ps_index;
+
+  wrk = proxy_worker_get (s->thread_index);
+  sc = proxy_session_side_ctx_get (wrk, s->opaque);
+  ps_index = sc->ps_index;
+
+  proxy_session_side_ctx_free (wrk, sc);
 
   clib_spinlock_lock_if_init (&pm->sessions_lock);
 
-  ps = proxy_session_get (s->opaque);
+  ps = proxy_session_get (ps_index);
 
   if (is_active_open)
     {
-      ps->vpp_active_open_handle = SESSION_INVALID_HANDLE;
+      ps->ao.session_handle = SESSION_INVALID_HANDLE;
 
       /* Revert master thread index change on connect notification */
-      ps->server_rx_fifo->master_thread_index = ps->po_thread_index;
+      ps->po.rx_fifo->master_thread_index =
+       ps->po.tx_fifo->master_thread_index;
 
       /* Passive open already cleaned up */
-      if (ps->vpp_server_handle == SESSION_INVALID_HANDLE)
+      if (ps->po.session_handle == SESSION_INVALID_HANDLE)
        {
          ASSERT (s->rx_fifo->refcnt == 1);
 
          /* The two sides of the proxy on different threads */
-         if (ps->po_thread_index != s->thread_index)
+         if (ps->po.tx_fifo->master_thread_index != s->thread_index)
            {
              /* This is not the right thread to delete the fifos */
              s->rx_fifo = 0;
@@ -249,9 +326,9 @@ proxy_try_delete_session (session_t * s, u8 is_active_open)
     }
   else
     {
-      ps->vpp_server_handle = SESSION_INVALID_HANDLE;
+      ps->po.session_handle = SESSION_INVALID_HANDLE;
 
-      if (ps->vpp_active_open_handle == SESSION_INVALID_HANDLE)
+      if (ps->ao.session_handle == SESSION_INVALID_HANDLE)
        {
          if (!ps->active_open_establishing)
            proxy_session_free (ps);
@@ -308,16 +385,24 @@ static int
 proxy_accept_callback (session_t * s)
 {
   proxy_main_t *pm = &proxy_main;
+  proxy_session_side_ctx_t *sc;
   proxy_session_t *ps;
+  proxy_worker_t *wrk;
+
+  wrk = proxy_worker_get (s->thread_index);
+  sc = proxy_session_side_ctx_alloc (wrk);
+  s->opaque = sc->sc_index;
 
   clib_spinlock_lock_if_init (&pm->sessions_lock);
 
   ps = proxy_session_alloc ();
-  ps->vpp_server_handle = session_handle (s);
-  ps->vpp_active_open_handle = SESSION_INVALID_HANDLE;
-  ps->po_thread_index = s->thread_index;
 
-  s->opaque = ps->ps_index;
+  ps->po.session_handle = session_handle (s);
+  ps->po.rx_fifo = s->rx_fifo;
+  ps->po.tx_fifo = s->tx_fifo;
+
+  ps->ao.session_handle = SESSION_INVALID_HANDLE;
+  sc->ps_index = ps->ps_index;
 
   clib_spinlock_unlock_if_init (&pm->sessions_lock);
 
@@ -358,98 +443,105 @@ proxy_transport_needs_crypto (transport_proto_t proto)
   return proto == TRANSPORT_PROTO_TLS;
 }
 
-static int
-proxy_rx_callback (session_t * s)
+static void
+proxy_session_start_connect (proxy_session_side_ctx_t *sc, session_t *s)
 {
+  int actual_transfer __attribute__ ((unused));
+  vnet_connect_args_t _a = {}, *a = &_a;
   proxy_main_t *pm = &proxy_main;
-  u32 thread_index = vlib_get_thread_index ();
-  svm_fifo_t *ao_tx_fifo;
+  u32 max_dequeue, ps_index;
   proxy_session_t *ps;
 
-  ASSERT (s->thread_index == thread_index);
-
   clib_spinlock_lock_if_init (&pm->sessions_lock);
 
-  ps = proxy_session_get (s->opaque);
+  ps = proxy_session_get (sc->ps_index);
 
-  if (PREDICT_TRUE (ps->vpp_active_open_handle != SESSION_INVALID_HANDLE))
+  /* maybe we were already here */
+  if (ps->active_open_establishing)
     {
       clib_spinlock_unlock_if_init (&pm->sessions_lock);
+      return;
+    }
 
-      ao_tx_fifo = s->rx_fifo;
+  ps->active_open_establishing = 1;
+  ps_index = ps->ps_index;
 
-      /*
-       * Send event for active open tx fifo
-       */
-      if (svm_fifo_set_event (ao_tx_fifo))
-       {
-         u32 ao_thread_index = ao_tx_fifo->master_thread_index;
-         u32 ao_session_index = ao_tx_fifo->shr->master_session_index;
-         if (session_send_io_evt_to_thread_custom (&ao_session_index,
-                                                   ao_thread_index,
-                                                   SESSION_IO_EVT_TX))
-           clib_warning ("failed to enqueue tx evt");
-       }
+  clib_spinlock_unlock_if_init (&pm->sessions_lock);
 
-      if (svm_fifo_max_enqueue (ao_tx_fifo) <= TCP_MSS)
-       svm_fifo_add_want_deq_ntf (ao_tx_fifo, SVM_FIFO_WANT_DEQ_NOTIF);
-    }
-  else
+  max_dequeue = svm_fifo_max_dequeue_cons (s->rx_fifo);
+  if (PREDICT_FALSE (max_dequeue == 0))
+    return;
+
+  max_dequeue = clib_min (pm->rcv_buffer_size, max_dequeue);
+  actual_transfer = svm_fifo_peek (s->rx_fifo, 0 /* relative_offset */,
+                                  max_dequeue, pm->rx_buf[s->thread_index]);
+
+  /* Expectation is that here actual data just received is parsed and based
+   * on its contents, the destination and parameters of the connect to the
+   * upstream are decided
+   */
+
+  clib_memcpy (&a->sep_ext, &pm->client_sep, sizeof (pm->client_sep));
+  a->api_context = ps_index;
+  a->app_index = pm->active_open_app_index;
+
+  if (proxy_transport_needs_crypto (a->sep.transport_proto))
     {
-      vnet_connect_args_t _a, *a = &_a;
-      svm_fifo_t *tx_fifo, *rx_fifo;
-      u32 max_dequeue, ps_index;
-      int actual_transfer __attribute__ ((unused));
+      session_endpoint_alloc_ext_cfg (&a->sep_ext,
+                                     TRANSPORT_ENDPT_EXT_CFG_CRYPTO);
+      a->sep_ext.ext_cfg->crypto.ckpair_index = pm->ckpair_index;
+    }
 
-      /* maybe we were already here */
-      if (ps->active_open_establishing)
-       {
-         clib_spinlock_unlock_if_init (&pm->sessions_lock);
-         return 0;
-       }
+  proxy_program_connect (a);
+}
+
+static int
+proxy_rx_callback (session_t *s)
+{
+  proxy_session_side_ctx_t *sc;
+  svm_fifo_t *ao_tx_fifo;
+  proxy_session_t *ps;
+  proxy_worker_t *wrk;
 
-      rx_fifo = s->rx_fifo;
-      tx_fifo = s->tx_fifo;
+  ASSERT (s->thread_index == vlib_get_thread_index ());
 
-      ASSERT (rx_fifo->master_thread_index == thread_index);
-      ASSERT (tx_fifo->master_thread_index == thread_index);
+  wrk = proxy_worker_get (s->thread_index);
+  sc = proxy_session_side_ctx_get (wrk, s->opaque);
 
-      max_dequeue = svm_fifo_max_dequeue_cons (s->rx_fifo);
+  if (PREDICT_FALSE (sc->state < PROXY_SC_S_ESTABLISHED))
+    {
+      proxy_main_t *pm = &proxy_main;
 
-      if (PREDICT_FALSE (max_dequeue == 0))
+      if (sc->state == PROXY_SC_S_CREATED)
        {
-         clib_spinlock_unlock_if_init (&pm->sessions_lock);
+         proxy_session_start_connect (sc, s);
+         sc->state = PROXY_SC_S_CONNECTING;
          return 0;
        }
 
-      max_dequeue = clib_min (pm->rcv_buffer_size, max_dequeue);
-      actual_transfer = svm_fifo_peek (rx_fifo, 0 /* relative_offset */ ,
-                                      max_dequeue, pm->rx_buf[thread_index]);
+      clib_spinlock_lock_if_init (&pm->sessions_lock);
 
-      /* $$$ your message in this space: parse url, etc. */
+      ps = proxy_session_get (sc->ps_index);
+      sc->pair = ps->ao;
 
-      clib_memset (a, 0, sizeof (*a));
+      clib_spinlock_unlock_if_init (&pm->sessions_lock);
 
-      ps->server_rx_fifo = rx_fifo;
-      ps->server_tx_fifo = tx_fifo;
-      ps->active_open_establishing = 1;
-      ps_index = ps->ps_index;
+      if (sc->pair.session_handle == SESSION_INVALID_HANDLE)
+       return 0;
 
-      clib_spinlock_unlock_if_init (&pm->sessions_lock);
+      sc->state = PROXY_SC_S_ESTABLISHED;
+    }
 
-      clib_memcpy (&a->sep_ext, &pm->client_sep, sizeof (pm->client_sep));
-      a->api_context = ps_index;
-      a->app_index = pm->active_open_app_index;
+  ao_tx_fifo = s->rx_fifo;
 
-      if (proxy_transport_needs_crypto (a->sep.transport_proto))
-       {
-         session_endpoint_alloc_ext_cfg (&a->sep_ext,
-                                         TRANSPORT_ENDPT_EXT_CFG_CRYPTO);
-         a->sep_ext.ext_cfg->crypto.ckpair_index = pm->ckpair_index;
-       }
+  /*
+   * Send event for active open tx fifo
+   */
+  if (svm_fifo_set_event (ao_tx_fifo))
+    session_program_tx_io_evt (sc->pair.session_handle, SESSION_IO_EVT_TX);
 
-      proxy_program_connect (a);
-    }
+  if (svm_fifo_max_enqueue (ao_tx_fifo) <= TCP_MSS)
+    svm_fifo_add_want_deq_ntf (ao_tx_fifo, SVM_FIFO_WANT_DEQ_NOTIF);
 
   return 0;
 }
@@ -470,8 +562,8 @@ proxy_force_ack (void *handlep)
 static int
 proxy_tx_callback (session_t * proxy_s)
 {
-  proxy_main_t *pm = &proxy_main;
-  proxy_session_t *ps;
+  proxy_session_side_ctx_t *sc;
+  proxy_worker_t *wrk;
   u32 min_free;
 
   min_free = clib_min (svm_fifo_size (proxy_s->tx_fifo) >> 3, 128 << 10);
@@ -481,21 +573,17 @@ proxy_tx_callback (session_t * proxy_s)
       return 0;
     }
 
-  clib_spinlock_lock_if_init (&pm->sessions_lock);
-
-  ps = proxy_session_get (proxy_s->opaque);
-
-  if (ps->vpp_active_open_handle == SESSION_INVALID_HANDLE)
-    goto unlock;
+  wrk = proxy_worker_get (proxy_s->thread_index);
+  sc = proxy_session_side_ctx_get (wrk, proxy_s->opaque);
+  if (sc->state < PROXY_SC_S_ESTABLISHED)
+    return 0;
 
   /* Force ack on active open side to update rcv wnd. Make sure it's done on
    * the right thread */
-  void *arg = uword_to_pointer (ps->vpp_active_open_handle, void *);
-  session_send_rpc_evt_to_thread (ps->server_rx_fifo->master_thread_index,
-                                 proxy_force_ack, arg);
-
-unlock:
-  clib_spinlock_unlock_if_init (&pm->sessions_lock);
+  void *arg = uword_to_pointer (sc->pair.session_handle, void *);
+  session_send_rpc_evt_to_thread (
+    session_thread_from_handle (sc->pair.session_handle), proxy_force_ack,
+    arg);
 
   return 0;
 }
@@ -504,7 +592,10 @@ static void
 proxy_cleanup_callback (session_t * s, session_cleanup_ntf_t ntf)
 {
   if (ntf == SESSION_CLEANUP_TRANSPORT)
-    return;
+    {
+      proxy_try_side_ctx_cleanup (s);
+      return;
+    }
 
   proxy_try_delete_session (s, 0 /* is_active_open */ );
 }
@@ -530,10 +621,17 @@ active_open_alloc_session_fifos (session_t *s)
 
   clib_spinlock_lock_if_init (&pm->sessions_lock);
 
+  /* Active open opaque is pointing at proxy session */
   ps = proxy_session_get (s->opaque);
 
-  txf = ps->server_rx_fifo;
-  rxf = ps->server_tx_fifo;
+  if (ps->po_disconnected)
+    {
+      clib_spinlock_unlock_if_init (&pm->sessions_lock);
+      return SESSION_E_ALLOC;
+    }
+
+  txf = ps->po.rx_fifo;
+  rxf = ps->po.tx_fifo;
 
   /*
    * Reset the active-open tx-fifo master indices so the active-open session
@@ -564,31 +662,35 @@ active_open_connected_callback (u32 app_index, u32 opaque,
 {
   proxy_main_t *pm = &proxy_main;
   proxy_session_t *ps;
-  u8 thread_index = vlib_get_thread_index ();
-
-  /*
-   * Setup proxy session handle.
-   */
-  clib_spinlock_lock_if_init (&pm->sessions_lock);
-
-  ps = proxy_session_get (opaque);
+  proxy_worker_t *wrk;
+  proxy_session_side_ctx_t *sc;
 
   /* Connection failed */
   if (err)
     {
-      vnet_disconnect_args_t _a, *a = &_a;
+      clib_spinlock_lock_if_init (&pm->sessions_lock);
 
-      a->handle = ps->vpp_server_handle;
-      a->app_index = pm->server_app_index;
-      vnet_disconnect_session (a);
-      ps->po_disconnected = 1;
-    }
-  else
-    {
-      ps->vpp_active_open_handle = session_handle (s);
-      ps->active_open_establishing = 0;
+      ps = proxy_session_get (opaque);
+      ps->ao_disconnected = 1;
+      proxy_session_close_po (ps);
+
+      clib_spinlock_unlock_if_init (&pm->sessions_lock);
+
+      return 0;
     }
 
+  wrk = proxy_worker_get (s->thread_index);
+
+  clib_spinlock_lock_if_init (&pm->sessions_lock);
+
+  ps = proxy_session_get (opaque);
+
+  ps->ao.rx_fifo = s->rx_fifo;
+  ps->ao.tx_fifo = s->tx_fifo;
+  ps->ao.session_handle = session_handle (s);
+
+  ps->active_open_establishing = 0;
+
   /* Passive open session was already closed! */
   if (ps->po_disconnected)
     {
@@ -598,16 +700,21 @@ active_open_connected_callback (u32 app_index, u32 opaque,
       return -1;
     }
 
-  s->opaque = opaque;
+  sc = proxy_session_side_ctx_alloc (wrk);
+  sc->pair = ps->po;
+  sc->ps_index = ps->ps_index;
 
   clib_spinlock_unlock_if_init (&pm->sessions_lock);
 
+  sc->state = PROXY_SC_S_ESTABLISHED;
+  s->opaque = sc->sc_index;
+
   /*
    * Send event for active open tx fifo
    */
-  ASSERT (s->thread_index == thread_index);
+  ASSERT (s->thread_index == vlib_get_thread_index ());
   if (svm_fifo_set_event (s->tx_fifo))
-    session_send_io_evt_to_thread (s->tx_fifo, SESSION_IO_EVT_TX);
+    session_program_tx_io_evt (session_handle (s), SESSION_IO_EVT_TX);
 
   return 0;
 }
@@ -658,8 +765,8 @@ active_open_rx_callback (session_t * s)
 static int
 active_open_tx_callback (session_t * ao_s)
 {
-  proxy_main_t *pm = &proxy_main;
-  proxy_session_t *ps;
+  proxy_session_side_ctx_t *sc;
+  proxy_worker_t *wrk;
   u32 min_free;
 
   min_free = clib_min (svm_fifo_size (ao_s->tx_fifo) >> 3, 128 << 10);
@@ -669,22 +776,17 @@ active_open_tx_callback (session_t * ao_s)
       return 0;
     }
 
-  clib_spinlock_lock_if_init (&pm->sessions_lock);
-
-  ps = proxy_session_get_if_valid (ao_s->opaque);
-  if (!ps)
-    goto unlock;
+  wrk = proxy_worker_get (ao_s->thread_index);
+  sc = proxy_session_side_ctx_get (wrk, ao_s->opaque);
 
-  if (ps->vpp_server_handle == SESSION_INVALID_HANDLE)
-    goto unlock;
+  if (sc->state < PROXY_SC_S_ESTABLISHED)
+    return 0;
 
   /* Force ack on proxy side to update rcv wnd */
-  void *arg = uword_to_pointer (ps->vpp_server_handle, void *);
+  void *arg = uword_to_pointer (sc->pair.session_handle, void *);
   session_send_rpc_evt_to_thread (
-    session_thread_from_handle (ps->vpp_server_handle), proxy_force_ack, arg);
-
-unlock:
-  clib_spinlock_unlock_if_init (&pm->sessions_lock);
+    session_thread_from_handle (sc->pair.session_handle), proxy_force_ack,
+    arg);
 
   return 0;
 }
@@ -834,15 +936,25 @@ proxy_server_create (vlib_main_t * vm)
 {
   vlib_thread_main_t *vtm = vlib_get_thread_main ();
   proxy_main_t *pm = &proxy_main;
+  proxy_worker_t *wrk;
   u32 num_threads;
   int i;
 
+  if (vlib_num_workers ())
+    clib_spinlock_init (&pm->sessions_lock);
+
   num_threads = 1 /* main thread */  + vtm->n_threads;
   vec_validate (pm->rx_buf, num_threads - 1);
 
   for (i = 0; i < num_threads; i++)
     vec_validate (pm->rx_buf[i], pm->rcv_buffer_size);
 
+  vec_validate (pm->workers, vlib_num_workers ());
+  vec_foreach (wrk, pm->workers)
+    {
+      clib_spinlock_init (&wrk->pending_connects_lock);
+    }
+
   proxy_server_add_ckpair ();
 
   if (proxy_server_attach ())
@@ -886,9 +998,6 @@ proxy_server_create_command_fn (vlib_main_t * vm, unformat_input_t * input,
   pm->private_segment_count = 0;
   pm->segment_size = 512 << 20;
 
-  if (vlib_num_workers ())
-    clib_spinlock_init (&pm->sessions_lock);
-
   if (!unformat_user (input, unformat_line_input, line_input))
     return 0;
 
index 6636156..86db69c 100644 (file)
 #include <vnet/session/session.h>
 #include <vnet/session/application_interface.h>
 
+#define foreach_proxy_session_side_state                                      \
+  _ (CREATED, "created")                                                      \
+  _ (CONNECTING, "connecting")                                                \
+  _ (ESTABLISHED, "establiehed")                                              \
+  _ (CLOSED, "closed")
+
+typedef enum proxy_session_side_state_
+{
+#define _(sym, str) PROXY_SC_S_##sym,
+  foreach_proxy_session_side_state
+#undef _
+} proxy_session_side_state_t;
+typedef struct proxy_session_side_
+{
+  session_handle_t session_handle;
+  svm_fifo_t *rx_fifo;
+  svm_fifo_t *tx_fifo;
+} proxy_session_side_t;
+
+typedef struct proxy_session_side_ctx_
+{
+  proxy_session_side_t pair;
+  proxy_session_side_state_t state;
+  u32 sc_index;
+  u32 ps_index;
+} proxy_session_side_ctx_t;
+
 typedef struct
 {
-  svm_fifo_t *server_rx_fifo;
-  svm_fifo_t *server_tx_fifo;
+  proxy_session_side_t po; /**< passive open side */
+  proxy_session_side_t ao; /**< active open side */
 
-  session_handle_t vpp_server_handle;
-  session_handle_t vpp_active_open_handle;
   volatile int active_open_establishing;
   volatile int po_disconnected;
   volatile int ao_disconnected;
 
   u32 ps_index;
-  u32 po_thread_index;
 } proxy_session_t;
 
 typedef struct proxy_worker_
 {
+  proxy_session_side_ctx_t *ctx_pool;
   clib_spinlock_t pending_connects_lock;
   vnet_connect_args_t *pending_connects;
   vnet_connect_args_t *burst_connects;