Create worker threads using ODP API (direct in/out mode) 06/8006/10
authorMichal Mazur <[email protected]>
Fri, 11 Aug 2017 19:18:05 +0000 (21:18 +0200)
committerMichal Mazur <[email protected]>
Thu, 14 Dec 2017 14:18:05 +0000 (15:18 +0100)
This patch allows VPP to run multiple worker threads compatible with ODP.
By default a single thread is assigned to each interface but this
can be changed using "set interface rx-placement" command.
Only Direct input/output mode without RSS is supported.

Change-Id: Ia6af7e5af4950159eb28b7d56bc9c8d34b80f8d4
Signed-off-by: Michal Mazur <[email protected]>
build-data/platforms/odp.mk
src/plugins/odp.am
src/plugins/odp/node.c
src/plugins/odp/odp_packet.c
src/plugins/odp/odp_packet.h
src/plugins/odp/thread.c [new file with mode: 0644]

index d60c54c..0aaf0fd 100644 (file)
@@ -34,7 +34,7 @@ vlib_configure_args_odp = --with-pre-data=128
 
 #ODP configuration parameters
 odp_uses_odp=yes
-odp_odp_libs = -lodp-dpdk -ldpdk -lpcap
+odp_odp_libs = -lodp-dpdk -ldpdk -lodphelper -lpcap
 odp_odp_inc_dir=$(ODP_INST_PATH)/include
 odp_odp_lib_dir=$(ODP_INST_PATH)/lib
 
index 40316f4..10c8e27 100644 (file)
@@ -19,7 +19,8 @@ odp_plugin_la_SOURCES = odp/cli.c     \
        odp/node.c                      \
        odp/odp_packet.c                \
        odp/device.c                    \
-       odp/buffer.c
+       odp/buffer.c                    \
+        odp/thread.c
 
 noinst_HEADERS += odp/odp_packet.h
 
index bd3fb8a..7e13095 100755 (executable)
@@ -251,24 +251,21 @@ static uword
 odp_packet_input_fn (vlib_main_t * vm, vlib_node_runtime_t * node,
                     vlib_frame_t * frame)
 {
-
-  int i;
   u32 n_rx_packets = 0;
-  u32 thread_index = vlib_get_thread_index ();
   odp_packet_main_t *om = odp_packet_main;
   odp_packet_if_t *oif;
-
-  for (i = 0; i < vec_len (om->interfaces); i++)
-    {
-      oif = vec_elt_at_index (om->interfaces, i);
-
-      if (oif->is_admin_up &&
-         (i % om->input_cpu_count) ==
-         (thread_index - om->input_cpu_first_index))
-       {
-         n_rx_packets += odp_packet_device_input_fn (vm, node, frame, oif);
-       }
-    }
+  vnet_device_input_runtime_t *rt = (void *) node->runtime_data;
+  vnet_device_and_queue_t *dq;
+
+  /*
+   * Poll all devices on this cpu for input/interrupts.
+   */
+  foreach_device_and_queue (dq, rt->devices_and_queues)
+  {
+    oif = pool_elt_at_index (om->interfaces, dq->dev_instance);
+    if (oif->is_admin_up)
+      n_rx_packets += odp_packet_device_input_fn (vm, node, frame, oif);
+  }
 
   return n_rx_packets;
 }
index 1ee12ff..7fb7d4b 100755 (executable)
@@ -117,33 +117,6 @@ create_pktio (const char *dev, odp_pool_t pool, u32 mode)
   return pktio;
 }
 
-int
-odp_worker_thread_enable ()
-{
-
-  /*If worker threads are enabled, switch to polling mode */
-  foreach_vlib_main ((
-                      {
-                      vlib_node_set_state (this_vlib_main,
-                                           odp_packet_input_node.index,
-                                           VLIB_NODE_STATE_POLLING);
-                      }));
-  return 0;
-}
-
-int
-odp_worker_thread_disable ()
-{
-  foreach_vlib_main ((
-                      {
-                      vlib_node_set_state (this_vlib_main,
-                                           odp_packet_input_node.index,
-                                           VLIB_NODE_STATE_DISABLED);
-                      }));
-
-  return 0;
-}
-
 u32
 odp_packet_create_if (vlib_main_t * vm, u8 * host_if_name, u8 * hw_addr_set,
                      u32 * sw_if_index, u32 mode)
