af_packet: multithreading support 59/4759/3
authorMohsin KAZMI <sykazmi@cisco.com>
Wed, 18 Jan 2017 10:59:45 +0000 (11:59 +0100)
committerDamjan Marion <dmarion.lists@gmail.com>
Thu, 19 Jan 2017 11:00:11 +0000 (11:00 +0000)
This patch adds multithreading support for af_packet interfaces.

Change-Id: Ief5d1117e7ffeaa59dbc2831e583d5d8e8d4fa7a
Signed-off-by: Mohsin KAZMI <sykazmi@cisco.com>
src/vnet/devices/af_packet/af_packet.c
src/vnet/devices/af_packet/af_packet.h
src/vnet/devices/af_packet/device.c
src/vnet/devices/af_packet/node.c

index 91c3988..e491ba4 100644 (file)
@@ -171,6 +171,31 @@ error:
   return ret;
 }
 
+static void
+af_packet_worker_thread_enable ()
+{
+  /* If worker threads are enabled, switch to polling mode */
+  foreach_vlib_main ((
+                      {
+                      vlib_node_set_state (this_vlib_main,
+                                           af_packet_input_node.index,
+                                           VLIB_NODE_STATE_POLLING);
+                      }));
+
+}
+
+static void
+af_packet_worker_thread_disable ()
+{
+  foreach_vlib_main ((
+                      {
+                      vlib_node_set_state (this_vlib_main,
+                                           af_packet_input_node.index,
+                                           VLIB_NODE_STATE_INTERRUPT);
+                      }));
+
+}
+
 int
 af_packet_create_if (vlib_main_t * vm, u8 * host_if_name, u8 * hw_addr_set,
                     u32 * sw_if_index)
