Add jumbo frames support to non-dpdk vhost interfaces. 01/301/4
authorPierre Pfister <ppfister@cisco.com>
Fri, 12 Feb 2016 13:18:42 +0000 (13:18 +0000)
committerGerrit Code Review <gerrit@fd.io>
Thu, 18 Feb 2016 00:50:55 +0000 (00:50 +0000)
This code provided inter-VM (2 cores per VM) throughput of
22Gbps using iperf through VPP (1 core) with 9k frames.
With the same setup and pktgen running on both sides, it
reached 5Mpps with no packets drop (Equivalent to before the patch).
During the tests the average vector length was about 1, which
likely means that VPP is not the bottleneck.

The patch also includes some generic functions for vlib buffers
allowing for chained buffer construction whether or not DPDK is enabled.

Change-Id: Icfd1803e84b2b4578f305ab730576211f6242d6a
Signed-off-by: Pierre Pfister <ppfister@cisco.com>
vlib/vlib/buffer.c
vlib/vlib/buffer_funcs.h
vlib/vlib/dpdk_buffer.c
vnet/vnet/devices/virtio/vhost-user.c

index 4463f7f..332b430 100644 (file)
@@ -1220,6 +1220,34 @@ u32 vlib_buffer_add_data (vlib_main_t * vm,
   return bi;
 }
 