@@ -211,6 +184,8 @@ odp_packet_create_if (vlib_main_t * vm, u8 * host_if_name, u8 * hw_addr_set,
 
   sw = vnet_get_hw_sw_interface (vnm, oif->hw_if_index);
   oif->sw_if_index = sw->sw_if_index;
+  vnet_hw_interface_set_input_node (vnm, oif->hw_if_index,
+                                   odp_packet_input_node.index);
 
   vnet_hw_interface_set_flags (vnm, oif->hw_if_index,
                               VNET_HW_INTERFACE_FLAG_LINK_UP);
@@ -220,16 +195,9 @@ odp_packet_create_if (vlib_main_t * vm, u8 * host_if_name, u8 * hw_addr_set,
   if (sw_if_index)
     *sw_if_index = oif->sw_if_index;
 
-  if (tm->n_vlib_mains > 1 && pool_elts (om->interfaces) == 1)
-    {
-      /*Fixme :Workers support commented for now as vlib_buffer not thread safe */
-      //odp_worker_thread_enable ();
-    }
-  else
-    {
-      vlib_node_set_state (vm, odp_packet_input_node.index,
-                          VLIB_NODE_STATE_POLLING);
-    }
+  /* Assign queue 0 of the new interface to first available worker thread */
+  vnet_hw_interface_assign_rx_thread (vnm, oif->hw_if_index, 0, ~0);
+
   return 0;
 
 error:
@@ -258,6 +226,8 @@ odp_packet_delete_if (vlib_main_t * vm, u8 * host_if_name)
   oif = pool_elt_at_index (om->interfaces, p[0]);
   vnet_hw_interface_set_flags (vnm, oif->hw_if_index, 0);
 
+  vnet_hw_interface_unassign_rx_thread (vnm, oif->hw_if_index, 0);
+
   om->if_count--;
 
   odp_pktio_stop (odp_pktio_lookup ((char *) host_if_name));
@@ -274,8 +244,6 @@ odp_packet_delete_if (vlib_main_t * vm, u8 * host_if_name)
   if (tm->n_vlib_mains > 1 && pool_elts (om->interfaces) == 0)
     {
       odp_pool_destroy (om->pool);
-      /*Fixme :Workers support commented for now */
-      // odp_worker_thread_disable ();
     }
 
   return 0;
@@ -362,6 +330,9 @@ odp_packet_init (vlib_main_t * vm)
   vpm->virtual.end = params.pool_end;
   vpm->virtual.size = params.pool_size;
 
+  /* Initialization complete and worker threads do not need to sync */
+  tm->worker_thread_release = 1;
+
   return 0;
 }
 
index e511f40..f9c793b 100755 (executable)
@@ -5,6 +5,7 @@
  */
 
 #include <odp_api.h>
+#include <odp/helper/odph_api.h>
 
 #define SHM_PKT_BUF_SIZE       1598
 #define SHM_PKT_POOL_BUF_SIZE  1856
@@ -14,6 +15,8 @@
 #define APPL_MODE_PKT_QUEUE    1
 #define APPL_MODE_PKT_SCHED    2
 
+#define MAX_WORKERS 32
+
 typedef struct
 {
   CLIB_CACHE_LINE_ALIGN_MARK (cacheline0);
@@ -41,6 +44,8 @@ typedef struct
   odp_instance_t instance;
   odp_pool_t pool;
   u32 if_count;
+  u32 thread_cnt;
+  odph_odpthread_t thread_tbl[MAX_WORKERS];
 } odp_packet_main_t;
 
 extern odp_packet_main_t *odp_packet_main;
diff --git a/src/plugins/odp/thread.c b/src/plugins/odp/thread.c
new file mode 100644 (file)
index 0000000..3a5482b
--- /dev/null
@@ -0,0 +1,76 @@
+/*
+ * Copyright (c) 2017 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <vlib/vlib.h>
+#include <vnet/vnet.h>
+#include <odp_api.h>
+#include <odp/helper/odph_api.h>
+#include <odp/odp_packet.h>
+
+static clib_error_t *
+odp_launch_thread (void *fp, vlib_worker_thread_t * w, unsigned lcore_id)
+{
+  odp_packet_main_t *om = odp_packet_main;
+  odp_cpumask_t thd_mask;
+  odph_odpthread_params_t thr_params;
+
+  if (om->thread_cnt == MAX_WORKERS)
+    return clib_error_return (0, "Failed to launch thread %u", lcore_id);
+
+  memset (&thr_params, 0, sizeof (thr_params));
+  thr_params.start = fp;
+  thr_params.arg = w;
+  thr_params.thr_type = ODP_THREAD_WORKER;
+  thr_params.instance = om->instance;
+  odp_cpumask_zero (&thd_mask);
+  odp_cpumask_set (&thd_mask, lcore_id);
+
+  odph_odpthreads_create (&om->thread_tbl[om->thread_cnt], &thd_mask,
+                         &thr_params);
+
+  om->thread_cnt++;
+
+  return 0;
+}
+
+static clib_error_t *
+odp_thread_set_lcore (u32 thread, u16 lcore)
+{
+
+  return 0;
+}
+
+static vlib_thread_callbacks_t odp_callbacks = {
+  .vlib_launch_thread_cb = &odp_launch_thread,
+  .vlib_thread_set_lcore_cb = &odp_thread_set_lcore,
+};
+
+static clib_error_t *
+odp_thread_init (vlib_main_t * vm)
+{
+  vlib_thread_cb_register (vm, &odp_callbacks);
+  return 0;
+}
+
+VLIB_INIT_FUNCTION (odp_thread_init);
+
+/** @endcond */
+/*
+ * fd.io coding-style-patch-verification: ON
+ *
+ * Local Variables:
+ * eval: (c-set-style "gnu")
+ * End:
+ */