session: proxy session migration fix 81/42081/1
authorMatus Fabian <[email protected]>
Mon, 30 Dec 2024 19:40:51 +0000 (20:40 +0100)
committerMatus Fabian <[email protected]>
Mon, 30 Dec 2024 19:42:37 +0000 (20:42 +0100)
Type: fix

Change-Id: I487ee4e69d8885f46d7a4af2c66a710da66108c5
Signed-off-by: Matus Fabian <[email protected]>
extras/hs-test/infra/suite_vpp_udp_proxy.go
extras/hs-test/proxy_test.go
src/plugins/hs_apps/proxy.c
src/vnet/session/application_worker.c
src/vnet/session/session.c

index 6a65a0b..2290aee 100644 (file)
@@ -63,6 +63,11 @@ func (s *VppUdpProxySuite) SetupTest() {
                s.Interfaces.Server.Ip4AddressString(),
                s.Interfaces.Server.HwAddress)
        vpp.Vppctl(arp)
+       arp = fmt.Sprintf("set ip neighbor %s %s %s",
+               s.Interfaces.Client.Peer.Name(),
+               s.Interfaces.Client.Ip4AddressString(),
+               s.Interfaces.Client.HwAddress)
+       vpp.Vppctl(arp)
 
        if *DryRun {
                s.LogStartedContainers()
@@ -127,7 +132,7 @@ func (s *VppUdpProxySuite) ClientSendReceive(toSend []byte, rcvBuffer []byte) (i
        }
        defer proxiedConn.Close()
 
-       err = proxiedConn.SetReadDeadline(time.Now().Add(time.Second * 5))
+       err = proxiedConn.SetDeadline(time.Now().Add(time.Second * 5))
        if err != nil {
                return 0, err
        }
@@ -173,7 +178,7 @@ var _ = Describe("VppUdpProxySuite", Ordered, ContinueOnFailure, func() {
        }
 })
 