+u16
+vlib_buffer_chain_append_data_with_alloc(vlib_main_t *vm,
+                             u32 free_list_index,
+                             vlib_buffer_t *first,
+                             vlib_buffer_t **last,
+                             void * data, u16 data_len) {
+  vlib_buffer_t *l = *last;
+  u32 n_buffer_bytes = vlib_buffer_free_list_buffer_size (vm, free_list_index);
+  u16 copied = 0;
+  ASSERT(n_buffer_bytes >= l->current_length + l->current_data);
+  while (data_len) {
+    u16 max = n_buffer_bytes - l->current_length - l->current_data;
+    if (max == 0) {
+      if (1 != vlib_buffer_alloc_from_free_list (vm, &l->next_buffer, 1, free_list_index))
+        return copied;
+      *last = l = vlib_buffer_chain_buffer(vm, first, l, l->next_buffer);
+      max = n_buffer_bytes - l->current_length - l->current_data;
+    }
+
+    u16 len = (data_len > max)?max:data_len;
+    memcpy(vlib_buffer_get_current (l) + l->current_length, data + copied, len);
+    vlib_buffer_chain_increase_length(first, l, len);
+    data_len -= len;
+    copied += len;
+  }
+  return copied;
+}
+
 static void vlib_serialize_tx (serialize_main_header_t * m, serialize_stream_t * s)
 {
   vlib_main_t * vm;
index 452cdcb..2ea506a 100644 (file)
 
 #include <vppinfra/hash.h>
 
+#if DPDK == 1
+#undef always_inline // dpdk and clib use conflicting always_inline macros.
+#include <rte_config.h>
+#include <rte_mbuf.h>
+#include <rte_memcpy.h>
+
+#if CLIB_DEBUG > 0
+#define always_inline static inline
+#else
+#define always_inline static inline __attribute__ ((__always_inline__))
+#endif
+#endif
+
 /** \file
     vlib buffer access methods.
 */
@@ -398,6 +411,102 @@ u32 vlib_buffer_add_data (vlib_main_t * vm,
                          u32 buffer_index,
                          void * data, u32 n_data_bytes);
 
+/*
+ * vlib_buffer_chain_* functions provide a way to create long buffers.
+ * When DPDK is enabled, the 'hidden' DPDK header is taken care of transparently.
+ */
+
+/* Initializes the buffer as an empty packet with no chained buffers. */
+always_inline void
+vlib_buffer_chain_init(vlib_buffer_t *first)
+{
+  first->total_length_not_including_first_buffer = 0;
+  first->current_length = 0;
+  first->flags &= ~VLIB_BUFFER_NEXT_PRESENT;
+  first->flags |= VLIB_BUFFER_TOTAL_LENGTH_VALID;
+#if DPDK == 1
+  (((struct rte_mbuf *) first) - 1)->nb_segs = 1;
+  (((struct rte_mbuf *) first) - 1)->next = 0;
+  (((struct rte_mbuf *) first) - 1)->pkt_len = 0;
+  (((struct rte_mbuf *) first) - 1)->data_len = 0;
+  (((struct rte_mbuf *) first) - 1)->data_off = RTE_PKTMBUF_HEADROOM + first->current_data;
+#endif
+}
+
+/* The provided next_bi buffer index is appended to the end of the packet. */
+always_inline vlib_buffer_t *
+vlib_buffer_chain_buffer(vlib_main_t *vm,
+                    vlib_buffer_t *first,
+                    vlib_buffer_t *last,
+                    u32 next_bi)
+{
+  vlib_buffer_t *next_buffer = vlib_get_buffer(vm, next_bi);
+  last->next_buffer = next_bi;
+  last->flags |= VLIB_BUFFER_NEXT_PRESENT;
+  next_buffer->current_length = 0;
+  next_buffer->flags &= ~VLIB_BUFFER_NEXT_PRESENT;
+#if DPDK == 1
+  (((struct rte_mbuf *) first) - 1)->nb_segs++;
+  (((struct rte_mbuf *) last) - 1)->next = (((struct rte_mbuf *) next_buffer) - 1);
+  (((struct rte_mbuf *) next_buffer) - 1)->data_len = 0;
+  (((struct rte_mbuf *) next_buffer) - 1)->data_off = RTE_PKTMBUF_HEADROOM + next_buffer->current_data;
+  (((struct rte_mbuf *) next_buffer) - 1)->next = 0;
+#endif
+  return next_buffer;
+}
+
+/* Increases or decreases the packet length.
+ * It does not allocate or deallocate new buffers.
+ * Therefore, the added length must be compatible
+ * with the last buffer. */
+always_inline void
+vlib_buffer_chain_increase_length(vlib_buffer_t *first,
+                             vlib_buffer_t *last,
+                             i32 len)
+{
+  last->current_length += len;
+  if (first != last)
+    first->total_length_not_including_first_buffer += len;
+#if DPDK == 1
+  (((struct rte_mbuf *) first) - 1)->pkt_len += len;
+  (((struct rte_mbuf *) last) - 1)->data_len += len;
+#endif
+}
+
+/* Copy data to the end of the packet and increases its length.
+ * It does not allocate new buffers.
+ * Returns the number of copied bytes. */
+always_inline u16
+vlib_buffer_chain_append_data(vlib_main_t *vm,
+                             u32 free_list_index,
+                             vlib_buffer_t *first,
+                             vlib_buffer_t *last,
+                             void *data, u16 data_len)
+{
+  u32 n_buffer_bytes = vlib_buffer_free_list_buffer_size (vm, free_list_index);
+  ASSERT(n_buffer_bytes >= last->current_length + last->current_data);
+  u16 len = clib_min(data_len, n_buffer_bytes - last->current_length - last->current_data);
+#if DPDK == 1
+  rte_memcpy(vlib_buffer_get_current (last) + last->current_length, data, len);
+#else
+  memcpy(vlib_buffer_get_current (last) + last->current_length, data, len);
+#endif
+  vlib_buffer_chain_increase_length(first, last, len);
+  return len;
+}
+
+/* Copy data to the end of the packet and increases its length.
+ * Allocates additional buffers from the free list if necessary.
+ * Returns the number of copied bytes.
+ * 'last' value is modified whenever new buffers are allocated and
+ * chained and points to the last buffer in the chain. */
+u16
+vlib_buffer_chain_append_data_with_alloc(vlib_main_t *vm,
+                             u32 free_list_index,
+                             vlib_buffer_t *first,
+                             vlib_buffer_t **last,
+                             void * data, u16 data_len);
+
 format_function_t format_vlib_buffer, format_vlib_buffer_and_data, format_vlib_buffer_contents;
 
 typedef struct {
index 9db84a1..145720d 100644 (file)
@@ -882,6 +882,34 @@ u32 vlib_buffer_add_data (vlib_main_t * vm,
   return bi;
 }
 
+u16
+vlib_buffer_chain_append_data_with_alloc(vlib_main_t *vm,
+                             u32 free_list_index,
+                             vlib_buffer_t *first,
+                             vlib_buffer_t **last,
+                             void * data, u16 data_len) {
+  vlib_buffer_t *l = *last;
+  u32 n_buffer_bytes = vlib_buffer_free_list_buffer_size (vm, free_list_index);
+  u16 copied = 0;
+  ASSERT(n_buffer_bytes >= l->current_length + l->current_data);
+  while (data_len) {
+    u16 max = n_buffer_bytes - l->current_length - l->current_data;
+    if (max == 0) {
+      if (1 != vlib_buffer_alloc_from_free_list (vm, &l->next_buffer, 1, free_list_index))
+        return copied;
+      *last = l = vlib_buffer_chain_buffer(vm, first, l, l->next_buffer);
+      max = n_buffer_bytes - l->current_length - l->current_data;
+    }
+
+    u16 len = (data_len > max)?max:data_len;
+    rte_memcpy(vlib_buffer_get_current (l) + l->current_length, data + copied, len);
+    vlib_buffer_chain_increase_length(first, l, len);
+    data_len -= len;
+    copied += len;
+  }
+  return copied;
+}
+
 clib_error_t *
 vlib_buffer_pool_create(vlib_main_t * vm, unsigned num_mbufs,
                         unsigned mbuf_size, unsigned socket_id)
index 8dca33b..180393d 100644 (file)
@@ -61,6 +61,8 @@
 vlib_node_registration_t vhost_user_input_node;
 
 #define foreach_vhost_user_tx_func_error      \
+  _(NONE, "no error")  \
+  _(NOT_READY, "vhost user state error")  \
   _(PKT_DROP_NOBUF, "tx packet drops (no available descriptors)")  \
   _(MMAP_FAIL, "mmap failure")
 
@@ -79,6 +81,8 @@ static char * vhost_user_tx_func_error_strings[] = {
 
 #define foreach_vhost_user_input_func_error      \
   _(NO_ERROR, "no error")  \
+  _(NO_BUFFER, "no available buffer")  \
+  _(MMAP_FAIL, "mmap failure")  \
   _(UNDERSIZED_FRAME, "undersized ethernet frame received (< 14 bytes)")
 
 typedef enum {
@@ -753,25 +757,20 @@ static u32 vhost_user_if_input ( vlib_main_t * vm,
 {
   vhost_user_vring_t * txvq = &vui->vrings[VHOST_NET_VRING_IDX_TX];
   vhost_user_vring_t * rxvq = &vui->vrings[VHOST_NET_VRING_IDX_RX];
-  uword n_rx_packets = 0;
+  uword n_rx_packets = 0, n_rx_bytes = 0;
   uword n_left;
-  u32 bi;
   u32 n_left_to_next, * to_next;
-  u32 next_index = VHOST_USER_RX_NEXT_ETHERNET_INPUT;
-  uword n_rx_bytes = 0;
+  u32 next_index = 0;
+  u32 next0;
   uword n_trace = vlib_get_trace_count (vm, node);
   u16 qsz_mask;
+  u32 cpu_index, rx_len, drops, flush;
   f64 now = vlib_time_now (vm);
-  u32 cpu_index;
 
   vec_reset_length (vui->d_trace_buffers);
-  u32 free_list_index = VLIB_BUFFER_DEFAULT_FREE_LIST_INDEX;
 
   /* no descriptor ptr - bail out */
-  if (PREDICT_FALSE(!txvq->desc))
-    return 0;
-
-  if (PREDICT_FALSE(!txvq->avail))
+  if (PREDICT_FALSE(!txvq->desc || !txvq->avail))
     return 0;
 
   /* do we have pending intterupts ? */
@@ -790,98 +789,118 @@ static u32 vhost_user_if_input ( vlib_main_t * vm,
   if (txvq->avail->idx == txvq->last_avail_idx)
     return 0;
 
-  cpu_index = os_get_cpu_number();
-
   if (PREDICT_TRUE(txvq->avail->idx > txvq->last_avail_idx))
     n_left = txvq->avail->idx - txvq->last_avail_idx;
   else /* wrapped */
     n_left = (u16) -1 - txvq->last_avail_idx + txvq->avail->idx;
 
   if (PREDICT_FALSE(!vui->admin_up)) {
-      /* if intf is admin down, just drop all packets waiting in the ring */
-      txvq->last_avail_idx = txvq->last_used_idx = txvq->avail->idx;
-      CLIB_MEMORY_BARRIER();
-      txvq->used->idx = txvq->last_used_idx;
-      vhost_user_send_call(vm, txvq);
-
-      return 0;
+    /* if intf is admin down, just drop all packets waiting in the ring */
+    txvq->last_avail_idx = txvq->last_used_idx = txvq->avail->idx;
+    CLIB_MEMORY_BARRIER();
+    txvq->used->idx = txvq->last_used_idx;
+    vhost_user_send_call(vm, txvq);
+    return 0;
   }
 
-  if (PREDICT_FALSE(n_left > txvq->qsz)) {
+  if (PREDICT_FALSE(n_left > txvq->qsz))
     return 0;
-  }
 
-  if (PREDICT_FALSE(n_left > VLIB_FRAME_SIZE))
+  qsz_mask = txvq->qsz - 1;
+  cpu_index = os_get_cpu_number();
+  drops = 0;
+  flush = 0;
+
+  if (n_left > VLIB_FRAME_SIZE)
     n_left = VLIB_FRAME_SIZE;
 
-  /* Make sure we have some RX buffers. */
-  {
-    uword l = vec_len (vum->rx_buffers[cpu_index]);
-    uword n_alloc;
+  /* Allocate some buffers.
+   * Note that buffers that are chained for jumbo
+   * frames are allocated separately using a slower path.
+   * The idea is to be certain to have enough buffers at least
+   * to cycle through the descriptors without having to check for errors.
+   * For jumbo frames, the bottleneck is memory copy anyway.
+   */
+  if (PREDICT_FALSE(!vum->rx_buffers[cpu_index])) {
+    vec_alloc (vum->rx_buffers[cpu_index], VLIB_FRAME_SIZE);
+
+    if (PREDICT_FALSE(!vum->rx_buffers[cpu_index]))
+      flush = n_left; //Drop all input
+  }
 
-    if (l < n_left)
-      {
-        if (! vum->rx_buffers[cpu_index]) {
-          vec_alloc (vum->rx_buffers[cpu_index], 2 * VLIB_FRAME_SIZE );
-        }
+  if (PREDICT_FALSE(_vec_len(vum->rx_buffers[cpu_index]) < n_left)) {
+    _vec_len(vum->rx_buffers[cpu_index]) +=
+        vlib_buffer_alloc_from_free_list(vm, vum->rx_buffers[cpu_index] + _vec_len(vum->rx_buffers[cpu_index]),
+                                         VLIB_FRAME_SIZE - _vec_len(vum->rx_buffers[cpu_index]),
+                                         VLIB_BUFFER_DEFAULT_FREE_LIST_INDEX);
 
-        n_alloc = vlib_buffer_alloc_from_free_list
-            (vm, vum->rx_buffers[cpu_index] + l, 2 * VLIB_FRAME_SIZE - l,
-             free_list_index);
-        if (n_alloc == 0)
-          return 0;
-        _vec_len (vum->rx_buffers[cpu_index]) = l + n_alloc;
-      }
+    if (PREDICT_FALSE(n_left > _vec_len(vum->rx_buffers[cpu_index])))
+      flush = n_left - _vec_len(vum->rx_buffers[cpu_index]);
   }
 
-  qsz_mask = txvq->qsz - 1;
+  if (PREDICT_FALSE(flush)) {
+    //Remove some input buffers
+    drops += flush;
+    n_left -= flush;
+    vlib_error_count(vm, vhost_user_input_node.index,
+                     VHOST_USER_INPUT_FUNC_ERROR_NO_BUFFER, flush);
+    while (flush) {
+      u16 desc_chain_head = txvq->avail->ring[txvq->last_avail_idx & qsz_mask];
+      txvq->last_avail_idx++;
+      txvq->used->ring[txvq->last_used_idx & qsz_mask].id = desc_chain_head;
+      txvq->used->ring[txvq->last_used_idx & qsz_mask].len = 0;
+      txvq->last_used_idx++;
+      flush--;
+    }
+  }
 
+  rx_len = vec_len(vum->rx_buffers[cpu_index]); //vector might be null
   while (n_left > 0) {
     vlib_get_next_frame (vm, node, next_index, to_next, n_left_to_next);
 
     while (n_left > 0 && n_left_to_next > 0) {
-      vlib_buffer_t * b;
-           u16 desc_chain_head = txvq->avail->ring[txvq->last_avail_idx & qsz_mask];
-           u16 desc_current = desc_chain_head;
-           uword i_rx = vec_len (vum->rx_buffers[cpu_index]) - 1;
+      vlib_buffer_t *b_head, *b_current;
+      u32 bi_head, bi_current;
+      u16 desc_chain_head, desc_current;
+      u8 error = VHOST_USER_INPUT_FUNC_ERROR_NO_ERROR;
 
-      bi = vum->rx_buffers[cpu_index][i_rx];
-      b = vlib_get_buffer (vm, bi);
-
-      vlib_prefetch_buffer_with_index (vm, vum->rx_buffers[cpu_index][i_rx-1], STORE);
+      desc_chain_head = desc_current = txvq->avail->ring[txvq->last_avail_idx & qsz_mask];
+      bi_head = bi_current = vum->rx_buffers[cpu_index][--rx_len];
+      b_head = b_current = vlib_get_buffer (vm, bi_head);
+      vlib_buffer_chain_init(b_head);
 
       uword offset;
-      if (PREDICT_TRUE(vui->is_any_layout))
-        offset = vui->virtio_net_hdr_sz;
-      else if (!(txvq->desc[desc_current].flags & VIRTQ_DESC_F_NEXT))
-        /* WSA case, no ANYLAYOUT but single buffer */
+      if (PREDICT_TRUE(vui->is_any_layout) ||
+          !(txvq->desc[desc_current].flags & VIRTQ_DESC_F_NEXT)) {
+        /* ANYLAYOUT or single buffer */
         offset = vui->virtio_net_hdr_sz;
-      else
+      } else {
         /* CSR case without ANYLAYOUT, skip 1st buffer */
         offset = txvq->desc[desc_current].len;
-
-      uword ptr=0;
+      }
 
       while(1) {
         void * buffer_addr = map_guest_mem(vui, txvq->desc[desc_current].addr);
-        CLIB_PREFETCH (&txvq->desc[txvq->desc[desc_current].next], sizeof (vring_desc_t), READ);
+        if (PREDICT_FALSE(buffer_addr == 0)) {
+          error = VHOST_USER_INPUT_FUNC_ERROR_MMAP_FAIL;
+          break;
+        }
 
 #if VHOST_USER_COPY_TX_HDR == 1
-        if (PREDICT_TRUE(offset)) {
+        if (PREDICT_TRUE(offset))
           rte_memcpy(b->pre_data, buffer_addr, sizeof(virtio_net_hdr_t)); /* 12 byte hdr is not used on tx */
-        }
 #endif
 
         if (txvq->desc[desc_current].len > offset) {
           u16 len = txvq->desc[desc_current].len - offset;
+          u16 copied = vlib_buffer_chain_append_data_with_alloc(vm, VLIB_BUFFER_DEFAULT_FREE_LIST_INDEX,
+                                                   b_head, &b_current, buffer_addr + offset, len);
 
-          if (PREDICT_FALSE(len > VLIB_BUFFER_DEFAULT_FREE_LIST_BYTES))
-            len = VLIB_BUFFER_DEFAULT_FREE_LIST_BYTES;
-
-          rte_memcpy(vlib_buffer_get_current (b) + ptr,
-               buffer_addr + offset, len);
+          if (copied != len) {
+            error = VHOST_USER_INPUT_FUNC_ERROR_NO_BUFFER;
+            break;
+          }
         }
-        ptr += txvq->desc[desc_current].len - offset;
         offset = 0;
 
         /* if next flag is set, take next desc in the chain */
@@ -891,71 +910,60 @@ static u32 vhost_user_if_input ( vlib_main_t * vm,
           break;
       }
 
+      /* consume the descriptor and return it as used */
       txvq->last_avail_idx++;
-
-      /* returning buffer */
       txvq->used->ring[txvq->last_used_idx & qsz_mask].id = desc_chain_head;
-      txvq->used->ring[txvq->last_used_idx & qsz_mask].len = ptr + vui->virtio_net_hdr_sz;
-
+      txvq->used->ring[txvq->last_used_idx & qsz_mask].len = 0;
       txvq->last_used_idx++;
 
-      b->current_length = ptr;
-
-      if(PREDICT_FALSE(b->current_length < 14)) {
-          vlib_error_count(vm, vhost_user_input_node.index,
-                           VHOST_USER_INPUT_FUNC_ERROR_UNDERSIZED_FRAME, 1);
-          goto skip_frame;
+      if(PREDICT_FALSE(b_head->current_length < 14 &&
+                       error == VHOST_USER_INPUT_FUNC_ERROR_NO_ERROR)) {
+        error = VHOST_USER_INPUT_FUNC_ERROR_UNDERSIZED_FRAME;
       }
 
-      b->flags = 0;
-      b->current_data = 0;
-      b->flags = VLIB_BUFFER_TOTAL_LENGTH_VALID;
-      n_rx_bytes += ptr;
-      _vec_len (vum->rx_buffers[cpu_index]) = i_rx;
+      VLIB_BUFFER_TRACE_TRAJECTORY_INIT(b);
 
-           /*
-            * Turn this on if you run into
-            * "bad monkey" contexts, and you want to know exactly
-            * which nodes they've visited... See .../vlib/vlib/buffer.h
-            */
-           VLIB_BUFFER_TRACE_TRAJECTORY_INIT(b);
+      vnet_buffer (b_head)->sw_if_index[VLIB_RX] = vui->sw_if_index;
+      vnet_buffer (b_head)->sw_if_index[VLIB_TX] = (u32)~0;
+      b_head->error = node->errors[error];
 
-      vnet_buffer (b)->sw_if_index[VLIB_RX] = vui->sw_if_index;
-      vnet_buffer (b)->sw_if_index[VLIB_TX] = (u32)~0;
-      b->error = node->errors[0];
+      if (PREDICT_FALSE (n_trace > n_rx_packets))
+        vec_add1 (vui->d_trace_buffers, bi_head);
+
+      if (PREDICT_FALSE(error)) {
+        drops++;
+        next0 = VHOST_USER_RX_NEXT_DROP;
+      } else {
+        n_rx_bytes += b_head->current_length + b_head->total_length_not_including_first_buffer;
+        n_rx_packets++;
+        next0 = VHOST_USER_RX_NEXT_ETHERNET_INPUT;
+      }
 
-      to_next[0] = bi;
+      to_next[0] = bi_head;
       to_next++;
       n_left_to_next--;
       vlib_validate_buffer_enqueue_x1 (vm, node, next_index,
-                                 to_next, n_left_to_next,
-                                 bi, next_index);
-
-      if (PREDICT_FALSE (n_trace > n_rx_packets))
-            vec_add1 (vui->d_trace_buffers, bi);
-
-      n_rx_packets++;
-skip_frame:
-           n_left--;
+                                       to_next, n_left_to_next,
+                                       bi_head, next0);
+      n_left--;
     }
 
-    /* give buffers back to driver */
-    CLIB_MEMORY_BARRIER();
-    txvq->used->idx = txvq->last_used_idx;
-
     vlib_put_next_frame (vm, node, next_index, n_left_to_next);
   }
 
+  if (PREDICT_TRUE(vum->rx_buffers[cpu_index] != 0))
+    _vec_len(vum->rx_buffers[cpu_index]) = rx_len;
+
+  /* give buffers back to driver */
+  CLIB_MEMORY_BARRIER();
+  txvq->used->idx = txvq->last_used_idx;
+
   if (PREDICT_FALSE (vec_len (vui->d_trace_buffers) > 0))
   {
-      vhost_user_rx_trace (vm, node, vui, VHOST_NET_VRING_IDX_TX);
-      vlib_set_trace_count (vm, node, n_trace - vec_len (vui->d_trace_buffers));
+    vhost_user_rx_trace (vm, node, vui, VHOST_NET_VRING_IDX_TX);
+    vlib_set_trace_count (vm, node, n_trace - vec_len (vui->d_trace_buffers));
   }
 
-  /* if no packets received we're done */
-  if(!n_rx_packets)
-    return 0;
-
   /* interrupt (call) handling */
   if((txvq->callfd > 0) && !(txvq->avail->flags & 1)) {
     txvq->n_since_last_int += n_rx_packets;
@@ -964,6 +972,13 @@ skip_frame:
       vhost_user_send_call(vm, txvq);
   }
 
+  if (PREDICT_FALSE(drops)) {
+    vlib_increment_simple_counter
+    (vnet_main.interface_main.sw_if_counters
+     + VNET_INTERFACE_COUNTER_DROP, os_get_cpu_number(),
+     vui->sw_if_index, drops);
+  }
+
   /* increase rx counters */
   vlib_increment_combined_counter
   (vnet_main.interface_main.combined_sw_if_counters
@@ -1028,20 +1043,19 @@ vhost_user_intfc_tx (vlib_main_t * vm,
   u16 used_index;
   vhost_user_main_t * vum = &vhost_user_main;
   uword n_packets = 0;
-  uword n_avail_desc;
   vnet_interface_output_runtime_t * rd = (void *) node->runtime_data;
   vhost_user_intf_t * vui = vec_elt_at_index (vum->vhost_user_interfaces, rd->dev_instance);
   vhost_user_vring_t * rxvq = &vui->vrings[VHOST_NET_VRING_IDX_RX];
   u16 qsz_mask;
+  u8 error = VHOST_USER_TX_FUNC_ERROR_NONE;
 
   if (PREDICT_FALSE(!vui->is_up))
      goto done2;
 
-  if (PREDICT_FALSE(!rxvq->desc))
+  if (PREDICT_FALSE(!rxvq->desc || !rxvq->avail || vui->sock_errno != 0)) {
+     error = VHOST_USER_TX_FUNC_ERROR_NOT_READY;
      goto done2;
-
-  if (PREDICT_FALSE(!rxvq->avail))
-    goto done2;
+  }
 
   if (PREDICT_FALSE(vui->lockp != 0))
     {
@@ -1049,225 +1063,136 @@ vhost_user_intfc_tx (vlib_main_t * vm,
         ;
     }
 
-
   /* only bit 0 of avail.flags is used so we don't want to deal with this
      interface if any other bit is set */
-  if (PREDICT_FALSE(rxvq->avail->flags & 0xFFFE))
-      goto done2;
-
-  if (PREDICT_FALSE((rxvq->avail->idx == rxvq->last_avail_idx) ||
-                    vui->sock_errno != 0)) {
-     vlib_simple_counter_main_t * cm;
-     vnet_main_t * vnm = vnet_get_main();
-
-     cm = vec_elt_at_index (vnm->interface_main.sw_if_counters,
-                            VNET_INTERFACE_COUNTER_TX_ERROR);
-     vlib_increment_simple_counter (cm, os_get_cpu_number(),
-                                    0, frame->n_vectors);
-
-     vlib_error_count (vm, node->node_index,
-                       VHOST_USER_TX_FUNC_ERROR_PKT_DROP_NOBUF,
-                       frame->n_vectors);
+  if (PREDICT_FALSE(rxvq->avail->flags & 0xFFFE)) {
+    error = VHOST_USER_TX_FUNC_ERROR_NOT_READY;
+    goto done2;
+  }
+
+  if (PREDICT_FALSE((rxvq->avail->idx == rxvq->last_avail_idx))) {
+     error = VHOST_USER_TX_FUNC_ERROR_PKT_DROP_NOBUF;
      goto done2;
    }
 
-   if (PREDICT_TRUE(rxvq->avail->idx > rxvq->last_avail_idx))
-     n_avail_desc = rxvq->avail->idx - rxvq->last_avail_idx;
-   else /* wrapped */
-     n_avail_desc = (u16) -1 - rxvq->last_avail_idx + rxvq->avail->idx;
-
-  DBG_VQ("rxvq->avail->idx %d rxvq->last_avail_idx %d n_avail_desc %d",
-    rxvq->avail->idx, rxvq->last_avail_idx, n_avail_desc);
-
   n_left = n_packets = frame->n_vectors;
-  if (PREDICT_FALSE(n_packets > n_avail_desc)) {
-    vlib_simple_counter_main_t * cm;
-    vnet_main_t * vnm = vnet_get_main();
-
-    cm = vec_elt_at_index (vnm->interface_main.sw_if_counters,
-                           VNET_INTERFACE_COUNTER_TX_ERROR);
-    vlib_increment_simple_counter (cm, os_get_cpu_number(),
-                                   0, frame->n_vectors);
-
-    vlib_error_count (vm, node->node_index,
-                      VHOST_USER_TX_FUNC_ERROR_PKT_DROP_NOBUF,
-                      n_packets - n_avail_desc);
-    n_left = n_packets = n_avail_desc;
-  }
-
   used_index = rxvq->used->idx;
   qsz_mask = rxvq->qsz - 1; /* qsz is always power of 2 */
 
-  while (n_left >= 4)
-  {
-      vlib_buffer_t * b0, * b1;
-      u16 desc_chain_head0,desc_chain_head1;
-      u16 desc_current0,desc_current1;
-      uword offset0, offset1;
-      u16 bytes_left0, bytes_left1;
-      void *buffer_addr0, *buffer_addr1;
-
-      vlib_prefetch_buffer_with_index (vm, buffers[2], LOAD);
-      vlib_prefetch_buffer_with_index (vm, buffers[3], LOAD);
-
-      b0 = vlib_get_buffer (vm, buffers[0]);
-      b1 = vlib_get_buffer (vm, buffers[1]);
-      buffers+=2;
-      n_left-=2;
-
-      desc_current0 = desc_chain_head0 = rxvq->avail->ring[rxvq->last_avail_idx & qsz_mask];
-      desc_current1 = desc_chain_head1 = rxvq->avail->ring[(rxvq->last_avail_idx+1) & qsz_mask];
-
-      offset0 = vui->virtio_net_hdr_sz;
-
-      offset1 = vui->virtio_net_hdr_sz;
-
-      bytes_left0 = b0->current_length;
-      bytes_left1 = b1->current_length;
-
-      buffer_addr0 = map_guest_mem(vui, rxvq->desc[desc_current0].addr);
-      buffer_addr1 = map_guest_mem(vui, rxvq->desc[desc_current1].addr);
-
-      if (PREDICT_FALSE(!buffer_addr0)) {
-        vlib_error_count (vm, node->node_index, VHOST_USER_TX_FUNC_ERROR_MMAP_FAIL, 1);
-        goto done;
-      }
-      if (PREDICT_FALSE(!buffer_addr1)) {
-        vlib_error_count (vm, node->node_index, VHOST_USER_TX_FUNC_ERROR_MMAP_FAIL, 1);
-        goto done;
-      }
-
-      virtio_net_hdr_mrg_rxbuf_t * hdr0 = (virtio_net_hdr_mrg_rxbuf_t *) buffer_addr0;
-      virtio_net_hdr_mrg_rxbuf_t * hdr1 = (virtio_net_hdr_mrg_rxbuf_t *) buffer_addr1;
-      hdr0->hdr.flags = 0;
-      hdr1->hdr.flags = 0;
-      hdr0->hdr.gso_type = 0;
-      hdr1->hdr.gso_type = 0;
-
-      if (vui->virtio_net_hdr_sz == 12) {
-        hdr0->num_buffers = 1;
-        hdr1->num_buffers = 1;
-      }
-
-      buffer_addr0 += offset0;
-      buffer_addr1 += offset1;
-
-      if (PREDICT_FALSE(!vui->is_any_layout && rxvq->desc[desc_current0].flags & VIRTQ_DESC_F_NEXT))
-        rxvq->desc[desc_current0].len = vui->virtio_net_hdr_sz;
-
-      if (PREDICT_FALSE(!vui->is_any_layout && rxvq->desc[desc_current1].flags & VIRTQ_DESC_F_NEXT))
-        rxvq->desc[desc_current1].len = vui->virtio_net_hdr_sz;
-
-      while(1) {
-        if (rxvq->desc[desc_current0].len - offset0 > 0 ) {
-          u16 bytes_to_copy = bytes_left0 > (rxvq->desc[desc_current0].len - offset0) ? (rxvq->desc[desc_current0].len - offset0) : bytes_left0;
-          rte_memcpy(buffer_addr0, vlib_buffer_get_current (b0) + b0->current_length - bytes_left0, bytes_to_copy);
-          bytes_left0 -= bytes_to_copy;
-        }
-
-        if (rxvq->desc[desc_current0].flags & VIRTQ_DESC_F_NEXT ) {
-          offset0 = 0;
-          desc_current0 = rxvq->desc[desc_current1].next;
-          buffer_addr0 = map_guest_mem(vui, rxvq->desc[desc_current0].addr);
-          if (PREDICT_FALSE(!buffer_addr0)) {
-            vlib_error_count (vm, node->node_index, VHOST_USER_TX_FUNC_ERROR_MMAP_FAIL, 1);
-            goto done;
-          }
-        }
-        else
-          break;
-      }
-
-      while(1) {
-        if (rxvq->desc[desc_current1].len - offset1 > 0 ) {
-          u16 bytes_to_copy = bytes_left1 > (rxvq->desc[desc_current1].len - offset1) ? (rxvq->desc[desc_current1].len - offset1) : bytes_left1;
-          rte_memcpy(buffer_addr1, vlib_buffer_get_current (b1) + b1->current_length - bytes_left1, bytes_to_copy);
-          bytes_left1 -= bytes_to_copy;
-        }
-
-        if (rxvq->desc[desc_current1].flags & VIRTQ_DESC_F_NEXT ) {
-          offset1 = 0;
-          desc_current1 = rxvq->desc[desc_current1].next;
-          buffer_addr1 = map_guest_mem(vui, rxvq->desc[desc_current1].addr);
-          if (PREDICT_FALSE(!buffer_addr1)) {
-            vlib_error_count (vm, node->node_index, VHOST_USER_TX_FUNC_ERROR_MMAP_FAIL, 1);
-            goto done;
-          }
-        }
-        else
-          break;
-      }
-
-      rxvq->used->ring[used_index & qsz_mask].id = desc_chain_head0;
-      rxvq->used->ring[used_index & qsz_mask].len = b0->current_length + vui->virtio_net_hdr_sz;
-      used_index+=1;
-      rxvq->used->ring[used_index & qsz_mask].id = desc_chain_head1;
-      rxvq->used->ring[used_index & qsz_mask].len = b1->current_length + vui->virtio_net_hdr_sz;
-      used_index+=1;
-      rxvq->last_avail_idx+=2;
-  }
-
   while (n_left > 0)
   {
-      vlib_buffer_t * b0;
-      u16 desc_chain_head;
-      u16 desc_current;
+      vlib_buffer_t *b0, *current_b0;
+      u16 desc_chain_head, desc_current, desc_len;
       void *buffer_addr;
+      uword offset;
+
+      if (n_left >= 2)
+        vlib_prefetch_buffer_with_index (vm, buffers[1], LOAD);
 
       b0 = vlib_get_buffer (vm, buffers[0]);
       buffers++;
       n_left--;
 
-      desc_chain_head = rxvq->avail->ring[rxvq->last_avail_idx & qsz_mask];
-      desc_current = desc_chain_head;
-
-      uword offset = vui->virtio_net_hdr_sz;
+      if (PREDICT_FALSE(rxvq->last_avail_idx == rxvq->avail->idx)) {
+        error = VHOST_USER_TX_FUNC_ERROR_PKT_DROP_NOBUF;
+        goto done;
+      }
 
-      u16 bytes_left = b0->current_length;
-      buffer_addr = map_guest_mem(vui, rxvq->desc[desc_current].addr);
-      if (PREDICT_FALSE(!buffer_addr)) {
-        vlib_error_count (vm, node->node_index, VHOST_USER_TX_FUNC_ERROR_MMAP_FAIL, 1);
+      desc_current = desc_chain_head = rxvq->avail->ring[rxvq->last_avail_idx & qsz_mask];
+      offset = vui->virtio_net_hdr_sz;
+      desc_len = offset;
+      if (PREDICT_FALSE(!(buffer_addr = map_guest_mem(vui, rxvq->desc[desc_current].addr)))) {
+        error = VHOST_USER_TX_FUNC_ERROR_MMAP_FAIL;
         goto done;
       }
+      CLIB_PREFETCH(buffer_addr, clib_min(rxvq->desc[desc_current].len, 500), STORE);
 
       virtio_net_hdr_mrg_rxbuf_t * hdr = (virtio_net_hdr_mrg_rxbuf_t *) buffer_addr;
       hdr->hdr.flags = 0;
       hdr->hdr.gso_type = 0;
 
-      if (vui->virtio_net_hdr_sz == 12) {
+      if (vui->virtio_net_hdr_sz == 12)
         hdr->num_buffers = 1;
-      }
 
+      u16 bytes_left = b0->current_length;
       buffer_addr += offset;
+      current_b0 = b0;
 
-      if (PREDICT_FALSE(!vui->is_any_layout && rxvq->desc[desc_current].flags & VIRTQ_DESC_F_NEXT))
+      //FIXME: This was in the code but I don't think it is valid
+      /*if (PREDICT_FALSE(!vui->is_any_layout && (rxvq->desc[desc_current].flags & VIRTQ_DESC_F_NEXT))) {
         rxvq->desc[desc_current].len = vui->virtio_net_hdr_sz;
+      }*/
 
       while(1) {
-        if (rxvq->desc[desc_current].len - offset > 0 ) {
-          u16 bytes_to_copy = bytes_left > (rxvq->desc[desc_current].len - offset) ? (rxvq->desc[desc_current].len - offset) : bytes_left;
-          rte_memcpy(buffer_addr, vlib_buffer_get_current (b0) + b0->current_length - bytes_left, bytes_to_copy);
-          bytes_left -= bytes_to_copy;
+        if (!bytes_left) { //Get new input
+          if (current_b0->flags & VLIB_BUFFER_NEXT_PRESENT) {
+            current_b0 = vlib_get_buffer(vm, current_b0->next_buffer);
+            bytes_left = current_b0->current_length;
+          } else {
+            //End of packet
+            break;
+          }
         }
 
-        if (rxvq->desc[desc_current].flags & VIRTQ_DESC_F_NEXT ) {
-          offset = 0;
-          desc_current = rxvq->desc[desc_current].next;
-          buffer_addr = map_guest_mem(vui, rxvq->desc[desc_current].addr);
-          if (PREDICT_FALSE(!buffer_addr)) {
-            vlib_error_count (vm, node->node_index, VHOST_USER_TX_FUNC_ERROR_MMAP_FAIL, 1);
+        if (rxvq->desc[desc_current].len <= offset) { //Get new output
+          if (rxvq->desc[desc_current].flags & VIRTQ_DESC_F_NEXT) {
+            offset = 0;
+            desc_current = rxvq->desc[desc_current].next;
+            if (PREDICT_FALSE(!(buffer_addr = map_guest_mem(vui, rxvq->desc[desc_current].addr)))) {
+              used_index -= hdr->num_buffers - 1;
+              rxvq->last_avail_idx -= hdr->num_buffers - 1;
+              error = VHOST_USER_TX_FUNC_ERROR_MMAP_FAIL;
+              goto done;
+            }
+          } else if (vui->virtio_net_hdr_sz == 12) { //MRG is available
+
+            //Move from available to used buffer
+            rxvq->used->ring[used_index & qsz_mask].id = desc_chain_head;
+            rxvq->used->ring[used_index & qsz_mask].len = desc_len;
+            rxvq->last_avail_idx++;
+            used_index++;
+            hdr->num_buffers++;
+
+            if (PREDICT_FALSE(rxvq->last_avail_idx == rxvq->avail->idx)) {
+              //Dequeue queued descriptors for this packet
+              used_index -= hdr->num_buffers - 1;
+              rxvq->last_avail_idx -= hdr->num_buffers - 1;
+              error = VHOST_USER_TX_FUNC_ERROR_PKT_DROP_NOBUF;
+              goto done;
+            }
+
+            //Look at next one
+            desc_chain_head = rxvq->avail->ring[rxvq->last_avail_idx & qsz_mask];
+            desc_current = desc_chain_head;
+            desc_len = 0;
+            offset = 0;
+            if (PREDICT_FALSE(!(buffer_addr = map_guest_mem(vui, rxvq->desc[desc_current].addr)))) {
+              //Dequeue queued descriptors for this packet
+              used_index -= hdr->num_buffers - 1;
+              rxvq->last_avail_idx -= hdr->num_buffers - 1;
+              error = VHOST_USER_TX_FUNC_ERROR_MMAP_FAIL;
+              goto done;
+            }
+          } else {
+            error = VHOST_USER_TX_FUNC_ERROR_PKT_DROP_NOBUF;
             goto done;
           }
         }
-        else 
-          break;
+
+        u16 bytes_to_copy = bytes_left > (rxvq->desc[desc_current].len - offset) ? (rxvq->desc[desc_current].len - offset) : bytes_left;
+        rte_memcpy(buffer_addr, vlib_buffer_get_current (current_b0) + current_b0->current_length - bytes_left, bytes_to_copy);
+
+        bytes_left -= bytes_to_copy;
+        offset += bytes_to_copy;
+        buffer_addr += bytes_to_copy;
+        desc_len += bytes_to_copy;
       }
 
+      //Move from available to used ring
       rxvq->used->ring[used_index & qsz_mask].id = desc_chain_head;
-      rxvq->used->ring[used_index & qsz_mask].len = b0->current_length + vui->virtio_net_hdr_sz;
-
-      used_index++;
+      rxvq->used->ring[used_index & qsz_mask].len = desc_len;
       rxvq->last_avail_idx++;
+      used_index++;
   }
 
 done:
@@ -1287,6 +1212,16 @@ done2:
   if (PREDICT_FALSE(vui->lockp != 0))
       *vui->lockp = 0;
 
+  if (PREDICT_FALSE(n_left && error != VHOST_USER_TX_FUNC_ERROR_NONE)) {
+    vlib_error_count(vm, node->node_index, error, n_left);
+    vlib_increment_simple_counter
+    (vnet_main.interface_main.sw_if_counters
+     + VNET_INTERFACE_COUNTER_DROP,
+     os_get_cpu_number(),
+     vui->sw_if_index,
+     n_left);
+  }
+
   vlib_buffer_free (vm, vlib_frame_args (frame), frame->n_vectors);
   return frame->n_vectors;
 }
@@ -1316,6 +1251,7 @@ VNET_DEVICE_CLASS (vhost_user_dev_class,static) = {
   .format_device_name = format_vhost_user_interface_name,
   .name_renumber = vhost_user_name_renumber,
   .admin_up_down_function = vhost_user_interface_admin_up_down,
+  .no_flatten_output_chains = 1,
 };
 
 static uword
@@ -1520,6 +1456,9 @@ static void vhost_user_create_ethernet(vnet_main_t * vnm, vlib_main_t * vm,
      0 /* flag change */);
   if (error)
     clib_error_report (error);
+
+  vnet_hw_interface_t *hi = vnet_get_hw_interface (vnm, vui->hw_if_index);
+  hi->max_l3_packet_bytes[VLIB_RX] = hi->max_l3_packet_bytes[VLIB_TX] = 9000;
 }
 
 // initialize vui with specified attributes