session tcp: support pacer idle timeouts 18/23418/13
author Florin Coras <fcoras@cisco.com>
Thu, 14 Nov 2019 03:09:47 +0000 (19:09 -0800)
committer Dave Barach <openvpp@barachs.net>
Wed, 20 Nov 2019 18:34:07 +0000 (18:34 +0000)
Type: feature

To avoid excessive bursts, the pacer must be provided with an estimated
rtt for the connection. This is used to compute an idle timeout, i.e.,
the time after which the bucket is reset to 1 mtu due to inactivity. For
now, the idle timeout is computed as 5% of the rtt, with a floor of 100 us.
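
The idle timeout is derived from the rtt roughly as in the sketch below
(pacer_idle_timeout_us is an illustrative helper, not part of this change;
the constants are the ones added to transport.h):

#include <stdint.h>

#define TRANSPORT_PACER_MIN_IDLE    100    /* floor, in microseconds */
#define TRANSPORT_PACER_IDLE_FACTOR 0.05   /* idle timeout = 5% of rtt */

/* rtt and return value are in microseconds */
static inline uint32_t
pacer_idle_timeout_us (double rtt)
{
  double timeout = rtt * TRANSPORT_PACER_IDLE_FACTOR;
  return timeout > TRANSPORT_PACER_MIN_IDLE ? (uint32_t) timeout
                                            : TRANSPORT_PACER_MIN_IDLE;
}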

Change-Id: Ia0b752fe7b4ad0ce97b477fb886b0133a2321541
Signed-off-by: Florin Coras <fcoras@cisco.com>
src/plugins/unittest/tcp_test.c
src/vnet/session/session_node.c
src/vnet/session/transport.c
src/vnet/session/transport.h
src/vnet/session/transport_types.h
src/vnet/tcp/tcp.c
src/vnet/tcp/tcp.h
src/vnet/tcp/tcp_input.c
src/vnet/tcp/tcp_output.c

diff --git a/src/plugins/unittest/tcp_test.c b/src/plugins/unittest/tcp_test.c
index c14a74c..dbdb5a9 100644
@@ -849,7 +849,7 @@ tcp_test_delivery (vlib_main_t * vm, unformat_input_t * input)
   /* Init data structures */
   memset (tc, 0, sizeof (*tc));
   session_main.wrk[thread_index].last_vlib_time = 1;
-  transport_connection_tx_pacer_update (&tc->connection, rate);
+  transport_connection_tx_pacer_update (&tc->connection, rate, 1e6);
 
   tcp_bt_init (tc);
   bt = tc->bt;
diff --git a/src/vnet/session/session_node.c b/src/vnet/session/session_node.c
index e1114b7..a072bfa 100644
@@ -873,16 +873,29 @@ session_tx_fifo_read_and_snd_i (session_worker_t * wrk,
       return SESSION_TX_NO_DATA;
     }
 
-  ctx->snd_space = transport_connection_snd_space (ctx->tc, ctx->snd_mss);
+  ctx->snd_space = transport_connection_snd_space (ctx->tc);
 
   /* This flow queue is "empty" so it should be re-evaluated before
    * the ones that have data to send. */
-  if (ctx->snd_space == 0)
+  if (!ctx->snd_space)
     {
       session_evt_add_head_old (wrk, elt);
       return SESSION_TX_NO_DATA;
     }
 
+  if (transport_connection_is_tx_paced (ctx->tc))
+    {
+      u32 snd_space = transport_connection_tx_pacer_burst (ctx->tc);
+      if (snd_space < TRANSPORT_PACER_MIN_BURST)
+       {
+         session_evt_add_head_old (wrk, elt);
+         return SESSION_TX_NO_DATA;
+       }
+      snd_space = clib_min (ctx->snd_space, snd_space);
+      ctx->snd_space = snd_space >= ctx->snd_mss ?
+       snd_space - snd_space % ctx->snd_mss : snd_space;
+    }
+
   /* Allow enqueuing of a new event */
   svm_fifo_unset_event (ctx->s->tx_fifo);
 
@@ -891,7 +904,7 @@ session_tx_fifo_read_and_snd_i (session_worker_t * wrk,
 
   if (PREDICT_FALSE (!ctx->max_len_to_snd))
     {
-      transport_connection_tx_pacer_reset_bucket (ctx->tc);
+      transport_connection_tx_pacer_reset_bucket (ctx->tc, 0);
       return SESSION_TX_NO_DATA;
     }
 
diff --git a/src/vnet/session/transport.c b/src/vnet/session/transport.c
index 902c740..c8c5835 100644
@@ -599,9 +599,9 @@ format_transport_pacer (u8 * s, va_list * args)
 
   now = transport_us_time_now (thread_index);
   diff = now - pacer->last_update;
-  s = format (s, "rate %lu bucket %lu t/p %.3f last_update %U",
+  s = format (s, "rate %lu bucket %lu t/p %.3f last_update %U idle %u",
              pacer->bytes_per_sec, pacer->bucket, pacer->tokens_per_period,
-             format_clib_us_time, diff);
+             format_clib_us_time, diff, pacer->idle_timeout_us);
   return s;
 }
 
@@ -611,15 +611,14 @@ spacer_max_burst (spacer_t * pacer, clib_us_time_t time_now)
   u64 n_periods = (time_now - pacer->last_update);
   u64 inc;
 
-  if (PREDICT_FALSE (n_periods > 5e4))
+  if (PREDICT_FALSE (n_periods > pacer->idle_timeout_us))
     {
       pacer->last_update = time_now;
       pacer->bucket = TRANSPORT_PACER_MIN_BURST;
       return TRANSPORT_PACER_MIN_BURST;
     }
 
-  if (n_periods > 0
-      && (inc = (f32) n_periods * pacer->tokens_per_period) > 10)
+  if ((inc = (f32) n_periods * pacer->tokens_per_period) > 10)
     {
       pacer->last_update = time_now;
       pacer->bucket = clib_min (pacer->bucket + inc, pacer->bytes_per_sec);
@@ -636,11 +635,14 @@ spacer_update_bucket (spacer_t * pacer, u32 bytes)
 }
 
 static inline void
-spacer_set_pace_rate (spacer_t * pacer, u64 rate_bytes_per_sec)
+spacer_set_pace_rate (spacer_t * pacer, u64 rate_bytes_per_sec,
+                     clib_us_time_t rtt)
 {
   ASSERT (rate_bytes_per_sec != 0);
   pacer->bytes_per_sec = rate_bytes_per_sec;
   pacer->tokens_per_period = rate_bytes_per_sec * CLIB_US_TIME_PERIOD;
+  pacer->idle_timeout_us = clib_max (rtt * TRANSPORT_PACER_IDLE_FACTOR,
+                                    TRANSPORT_PACER_MIN_IDLE);
 }
 
 static inline u64
@@ -658,17 +660,19 @@ spacer_reset (spacer_t * pacer, clib_us_time_t time_now, u64 bucket)
 
 void
 transport_connection_tx_pacer_reset (transport_connection_t * tc,
-                                    u64 rate_bytes_per_sec, u32 start_bucket)
+                                    u64 rate_bytes_per_sec, u32 start_bucket,
+                                    clib_us_time_t rtt)
 {
-  spacer_set_pace_rate (&tc->pacer, rate_bytes_per_sec);
+  spacer_set_pace_rate (&tc->pacer, rate_bytes_per_sec, rtt);
   spacer_reset (&tc->pacer, transport_us_time_now (tc->thread_index),
                start_bucket);
 }
 
 void
-transport_connection_tx_pacer_reset_bucket (transport_connection_t * tc)
+transport_connection_tx_pacer_reset_bucket (transport_connection_t * tc,
+                                           u32 bucket)
 {
-  spacer_reset (&tc->pacer, transport_us_time_now (tc->thread_index), 0);
+  spacer_reset (&tc->pacer, transport_us_time_now (tc->thread_index), bucket);
 }
 
 void
@@ -678,14 +682,14 @@ transport_connection_tx_pacer_init (transport_connection_t * tc,
 {
   tc->flags |= TRANSPORT_CONNECTION_F_IS_TX_PACED;
   transport_connection_tx_pacer_reset (tc, rate_bytes_per_sec,
-                                      initial_bucket);
+                                      initial_bucket, 1e6);
 }
 
 void
 transport_connection_tx_pacer_update (transport_connection_t * tc,
-                                     u64 bytes_per_sec)
+                                     u64 bytes_per_sec, clib_us_time_t rtt)
 {
-  spacer_set_pace_rate (&tc->pacer, bytes_per_sec);
+  spacer_set_pace_rate (&tc->pacer, bytes_per_sec, rtt);
 }
 
 u32
@@ -695,24 +699,6 @@ transport_connection_tx_pacer_burst (transport_connection_t * tc)
                           transport_us_time_now (tc->thread_index));
 }
 
-u32
-transport_connection_snd_space (transport_connection_t * tc, u16 mss)
-{
-  u32 snd_space, max_paced_burst;
-
-  snd_space = tp_vfts[tc->proto].send_space (tc);
-  if (snd_space && transport_connection_is_tx_paced (tc))
-    {
-      clib_us_time_t now = transport_us_time_now (tc->thread_index);
-      max_paced_burst = spacer_max_burst (&tc->pacer, now);
-      max_paced_burst =
-       (max_paced_burst < TRANSPORT_PACER_MIN_BURST) ? 0 : max_paced_burst;
-      snd_space = clib_min (snd_space, max_paced_burst);
-      return snd_space >= mss ? snd_space - snd_space % mss : snd_space;
-    }
-  return snd_space;
-}
-
 u64
 transport_connection_tx_pacer_rate (transport_connection_t * tc)
 {
diff --git a/src/vnet/session/transport.h b/src/vnet/session/transport.h
index df246a9..adc695f 100644
@@ -22,6 +22,8 @@
 #define TRANSPORT_PACER_MIN_MSS        1460
 #define TRANSPORT_PACER_MIN_BURST      TRANSPORT_PACER_MIN_MSS
 #define TRANSPORT_PACER_MAX_BURST      (43 * TRANSPORT_PACER_MIN_MSS)
+#define TRANSPORT_PACER_MIN_IDLE       100
+#define TRANSPORT_PACER_IDLE_FACTOR    0.05
 
 typedef struct _transport_options_t
 {
@@ -148,6 +150,17 @@ transport_app_rx_evt (transport_proto_t tp, u32 conn_index, u32 thread_index)
   return tp_vfts[tp].app_rx_evt (tc);
 }
 
+/**
+ * Get maximum tx burst allowed for transport connection
+ *
+ * @param tc           transport connection
+ */
+static inline u32
+transport_connection_snd_space (transport_connection_t * tc)
+{
+  return tp_vfts[tc->proto].send_space (tc);
+}
+
 void transport_register_protocol (transport_proto_t transport_proto,
                                  const transport_proto_vft_t * vft,
                                  fib_protocol_t fib_proto, u32 output_node);
@@ -174,7 +187,8 @@ transport_elog_track_index (transport_connection_t * tc)
 
 void transport_connection_tx_pacer_reset (transport_connection_t * tc,
                                          u64 rate_bytes_per_sec,
-                                         u32 initial_bucket);
+                                         u32 initial_bucket,
+                                         clib_us_time_t rtt);
 /**
  * Initialize tx pacer for connection
  *
@@ -191,18 +205,13 @@ void transport_connection_tx_pacer_init (transport_connection_t * tc,
  *
  * @param tc                   transport connection
  * @param bytes_per_sec                new pacing rate
+ * @param rtt                  connection rtt that is used to compute
+ *                             inactivity time after which pacer bucket is
+ *                             reset to 1 mtu
  */
 void transport_connection_tx_pacer_update (transport_connection_t * tc,
-                                          u64 bytes_per_sec);
-
-/**
- * Get maximum tx burst allowed for transport connection
- *
- * @param tc           transport connection
- * @param time_now     current cpu time as returned by @ref clib_cpu_time_now
- * @param mss          transport's mss
- */
-u32 transport_connection_snd_space (transport_connection_t * tc, u16 mss);
+                                          u64 bytes_per_sec,
+                                          clib_us_time_t rtt);
 
 /**
  * Get tx pacer max burst
@@ -225,9 +234,10 @@ u64 transport_connection_tx_pacer_rate (transport_connection_t * tc);
  * Reset tx pacer bucket
  *
  * @param tc           transport connection
- * @param time_now     current cpu time
+ * @param bucket       value the bucket will be reset to
  */
-void transport_connection_tx_pacer_reset_bucket (transport_connection_t * tc);
+void transport_connection_tx_pacer_reset_bucket (transport_connection_t * tc,
+                                                u32 bucket);
 
 /**
  * Check if transport connection is paced
diff --git a/src/vnet/session/transport_types.h b/src/vnet/session/transport_types.h
index b39a07c..214a638 100644
@@ -54,6 +54,7 @@ typedef struct _spacer
   u64 bucket;
   clib_us_time_t last_update;
   f32 tokens_per_period;
+  u32 idle_timeout_us;
 } spacer_t;
 
 #define TRANSPORT_CONN_ID_LEN  44
diff --git a/src/vnet/tcp/tcp.c b/src/vnet/tcp/tcp.c
index c0b50ce..8672901 100644
@@ -1396,8 +1396,11 @@ tcp_connection_tx_pacer_update (tcp_connection_t * tc)
   if (!transport_connection_is_tx_paced (&tc->connection))
     return;
 
+  f64 srtt = clib_min ((f64) tc->srtt * TCP_TICK, tc->mrtt_us);
+
   transport_connection_tx_pacer_update (&tc->connection,
-                                       tcp_cc_get_pacing_rate (tc));
+                                       tcp_cc_get_pacing_rate (tc),
+                                       srtt * CLIB_US_TIME_FREQ);
 }
 
 void
@@ -1406,7 +1409,8 @@ tcp_connection_tx_pacer_reset (tcp_connection_t * tc, u32 window,
 {
   f64 srtt = clib_min ((f64) tc->srtt * TCP_TICK, tc->mrtt_us);
   u64 rate = (u64) window / srtt;
-  transport_connection_tx_pacer_reset (&tc->connection, rate, start_bucket);
+  transport_connection_tx_pacer_reset (&tc->connection, rate, start_bucket,
+                                      srtt * CLIB_US_TIME_FREQ);
 }
 
 static void
diff --git a/src/vnet/tcp/tcp.h b/src/vnet/tcp/tcp.h
index 7dd88bf..e9247eb 100644
@@ -394,7 +394,6 @@ typedef struct _tcp_connection
   u32 prr_start;       /**< snd_una when prr starts */
   u32 rxt_delivered;   /**< Rxt bytes delivered during current cc event */
   u32 rxt_head;                /**< snd_una last time we re rxted the head */
-  u32 prev_dsegs_out;  /**< Number of dsegs after last ack */
   u32 tsecr_last_ack;  /**< Timestamp echoed to us in last healthy ACK */
   u32 snd_congestion;  /**< snd_una_max when congestion is detected */
   u32 tx_fifo_size;    /**< Tx fifo size. Used to constrain cwnd */
diff --git a/src/vnet/tcp/tcp_input.c b/src/vnet/tcp/tcp_input.c
index 172dcd2..c94e5ba 100755
@@ -575,21 +575,6 @@ tcp_estimate_initial_rtt (tcp_connection_t * tc)
   tcp_update_rto (tc);
 }
 
-always_inline u8
-tcp_recovery_no_snd_space (tcp_connection_t * tc)
-{
-  u32 space;
-
-  ASSERT (tcp_in_cong_recovery (tc));
-
-  if (tcp_in_recovery (tc))
-    space = tcp_available_output_snd_space (tc);
-  else
-    space = tcp_fastrecovery_prr_snd_space (tc);
-
-  return (space < tc->snd_mss + tc->burst_acked);
-}
-
 /**
  * Dequeue bytes for connections that have received acks in last burst
  */
@@ -610,33 +595,26 @@ tcp_handle_postponed_dequeues (tcp_worker_ctx_t * wrk)
       tc = tcp_connection_get (pending_deq_acked[i], thread_index);
       tc->flags &= ~TCP_CONN_DEQ_PENDING;
 
-      if (tc->burst_acked)
-       {
-         /* Dequeue the newly ACKed bytes */
-         session_tx_fifo_dequeue_drop (&tc->connection, tc->burst_acked);
-         tcp_validate_txf_size (tc, tc->snd_una_max - tc->snd_una);
-
-         if (PREDICT_FALSE (tc->flags & TCP_CONN_PSH_PENDING))
-           {
-             if (seq_leq (tc->psh_seq, tc->snd_una))
-               tc->flags &= ~TCP_CONN_PSH_PENDING;
-           }
+      if (PREDICT_FALSE (!tc->burst_acked))
+       continue;
 
-         /* If everything has been acked, stop retransmit timer
-          * otherwise update. */
-         tcp_retransmit_timer_update (tc);
+      /* Dequeue the newly ACKed bytes */
+      session_tx_fifo_dequeue_drop (&tc->connection, tc->burst_acked);
+      tcp_validate_txf_size (tc, tc->snd_una_max - tc->snd_una);
 
-         /* Update pacer based on our new cwnd estimate */
-         tcp_connection_tx_pacer_update (tc);
+      if (PREDICT_FALSE (tc->flags & TCP_CONN_PSH_PENDING))
+       {
+         if (seq_leq (tc->psh_seq, tc->snd_una))
+           tc->flags &= ~TCP_CONN_PSH_PENDING;
        }
 
-      /* Reset the pacer if we've been idle, i.e., no data sent or if
-       * we're in recovery and snd space constrained */
-      if (tc->data_segs_out == tc->prev_dsegs_out
-         || (tcp_in_cong_recovery (tc) && tcp_recovery_no_snd_space (tc)))
-       transport_connection_tx_pacer_reset_bucket (&tc->connection);
+      /* If everything has been acked, stop retransmit timer
+       * otherwise update. */
+      tcp_retransmit_timer_update (tc);
+
+      /* Update pacer based on our new cwnd estimate */
+      tcp_connection_tx_pacer_update (tc);
 
-      tc->prev_dsegs_out = tc->data_segs_out;
       tc->burst_acked = 0;
     }
   _vec_len (wrk->pending_deq_acked) = 0;
@@ -1623,10 +1601,11 @@ process_ack:
   if (tc->cfg_flags & TCP_CFG_F_RATE_SAMPLE)
     tcp_bt_sample_delivery_rate (tc, &rs);
 
-  tcp_program_dequeue (wrk, tc);
-
   if (tc->bytes_acked)
-    tcp_update_rtt (tc, &rs, vnet_buffer (b)->tcp.ack_number);
+    {
+      tcp_program_dequeue (wrk, tc);
+      tcp_update_rtt (tc, &rs, vnet_buffer (b)->tcp.ack_number);
+    }
 
   TCP_EVT (TCP_EVT_ACK_RCVD, tc);
 
diff --git a/src/vnet/tcp/tcp_output.c b/src/vnet/tcp/tcp_output.c
index 64e3b1a..c06bbf1 100644
@@ -411,7 +411,7 @@ tcp_update_burst_snd_vars (tcp_connection_t * tc)
   if (tc->snd_una == tc->snd_nxt)
     {
       tcp_cc_event (tc, TCP_CC_EVT_START_TX);
-      tcp_connection_tx_pacer_reset (tc, tc->cwnd, TRANSPORT_PACER_MIN_MSS);
+      tcp_connection_tx_pacer_reset (tc, tc->cwnd, TRANSPORT_PACER_MIN_BURST);
     }
 }
 
@@ -1875,9 +1875,9 @@ static int
 tcp_retransmit_sack (tcp_worker_ctx_t * wrk, tcp_connection_t * tc,
                     u32 burst_size)
 {
-  u8 snd_limited = 0, can_rescue = 0, reset_pacer = 0;
   u32 n_written = 0, offset, max_bytes, n_segs = 0;
-  u32 bi, max_deq, burst_bytes, sent_bytes;
+  u8 snd_limited = 0, can_rescue = 0;
+  u32 bi, max_deq, burst_bytes;
   sack_scoreboard_hole_t *hole;
   vlib_main_t *vm = wrk->vm;
   vlib_buffer_t *b = 0;
@@ -1900,12 +1900,7 @@ tcp_retransmit_sack (tcp_worker_ctx_t * wrk, tcp_connection_t * tc,
     snd_space = tcp_fastrecovery_prr_snd_space (tc);
 
   if (snd_space < tc->snd_mss)
-    {
-      reset_pacer = burst_bytes > tc->snd_mss;
-      goto done;
-    }
-
-  reset_pacer = snd_space < burst_bytes;
+    goto done;
 
   sb = &tc->sack_sb;
 
@@ -2021,17 +2016,7 @@ tcp_retransmit_sack (tcp_worker_ctx_t * wrk, tcp_connection_t * tc,
 
 done:
 
-  if (reset_pacer)
-    {
-      transport_connection_tx_pacer_reset_bucket (&tc->connection);
-    }
-  else
-    {
-      sent_bytes = clib_min (n_segs * tc->snd_mss, burst_bytes);
-      transport_connection_tx_pacer_update_bytes (&tc->connection,
-                                                 sent_bytes);
-    }
-
+  transport_connection_tx_pacer_reset_bucket (&tc->connection, 0);
   return n_segs;
 }