Add support for tcp/session buffer chains
[vpp.git] / src / vnet / session / node.c
index 8681105..ce7c386 100644 (file)
@@ -70,6 +70,58 @@ static u32 session_type_to_next[] = {
   SESSION_QUEUE_NEXT_IP6_LOOKUP,
 };
 
+/**
+ * Fill the buffers that follow the head buffer of a multi-buffer segment.
+ *
+ * For each of up to n_bufs_per_seg - 1 iterations, takes the last buffer
+ * index from smm->tx_buffers[thread_index], reads up to deq_per_buf bytes
+ * from fifo into that buffer and links it after the previous buffer in the
+ * chain. The loop ends early once *left_to_snd0 reaches zero.
+ *
+ * @param smm            session manager main; owns the per-thread
+ *                       tx_buffers vectors
+ * @param vm             vlib main, used to map buffer indices to buffers
+ * @param thread_index   selects the tx_buffers vector to take buffers from
+ * @param fifo           fifo the data is read from
+ * @param b0             head buffer of the chain; its
+ *                       total_length_not_including_first_buffer is
+ *                       incremented for every chained buffer
+ * @param bi0            buffer index of b0
+ * @param n_bufs_per_seg number of buffers per segment, head included
+ * @param left_to_snd0   in/out: bytes still to send, decremented by the
+ *                       number of bytes read
+ * @param n_bufs         in/out: count of buffers left in the tx vector,
+ *                       decremented once per buffer taken
+ * @param rx_offset      in/out: fifo peek offset, advanced only when
+ *                       peek_data is set
+ * @param deq_per_buf    maximum number of bytes placed in one buffer
+ * @param peek_data      non-zero: read with svm_fifo_peek at *rx_offset;
+ *                       zero: read with svm_fifo_dequeue_nowait
+ *
+ * NOTE(review): *n_bufs is decremented without checking that it is
+ * non-zero; presumably the caller guarantees enough buffers are available
+ * for a whole segment - confirm.
+ */
+always_inline void
+session_tx_fifo_chain_tail (session_manager_main_t * smm, vlib_main_t * vm,
+                           u8 thread_index, svm_fifo_t * fifo,
+                           vlib_buffer_t * b0, u32 bi0, u8 n_bufs_per_seg,
+                           u32 * left_to_snd0, u16 * n_bufs, u32 * rx_offset,
+                           u16 deq_per_buf, u8 peek_data)
+{
+  vlib_buffer_t *chain_b0, *prev_b0;
+  u32 chain_bi0;
+  u16 len_to_deq0, n_bytes_read;
+  u8 *data0, j;
+
+  /* The chain starts at the head buffer; j starts at 1 because the head
+   * has already been accounted for by the caller */
+  chain_bi0 = bi0;
+  chain_b0 = b0;
+  for (j = 1; j < n_bufs_per_seg; j++)
+    {
+      prev_b0 = chain_b0;
+      len_to_deq0 = clib_min (*left_to_snd0, deq_per_buf);
+
+      /* Take the last buffer index off the per-thread tx buffer vector and
+       * shrink the vector accordingly */
+      *n_bufs -= 1;
+      chain_bi0 = smm->tx_buffers[thread_index][*n_bufs];
+      _vec_len (smm->tx_buffers[thread_index]) = *n_bufs;
+
+      /* No headroom is reserved here: current_data is reset to 0 before
+       * the write position is taken */
+      chain_b0 = vlib_get_buffer (vm, chain_bi0);
+      chain_b0->current_data = 0;
+      data0 = vlib_buffer_get_current (chain_b0);
+      if (peek_data)
+       {
+         n_bytes_read = svm_fifo_peek (fifo, *rx_offset, len_to_deq0, data0);
+         *rx_offset += n_bytes_read;
+       }
+      else
+       {
+         n_bytes_read = svm_fifo_dequeue_nowait (fifo, len_to_deq0, data0);
+       }
+      /* A short read is not handled, only asserted against */
+      ASSERT (n_bytes_read == len_to_deq0);
+      chain_b0->current_length = n_bytes_read;
+      b0->total_length_not_including_first_buffer += chain_b0->current_length;
+
+      /* update previous buffer */
+      prev_b0->next_buffer = chain_bi0;
+      prev_b0->flags |= VLIB_BUFFER_NEXT_PRESENT;
+
+      /* update current buffer */
+      chain_b0->next_buffer = 0;
+
+      *left_to_snd0 -= n_bytes_read;
+      if (*left_to_snd0 == 0)
+       break;
+    }
+}
+
 always_inline int
 session_tx_fifo_read_and_snd_i (vlib_main_t * vm, vlib_node_runtime_t * node,
                                session_manager_main_t * smm,
@@ -78,16 +130,17 @@ session_tx_fifo_read_and_snd_i (vlib_main_t * vm, vlib_node_runtime_t * node,
                                int *n_tx_packets, u8 peek_data)
 {
   u32 n_trace = vlib_get_trace_count (vm, node);
-  u32 left_to_snd0, max_len_to_snd0, len_to_deq0, n_bufs, snd_space0;
-  u32 n_frame_bytes, n_frames_per_evt;
+  u32 left_to_snd0, max_len_to_snd0, len_to_deq0, snd_space0;
+  u32 n_bufs_per_evt, n_frames_per_evt;
   transport_connection_t *tc0;
   transport_proto_vft_t *transport_vft;
   u32 next_index, next0, *to_next, n_left_to_next, bi0;
   vlib_buffer_t *b0;
-  u32 rx_offset = 0, max_dequeue0;
-  u16 snd_mss0;
+  u32 rx_offset = 0, max_dequeue0, n_bytes_per_seg;
+  u16 snd_mss0, n_bufs_per_seg, n_bufs;
   u8 *data0;
   int i, n_bytes_read;
+  u32 n_bytes_per_buf, deq_per_buf;
 
   next_index = next0 = session_type_to_next[s0->session_type];
 
@@ -95,8 +148,8 @@ session_tx_fifo_read_and_snd_i (vlib_main_t * vm, vlib_node_runtime_t * node,
   tc0 = transport_vft->get_connection (s0->connection_index, thread_index);
 
   /* Make sure we have space to send and there's something to dequeue */
-  snd_space0 = transport_vft->send_space (tc0);
   snd_mss0 = transport_vft->send_mss (tc0);
+  snd_space0 = transport_vft->send_space (tc0);
 
   /* Can't make any progress */
   if (snd_space0 == 0 || snd_mss0 == 0)
@@ -119,18 +172,30 @@ session_tx_fifo_read_and_snd_i (vlib_main_t * vm, vlib_node_runtime_t * node,
 
   /* Nothing to read return */
   if (max_dequeue0 == 0)
-    {
-      return 0;
-    }
+    return 0;
 
   /* Ensure we're not writing more than transport window allows */
-  max_len_to_snd0 = clib_min (max_dequeue0, snd_space0);
+  if (max_dequeue0 < snd_space0)
+    {
+      /* Constrained by tx queue. Try to send only fully formed segments */
+      max_len_to_snd0 = (max_dequeue0 > snd_mss0) ?
+       max_dequeue0 - max_dequeue0 % snd_mss0 : max_dequeue0;
+      /* TODO Nagle ? */
+    }
+  else
+    {
+      max_len_to_snd0 = snd_space0;
+    }
 
-  /* TODO check if transport is willing to send len_to_snd0
-   * bytes (Nagle) */
+  n_bytes_per_buf = vlib_buffer_free_list_buffer_size (vm,
+                                                      VLIB_BUFFER_DEFAULT_FREE_LIST_INDEX);
+  n_bytes_per_seg = MAX_HDRS_LEN + snd_mss0;
+  n_bufs_per_seg = ceil ((double) n_bytes_per_seg / n_bytes_per_buf);
+  n_bufs_per_evt = (ceil ((double) max_len_to_snd0 / n_bytes_per_seg))
+    * n_bufs_per_seg;
+  n_frames_per_evt = ceil ((double) n_bufs_per_evt / VLIB_FRAME_SIZE);
 
-  n_frame_bytes = snd_mss0 * VLIB_FRAME_SIZE;
-  n_frames_per_evt = ceil ((double) max_len_to_snd0 / n_frame_bytes);
+  deq_per_buf = clib_min (snd_mss0, n_bytes_per_buf);
 
   n_bufs = vec_len (smm->tx_buffers[thread_index]);
   left_to_snd0 = max_len_to_snd0;
@@ -141,9 +206,9 @@ session_tx_fifo_read_and_snd_i (vlib_main_t * vm, vlib_node_runtime_t * node,
        {
          vec_validate (smm->tx_buffers[thread_index],
                        n_bufs + VLIB_FRAME_SIZE - 1);
-         n_bufs +=
-           vlib_buffer_alloc (vm, &smm->tx_buffers[thread_index][n_bufs],
-                              VLIB_FRAME_SIZE);
+         n_bufs += vlib_buffer_alloc (vm,
+                                      &smm->tx_buffers[thread_index][n_bufs],
+                                      VLIB_FRAME_SIZE);
 
          /* buffer shortage
           * XXX 0.9 because when debugging we might not get a full frame */
@@ -160,11 +225,14 @@ session_tx_fifo_read_and_snd_i (vlib_main_t * vm, vlib_node_runtime_t * node,
        }
 
       vlib_get_next_frame (vm, node, next_index, to_next, n_left_to_next);
-      while (left_to_snd0 && n_left_to_next)
+      while (left_to_snd0 && n_left_to_next >= n_bufs_per_seg)
        {
+         /*
+          * Handle first buffer in chain separately
+          */
+
          /* Get free buffer */
-         n_bufs--;
-         bi0 = smm->tx_buffers[thread_index][n_bufs];
+         bi0 = smm->tx_buffers[thread_index][--n_bufs];
          _vec_len (smm->tx_buffers[thread_index]) = n_bufs;
 
          b0 = vlib_get_buffer (vm, bi0);
@@ -172,52 +240,19 @@ session_tx_fifo_read_and_snd_i (vlib_main_t * vm, vlib_node_runtime_t * node,
          b0->flags = VLIB_BUFFER_TOTAL_LENGTH_VALID
            | VNET_BUFFER_LOCALLY_ORIGINATED;
          b0->current_data = 0;
+         b0->total_length_not_including_first_buffer = 0;
 
          /* RX on the local interface. tx in default fib */
          vnet_buffer (b0)->sw_if_index[VLIB_RX] = 0;
          vnet_buffer (b0)->sw_if_index[VLIB_TX] = (u32) ~ 0;
 
-         /* usual speculation, or the enqueue_x1 macro will barf */
-         to_next[0] = bi0;
-         to_next += 1;
-         n_left_to_next -= 1;
-
-         VLIB_BUFFER_TRACE_TRAJECTORY_INIT (b0);
-         if (PREDICT_FALSE (n_trace > 0))
-           {
-             session_queue_trace_t *t0;
-             vlib_trace_buffer (vm, node, next_index, b0,
-                                1 /* follow_chain */ );
-             vlib_set_trace_count (vm, node, --n_trace);
-             t0 = vlib_add_trace (vm, node, b0, sizeof (*t0));
-             t0->session_index = s0->session_index;
-             t0->server_thread_index = s0->thread_index;
-           }
-
-         len_to_deq0 = (left_to_snd0 < snd_mss0) ? left_to_snd0 : snd_mss0;
-
-         /* *INDENT-OFF* */
-         SESSION_EVT_DBG(SESSION_EVT_DEQ, s0, ({
-             ed->data[0] = e0->event_id;
-             ed->data[1] = max_dequeue0;
-             ed->data[2] = len_to_deq0;
-             ed->data[3] = left_to_snd0;
-         }));
-         /* *INDENT-ON* */
+         len_to_deq0 = clib_min (left_to_snd0, deq_per_buf);
 
-         /* Make room for headers */
          data0 = vlib_buffer_make_headroom (b0, MAX_HDRS_LEN);
-
-         /* Dequeue the data
-          * TODO 1) peek instead of dequeue
-          *      2) buffer chains */
          if (peek_data)
            {
-             n_bytes_read = svm_fifo_peek (s0->server_tx_fifo, s0->pid,
-                                           rx_offset, len_to_deq0, data0);
-             if (n_bytes_read <= 0)
-               goto dequeue_fail;
-
+             n_bytes_read = svm_fifo_peek (s0->server_tx_fifo, rx_offset,
+                                           len_to_deq0, data0);
              /* Keep track of progress locally, transport is also supposed to
               * increment it independently when pushing the header */
              rx_offset += n_bytes_read;
@@ -225,20 +260,57 @@ session_tx_fifo_read_and_snd_i (vlib_main_t * vm, vlib_node_runtime_t * node,
          else
            {
              n_bytes_read = svm_fifo_dequeue_nowait (s0->server_tx_fifo,
-                                                     s0->pid, len_to_deq0,
-                                                     data0);
-             if (n_bytes_read <= 0)
-               goto dequeue_fail;
+                                                     len_to_deq0, data0);
            }
 
-         b0->current_length = n_bytes_read;
+         if (n_bytes_read <= 0)
+           goto dequeue_fail;
 
-         /* Ask transport to push header */
-         transport_vft->push_header (tc0, b0);
+         b0->current_length = n_bytes_read;
 
          left_to_snd0 -= n_bytes_read;
          *n_tx_packets = *n_tx_packets + 1;
 
+         /*
+          * Fill in the remaining buffers in the chain, if any
+          */
+         if (PREDICT_FALSE (n_bufs_per_seg > 1))
+           session_tx_fifo_chain_tail (smm, vm, thread_index,
+                                       s0->server_tx_fifo, b0, bi0,
+                                       n_bufs_per_seg, &left_to_snd0,
+                                       &n_bufs, &rx_offset, deq_per_buf,
+                                       peek_data);
+
+         /* Ask transport to push header after current_length and
+          * total_length_not_including_first_buffer are updated */
+         transport_vft->push_header (tc0, b0);
+
+         /* *INDENT-OFF* */
+         SESSION_EVT_DBG(SESSION_EVT_DEQ, s0, ({
+             ed->data[0] = e0->event_id;
+             ed->data[1] = max_dequeue0;
+             ed->data[2] = len_to_deq0;
+             ed->data[3] = left_to_snd0;
+         }));
+         /* *INDENT-ON* */
+
+         /* usual speculation, or the enqueue_x1 macro will barf */
+         to_next[0] = bi0;
+         to_next += 1;
+         n_left_to_next -= 1;
+
+         VLIB_BUFFER_TRACE_TRAJECTORY_INIT (b0);
+         if (PREDICT_FALSE (n_trace > 0))
+           {
+             session_queue_trace_t *t0;
+             vlib_trace_buffer (vm, node, next_index, b0,
+                                1 /* follow_chain */ );
+             vlib_set_trace_count (vm, node, --n_trace);
+             t0 = vlib_add_trace (vm, node, b0, sizeof (*t0));
+             t0->session_index = s0->session_index;
+             t0->server_thread_index = s0->thread_index;
+           }
+
          vlib_validate_buffer_enqueue_x1 (vm, node, next_index,
                                           to_next, n_left_to_next,
                                           bi0, next0);
@@ -296,6 +368,26 @@ session_tx_fifo_dequeue_and_snd (vlib_main_t * vm, vlib_node_runtime_t * node,
                                         n_tx_pkts, 0);
 }
 
+/**
+ * Look up the session that owns the fifo referenced by a fifo event.
+ *
+ * The session index is taken from e0->fifo->master_session_index and
+ * resolved with stream_session_get_if_valid for the given thread.
+ *
+ * @param e0           fifo event whose fifo identifies the session
+ * @param thread_index thread the lookup is performed for; asserted to
+ *                     match the fifo's master_thread_index
+ * @return whatever stream_session_get_if_valid returns; callers test the
+ *         result for NULL before using it
+ */
+stream_session_t *
+session_event_get_session (session_fifo_event_t * e0, u8 thread_index)
+{
+  svm_fifo_t *f0;
+  stream_session_t *s0;
+  u32 session_index0;
+
+  f0 = e0->fifo;
+  session_index0 = f0->master_session_index;
+
+  /* $$$ add multiple event queues, per vpp worker thread */
+  ASSERT (f0->master_thread_index == thread_index);
+
+  s0 = stream_session_get_if_valid (session_index0, thread_index);
+
+  /* The lookup may find no session (the caller checks for !s0), so only
+   * validate the owning thread when a session was actually returned.
+   * Dereferencing s0 unconditionally here would crash debug builds before
+   * the caller gets a chance to handle the missing session. */
+  ASSERT (s0 == 0 || s0->thread_index == thread_index);
+
+  return s0;
+}
+
 static uword
 session_queue_node_fn (vlib_main_t * vm, vlib_node_runtime_t * node,
                       vlib_frame_t * frame)
@@ -306,13 +398,16 @@ session_queue_node_fn (vlib_main_t * vm, vlib_node_runtime_t * node,
   unix_shared_memory_queue_t *q;
   application_t *app;
   int n_tx_packets = 0;
-  u32 my_thread_index = vm->cpu_index;
+  u32 my_thread_index = vm->thread_index;
   int i, rv;
+  f64 now = vlib_time_now (vm);
+
+  SESSION_EVT_DBG (SESSION_EVT_POLL_GAP_TRACK, smm, my_thread_index);
 
   /*
    *  Update TCP time
    */
-  tcp_update_time (vlib_time_now (vm), my_thread_index);
+  tcp_update_time (now, my_thread_index);
 
   /*
    * Get vpp queue events
@@ -362,34 +457,24 @@ skip_dequeue:
   n_events = vec_len (my_fifo_events);
   for (i = 0; i < n_events; i++)
     {
-      svm_fifo_t *f0;          /* $$$ prefetch 1 ahead maybe */
-      stream_session_t *s0;
-      u32 session_index0;
+      stream_session_t *s0;    /* $$$ prefetch 1 ahead maybe */
       session_fifo_event_t *e0;
 
       e0 = &my_fifo_events[i];
-      f0 = e0->fifo;
-      session_index0 = f0->server_session_index;
 
-      /* $$$ add multiple event queues, per vpp worker thread */
-      ASSERT (f0->server_thread_index == my_thread_index);
-
-      s0 = stream_session_get_if_valid (session_index0, my_thread_index);
-
-      if (CLIB_DEBUG && !s0)
+      switch (e0->event_type)
        {
-         clib_warning ("It's dead, Jim!");
-         continue;
-       }
-
-      if (PREDICT_FALSE (s0->session_state == SESSION_STATE_CLOSED))
-       continue;
+       case FIFO_EVENT_APP_TX:
+         s0 = session_event_get_session (e0, my_thread_index);
 
-      ASSERT (s0->thread_index == my_thread_index);
+         if (CLIB_DEBUG && !s0)
+           {
+             clib_warning ("It's dead, Jim!");
+             continue;
+           }
 
-      switch (e0->event_type)
-       {
-       case FIFO_EVENT_SERVER_TX:
+         if (PREDICT_FALSE (s0->session_state == SESSION_STATE_CLOSED))
+           continue;
          /* Spray packets in per session type frames, since they go to
           * different nodes */
          rv = (smm->session_tx_fns[s0->session_type]) (vm, node, smm, e0, s0,
@@ -400,10 +485,12 @@ skip_dequeue:
            goto done;
 
          break;
-       case FIFO_EVENT_SERVER_EXIT:
+       case FIFO_EVENT_DISCONNECT:
+         s0 = stream_session_get_from_handle (e0->session_handle);
          stream_session_disconnect (s0);
          break;
        case FIFO_EVENT_BUILTIN_RX:
+         s0 = session_event_get_session (e0, my_thread_index);
          svm_fifo_unset_event (s0->server_rx_fifo);
          /* Get session's server */
          app = application_get (s0->app_index);