session udp: shared local endpoints 61/26361/9
authorFlorin Coras <fcoras@cisco.com>
Sat, 4 Apr 2020 22:45:34 +0000 (22:45 +0000)
committerFlorin Coras <florin.coras@gmail.com>
Mon, 6 Apr 2020 14:53:17 +0000 (14:53 +0000)
Type: improvement

Signed-off-by: Florin Coras <fcoras@cisco.com>
Change-Id: Ie7102355b95eefb233ec7d146e61819051a7bf07

src/plugins/unittest/session_test.c
src/vcl/vppcom.c
src/vnet/session/segment_manager.c
src/vnet/session/session.c
src/vnet/session/session_lookup.c
src/vnet/session/session_lookup.h
src/vnet/session/session_node.c
src/vnet/session/transport.c
src/vnet/session/transport.h
src/vnet/udp/udp.c

index 9368f9f..b48a961 100644 (file)
@@ -616,7 +616,7 @@ session_test_namespace (vlib_main_t * vm, unformat_input_t * input)
   SESSION_TEST ((error == 0), "client attachment should work");
   error = vnet_connect (&connect_args);
   SESSION_TEST ((error != 0), "client connect should return error code");
-  SESSION_TEST ((error == VNET_API_ERROR_SESSION_CONNECT),
+  SESSION_TEST ((error == SESSION_E_NOINTF),
                "error code should be connect (nothing in local scope)");
   detach_args.app_index = client_index;
   vnet_application_detach (&detach_args);
index 9a19348..cb450d9 100644 (file)
@@ -656,6 +656,7 @@ vcl_session_migrated_handler (vcl_worker_t * wrk, void *data)
     }
 
   s->vpp_thread_index = mp->vpp_thread_index;
+  s->vpp_handle = mp->new_handle;
   s->vpp_evt_q = uword_to_pointer (mp->vpp_evt_q, svm_msg_q_t *);
 
   vec_validate (wrk->vpp_event_queues, s->vpp_thread_index);
@@ -669,7 +670,8 @@ vcl_session_migrated_handler (vcl_worker_t * wrk, void *data)
     app_send_io_evt_to_vpp (s->vpp_evt_q, s->tx_fifo->master_session_index,
                            SESSION_IO_EVT_TX, SVM_Q_WAIT);
 
-  VDBG (0, "Migrated 0x%x to thread %u", mp->handle, s->vpp_thread_index);
+  VDBG (0, "Migrated 0x%lx to thread %u 0x%lx", mp->handle,
+       s->vpp_thread_index, mp->new_handle);
 }
 
 static vcl_session_t *
index 154c7a6..716f2a3 100644 (file)
@@ -211,6 +211,15 @@ segment_manager_del_segment (segment_manager_t * sm, fifo_segment_t * fs)
   pool_put (sm->segments, fs);
 }
 
+static fifo_segment_t *
+segment_manager_get_segment_if_valid (segment_manager_t * sm,
+                                     u32 segment_index)
+{
+  if (pool_is_free_index (sm->segments, segment_index))
+    return 0;
+  return pool_elt_at_index (sm->segments, segment_index);
+}
+
 /**
  * Removes segment after acquiring writer lock
  */
@@ -221,15 +230,18 @@ segment_manager_lock_and_del_segment (segment_manager_t * sm, u32 fs_index)
   u8 is_prealloc;
 
   clib_rwlock_writer_lock (&sm->segments_rwlock);
-  fs = segment_manager_get_segment (sm, fs_index);
+
+  fs = segment_manager_get_segment_if_valid (sm, fs_index);
+  if (!fs)
+    goto done;
+
   is_prealloc = fifo_segment_flags (fs) & FIFO_SEGMENT_F_IS_PREALLOCATED;
   if (is_prealloc && !segment_manager_app_detached (sm))
-    {
-      clib_rwlock_writer_unlock (&sm->segments_rwlock);
-      return;
-    }
+    goto done;
 
   segment_manager_del_segment (sm, fs);
+
+done:
   clib_rwlock_writer_unlock (&sm->segments_rwlock);
 }
 