@@ -184,6 +209,7 @@ af_packet_create_if (vlib_main_t * vm, u8 * host_if_name, u8 * hw_addr_set,
   u8 hw_addr[6];
   clib_error_t *error;
   vnet_sw_interface_t *sw;
+  vlib_thread_main_t *tm = vlib_get_thread_main ();
   vnet_main_t *vnm = vnet_get_main ();
   uword *p;
   uword if_index;
@@ -226,6 +252,13 @@ af_packet_create_if (vlib_main_t * vm, u8 * host_if_name, u8 * hw_addr_set,
   apif->next_tx_frame = 0;
   apif->next_rx_frame = 0;
 
+  if (tm->n_vlib_mains > 1)
+    {
+      apif->lockp = clib_mem_alloc_aligned (CLIB_CACHE_LINE_BYTES,
+                                           CLIB_CACHE_LINE_BYTES);
+      memset ((void *) apif->lockp, 0, CLIB_CACHE_LINE_BYTES);
+    }
+
   {
     unix_file_t template = { 0 };
     template.read_function = af_packet_fd_read_ready;
@@ -273,6 +306,10 @@ af_packet_create_if (vlib_main_t * vm, u8 * host_if_name, u8 * hw_addr_set,
                 0);
   if (sw_if_index)
     *sw_if_index = apif->sw_if_index;
+
+  if (tm->n_vlib_mains > 1 && pool_elts (apm->interfaces) == 1)
+    af_packet_worker_thread_enable ();
+
   return 0;
 
 error:
@@ -286,6 +323,7 @@ int
 af_packet_delete_if (vlib_main_t * vm, u8 * host_if_name)
 {
   vnet_main_t *vnm = vnet_get_main ();
+  vlib_thread_main_t *tm = vlib_get_thread_main ();
   af_packet_main_t *apm = &af_packet_main;
   af_packet_if_t *apif;
   uword *p;
@@ -335,6 +373,8 @@ af_packet_delete_if (vlib_main_t * vm, u8 * host_if_name)
   ethernet_delete_interface (vnm, apif->hw_if_index);
 
   pool_put (apm->interfaces, apif);
+  if (tm->n_vlib_mains > 1 && pool_elts (apm->interfaces) == 0)
+    af_packet_worker_thread_disable ();
 
   return 0;
 }
@@ -344,9 +384,24 @@ af_packet_init (vlib_main_t * vm)
 {
   af_packet_main_t *apm = &af_packet_main;
   vlib_thread_main_t *tm = vlib_get_thread_main ();
+  vlib_thread_registration_t *tr;
+  uword *p;
 
   memset (apm, 0, sizeof (af_packet_main_t));
 
+  apm->input_cpu_first_index = 0;
+  apm->input_cpu_count = 1;
+
+  /* find out which cpus will be used for input */
+  p = hash_get_mem (tm->thread_registrations_by_name, "workers");
+  tr = p ? (vlib_thread_registration_t *) p[0] : 0;
+
+  if (tr && tr->count > 0)
+    {
+      apm->input_cpu_first_index = tr->first_index;
+      apm->input_cpu_count = tr->count;
+    }
+
   mhash_init_vec_string (&apm->if_index_by_host_if_name, sizeof (uword));
 
   vec_validate_aligned (apm->rx_buffers, tm->n_vlib_mains - 1,
index 19e2523..e00e5cb 100644 (file)
@@ -20,6 +20,7 @@
 typedef struct
 {
   CLIB_CACHE_LINE_ALIGN_MARK (cacheline0);
+  volatile u32 *lockp;
   u8 *host_if_name;
   int fd;
   struct tpacket_req *rx_req;
@@ -50,6 +51,12 @@ typedef struct
 
   /* hash of host interface names */
   mhash_t if_index_by_host_if_name;
+
+  /* first cpu index */
+  u32 input_cpu_first_index;
+
+  /* total cpu count */
+  u32 input_cpu_count;
 } af_packet_main_t;
 
 af_packet_main_t af_packet_main;
index 1fb4000..e3bf9bb 100644 (file)
@@ -92,6 +92,12 @@ af_packet_interface_tx (vlib_main_t * vm,
   struct tpacket2_hdr *tph;
   u32 frame_not_ready = 0;
 
+  if (PREDICT_FALSE (apif->lockp != 0))
+    {
+      while (__sync_lock_test_and_set (apif->lockp, 1))
+       ;
+    }
+
   while (n_left > 0)
     {
       u32 len;
@@ -152,6 +158,9 @@ af_packet_interface_tx (vlib_main_t * vm,
        }
     }
 
+  if (PREDICT_FALSE (apif->lockp != 0))
+    *apif->lockp = 0;
+
   if (PREDICT_FALSE (frame_not_ready))
     vlib_error_count (vm, node->node_index,
                      AF_PACKET_TX_ERROR_FRAME_NOT_READY, frame_not_ready);
index 7200432..476ccca 100644 (file)
@@ -108,10 +108,9 @@ buffer_add_to_chain (vlib_main_t * vm, u32 bi, u32 first_bi, u32 prev_bi)
 
 always_inline uword
 af_packet_device_input_fn (vlib_main_t * vm, vlib_node_runtime_t * node,
-                          vlib_frame_t * frame, u32 device_idx)
+                          vlib_frame_t * frame, af_packet_if_t * apif)
 {
   af_packet_main_t *apm = &af_packet_main;
-  af_packet_if_t *apif = pool_elt_at_index (apm->interfaces, device_idx);
   struct tpacket2_hdr *tph;
   u32 next_index = VNET_DEVICE_INPUT_NEXT_ETHERNET_INPUT;
   u32 block = 0;
@@ -125,10 +124,10 @@ af_packet_device_input_fn (vlib_main_t * vm, vlib_node_runtime_t * node,
   u32 frame_num = apif->rx_req->tp_frame_nr;
   u8 *block_start = apif->rx_ring + block * block_size;
   uword n_trace = vlib_get_trace_count (vm, node);
+  u32 cpu_index = os_get_cpu_number ();
   u32 n_buffer_bytes = vlib_buffer_free_list_buffer_size (vm,
                                                          VLIB_BUFFER_DEFAULT_FREE_LIST_INDEX);
   u32 min_bufs = apif->rx_req->tp_frame_size / n_buffer_bytes;
-  int cpu_index = node->cpu_index;
 
   if (apif->per_interface_next_index != ~0)
     next_index = apif->per_interface_next_index;
@@ -249,16 +248,18 @@ af_packet_input_fn (vlib_main_t * vm, vlib_node_runtime_t * node,
 {
   int i;
   u32 n_rx_packets = 0;
-
+  u32 cpu_index = os_get_cpu_number ();
   af_packet_main_t *apm = &af_packet_main;
+  af_packet_if_t *apif;
 
-  /* *INDENT-OFF* */
-  clib_bitmap_foreach (i, apm->pending_input_bitmap,
-    ({
-      clib_bitmap_set (apm->pending_input_bitmap, i, 0);
-      n_rx_packets += af_packet_device_input_fn(vm, node, frame, i);
-    }));
-  /* *INDENT-ON* */
+  for (i = 0; i < vec_len (apm->interfaces); i++)
+    {
+      apif = vec_elt_at_index (apm->interfaces, i);
+      if (apif->is_admin_up &&
+         (i % apm->input_cpu_count) ==
+         (cpu_index - apm->input_cpu_first_index))
+       n_rx_packets += af_packet_device_input_fn (vm, node, frame, apif);
+    }
 
   return n_rx_packets;
 }
@@ -270,6 +271,9 @@ VLIB_REGISTER_NODE (af_packet_input_node) = {
   .sibling_of = "device-input",
   .format_trace = format_af_packet_input_trace,
   .type = VLIB_NODE_TYPE_INPUT,
+  /**
+   * default state is INTERRUPT mode, switch to POLLING if worker threads are enabled
+   */
   .state = VLIB_NODE_STATE_INTERRUPT,
   .n_errors = AF_PACKET_INPUT_N_ERROR,
   .error_strings = af_packet_input_error_strings,