Add support for tcp/session buffer chains
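With large segment sizes a full tcp segment no longer fits in a single vlib buffer, so the tx path now builds buffer chains: the head buffer keeps the MAX_HDRS_LEN header room plus the first chunk of payload, and the remaining chunks are read into tail buffers linked through next_buffer. As a minimal sketch (not part of the patch; the helper name is made up), a chain built this way can be walked with the fields the patch maintains:

#include <vlib/vlib.h>

/* Illustrative only: sum the payload carried by a segment chain. The result
 * should equal b0->current_length plus
 * b0->total_length_not_including_first_buffer. */
static u32
chain_payload_bytes (vlib_main_t * vm, vlib_buffer_t * b0)
{
  vlib_buffer_t *b = b0;
  u32 bytes = b->current_length;
  while (b->flags & VLIB_BUFFER_NEXT_PRESENT)
    {
      b = vlib_get_buffer (vm, b->next_buffer);
      bytes += b->current_length;
    }
  return bytes;
}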
[vpp.git] / src / vnet / session / node.c
index 2d12ee2..ce7c386 100644
@@ -70,6 +70,58 @@ static u32 session_type_to_next[] = {
   SESSION_QUEUE_NEXT_IP6_LOOKUP,
 };
 
+always_inline void
+session_tx_fifo_chain_tail (session_manager_main_t * smm, vlib_main_t * vm,
+                           u8 thread_index, svm_fifo_t * fifo,
+                           vlib_buffer_t * b0, u32 bi0, u8 n_bufs_per_seg,
+                           u32 * left_to_snd0, u16 * n_bufs, u32 * rx_offset,
+                           u16 deq_per_buf, u8 peek_data)
+{
+  vlib_buffer_t *chain_b0, *prev_b0;
+  u32 chain_bi0;
+  u16 len_to_deq0, n_bytes_read;
+  u8 *data0, j;
+
+  chain_bi0 = bi0;
+  chain_b0 = b0;
+  for (j = 1; j < n_bufs_per_seg; j++)
+    {
+      prev_b0 = chain_b0;
+      len_to_deq0 = clib_min (*left_to_snd0, deq_per_buf);
+
+      *n_bufs -= 1;
+      chain_bi0 = smm->tx_buffers[thread_index][*n_bufs];
+      _vec_len (smm->tx_buffers[thread_index]) = *n_bufs;
+
+      chain_b0 = vlib_get_buffer (vm, chain_bi0);
+      chain_b0->current_data = 0;
+      data0 = vlib_buffer_get_current (chain_b0);
+      if (peek_data)
+       {
+         n_bytes_read = svm_fifo_peek (fifo, *rx_offset, len_to_deq0, data0);
+         *rx_offset += n_bytes_read;
+       }
+      else
+       {
+         n_bytes_read = svm_fifo_dequeue_nowait (fifo, len_to_deq0, data0);
+       }
+      ASSERT (n_bytes_read == len_to_deq0);
+      chain_b0->current_length = n_bytes_read;
+      b0->total_length_not_including_first_buffer += chain_b0->current_length;
+
+      /* update previous buffer */
+      prev_b0->next_buffer = chain_bi0;
+      prev_b0->flags |= VLIB_BUFFER_NEXT_PRESENT;
+
+      /* update current buffer */
+      chain_b0->next_buffer = 0;
+
+      *left_to_snd0 -= n_bytes_read;
+      if (*left_to_snd0 == 0)
+       break;
+    }
+}
+
 always_inline int
 session_tx_fifo_read_and_snd_i (vlib_main_t * vm, vlib_node_runtime_t * node,
                                session_manager_main_t * smm,
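The session_tx_fifo_chain_tail helper added above takes the tail buffers of a segment from the per-thread tx_buffers vector and fills them straight from the session's tx fifo, using the same read discipline as the head buffer: svm_fifo_peek when peek_data is set (transports that retransmit, e.g. tcp), so the bytes stay in the fifo until the transport drops them, versus svm_fifo_dequeue_nowait, which consumes them immediately. A minimal sketch of that discipline, with read_tx_bytes as a hypothetical name:

#include <svm/svm_fifo.h>

static inline int
read_tx_bytes (svm_fifo_t * f, u32 * offset, u32 len, u8 * dst, u8 peek_data)
{
  int rv;
  if (peek_data)
    {
      /* Copy without consuming; the caller tracks its own offset and the
       * fifo tail only moves once the transport decides the data is done. */
      rv = svm_fifo_peek (f, *offset, len, dst);
      if (rv > 0)
        *offset += rv;
    }
  else
    {
      /* Consume the bytes; there is no retransmit path to serve. */
      rv = svm_fifo_dequeue_nowait (f, len, dst);
    }
  return rv;
}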
@@ -78,16 +130,17 @@ session_tx_fifo_read_and_snd_i (vlib_main_t * vm, vlib_node_runtime_t * node,
                                int *n_tx_packets, u8 peek_data)
 {
   u32 n_trace = vlib_get_trace_count (vm, node);
-  u32 left_to_snd0, max_len_to_snd0, len_to_deq0, n_bufs, snd_space0;
-  u32 n_frame_bytes, n_frames_per_evt;
+  u32 left_to_snd0, max_len_to_snd0, len_to_deq0, snd_space0;
+  u32 n_bufs_per_evt, n_frames_per_evt;
   transport_connection_t *tc0;
   transport_proto_vft_t *transport_vft;
   u32 next_index, next0, *to_next, n_left_to_next, bi0;
   vlib_buffer_t *b0;
-  u32 rx_offset = 0, max_dequeue0;
-  u16 snd_mss0;
+  u32 rx_offset = 0, max_dequeue0, n_bytes_per_seg;
+  u16 snd_mss0, n_bufs_per_seg, n_bufs;
   u8 *data0;
   int i, n_bytes_read;
+  u32 n_bytes_per_buf, deq_per_buf;
 
   next_index = next0 = session_type_to_next[s0->session_type];
 
@@ -134,8 +187,15 @@ session_tx_fifo_read_and_snd_i (vlib_main_t * vm, vlib_node_runtime_t * node,
       max_len_to_snd0 = snd_space0;
     }
 
-  n_frame_bytes = snd_mss0 * VLIB_FRAME_SIZE;
-  n_frames_per_evt = ceil ((double) max_len_to_snd0 / n_frame_bytes);
+  n_bytes_per_buf = vlib_buffer_free_list_buffer_size (vm,
+                                                      VLIB_BUFFER_DEFAULT_FREE_LIST_INDEX);
+  n_bytes_per_seg = MAX_HDRS_LEN + snd_mss0;
+  n_bufs_per_seg = ceil ((double) n_bytes_per_seg / n_bytes_per_buf);
+  n_bufs_per_evt = (ceil ((double) max_len_to_snd0 / n_bytes_per_seg))
+    * n_bufs_per_seg;
+  n_frames_per_evt = ceil ((double) n_bufs_per_evt / VLIB_FRAME_SIZE);
+
+  deq_per_buf = clib_min (snd_mss0, n_bytes_per_buf);
 
   n_bufs = vec_len (smm->tx_buffers[thread_index]);
   left_to_snd0 = max_len_to_snd0;
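To make the sizing arithmetic above concrete, here is one worked case with illustrative numbers (the default 2048-byte buffer data size, and MAX_HDRS_LEN taken as roughly 100 bytes, which is an assumption):

  snd_mss0         = 3900
  n_bytes_per_seg  = 100 + 3900 = 4000
  n_bufs_per_seg   = ceil (4000 / 2048) = 2       /* two-buffer chain per segment */
  max_len_to_snd0  = 39000                        /* ten segments */
  n_bufs_per_evt   = ceil (39000 / 4000) * 2 = 20
  n_frames_per_evt = ceil (20 / 256) = 1          /* VLIB_FRAME_SIZE = 256 */
  deq_per_buf      = clib_min (3900, 2048) = 2048 /* bytes read per buffer */

With a standard 1460-byte mss the same math gives n_bufs_per_seg = 1 and no chaining, which is why the chain-tail path below is guarded with PREDICT_FALSE.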
@@ -146,9 +206,9 @@ session_tx_fifo_read_and_snd_i (vlib_main_t * vm, vlib_node_runtime_t * node,
        {
          vec_validate (smm->tx_buffers[thread_index],
                        n_bufs + VLIB_FRAME_SIZE - 1);
-         n_bufs +=
-           vlib_buffer_alloc (vm, &smm->tx_buffers[thread_index][n_bufs],
-                              VLIB_FRAME_SIZE);
+         n_bufs += vlib_buffer_alloc (vm,
+                                      &smm->tx_buffers[thread_index][n_bufs],
+                                      VLIB_FRAME_SIZE);
 
          /* buffer shortage
           * XXX 0.9 because when debugging we might not get a full frame */
@@ -165,11 +225,14 @@ session_tx_fifo_read_and_snd_i (vlib_main_t * vm, vlib_node_runtime_t * node,
        }
 
       vlib_get_next_frame (vm, node, next_index, to_next, n_left_to_next);
-      while (left_to_snd0 && n_left_to_next)
+      while (left_to_snd0 && n_left_to_next >= n_bufs_per_seg)
        {
+         /*
+          * Handle first buffer in chain separately
+          */
+
          /* Get free buffer */
-         n_bufs--;
-         bi0 = smm->tx_buffers[thread_index][n_bufs];
+         bi0 = smm->tx_buffers[thread_index][--n_bufs];
          _vec_len (smm->tx_buffers[thread_index]) = n_bufs;
 
          b0 = vlib_get_buffer (vm, bi0);
@@ -177,52 +240,19 @@ session_tx_fifo_read_and_snd_i (vlib_main_t * vm, vlib_node_runtime_t * node,
          b0->flags = VLIB_BUFFER_TOTAL_LENGTH_VALID
            | VNET_BUFFER_LOCALLY_ORIGINATED;
          b0->current_data = 0;
+         b0->total_length_not_including_first_buffer = 0;
 
          /* RX on the local interface. tx in default fib */
          vnet_buffer (b0)->sw_if_index[VLIB_RX] = 0;
          vnet_buffer (b0)->sw_if_index[VLIB_TX] = (u32) ~ 0;
 
-         /* usual speculation, or the enqueue_x1 macro will barf */
-         to_next[0] = bi0;
-         to_next += 1;
-         n_left_to_next -= 1;
-
-         VLIB_BUFFER_TRACE_TRAJECTORY_INIT (b0);
-         if (PREDICT_FALSE (n_trace > 0))
-           {
-             session_queue_trace_t *t0;
-             vlib_trace_buffer (vm, node, next_index, b0,
-                                1 /* follow_chain */ );
-             vlib_set_trace_count (vm, node, --n_trace);
-             t0 = vlib_add_trace (vm, node, b0, sizeof (*t0));
-             t0->session_index = s0->session_index;
-             t0->server_thread_index = s0->thread_index;
-           }
+         len_to_deq0 = clib_min (left_to_snd0, deq_per_buf);
 
-         len_to_deq0 = (left_to_snd0 < snd_mss0) ? left_to_snd0 : snd_mss0;
-
-         /* *INDENT-OFF* */
-         SESSION_EVT_DBG(SESSION_EVT_DEQ, s0, ({
-             ed->data[0] = e0->event_id;
-             ed->data[1] = max_dequeue0;
-             ed->data[2] = len_to_deq0;
-             ed->data[3] = left_to_snd0;
-         }));
-         /* *INDENT-ON* */
-
-         /* Make room for headers */
          data0 = vlib_buffer_make_headroom (b0, MAX_HDRS_LEN);
-
-         /* Dequeue the data
-          * TODO 1) peek instead of dequeue
-          *      2) buffer chains */
          if (peek_data)
            {
              n_bytes_read = svm_fifo_peek (s0->server_tx_fifo, rx_offset,
                                            len_to_deq0, data0);
-             if (n_bytes_read <= 0)
-               goto dequeue_fail;
-
              /* Keep track of progress locally, transport is also supposed to
               * increment it independently when pushing the header */
              rx_offset += n_bytes_read;
@@ -231,18 +261,56 @@ session_tx_fifo_read_and_snd_i (vlib_main_t * vm, vlib_node_runtime_t * node,
            {
              n_bytes_read = svm_fifo_dequeue_nowait (s0->server_tx_fifo,
                                                      len_to_deq0, data0);
-             if (n_bytes_read <= 0)
-               goto dequeue_fail;
            }
 
-         b0->current_length = n_bytes_read;
+         if (n_bytes_read <= 0)
+           goto dequeue_fail;
 
-         /* Ask transport to push header */
-         transport_vft->push_header (tc0, b0);
+         b0->current_length = n_bytes_read;
 
          left_to_snd0 -= n_bytes_read;
          *n_tx_packets = *n_tx_packets + 1;
 
+         /*
+          * Fill in the remaining buffers in the chain, if any
+          */
+         if (PREDICT_FALSE (n_bufs_per_seg > 1))
+           session_tx_fifo_chain_tail (smm, vm, thread_index,
+                                       s0->server_tx_fifo, b0, bi0,
+                                       n_bufs_per_seg, &left_to_snd0,
+                                       &n_bufs, &rx_offset, deq_per_buf,
+                                       peek_data);
+
+         /* Ask transport to push header after current_length and
+          * total_length_not_including_first_buffer are updated */
+         transport_vft->push_header (tc0, b0);
+
+         /* *INDENT-OFF* */
+         SESSION_EVT_DBG(SESSION_EVT_DEQ, s0, ({
+             ed->data[0] = e0->event_id;
+             ed->data[1] = max_dequeue0;
+             ed->data[2] = len_to_deq0;
+             ed->data[3] = left_to_snd0;
+         }));
+         /* *INDENT-ON* */
+
+         /* usual speculation, or the enqueue_x1 macro will barf */
+         to_next[0] = bi0;
+         to_next += 1;
+         n_left_to_next -= 1;
+
+         VLIB_BUFFER_TRACE_TRAJECTORY_INIT (b0);
+         if (PREDICT_FALSE (n_trace > 0))
+           {
+             session_queue_trace_t *t0;
+             vlib_trace_buffer (vm, node, next_index, b0,
+                                1 /* follow_chain */ );
+             vlib_set_trace_count (vm, node, --n_trace);
+             t0 = vlib_add_trace (vm, node, b0, sizeof (*t0));
+             t0->session_index = s0->session_index;
+             t0->server_thread_index = s0->thread_index;
+           }
+
          vlib_validate_buffer_enqueue_x1 (vm, node, next_index,
                                           to_next, n_left_to_next,
                                           bi0, next0);
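Note the reordering in this hunk: the transport's push_header hook now runs only after the chain is complete, so b0->current_length and b0->total_length_not_including_first_buffer already describe the whole segment, and the trace (which follows the chain) and the speculative enqueue happen after that. A rough sketch of what a push_header implementation can rely on here (illustrative only, not the actual tcp code):

#include <vlib/vlib.h>
#include <vnet/session/transport.h>

/* Hypothetical transport hook: size the protocol header from both length
 * fields, then prepend it into the MAX_HDRS_LEN headroom reserved earlier. */
static u32
example_push_header (transport_connection_t * tc, vlib_buffer_t * b0)
{
  u32 payload_bytes = b0->current_length
    + b0->total_length_not_including_first_buffer;
  /* ... build and prepend the header for payload_bytes of data ... */
  (void) tc;
  return payload_bytes;
}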