index ac29627..79f93c4 100644 (file)
@@ -1099,7 +1099,7 @@ session_open_cl (u32 app_wrk_index, session_endpoint_t * rmt, u32 opaque)
   if (rv < 0)
     {
       SESSION_DBG ("Transport failed to open connection.");
-      return VNET_API_ERROR_SESSION_CONNECT;
+      return rv;
     }
 
   tc = transport_get_half_open (rmt->transport_proto, (u32) rv);
@@ -1133,7 +1133,7 @@ session_open_vc (u32 app_wrk_index, session_endpoint_t * rmt, u32 opaque)
   if (rv < 0)
     {
       SESSION_DBG ("Transport failed to open connection.");
-      return VNET_API_ERROR_SESSION_CONNECT;
+      return rv;
     }
 
   tc = transport_get_half_open (rmt->transport_proto, (u32) rv);
index 9868323..4de6fdb 100644 (file)
@@ -1289,6 +1289,19 @@ session_lookup_safe6 (u32 fib_index, ip6_address_t * lcl, ip6_address_t * rmt,
   return 0;
 }
 
+transport_connection_t *
+session_lookup_connection (u32 fib_index, ip46_address_t * lcl,
+                          ip46_address_t * rmt, u16 lcl_port, u16 rmt_port,
+                          u8 proto, u8 is_ip4)
+{
+  if (is_ip4)
+    return session_lookup_connection4 (fib_index, &lcl->ip4, &rmt->ip4,
+                                      lcl_port, rmt_port, proto);
+  else
+    return session_lookup_connection6 (fib_index, &lcl->ip6, &rmt->ip6,
+                                      lcl_port, rmt_port, proto);
+}
+
 int
 vnet_session_rule_add_del (session_rule_add_del_args_t * args)
 {
index 8224219..4e80566 100644 (file)
@@ -59,12 +59,15 @@ transport_connection_t *session_lookup_connection6 (u32 fib_index,
                                                    ip6_address_t * rmt,
                                                    u16 lcl_port,
                                                    u16 rmt_port, u8 proto);
-session_t *session_lookup_listener4 (u32 fib_index,
-                                    ip4_address_t * lcl, u16 lcl_port,
-                                    u8 proto, u8 use_wildcard);
-session_t *session_lookup_listener6 (u32 fib_index,
-                                    ip6_address_t * lcl, u16 lcl_port,
-                                    u8 proto, u8 use_wildcard);
+transport_connection_t *session_lookup_connection (u32 fib_index,
+                                                  ip46_address_t * lcl,
+                                                  ip46_address_t * rmt,
+                                                  u16 lcl_port, u16 rmt_port,
+                                                  u8 proto, u8 is_ip4);
+session_t *session_lookup_listener4 (u32 fib_index, ip4_address_t * lcl,
+                                    u16 lcl_port, u8 proto, u8 use_wildcard);
+session_t *session_lookup_listener6 (u32 fib_index, ip6_address_t * lcl,
+                                    u16 lcl_port, u8 proto, u8 use_wildcard);
 session_t *session_lookup_listener (u32 table_index,
                                    session_endpoint_t * sep);
 session_t *session_lookup_listener_wildcard (u32 table_index,
index f023a95..573fbe9 100644 (file)
@@ -137,7 +137,7 @@ session_mq_connect_handler (void *data)
 
   if ((rv = vnet_connect (a)))
     {
-      clib_warning ("connect returned: %U", format_vnet_api_errno, rv);
+      clib_warning ("connect returned: %U", format_session_error, rv);
       app_wrk = application_get_worker (app, mp->wrk_index);
       mq_send_session_connected_cb (app_wrk->wrk_index, mp->context, 0, rv);
     }
index 29c94f3..ba62200 100644 (file)
 #include <vnet/session/session.h>
 #include <vnet/fib/fib.h>
 
+typedef struct local_endpoint_
+{
+  transport_endpoint_t ep;
+  int refcnt;
+} local_endpoint_t;
+
 /**
  * Per-type vector of transport protocol virtual function tables
  */
@@ -35,7 +41,7 @@ static transport_endpoint_table_t local_endpoints_table;
 /*
  * Pool of local endpoints
  */
-static transport_endpoint_t *local_endpoints;
+static local_endpoint_t *local_endpoints;
 
 /*
  * Local endpoints pool lock
@@ -401,44 +407,64 @@ transport_endpoint_del (u32 tepi)
   clib_spinlock_unlock_if_init (&local_endpoints_lock);
 }
 
-always_inline transport_endpoint_t *
+always_inline local_endpoint_t *
 transport_endpoint_new (void)
 {
-  transport_endpoint_t *tep;
-  pool_get_zero (local_endpoints, tep);
-  return tep;
+  local_endpoint_t *lep;
+  pool_get_zero (local_endpoints, lep);
+  return lep;
 }
 
 void
 transport_endpoint_cleanup (u8 proto, ip46_address_t * lcl_ip, u16 port)
 {
-  u32 tepi;
-  transport_endpoint_t *tep;
+  local_endpoint_t *lep;
+  u32 lepi;
 
   /* Cleanup local endpoint if this was an active connect */
-  tepi = transport_endpoint_lookup (&local_endpoints_table, proto, lcl_ip,
+  lepi = transport_endpoint_lookup (&local_endpoints_table, proto, lcl_ip,
                                    clib_net_to_host_u16 (port));
-  if (tepi != ENDPOINT_INVALID_INDEX)
+  if (lepi != ENDPOINT_INVALID_INDEX)
     {
-      tep = pool_elt_at_index (local_endpoints, tepi);
-      transport_endpoint_table_del (&local_endpoints_table, proto, tep);
-      transport_endpoint_del (tepi);
+      lep = pool_elt_at_index (local_endpoints, lepi);
+      if (!clib_atomic_sub_fetch (&lep->refcnt, 1))
+       {
+         transport_endpoint_table_del (&local_endpoints_table, proto,
+                                       &lep->ep);
+         transport_endpoint_del (lepi);
+       }
     }
 }
 
 static void
 transport_endpoint_mark_used (u8 proto, ip46_address_t * ip, u16 port)
 {
-  transport_endpoint_t *tep;
+  local_endpoint_t *lep;
   clib_spinlock_lock_if_init (&local_endpoints_lock);
-  tep = transport_endpoint_new ();
-  clib_memcpy_fast (&tep->ip, ip, sizeof (*ip));
-  tep->port = port;
-  transport_endpoint_table_add (&local_endpoints_table, proto, tep,
-                               tep - local_endpoints);
+  lep = transport_endpoint_new ();
+  clib_memcpy_fast (&lep->ep.ip, ip, sizeof (*ip));
+  lep->ep.port = port;
+  lep->refcnt = 1;
+  transport_endpoint_table_add (&local_endpoints_table, proto, &lep->ep,
+                               lep - local_endpoints);
   clib_spinlock_unlock_if_init (&local_endpoints_lock);
 }
 
+void
+transport_share_local_endpoint (u8 proto, ip46_address_t * lcl_ip, u16 port)
+{
+  local_endpoint_t *lep;
+  u32 lepi;
+
+  lepi = transport_endpoint_lookup (&local_endpoints_table, proto, lcl_ip,
+                                   clib_net_to_host_u16 (port));
+  if (lepi != ENDPOINT_INVALID_INDEX)
+    {
+      lep = pool_elt_at_index (local_endpoints, lepi);
+      clib_atomic_add_fetch (&lep->refcnt, 1);
+    }
+}
+
 /**
  * Allocate local port and add if successful add entry to local endpoint
  * table to mark the pair as used.
@@ -572,13 +598,13 @@ transport_alloc_local_endpoint (u8 proto, transport_endpoint_cfg_t * rmt_cfg,
   else
     {
       port = clib_net_to_host_u16 (rmt_cfg->peer.port);
+      *lcl_port = port;
       tei = transport_endpoint_lookup (&local_endpoints_table, proto,
                                       lcl_addr, port);
       if (tei != ENDPOINT_INVALID_INDEX)
        return SESSION_E_PORTINUSE;
 
       transport_endpoint_mark_used (proto, lcl_addr, port);
-      *lcl_port = port;
     }
 
   return 0;
index eb98032..5592601 100644 (file)
@@ -239,6 +239,8 @@ int transport_alloc_local_port (u8 proto, ip46_address_t * ip);
 int transport_alloc_local_endpoint (u8 proto, transport_endpoint_cfg_t * rmt,
                                    ip46_address_t * lcl_addr,
                                    u16 * lcl_port);
+void transport_share_local_endpoint (u8 proto, ip46_address_t * lcl_ip,
+                                    u16 port);
 void transport_endpoint_cleanup (u8 proto, ip46_address_t * lcl_ip, u16 port);
 void transport_enable_disable (vlib_main_t * vm, u8 is_en);
 void transport_init (void);
index 65d50ff..48d518a 100644 (file)
@@ -149,6 +149,17 @@ udp_connection_delete (udp_connection_t * uc)
   udp_connection_cleanup (uc);
 }
 
+static u8
+udp_connection_port_used_extern (u16 lcl_port, u8 is_ip4)
+{
+  udp_main_t *um = vnet_get_udp_main ();
+  udp_dst_port_info_t *pi;
+
+  pi = udp_get_dst_port_info (um, lcl_port, is_ip4);
+  return (pi && !pi->n_connections
+         && udp_is_valid_dst_port (lcl_port, is_ip4));
+}
+
 u32
 udp_session_bind (u32 session_index, transport_endpoint_t * lcl)
 {
@@ -156,18 +167,15 @@ udp_session_bind (u32 session_index, transport_endpoint_t * lcl)
   vlib_main_t *vm = vlib_get_main ();
   transport_endpoint_cfg_t *lcl_ext;
   udp_connection_t *listener;
-  udp_dst_port_info_t *pi;
   u16 lcl_port_ho;
   void *iface_ip;
 
   lcl_port_ho = clib_net_to_host_u16 (lcl->port);
-  pi = udp_get_dst_port_info (um, lcl_port_ho, lcl->is_ip4);
 
-  if (pi && !pi->n_connections
-      && udp_is_valid_dst_port (lcl_port_ho, lcl->is_ip4))
+  if (udp_connection_port_used_extern (lcl_port_ho, lcl->is_ip4))
     {
       clib_warning ("port already used");
-      return -1;
+      return SESSION_E_PORTINUSE;
     }
 
   pool_get (um->listener_pool, listener);
@@ -415,15 +423,33 @@ udp_open_connection (transport_endpoint_cfg_t * rmt)
 {
   vlib_main_t *vm = vlib_get_main ();
   u32 thread_index = vm->thread_index;
-  udp_connection_t *uc;
   ip46_address_t lcl_addr;
+  udp_connection_t *uc;
   u16 lcl_port;
   int rv;
 
   rv = transport_alloc_local_endpoint (TRANSPORT_PROTO_UDP, rmt, &lcl_addr,
                                       &lcl_port);
   if (rv)
-    return rv;
+    {
+      if (rv != SESSION_E_PORTINUSE)
+       return rv;
+
+      if (udp_connection_port_used_extern (lcl_port, rmt->is_ip4))
+       return SESSION_E_PORTINUSE;
+
+      /* If port in use, check if 5-tuple is also in use */
+      if (session_lookup_connection (rmt->fib_index, &lcl_addr, &rmt->ip,
+                                    lcl_port, rmt->port, TRANSPORT_PROTO_UDP,
+                                    rmt->is_ip4))
+       return SESSION_E_PORTINUSE;
+
+      /* 5-tuple is available so increase lcl endpoint refcount and proceed
+       * with connection allocation */
+      transport_share_local_endpoint (TRANSPORT_PROTO_UDP, &lcl_addr,
+                                     lcl_port);
+      goto conn_alloc;
+    }
 
   if (udp_is_valid_dst_port (lcl_port, rmt->is_ip4))
     {
@@ -441,6 +467,8 @@ udp_open_connection (transport_endpoint_cfg_t * rmt)
        }
     }
 
+conn_alloc:
+
   udp_connection_register_port (vm, lcl_port, rmt->is_ip4);
 
   /* We don't poll main thread if we have workers */