Use thread local storage for thread index
diff --git a/src/vnet/devices/virtio/vhost-user.c b/src/vnet/devices/virtio/vhost-user.c
index f490f0c..5e720f6 100644
--- a/src/vnet/devices/virtio/vhost-user.c
+++ b/src/vnet/devices/virtio/vhost-user.c
@@ -331,7 +331,7 @@ vhost_user_tx_thread_placement (vhost_user_intf_t * vui)
 {
   //Let's try to assign one queue to each thread
   u32 qid = 0;
-  u32 cpu_index = 0;
+  u32 thread_index = 0;
   vui->use_tx_spinlock = 0;
   while (1)
     {
@@ -341,20 +341,21 @@ vhost_user_tx_thread_placement (vhost_user_intf_t * vui)
          if (!rxvq->started || !rxvq->enabled)
            continue;
 
-         vui->per_cpu_tx_qid[cpu_index] = qid;
-         cpu_index++;
-         if (cpu_index == vlib_get_thread_main ()->n_vlib_mains)
+         vui->per_cpu_tx_qid[thread_index] = qid;
+         thread_index++;
+         if (thread_index == vlib_get_thread_main ()->n_vlib_mains)
            return;
        }
       //We need to loop, meaning the spinlock has to be used
       vui->use_tx_spinlock = 1;
-      if (cpu_index == 0)
+      if (thread_index == 0)
        {
          //Could not find a single valid one
-         for (cpu_index = 0;
-              cpu_index < vlib_get_thread_main ()->n_vlib_mains; cpu_index++)
+         for (thread_index = 0;
+              thread_index < vlib_get_thread_main ()->n_vlib_mains;
+              thread_index++)
            {
-             vui->per_cpu_tx_qid[cpu_index] = 0;
+             vui->per_cpu_tx_qid[thread_index] = 0;
            }
          return;
        }
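
Note on the hunk above: vhost_user_tx_thread_placement() hands out the started RX queues round-robin to the worker threads. When there are fewer started queues than threads the loop wraps, several threads end up sharing a queue, and use_tx_spinlock is set so the TX path serializes access. A minimal standalone sketch of the same policy, using plain C types instead of the VPP structures (all names here are illustrative, not from the patch):

#include <stdint.h>

/* Sketch only: models the queue-to-thread assignment done by
 * vhost_user_tx_thread_placement(). per_thread_qid[] stands in for
 * vui->per_cpu_tx_qid and use_spinlock for vui->use_tx_spinlock. */
static void
assign_tx_queues (uint32_t n_threads, const uint8_t *queue_started,
                  uint32_t n_queues, uint32_t *per_thread_qid,
                  uint8_t *use_spinlock)
{
  uint32_t thread_index = 0, qid;

  *use_spinlock = 0;
  while (1)
    {
      for (qid = 0; qid < n_queues; qid++)
        {
          if (!queue_started[qid])
            continue;
          per_thread_qid[thread_index++] = qid;
          if (thread_index == n_threads)
            return;             /* one queue per thread, no lock needed */
        }
      /* Wrapped around: queues are shared, so the TX path must lock */
      *use_spinlock = 1;
      if (thread_index == 0)
        {
          /* No started queue at all; point every thread at queue 0 */
          for (thread_index = 0; thread_index < n_threads; thread_index++)
            per_thread_qid[thread_index] = 0;
          return;
        }
    }
}
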
@@ -368,14 +369,15 @@ vhost_user_rx_thread_placement ()
   vhost_user_intf_t *vui;
   vhost_cpu_t *vhc;
   u32 *workers = 0;
+  u32 thread_index;
+  vlib_main_t *vm;
 
   //Let's list all workers cpu indexes
   u32 i;
   for (i = vum->input_cpu_first_index;
        i < vum->input_cpu_first_index + vum->input_cpu_count; i++)
     {
-      vlib_node_set_state (vlib_mains ? vlib_mains[i] : &vlib_global_main,
-                          vhost_user_input_node.index,
+      vlib_node_set_state (vlib_mains[i], vhost_user_input_node.index,
                           VLIB_NODE_STATE_DISABLED);
       vec_add1 (workers, i);
     }
@@ -399,19 +401,59 @@ vhost_user_rx_thread_placement ()
            continue;
 
          i %= vec_len (vui_workers);
-         u32 cpu_index = vui_workers[i];
+         thread_index = vui_workers[i];
          i++;
-         vhc = &vum->cpus[cpu_index];
+         vhc = &vum->cpus[thread_index];
 
          iaq.qid = qid;
          iaq.vhost_iface_index = vui - vum->vhost_user_interfaces;
          vec_add1 (vhc->rx_queues, iaq);
-         vlib_node_set_state (vlib_mains ? vlib_mains[cpu_index] :
-             &vlib_global_main, vhost_user_input_node.index,
-             VLIB_NODE_STATE_POLLING);
        }
   });
   /* *INDENT-ON* */
+
+  vec_foreach (vhc, vum->cpus)
+  {
+    vhost_iface_and_queue_t *vhiq;
+    u8 mode = VHOST_USER_INTERRUPT_MODE;
+
+    vec_foreach (vhiq, vhc->rx_queues)
+    {
+      vui = &vum->vhost_user_interfaces[vhiq->vhost_iface_index];
+      if (vui->operation_mode == VHOST_USER_POLLING_MODE)
+       {
+         /* At least one interface is polling, cpu is set to polling */
+         mode = VHOST_USER_POLLING_MODE;
+         break;
+       }
+    }
+    vhc->operation_mode = mode;
+  }
+
+  for (thread_index = vum->input_cpu_first_index;
+       thread_index < vum->input_cpu_first_index + vum->input_cpu_count;
+       thread_index++)
+    {
+      vlib_node_state_t state = VLIB_NODE_STATE_POLLING;
+
+      vhc = &vum->cpus[thread_index];
+      vm = vlib_mains ? vlib_mains[thread_index] : &vlib_global_main;
+      switch (vhc->operation_mode)
+       {
+       case VHOST_USER_INTERRUPT_MODE:
+         state = VLIB_NODE_STATE_INTERRUPT;
+         break;
+       case VHOST_USER_POLLING_MODE:
+         state = VLIB_NODE_STATE_POLLING;
+         break;
+       default:
+         clib_warning ("BUG: bad operation mode %d", vhc->operation_mode);
+         break;
+       }
+      vlib_node_set_state (vm, vhost_user_input_node.index, state);
+    }
+
+  vec_free (workers);
 }
 
 static int
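
The rx placement hunk above also derives a per-thread operation mode: a worker keeps the vhost-user input node in polling state if any interface queue assigned to it was configured for polling, and only drops to interrupt state when every assigned interface is in interrupt mode. The reduction, sketched standalone (illustrative names, not the VPP types):

/* Sketch only: a thread may sleep (interrupt mode) only if none of the
 * interfaces it serves requested polling. */
enum sketch_mode { SKETCH_INTERRUPT_MODE, SKETCH_POLLING_MODE };

static enum sketch_mode
resolve_thread_mode (const enum sketch_mode *iface_mode, int n_ifaces)
{
  int i;

  for (i = 0; i < n_ifaces; i++)
    if (iface_mode[i] == SKETCH_POLLING_MODE)
      return SKETCH_POLLING_MODE;
  return SKETCH_INTERRUPT_MODE;
}
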
@@ -486,12 +528,68 @@ vhost_user_update_iface_state (vhost_user_intf_t * vui)
   vhost_user_tx_thread_placement (vui);
 }
 
+static void
+vhost_user_set_interrupt_pending (vhost_user_intf_t * vui, u32 ifq)
+{
+  vhost_user_main_t *vum = &vhost_user_main;
+  vhost_cpu_t *vhc;
+  u32 thread_index;
+  vhost_iface_and_queue_t *vhiq;
+  vlib_main_t *vm;
+  u32 ifq2;
+  u8 done = 0;
+
+  if (vhost_user_intf_ready (vui))
+    {
+      vec_foreach (vhc, vum->cpus)
+      {
+       if (vhc->operation_mode == VHOST_USER_POLLING_MODE)
+         continue;
+
+       vec_foreach (vhiq, vhc->rx_queues)
+       {
+         /*
+          * Match the interface and the virtqueue number
+          */
+         if ((vhiq->vhost_iface_index == (ifq >> 8)) &&
+             (VHOST_VRING_IDX_TX (vhiq->qid) == (ifq & 0xff)))
+           {
+             thread_index = vhc - vum->cpus;
+             vm = vlib_mains ? vlib_mains[thread_index] : &vlib_global_main;
+             /*
+              * Convert RX virtqueue number in the lower byte to vring
+              * queue index for the input node process. Top bytes contain
+              * the interface, lower byte contains the queue index.
+              */
+             ifq2 = ((ifq >> 8) << 8) | vhiq->qid;
+             vhc->pending_input_bitmap =
+               clib_bitmap_set (vhc->pending_input_bitmap, ifq2, 1);
+             vlib_node_set_interrupt_pending (vm,
+                                              vhost_user_input_node.index);
+             done = 1;
+             break;
+           }
+       }
+       if (done)
+         break;
+      }
+    }
+}
+
 static clib_error_t *
 vhost_user_callfd_read_ready (unix_file_t * uf)
 {
   __attribute__ ((unused)) int n;
   u8 buff[8];
+  vhost_user_intf_t *vui =
+    pool_elt_at_index (vhost_user_main.vhost_user_interfaces,
+                      uf->private_data >> 8);
+
   n = read (uf->file_descriptor, ((char *) &buff), 8);
+  DBG_SOCK ("if %d CALL queue %d", uf->private_data >> 8,
+           uf->private_data & 0xff);
+  vhost_user_set_interrupt_pending (vui, uf->private_data);
+
   return 0;
 }
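
The callfd handler above, the kickfd handler in the next hunk, and vhost_user_set_interrupt_pending() all rely on the same packed index: unix_file.private_data (and the ifq argument) carries the vhost interface pool index in the upper bits and the virtqueue number in the low byte; before storing into the pending bitmap the low byte is rewritten from the vring index to the queue-pair id (ifq2). Illustrative helpers showing the encoding, not part of the patch:

#include <stdint.h>

/* Sketch only: the (interface, virtqueue) pair packed into private_data. */
static inline uint32_t
ifq_pack (uint32_t iface_index, uint8_t vring_idx)
{
  return (iface_index << 8) | vring_idx;
}

static inline uint32_t
ifq_iface_index (uint32_t ifq)
{
  return ifq >> 8;              /* pool index of the vhost interface */
}

static inline uint8_t
ifq_vring_idx (uint32_t ifq)
{
  return ifq & 0xff;            /* virtqueue number */
}
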
 
@@ -504,13 +602,20 @@ vhost_user_kickfd_read_ready (unix_file_t * uf)
     pool_elt_at_index (vhost_user_main.vhost_user_interfaces,
                       uf->private_data >> 8);
   u32 qid = uf->private_data & 0xff;
+
   n = read (uf->file_descriptor, ((char *) &buff), 8);
   DBG_SOCK ("if %d KICK queue %d", uf->private_data >> 8, qid);
 
   vlib_worker_thread_barrier_sync (vlib_get_main ());
-  vui->vrings[qid].started = 1;
-  vhost_user_update_iface_state (vui);
+  if (!vui->vrings[qid].started ||
+      (vhost_user_intf_ready (vui) != vui->is_up))
+    {
+      vui->vrings[qid].started = 1;
+      vhost_user_update_iface_state (vui);
+    }
   vlib_worker_thread_barrier_release (vlib_get_main ());
+
+  vhost_user_set_interrupt_pending (vui, uf->private_data);
   return 0;
 }
 
@@ -584,7 +689,10 @@ vhost_user_vring_close (vhost_user_intf_t * vui, u32 qid)
       vring->callfd_idx = ~0;
     }
   if (vring->errfd != -1)
-    close (vring->errfd);
+    {
+      close (vring->errfd);
+      vring->errfd = -1;
+    }
   vhost_user_vring_init (vui, qid);
 }
 
@@ -905,8 +1013,12 @@ vhost_user_socket_read (unix_file_t * uf)
        vui->vrings[msg.state.index].last_avail_idx =
        vui->vrings[msg.state.index].used->idx;
 
-      /* tell driver that we don't want interrupts */
-      vui->vrings[msg.state.index].used->flags = VRING_USED_F_NO_NOTIFY;
+      if (vui->operation_mode == VHOST_USER_POLLING_MODE)
+       /* tell driver that we don't want interrupts */
+       vui->vrings[msg.state.index].used->flags = VRING_USED_F_NO_NOTIFY;
+      else
+       /* tell driver that we want interrupts */
+       vui->vrings[msg.state.index].used->flags = 0;
       break;
 
     case VHOST_USER_SET_OWNER:
@@ -1027,12 +1139,16 @@ vhost_user_socket_read (unix_file_t * uf)
          goto close_socket;
        }
 
-      /* Spec says: Client must [...] stop ring upon receiving VHOST_USER_GET_VRING_BASE. */
-      vhost_user_vring_close (vui, msg.state.index);
-
+      /*
+       * Copy last_avail_idx from the vring before closing it because
+       * closing the vring also initializes the vring last_avail_idx
+       */
       msg.state.num = vui->vrings[msg.state.index].last_avail_idx;
       msg.flags |= 4;
       msg.size = sizeof (msg.state);
+
+      /* Spec says: Client must [...] stop ring upon receiving VHOST_USER_GET_VRING_BASE. */
+      vhost_user_vring_close (vui, msg.state.index);
       break;
 
     case VHOST_USER_NONE:
@@ -1477,7 +1593,7 @@ vhost_user_if_input (vlib_main_t * vm,
   u32 n_trace = vlib_get_trace_count (vm, node);
   u16 qsz_mask;
   u32 map_hint = 0;
-  u16 cpu_index = os_get_cpu_number ();
+  u16 thread_index = vlib_get_thread_index ();
   u16 copy_len = 0;
 
   {
@@ -1536,32 +1652,32 @@ vhost_user_if_input (vlib_main_t * vm,
    * in the loop and come back later. This is not an issue as for big packet,
    * processing cost really comes from the memory copy.
    */
-  if (PREDICT_FALSE (vum->cpus[cpu_index].rx_buffers_len < n_left + 1))
+  if (PREDICT_FALSE (vum->cpus[thread_index].rx_buffers_len < n_left + 1))
     {
-      u32 curr_len = vum->cpus[cpu_index].rx_buffers_len;
-      vum->cpus[cpu_index].rx_buffers_len +=
+      u32 curr_len = vum->cpus[thread_index].rx_buffers_len;
+      vum->cpus[thread_index].rx_buffers_len +=
        vlib_buffer_alloc_from_free_list (vm,
-                                         vum->cpus[cpu_index].rx_buffers +
+                                         vum->cpus[thread_index].rx_buffers +
                                          curr_len,
                                          VHOST_USER_RX_BUFFERS_N - curr_len,
                                          VLIB_BUFFER_DEFAULT_FREE_LIST_INDEX);
 
       if (PREDICT_FALSE
-         (vum->cpus[cpu_index].rx_buffers_len <
+         (vum->cpus[thread_index].rx_buffers_len <
           VHOST_USER_RX_BUFFER_STARVATION))
        {
          /* In case of buffer starvation, discard some packets from the queue
           * and log the event.
           * We keep doing best effort for the remaining packets. */
-         u32 flush = (n_left + 1 > vum->cpus[cpu_index].rx_buffers_len) ?
-           n_left + 1 - vum->cpus[cpu_index].rx_buffers_len : 1;
+         u32 flush = (n_left + 1 > vum->cpus[thread_index].rx_buffers_len) ?
+           n_left + 1 - vum->cpus[thread_index].rx_buffers_len : 1;
          flush = vhost_user_rx_discard_packet (vm, vui, txvq, flush);
 
          n_left -= flush;
          vlib_increment_simple_counter (vnet_main.
                                         interface_main.sw_if_counters +
                                         VNET_INTERFACE_COUNTER_DROP,
-                                        os_get_cpu_number (),
+                                        vlib_get_thread_index (),
                                         vui->sw_if_index, flush);
 
          vlib_error_count (vm, vhost_user_input_node.index,
@@ -1581,7 +1697,7 @@ vhost_user_if_input (vlib_main_t * vm,
          u32 desc_data_offset;
          vring_desc_t *desc_table = txvq->desc;
 
-         if (PREDICT_FALSE (vum->cpus[cpu_index].rx_buffers_len <= 1))
+         if (PREDICT_FALSE (vum->cpus[thread_index].rx_buffers_len <= 1))
            {
              /* Not enough rx_buffers
              * Note: We yield on 1 so we don't need to do an additional
@@ -1592,17 +1708,18 @@ vhost_user_if_input (vlib_main_t * vm,
            }
 
          desc_current = txvq->avail->ring[txvq->last_avail_idx & qsz_mask];
-         vum->cpus[cpu_index].rx_buffers_len--;
-         bi_current = (vum->cpus[cpu_index].rx_buffers)
-           [vum->cpus[cpu_index].rx_buffers_len];
+         vum->cpus[thread_index].rx_buffers_len--;
+         bi_current = (vum->cpus[thread_index].rx_buffers)
+           [vum->cpus[thread_index].rx_buffers_len];
          b_head = b_current = vlib_get_buffer (vm, bi_current);
          to_next[0] = bi_current;      //We do that now so we can forget about bi_current
          to_next++;
          n_left_to_next--;
 
          vlib_prefetch_buffer_with_index (vm,
-                                          (vum->cpus[cpu_index].rx_buffers)
-                                          [vum->cpus[cpu_index].
+                                          (vum->
+                                           cpus[thread_index].rx_buffers)
+                                          [vum->cpus[thread_index].
                                            rx_buffers_len - 1], LOAD);
 
          /* Just preset the used descriptor id and length for later */
@@ -1676,7 +1793,7 @@ vhost_user_if_input (vlib_main_t * vm,
                  (b_current->current_length == VLIB_BUFFER_DATA_SIZE))
                {
                  if (PREDICT_FALSE
-                     (vum->cpus[cpu_index].rx_buffers_len == 0))
+                     (vum->cpus[thread_index].rx_buffers_len == 0))
                    {
                      /* Cancel speculation */
                      to_next--;
@@ -1690,17 +1807,18 @@ vhost_user_if_input (vlib_main_t * vm,
                       * but valid.
                       */
                      vhost_user_input_rewind_buffers (vm,
-                                                      &vum->cpus[cpu_index],
+                                                      &vum->cpus
+                                                      [thread_index],
                                                       b_head);
                      n_left = 0;
                      goto stop;
                    }
 
                  /* Get next output */
-                 vum->cpus[cpu_index].rx_buffers_len--;
+                 vum->cpus[thread_index].rx_buffers_len--;
                  u32 bi_next =
-                   (vum->cpus[cpu_index].rx_buffers)[vum->cpus
-                                                     [cpu_index].rx_buffers_len];
+                   (vum->cpus[thread_index].rx_buffers)[vum->cpus
+                                                        [thread_index].rx_buffers_len];
                  b_current->next_buffer = bi_next;
                  b_current->flags |= VLIB_BUFFER_NEXT_PRESENT;
                  bi_current = bi_next;
@@ -1708,7 +1826,7 @@ vhost_user_if_input (vlib_main_t * vm,
                }
 
              /* Prepare a copy order executed later for the data */
-             vhost_copy_t *cpy = &vum->cpus[cpu_index].copy[copy_len];
+             vhost_copy_t *cpy = &vum->cpus[thread_index].copy[copy_len];
              copy_len++;
              u32 desc_data_l =
                desc_table[desc_current].len - desc_data_offset;
@@ -1747,7 +1865,7 @@ vhost_user_if_input (vlib_main_t * vm,
 
            /* redirect if feature path enabled */
            vnet_feature_start_device_input_x1 (vui->sw_if_index, &next0,
-                                               b_head, 0);
+                                               b_head);
 
            u32 bi = to_next[-1];       //Cannot use to_next[-1] in the macro
            vlib_validate_buffer_enqueue_x1 (vm, node, next_index,
@@ -1765,7 +1883,7 @@ vhost_user_if_input (vlib_main_t * vm,
          if (PREDICT_FALSE (copy_len >= VHOST_USER_RX_COPY_THRESHOLD))
            {
              if (PREDICT_FALSE
-                 (vhost_user_input_copy (vui, vum->cpus[cpu_index].copy,
+                 (vhost_user_input_copy (vui, vum->cpus[thread_index].copy,
                                          copy_len, &map_hint)))
                {
                  clib_warning
@@ -1790,7 +1908,7 @@ vhost_user_if_input (vlib_main_t * vm,
 
   /* Do the memory copies */
   if (PREDICT_FALSE
-      (vhost_user_input_copy (vui, vum->cpus[cpu_index].copy,
+      (vhost_user_input_copy (vui, vum->cpus[thread_index].copy,
                              copy_len, &map_hint)))
     {
       clib_warning ("Memory mapping error on interface hw_if_index=%d "
@@ -1805,7 +1923,8 @@ vhost_user_if_input (vlib_main_t * vm,
   vhost_user_log_dirty_ring (vui, txvq, idx);
 
   /* interrupt (call) handling */
-  if ((txvq->callfd_idx != ~0) && !(txvq->avail->flags & 1))
+  if ((txvq->callfd_idx != ~0) &&
+      !(txvq->avail->flags & VRING_AVAIL_F_NO_INTERRUPT))
     {
       txvq->n_since_last_int += n_rx_packets;
 
@@ -1817,9 +1936,9 @@ vhost_user_if_input (vlib_main_t * vm,
   vlib_increment_combined_counter
     (vnet_main.interface_main.combined_sw_if_counters
      + VNET_INTERFACE_COUNTER_RX,
-     os_get_cpu_number (), vui->sw_if_index, n_rx_packets, n_rx_bytes);
+     vlib_get_thread_index (), vui->sw_if_index, n_rx_packets, n_rx_bytes);
 
-  vnet_device_increment_rx_packets (cpu_index, n_rx_packets);
+  vnet_device_increment_rx_packets (thread_index, n_rx_packets);
 
   return n_rx_packets;
 }
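
The interrupt (call) handling change above replaces the magic constant 1 with VRING_AVAIL_F_NO_INTERRUPT, which is bit 0 of avail->flags in the virtio ring layout. A guest interrupt is only coalesced and sent when the guest has not set that bit and a call eventfd was supplied. A compact restatement of the check (sketch, not patch code):

#include <stdint.h>

#define VRING_AVAIL_F_NO_INTERRUPT 1    /* bit 0 of avail->flags (virtio) */

/* Sketch only: should the device eventually signal the guest for this
 * vring? ~0 in callfd_idx means no call eventfd was registered. */
static inline int
guest_wants_interrupt (uint32_t callfd_idx, uint16_t avail_flags)
{
  return (callfd_idx != ~0u) && !(avail_flags & VRING_AVAIL_F_NO_INTERRUPT);
}
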
@@ -1830,17 +1949,34 @@ vhost_user_input (vlib_main_t * vm,
 {
   vhost_user_main_t *vum = &vhost_user_main;
   uword n_rx_packets = 0;
-  u32 cpu_index = os_get_cpu_number ();
+  u32 thread_index = vlib_get_thread_index ();
+  vhost_iface_and_queue_t *vhiq;
+  vhost_user_intf_t *vui;
+  vhost_cpu_t *vhc;
 
+  vhc = &vum->cpus[thread_index];
+  if (PREDICT_TRUE (vhc->operation_mode == VHOST_USER_POLLING_MODE))
+    {
+      vec_foreach (vhiq, vum->cpus[thread_index].rx_queues)
+      {
+       vui = &vum->vhost_user_interfaces[vhiq->vhost_iface_index];
+       n_rx_packets += vhost_user_if_input (vm, vum, vui, vhiq->qid, node);
+      }
+    }
+  else
+    {
+      int i;
 
-  vhost_iface_and_queue_t *vhiq;
-  vec_foreach (vhiq, vum->cpus[cpu_index].rx_queues)
-  {
-    vhost_user_intf_t *vui =
-      &vum->vhost_user_interfaces[vhiq->vhost_iface_index];
-    n_rx_packets += vhost_user_if_input (vm, vum, vui, vhiq->qid, node);
-  }
+      /* *INDENT-OFF* */
+      clib_bitmap_foreach (i, vhc->pending_input_bitmap, ({
+       int qid = i & 0xff;
 
+       clib_bitmap_set (vhc->pending_input_bitmap, i, 0);
+       vui = pool_elt_at_index (vum->vhost_user_interfaces, i >> 8);
+       n_rx_packets += vhost_user_if_input (vm, vum, vui, qid, node);
+      }));
+      /* *INDENT-ON* */
+    }
   return n_rx_packets;
 }
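
In the interrupt branch above, the input node only services the queues whose bit was set in pending_input_bitmap by vhost_user_set_interrupt_pending(); each bit index is the packed (interface << 8 | qid) value and is cleared before the queue is polled. A standalone model of that drain loop, using a plain byte array instead of a clib bitmap (illustrative only):

#include <stdint.h>

/* Sketch only: visit each pending (interface, queue) pair exactly once.
 * input_fn stands in for vhost_user_if_input(). */
static uint32_t
drain_pending_queues (uint8_t *pending, uint32_t n_entries,
                      uint32_t (*input_fn) (uint32_t iface_index,
                                            uint32_t qid))
{
  uint32_t i, n_rx_packets = 0;

  for (i = 0; i < n_entries; i++)
    if (pending[i])
      {
        pending[i] = 0;
        n_rx_packets += input_fn (i >> 8, i & 0xff);
      }
  return n_rx_packets;
}
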
 
@@ -1963,7 +2099,7 @@ vhost_user_tx (vlib_main_t * vm,
   vhost_user_vring_t *rxvq;
   u16 qsz_mask;
   u8 error;
-  u32 cpu_index = os_get_cpu_number ();
+  u32 thread_index = vlib_get_thread_index ();
   u32 map_hint = 0;
   u8 retry = 8;
   u16 copy_len;
@@ -1983,7 +2119,7 @@ vhost_user_tx (vlib_main_t * vm,
 
   qid =
     VHOST_VRING_IDX_RX (*vec_elt_at_index
-                       (vui->per_cpu_tx_qid, os_get_cpu_number ()));
+                       (vui->per_cpu_tx_qid, vlib_get_thread_index ()));
   rxvq = &vui->vrings[qid];
   if (PREDICT_FALSE (vui->use_tx_spinlock))
     vhost_user_vring_lock (vui, qid);
@@ -2010,10 +2146,10 @@ retry:
 
       if (PREDICT_FALSE (b0->flags & VLIB_BUFFER_IS_TRACED))
        {
-         vum->cpus[cpu_index].current_trace =
+         vum->cpus[thread_index].current_trace =
            vlib_add_trace (vm, node, b0,
-                           sizeof (*vum->cpus[cpu_index].current_trace));
-         vhost_user_tx_trace (vum->cpus[cpu_index].current_trace,
+                           sizeof (*vum->cpus[thread_index].current_trace));
+         vhost_user_tx_trace (vum->cpus[thread_index].current_trace,
                               vui, qid / 2, b0, rxvq);
        }
 
@@ -2055,14 +2191,14 @@ retry:
       {
        // Get a header from the header array
        virtio_net_hdr_mrg_rxbuf_t *hdr =
-         &vum->cpus[cpu_index].tx_headers[tx_headers_len];
+         &vum->cpus[thread_index].tx_headers[tx_headers_len];
        tx_headers_len++;
        hdr->hdr.flags = 0;
        hdr->hdr.gso_type = 0;
        hdr->num_buffers = 1;   //This is local, no need to check
 
        // Prepare a copy order executed later for the header
-       vhost_copy_t *cpy = &vum->cpus[cpu_index].copy[copy_len];
+       vhost_copy_t *cpy = &vum->cpus[thread_index].copy[copy_len];
        copy_len++;
        cpy->len = vui->virtio_net_hdr_sz;
        cpy->dst = buffer_map_addr;
@@ -2087,7 +2223,7 @@ retry:
              else if (vui->virtio_net_hdr_sz == 12)    //MRG is available
                {
                  virtio_net_hdr_mrg_rxbuf_t *hdr =
-                   &vum->cpus[cpu_index].tx_headers[tx_headers_len - 1];
+                   &vum->cpus[thread_index].tx_headers[tx_headers_len - 1];
 
                  //Move from available to used buffer
                  rxvq->used->ring[rxvq->last_used_idx & qsz_mask].id =
@@ -2149,7 +2285,7 @@ retry:
            }
 
          {
-           vhost_copy_t *cpy = &vum->cpus[cpu_index].copy[copy_len];
+           vhost_copy_t *cpy = &vum->cpus[thread_index].copy[copy_len];
            copy_len++;
            cpy->len = bytes_left;
            cpy->len = (cpy->len > buffer_len) ? buffer_len : cpy->len;
@@ -2192,8 +2328,8 @@ retry:
 
       if (PREDICT_FALSE (b0->flags & VLIB_BUFFER_IS_TRACED))
        {
-         vum->cpus[cpu_index].current_trace->hdr =
-           vum->cpus[cpu_index].tx_headers[tx_headers_len - 1];
+         vum->cpus[thread_index].current_trace->hdr =
+           vum->cpus[thread_index].tx_headers[tx_headers_len - 1];
        }
 
       n_left--;                        //At the end for error counting when 'goto done' is invoked
@@ -2203,7 +2339,7 @@ retry:
 done:
   //Do the memory copies
   if (PREDICT_FALSE
-      (vhost_user_tx_copy (vui, vum->cpus[cpu_index].copy,
+      (vhost_user_tx_copy (vui, vum->cpus[thread_index].copy,
                           copy_len, &map_hint)))
     {
       clib_warning ("Memory mapping error on interface hw_if_index=%d "
@@ -2235,7 +2371,8 @@ done:
     }
 
   /* interrupt (call) handling */
-  if ((rxvq->callfd_idx != ~0) && !(rxvq->avail->flags & 1))
+  if ((rxvq->callfd_idx != ~0) &&
+      !(rxvq->avail->flags & VRING_AVAIL_F_NO_INTERRUPT))
     {
       rxvq->n_since_last_int += frame->n_vectors - n_left;
 
@@ -2252,7 +2389,7 @@ done3:
       vlib_increment_simple_counter
        (vnet_main.interface_main.sw_if_counters
         + VNET_INTERFACE_COUNTER_DROP,
-        os_get_cpu_number (), vui->sw_if_index, n_left);
+        vlib_get_thread_index (), vui->sw_if_index, n_left);
     }
 
   vlib_buffer_free (vm, vlib_frame_args (frame), frame->n_vectors);
@@ -2306,14 +2443,11 @@ vhost_user_process (vlib_main_t * vm,
   f64 timeout = 3153600000.0 /* 100 years */ ;
   uword *event_data = 0;
 
-  sockfd = socket (AF_UNIX, SOCK_STREAM, 0);
+  sockfd = -1;
   sun.sun_family = AF_UNIX;
   template.read_function = vhost_user_socket_read;
   template.error_function = vhost_user_socket_error;
 
-  if (sockfd < 0)
-    return 0;
-
   while (1)
     {
       vlib_process_wait_for_event_or_clock (vm, timeout);
@@ -2328,6 +2462,23 @@ vhost_user_process (vlib_main_t * vm,
          if (vui->unix_server_index == ~0) { //Nothing to do for server sockets
              if (vui->unix_file_index == ~0)
                {
+                 if ((sockfd < 0) &&
+                     ((sockfd = socket (AF_UNIX, SOCK_STREAM, 0)) < 0))
+                   {
+                     /*
+                      * 1st time error or new error for this interface,
+                      * spit out the message and record the error
+                      */
+                     if (!vui->sock_errno || (vui->sock_errno != errno))
+                       {
+                         clib_unix_warning
+                           ("Error: Could not open unix socket for %s",
+                            vui->sock_filename);
+                         vui->sock_errno = errno;
+                       }
+                     continue;
+                   }
+
                  /* try to connect */
                  strncpy (sun.sun_path, (char *) vui->sock_filename,
                           sizeof (sun.sun_path) - 1);
@@ -2349,11 +2500,8 @@ vhost_user_process (vlib_main_t * vm,
                          vui - vhost_user_main.vhost_user_interfaces;
                      vui->unix_file_index = unix_file_add (&unix_main, &template);
 
-                     //Re-open for next connect
-                     if ((sockfd = socket (AF_UNIX, SOCK_STREAM, 0)) < 0) {
-                         clib_warning("Critical: Could not open unix socket");
-                         return 0;
-                     }
+                     /* This sockfd is considered consumed */
+                     sockfd = -1;
                    }
                  else
                    {
@@ -2540,7 +2688,7 @@ vhost_user_vui_init (vnet_main_t * vnm,
                     vhost_user_intf_t * vui,
                     int server_sock_fd,
                     const char *sock_filename,
-                    u64 feature_mask, u32 * sw_if_index)
+                    u64 feature_mask, u32 * sw_if_index, u8 operation_mode)
 {
   vnet_sw_interface_t *sw;
   sw = vnet_get_hw_sw_interface (vnm, vui->hw_if_index);
@@ -2567,6 +2715,7 @@ vhost_user_vui_init (vnet_main_t * vnm,
   vui->feature_mask = feature_mask;
   vui->unix_file_index = ~0;
   vui->log_base_addr = 0;
+  vui->operation_mode = operation_mode;
 
   for (q = 0; q < VHOST_VRING_MAX_N; q++)
     vhost_user_vring_init (vui, q);
@@ -2588,18 +2737,113 @@ vhost_user_vui_init (vnet_main_t * vnm,
   vhost_user_tx_thread_placement (vui);
 }
 
+static uword
+vhost_user_send_interrupt_process (vlib_main_t * vm,
+                                  vlib_node_runtime_t * rt, vlib_frame_t * f)
+{
+  vhost_user_intf_t *vui;
+  f64 timeout = 3153600000.0 /* 100 years */ ;
+  uword event_type, *event_data = 0;
+  vhost_user_main_t *vum = &vhost_user_main;
+  vhost_iface_and_queue_t *vhiq;
+  vhost_cpu_t *vhc;
+  f64 now, poll_time_remaining;
+
+  while (1)
+    {
+      poll_time_remaining =
+       vlib_process_wait_for_event_or_clock (vm, timeout);
+      event_type = vlib_process_get_events (vm, &event_data);
+      vec_reset_length (event_data);
+
+      /*
+       * Use the remaining timeout if it is less than coalesce time to avoid
+       * resetting the existing timer in the middle of expiration
+       */
+      timeout = poll_time_remaining;
+      if (vlib_process_suspend_time_is_zero (timeout) ||
+         (timeout > vum->coalesce_time))
+       timeout = vum->coalesce_time;
+
+      now = vlib_time_now (vm);
+      switch (event_type)
+       {
+       case VHOST_USER_EVENT_START_TIMER:
+         if (!vlib_process_suspend_time_is_zero (poll_time_remaining))
+           break;
+         /* fall through */
+
+       case ~0:
+         vec_foreach (vhc, vum->cpus)
+         {
+           u32 thread_index = vhc - vum->cpus;
+           f64 next_timeout;
+
+           next_timeout = timeout;
+           vec_foreach (vhiq, vum->cpus[thread_index].rx_queues)
+           {
+             vui = &vum->vhost_user_interfaces[vhiq->vhost_iface_index];
+             vhost_user_vring_t *rxvq =
+               &vui->vrings[VHOST_VRING_IDX_RX (vhiq->qid)];
+             vhost_user_vring_t *txvq =
+               &vui->vrings[VHOST_VRING_IDX_TX (vhiq->qid)];
+
+             if (txvq->n_since_last_int)
+               {
+                 if (now >= txvq->int_deadline)
+                   vhost_user_send_call (vm, txvq);
+                 else
+                   next_timeout = txvq->int_deadline - now;
+               }
+
+             if (rxvq->n_since_last_int)
+               {
+                 if (now >= rxvq->int_deadline)
+                   vhost_user_send_call (vm, rxvq);
+                 else
+                   next_timeout = rxvq->int_deadline - now;
+               }
+
+             if ((next_timeout < timeout) && (next_timeout > 0.0))
+               timeout = next_timeout;
+           }
+         }
+         break;
+
+       default:
+         clib_warning ("BUG: unhandled event type %d", event_type);
+         break;
+       }
+    }
+  return 0;
+}
+
+/* *INDENT-OFF* */
+VLIB_REGISTER_NODE (vhost_user_send_interrupt_node,static) = {
+    .function = vhost_user_send_interrupt_process,
+    .type = VLIB_NODE_TYPE_PROCESS,
+    .name = "vhost-user-send-interrupt-process",
+};
+/* *INDENT-ON* */
+
 int
 vhost_user_create_if (vnet_main_t * vnm, vlib_main_t * vm,
                      const char *sock_filename,
                      u8 is_server,
                      u32 * sw_if_index,
                      u64 feature_mask,
-                     u8 renumber, u32 custom_dev_instance, u8 * hwaddr)
+                     u8 renumber, u32 custom_dev_instance, u8 * hwaddr,
+                     u8 operation_mode)
 {
   vhost_user_intf_t *vui = NULL;
   u32 sw_if_idx = ~0;
   int rv = 0;
   int server_sock_fd = -1;
+  vhost_user_main_t *vum = &vhost_user_main;
+
+  if ((operation_mode != VHOST_USER_POLLING_MODE) &&
+      (operation_mode != VHOST_USER_INTERRUPT_MODE))
+    return VNET_API_ERROR_UNIMPLEMENTED;
 
   if (sock_filename == NULL || !(strlen (sock_filename) > 0))
     {
@@ -2619,7 +2863,7 @@ vhost_user_create_if (vnet_main_t * vnm, vlib_main_t * vm,
 
   vhost_user_create_ethernet (vnm, vm, vui, hwaddr);
   vhost_user_vui_init (vnm, vui, server_sock_fd, sock_filename,
-                      feature_mask, &sw_if_idx);
+                      feature_mask, &sw_if_idx, operation_mode);
 
   if (renumber)
     vnet_interface_name_renumber (sw_if_idx, custom_dev_instance);
@@ -2629,6 +2873,15 @@ vhost_user_create_if (vnet_main_t * vnm, vlib_main_t * vm,
 
   // Process node must connect
   vlib_process_signal_event (vm, vhost_user_process_node.index, 0, 0);
+
+  if ((operation_mode == VHOST_USER_INTERRUPT_MODE) &&
+      !vum->interrupt_mode && (vum->coalesce_time > 0.0) &&
+      (vum->coalesce_frames > 0))
+    {
+      vum->interrupt_mode = 1;
+      vlib_process_signal_event (vm, vhost_user_send_interrupt_node.index,
+                                VHOST_USER_EVENT_START_TIMER, 0);
+    }
   return rv;
 }
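
The signal at the end of vhost_user_create_if() above wakes vhost-user-send-interrupt-process the first time an interrupt-mode interface is created with coalescing configured. That process sleeps for at most coalesce_time, sends a call for every vring whose interrupt deadline has passed, and otherwise shortens the next timeout to the earliest outstanding deadline. The timeout selection, sketched with plain C types (names are illustrative, not the VPP structures):

#include <stdint.h>

/* Sketch only: one pass of the coalescing loop in
 * vhost_user_send_interrupt_process(); send_call stands in for
 * vhost_user_send_call(). */
static double
service_ring_deadlines (double now, double coalesce_time,
                        const double *deadline, const uint32_t *n_pending,
                        int n_rings, void (*send_call) (int ring))
{
  double timeout = coalesce_time;
  int i;

  for (i = 0; i < n_rings; i++)
    {
      if (!n_pending[i])
        continue;
      if (now >= deadline[i])
        send_call (i);          /* deadline passed: signal the guest now */
      else if (deadline[i] - now < timeout)
        timeout = deadline[i] - now;    /* wake up earlier next time */
    }
  return timeout;
}
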
 
@@ -2637,7 +2890,8 @@ vhost_user_modify_if (vnet_main_t * vnm, vlib_main_t * vm,
                      const char *sock_filename,
                      u8 is_server,
                      u32 sw_if_index,
-                     u64 feature_mask, u8 renumber, u32 custom_dev_instance)
+                     u64 feature_mask, u8 renumber, u32 custom_dev_instance,
+                     u8 operation_mode)
 {
   vhost_user_main_t *vum = &vhost_user_main;
   vhost_user_intf_t *vui = NULL;
@@ -2646,6 +2900,9 @@ vhost_user_modify_if (vnet_main_t * vnm, vlib_main_t * vm,
   int rv = 0;
   vnet_hw_interface_t *hwif;
 
+  if ((operation_mode != VHOST_USER_POLLING_MODE) &&
+      (operation_mode != VHOST_USER_INTERRUPT_MODE))
+    return VNET_API_ERROR_UNIMPLEMENTED;
   if (!(hwif = vnet_get_sup_hw_interface (vnm, sw_if_index)) ||
       hwif->dev_class_index != vhost_user_dev_class.index)
     return VNET_API_ERROR_INVALID_SW_IF_INDEX;
@@ -2660,16 +2917,42 @@ vhost_user_modify_if (vnet_main_t * vnm, vlib_main_t * vm,
 
   vhost_user_term_if (vui);
   vhost_user_vui_init (vnm, vui, server_sock_fd,
-                      sock_filename, feature_mask, &sw_if_idx);
+                      sock_filename, feature_mask, &sw_if_idx,
+                      operation_mode);
 
   if (renumber)
     vnet_interface_name_renumber (sw_if_idx, custom_dev_instance);
 
   // Process node must connect
   vlib_process_signal_event (vm, vhost_user_process_node.index, 0, 0);
+
+  if ((operation_mode == VHOST_USER_INTERRUPT_MODE) &&
+      !vum->interrupt_mode && (vum->coalesce_time > 0.0) &&
+      (vum->coalesce_frames > 0))
+    {
+      vum->interrupt_mode = 1;
+      vlib_process_signal_event (vm, vhost_user_send_interrupt_node.index,
+                                VHOST_USER_EVENT_START_TIMER, 0);
+    }
   return rv;
 }
 
+static uword
+unformat_vhost_user_operation_mode (unformat_input_t * input, va_list * args)
+{
+  u8 *operation_mode = va_arg (*args, u8 *);
+  uword rc = 1;
+
+  if (unformat (input, "interrupt"))
+    *operation_mode = VHOST_USER_INTERRUPT_MODE;
+  else if (unformat (input, "polling"))
+    *operation_mode = VHOST_USER_POLLING_MODE;
+  else
+    rc = 0;
+
+  return rc;
+}
+
 clib_error_t *
 vhost_user_connect_command_fn (vlib_main_t * vm,
                               unformat_input_t * input,
@@ -2685,6 +2968,7 @@ vhost_user_connect_command_fn (vlib_main_t * vm,
   u8 hwaddr[6];
   u8 *hw = NULL;
   clib_error_t *error = NULL;
+  u8 operation_mode = VHOST_USER_POLLING_MODE;
 
   /* Get a line of input. */
   if (!unformat_user (input, unformat_line_input, line_input))
@@ -2706,6 +2990,9 @@ vhost_user_connect_command_fn (vlib_main_t * vm,
        {
          renumber = 1;
        }
+      else if (unformat (line_input, "mode %U",
+                        unformat_vhost_user_operation_mode, &operation_mode))
+       ;
       else
        {
          error = clib_error_return (0, "unknown input `%U'",
@@ -2719,7 +3006,8 @@ vhost_user_connect_command_fn (vlib_main_t * vm,
   int rv;
   if ((rv = vhost_user_create_if (vnm, vm, (char *) sock_filename,
                                  is_server, &sw_if_index, feature_mask,
-                                 renumber, custom_dev_instance, hw)))
+                                 renumber, custom_dev_instance, hw,
+                                 operation_mode)))
     {
       error = clib_error_return (0, "vhost_user_create_if returned %d", rv);
       goto done;
@@ -2809,6 +3097,7 @@ vhost_user_dump_ifs (vnet_main_t * vnm, vlib_main_t * vm,
       vui = pool_elt_at_index (vum->vhost_user_interfaces, hi->dev_instance);
 
       vec_add2 (r_vuids, vuid, 1);
+      vuid->operation_mode = vui->operation_mode;
       vuid->sw_if_index = vui->sw_if_index;
       vuid->virtio_net_hdr_sz = vui->virtio_net_hdr_sz;
       vuid->features = vui->features;
@@ -2833,6 +3122,25 @@ vhost_user_dump_ifs (vnet_main_t * vnm, vlib_main_t * vm,
   return rv;
 }
 
+static u8 *
+format_vhost_user_operation_mode (u8 * s, va_list * va)
+{
+  int operation_mode = va_arg (*va, int);
+
+  switch (operation_mode)
+    {
+    case VHOST_USER_POLLING_MODE:
+      s = format (s, "%s", "polling");
+      break;
+    case VHOST_USER_INTERRUPT_MODE:
+      s = format (s, "%s", "interrupt");
+      break;
+    default:
+      s = format (s, "%s", "invalid");
+    }
+  return s;
+}
+
 clib_error_t *
 show_vhost_user_command_fn (vlib_main_t * vm,
                            unformat_input_t * input,
@@ -2941,14 +3249,22 @@ show_vhost_user_command_fn (vlib_main_t * vm,
                       (vui->unix_server_index != ~0) ? "server" : "client",
                       strerror (vui->sock_errno));
 
+      vlib_cli_output (vm, " configured mode: %U\n",
+                      format_vhost_user_operation_mode, vui->operation_mode);
       vlib_cli_output (vm, " rx placement: ");
       vec_foreach (vhc, vum->cpus)
       {
        vec_foreach (vhiq, vhc->rx_queues)
        {
          if (vhiq->vhost_iface_index == vui - vum->vhost_user_interfaces)
-           vlib_cli_output (vm, "   thread %d on vring %d\n",
-                            vhc - vum->cpus, VHOST_VRING_IDX_TX (vhiq->qid));
+           {
+             vlib_cli_output (vm, "   thread %d on vring %d\n",
+                              vhc - vum->cpus,
+                              VHOST_VRING_IDX_TX (vhiq->qid));
+             vlib_cli_output (vm, "   mode: %U\n",
+                              format_vhost_user_operation_mode,
+                              vhc->operation_mode);
+           }
        }
       }
 
@@ -3078,6 +3394,9 @@ done:
  * in the name to be specified. If instance already exists, name will be used
  * anyway and multiple instances will have the same name. Use with caution.
  *
+ * - <b>mode [interrupt | polling]</b> - Optional parameter specifying
+ * the input thread polling policy.
+ *
  * @cliexpar
  * Example of how to create a vhost interface with VPP as the client and all features enabled:
  * @cliexstart{create vhost-user socket /tmp/vhost1.sock}
@@ -3094,14 +3413,16 @@ done:
 /* *INDENT-OFF* */
 VLIB_CLI_COMMAND (vhost_user_connect_command, static) = {
     .path = "create vhost-user",
-    .short_help = "create vhost-user socket <socket-filename> [server] [feature-mask <hex>] [hwaddr <mac-addr>] [renumber <dev_instance>]",
+    .short_help = "create vhost-user socket <socket-filename> [server] "
+    "[feature-mask <hex>] [hwaddr <mac-addr>] [renumber <dev_instance>] "
+    "[mode {interrupt | polling}]",
     .function = vhost_user_connect_command_fn,
 };
 /* *INDENT-ON* */
 
 /*?
  * Delete a vHost User interface using the interface name or the
- * software interface index. Use the '<em>show interfaces</em>'
+ * software interface index. Use the '<em>show interface</em>'
  * command to determine the software interface index. On deletion,
  * the linux socket will not be deleted.
  *