From: Florin Coras Date: Wed, 12 Dec 2018 18:56:01 +0000 (-0800) Subject: session/tcp: support tx flush mark X-Git-Tag: v19.04-rc0~181 X-Git-Url: https://gerrit.fd.io/r/gitweb?a=commitdiff_plain;ds=sidebyside;h=42ceddb7cd836a89a12b0b8e623b06bc4c0cc0cb;p=vpp.git session/tcp: support tx flush mark For tcp this means that the last enqueued data goes out with a psh bit set. Change-Id: I29d357ecae6f02e748b59a7b799150ec73d14ba2 Signed-off-by: Florin Coras --- diff --git a/src/vcl/vppcom.c b/src/vcl/vppcom.c index 0eaab6cbd6b..9bb6a9880d1 100644 --- a/src/vcl/vppcom.c +++ b/src/vcl/vppcom.c @@ -1665,8 +1665,9 @@ vcl_is_tx_evt_for_session (session_event_t * e, u32 sid, u8 is_ct) return (e->event_type == SESSION_IO_EVT_CT_RX); } -int -vppcom_session_write (uint32_t session_handle, void *buf, size_t n) +static inline int +vppcom_session_write_inline (uint32_t session_handle, void *buf, size_t n, + u8 is_flush) { vcl_worker_t *wrk = vcl_worker_get_current (); int rv, n_write, is_nonblocking; @@ -1733,6 +1734,9 @@ vppcom_session_write (uint32_t session_handle, void *buf, size_t n) ASSERT (FIFO_EVENT_APP_TX + 1 == SESSION_IO_EVT_CT_TX); et = FIFO_EVENT_APP_TX + vcl_session_is_ct (s); + if (is_flush && !vcl_session_is_ct (s)) + et = SESSION_IO_EVT_TX_FLUSH; + if (s->is_dgram) n_write = app_send_dgram_raw (tx_fifo, &s->transport, s->vpp_evt_q, buf, n, et, SVM_Q_WAIT); @@ -1748,6 +1752,13 @@ vppcom_session_write (uint32_t session_handle, void *buf, size_t n) return n_write; } +int +vppcom_session_write (uint32_t session_handle, void *buf, size_t n) +{ + return vppcom_session_write_inline (session_handle, buf, n, + 0 /* is_flush */ ); +} + static vcl_session_t * vcl_ct_session_get_from_fifo (vcl_worker_t * wrk, svm_fifo_t * f, u8 type) { @@ -3345,7 +3356,7 @@ vppcom_session_sendto (uint32_t session_handle, void *buffer, getpid (), flags, flags); } - return (vppcom_session_write (session_handle, buffer, buflen)); + return (vppcom_session_write_inline (session_handle, buffer, buflen, 1)); } int diff --git a/src/vnet/session/session.h b/src/vnet/session/session.h index d5f040edc59..be2490fbe95 100644 --- a/src/vnet/session/session.h +++ b/src/vnet/session/session.h @@ -36,6 +36,7 @@ typedef enum SESSION_IO_EVT_CT_RX, FIFO_EVENT_APP_TX, SESSION_IO_EVT_CT_TX, + SESSION_IO_EVT_TX_FLUSH, FIFO_EVENT_DISCONNECT, FIFO_EVENT_BUILTIN_RX, FIFO_EVENT_BUILTIN_TX, @@ -518,6 +519,13 @@ transport_max_rx_enqueue (transport_connection_t * tc) return svm_fifo_max_enqueue (s->server_rx_fifo); } +always_inline u32 +transport_max_tx_dequeue (transport_connection_t * tc) +{ + stream_session_t *s = session_get (tc->s_index, tc->thread_index); + return svm_fifo_max_dequeue (s->server_tx_fifo); +} + always_inline u32 transport_rx_fifo_size (transport_connection_t * tc) { diff --git a/src/vnet/session/session_node.c b/src/vnet/session/session_node.c index 14576685a76..f4e0eaa993e 100644 --- a/src/vnet/session/session_node.c +++ b/src/vnet/session/session_node.c @@ -572,9 +572,17 @@ session_tx_fifo_read_and_snd_i (vlib_main_t * vm, vlib_node_runtime_t * node, ctx->transport_vft = transport_protocol_get_vft (tp); ctx->tc = session_tx_get_transport (ctx, peek_data); ctx->snd_mss = ctx->transport_vft->send_mss (ctx->tc); - ctx->snd_space = - transport_connection_snd_space (ctx->tc, vm->clib_time.last_cpu_time, - ctx->snd_mss); + + if (PREDICT_FALSE (e->event_type == SESSION_IO_EVT_TX_FLUSH)) + { + if (ctx->transport_vft->flush_data) + ctx->transport_vft->flush_data (ctx->tc); + } + + ctx->snd_space = transport_connection_snd_space (ctx->tc, + vm-> + clib_time.last_cpu_time, + ctx->snd_mss); if (ctx->snd_space == 0 || ctx->snd_mss == 0) { vec_add1 (wrk->pending_event_vector, *e); @@ -828,6 +836,7 @@ session_queue_node_fn (vlib_main_t * vm, vlib_node_runtime_t * node, e = &fifo_events[i]; switch (e->event_type) { + case SESSION_IO_EVT_TX_FLUSH: case FIFO_EVENT_APP_TX: /* Don't try to send more that one frame per dispatch cycle */ if (n_tx_packets == VLIB_FRAME_SIZE) diff --git a/src/vnet/session/transport_interface.h b/src/vnet/session/transport_interface.h index 3bfed415874..10579c45c64 100644 --- a/src/vnet/session/transport_interface.h +++ b/src/vnet/session/transport_interface.h @@ -61,6 +61,7 @@ typedef struct _transport_proto_vft u32 (*send_space) (transport_connection_t * tc); u32 (*tx_fifo_offset) (transport_connection_t * tc); void (*update_time) (f64 time_now, u8 thread_index); + void (*flush_data) (transport_connection_t *tconn); /* * Connection retrieval diff --git a/src/vnet/tcp/tcp.c b/src/vnet/tcp/tcp.c index f703d634b54..6d6a880eda8 100644 --- a/src/vnet/tcp/tcp.c +++ b/src/vnet/tcp/tcp.c @@ -1123,6 +1123,16 @@ tcp_session_push_header (transport_connection_t * tconn, vlib_buffer_t * b) return tcp_push_header (tc, b); } +static void +tcp_session_flush_data (transport_connection_t * tconn) +{ + tcp_connection_t *tc = (tcp_connection_t *) tconn; + if (tc->flags & TCP_CONN_PSH_PENDING) + return; + tc->flags |= TCP_CONN_PSH_PENDING; + tc->psh_seq = tc->snd_una_max + transport_max_tx_dequeue (tconn) - 1; +} + /* *INDENT-OFF* */ const static transport_proto_vft_t tcp_proto = { .enable = vnet_tcp_enable_disable, @@ -1139,6 +1149,7 @@ const static transport_proto_vft_t tcp_proto = { .send_space = tcp_session_send_space, .update_time = tcp_update_time, .tx_fifo_offset = tcp_session_tx_fifo_offset, + .flush_data = tcp_session_flush_data, .format_connection = format_tcp_session, .format_listener = format_tcp_listener_session, .format_half_open = format_tcp_half_open_session, diff --git a/src/vnet/tcp/tcp.h b/src/vnet/tcp/tcp.h index 5b235b65d74..46b03ac14b4 100644 --- a/src/vnet/tcp/tcp.h +++ b/src/vnet/tcp/tcp.h @@ -124,6 +124,7 @@ extern timer_expiration_handler tcp_timer_retransmit_syn_handler; _(FRXT_PENDING, "Fast-retransmit pending") \ _(FRXT_FIRST, "Fast-retransmit first again") \ _(DEQ_PENDING, "Pending dequeue acked") \ + _(PSH_PENDING, "Pending psh packet") \ typedef enum _tcp_connection_flag_bits { @@ -334,6 +335,8 @@ typedef struct _tcp_connection u32 last_fib_check; /**< Last time we checked fib route for peer */ u32 sw_if_index; /**< Interface for the connection */ u32 tx_fifo_size; /**< Tx fifo size. Used to constrain cwnd */ + + u32 psh_seq; /**< Add psh header for seg that includes this */ } tcp_connection_t; /* *INDENT-OFF* */ diff --git a/src/vnet/tcp/tcp_input.c b/src/vnet/tcp/tcp_input.c index dff18029155..f04fa5d8901 100644 --- a/src/vnet/tcp/tcp_input.c +++ b/src/vnet/tcp/tcp_input.c @@ -543,6 +543,12 @@ tcp_handle_postponed_dequeues (tcp_worker_ctx_t * wrk) tc->burst_acked = 0; tcp_validate_txf_size (tc, tc->snd_una_max - tc->snd_una); + if (PREDICT_FALSE (tc->flags & TCP_CONN_PSH_PENDING)) + { + if (seq_leq (tc->psh_seq, tc->snd_una)) + tc->flags &= ~TCP_CONN_PSH_PENDING; + } + /* If everything has been acked, stop retransmit timer * otherwise update. */ tcp_retransmit_timer_update (tc); diff --git a/src/vnet/tcp/tcp_output.c b/src/vnet/tcp/tcp_output.c index 74fc15fe6cc..7cee34995e3 100644 --- a/src/vnet/tcp/tcp_output.c +++ b/src/vnet/tcp/tcp_output.c @@ -1169,7 +1169,12 @@ tcp_push_hdr_i (tcp_connection_t * tc, vlib_buffer_t * b, advertise_wnd = tcp_window_to_advertise (tc, next_state); flags = tcp_make_state_flags (tc, next_state); - + if (PREDICT_FALSE (tc->flags & TCP_CONN_PSH_PENDING)) + { + if (seq_geq (tc->psh_seq, tc->snd_nxt) + && seq_lt (tc->psh_seq, tc->snd_nxt + data_len)) + flags |= TCP_FLAG_PSH; + } th = vlib_buffer_push_tcp (b, tc->c_lcl_port, tc->c_rmt_port, tc->snd_nxt, tc->rcv_nxt, tcp_hdr_opts_len, flags, advertise_wnd);