session/tcp: support tx flush mark 48/16448/4
authorFlorin Coras <fcoras@cisco.com>
Wed, 12 Dec 2018 18:56:01 +0000 (10:56 -0800)
committerDave Barach <openvpp@barachs.net>
Thu, 13 Dec 2018 13:40:32 +0000 (13:40 +0000)
For tcp this means that the last enqueued data goes out with a psh bit
set.

Change-Id: I29d357ecae6f02e748b59a7b799150ec73d14ba2
Signed-off-by: Florin Coras <fcoras@cisco.com>
src/vcl/vppcom.c
src/vnet/session/session.h
src/vnet/session/session_node.c
src/vnet/session/transport_interface.h
src/vnet/tcp/tcp.c
src/vnet/tcp/tcp.h
src/vnet/tcp/tcp_input.c
src/vnet/tcp/tcp_output.c

index 0eaab6c..9bb6a98 100644 (file)
@@ -1665,8 +1665,9 @@ vcl_is_tx_evt_for_session (session_event_t * e, u32 sid, u8 is_ct)
     return (e->event_type == SESSION_IO_EVT_CT_RX);
 }
 
-int
-vppcom_session_write (uint32_t session_handle, void *buf, size_t n)
+static inline int
+vppcom_session_write_inline (uint32_t session_handle, void *buf, size_t n,
+                            u8 is_flush)
 {
   vcl_worker_t *wrk = vcl_worker_get_current ();
   int rv, n_write, is_nonblocking;
@@ -1733,6 +1734,9 @@ vppcom_session_write (uint32_t session_handle, void *buf, size_t n)
 
   ASSERT (FIFO_EVENT_APP_TX + 1 == SESSION_IO_EVT_CT_TX);
   et = FIFO_EVENT_APP_TX + vcl_session_is_ct (s);
+  if (is_flush && !vcl_session_is_ct (s))
+    et = SESSION_IO_EVT_TX_FLUSH;
+
   if (s->is_dgram)
     n_write = app_send_dgram_raw (tx_fifo, &s->transport,
                                  s->vpp_evt_q, buf, n, et, SVM_Q_WAIT);
@@ -1748,6 +1752,13 @@ vppcom_session_write (uint32_t session_handle, void *buf, size_t n)
   return n_write;
 }
 
+int
+vppcom_session_write (uint32_t session_handle, void *buf, size_t n)
+{
+  return vppcom_session_write_inline (session_handle, buf, n,
+                                     0 /* is_flush */ );
+}
+
 static vcl_session_t *
 vcl_ct_session_get_from_fifo (vcl_worker_t * wrk, svm_fifo_t * f, u8 type)
 {
@@ -3345,7 +3356,7 @@ vppcom_session_sendto (uint32_t session_handle, void *buffer,
            getpid (), flags, flags);
     }
 
-  return (vppcom_session_write (session_handle, buffer, buflen));
+  return (vppcom_session_write_inline (session_handle, buffer, buflen, 1));
 }
 
 int
index d5f040e..be2490f 100644 (file)
@@ -36,6 +36,7 @@ typedef enum
   SESSION_IO_EVT_CT_RX,
   FIFO_EVENT_APP_TX,
   SESSION_IO_EVT_CT_TX,
+  SESSION_IO_EVT_TX_FLUSH,
   FIFO_EVENT_DISCONNECT,
   FIFO_EVENT_BUILTIN_RX,
   FIFO_EVENT_BUILTIN_TX,
@@ -518,6 +519,13 @@ transport_max_rx_enqueue (transport_connection_t * tc)
   return svm_fifo_max_enqueue (s->server_rx_fifo);
 }
 