-var _ = Describe("VppUdpProxySuiteSolo", Ordered, ContinueOnFailure, func() {
+var _ = Describe("VppUdpProxySuiteSolo", Ordered, ContinueOnFailure, Serial, func() {
        var s VppUdpProxySuite
        BeforeAll(func() {
                s.SetupSuite()
index 3afdc31..d371de4 100644 (file)
@@ -25,6 +25,7 @@ func init() {
        RegisterVppProxySoloTests(VppProxyHttpGetTcpMTTest, VppProxyHttpPutTcpMTTest, VppProxyTcpIperfMTTest,
                VppProxyUdpIperfMTTest, VppConnectProxyStressTest, VppConnectProxyStressMTTest)
        RegisterVppUdpProxyTests(VppProxyUdpTest)
+       RegisterVppUdpProxySoloTests(VppProxyUdpMigrationMTTest)
        RegisterEnvoyProxyTests(EnvoyProxyHttpGetTcpTest, EnvoyProxyHttpPutTcpTest)
        RegisterNginxProxyTests(NginxMirroringTest)
        RegisterNginxProxySoloTests(MirrorMultiThreadTest)
@@ -350,3 +351,25 @@ func VppProxyUdpTest(s *VppUdpProxySuite) {
        s.AssertNil(err, fmt.Sprint(err))
        s.AssertEqual([]byte("hello"), b[:n])
 }
+
+func VppProxyUdpMigrationMTTest(s *VppUdpProxySuite) {
+       remoteServerConn := s.StartEchoServer()
+       defer remoteServerConn.Close()
+
+       vppProxy := s.Containers.VppProxy.VppInstance
+       cmd := fmt.Sprintf("test proxy server fifo-size 512k server-uri udp://%s/%d", s.VppProxyAddr(), s.ProxyPort())
+       cmd += fmt.Sprintf(" client-uri udp://%s/%d", s.ServerAddr(), s.ServerPort())
+       s.Log(vppProxy.Vppctl(cmd))
+
+       b := make([]byte, 1500)
+
+       n, err := s.ClientSendReceive([]byte("hello"), b)
+       s.AssertNil(err, fmt.Sprint(err))
+       s.AssertEqual([]byte("hello"), b[:n])
+
+       n, err = s.ClientSendReceive([]byte("world"), b)
+       s.AssertNil(err, fmt.Sprint(err))
+       s.AssertEqual([]byte("world"), b[:n])
+
+       s.Log(s.Containers.VppProxy.VppInstance.Vppctl("show session verbose 2"))
+}
index 9d5b949..82b904f 100644 (file)
@@ -26,6 +26,14 @@ proxy_main_t proxy_main;
 
 #define TCP_MSS 1460
 
+#define PROXY_DEBUG 0
+
+#if PROXY_DEBUG
+#define PROXY_DBG(_fmt, _args...) clib_warning (_fmt, ##_args)
+#else
+#define PROXY_DBG(_fmt, _args...)
+#endif
+
 static proxy_session_side_ctx_t *
 proxy_session_side_ctx_alloc (proxy_worker_t *wrk)
 {
@@ -200,6 +208,8 @@ proxy_session_postponed_free_rpc (void *arg)
 
   clib_spinlock_lock_if_init (&pm->sessions_lock);
 
+  PROXY_DBG ("[%u] ps %u postponed free", vlib_get_thread_index (), ps_index);
+
   ps = proxy_session_get (ps_index);
   segment_manager_dealloc_fifos (ps->po.rx_fifo, ps->po.tx_fifo);
   proxy_session_free (ps);
@@ -262,6 +272,9 @@ proxy_try_close_session (session_t * s, int is_active_open)
   wrk = proxy_worker_get (s->thread_index);
   sc = proxy_session_side_ctx_get (wrk, s->opaque);
 
+  PROXY_DBG ("[%u] ps %u close (is ao %u)", vlib_get_thread_index (),
+            sc->ps_index, is_active_open);
+
   clib_spinlock_lock_if_init (&pm->sessions_lock);
 
   ps = proxy_session_get (sc->ps_index);
@@ -304,6 +317,8 @@ proxy_try_side_ctx_cleanup (session_t *s)
   if (sc->state == PROXY_SC_S_CREATED)
     return;
 
+  PROXY_DBG ("[%u] ps %u side ctx cleanup", vlib_get_thread_index (),
+            sc->ps_index);
   clib_spinlock_lock_if_init (&pm->sessions_lock);
 
   ps = proxy_session_get (sc->ps_index);
@@ -330,6 +345,9 @@ proxy_try_delete_session (session_t * s, u8 is_active_open)
   sc = proxy_session_side_ctx_get (wrk, s->opaque);
   ps_index = sc->ps_index;
 
+  PROXY_DBG ("[%u] ps %u delete (is ao %u)", vlib_get_thread_index (),
+            sc->ps_index, is_active_open);
+
   proxy_session_side_ctx_free (wrk, sc);
 
   clib_spinlock_lock_if_init (&pm->sessions_lock);
@@ -436,6 +454,8 @@ proxy_accept_callback (session_t * s)
 
   ps = proxy_session_alloc ();
 
+  PROXY_DBG ("[%u] ps %u new", vlib_get_thread_index (), ps->ps_index);
+
   ps->po.session_handle = session_handle (s);
   ps->po.rx_fifo = s->rx_fifo;
   ps->po.tx_fifo = s->tx_fifo;
@@ -614,6 +634,8 @@ proxy_rx_callback (session_t *s)
 
       if (sc->state == PROXY_SC_S_CREATED)
        {
+         PROXY_DBG ("[%u] ps %u start connect", vlib_get_thread_index (),
+                    sc->ps_index);
          proxy_session_start_connect (sc, s);
          sc->state = PROXY_SC_S_CONNECTING;
          return 0;
@@ -719,6 +741,8 @@ active_open_alloc_session_fifos (session_t *s)
   svm_fifo_t *rxf, *txf;
   proxy_session_t *ps;
 
+  PROXY_DBG ("[%u] ps %u ao alloc fifos", vlib_get_thread_index (), s->opaque);
+
   clib_spinlock_lock_if_init (&pm->sessions_lock);
 
   /* Active open opaque is pointing at proxy session */
@@ -788,6 +812,8 @@ active_open_connected_callback (u32 app_index, u32 opaque,
   /* Connection failed */
   if (err)
     {
+      PROXY_DBG ("[%u] ps %u connect failed: %d", vlib_get_thread_index (),
+                opaque, err);
       clib_spinlock_lock_if_init (&pm->sessions_lock);
 
       ps = proxy_session_get (opaque);
@@ -806,6 +832,8 @@ active_open_connected_callback (u32 app_index, u32 opaque,
       return 0;
     }
 
+  PROXY_DBG ("[%u] ps %u connected", vlib_get_thread_index (), opaque);
+
   wrk = proxy_worker_get (s->thread_index);
 
   clib_spinlock_lock_if_init (&pm->sessions_lock);
@@ -867,6 +895,9 @@ active_open_migrate_po_fixup_rpc (void *arg)
   proxy_session_t *ps;
   session_t *po_s;
 
+  PROXY_DBG ("[%u] ps %u migrate (po fixup)", vlib_get_thread_index (),
+            ps_index);
+
   wrk = proxy_worker_get (vlib_get_thread_index ());
 
   clib_spinlock_lock_if_init (&pm->sessions_lock);
@@ -874,8 +905,6 @@ active_open_migrate_po_fixup_rpc (void *arg)
   ps = proxy_session_get (ps_index);
 
   po_s = session_get_from_handle (ps->po.session_handle);
-  po_s->rx_fifo = ps->po.rx_fifo;
-  po_s->tx_fifo = ps->po.tx_fifo;
 
   po_sc = proxy_session_side_ctx_get (wrk, po_s->opaque);
   po_sc->pair = ps->ao;
@@ -896,6 +925,9 @@ active_open_migrate_rpc (void *arg)
   proxy_session_t *ps;
   session_t *s;
 
+  PROXY_DBG ("[%u] ps %u migrate (alloc new sc)", vlib_get_thread_index (),
+            ps_index);
+
   wrk = proxy_worker_get (vlib_get_thread_index ());
   sc = proxy_session_side_ctx_alloc (wrk);
 
@@ -908,15 +940,6 @@ active_open_migrate_rpc (void *arg)
   s->opaque = sc->sc_index;
   s->flags &= ~SESSION_F_IS_MIGRATING;
 
-  /* Fixup passive open session because of migration and zc */
-  ps->ao.rx_fifo = ps->po.tx_fifo = s->rx_fifo;
-  ps->ao.tx_fifo = ps->po.rx_fifo = s->tx_fifo;
-
-  ps->po.tx_fifo->shr->master_session_index =
-    session_index_from_handle (ps->po.session_handle);
-  ps->po.tx_fifo->master_thread_index =
-    session_thread_from_handle (ps->po.session_handle);
-
   sc->pair = ps->po;
 
   clib_spinlock_unlock_if_init (&pm->sessions_lock);
@@ -937,14 +960,18 @@ active_open_migrate_callback (session_t *s, session_handle_t new_sh)
   wrk = proxy_worker_get (s->thread_index);
   sc = proxy_session_side_ctx_get (wrk, s->opaque);
 
+  PROXY_DBG ("[%u] ps %u migrate (free sc)", vlib_get_thread_index (),
+            sc->ps_index);
+
   /* NOTE: this is just an example. ZC makes this migration rather
    * tedious. Probably better approaches could be found */
   clib_spinlock_lock_if_init (&pm->sessions_lock);
 
   ps = proxy_session_get (sc->ps_index);
   ps->ao.session_handle = new_sh;
-  ps->ao.rx_fifo = 0;
-  ps->ao.tx_fifo = 0;
+  ps->ao.tx_fifo->shr->master_session_index =
+    session_index_from_handle (new_sh);
+  ps->ao.tx_fifo->master_thread_index = session_thread_from_handle (new_sh);
 
   clib_spinlock_unlock_if_init (&pm->sessions_lock);
 
index f056aad..cae340c 100644 (file)
@@ -455,7 +455,10 @@ app_worker_init_connected (app_worker_t * app_wrk, session_t * s)
 
   /* Allocate fifos for session, unless the app is a builtin proxy */
   if (application_is_builtin_proxy (app))
-    return app->cb_fns.proxy_alloc_session_fifos (s);
+    {
+      s->flags |= SESSION_F_PROXY;
+      return app->cb_fns.proxy_alloc_session_fifos (s);
+    }
 
   sm = app_worker_get_connect_segment_manager (app_wrk);
   return app_worker_alloc_session_fifos (sm, s);
index be2c5dc..cc0e89f 100644 (file)
@@ -1030,10 +1030,13 @@ session_switch_pool (void *cb_args)
   if (!app_wrk)
     goto app_closed;
 
-  /* Cleanup fifo segment slice state for fifos */
-  sm = app_worker_get_connect_segment_manager (app_wrk);
-  segment_manager_detach_fifo (sm, &s->rx_fifo);
-  segment_manager_detach_fifo (sm, &s->tx_fifo);
+  if (!(s->flags & SESSION_F_PROXY))
+    {
+      /* Cleanup fifo segment slice state for fifos */
+      sm = app_worker_get_connect_segment_manager (app_wrk);
+      segment_manager_detach_fifo (sm, &s->rx_fifo);
+      segment_manager_detach_fifo (sm, &s->tx_fifo);
+    }
 
   /* Check if session closed during migration */
   if (s->session_state >= SESSION_STATE_TRANSPORT_CLOSING)
@@ -1079,7 +1082,7 @@ session_dgram_connect_notify (transport_connection_t * tc,
     session_lookup_add_connection (tc, session_handle (new_s));
 
   app_wrk = app_worker_get_if_valid (new_s->app_wrk_index);
-  if (app_wrk)
+  if (app_wrk && !(new_s->flags & SESSION_F_PROXY))
     {
       /* New set of fifos attached to the same shared memory */
       sm = app_worker_get_connect_segment_manager (app_wrk);