tcp/session: add tx pacer 81/12681/17
authorFlorin Coras <fcoras@cisco.com>
Tue, 22 May 2018 00:47:40 +0000 (17:47 -0700)
committerDamjan Marion <dmarion@me.com>
Thu, 25 Oct 2018 10:13:18 +0000 (10:13 +0000)
Adds tx pacing infrastructure for transport protocols that want to use
it. Particularly useful for connections with non-negligible rtt and
constrained network throughput as it avoids large tx bursts that lead to
local interface tx or network drops.

By default the pacer is disabled. To enabled it for tcp, add tx-pacing
to tcp's startup conf. We are still slightly inefficient in the handling
of incoming packets in established state so the pacer slightly affect
maximum throughput in low lacency scenarios.

Change-Id: Id445b2ffcd64cce015f75b773f7d722faa0f7ca9
Signed-off-by: Florin Coras <fcoras@cisco.com>
src/vnet/session/session.c
src/vnet/session/session.h
src/vnet/session/session_node.c
src/vnet/session/transport.c
src/vnet/session/transport.h
src/vnet/session/transport_interface.h
src/vnet/tcp/tcp.c
src/vnet/tcp/tcp.h
src/vnet/tcp/tcp_input.c
src/vnet/tcp/tcp_output.c

index 189c537..1d421b9 100644 (file)
@@ -1355,6 +1355,8 @@ session_manager_main_enable (vlib_main_t * vm)
   vec_validate (smm->free_event_vector, num_threads - 1);
   vec_validate (smm->vpp_event_queues, num_threads - 1);
   vec_validate (smm->peekers_rw_locks, num_threads - 1);
+  vec_validate (smm->dispatch_period, num_threads - 1);
+  vec_validate (smm->last_vlib_time, num_threads - 1);
   vec_validate_aligned (smm->ctx, num_threads - 1, CLIB_CACHE_LINE_BYTES);
 
   for (i = 0; i < TRANSPORT_N_PROTO; i++)
@@ -1373,6 +1375,9 @@ session_manager_main_enable (vlib_main_t * vm)
       _vec_len (smm->pending_event_vector[i]) = 0;
       vec_validate (smm->pending_disconnects[i], 0);
       _vec_len (smm->pending_disconnects[i]) = 0;
+
+      smm->last_vlib_time[i] = vlib_time_now (vlib_mains[i]);
+
       if (num_threads > 1)
        clib_rwlock_init (&smm->peekers_rw_locks[i]);
     }
@@ -1419,7 +1424,7 @@ session_manager_main_enable (vlib_main_t * vm)
 
   /* Enable transports */
   transport_enable_disable (vm, 1);
-
+  transport_init_tx_pacers_period ();
   return 0;
 }
 
index 914e058..f0aa36c 100644 (file)
@@ -215,6 +215,12 @@ struct _session_manager_main
   /** per-worker session context */
   session_tx_context_t *ctx;
 
+  /** Our approximation of a "complete" dispatch loop period */
+  f64 *dispatch_period;
+
+  /** vlib_time_now last time around the track */
+  f64 *last_vlib_time;
+
   /** vpp fifo event queue */
   svm_msg_q_t **vpp_event_queues;
 
@@ -494,6 +500,18 @@ transport_tx_fifo_size (transport_connection_t * tc)
   return s->server_tx_fifo->nitems;
 }
 
