vec_validate (smm->free_event_vector, num_threads - 1);
vec_validate (smm->vpp_event_queues, num_threads - 1);
vec_validate (smm->peekers_rw_locks, num_threads - 1);
+ vec_validate (smm->dispatch_period, num_threads - 1);
+ vec_validate (smm->last_vlib_time, num_threads - 1);
vec_validate_aligned (smm->ctx, num_threads - 1, CLIB_CACHE_LINE_BYTES);
for (i = 0; i < num_threads; i++)
_vec_len (smm->pending_event_vector[i]) = 0;
vec_validate (smm->pending_disconnects[i], 0);
_vec_len (smm->pending_disconnects[i]) = 0;
+
+ smm->last_vlib_time[i] = vlib_time_now (vlib_mains[i]);
+
if (num_threads > 1)
clib_rwlock_init (&smm->peekers_rw_locks[i]);
}
/* Enable transports */
transport_enable_disable (vm, 1);
-
+ transport_init_tx_pacers_period ();
return 0;
}
/** per-worker session context */
session_tx_context_t *ctx;
+ /** Our approximation of a "complete" dispatch loop period */
+ f64 *dispatch_period;
+
+ /** vlib_time_now last time around the track */
+ f64 *last_vlib_time;
+
/** vpp fifo event queue */
svm_msg_q_t **vpp_event_queues;
return s->server_tx_fifo->nitems;
}
+always_inline f64
+transport_dispatch_period (u32 thread_index)
+{
+ return session_manager_main.dispatch_period[thread_index];
+}
+
+always_inline f64
+transport_time_now (u32 thread_index)
+{
+ return session_manager_main.last_vlib_time[thread_index];
+}
+
always_inline u32
session_get_index (stream_session_t * s)
{
ctx->transport_vft = transport_protocol_get_vft (tp);
ctx->tc = session_tx_get_transport (ctx, peek_data);
ctx->snd_mss = ctx->transport_vft->send_mss (ctx->tc);
- ctx->snd_space = ctx->transport_vft->send_space (ctx->tc);
+ ctx->snd_space =
+ transport_connection_max_tx_burst (ctx->tc, vm->clib_time.last_cpu_time);
if (ctx->snd_space == 0 || ctx->snd_mss == 0)
{
vec_add1 (smm->pending_event_vector[thread_index], *e);
_vec_len (smm->tx_buffers[thread_index]) = n_bufs;
*n_tx_packets += ctx->n_segs_per_evt;
+ transport_connection_update_tx_stats (ctx->tc, ctx->max_len_to_snd);
vlib_put_next_frame (vm, node, next_index, n_left_to_next);
/* If we couldn't dequeue all bytes mark as partially read */
return session_get_if_valid (e->fifo->master_session_index, thread_index);
}
+static void
+session_update_dispatch_period (session_manager_main_t * smm, f64 now,
+ u32 thread_index)
+{
+ f64 sample, prev_period = smm->dispatch_period[thread_index], a = 0.8;
+
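+  /* Exponentially weighted moving average of the dispatch loop period,
+   * weighting the newest sample at 0.8 so the estimate tracks quickly
+   * while absorbing per-loop jitter. */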
+ sample = now - smm->last_vlib_time[thread_index];
+ smm->dispatch_period[thread_index] = a * sample + (1 - a) * prev_period;
+ smm->last_vlib_time[thread_index] = now;
+}
+
static uword
session_queue_node_fn (vlib_main_t * vm, vlib_node_runtime_t * node,
vlib_frame_t * frame)
/*
* Update transport time
*/
+ session_update_dispatch_period (smm, now, thread_index);
transport_update_time (now, thread_index);
/*
*/
static clib_spinlock_t local_endpoints_lock;
+/*
+ * Number of pacer periods per second. Initialized by session layer
+ */
+static double transport_pacer_period;
+
+#define TRANSPORT_PACER_MIN_MSS 1460
+
u8 *
format_transport_proto (u8 * s, va_list * args)
{
return 0;
}
+#define SPACER_CPU_TICKS_PER_PERIOD_SHIFT 10
+#define SPACER_CPU_TICKS_PER_PERIOD (1 << SPACER_CPU_TICKS_PER_PERIOD_SHIFT)
+
+u8 *
+format_transport_pacer (u8 * s, va_list * args)
+{
+ spacer_t *pacer = va_arg (*args, spacer_t *);
+
+ s = format (s, "bucket %u max_burst %u tokens/period %.3f last_update %x",
+ pacer->bucket, pacer->max_burst_size, pacer->tokens_per_period,
+ pacer->last_update);
+ return s;
+}
+
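+/* Token bucket: credit tokens_per_period for every pacer period elapsed
+ * since the last update, then cap what the caller may send in one burst
+ * at max_burst_size. */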
+static inline u32
+spacer_max_burst (spacer_t * pacer, u64 norm_time_now)
+{
+ u64 n_periods = norm_time_now - pacer->last_update;
+
+ pacer->last_update = norm_time_now;
+ pacer->bucket += n_periods * pacer->tokens_per_period;
+ return clib_min (pacer->bucket, pacer->max_burst_size);
+}
+
+static inline void
+spacer_update_bucket (spacer_t * pacer, u32 bytes)
+{
+ ASSERT (pacer->bucket >= bytes);
+ pacer->bucket -= bytes;
+}
+
+static inline void
+spacer_update_max_burst_size (spacer_t * pacer, u32 max_burst_bytes)
+{
+ pacer->max_burst_size = clib_max (max_burst_bytes, TRANSPORT_PACER_MIN_MSS);
+}
+
+static inline void
+spacer_set_pace_rate (spacer_t * pacer, u64 rate_bytes_per_sec)
+{
+ ASSERT (rate_bytes_per_sec != 0);
+ pacer->tokens_per_period = rate_bytes_per_sec / transport_pacer_period;
+}
+
+void
+transport_connection_tx_pacer_init (transport_connection_t * tc,
+ u32 rate_bytes_per_sec, u32 burst_bytes)
+{
+ vlib_main_t *vm = vlib_get_main ();
+ u64 time_now = vm->clib_time.last_cpu_time;
+ spacer_t *pacer = &tc->pacer;
+
+ tc->flags |= TRANSPORT_CONNECTION_F_IS_TX_PACED;
+ spacer_update_max_burst_size (pacer, burst_bytes);
+ spacer_set_pace_rate (pacer, rate_bytes_per_sec);
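+ /* Normalize current cpu time to pacer periods and start with a full
+ * bucket so the connection can send its initial burst immediately. */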
+ pacer->last_update = time_now >> SPACER_CPU_TICKS_PER_PERIOD_SHIFT;
+ pacer->bucket = burst_bytes;
+}
+
+void
+transport_connection_tx_pacer_update (transport_connection_t * tc,
+ u64 bytes_per_sec)
+{
+ u32 burst_size;
+
+ burst_size = bytes_per_sec * transport_dispatch_period (tc->thread_index);
+ spacer_set_pace_rate (&tc->pacer, bytes_per_sec);
+ spacer_update_max_burst_size (&tc->pacer, burst_size);
+}
+
+u32
+transport_connection_max_tx_burst (transport_connection_t * tc, u64 time_now)
+{
+ u32 snd_space, max_paced_burst;
+ u32 mss;
+
+ snd_space = tp_vfts[tc->proto].send_space (tc);
+ if (transport_connection_is_tx_paced (tc))
+ {
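+ /* Normalize cpu time to pacer periods, refuse bursts smaller than one
+ * mss, and round the allowed send space down to a multiple of mss. */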
+ time_now >>= SPACER_CPU_TICKS_PER_PERIOD_SHIFT;
+ max_paced_burst = spacer_max_burst (&tc->pacer, time_now);
+ mss = tp_vfts[tc->proto].send_mss (tc);
+ max_paced_burst = (max_paced_burst < mss) ? 0 : max_paced_burst;
+ snd_space = clib_min (snd_space, max_paced_burst);
+ snd_space = snd_space - snd_space % mss;
+ }
+ return snd_space;
+}
+
+void
+transport_connection_update_tx_stats (transport_connection_t * tc, u32 bytes)
+{
+ tc->stats.tx_bytes += bytes;
+ if (transport_connection_is_tx_paced (tc))
+ spacer_update_bucket (&tc->pacer, bytes);
+}
+
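+/*
+ * Worked example (assuming a 2GHz cpu clock): transport_pacer_period is
+ * 2e9 / 1024, i.e., ~1.95M pacer periods per second, so pacing a
+ * connection at 1Gbps (125MB/s) credits it ~64 bytes of tokens per
+ * period.
+ */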
+void
+transport_init_tx_pacers_period (void)
+{
+ f64 cpu_freq = os_cpu_clock_frequency ();
+ transport_pacer_period = cpu_freq / SPACER_CPU_TICKS_PER_PERIOD;
+}
+
void
transport_update_time (f64 time_now, u8 thread_index)
{
/*
* Protocol independent transport properties associated to a session
*/
+typedef struct _transport_stats
+{
+ u64 tx_bytes;
+} transport_stats_t;
+
+typedef struct _spacer
+{
+ u64 bucket;
+ u32 max_burst_size;
+ f32 tokens_per_period;
+ u64 last_update;
+} spacer_t;
+
typedef struct _transport_connection
{
/** Connection ID */
/*fib_node_index_t rmt_fei;
dpo_id_t rmt_dpo; */
+ u8 flags; /**< Transport specific flags */
+ transport_stats_t stats; /**< Transport connection stats */
+ spacer_t pacer; /**< Simple transport pacer */
+
#if TRANSPORT_DEBUG
elog_track_t elog_track; /**< Event logging */
u32 cc_stat_tstamp; /**< CC stats timestamp */
#define c_rmt_fei connection.rmt_fei
#define c_rmt_dpo connection.rmt_dpo
#define c_opaque_id connection.opaque_conn_id
+#define c_stats connection.stats
+#define c_pacer connection.pacer
+#define c_flags connection.flags
} transport_connection_t;
+#define TRANSPORT_CONNECTION_F_IS_TX_PACED (1 << 0)
+
typedef enum _transport_proto
{
TRANSPORT_PROTO_TCP,
void transport_update_time (f64 time_now, u8 thread_index);
void transport_enable_disable (vlib_main_t * vm, u8 is_en);
+/**
+ * Initialize tx pacer for connection
+ *
+ * @param tc transport connection
+ * @param rate_bytes_per_sec initial byte rate
+ * @param burst_bytes initial burst size in bytes
+ */
+void transport_connection_tx_pacer_init (transport_connection_t * tc,
+ u32 rate_bytes_per_sec,
+ u32 burst_bytes);
+
+/**
+ * Update tx pacer pacing rate
+ *
+ * @param tc transport connection
+ * @param bytes_per_sec new pacing rate
+ */
+void transport_connection_tx_pacer_update (transport_connection_t * tc,
+ u64 bytes_per_sec);
+
+/**
+ * Get maximum tx burst allowed for transport connection
+ *
+ * @param tc transport connection
+ * @param time_now current cpu time as returned by @ref clib_cpu_time_now
+ */
+u32 transport_connection_max_tx_burst (transport_connection_t * tc,
+ u64 time_now);
+
+/**
+ * Initialize period for tx pacers
+ *
+ * Defines the unit of time, as a number of cpu cycles, that is used by
+ * all tx pacers.
+ */
+void transport_init_tx_pacers_period (void);
+
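+/*
+ * Typical usage (sketch): the session layer calls
+ * transport_init_tx_pacers_period() once when transports are enabled; a
+ * transport then sets up per-connection pacing, e.g.,
+ *
+ *   transport_connection_tx_pacer_init (tc, 2 << 16, 16 * snd_mss);
+ *
+ * and the session layer sizes each tx burst by passing the current cpu
+ * time to transport_connection_max_tx_burst().
+ */
+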
+/**
+ * Check if transport connection is paced
+ */
+always_inline u8
+transport_connection_is_tx_paced (transport_connection_t * tc)
+{
+ return (tc->flags & TRANSPORT_CONNECTION_F_IS_TX_PACED);
+}
+
+u8 *format_transport_pacer (u8 * s, va_list * args);
+
+/**
+ * Update tx byte stats for transport connection
+ *
+ * If tx pacing is enabled, this also updates the pacer bucket to account
+ * for the bytes that have been sent.
+ *
+ * @param tc transport connection
+ * @param bytes bytes recently sent
+ */
+void transport_connection_update_tx_stats (transport_connection_t * tc,
+ u32 bytes);
+
#endif /* SRC_VNET_SESSION_TRANSPORT_INTERFACE_H_ */
/*
tc->snd_una_max = tc->snd_nxt;
}
+void
+tcp_enable_pacing (tcp_connection_t * tc)
+{
+ u32 max_burst, byte_rate;
+ max_burst = 16 * tc->snd_mss;
+ byte_rate = 2 << 16;
+ transport_connection_tx_pacer_init (&tc->connection, byte_rate, max_burst);
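+ /* Seed mrtt_us with a large sentinel so tcp_update_pacer falls back to
+ * the tick-based srtt until a high-precision rtt sample is taken. */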
+ tc->mrtt_us = (u32) ~ 0;
+}
+
/** Initialize tcp connection variables
*
* Should be called after having received a msg from the peer, i.e., a SYN or
if (!tc->c_is_ip4 && ip6_address_is_link_local_unicast (&tc->c_rmt_ip6))
tcp_add_del_adjacency (tc, 1);
- // tcp_connection_fib_attach (tc);
+ /* tcp_connection_fib_attach (tc); */
+
+ if (transport_connection_is_tx_paced (&tc->connection)
+ || tcp_main.tx_pacing)
+ tcp_enable_pacing (tc);
}
static int
s = format (s, " limited_transmit %u\n", tc->limited_transmit - tc->iss);
s = format (s, " tsecr %u tsecr_last_ack %u\n", tc->rcv_opts.tsecr,
tc->tsecr_last_ack);
- s = format (s, " rto %u rto_boff %u srtt %u rttvar %u rtt_ts %u ", tc->rto,
- tc->rto_boff, tc->srtt, tc->rttvar, tc->rtt_ts);
+ s = format (s, " rto %u rto_boff %u srtt %u rttvar %u rtt_ts %2.5f ",
+ tc->rto, tc->rto_boff, tc->srtt, tc->rttvar, tc->rtt_ts);
s = format (s, "rtt_seq %u\n", tc->rtt_seq);
s = format (s, " tsval_recent %u tsval_recent_age %u\n", tc->tsval_recent,
tcp_time_now () - tc->tsval_recent_age);
if (tc->state >= TCP_STATE_ESTABLISHED)
- s = format (s, " scoreboard: %U\n", format_tcp_scoreboard, &tc->sack_sb,
- tc);
+ {
+ s = format (s, " scoreboard: %U\n", format_tcp_scoreboard, &tc->sack_sb,
+ tc);
+ if (transport_connection_is_tx_paced (&tc->connection))
+ s = format (s, " pacer: %U\n", format_transport_pacer,
+ &tc->connection.pacer);
+ }
if (vec_len (tc->snd_sacks))
s = format (s, " sacks tx: %U\n", format_tcp_sacks, tc);
};
/* *INDENT-ON* */
+void
+tcp_update_pacer (tcp_connection_t * tc)
+{
+ f64 srtt;
+
+ if (!transport_connection_is_tx_paced (&tc->connection))
+ return;
+
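+  /* Pace at roughly one congestion window per rtt, i.e., cwnd / srtt
+   * bytes per second, using the smaller of the tick-based srtt and the
+   * high-precision mrtt as the rtt estimate. */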
+ srtt = clib_min ((f64) tc->srtt * TCP_TICK, tc->mrtt_us);
+ transport_connection_tx_pacer_update (&tc->connection,
+ ((f64) tc->cwnd) / srtt);
+}
+
static void
tcp_timer_keep_handler (u32 conn_index)
{
else if (unformat (input, "max-rx-fifo %U", unformat_memory_size,
&tm->max_rx_fifo))
;
+ else if (unformat (input, "tx-pacing"))
+ tm->tx_pacing = 1;
else
return clib_error_return (0, "unknown input `%U'",
format_unformat_error, input);
};
#define TCP_SCOREBOARD_TRACE (0)
-#define TCP_MAX_SACK_BLOCKS 15 /**< Max number of SACK blocks stored */
+#define TCP_MAX_SACK_BLOCKS 32 /**< Max number of SACK blocks stored */
#define TCP_INVALID_SACK_HOLE_INDEX ((u32)~0)
typedef struct _scoreboard_trace_elt
u32 rto_boff; /**< Index for RTO backoff */
u32 srtt; /**< Smoothed RTT */
u32 rttvar; /**< Smoothed mean RTT difference. Approximates variance */
- u32 rtt_ts; /**< Timestamp for tracked ACK */
u32 rtt_seq; /**< Sequence number for tracked ACK */
+ f64 rtt_ts; /**< Timestamp for tracked ACK */
+ f64 mrtt_us; /**< High precision mrtt from tracked acks */
u16 mss; /**< Our max seg size that includes options */
u32 limited_transmit; /**< snd_nxt when limited transmit starts */
u32 last_v6_address_rotor;
ip6_address_t *ip6_src_addresses;
+ /** Enable tx pacing for new connections */
+ u8 tx_pacing;
+
u8 punt_unknown4;
u8 punt_unknown6;
return tcp_main.wrk_ctx[vlib_get_thread_index ()].time_now;
}
+always_inline f64
+tcp_time_now_us (u32 thread_index)
+{
+ return transport_time_now (thread_index);
+}
+
always_inline u32
tcp_set_time_now (u32 thread_index)
{
void tcp_connection_timers_reset (tcp_connection_t * tc);
void tcp_init_snd_vars (tcp_connection_t * tc);
void tcp_connection_init_vars (tcp_connection_t * tc);
+void tcp_update_pacer (tcp_connection_t * tc);
+
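+/* Wrapper for the cc algorithm's rcv_ack hook that also refreshes the
+ * tx pacer, since cwnd may have changed, and records the last tsecr. */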
+always_inline void
+tcp_cc_rcv_ack (tcp_connection_t * tc)
+{
+ tc->cc_algo->rcv_ack (tc);
+ tcp_update_pacer (tc);
+ tc->tsecr_last_ack = tc->rcv_opts.tsecr;
+}
always_inline void
tcp_connection_force_ack (tcp_connection_t * tc, vlib_buffer_t * b)
if (tc->rtt_ts && seq_geq (ack, tc->rtt_seq))
{
- mrtt = tcp_time_now () - tc->rtt_ts;
+ tc->mrtt_us = tcp_time_now_us (tc->c_thread_index) - tc->rtt_ts;
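+ /* Convert the high-precision sample to tcp ticks and clamp at 1 to
+ * avoid reporting a zero rtt measurement. */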
+ mrtt = clib_max ((u32) (tc->mrtt_us * THZ), 1);
}
/* As per RFC7323 TSecr can be used for RTTM only if the segment advances
* snd_una, i.e., the left side of the send window:
* seq_lt (tc->snd_una, ack). This is a condition for calling update_rtt */
else if (tcp_opts_tstamp (&tc->rcv_opts) && tc->rcv_opts.tsecr)
{
- mrtt = tcp_time_now () - tc->rcv_opts.tsecr;
+ mrtt = clib_max (tcp_time_now () - tc->rcv_opts.tsecr, 1);
}
/* Ignore dubious measurements */
tc->snd_nxt = tc->snd_una_max;
tc->snd_rxt_bytes = 0;
- /* HACK: since we don't have an output pacer, force slow start */
- tc->cwnd = 20 * tc->snd_mss;
-
tcp_fastrecovery_off (tc);
tcp_fastrecovery_1_smss_off (tc);
tcp_fastrecovery_first_off (tc);
+
+ /* Update pacer because our cwnd changed. Also makes sure
+ * that we recompute the max burst size */
+ tcp_update_pacer (tc);
+
TCP_EVT_DBG (TCP_EVT_CC_EVT, tc, 3);
}
ASSERT (!tcp_in_cong_recovery (tc) || tcp_is_lost_fin (tc));
/* Congestion avoidance */
- tc->cc_algo->rcv_ack (tc);
- tc->tsecr_last_ack = tc->rcv_opts.tsecr;
+ tcp_cc_rcv_ack (tc);
/* If a cumulative ack, make sure dupacks is 0 */
tc->rcv_dupacks = 0;
tc->snd_nxt = tc->snd_una_max;
/* Treat as congestion avoidance ack */
- tc->cc_algo->rcv_ack (tc);
- tc->tsecr_last_ack = tc->rcv_opts.tsecr;
+ tcp_cc_rcv_ack (tc);
return;
}
/* Post RTO timeout don't try anything fancy */
if (tcp_in_recovery (tc))
{
- tc->cc_algo->rcv_ack (tc);
- tc->tsecr_last_ack = tc->rcv_opts.tsecr;
+ tcp_cc_rcv_ack (tc);
transport_add_tx_event (&tc->connection);
return;
}
/* If not tracking an ACK, start tracking */
if (tc->rtt_ts == 0 && !tcp_in_cong_recovery (tc))
{
- tc->rtt_ts = tcp_time_now ();
+ tc->rtt_ts = tcp_time_now_us (tc->c_thread_index);
tc->rtt_seq = tc->snd_nxt;
}
if (PREDICT_FALSE (!tcp_timer_is_active (tc, TCP_TIMER_RETRANSMIT)))