vhost: Improve data path 26/4026/8
author Pierre Pfister <ppfister@cisco.com>
Tue, 29 Nov 2016 15:36:14 +0000 (15:36 +0000)
committer Damjan Marion <dmarion.lists@gmail.com>
Tue, 6 Dec 2016 16:22:16 +0000 (16:22 +0000)
This patch significantly improves vhost Tx and Rx
performance (a 20% Mpps improvement in a PVP scenario).

The most significant change is the split between virtio
ring parsing and the memory copies (sketched below).
Other minor changes mostly consist of removing
unnecessary variables.

Change-Id: Ia94e12ab9a6c114e3f7fef376a9343823d051209
Signed-off-by: Pierre Pfister <ppfister@cisco.com>
vnet/vnet/devices/virtio/vhost-user.c
vnet/vnet/devices/virtio/vhost-user.h
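
To make the "split ring parsing from memory copies" idea concrete, here is a
minimal, self-contained sketch of the deferred-copy pattern in plain C. The
names below (copy_order_t, do_copies, COPY_THRESHOLD) are simplified,
hypothetical stand-ins for the patch's vhost_copy_t, the per-CPU copy[] array,
vhost_user_input_copy()/vhost_user_tx_copy() and
VHOST_USER_RX_COPY_THRESHOLD; guest-memory mapping, prefetching and dirty-page
logging are omitted.

#include <stdint.h>
#include <stdio.h>
#include <string.h>

/* One queued copy order: everything the later copy loop needs. */
typedef struct
{
  void *dst;		/* destination address, already mapped */
  const void *src;	/* source address, already mapped */
  uint32_t len;		/* number of bytes to copy */
} copy_order_t;

/* The patch uses 64 (VHOST_USER_RX_COPY_THRESHOLD); 4 here so the demo
 * actually flushes in the middle of the loop. */
#define COPY_THRESHOLD 4

/* Execute all queued copy orders in one tight loop. In the real driver this
 * is also where guest addresses are translated and upcoming sources are
 * prefetched. */
static void
do_copies (const copy_order_t * cpy, uint16_t n)
{
  uint16_t i;
  for (i = 0; i < n; i++)
    memcpy (cpy[i].dst, cpy[i].src, cpy[i].len);
}

int
main (void)
{
  char guest_bufs[8][32] = { {0} };	/* stand-in for guest descriptor buffers */
  char vlib_bufs[8][32];		/* stand-in for vlib buffer data */
  copy_order_t pending[COPY_THRESHOLD];
  uint16_t copy_len = 0;
  int i;

  for (i = 0; i < 8; i++)
    {
      snprintf (guest_bufs[i], sizeof (guest_bufs[i]), "packet-%d", i);

      /* "Ring parsing" only records what has to be copied... */
      pending[copy_len].dst = vlib_bufs[i];
      pending[copy_len].src = guest_bufs[i];
      pending[copy_len].len = sizeof (guest_bufs[i]);
      copy_len++;

      /* ...and the copies are executed in batches once enough are queued. */
      if (copy_len >= COPY_THRESHOLD)
        {
          do_copies (pending, copy_len);
          copy_len = 0;
        }
    }

  do_copies (pending, copy_len);	/* flush the remainder after parsing */
  printf ("%s\n", vlib_bufs[5]);	/* prints "packet-5" */
  return 0;
}

Keeping the parsing loop free of data-dependent memcpy calls lets the copy
loop prefetch its sources, which is presumably where most of the PVP gain
mentioned above comes from.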

index 1d7180c..6741e93 100644
 #define DBG_VQ(args...)
 #endif
 
+/*
+ * When an RX queue is down but still active, received packets
+ * must be discarded. This value controls the maximum number of
+ * packets discarded in each round.
+ */
+#define VHOST_USER_DOWN_DISCARD_COUNT 256
+
+/*
+ * When the number of available buffers falls below this threshold,
+ * the RX node starts discarding packets.
+ */
+#define VHOST_USER_RX_BUFFER_STARVATION 32
+
+/*
+ * On the receive side, the host should free descriptors as soon
+ * as possible in order to avoid TX drops in the VM.
+ * This value controls the number of copy operations that are queued
+ * before all the copies are performed and the descriptors are given
+ * back to the guest.
+ * The value 64 was obtained by testing (48 and 128 were not as good).
+ */
+#define VHOST_USER_RX_COPY_THRESHOLD 64
+
 #define UNIX_GET_FD(unixfd_idx) \
     (unixfd_idx != ~0) ? \
        pool_elt_at_index (unix_main.file_pool, \
@@ -86,7 +109,8 @@ vlib_node_registration_t vhost_user_input_node;
 
 #define foreach_vhost_user_tx_func_error      \
   _(NONE, "no error")  \
-  _(NOT_READY, "vhost user state error")  \
+  _(NOT_READY, "vhost vring not ready")  \
+  _(DOWN, "vhost interface is down")  \
   _(PKT_DROP_NOBUF, "tx packet drops (no available descriptors)")  \
   _(PKT_DROP_NOMRG, "tx packet drops (cannot merge descriptors)")  \
   _(MMAP_FAIL, "mmap failure") \
@@ -587,14 +611,19 @@ vhost_user_if_disconnect (vhost_user_intf_t * vui)
 }
 
 #define VHOST_LOG_PAGE 0x1000
-always_inline void
-vhost_user_log_dirty_pages (vhost_user_intf_t * vui, u64 addr, u64 len)
+static_always_inline void
+vhost_user_log_dirty_pages_2 (vhost_user_intf_t * vui,
+                             u64 addr, u64 len, u8 is_host_address)
 {
   if (PREDICT_TRUE (vui->log_base_addr == 0
                    || !(vui->features & (1 << FEAT_VHOST_F_LOG_ALL))))
     {
       return;
     }
+  if (is_host_address)
+    {
+      addr = (u64) map_user_mem (vui, (uword) addr);
+    }
   if (PREDICT_FALSE ((addr + len - 1) / VHOST_LOG_PAGE / 8 >= vui->log_size))
     {
       DBG_SOCK ("vhost_user_log_dirty_pages(): out of range\n");
@@ -610,6 +639,12 @@ vhost_user_log_dirty_pages (vhost_user_intf_t * vui, u64 addr, u64 len)
     }
 }
 
+static_always_inline void
+vhost_user_log_dirty_pages (vhost_user_intf_t * vui, u64 addr, u64 len)
+{
+  vhost_user_log_dirty_pages_2 (vui, addr, len, 0);
+}
+
 #define vhost_user_log_dirty_ring(vui, vq, member) \
   if (PREDICT_FALSE(vq->log_used)) { \
     vhost_user_log_dirty_pages(vui, vq->log_guest_addr + STRUCT_OFFSET_OF(vring_used_t, member), \
@@ -1185,10 +1220,16 @@ vhost_user_init (vlib_main_t * vm)
   vum->coalesce_frames = 32;
   vum->coalesce_time = 1e-3;
 
-  vec_validate_aligned (vum->rx_buffers, tm->n_vlib_mains - 1,
-                       CLIB_CACHE_LINE_BYTES);
   vec_validate (vum->cpus, tm->n_vlib_mains - 1);
 
+  vhost_cpu_t *cpu;
+  vec_foreach (cpu, vum->cpus)
+  {
+    /* This is actually not necessary as vec_validate already zeroes it.
+     * The loop is kept as a placeholder for future per-cpu initialization. */
+    cpu->rx_buffers_len = 0;
+  }
+
   /* find out which cpus will be used for input */
   vum->input_cpu_first_index = 0;
   vum->input_cpu_count = 1;
@@ -1217,15 +1258,6 @@ vhost_user_exit (vlib_main_t * vm)
 
 VLIB_MAIN_LOOP_EXIT_FUNCTION (vhost_user_exit);
 
-typedef struct
-{
-  u16 qid; /** The interface queue index (Not the virtio vring idx) */
-  u16 device_index; /** The device index */
-  u32 virtio_ring_flags; /** Runtime queue flags  **/
-  u16 first_desc_len; /** Length of the first data descriptor **/
-  virtio_net_hdr_mrg_rxbuf_t hdr; /** Virtio header **/
-} vhost_trace_t;
-
 static u8 *
 format_vhost_trace (u8 * s, va_list * va)
 {
@@ -1285,7 +1317,7 @@ vhost_user_rx_trace (vhost_trace_t * t,
   if (txvq->desc[desc_current].flags & VIRTQ_DESC_F_INDIRECT)
     {
       t->virtio_ring_flags |= 1 << VIRTIO_TRACE_F_INDIRECT;
-      //Header is the first here
+      /* Header is the first here */
       hdr_desc = map_guest_mem (vui, txvq->desc[desc_current].addr, &hint);
     }
   if (txvq->desc[desc_current].flags & VIRTQ_DESC_F_NEXT)
@@ -1324,6 +1356,110 @@ vhost_user_send_call (vlib_main_t * vm, vhost_user_vring_t * vq)
   vq->int_deadline = vlib_time_now (vm) + vum->coalesce_time;
 }
 
+static_always_inline u32
+vhost_user_input_copy (vhost_user_intf_t * vui, vhost_copy_t * cpy,
+                      u16 copy_len, u32 * map_hint)
+{
+  void *src0, *src1, *src2, *src3;
+  if (PREDICT_TRUE (copy_len >= 4))
+    {
+      if (PREDICT_FALSE (!(src2 = map_guest_mem (vui, cpy[0].src, map_hint))))
+       return 1;
+      if (PREDICT_FALSE (!(src3 = map_guest_mem (vui, cpy[1].src, map_hint))))
+       return 1;
+
+      while (PREDICT_TRUE (copy_len >= 4))
+       {
+         src0 = src2;
+         src1 = src3;
+
+         if (PREDICT_FALSE
+             (!(src2 = map_guest_mem (vui, cpy[2].src, map_hint))))
+           return 1;
+         if (PREDICT_FALSE
+             (!(src3 = map_guest_mem (vui, cpy[3].src, map_hint))))
+           return 1;
+
+         CLIB_PREFETCH (src2, 64, LOAD);
+         CLIB_PREFETCH (src3, 64, LOAD);
+
+         clib_memcpy ((void *) cpy[0].dst, src0, cpy[0].len);
+         clib_memcpy ((void *) cpy[1].dst, src1, cpy[1].len);
+         copy_len -= 2;
+         cpy += 2;
+       }
+    }
+  while (copy_len)
+    {
+      if (PREDICT_FALSE (!(src0 = map_guest_mem (vui, cpy->src, map_hint))))
+       return 1;
+      clib_memcpy ((void *) cpy->dst, src0, cpy->len);
+      copy_len -= 1;
+      cpy += 1;
+    }
+  return 0;
+}
+
+/**
+ * Try to discard packets from the tx ring (VPP RX path).
+ * Returns the number of discarded packets.
+ */
+u32
+vhost_user_rx_discard_packet (vlib_main_t * vm,
+                             vhost_user_intf_t * vui,
+                             vhost_user_vring_t * txvq, u32 discard_max)
+{
+  /*
+   * On the RX side, each packet corresponds to one descriptor
+   * (this holds whether the descriptor is simple, chained, or indirect).
+   * Therefore, discarding a packet is like discarding a descriptor.
+   */
+  u32 discarded_packets = 0;
+  u32 avail_idx = txvq->avail->idx;
+  u16 qsz_mask = txvq->qsz - 1;
+  while (discarded_packets != discard_max)
+    {
+      if (avail_idx == txvq->last_avail_idx)
+       goto out;
+
+      u16 desc_chain_head =
+       txvq->avail->ring[txvq->last_avail_idx & qsz_mask];
+      txvq->last_avail_idx++;
+      txvq->used->ring[txvq->last_used_idx & qsz_mask].id = desc_chain_head;
+      txvq->used->ring[txvq->last_used_idx & qsz_mask].len = 0;
+      vhost_user_log_dirty_ring (vui, txvq,
+                                ring[txvq->last_used_idx & qsz_mask]);
+      txvq->last_used_idx++;
+      discarded_packets++;
+    }
+
+out:
+  CLIB_MEMORY_BARRIER ();
+  txvq->used->idx = txvq->last_used_idx;
+  vhost_user_log_dirty_ring (vui, txvq, idx);
+  return discarded_packets;
+}
+
+/*
+ * In case of overflow, we need to rewind the array of allocated buffers.
+ */
+static void
+vhost_user_input_rewind_buffers (vlib_main_t * vm,
+                                vhost_cpu_t * cpu, vlib_buffer_t * b_head)
+{
+  u32 bi_current = cpu->rx_buffers[cpu->rx_buffers_len];
+  vlib_buffer_t *b_current = vlib_get_buffer (vm, bi_current);
+  b_current->current_length = 0;
+  b_current->flags = 0;
+  while (b_current != b_head)
+    {
+      cpu->rx_buffers_len++;
+      bi_current = cpu->rx_buffers[cpu->rx_buffers_len];
+      b_current = vlib_get_buffer (vm, bi_current);
+      b_current->current_length = 0;
+      b_current->flags = 0;
+    }
+}
 
 static u32
 vhost_user_if_input (vlib_main_t * vm,
@@ -1332,26 +1468,28 @@ vhost_user_if_input (vlib_main_t * vm,
                     u16 qid, vlib_node_runtime_t * node)
 {
   vhost_user_vring_t *txvq = &vui->vrings[VHOST_VRING_IDX_TX (qid)];
-  vhost_user_vring_t *rxvq = &vui->vrings[VHOST_VRING_IDX_RX (qid)];
-  uword n_rx_packets = 0, n_rx_bytes = 0;
-  uword n_left;
+  u16 n_rx_packets = 0;
+  u32 n_rx_bytes = 0;
+  u16 n_left;
   u32 n_left_to_next, *to_next;
-  u32 next_index = 0;
-  u32 next0;
-  uword n_trace = vlib_get_trace_count (vm, node);
+  u32 next_index = VNET_DEVICE_INPUT_NEXT_ETHERNET_INPUT;
+  u32 n_trace = vlib_get_trace_count (vm, node);
   u16 qsz_mask;
-  u32 cpu_index, rx_len, drops, flush;
-  f64 now = vlib_time_now (vm);
-  u32 map_guest_hint_desc = 0;
-  u32 map_guest_hint_indirect = 0;
-  u32 *map_guest_hint_p = &map_guest_hint_desc;
+  u32 map_hint = 0;
+  u16 cpu_index = os_get_cpu_number ();
+  u16 copy_len = 0;
 
-  /* do we have pending interrupts ? */
-  if ((txvq->n_since_last_int) && (txvq->int_deadline < now))
-    vhost_user_send_call (vm, txvq);
+  {
+    /* do we have pending interrupts ? */
+    vhost_user_vring_t *rxvq = &vui->vrings[VHOST_VRING_IDX_RX (qid)];
+    f64 now = vlib_time_now (vm);
+
+    if ((txvq->n_since_last_int) && (txvq->int_deadline < now))
+      vhost_user_send_call (vm, txvq);
 
-  if ((rxvq->n_since_last_int) && (rxvq->int_deadline < now))
-    vhost_user_send_call (vm, rxvq);
+    if ((rxvq->n_since_last_int) && (rxvq->int_deadline < now))
+      vhost_user_send_call (vm, rxvq);
+  }
 
   if (PREDICT_FALSE (txvq->avail->flags & 0xFFFE))
     return 0;
@@ -1371,82 +1509,65 @@ vhost_user_if_input (vlib_main_t * vm,
        * client must not supply any new RX packets, but must process
        * and discard any TX packets."
        */
-
-      txvq->last_avail_idx = txvq->last_used_idx = txvq->avail->idx;
-      CLIB_MEMORY_BARRIER ();
-      txvq->used->idx = txvq->last_used_idx;
-      vhost_user_log_dirty_ring (vui, txvq, idx);
-      vhost_user_send_call (vm, txvq);
+      vhost_user_rx_discard_packet (vm, vui, txvq,
+                                   VHOST_USER_DOWN_DISCARD_COUNT);
       return 0;
     }
 
   if (PREDICT_FALSE (n_left == txvq->qsz))
     {
-      //Informational error logging when VPP is not receiving packets fast enough
+      /*
+       * Informational error logging when VPP is not
+       * receiving packets fast enough.
+       */
       vlib_error_count (vm, node->node_index,
                        VHOST_USER_INPUT_FUNC_ERROR_FULL_RX_QUEUE, 1);
     }
 
   qsz_mask = txvq->qsz - 1;
-  cpu_index = os_get_cpu_number ();
-  drops = 0;
-  flush = 0;
 
   if (n_left > VLIB_FRAME_SIZE)
     n_left = VLIB_FRAME_SIZE;
 
-  /* Allocate some buffers.
-   * Note that buffers that are chained for jumbo
-   * frames are allocated separately using a slower path.
-   * The idea is to be certain to have enough buffers at least
-   * to cycle through the descriptors without having to check for errors.
-   * For jumbo frames, the bottleneck is memory copy anyway.
+  /*
+   * For small packets (<2kB), we will not need more than one vlib buffer
+   * per packet. In case packets are bigger, we will just yield at some point
+   * in the loop and come back later. This is not an issue as for big
+   * packets, the processing cost really comes from the memory copy.
    */
-  if (PREDICT_FALSE (!vum->rx_buffers[cpu_index]))
+  if (PREDICT_FALSE (vum->cpus[cpu_index].rx_buffers_len < n_left + 1))
     {
-      vec_alloc (vum->rx_buffers[cpu_index], 2 * VLIB_FRAME_SIZE);
-
-      if (PREDICT_FALSE (!vum->rx_buffers[cpu_index]))
-       flush = n_left;         //Drop all input
-    }
-
-  if (PREDICT_FALSE (_vec_len (vum->rx_buffers[cpu_index]) < n_left))
-    {
-      u32 curr_len = _vec_len (vum->rx_buffers[cpu_index]);
-      _vec_len (vum->rx_buffers[cpu_index]) +=
+      u32 curr_len = vum->cpus[cpu_index].rx_buffers_len;
+      vum->cpus[cpu_index].rx_buffers_len +=
        vlib_buffer_alloc_from_free_list (vm,
-                                         vum->rx_buffers[cpu_index] +
+                                         vum->cpus[cpu_index].rx_buffers +
                                          curr_len,
-                                         2 * VLIB_FRAME_SIZE - curr_len,
+                                         VHOST_USER_RX_BUFFERS_N - curr_len,
                                          VLIB_BUFFER_DEFAULT_FREE_LIST_INDEX);
 
-      if (PREDICT_FALSE (n_left > _vec_len (vum->rx_buffers[cpu_index])))
-       flush = n_left - _vec_len (vum->rx_buffers[cpu_index]);
-    }
-
-  if (PREDICT_FALSE (flush))
-    {
-      //Remove some input buffers
-      drops += flush;
-      n_left -= flush;
-      vlib_error_count (vm, vhost_user_input_node.index,
-                       VHOST_USER_INPUT_FUNC_ERROR_NO_BUFFER, flush);
-      while (flush)
+      if (PREDICT_FALSE
+         (vum->cpus[cpu_index].rx_buffers_len <
+          VHOST_USER_RX_BUFFER_STARVATION))
        {
-         u16 desc_chain_head =
-           txvq->avail->ring[txvq->last_avail_idx & qsz_mask];
-         txvq->last_avail_idx++;
-         txvq->used->ring[txvq->last_used_idx & qsz_mask].id =
-           desc_chain_head;
-         txvq->used->ring[txvq->last_used_idx & qsz_mask].len = 0;
-         vhost_user_log_dirty_ring (vui, txvq,
-                                    ring[txvq->last_used_idx & qsz_mask]);
-         txvq->last_used_idx++;
-         flush--;
+         /* In case of buffer starvation, discard some packets from the queue
+          * and log the event.
+          * We keep doing best effort for the remaining packets. */
+         u32 flush = (n_left + 1 > vum->cpus[cpu_index].rx_buffers_len) ?
+           n_left + 1 - vum->cpus[cpu_index].rx_buffers_len : 1;
+         flush = vhost_user_rx_discard_packet (vm, vui, txvq, flush);
+
+         n_left -= flush;
+         vlib_increment_simple_counter (vnet_main.
+                                        interface_main.sw_if_counters +
+                                        VNET_INTERFACE_COUNTER_DROP,
+                                        os_get_cpu_number (),
+                                        vui->sw_if_index, flush);
+
+         vlib_error_count (vm, vhost_user_input_node.index,
+                           VHOST_USER_INPUT_FUNC_ERROR_NO_BUFFER, flush);
        }
     }
 
-  rx_len = vec_len (vum->rx_buffers[cpu_index]);       //vector might be null
   while (n_left > 0)
     {
       vlib_get_next_frame (vm, node, next_index, to_next, n_left_to_next);
@@ -1454,32 +1575,48 @@ vhost_user_if_input (vlib_main_t * vm,
       while (n_left > 0 && n_left_to_next > 0)
        {
          vlib_buffer_t *b_head, *b_current;
-         u32 bi_head, bi_current;
-         u16 desc_chain_head, desc_current;
-         u8 error = VHOST_USER_INPUT_FUNC_ERROR_NO_ERROR;
+         u32 bi_current;
+         u16 desc_current;
+         u32 desc_data_offset;
+         vring_desc_t *desc_table = txvq->desc;
 
-         if (PREDICT_TRUE (n_left > 1))
+         if (PREDICT_FALSE (vum->cpus[cpu_index].rx_buffers_len <= 1))
            {
-             u32 next_desc =
-               txvq->avail->ring[(txvq->last_avail_idx + 1) & qsz_mask];
-             void *buffer_addr =
-               map_guest_mem (vui, txvq->desc[next_desc].addr,
-                              &map_guest_hint_desc);
-             if (PREDICT_TRUE (buffer_addr != 0))
-               CLIB_PREFETCH (buffer_addr, 64, STORE);
-
-             u32 bi = vum->rx_buffers[cpu_index][rx_len - 2];
-             vlib_prefetch_buffer_with_index (vm, bi, STORE);
-             CLIB_PREFETCH (vlib_get_buffer (vm, bi)->data, 128, STORE);
+             /* Not enough rx_buffers.
+              * Note: We yield on 1 so we don't need to do an additional
+              * check for the next buffer prefetch.
+              */
+             n_left = 0;
+             break;
            }
 
-         desc_chain_head = desc_current =
-           txvq->avail->ring[txvq->last_avail_idx & qsz_mask];
-         bi_head = bi_current = vum->rx_buffers[cpu_index][--rx_len];
-         b_head = b_current = vlib_get_buffer (vm, bi_head);
-         vlib_buffer_chain_init (b_head);
+         desc_current = txvq->avail->ring[txvq->last_avail_idx & qsz_mask];
+         vum->cpus[cpu_index].rx_buffers_len--;
+         bi_current = (vum->cpus[cpu_index].rx_buffers)
+           [vum->cpus[cpu_index].rx_buffers_len];
+         b_head = b_current = vlib_get_buffer (vm, bi_current);
+         to_next[0] = bi_current;      //We do that now so we can forget about bi_current
+         to_next++;
+         n_left_to_next--;
+
+         vlib_prefetch_buffer_with_index (vm,
+                                          (vum->cpus[cpu_index].rx_buffers)
+                                          [vum->cpus[cpu_index].
+                                           rx_buffers_len - 1], LOAD);
+
+         /* Just preset the used descriptor id and length for later */
+         txvq->used->ring[txvq->last_used_idx & qsz_mask].id = desc_current;
+         txvq->used->ring[txvq->last_used_idx & qsz_mask].len = 0;
+         vhost_user_log_dirty_ring (vui, txvq,
+                                    ring[txvq->last_used_idx & qsz_mask]);
+
+         /* The buffer should already be initialized */
+         b_head->total_length_not_including_first_buffer = 0;
+         b_head->flags |= VLIB_BUFFER_TOTAL_LENGTH_VALID;
+
          if (PREDICT_FALSE (n_trace))
            {
+             //TODO: next_index is not exactly known at that point
              vlib_trace_buffer (vm, node, next_index, b_head,
                                 /* follow_chain */ 0);
              vhost_trace_t *t0 =
@@ -1489,148 +1626,173 @@ vhost_user_if_input (vlib_main_t * vm,
              vlib_set_trace_count (vm, node, n_trace);
            }
 
-         uword offset;
-         if (PREDICT_TRUE (vui->is_any_layout) ||
-             (!(txvq->desc[desc_current].flags & VIRTQ_DESC_F_NEXT) &&
-              !(txvq->desc[desc_current].flags & VIRTQ_DESC_F_INDIRECT)))
-           {
-             /* ANYLAYOUT or single buffer */
-             offset = vui->virtio_net_hdr_sz;
-           }
-         else
-           {
-             /* CSR case without ANYLAYOUT, skip 1st buffer */
-             offset = txvq->desc[desc_current].len;
-           }
-
-         vring_desc_t *desc_table = txvq->desc;
-         u32 desc_index = desc_current;
-         map_guest_hint_p = &map_guest_hint_desc;
-
+         /* This depends on the setup but is very consistent,
+          * so the CPU branch predictor should do a pretty good job
+          * at optimizing the decision. */
          if (txvq->desc[desc_current].flags & VIRTQ_DESC_F_INDIRECT)
            {
              desc_table = map_guest_mem (vui, txvq->desc[desc_current].addr,
-                                         &map_guest_hint_desc);
-             desc_index = 0;
-             map_guest_hint_p = &map_guest_hint_indirect;
+                                         &map_hint);
+             desc_current = 0;
              if (PREDICT_FALSE (desc_table == 0))
                {
-                 error = VHOST_USER_INPUT_FUNC_ERROR_MMAP_FAIL;
+                 //FIXME: Handle the error by shutting down the queue
                  goto out;
                }
            }
 
-         while (1)
+         if (PREDICT_TRUE (vui->is_any_layout) ||
+             (!(desc_table[desc_current].flags & VIRTQ_DESC_F_NEXT)))
            {
-             void *buffer_addr =
-               map_guest_mem (vui, desc_table[desc_index].addr,
-                              map_guest_hint_p);
-             if (PREDICT_FALSE (buffer_addr == 0))
-               {
-                 error = VHOST_USER_INPUT_FUNC_ERROR_MMAP_FAIL;
-                 goto out;
-               }
+             /* ANYLAYOUT or single buffer */
+             desc_data_offset = vui->virtio_net_hdr_sz;
+           }
+         else
+           {
+             /* CSR case without ANYLAYOUT, skip 1st buffer */
+             desc_data_offset = desc_table[desc_current].len;
+           }
 
-             if (PREDICT_TRUE
-                 (desc_table[desc_index].flags & VIRTQ_DESC_F_NEXT))
+         while (1)
+           {
+             /* Get more input if necessary. Or end of packet. */
+             if (desc_data_offset == desc_table[desc_current].len)
                {
-                 CLIB_PREFETCH (&desc_table[desc_table[desc_index].next],
-                                sizeof (vring_desc_t), STORE);
+                 if (PREDICT_FALSE (desc_table[desc_current].flags &
+                                    VIRTQ_DESC_F_NEXT))
+                   {
+                     desc_current = desc_table[desc_current].next;
+                     desc_data_offset = 0;
+                   }
+                 else
+                   {
+                     goto out;
+                   }
                }
 
-             if (desc_table[desc_index].len > offset)
+             /* Get more output if necessary. Or end of packet. */
+             if (PREDICT_FALSE
+                 (b_current->current_length == VLIB_BUFFER_DATA_SIZE))
                {
-                 u16 len = desc_table[desc_index].len - offset;
-                 u16 copied = vlib_buffer_chain_append_data_with_alloc (vm,
-                                                                        VLIB_BUFFER_DEFAULT_FREE_LIST_INDEX,
-                                                                        b_head,
-                                                                        &b_current,
-                                                                        buffer_addr
-                                                                        +
-                                                                        offset,
-                                                                        len);
-                 if (copied != len)
+                 if (PREDICT_FALSE
+                     (vum->cpus[cpu_index].rx_buffers_len == 0))
                    {
-                     error = VHOST_USER_INPUT_FUNC_ERROR_NO_BUFFER;
-                     break;
+                     /*
+                      * Check whether any buffers are left.
+                      * If not, just rewind the used buffers and stop.
+                      * Note: Scheduled copies are not cancelled. This is
+                      * not an issue as they would still be valid. Useless,
+                      * but valid.
+                      */
+                     vhost_user_input_rewind_buffers (vm,
+                                                      &vum->cpus[cpu_index],
+                                                      b_head);
+                     n_left = 0;
+                     goto stop;
                    }
+
+                 /* Get next output */
+                 vum->cpus[cpu_index].rx_buffers_len--;
+                 u32 bi_next =
+                   (vum->cpus[cpu_index].rx_buffers)[vum->cpus
+                                                     [cpu_index].rx_buffers_len];
+                 b_current->next_buffer = bi_next;
+                 b_current->flags |= VLIB_BUFFER_NEXT_PRESENT;
+                 bi_current = bi_next;
+                 b_current = vlib_get_buffer (vm, bi_current);
                }
-             offset = 0;
 
-             /* if next flag is set, take next desc in the chain */
-             if ((desc_table[desc_index].flags & VIRTQ_DESC_F_NEXT))
-               desc_index = desc_table[desc_index].next;
-             else
-               goto out;
+             /* Prepare a copy order executed later for the data */
+             vhost_copy_t *cpy = &vum->cpus[cpu_index].copy[copy_len];
+             copy_len++;
+             u32 desc_data_l =
+               desc_table[desc_current].len - desc_data_offset;
+             cpy->len = VLIB_BUFFER_DATA_SIZE - b_current->current_length;
+             cpy->len = (cpy->len > desc_data_l) ? desc_data_l : cpy->len;
+             cpy->dst = (uword) vlib_buffer_get_current (b_current);
+             cpy->src = desc_table[desc_current].addr + desc_data_offset;
+
+             desc_data_offset += cpy->len;
+
+             b_current->current_length += cpy->len;
+             b_head->total_length_not_including_first_buffer += cpy->len;
            }
+
        out:
+         CLIB_PREFETCH (&n_left, sizeof (n_left), LOAD);
+
+         n_rx_bytes += b_head->total_length_not_including_first_buffer;
+         n_rx_packets++;
+
+         b_head->total_length_not_including_first_buffer -=
+           b_head->current_length;
 
          /* consume the descriptor and return it as used */
          txvq->last_avail_idx++;
-         txvq->used->ring[txvq->last_used_idx & qsz_mask].id =
-           desc_chain_head;
-         txvq->used->ring[txvq->last_used_idx & qsz_mask].len = 0;
-         vhost_user_log_dirty_ring (vui, txvq,
-                                    ring[txvq->last_used_idx & qsz_mask]);
          txvq->last_used_idx++;
 
-         //It is important to free RX as fast as possible such that the TX
-         //process does not drop packets
-         if ((txvq->last_used_idx & 0x3f) == 0)        // Every 64 packets
-           txvq->used->idx = txvq->last_used_idx;
-
-         if (PREDICT_FALSE (b_head->current_length < 14 &&
-                            error == VHOST_USER_INPUT_FUNC_ERROR_NO_ERROR))
-           error = VHOST_USER_INPUT_FUNC_ERROR_UNDERSIZED_FRAME;
-
          VLIB_BUFFER_TRACE_TRAJECTORY_INIT (b_head);
 
          vnet_buffer (b_head)->sw_if_index[VLIB_RX] = vui->sw_if_index;
          vnet_buffer (b_head)->sw_if_index[VLIB_TX] = (u32) ~ 0;
-         b_head->error = node->errors[error];
+         b_head->error = 0;
 
-         if (PREDICT_FALSE (error))
-           {
-             drops++;
-             next0 = VNET_DEVICE_INPUT_NEXT_DROP;
-           }
-         else
-           {
-             n_rx_bytes +=
-               b_head->current_length +
-               b_head->total_length_not_including_first_buffer;
-             n_rx_packets++;
-             next0 = VNET_DEVICE_INPUT_NEXT_ETHERNET_INPUT;
-           }
+         {
+           u32 next0 = VNET_DEVICE_INPUT_NEXT_ETHERNET_INPUT;
 
-         to_next[0] = bi_head;
-         to_next++;
-         n_left_to_next--;
+           /* redirect if feature path enabled */
+           vnet_feature_start_device_input_x1 (vui->sw_if_index, &next0,
+                                               b_head, 0);
 
-         /* redirect if feature path enabled */
-         vnet_feature_start_device_input_x1 (vui->sw_if_index, &next0,
-                                             b_head, 0);
+           u32 bi = to_next[-1];       //Cannot use to_next[-1] in the macro
+           vlib_validate_buffer_enqueue_x1 (vm, node, next_index,
+                                            to_next, n_left_to_next,
+                                            bi, next0);
+         }
 
-         vlib_validate_buffer_enqueue_x1 (vm, node, next_index,
-                                          to_next, n_left_to_next,
-                                          bi_head, next0);
          n_left--;
-         if (PREDICT_FALSE (!n_left))
+
+         /*
+          * Although separating memory copies from virtio ring parsing
+          * is beneficial, we can still perform the copies from time
+          * to time in order to free some space in the ring.
+          */
+         if (PREDICT_FALSE (copy_len >= VHOST_USER_RX_COPY_THRESHOLD))
            {
-             // I NEED SOME MORE !
-             u32 remain = (u16) (txvq->avail->idx - txvq->last_avail_idx);
-             remain = (remain > VLIB_FRAME_SIZE - n_rx_packets) ?
-               VLIB_FRAME_SIZE - n_rx_packets : remain;
-             remain = (remain > rx_len) ? rx_len : remain;
-             n_left = remain;
+             if (PREDICT_FALSE
+                 (vhost_user_input_copy (vui, vum->cpus[cpu_index].copy,
+                                         copy_len, &map_hint)))
+               {
+                 clib_warning
+                   ("Memory mapping error on interface hw_if_index=%d "
+                    "(Shutting down - Switch interface down and up to restart)",
+                    vui->hw_if_index);
+                 vui->admin_up = 0;
+                 copy_len = 0;
+                 break;
+               }
+             copy_len = 0;
+
+             /* give buffers back to driver */
+             CLIB_MEMORY_BARRIER ();
+             txvq->used->idx = txvq->last_used_idx;
+             vhost_user_log_dirty_ring (vui, txvq, idx);
            }
        }
-
+    stop:
       vlib_put_next_frame (vm, node, next_index, n_left_to_next);
     }
 
-  if (PREDICT_TRUE (vum->rx_buffers[cpu_index] != 0))
-    _vec_len (vum->rx_buffers[cpu_index]) = rx_len;
+  /* Do the memory copies */
+  if (PREDICT_FALSE
+      (vhost_user_input_copy (vui, vum->cpus[cpu_index].copy,
+                             copy_len, &map_hint)))
+    {
+      clib_warning ("Memory mapping error on interface hw_if_index=%d "
+                   "(Shutting down - Switch interface down and up to restart)",
+                   vui->hw_if_index);
+      vui->admin_up = 0;
+    }
 
   /* give buffers back to driver */
   CLIB_MEMORY_BARRIER ();
@@ -1646,14 +1808,6 @@ vhost_user_if_input (vlib_main_t * vm,
        vhost_user_send_call (vm, txvq);
     }
 
-  if (PREDICT_FALSE (drops))
-    {
-      vlib_increment_simple_counter
-       (vnet_main.interface_main.sw_if_counters
-        + VNET_INTERFACE_COUNTER_DROP, os_get_cpu_number (),
-        vui->sw_if_index, drops);
-    }
-
   /* increase rx counters */
   vlib_increment_combined_counter
     (vnet_main.interface_main.combined_sw_if_counters
@@ -1680,8 +1834,6 @@ vhost_user_input (vlib_main_t * vm,
     n_rx_packets += vhost_user_if_input (vm, vum, vui, vhiq->qid, node);
   }
 
-  //TODO: One call might return more than 256 packets here.
-  //But this is supposed to be the vector size.
   return n_rx_packets;
 }
 
@@ -1726,7 +1878,7 @@ vhost_user_tx_trace (vhost_trace_t * t,
   if (rxvq->desc[desc_current].flags & VIRTQ_DESC_F_INDIRECT)
     {
       t->virtio_ring_flags |= 1 << VIRTIO_TRACE_F_INDIRECT;
-      //Header is the first here
+      /* Header is the first here */
       hdr_desc = map_guest_mem (vui, rxvq->desc[desc_current].addr, &hint);
     }
   if (rxvq->desc[desc_current].flags & VIRTQ_DESC_F_NEXT)
@@ -1742,290 +1894,348 @@ vhost_user_tx_trace (vhost_trace_t * t,
   t->first_desc_len = hdr_desc ? hdr_desc->len : 0;
 }
 
+static_always_inline u32
+vhost_user_tx_copy (vhost_user_intf_t * vui, vhost_copy_t * cpy,
+                   u16 copy_len, u32 * map_hint)
+{
+  void *dst0, *dst1, *dst2, *dst3;
+  if (PREDICT_TRUE (copy_len >= 4))
+    {
+      if (PREDICT_FALSE (!(dst2 = map_guest_mem (vui, cpy[0].dst, map_hint))))
+       return 1;
+      if (PREDICT_FALSE (!(dst3 = map_guest_mem (vui, cpy[1].dst, map_hint))))
+       return 1;
+      while (PREDICT_TRUE (copy_len >= 4))
+       {
+         dst0 = dst2;
+         dst1 = dst3;
+
+         if (PREDICT_FALSE
+             (!(dst2 = map_guest_mem (vui, cpy[2].dst, map_hint))))
+           return 1;
+         if (PREDICT_FALSE
+             (!(dst3 = map_guest_mem (vui, cpy[3].dst, map_hint))))
+           return 1;
+
+         CLIB_PREFETCH ((void *) cpy[2].src, 64, LOAD);
+         CLIB_PREFETCH ((void *) cpy[3].src, 64, LOAD);
+
+         clib_memcpy (dst0, (void *) cpy[0].src, cpy[0].len);
+         clib_memcpy (dst1, (void *) cpy[1].src, cpy[1].len);
+
+         vhost_user_log_dirty_pages_2 (vui, cpy[0].dst, cpy[0].len, 1);
+         vhost_user_log_dirty_pages_2 (vui, cpy[1].dst, cpy[1].len, 1);
+         copy_len -= 2;
+         cpy += 2;
+       }
+    }
+  while (copy_len)
+    {
+      if (PREDICT_FALSE (!(dst0 = map_guest_mem (vui, cpy->dst, map_hint))))
+       return 1;
+      clib_memcpy (dst0, (void *) cpy->src, cpy->len);
+      vhost_user_log_dirty_pages_2 (vui, cpy->dst, cpy->len, 1);
+      copy_len -= 1;
+      cpy += 1;
+    }
+  return 0;
+}
+
+
 static uword
 vhost_user_tx (vlib_main_t * vm,
               vlib_node_runtime_t * node, vlib_frame_t * frame)
 {
   u32 *buffers = vlib_frame_args (frame);
-  u32 n_left = 0;
+  u32 n_left = frame->n_vectors;
   vhost_user_main_t *vum = &vhost_user_main;
-  uword n_packets = 0;
   vnet_interface_output_runtime_t *rd = (void *) node->runtime_data;
   vhost_user_intf_t *vui =
     pool_elt_at_index (vum->vhost_user_interfaces, rd->dev_instance);
   u32 qid = ~0;
   vhost_user_vring_t *rxvq;
   u16 qsz_mask;
-  u8 error = VHOST_USER_TX_FUNC_ERROR_NONE;
+  u8 error;
   u32 cpu_index = os_get_cpu_number ();
-  n_left = n_packets = frame->n_vectors;
-  u32 map_guest_hint_desc = 0;
-  u32 map_guest_hint_indirect = 0;
-  u32 *map_guest_hint_p = &map_guest_hint_desc;
-  vhost_trace_t *current_trace = 0;
-  int n_retry;
+  u32 map_hint = 0;
+  u8 retry = 8;
+  u16 copy_len;
+  u16 tx_headers_len;
+
+  if (PREDICT_FALSE (!vui->admin_up))
+    {
+      error = VHOST_USER_TX_FUNC_ERROR_DOWN;
+      goto done3;
+    }
 
-  if (PREDICT_FALSE (!vui->is_up || !vui->admin_up))
+  if (PREDICT_FALSE (!vui->is_up))
     {
       error = VHOST_USER_TX_FUNC_ERROR_NOT_READY;
       goto done3;
     }
 
   qid =
-    VHOST_VRING_IDX_RX (*vec_elt_at_index (vui->per_cpu_tx_qid, cpu_index));
+    VHOST_VRING_IDX_RX (*vec_elt_at_index
+                       (vui->per_cpu_tx_qid, os_get_cpu_number ()));
   rxvq = &vui->vrings[qid];
   if (PREDICT_FALSE (vui->use_tx_spinlock))
     vhost_user_vring_lock (vui, qid);
 
-  if (PREDICT_FALSE ((rxvq->avail->idx == rxvq->last_avail_idx)))
-    {
-      error = VHOST_USER_TX_FUNC_ERROR_PKT_DROP_NOBUF;
-      goto done2;
-    }
-
   qsz_mask = rxvq->qsz - 1;    /* qsz is always power of 2 */
-  n_retry = 8;
 
-  while (n_left > 0 && n_retry--)
+retry:
+  error = VHOST_USER_TX_FUNC_ERROR_NONE;
+  tx_headers_len = 0;
+  copy_len = 0;
+  while (n_left > 0)
     {
+      vlib_buffer_t *b0, *current_b0;
+      u16 desc_head, desc_index, desc_len;
+      vring_desc_t *desc_table;
+      uword buffer_map_addr;
+      u32 buffer_len;
+      u16 bytes_left;
 
-      while (n_left > 0)
+      if (PREDICT_TRUE (n_left > 1))
+       vlib_prefetch_buffer_with_index (vm, buffers[1], LOAD);
+
+      b0 = vlib_get_buffer (vm, buffers[0]);
+
+      if (PREDICT_FALSE (b0->flags & VLIB_BUFFER_IS_TRACED))
        {
-         vlib_buffer_t *b0, *current_b0;
-         u16 desc_head, desc_index, desc_len;
-         vring_desc_t *desc_table;
-         void *buffer_addr;
-         u32 buffer_len;
+         vum->cpus[cpu_index].current_trace =
+           vlib_add_trace (vm, node, b0,
+                           sizeof (*vum->cpus[cpu_index].current_trace));
+         vhost_user_tx_trace (vum->cpus[cpu_index].current_trace,
+                              vui, qid / 2, b0, rxvq);
+       }
 
-         b0 = vlib_get_buffer (vm, buffers[0]);
+      if (PREDICT_FALSE (rxvq->last_avail_idx == rxvq->avail->idx))
+       {
+         error = VHOST_USER_TX_FUNC_ERROR_PKT_DROP_NOBUF;
+         goto done;
+       }
 
-         if (PREDICT_FALSE (b0->flags & VLIB_BUFFER_IS_TRACED))
-           {
-             current_trace = vlib_add_trace (vm, node, b0,
-                                             sizeof (*current_trace));
-             vhost_user_tx_trace (current_trace, vui, qid / 2, b0, rxvq);
-           }
+      desc_table = rxvq->desc;
+      desc_head = desc_index =
+       rxvq->avail->ring[rxvq->last_avail_idx & qsz_mask];
 
-         if (PREDICT_FALSE (rxvq->last_avail_idx == rxvq->avail->idx))
+      /* Go deeper in case of an indirect descriptor.
+       * I don't know of any driver providing indirect descriptors for RX. */
+      if (PREDICT_FALSE (rxvq->desc[desc_head].flags & VIRTQ_DESC_F_INDIRECT))
+       {
+         if (PREDICT_FALSE
+             (rxvq->desc[desc_head].len < sizeof (vring_desc_t)))
            {
-             error = VHOST_USER_TX_FUNC_ERROR_PKT_DROP_NOBUF;
+             error = VHOST_USER_TX_FUNC_ERROR_INDIRECT_OVERFLOW;
              goto done;
            }
-
-         desc_table = rxvq->desc;
-         map_guest_hint_p = &map_guest_hint_desc;
-         desc_head = desc_index =
-           rxvq->avail->ring[rxvq->last_avail_idx & qsz_mask];
-         if (rxvq->desc[desc_head].flags & VIRTQ_DESC_F_INDIRECT)
-           {
-             if (PREDICT_FALSE
-                 (rxvq->desc[desc_head].len < sizeof (vring_desc_t)))
-               {
-                 error = VHOST_USER_TX_FUNC_ERROR_INDIRECT_OVERFLOW;
-                 goto done;
-               }
-             if (PREDICT_FALSE
-                 (!(desc_table =
-                    map_guest_mem (vui, rxvq->desc[desc_index].addr,
-                                   &map_guest_hint_desc))))
-               {
-                 error = VHOST_USER_TX_FUNC_ERROR_MMAP_FAIL;
-                 goto done;
-               }
-             desc_index = 0;
-             map_guest_hint_p = &map_guest_hint_indirect;
-           }
-
-         desc_len = vui->virtio_net_hdr_sz;
-
          if (PREDICT_FALSE
-             (!(buffer_addr =
-                map_guest_mem (vui, desc_table[desc_index].addr,
-                               map_guest_hint_p))))
+             (!(desc_table =
+                map_guest_mem (vui, rxvq->desc[desc_index].addr,
+                               &map_hint))))
            {
              error = VHOST_USER_TX_FUNC_ERROR_MMAP_FAIL;
              goto done;
            }
-         buffer_len = desc_table[desc_index].len;
-
-         CLIB_PREFETCH (buffer_addr, CLIB_CACHE_LINE_BYTES, STORE);
+         desc_index = 0;
+       }
 
-         virtio_net_hdr_mrg_rxbuf_t *hdr =
-           (virtio_net_hdr_mrg_rxbuf_t *) buffer_addr;
-         hdr->hdr.flags = 0;
-         hdr->hdr.gso_type = 0;
-         if (vui->virtio_net_hdr_sz == 12)
-           hdr->num_buffers = 1;
+      desc_len = vui->virtio_net_hdr_sz;
+      buffer_map_addr = desc_table[desc_index].addr;
+      buffer_len = desc_table[desc_index].len;
 
-         vhost_user_log_dirty_pages (vui, desc_table[desc_index].addr,
-                                     vui->virtio_net_hdr_sz);
+      {
+       // Get a header from the header array
+       virtio_net_hdr_mrg_rxbuf_t *hdr =
+         &vum->cpus[cpu_index].tx_headers[tx_headers_len];
+       tx_headers_len++;
+       hdr->hdr.flags = 0;
+       hdr->hdr.gso_type = 0;
+       hdr->num_buffers = 1;   //This is local, no need to check
+
+       // Prepare a copy order executed later for the header
+       vhost_copy_t *cpy = &vum->cpus[cpu_index].copy[copy_len];
+       copy_len++;
+       cpy->len = vui->virtio_net_hdr_sz;
+       cpy->dst = buffer_map_addr;
+       cpy->src = (uword) hdr;
+      }
 
-         u16 bytes_left = b0->current_length;
-         buffer_addr += vui->virtio_net_hdr_sz;
-         buffer_len -= vui->virtio_net_hdr_sz;
-         current_b0 = b0;
-         while (1)
-           {
-             if (!bytes_left)
-               {               //Get new input
-                 if (current_b0->flags & VLIB_BUFFER_NEXT_PRESENT)
-                   {
-                     current_b0 =
-                       vlib_get_buffer (vm, current_b0->next_buffer);
-                     bytes_left = current_b0->current_length;
-                   }
-                 else
-                   {
-                     //End of packet
-                     break;
-                   }
+      buffer_map_addr += vui->virtio_net_hdr_sz;
+      buffer_len -= vui->virtio_net_hdr_sz;
+      bytes_left = b0->current_length;
+      current_b0 = b0;
+      while (1)
+       {
+         if (buffer_len == 0)
+           {                   //Get new output
+             if (desc_table[desc_index].flags & VIRTQ_DESC_F_NEXT)
+               {
+                 //Next one is chained
+                 desc_index = desc_table[desc_index].next;
+                 buffer_map_addr = desc_table[desc_index].addr;
+                 buffer_len = desc_table[desc_index].len;
                }
-
-             if (buffer_len == 0)
-               {               //Get new output
-                 if (desc_table[desc_index].flags & VIRTQ_DESC_F_NEXT)
+             else if (vui->virtio_net_hdr_sz == 12)    //MRG is available
+               {
+                 virtio_net_hdr_mrg_rxbuf_t *hdr =
+                   &vum->cpus[cpu_index].tx_headers[tx_headers_len - 1];
+
+                 //Move from available to used buffer
+                 rxvq->used->ring[rxvq->last_used_idx & qsz_mask].id =
+                   desc_head;
+                 rxvq->used->ring[rxvq->last_used_idx & qsz_mask].len =
+                   desc_len;
+                 vhost_user_log_dirty_ring (vui, rxvq,
+                                            ring[rxvq->last_used_idx &
+                                                 qsz_mask]);
+
+                 rxvq->last_avail_idx++;
+                 rxvq->last_used_idx++;
+                 hdr->num_buffers++;
+                 desc_len = 0;
+
+                 if (PREDICT_FALSE
+                     (rxvq->last_avail_idx == rxvq->avail->idx))
                    {
-                     //Next one is chained
-                     desc_index = desc_table[desc_index].next;
-                     if (PREDICT_FALSE
-                         (!(buffer_addr =
-                            map_guest_mem (vui, desc_table[desc_index].addr,
-                                           map_guest_hint_p))))
-                       {
-                         rxvq->last_used_idx -= hdr->num_buffers - 1;
-                         rxvq->last_avail_idx -= hdr->num_buffers - 1;
-                         error = VHOST_USER_TX_FUNC_ERROR_MMAP_FAIL;
-                         goto done;
-                       }
-                     buffer_len = desc_table[desc_index].len;
+                     //Dequeue queued descriptors for this packet
+                     rxvq->last_used_idx -= hdr->num_buffers - 1;
+                     rxvq->last_avail_idx -= hdr->num_buffers - 1;
+                     error = VHOST_USER_TX_FUNC_ERROR_PKT_DROP_NOBUF;
+                     goto done;
                    }
-                 else if (vui->virtio_net_hdr_sz == 12)        //MRG is available
-                   {
-                     //Move from available to used buffer
-                     rxvq->used->ring[rxvq->last_used_idx & qsz_mask].id =
-                       desc_head;
-                     rxvq->used->ring[rxvq->last_used_idx & qsz_mask].len =
-                       desc_len;
-                     vhost_user_log_dirty_ring (vui, rxvq,
-                                                ring[rxvq->last_used_idx &
-                                                     qsz_mask]);
-                     rxvq->last_avail_idx++;
-                     rxvq->last_used_idx++;
-                     hdr->num_buffers++;
 
+                 desc_table = rxvq->desc;
+                 desc_head = desc_index =
+                   rxvq->avail->ring[rxvq->last_avail_idx & qsz_mask];
+                 if (PREDICT_FALSE
+                     (rxvq->desc[desc_head].flags & VIRTQ_DESC_F_INDIRECT))
+                   {
+                     //It is seriously unlikely that a driver will put an
+                     //indirect descriptor after a non-indirect descriptor.
                      if (PREDICT_FALSE
-                         (rxvq->last_avail_idx == rxvq->avail->idx))
+                         (rxvq->desc[desc_head].len < sizeof (vring_desc_t)))
                        {
-                         //Dequeue queued descriptors for this packet
-                         rxvq->last_used_idx -= hdr->num_buffers - 1;
-                         rxvq->last_avail_idx -= hdr->num_buffers - 1;
-                         error = VHOST_USER_TX_FUNC_ERROR_PKT_DROP_NOBUF;
+                         error = VHOST_USER_TX_FUNC_ERROR_INDIRECT_OVERFLOW;
                          goto done;
                        }
-
-                     desc_table = rxvq->desc;
-                     map_guest_hint_p = &map_guest_hint_desc;
-                     desc_head = desc_index =
-                       rxvq->avail->ring[rxvq->last_avail_idx & qsz_mask];
-                     if (PREDICT_FALSE
-                         (rxvq->
-                          desc[desc_head].flags & VIRTQ_DESC_F_INDIRECT))
-                       {
-                         //It is seriously unlikely that a driver will put indirect descriptor
-                         //after non-indirect descriptor.
-                         if (PREDICT_FALSE
-                             (rxvq->desc[desc_head].len <
-                              sizeof (vring_desc_t)))
-                           {
-                             error =
-                               VHOST_USER_TX_FUNC_ERROR_INDIRECT_OVERFLOW;
-                             goto done;
-                           }
-                         if (PREDICT_FALSE
-                             (!(desc_table =
-                                map_guest_mem (vui,
-                                               rxvq->desc[desc_index].addr,
-                                               &map_guest_hint_desc))))
-                           {
-                             error = VHOST_USER_TX_FUNC_ERROR_MMAP_FAIL;
-                             goto done;
-                           }
-                         desc_index = 0;
-                         map_guest_hint_p = &map_guest_hint_indirect;
-                       }
-
                      if (PREDICT_FALSE
-                         (!(buffer_addr =
-                            map_guest_mem (vui, desc_table[desc_index].addr,
-                                           map_guest_hint_p))))
+                         (!(desc_table =
+                            map_guest_mem (vui,
+                                           rxvq->desc[desc_index].addr,
+                                           &map_hint))))
                        {
                          error = VHOST_USER_TX_FUNC_ERROR_MMAP_FAIL;
                          goto done;
                        }
-                     buffer_len = desc_table[desc_index].len;
-                     CLIB_PREFETCH (buffer_addr, CLIB_CACHE_LINE_BYTES,
-                                    STORE);
-                   }
-                 else
-                   {
-                     error = VHOST_USER_TX_FUNC_ERROR_PKT_DROP_NOMRG;
-                     goto done;
+                     desc_index = 0;
                    }
+                 buffer_map_addr = desc_table[desc_index].addr;
+                 buffer_len = desc_table[desc_index].len;
+               }
+             else
+               {
+                 error = VHOST_USER_TX_FUNC_ERROR_PKT_DROP_NOMRG;
+                 goto done;
                }
-
-             u16 bytes_to_copy = bytes_left;
-             bytes_to_copy =
-               (bytes_to_copy > buffer_len) ? buffer_len : bytes_to_copy;
-             clib_memcpy (buffer_addr,
-                          vlib_buffer_get_current (current_b0) +
-                          current_b0->current_length - bytes_left,
-                          bytes_to_copy);
-
-             vhost_user_log_dirty_pages (vui,
-                                         desc_table[desc_index].addr +
-                                         desc_table[desc_index].len -
-                                         bytes_left - bytes_to_copy,
-                                         bytes_to_copy);
-
-             CLIB_PREFETCH (rxvq, sizeof (*rxvq), STORE);
-             bytes_left -= bytes_to_copy;
-             buffer_len -= bytes_to_copy;
-             buffer_addr += bytes_to_copy;
-             desc_len += bytes_to_copy;
            }
 
-         //Move from available to used ring
-         rxvq->used->ring[rxvq->last_used_idx & qsz_mask].id = desc_head;
-         rxvq->used->ring[rxvq->last_used_idx & qsz_mask].len = desc_len;
-         vhost_user_log_dirty_ring (vui, rxvq,
-                                    ring[rxvq->last_used_idx & qsz_mask]);
+         {
+           vhost_copy_t *cpy = &vum->cpus[cpu_index].copy[copy_len];
+           copy_len++;
+           cpy->len = bytes_left;
+           cpy->len = (cpy->len > buffer_len) ? buffer_len : cpy->len;
+           cpy->dst = buffer_map_addr;
+           cpy->src = (uword) vlib_buffer_get_current (current_b0) +
+             current_b0->current_length - bytes_left;
+
+           bytes_left -= cpy->len;
+           buffer_len -= cpy->len;
+           buffer_map_addr += cpy->len;
+           desc_len += cpy->len;
+
+           CLIB_PREFETCH (&rxvq->desc, sizeof (&rxvq->desc), LOAD);
+         }
 
-         rxvq->last_avail_idx++;
-         rxvq->last_used_idx++;
+         // Check if vlib buffer has more data. If not, get more or break.
+         if (PREDICT_TRUE (!bytes_left))
+           {
+             if (PREDICT_FALSE
+                 (current_b0->flags & VLIB_BUFFER_NEXT_PRESENT))
+               {
+                 current_b0 = vlib_get_buffer (vm, current_b0->next_buffer);
+                 bytes_left = current_b0->current_length;
+               }
+             else
+               {
+                 //End of packet
+                 break;
+               }
+           }
+       }
 
-         if (PREDICT_FALSE (b0->flags & VLIB_BUFFER_IS_TRACED))
-           current_trace->hdr = *hdr;
+      //Move from available to used ring
+      rxvq->used->ring[rxvq->last_used_idx & qsz_mask].id = desc_head;
+      rxvq->used->ring[rxvq->last_used_idx & qsz_mask].len = desc_len;
+      vhost_user_log_dirty_ring (vui, rxvq,
+                                ring[rxvq->last_used_idx & qsz_mask]);
+      rxvq->last_avail_idx++;
+      rxvq->last_used_idx++;
 
-         buffers++;
-         n_left--;             //At the end for error counting when 'goto done' is invoked
+      if (PREDICT_FALSE (b0->flags & VLIB_BUFFER_IS_TRACED))
+       {
+         vum->cpus[cpu_index].current_trace->hdr =
+           vum->cpus[cpu_index].tx_headers[tx_headers_len - 1];
        }
 
-    done:
-      CLIB_MEMORY_BARRIER ();
-      rxvq->used->idx = rxvq->last_used_idx;
-      vhost_user_log_dirty_ring (vui, rxvq, idx);
+      n_left--;                        //At the end for error counting when 'goto done' is invoked
+      buffers++;
+    }
+
+done:
+  //Do the memory copies
+  if (PREDICT_FALSE
+      (vhost_user_tx_copy (vui, vum->cpus[cpu_index].copy,
+                          copy_len, &map_hint)))
+    {
+      clib_warning ("Memory mapping error on interface hw_if_index=%d "
+                   "(Shutting down - Switch interface down and up to restart)",
+                   vui->hw_if_index);
+      vui->admin_up = 0;
+    }
+
+  CLIB_MEMORY_BARRIER ();
+  rxvq->used->idx = rxvq->last_used_idx;
+  vhost_user_log_dirty_ring (vui, rxvq, idx);
+
+  /*
+   * When n_left is set, error is always set to something too.
+   * In case the error is due to a lack of remaining buffers, we go back up
+   * and retry.
+   * The idea is that it is better to waste some time on packets
+   * that have already been processed than to drop them and fetch
+   * more fresh packets with a good likelihood that they will be dropped too.
+   * This technique also gives the VM driver more time to pick up packets.
+   * In case the traffic flows from physical to virtual interfaces, this
+   * technique will end up leveraging the physical NIC buffers in order to
+   * absorb the VM's CPU jitter.
+   */
+  if (n_left && (error == VHOST_USER_TX_FUNC_ERROR_PKT_DROP_NOBUF) && retry)
+    {
+      retry--;
+      goto retry;
     }
 
   /* interrupt (call) handling */
   if ((rxvq->callfd_idx != ~0) && !(rxvq->avail->flags & 1))
     {
-      rxvq->n_since_last_int += n_packets - n_left;
+      rxvq->n_since_last_int += frame->n_vectors - n_left;
 
       if (rxvq->n_since_last_int > vum->coalesce_frames)
        vhost_user_send_call (vm, rxvq);
     }
 
-done2:
   vhost_user_vring_unlock (vui, qid);
 
 done3:
index ab0128a..3083b61 100644
@@ -257,14 +257,42 @@ typedef struct
   u16 qid;
 } vhost_iface_and_queue_t;
 
+typedef struct
+{
+  uword dst;
+  uword src;
+  u32 len;
+} vhost_copy_t;
+
+typedef struct
+{
+  u16 qid; /** The interface queue index (Not the virtio vring idx) */
+  u16 device_index; /** The device index */
+  u32 virtio_ring_flags; /** Runtime queue flags  **/
+  u16 first_desc_len; /** Length of the first data descriptor **/
+  virtio_net_hdr_mrg_rxbuf_t hdr; /** Virtio header **/
+} vhost_trace_t;
+
+
+#define VHOST_USER_RX_BUFFERS_N (2 * VLIB_FRAME_SIZE + 2)
+#define VHOST_USER_COPY_ARRAY_N (4 * VLIB_FRAME_SIZE)
+
 typedef struct
 {
   vhost_iface_and_queue_t *rx_queues;
+  u32 rx_buffers_len;
+  u32 rx_buffers[VHOST_USER_RX_BUFFERS_N];
+
+  virtio_net_hdr_mrg_rxbuf_t tx_headers[VLIB_FRAME_SIZE];
+  vhost_copy_t copy[VHOST_USER_COPY_ARRAY_N];
+
+  /* This is here so it doesn't end up
+   * using the stack or registers. */
+  vhost_trace_t *current_trace;
 } vhost_cpu_t;
 
 typedef struct
 {
-  u32 **rx_buffers;
   u32 mtu_bytes;
   vhost_user_intf_t *vhost_user_interfaces;
   u32 *show_dev_instance_by_real_dev_instance;