From 70f879d2852dfc042ad0911a4a6e4a1714c0eb83 Mon Sep 17 00:00:00 2001 From: Florin Coras Date: Fri, 13 Mar 2020 17:54:42 +0000 Subject: [PATCH] session tcp udp: consolidate transport snd apis Type: improvement Use only one api to retrieve transport send parameters. Additionally, allow transports to request postponing and descheduling of events. With this, tcp now requests descheduling of sessions when the connections are stuck probing for zero snd_wnd Signed-off-by: Florin Coras Change-Id: I722c974f3e68fa15424c519a1fffacda43af050c --- src/vnet/session/session.c | 13 +++++++ src/vnet/session/session.h | 5 +-- src/vnet/session/session_node.c | 76 +++++++++++++++++++------------------- src/vnet/session/transport.c | 18 +++++++++ src/vnet/session/transport.h | 48 +++++++++++++++++++++--- src/vnet/session/transport_types.h | 12 ++++-- src/vnet/tcp/tcp.c | 65 ++++++++++++++------------------ src/vnet/tcp/tcp.h | 2 + src/vnet/tcp/tcp_input.c | 4 +- src/vnet/tcp/tcp_output.c | 10 ++++- src/vnet/udp/udp.c | 25 ++++++------- 11 files changed, 175 insertions(+), 103 deletions(-) diff --git a/src/vnet/session/session.c b/src/vnet/session/session.c index e9cda361f37..15d949c76b5 100644 --- a/src/vnet/session/session.c +++ b/src/vnet/session/session.c @@ -145,6 +145,19 @@ session_add_self_custom_tx_evt (transport_connection_t * tc, u8 has_prio) } } +void +sesssion_reschedule_tx (transport_connection_t * tc) +{ + session_worker_t *wrk = session_main_get_worker (tc->thread_index); + session_evt_elt_t *elt; + + ASSERT (tc->thread_index == vlib_get_thread_index ()); + + elt = session_evt_alloc_new (wrk); + elt->evt.session_index = tc->s_index; + elt->evt.event_type = SESSION_IO_EVT_TX; +} + static void session_program_transport_ctrl_evt (session_t * s, session_evt_type_t evt) { diff --git a/src/vnet/session/session.h b/src/vnet/session/session.h index e85637283a7..777984b519b 100644 --- a/src/vnet/session/session.h +++ b/src/vnet/session/session.h @@ -48,14 +48,12 @@ typedef struct 
session_tx_context_ session_t *s; transport_proto_vft_t *transport_vft; transport_connection_t *tc; + transport_send_params_t sp; u32 max_dequeue; - u32 snd_space; u32 left_to_snd; - u32 tx_offset; u32 max_len_to_snd; u16 deq_per_first_buf; u16 deq_per_buf; - u16 snd_mss; u16 n_segs_per_evt; u8 n_bufs_per_seg; CLIB_CACHE_LINE_ALIGN_MARK (cacheline1); @@ -429,6 +427,7 @@ void session_send_rpc_evt_to_thread_force (u32 thread_index, void *fp, void *rpc_args); void session_add_self_custom_tx_evt (transport_connection_t * tc, u8 has_prio); +void sesssion_reschedule_tx (transport_connection_t * tc); transport_connection_t *session_get_transport (session_t * s); void session_get_endpoint (session_t * s, transport_endpoint_t * tep, u8 is_lcl); diff --git a/src/vnet/session/session_node.c b/src/vnet/session/session_node.c index ad24f429fbc..b1c2428874e 100644 --- a/src/vnet/session/session_node.c +++ b/src/vnet/session/session_node.c @@ -567,7 +567,7 @@ session_tx_fifo_chain_tail (vlib_main_t * vm, session_tx_context_t * ctx, b->total_length_not_including_first_buffer = 0; chain_b = b; - left_from_seg = clib_min (ctx->snd_mss - b->current_length, + left_from_seg = clib_min (ctx->sp.snd_mss - b->current_length, ctx->left_to_snd); to_deq = left_from_seg; for (j = 1; j < ctx->n_bufs_per_seg; j++) @@ -583,8 +583,8 @@ session_tx_fifo_chain_tail (vlib_main_t * vm, session_tx_context_t * ctx, if (peek_data) { n_bytes_read = svm_fifo_peek (ctx->s->tx_fifo, - ctx->tx_offset, len_to_deq, data); - ctx->tx_offset += n_bytes_read; + ctx->sp.tx_offset, len_to_deq, data); + ctx->sp.tx_offset += n_bytes_read; } else { @@ -651,12 +651,12 @@ session_tx_fill_buffer (vlib_main_t * vm, session_tx_context_t * ctx, if (peek_data) { - n_bytes_read = svm_fifo_peek (ctx->s->tx_fifo, ctx->tx_offset, + n_bytes_read = svm_fifo_peek (ctx->s->tx_fifo, ctx->sp.tx_offset, len_to_deq, data0); ASSERT (n_bytes_read > 0); /* Keep track of progress locally, transport is also supposed to * increment it 
independently when pushing the header */ - ctx->tx_offset += n_bytes_read; + ctx->sp.tx_offset += n_bytes_read; } else { @@ -756,13 +756,12 @@ session_tx_set_dequeue_params (vlib_main_t * vm, session_tx_context_t * ctx, if (peek_data) { /* Offset in rx fifo from where to peek data */ - ctx->tx_offset = ctx->transport_vft->tx_fifo_offset (ctx->tc); - if (PREDICT_FALSE (ctx->tx_offset >= ctx->max_dequeue)) + if (PREDICT_FALSE (ctx->sp.tx_offset >= ctx->max_dequeue)) { ctx->max_len_to_snd = 0; return; } - ctx->max_dequeue -= ctx->tx_offset; + ctx->max_dequeue -= ctx->sp.tx_offset; } else { @@ -782,34 +781,34 @@ session_tx_set_dequeue_params (vlib_main_t * vm, session_tx_context_t * ctx, ASSERT (ctx->max_dequeue > 0); /* Ensure we're not writing more than transport window allows */ - if (ctx->max_dequeue < ctx->snd_space) + if (ctx->max_dequeue < ctx->sp.snd_space) { /* Constrained by tx queue. Try to send only fully formed segments */ - ctx->max_len_to_snd = - (ctx->max_dequeue > ctx->snd_mss) ? - ctx->max_dequeue - ctx->max_dequeue % ctx->snd_mss : ctx->max_dequeue; + ctx->max_len_to_snd = (ctx->max_dequeue > ctx->sp.snd_mss) ? + (ctx->max_dequeue - (ctx->max_dequeue % ctx->sp.snd_mss)) : + ctx->max_dequeue; /* TODO Nagle ? 
*/ } else { /* Expectation is that snd_space0 is already a multiple of snd_mss */ - ctx->max_len_to_snd = ctx->snd_space; + ctx->max_len_to_snd = ctx->sp.snd_space; } /* Check if we're tx constrained by the node */ - ctx->n_segs_per_evt = ceil ((f64) ctx->max_len_to_snd / ctx->snd_mss); + ctx->n_segs_per_evt = ceil ((f64) ctx->max_len_to_snd / ctx->sp.snd_mss); if (ctx->n_segs_per_evt > max_segs) { ctx->n_segs_per_evt = max_segs; - ctx->max_len_to_snd = max_segs * ctx->snd_mss; + ctx->max_len_to_snd = max_segs * ctx->sp.snd_mss; } n_bytes_per_buf = vlib_buffer_get_default_data_size (vm); ASSERT (n_bytes_per_buf > TRANSPORT_MAX_HDRS_LEN); - n_bytes_per_seg = TRANSPORT_MAX_HDRS_LEN + ctx->snd_mss; + n_bytes_per_seg = TRANSPORT_MAX_HDRS_LEN + ctx->sp.snd_mss; ctx->n_bufs_per_seg = ceil ((f64) n_bytes_per_seg / n_bytes_per_buf); - ctx->deq_per_buf = clib_min (ctx->snd_mss, n_bytes_per_buf); - ctx->deq_per_first_buf = clib_min (ctx->snd_mss, + ctx->deq_per_buf = clib_min (ctx->sp.snd_mss, n_bytes_per_buf); + ctx->deq_per_first_buf = clib_min (ctx->sp.snd_mss, n_bytes_per_buf - TRANSPORT_MAX_HDRS_LEN); } @@ -817,12 +816,12 @@ session_tx_set_dequeue_params (vlib_main_t * vm, session_tx_context_t * ctx, always_inline void session_tx_maybe_reschedule (session_worker_t * wrk, session_tx_context_t * ctx, - session_evt_elt_t * elt, u8 is_peek) + session_evt_elt_t * elt) { session_t *s = ctx->s; svm_fifo_unset_event (s->tx_fifo); - if (svm_fifo_max_dequeue_cons (s->tx_fifo) > (is_peek ? 
ctx->tx_offset : 0)) + if (svm_fifo_max_dequeue_cons (s->tx_fifo) > ctx->sp.tx_offset) if (svm_fifo_set_event (s->tx_fifo)) session_evt_add_head_old (wrk, elt); } @@ -880,20 +879,23 @@ session_tx_fifo_read_and_snd_i (session_worker_t * wrk, } } - ctx->snd_mss = ctx->transport_vft->send_mss (ctx->tc); - if (PREDICT_FALSE (ctx->snd_mss == 0)) - { - session_evt_add_old (wrk, elt); - return SESSION_TX_NO_DATA; - } - - ctx->snd_space = transport_connection_snd_space (ctx->tc); + transport_connection_snd_params (ctx->tc, &ctx->sp); - /* This flow queue is "empty" so it should be re-evaluated before - * the ones that have data to send. */ - if (!ctx->snd_space) + if (!ctx->sp.snd_space) { - session_evt_add_head_old (wrk, elt); + /* This flow queue is "empty" so it should be re-evaluated before + * the ones that have data to send. */ + if (PREDICT_TRUE (!ctx->sp.flags)) + session_evt_add_head_old (wrk, elt); + /* Request to postpone the session, e.g., zero-wnd and transport + * is not currently probing */ + else if (ctx->sp.flags & TRANSPORT_SND_F_POSTPONE) + session_evt_add_old (wrk, elt); + /* If the deschedule flag was set, remove session from scheduler. + * Transport is responsible for rescheduling this session. */ + else + transport_connection_deschedule (ctx->tc); + return SESSION_TX_NO_DATA; } @@ -905,9 +907,9 @@ session_tx_fifo_read_and_snd_i (session_worker_t * wrk, session_evt_add_head_old (wrk, elt); return SESSION_TX_NO_DATA; } - snd_space = clib_min (ctx->snd_space, snd_space); - ctx->snd_space = snd_space >= ctx->snd_mss ? - snd_space - snd_space % ctx->snd_mss : snd_space; + snd_space = clib_min (ctx->sp.snd_space, snd_space); + ctx->sp.snd_space = snd_space >= ctx->sp.snd_mss ? + snd_space - snd_space % ctx->sp.snd_mss : snd_space; } /* Check how much we can pull. 
*/ @@ -916,7 +918,7 @@ session_tx_fifo_read_and_snd_i (session_worker_t * wrk, if (PREDICT_FALSE (!ctx->max_len_to_snd)) { transport_connection_tx_pacer_reset_bucket (ctx->tc, 0); - session_tx_maybe_reschedule (wrk, ctx, elt, peek_data); + session_tx_maybe_reschedule (wrk, ctx, elt); return SESSION_TX_NO_DATA; } @@ -1019,7 +1021,7 @@ session_tx_fifo_read_and_snd_i (session_worker_t * wrk, if (ctx->max_len_to_snd < ctx->max_dequeue) session_evt_add_old (wrk, elt); else - session_tx_maybe_reschedule (wrk, ctx, elt, peek_data); + session_tx_maybe_reschedule (wrk, ctx, elt); if (!peek_data && ctx->transport_vft->transport_options.tx_type == TRANSPORT_TX_DGRAM) diff --git a/src/vnet/session/transport.c b/src/vnet/session/transport.c index c8c58357afd..e27aaf3ff6b 100644 --- a/src/vnet/session/transport.c +++ b/src/vnet/session/transport.c @@ -103,6 +103,8 @@ format_transport_connection (u8 * s, va_list * args) indent = format_get_indent (s) + 1; s = format (s, "%Upacer: %U\n", format_white_space, indent, format_transport_pacer, &tc->pacer, tc->thread_index); + s = format (s, "%Utransport: flags 0x%x\n", format_white_space, indent, + tc->flags); } return s; } @@ -719,6 +721,22 @@ transport_connection_tx_pacer_update_bytes (transport_connection_t * tc, spacer_update_bucket (&tc->pacer, bytes); } +void +transport_connection_reschedule (transport_connection_t * tc) +{ + tc->flags &= ~TRANSPORT_CONNECTION_F_DESCHED; + if (transport_max_tx_dequeue (tc)) + sesssion_reschedule_tx (tc); + else + { + session_t *s = session_get (tc->s_index, tc->thread_index); + svm_fifo_unset_event (s->tx_fifo); + if (svm_fifo_max_dequeue_cons (s->tx_fifo)) + if (svm_fifo_set_event (s->tx_fifo)) + sesssion_reschedule_tx (tc); + } +} + void transport_update_time (clib_time_type_t time_now, u8 thread_index) { diff --git a/src/vnet/session/transport.h b/src/vnet/session/transport.h index adc695f5e5a..b2be990947c 100644 --- a/src/vnet/session/transport.h +++ b/src/vnet/session/transport.h @@ -32,6 
+32,21 @@ typedef struct _transport_options_t u8 half_open_has_fifos; } transport_options_t; +typedef enum transport_snd_flags_ +{ + TRANSPORT_SND_F_DESCHED = 1 << 0, + TRANSPORT_SND_F_POSTPONE = 1 << 1, + TRANSPORT_SND_N_FLAGS +} __clib_packed transport_snd_flags_t; + +typedef struct transport_send_params_ +{ + u32 snd_space; + u32 tx_offset; + u16 snd_mss; + transport_snd_flags_t flags; +} transport_send_params_t; + /* * Transport protocol virtual function table */ @@ -54,9 +69,8 @@ typedef struct _transport_proto_vft */ u32 (*push_header) (transport_connection_t * tconn, vlib_buffer_t * b); - u16 (*send_mss) (transport_connection_t * tc); - u32 (*send_space) (transport_connection_t * tc); - u32 (*tx_fifo_offset) (transport_connection_t * tc); + int (*send_params) (transport_connection_t * tconn, + transport_send_params_t *sp); void (*update_time) (f64 time_now, u8 thread_index); void (*flush_data) (transport_connection_t *tconn); int (*custom_tx) (void *session, u32 max_burst_size); @@ -151,16 +165,38 @@ transport_app_rx_evt (transport_proto_t tp, u32 conn_index, u32 thread_index) } /** - * Get maximum tx burst allowed for transport connection + * Get send parameters for transport connection + * + * These include maximum tx burst, mss, tx offset and other flags + * transport might want to provide to session layer * * @param tc transport connection + * @param sp send parameters + * */ static inline u32 -transport_connection_snd_space (transport_connection_t * tc) +transport_connection_snd_params (transport_connection_t * tc, + transport_send_params_t * sp) { - return tp_vfts[tc->proto].send_space (tc); + return tp_vfts[tc->proto].send_params (tc, sp); } +static inline u8 +transport_connection_is_descheduled (transport_connection_t * tc) +{ + if (tc->flags & TRANSPORT_CONNECTION_F_DESCHED) + return 1; + return 0; +} + +static inline void +transport_connection_deschedule (transport_connection_t * tc) +{ + tc->flags |= TRANSPORT_CONNECTION_F_DESCHED; +} + +void
transport_connection_reschedule (transport_connection_t * tc); + void transport_register_protocol (transport_proto_t transport_proto, const transport_proto_vft_t * vft, fib_protocol_t fib_proto, u32 output_node); diff --git a/src/vnet/session/transport_types.h b/src/vnet/session/transport_types.h index 459fb0c5833..323d261ad89 100644 --- a/src/vnet/session/transport_types.h +++ b/src/vnet/session/transport_types.h @@ -43,9 +43,15 @@ typedef enum transport_service_type_ typedef enum transport_connection_flags_ { TRANSPORT_CONNECTION_F_IS_TX_PACED = 1 << 0, - TRANSPORT_CONNECTION_F_NO_LOOKUP = 1 << 1, /**< Don't register connection in lookup - Does not apply to local apps and - transports using the network layer (udp/tcp) */ + /** + * Don't register connection in lookup. Does not apply to local apps + * and transports using the network layer (udp/tcp) + */ + TRANSPORT_CONNECTION_F_NO_LOOKUP = 1 << 1, + /** + * Connection descheduled by the session layer. + */ + TRANSPORT_CONNECTION_F_DESCHED = 1 << 2, } transport_connection_flags_t; typedef struct _spacer diff --git a/src/vnet/tcp/tcp.c b/src/vnet/tcp/tcp.c index 7712ade4416..4a0ffc137e5 100644 --- a/src/vnet/tcp/tcp.c +++ b/src/vnet/tcp/tcp.c @@ -1195,29 +1195,6 @@ tcp_session_cal_goal_size (tcp_connection_t * tc) return goal_size > tc->snd_mss ? goal_size : tc->snd_mss; } -/** - * Compute maximum segment size for session layer. - * - * Since the result needs to be the actual data length, it first computes - * the tcp options to be used in the next burst and subtracts their - * length from the connection's snd_mss. - */ -static u16 -tcp_session_send_mss (transport_connection_t * trans_conn) -{ - tcp_connection_t *tc = (tcp_connection_t *) trans_conn; - - /* Ensure snd_mss does accurately reflect the amount of data we can push - * in a segment. This also makes sure that options are updated according to - * the current state of the connection. 
*/ - tcp_update_burst_snd_vars (tc); - - if (PREDICT_FALSE (tc->cfg_flags & TCP_CFG_F_TSO)) - return tcp_session_cal_goal_size (tc); - - return tc->snd_mss; -} - always_inline u32 tcp_round_snd_space (tcp_connection_t * tc, u32 snd_space) { @@ -1281,23 +1258,39 @@ tcp_snd_space (tcp_connection_t * tc) return tcp_snd_space_inline (tc); } -static u32 -tcp_session_send_space (transport_connection_t * trans_conn) +static int +tcp_session_send_params (transport_connection_t * trans_conn, + transport_send_params_t * sp) { tcp_connection_t *tc = (tcp_connection_t *) trans_conn; - return clib_min (tcp_snd_space_inline (tc), - tc->snd_wnd - (tc->snd_nxt - tc->snd_una)); -} -static u32 -tcp_session_tx_fifo_offset (transport_connection_t * trans_conn) -{ - tcp_connection_t *tc = (tcp_connection_t *) trans_conn; + /* Ensure snd_mss does accurately reflect the amount of data we can push + * in a segment. This also makes sure that options are updated according to + * the current state of the connection. 
*/ + tcp_update_burst_snd_vars (tc); - ASSERT (seq_geq (tc->snd_nxt, tc->snd_una)); + if (PREDICT_FALSE (tc->cfg_flags & TCP_CFG_F_TSO)) + sp->snd_mss = tcp_session_cal_goal_size (tc); + else + sp->snd_mss = tc->snd_mss; + + sp->snd_space = clib_min (tcp_snd_space_inline (tc), + tc->snd_wnd - (tc->snd_nxt - tc->snd_una)); + ASSERT (seq_geq (tc->snd_nxt, tc->snd_una)); /* This still works if fast retransmit is on */ - return (tc->snd_nxt - tc->snd_una); + sp->tx_offset = tc->snd_nxt - tc->snd_una; + + sp->flags = 0; + if (!tc->snd_wnd) + { + if (tcp_timer_is_active (tc, TCP_TIMER_PERSIST)) + sp->flags = TRANSPORT_SND_F_DESCHED; + else + sp->flags = TRANSPORT_SND_F_POSTPONE; + } + + return 0; } static void @@ -1509,10 +1502,8 @@ const static transport_proto_vft_t tcp_proto = { .close = tcp_session_close, .cleanup = tcp_session_cleanup, .reset = tcp_session_reset, - .send_mss = tcp_session_send_mss, - .send_space = tcp_session_send_space, + .send_params = tcp_session_send_params, .update_time = tcp_update_time, - .tx_fifo_offset = tcp_session_tx_fifo_offset, .flush_data = tcp_session_flush_data, .custom_tx = tcp_session_custom_tx, .format_connection = format_tcp_session, diff --git a/src/vnet/tcp/tcp.h b/src/vnet/tcp/tcp.h index 6094ac5110a..8fa9013e31a 100644 --- a/src/vnet/tcp/tcp.h +++ b/src/vnet/tcp/tcp.h @@ -1244,6 +1244,8 @@ tcp_persist_timer_update (tcp_connection_t * tc) always_inline void tcp_persist_timer_reset (tcp_connection_t * tc) { + if (transport_connection_is_descheduled (&tc->connection)) + transport_connection_reschedule (&tc->connection); tcp_timer_reset (tc, TCP_TIMER_PERSIST); } diff --git a/src/vnet/tcp/tcp_input.c b/src/vnet/tcp/tcp_input.c index 0e8e68b1339..4f31d21c3c1 100755 --- a/src/vnet/tcp/tcp_input.c +++ b/src/vnet/tcp/tcp_input.c @@ -1312,7 +1312,9 @@ tcp_update_snd_wnd (tcp_connection_t * tc, u32 seq, u32 ack, u32 snd_wnd) } else { - tcp_persist_timer_reset (tc); + if (PREDICT_FALSE (tcp_timer_is_active (tc, TCP_TIMER_PERSIST))) + 
tcp_persist_timer_reset (tc); + if (PREDICT_FALSE (!tcp_in_recovery (tc) && tc->rto_boff > 0)) { tc->rto_boff = 0; diff --git a/src/vnet/tcp/tcp_output.c b/src/vnet/tcp/tcp_output.c index 6ed478fd1bc..b77713e1538 100644 --- a/src/vnet/tcp/tcp_output.c +++ b/src/vnet/tcp/tcp_output.c @@ -1637,7 +1637,7 @@ tcp_timer_persist_handler (tcp_connection_t * tc) /* Problem already solved or worse */ if (tc->state == TCP_STATE_CLOSED || tc->snd_wnd > tc->snd_mss || (tc->flags & TCP_CONN_FINSNT)) - return; + goto update_scheduler; available_bytes = transport_max_tx_dequeue (&tc->connection); offset = tc->snd_nxt - tc->snd_una; @@ -1651,7 +1651,7 @@ tcp_timer_persist_handler (tcp_connection_t * tc) } if (available_bytes <= offset) - return; + goto update_scheduler; /* Increment RTO backoff */ tc->rto_boff += 1; @@ -1665,6 +1665,7 @@ tcp_timer_persist_handler (tcp_connection_t * tc) tcp_persist_timer_set (tc); return; } + b = vlib_get_buffer (vm, bi); data = tcp_init_buffer (vm, b); @@ -1693,6 +1694,11 @@ tcp_timer_persist_handler (tcp_connection_t * tc) /* Just sent new data, enable retransmit */ tcp_retransmit_timer_update (tc); + +update_scheduler: + + if (transport_connection_is_descheduled (&tc->connection)) + transport_connection_reschedule (&tc->connection); } /** diff --git a/src/vnet/udp/udp.c b/src/vnet/udp/udp.c index 34cebec93d2..109f4683e7f 100644 --- a/src/vnet/udp/udp.c +++ b/src/vnet/udp/udp.c @@ -273,18 +273,17 @@ format_udp_listener_session (u8 * s, va_list * args) return format (s, "%U", format_udp_connection, uc, verbose); } -u16 -udp_send_mss (transport_connection_t * t) -{ - /* TODO figure out MTU of output interface */ - return 1460; -} - -u32 -udp_send_space (transport_connection_t * t) +static int +udp_session_send_params (transport_connection_t * tconn, + transport_send_params_t * sp) { /* No constraint on TX window */ - return ~0; + sp->snd_space = ~0; + /* TODO figure out MTU of output interface */ + sp->snd_mss = 1460; + sp->tx_offset = 0; + 
sp->flags = 0; + return 0; } int @@ -357,8 +356,7 @@ static const transport_proto_vft_t udp_proto = { .get_half_open = udp_session_get_half_open, .close = udp_session_close, .cleanup = udp_session_cleanup, - .send_mss = udp_send_mss, - .send_space = udp_send_space, + .send_params = udp_session_send_params, .format_connection = format_udp_session, .format_half_open = format_udp_half_open_session, .format_listener = format_udp_listener_session, @@ -412,8 +410,7 @@ static const transport_proto_vft_t udpc_proto = { .get_half_open = udp_session_get_half_open, .close = udp_session_close, .cleanup = udp_session_cleanup, - .send_mss = udp_send_mss, - .send_space = udp_send_space, + .send_params = udp_session_send_params, .format_connection = format_udp_session, .format_half_open = format_udp_half_open_session, .format_listener = format_udp_listener_session, -- 2.16.6