For tcp this means that the last enqueued data goes out with a psh bit
set.
Change-Id: I29d357ecae6f02e748b59a7b799150ec73d14ba2
Signed-off-by: Florin Coras <fcoras@cisco.com>
return (e->event_type == SESSION_IO_EVT_CT_RX);
}
return (e->event_type == SESSION_IO_EVT_CT_RX);
}
-int
-vppcom_session_write (uint32_t session_handle, void *buf, size_t n)
+static inline int
+vppcom_session_write_inline (uint32_t session_handle, void *buf, size_t n,
+ u8 is_flush)
{
vcl_worker_t *wrk = vcl_worker_get_current ();
int rv, n_write, is_nonblocking;
{
vcl_worker_t *wrk = vcl_worker_get_current ();
int rv, n_write, is_nonblocking;
ASSERT (FIFO_EVENT_APP_TX + 1 == SESSION_IO_EVT_CT_TX);
et = FIFO_EVENT_APP_TX + vcl_session_is_ct (s);
ASSERT (FIFO_EVENT_APP_TX + 1 == SESSION_IO_EVT_CT_TX);
et = FIFO_EVENT_APP_TX + vcl_session_is_ct (s);
+ if (is_flush && !vcl_session_is_ct (s))
+ et = SESSION_IO_EVT_TX_FLUSH;
+
if (s->is_dgram)
n_write = app_send_dgram_raw (tx_fifo, &s->transport,
s->vpp_evt_q, buf, n, et, SVM_Q_WAIT);
if (s->is_dgram)
n_write = app_send_dgram_raw (tx_fifo, &s->transport,
s->vpp_evt_q, buf, n, et, SVM_Q_WAIT);
+int
+vppcom_session_write (uint32_t session_handle, void *buf, size_t n)
+{
+ return vppcom_session_write_inline (session_handle, buf, n,
+ 0 /* is_flush */ );
+}
+
static vcl_session_t *
vcl_ct_session_get_from_fifo (vcl_worker_t * wrk, svm_fifo_t * f, u8 type)
{
static vcl_session_t *
vcl_ct_session_get_from_fifo (vcl_worker_t * wrk, svm_fifo_t * f, u8 type)
{
getpid (), flags, flags);
}
getpid (), flags, flags);
}
- return (vppcom_session_write (session_handle, buffer, buflen));
+ return (vppcom_session_write_inline (session_handle, buffer, buflen, 1));
SESSION_IO_EVT_CT_RX,
FIFO_EVENT_APP_TX,
SESSION_IO_EVT_CT_TX,
SESSION_IO_EVT_CT_RX,
FIFO_EVENT_APP_TX,
SESSION_IO_EVT_CT_TX,
+ SESSION_IO_EVT_TX_FLUSH,
FIFO_EVENT_DISCONNECT,
FIFO_EVENT_BUILTIN_RX,
FIFO_EVENT_BUILTIN_TX,
FIFO_EVENT_DISCONNECT,
FIFO_EVENT_BUILTIN_RX,
FIFO_EVENT_BUILTIN_TX,
return svm_fifo_max_enqueue (s->server_rx_fifo);
}
return svm_fifo_max_enqueue (s->server_rx_fifo);
}
+always_inline u32
+transport_max_tx_dequeue (transport_connection_t * tc)
+{
+ stream_session_t *s = session_get (tc->s_index, tc->thread_index);
+ return svm_fifo_max_dequeue (s->server_tx_fifo);
+}
+
always_inline u32
transport_rx_fifo_size (transport_connection_t * tc)
{
always_inline u32
transport_rx_fifo_size (transport_connection_t * tc)
{
ctx->transport_vft = transport_protocol_get_vft (tp);
ctx->tc = session_tx_get_transport (ctx, peek_data);
ctx->snd_mss = ctx->transport_vft->send_mss (ctx->tc);
ctx->transport_vft = transport_protocol_get_vft (tp);
ctx->tc = session_tx_get_transport (ctx, peek_data);
ctx->snd_mss = ctx->transport_vft->send_mss (ctx->tc);
- ctx->snd_space =
- transport_connection_snd_space (ctx->tc, vm->clib_time.last_cpu_time,
- ctx->snd_mss);
+
+ if (PREDICT_FALSE (e->event_type == SESSION_IO_EVT_TX_FLUSH))
+ {
+ if (ctx->transport_vft->flush_data)
+ ctx->transport_vft->flush_data (ctx->tc);
+ }
+
+ ctx->snd_space = transport_connection_snd_space (ctx->tc,
+ vm->
+ clib_time.last_cpu_time,
+ ctx->snd_mss);
if (ctx->snd_space == 0 || ctx->snd_mss == 0)
{
vec_add1 (wrk->pending_event_vector, *e);
if (ctx->snd_space == 0 || ctx->snd_mss == 0)
{
vec_add1 (wrk->pending_event_vector, *e);
e = &fifo_events[i];
switch (e->event_type)
{
e = &fifo_events[i];
switch (e->event_type)
{
+ case SESSION_IO_EVT_TX_FLUSH:
case FIFO_EVENT_APP_TX:
/* Don't try to send more that one frame per dispatch cycle */
if (n_tx_packets == VLIB_FRAME_SIZE)
case FIFO_EVENT_APP_TX:
/* Don't try to send more that one frame per dispatch cycle */
if (n_tx_packets == VLIB_FRAME_SIZE)
u32 (*send_space) (transport_connection_t * tc);
u32 (*tx_fifo_offset) (transport_connection_t * tc);
void (*update_time) (f64 time_now, u8 thread_index);
u32 (*send_space) (transport_connection_t * tc);
u32 (*tx_fifo_offset) (transport_connection_t * tc);
void (*update_time) (f64 time_now, u8 thread_index);
+ void (*flush_data) (transport_connection_t *tconn);
/*
* Connection retrieval
/*
* Connection retrieval
return tcp_push_header (tc, b);
}
return tcp_push_header (tc, b);
}
+static void
+tcp_session_flush_data (transport_connection_t * tconn)
+{
+ tcp_connection_t *tc = (tcp_connection_t *) tconn;
+ if (tc->flags & TCP_CONN_PSH_PENDING)
+ return;
+ tc->flags |= TCP_CONN_PSH_PENDING;
+ tc->psh_seq = tc->snd_una_max + transport_max_tx_dequeue (tconn) - 1;
+}
+
/* *INDENT-OFF* */
const static transport_proto_vft_t tcp_proto = {
.enable = vnet_tcp_enable_disable,
/* *INDENT-OFF* */
const static transport_proto_vft_t tcp_proto = {
.enable = vnet_tcp_enable_disable,
.send_space = tcp_session_send_space,
.update_time = tcp_update_time,
.tx_fifo_offset = tcp_session_tx_fifo_offset,
.send_space = tcp_session_send_space,
.update_time = tcp_update_time,
.tx_fifo_offset = tcp_session_tx_fifo_offset,
+ .flush_data = tcp_session_flush_data,
.format_connection = format_tcp_session,
.format_listener = format_tcp_listener_session,
.format_half_open = format_tcp_half_open_session,
.format_connection = format_tcp_session,
.format_listener = format_tcp_listener_session,
.format_half_open = format_tcp_half_open_session,
_(FRXT_PENDING, "Fast-retransmit pending") \
_(FRXT_FIRST, "Fast-retransmit first again") \
_(DEQ_PENDING, "Pending dequeue acked") \
_(FRXT_PENDING, "Fast-retransmit pending") \
_(FRXT_FIRST, "Fast-retransmit first again") \
_(DEQ_PENDING, "Pending dequeue acked") \
+ _(PSH_PENDING, "Pending psh packet") \
typedef enum _tcp_connection_flag_bits
{
typedef enum _tcp_connection_flag_bits
{
u32 last_fib_check; /**< Last time we checked fib route for peer */
u32 sw_if_index; /**< Interface for the connection */
u32 tx_fifo_size; /**< Tx fifo size. Used to constrain cwnd */
u32 last_fib_check; /**< Last time we checked fib route for peer */
u32 sw_if_index; /**< Interface for the connection */
u32 tx_fifo_size; /**< Tx fifo size. Used to constrain cwnd */
+
+ u32 psh_seq; /**< Add psh header for seg that includes this */
} tcp_connection_t;
/* *INDENT-OFF* */
} tcp_connection_t;
/* *INDENT-OFF* */
tc->burst_acked = 0;
tcp_validate_txf_size (tc, tc->snd_una_max - tc->snd_una);
tc->burst_acked = 0;
tcp_validate_txf_size (tc, tc->snd_una_max - tc->snd_una);
+ if (PREDICT_FALSE (tc->flags & TCP_CONN_PSH_PENDING))
+ {
+ if (seq_leq (tc->psh_seq, tc->snd_una))
+ tc->flags &= ~TCP_CONN_PSH_PENDING;
+ }
+
/* If everything has been acked, stop retransmit timer
* otherwise update. */
tcp_retransmit_timer_update (tc);
/* If everything has been acked, stop retransmit timer
* otherwise update. */
tcp_retransmit_timer_update (tc);
advertise_wnd = tcp_window_to_advertise (tc, next_state);
flags = tcp_make_state_flags (tc, next_state);
advertise_wnd = tcp_window_to_advertise (tc, next_state);
flags = tcp_make_state_flags (tc, next_state);
+ if (PREDICT_FALSE (tc->flags & TCP_CONN_PSH_PENDING))
+ {
+ if (seq_geq (tc->psh_seq, tc->snd_nxt)
+ && seq_lt (tc->psh_seq, tc->snd_nxt + data_len))
+ flags |= TCP_FLAG_PSH;
+ }
th = vlib_buffer_push_tcp (b, tc->c_lcl_port, tc->c_rmt_port, tc->snd_nxt,
tc->rcv_nxt, tcp_hdr_opts_len, flags,
advertise_wnd);
th = vlib_buffer_push_tcp (b, tc->c_lcl_port, tc->c_rmt_port, tc->snd_nxt,
tc->rcv_nxt, tcp_hdr_opts_len, flags,
advertise_wnd);