+always_inline f64
+transport_dispatch_period (u32 thread_index)
+{
+  return session_manager_main.dispatch_period[thread_index];
+}
+
+always_inline f64
+transport_time_now (u32 thread_index)
+{
+  return session_manager_main.last_vlib_time[thread_index];
+}
+
 always_inline u32
 session_get_index (stream_session_t * s)
 {
index c1aea67..eb97439 100644 (file)
@@ -568,7 +568,8 @@ session_tx_fifo_read_and_snd_i (vlib_main_t * vm, vlib_node_runtime_t * node,
   ctx->transport_vft = transport_protocol_get_vft (tp);
   ctx->tc = session_tx_get_transport (ctx, peek_data);
   ctx->snd_mss = ctx->transport_vft->send_mss (ctx->tc);
-  ctx->snd_space = ctx->transport_vft->send_space (ctx->tc);
+  ctx->snd_space =
+    transport_connection_max_tx_burst (ctx->tc, vm->clib_time.last_cpu_time);
   if (ctx->snd_space == 0 || ctx->snd_mss == 0)
     {
       vec_add1 (smm->pending_event_vector[thread_index], *e);
@@ -685,6 +686,7 @@ session_tx_fifo_read_and_snd_i (vlib_main_t * vm, vlib_node_runtime_t * node,
 
   _vec_len (smm->tx_buffers[thread_index]) = n_bufs;
   *n_tx_packets += ctx->n_segs_per_evt;
+  transport_connection_update_tx_stats (ctx->tc, ctx->max_len_to_snd);
   vlib_put_next_frame (vm, node, next_index, n_left_to_next);
 
   /* If we couldn't dequeue all bytes mark as partially read */
@@ -743,6 +745,17 @@ session_event_get_session (session_event_t * e, u8 thread_index)
   return session_get_if_valid (e->fifo->master_session_index, thread_index);
 }
 
+static void
+session_update_dispatch_period (session_manager_main_t * smm, f64 now,
+                               u32 thread_index)
+{
+  f64 sample, prev_period = smm->dispatch_period[thread_index], a = 0.8;
+
+  sample = now - smm->last_vlib_time[thread_index];
+  smm->dispatch_period[thread_index] = a * sample + (1 - a) * prev_period;
+  smm->last_vlib_time[thread_index] = now;
+}
+
 static uword
 session_queue_node_fn (vlib_main_t * vm, vlib_node_runtime_t * node,
                       vlib_frame_t * frame)
@@ -764,6 +777,7 @@ session_queue_node_fn (vlib_main_t * vm, vlib_node_runtime_t * node,
   /*
    *  Update transport time
    */
+  session_update_dispatch_period (smm, now, thread_index);
   transport_update_time (now, thread_index);
 
   /*
index d74a218..c333c41 100644 (file)
@@ -42,6 +42,13 @@ static transport_endpoint_t *local_endpoints;
  */
 static clib_spinlock_t local_endpoints_lock;
 
+/*
+ * Period used by transport pacers. Initialized by session layer
+ */
+static double transport_pacer_period;
+
+#define TRANSPORT_PACER_MIN_MSS 1460
+
 u8 *
 format_transport_proto (u8 * s, va_list * args)
 {
@@ -376,6 +383,110 @@ transport_alloc_local_endpoint (u8 proto, transport_endpoint_t * rmt,
   return 0;
 }
 
+#define SPACER_CPU_TICKS_PER_PERIOD_SHIFT 10
+#define SPACER_CPU_TICKS_PER_PERIOD (1 << SPACER_CPU_TICKS_PER_PERIOD_SHIFT)
+
+u8 *
+format_transport_pacer (u8 * s, va_list * args)
+{
+  spacer_t *pacer = va_arg (*args, spacer_t *);
+
+  s = format (s, "bucket %u max_burst %u tokens/period %.3f last_update %x",
+             pacer->bucket, pacer->max_burst_size, pacer->tokens_per_period,
+             pacer->last_update);
+  return s;
+}
+
+static inline u32
+spacer_max_burst (spacer_t * pacer, u64 norm_time_now)
+{
+  u64 n_periods = norm_time_now - pacer->last_update;
+
+  pacer->last_update = norm_time_now;
+  pacer->bucket += n_periods * pacer->tokens_per_period;
+  return clib_min (pacer->bucket, pacer->max_burst_size);
+}
+
+static inline void
+spacer_update_bucket (spacer_t * pacer, u32 bytes)
+{
+  ASSERT (pacer->bucket >= bytes);
+  pacer->bucket -= bytes;
+}
+
+static inline void
+spacer_update_max_burst_size (spacer_t * pacer, u32 max_burst_bytes)
+{
+  pacer->max_burst_size = clib_max (max_burst_bytes, TRANSPORT_PACER_MIN_MSS);
+}
+
+static inline void
+spacer_set_pace_rate (spacer_t * pacer, u64 rate_bytes_per_sec)
+{
+  ASSERT (rate_bytes_per_sec != 0);
+  pacer->tokens_per_period = rate_bytes_per_sec / transport_pacer_period;
+}
+
+void
+transport_connection_tx_pacer_init (transport_connection_t * tc,
+                                   u32 rate_bytes_per_sec, u32 burst_bytes)
+{
+  vlib_main_t *vm = vlib_get_main ();
+  u64 time_now = vm->clib_time.last_cpu_time;
+  spacer_t *pacer = &tc->pacer;
+
+  tc->flags |= TRANSPORT_CONNECTION_F_IS_TX_PACED;
+  spacer_update_max_burst_size (&tc->pacer, burst_bytes);
+  spacer_set_pace_rate (&tc->pacer, rate_bytes_per_sec);
+  pacer->last_update = time_now >> SPACER_CPU_TICKS_PER_PERIOD_SHIFT;
+  pacer->bucket = burst_bytes;
+}
+
+void
+transport_connection_tx_pacer_update (transport_connection_t * tc,
+                                     u64 bytes_per_sec)
+{
+  u32 burst_size;
+
+  burst_size = bytes_per_sec * transport_dispatch_period (tc->thread_index);
+  spacer_set_pace_rate (&tc->pacer, bytes_per_sec);
+  spacer_update_max_burst_size (&tc->pacer, burst_size);
+}
+
+u32
+transport_connection_max_tx_burst (transport_connection_t * tc, u64 time_now)
+{
+  u32 snd_space, max_paced_burst;
+  u32 mss;
+
+  snd_space = tp_vfts[tc->proto].send_space (tc);
+  if (transport_connection_is_tx_paced (tc))
+    {
+      time_now >>= SPACER_CPU_TICKS_PER_PERIOD_SHIFT;
+      max_paced_burst = spacer_max_burst (&tc->pacer, time_now);
+      mss = tp_vfts[tc->proto].send_mss (tc);
+      max_paced_burst = (max_paced_burst < mss) ? 0 : max_paced_burst;
+      snd_space = clib_min (snd_space, max_paced_burst);
+      snd_space = snd_space - snd_space % mss;
+    }
+  return snd_space;
+}
+
+void
+transport_connection_update_tx_stats (transport_connection_t * tc, u32 bytes)
+{
+  tc->stats.tx_bytes += bytes;
+  if (transport_connection_is_tx_paced (tc))
+    spacer_update_bucket (&tc->pacer, bytes);
+}
+
+void
+transport_init_tx_pacers_period (void)
+{
+  f64 cpu_freq = os_cpu_clock_frequency ();
+  transport_pacer_period = cpu_freq / SPACER_CPU_TICKS_PER_PERIOD;
+}
+
 void
 transport_update_time (f64 time_now, u8 thread_index)
 {
index e29f3ca..0736669 100644 (file)
 /*
  * Protocol independent transport properties associated to a session
  */
+typedef struct _transport_stats
+{
+  u64 tx_bytes;
+} transport_stats_t;
+
+typedef struct _spacer
+{
+  u64 bucket;
+  u32 max_burst_size;
+  f32 tokens_per_period;
+  u64 last_update;
+} spacer_t;
+
 typedef struct _transport_connection
 {
   /** Connection ID */
@@ -54,6 +67,10 @@ typedef struct _transport_connection
   /*fib_node_index_t rmt_fei;
      dpo_id_t rmt_dpo; */
 
+  u8 flags;                    /**< Transport specific flags */
+  transport_stats_t stats;     /**< Transport connection stats */
+  spacer_t pacer;              /**< Simple transport pacer */
+
 #if TRANSPORT_DEBUG
   elog_track_t elog_track;     /**< Event logging */
   u32 cc_stat_tstamp;          /**< CC stats timestamp */
@@ -79,8 +96,13 @@ typedef struct _transport_connection
 #define c_rmt_fei connection.rmt_fei
 #define c_rmt_dpo connection.rmt_dpo
 #define c_opaque_id connection.opaque_conn_id
+#define c_stats connection.stats
+#define c_pacer connection.pacer
+#define c_flags connection.flags
 } transport_connection_t;
 
+#define TRANSPORT_CONNECTION_F_IS_TX_PACED     1 << 0
+
 typedef enum _transport_proto
 {
   TRANSPORT_PROTO_TCP,
index 745a7db..ec9bd43 100644 (file)
@@ -102,6 +102,67 @@ transport_tx_fn_type_t transport_protocol_tx_fn_type (transport_proto_t tp);
 void transport_update_time (f64 time_now, u8 thread_index);
 void transport_enable_disable (vlib_main_t * vm, u8 is_en);
 
+/**
+ * Initialize tx pacer for connection
+ *
+ * @param tc                           transport connection
+ * @param rate_bytes_per_second                initial byte rate
+ * @param burst_bytes                  initial burst size in bytes
+ */
+void transport_connection_tx_pacer_init (transport_connection_t * tc,
+                                        u32 rate_bytes_per_sec,
+                                        u32 burst_bytes);
+
+/**
+ * Update tx pacer pacing rate
+ *
+ * @param tc                   transport connection
+ * @param bytes_per_sec                new pacing rate
+ */
+void transport_connection_tx_pacer_update (transport_connection_t * tc,
+                                          u64 bytes_per_sec);
+
+/**
+ * Get maximum tx burst allowed for transport connection
+ *
+ * @param tc           transport connection
+ * @param time_now     current cpu time as returned by @ref clib_cpu_time_now
+ */
+u32 transport_connection_max_tx_burst (transport_connection_t * tc,
+                                      u64 time_now);
+
+/**
+ * Initialize period for tx pacers
+ *
+ * Defines a unit of time with respect to number of cpu cycles that is to
+ * be used by all tx pacers.
+ */
+void transport_init_tx_pacers_period (void);
+
+/**
+ * Check if transport connection is paced
+ */
+always_inline u8
+transport_connection_is_tx_paced (transport_connection_t * tc)
+{
+  return (tc->flags & TRANSPORT_CONNECTION_F_IS_TX_PACED);
+}
+
+u8 *format_transport_pacer (u8 * s, va_list * args);
+
+/**
+ * Update tx byte stats for transport connection
+ *
+ * If tx pacing is enabled, this also updates pacer bucket to account for the
+ * amount of bytes that have been sent.
+ *
+ * @param tc           transport connection
+ * @param pkts         packets recently sent
+ * @param bytes                bytes recently sent
+ */
+void transport_connection_update_tx_stats (transport_connection_t * tc,
+                                          u32 bytes);
+
 #endif /* SRC_VNET_SESSION_TRANSPORT_INTERFACE_H_ */
 
 /*
index cb05b8c..626b499 100644 (file)
@@ -555,6 +555,16 @@ tcp_init_snd_vars (tcp_connection_t * tc)
   tc->snd_una_max = tc->snd_nxt;
 }
 
+void
+tcp_enable_pacing (tcp_connection_t * tc)
+{
+  u32 max_burst, byte_rate;
+  max_burst = 16 * tc->snd_mss;
+  byte_rate = 2 << 16;
+  transport_connection_tx_pacer_init (&tc->connection, byte_rate, max_burst);
+  tc->mrtt_us = (u32) ~ 0;
+}
+
 /** Initialize tcp connection variables
  *
  * Should be called after having received a msg from the peer, i.e., a SYN or
@@ -572,7 +582,11 @@ tcp_connection_init_vars (tcp_connection_t * tc)
   if (!tc->c_is_ip4 && ip6_address_is_link_local_unicast (&tc->c_rmt_ip6))
     tcp_add_del_adjacency (tc, 1);
 
-  //  tcp_connection_fib_attach (tc);
+  /*  tcp_connection_fib_attach (tc); */
+
+  if (transport_connection_is_tx_paced (&tc->connection)
+      || tcp_main.tx_pacing)
+    tcp_enable_pacing (tc);
 }
 
 static int
@@ -784,14 +798,19 @@ format_tcp_vars (u8 * s, va_list * args)
   s = format (s, " limited_transmit %u\n", tc->limited_transmit - tc->iss);
   s = format (s, " tsecr %u tsecr_last_ack %u\n", tc->rcv_opts.tsecr,
              tc->tsecr_last_ack);
-  s = format (s, " rto %u rto_boff %u srtt %u rttvar %u rtt_ts %u ", tc->rto,
-             tc->rto_boff, tc->srtt, tc->rttvar, tc->rtt_ts);
+  s = format (s, " rto %u rto_boff %u srtt %u rttvar %u rtt_ts %2.5f ",
+             tc->rto, tc->rto_boff, tc->srtt, tc->rttvar, tc->rtt_ts);
   s = format (s, "rtt_seq %u\n", tc->rtt_seq);
   s = format (s, " tsval_recent %u tsval_recent_age %u\n", tc->tsval_recent,
              tcp_time_now () - tc->tsval_recent_age);
   if (tc->state >= TCP_STATE_ESTABLISHED)
-    s = format (s, " scoreboard: %U\n", format_tcp_scoreboard, &tc->sack_sb,
-               tc);
+    {
+      s = format (s, " scoreboard: %U\n", format_tcp_scoreboard, &tc->sack_sb,
+                 tc);
+      if (transport_connection_is_tx_paced (&tc->connection))
+       s = format (s, " pacer: %U\n", format_transport_pacer,
+                   &tc->connection.pacer);
+    }
   if (vec_len (tc->snd_sacks))
     s = format (s, " sacks tx: %U\n", format_tcp_sacks, tc);
 
@@ -1129,6 +1148,19 @@ const static transport_proto_vft_t tcp_proto = {
 };
 /* *INDENT-ON* */
 
+void
+tcp_update_pacer (tcp_connection_t * tc)
+{
+  f64 srtt;
+
+  if (!transport_connection_is_tx_paced (&tc->connection))
+    return;
+
+  srtt = clib_min ((f64) tc->srtt * TCP_TICK, tc->mrtt_us);
+  transport_connection_tx_pacer_update (&tc->connection,
+                                       ((f64) tc->cwnd) / srtt);
+}
+
 static void
 tcp_timer_keep_handler (u32 conn_index)
 {
@@ -1408,6 +1440,8 @@ tcp_config_fn (vlib_main_t * vm, unformat_input_t * input)
       else if (unformat (input, "max-rx-fifo %U", unformat_memory_size,
                         &tm->max_rx_fifo))
        ;
+      else if (unformat (input, "tx-pacing"))
+       tm->tx_pacing = 1;
       else
        return clib_error_return (0, "unknown input `%U'",
                                  format_unformat_error, input);
index a036072..4ba3d5e 100644 (file)
@@ -160,7 +160,7 @@ enum
 };
 
 #define TCP_SCOREBOARD_TRACE (0)
-#define TCP_MAX_SACK_BLOCKS 15 /**< Max number of SACK blocks stored */
+#define TCP_MAX_SACK_BLOCKS 32 /**< Max number of SACK blocks stored */
 #define TCP_INVALID_SACK_HOLE_INDEX ((u32)~0)
 
 typedef struct _scoreboard_trace_elt
@@ -319,8 +319,9 @@ typedef struct _tcp_connection
   u32 rto_boff;                /**< Index for RTO backoff */
   u32 srtt;            /**< Smoothed RTT */
   u32 rttvar;          /**< Smoothed mean RTT difference. Approximates variance */
-  u32 rtt_ts;          /**< Timestamp for tracked ACK */
   u32 rtt_seq;         /**< Sequence number for tracked ACK */
+  f64 rtt_ts;          /**< Timestamp for tracked ACK */
+  f64 mrtt_us;         /**< High precision mrtt from tracked acks */
 
   u16 mss;             /**< Our max seg size that includes options */
   u32 limited_transmit;        /**< snd_nxt when limited transmit starts */
@@ -444,6 +445,9 @@ typedef struct _tcp_main
   u32 last_v6_address_rotor;
   ip6_address_t *ip6_src_addresses;
 
+  /** Enable tx pacing for new connections */
+  u8 tx_pacing;
+
   u8 punt_unknown4;
   u8 punt_unknown6;
 
@@ -692,6 +696,12 @@ tcp_time_now (void)
   return tcp_main.wrk_ctx[vlib_get_thread_index ()].time_now;
 }
 
+always_inline f64
+tcp_time_now_us (u32 thread_index)
+{
+  return transport_time_now (thread_index);
+}
+
 always_inline u32
 tcp_set_time_now (u32 thread_index)
 {
@@ -706,6 +716,15 @@ void tcp_connection_timers_init (tcp_connection_t * tc);
 void tcp_connection_timers_reset (tcp_connection_t * tc);
 void tcp_init_snd_vars (tcp_connection_t * tc);
 void tcp_connection_init_vars (tcp_connection_t * tc);
+void tcp_update_pacer (tcp_connection_t * tc);
+
+always_inline void
+tcp_cc_rcv_ack (tcp_connection_t * tc)
+{
+  tc->cc_algo->rcv_ack (tc);
+  tcp_update_pacer (tc);
+  tc->tsecr_last_ack = tc->rcv_opts.tsecr;
+}
 
 always_inline void
 tcp_connection_force_ack (tcp_connection_t * tc, vlib_buffer_t * b)
index 39a538b..ac0e996 100644 (file)
@@ -462,14 +462,15 @@ tcp_update_rtt (tcp_connection_t * tc, u32 ack)
 
   if (tc->rtt_ts && seq_geq (ack, tc->rtt_seq))
     {
-      mrtt = tcp_time_now () - tc->rtt_ts;
+      tc->mrtt_us = tcp_time_now_us (tc->c_thread_index) - tc->rtt_ts;
+      mrtt = clib_max ((u32) (tc->mrtt_us * THZ), 1);
     }
   /* As per RFC7323 TSecr can be used for RTTM only if the segment advances
    * snd_una, i.e., the left side of the send window:
    * seq_lt (tc->snd_una, ack). This is a condition for calling update_rtt */
   else if (tcp_opts_tstamp (&tc->rcv_opts) && tc->rcv_opts.tsecr)
     {
-      mrtt = tcp_time_now () - tc->rcv_opts.tsecr;
+      mrtt = clib_max (tcp_time_now () - tc->rcv_opts.tsecr, 1);
     }
 
   /* Ignore dubious measurements */
@@ -1079,12 +1080,14 @@ tcp_cc_fastrecovery_exit (tcp_connection_t * tc)
   tc->snd_nxt = tc->snd_una_max;
   tc->snd_rxt_bytes = 0;
 
-  /* HACK: since we don't have an output pacer, force slow start */
-  tc->cwnd = 20 * tc->snd_mss;
-
   tcp_fastrecovery_off (tc);
   tcp_fastrecovery_1_smss_off (tc);
   tcp_fastrecovery_first_off (tc);
+
+  /* Update pacer because our cwnd changed. Also makes sure
+   * that we recompute the max burst size */
+  tcp_update_pacer (tc);
+
   TCP_EVT_DBG (TCP_EVT_CC_EVT, tc, 3);
 }
 
@@ -1153,8 +1156,7 @@ tcp_cc_update (tcp_connection_t * tc, vlib_buffer_t * b)
   ASSERT (!tcp_in_cong_recovery (tc) || tcp_is_lost_fin (tc));
 
   /* Congestion avoidance */
-  tc->cc_algo->rcv_ack (tc);
-  tc->tsecr_last_ack = tc->rcv_opts.tsecr;
+  tcp_cc_rcv_ack (tc);
 
   /* If a cumulative ack, make sure dupacks is 0 */
   tc->rcv_dupacks = 0;
@@ -1372,8 +1374,7 @@ partial_ack:
       tc->snd_nxt = tc->snd_una_max;
 
       /* Treat as congestion avoidance ack */
-      tc->cc_algo->rcv_ack (tc);
-      tc->tsecr_last_ack = tc->rcv_opts.tsecr;
+      tcp_cc_rcv_ack (tc);
       return;
     }
 
@@ -1391,8 +1392,7 @@ partial_ack:
   /* Post RTO timeout don't try anything fancy */
   if (tcp_in_recovery (tc))
     {
-      tc->cc_algo->rcv_ack (tc);
-      tc->tsecr_last_ack = tc->rcv_opts.tsecr;
+      tcp_cc_rcv_ack (tc);
       transport_add_tx_event (&tc->connection);
       return;
     }
index 2e6036b..f14a612 100644 (file)
@@ -1202,7 +1202,7 @@ tcp_push_header (tcp_connection_t * tc, vlib_buffer_t * b)
   /* If not tracking an ACK, start tracking */
   if (tc->rtt_ts == 0 && !tcp_in_cong_recovery (tc))
     {
-      tc->rtt_ts = tcp_time_now ();
+      tc->rtt_ts = tcp_time_now_us (tc->c_thread_index);
       tc->rtt_seq = tc->snd_nxt;
     }
   if (PREDICT_FALSE (!tcp_timer_is_active (tc, TCP_TIMER_RETRANSMIT)))