+always_inline u32
+transport_max_tx_dequeue (transport_connection_t * tc)
+{
+  stream_session_t *s = session_get (tc->s_index, tc->thread_index);
+  return svm_fifo_max_dequeue (s->server_tx_fifo);
+}
+
 always_inline u32
 transport_rx_fifo_size (transport_connection_t * tc)
 {
index 1457668..f4e0eaa 100644 (file)
@@ -572,9 +572,17 @@ session_tx_fifo_read_and_snd_i (vlib_main_t * vm, vlib_node_runtime_t * node,
   ctx->transport_vft = transport_protocol_get_vft (tp);
   ctx->tc = session_tx_get_transport (ctx, peek_data);
   ctx->snd_mss = ctx->transport_vft->send_mss (ctx->tc);
-  ctx->snd_space =
-    transport_connection_snd_space (ctx->tc, vm->clib_time.last_cpu_time,
-                                   ctx->snd_mss);
+
+  if (PREDICT_FALSE (e->event_type == SESSION_IO_EVT_TX_FLUSH))
+    {
+      if (ctx->transport_vft->flush_data)
+       ctx->transport_vft->flush_data (ctx->tc);
+    }
+
+  ctx->snd_space = transport_connection_snd_space (ctx->tc,
+                                                  vm->
+                                                  clib_time.last_cpu_time,
+                                                  ctx->snd_mss);
   if (ctx->snd_space == 0 || ctx->snd_mss == 0)
     {
       vec_add1 (wrk->pending_event_vector, *e);
@@ -828,6 +836,7 @@ session_queue_node_fn (vlib_main_t * vm, vlib_node_runtime_t * node,
       e = &fifo_events[i];
       switch (e->event_type)
        {
+       case SESSION_IO_EVT_TX_FLUSH:
        case FIFO_EVENT_APP_TX:
          /* Don't try to send more that one frame per dispatch cycle */
          if (n_tx_packets == VLIB_FRAME_SIZE)
index 3bfed41..10579c4 100644 (file)
@@ -61,6 +61,7 @@ typedef struct _transport_proto_vft
   u32 (*send_space) (transport_connection_t * tc);
   u32 (*tx_fifo_offset) (transport_connection_t * tc);
   void (*update_time) (f64 time_now, u8 thread_index);
+  void (*flush_data) (transport_connection_t *tconn);
 
   /*
    * Connection retrieval
index f703d63..6d6a880 100644 (file)
@@ -1123,6 +1123,16 @@ tcp_session_push_header (transport_connection_t * tconn, vlib_buffer_t * b)
   return tcp_push_header (tc, b);
 }
 
+static void
+tcp_session_flush_data (transport_connection_t * tconn)
+{
+  tcp_connection_t *tc = (tcp_connection_t *) tconn;
+  if (tc->flags & TCP_CONN_PSH_PENDING)
+    return;
+  tc->flags |= TCP_CONN_PSH_PENDING;
+  tc->psh_seq = tc->snd_una_max + transport_max_tx_dequeue (tconn) - 1;
+}
+
 /* *INDENT-OFF* */
 const static transport_proto_vft_t tcp_proto = {
   .enable = vnet_tcp_enable_disable,
@@ -1139,6 +1149,7 @@ const static transport_proto_vft_t tcp_proto = {
   .send_space = tcp_session_send_space,
   .update_time = tcp_update_time,
   .tx_fifo_offset = tcp_session_tx_fifo_offset,
+  .flush_data = tcp_session_flush_data,
   .format_connection = format_tcp_session,
   .format_listener = format_tcp_listener_session,
   .format_half_open = format_tcp_half_open_session,
index 5b235b6..46b03ac 100644 (file)
@@ -124,6 +124,7 @@ extern timer_expiration_handler tcp_timer_retransmit_syn_handler;
   _(FRXT_PENDING, "Fast-retransmit pending")   \
   _(FRXT_FIRST, "Fast-retransmit first again") \
   _(DEQ_PENDING, "Pending dequeue acked")      \
+  _(PSH_PENDING, "Pending psh packet")         \
 
 typedef enum _tcp_connection_flag_bits
 {
@@ -334,6 +335,8 @@ typedef struct _tcp_connection
   u32 last_fib_check;  /**< Last time we checked fib route for peer */
   u32 sw_if_index;     /**< Interface for the connection */
   u32 tx_fifo_size;    /**< Tx fifo size. Used to constrain cwnd */
+
+  u32 psh_seq;         /**< Add psh header for seg that includes this */
 } tcp_connection_t;
 
 /* *INDENT-OFF* */
index dff1802..f04fa5d 100644 (file)
@@ -543,6 +543,12 @@ tcp_handle_postponed_dequeues (tcp_worker_ctx_t * wrk)
       tc->burst_acked = 0;
       tcp_validate_txf_size (tc, tc->snd_una_max - tc->snd_una);
 
+      if (PREDICT_FALSE (tc->flags & TCP_CONN_PSH_PENDING))
+       {
+         if (seq_leq (tc->psh_seq, tc->snd_una))
+           tc->flags &= ~TCP_CONN_PSH_PENDING;
+       }
+
       /* If everything has been acked, stop retransmit timer
        * otherwise update. */
       tcp_retransmit_timer_update (tc);
index 74fc15f..7cee349 100644 (file)
@@ -1169,7 +1169,12 @@ tcp_push_hdr_i (tcp_connection_t * tc, vlib_buffer_t * b,
     advertise_wnd = tcp_window_to_advertise (tc, next_state);
 
   flags = tcp_make_state_flags (tc, next_state);
-
+  if (PREDICT_FALSE (tc->flags & TCP_CONN_PSH_PENDING))
+    {
+      if (seq_geq (tc->psh_seq, tc->snd_nxt)
+         && seq_lt (tc->psh_seq, tc->snd_nxt + data_len))
+       flags |= TCP_FLAG_PSH;
+    }
   th = vlib_buffer_push_tcp (b, tc->c_lcl_port, tc->c_rmt_port, tc->snd_nxt,
                             tc->rcv_nxt, tcp_hdr_opts_len, flags,
                             advertise_wnd);