tcp: cleanup connection/session fixes 31/12931/11
authorFlorin Coras <fcoras@cisco.com>
Thu, 7 Jun 2018 00:55:02 +0000 (17:55 -0700)
committerDave Barach <openvpp@barachs.net>
Mon, 11 Jun 2018 17:32:40 +0000 (17:32 +0000)
- Cleanup session state after last ack and avoid using a cleanup timer.
- Change session cleanup to free the session as opposed to waiting for
delete notify.
- When in close-wait, postpone sending the fin on close until all
outstanding data has been sent.
- Don't flush rx fifo unless in closed state

Change-Id: Ic2a4f0d5568b65c83f4b55b6c469a7b24b947f39
Signed-off-by: Florin Coras <fcoras@cisco.com>
src/svm/svm_fifo.c
src/svm/svm_fifo.h
src/vnet/sctp/sctp_output.c
src/vnet/session-apps/proxy.c
src/vnet/session/session.c
src/vnet/session/session.h
src/vnet/session/session_node.c
src/vnet/tcp/tcp.c
src/vnet/tcp/tcp.h
src/vnet/tcp/tcp_input.c
src/vnet/tcp/tcp_output.c

index 10c3192..47df225 100644 (file)
@@ -828,6 +828,13 @@ svm_fifo_dequeue_drop (svm_fifo_t * f, u32 max_bytes)
   return total_drop_bytes;
 }
 
+void
+svm_fifo_dequeue_drop_all (svm_fifo_t * f)
+{
+  f->head = f->tail;
+  __sync_fetch_and_sub (&f->cursize, f->cursize);
+}
+
 u32
 svm_fifo_number_ooo_segments (svm_fifo_t * f)
 {
index 39cdcc0..90a49c0 100644 (file)
@@ -152,6 +152,7 @@ int svm_fifo_dequeue_nowait (svm_fifo_t * f, u32 max_bytes, u8 * copy_here);
 
 int svm_fifo_peek (svm_fifo_t * f, u32 offset, u32 max_bytes, u8 * copy_here);
 int svm_fifo_dequeue_drop (svm_fifo_t * f, u32 max_bytes);
+void svm_fifo_dequeue_drop_all (svm_fifo_t * f);
 u32 svm_fifo_number_ooo_segments (svm_fifo_t * f);
 ooo_segment_t *svm_fifo_first_ooo_segment (svm_fifo_t * f);
 void svm_fifo_init_pointers (svm_fifo_t * f, u32 pointer);
index 3c83f68..0c865ca 100644 (file)
@@ -1478,7 +1478,7 @@ sctp_prepare_data_retransmit (sctp_connection_t * sctp_conn,
    * Make sure we can retransmit something
    */
   available_bytes =
-    stream_session_tx_fifo_max_dequeue (&sctp_conn->sub_conn[idx].connection);
+    session_tx_fifo_max_dequeue (&sctp_conn->sub_conn[idx].connection);
   ASSERT (available_bytes >= offset);
   available_bytes -= offset;
   if (!available_bytes)
index 596b037..78aa0de 100644 (file)
@@ -423,7 +423,7 @@ proxy_server_attach ()
   a->options[APP_OPTIONS_TX_FIFO_SIZE] = pm->fifo_size;
   a->options[APP_OPTIONS_PRIVATE_SEGMENT_COUNT] = pm->private_segment_count;
   a->options[APP_OPTIONS_PREALLOC_FIFO_PAIRS] =
-    pm->prealloc_fifos ? pm->prealloc_fifos : 1;
+    pm->prealloc_fifos ? pm->prealloc_fifos : 0;
 
   a->options[APP_OPTIONS_FLAGS] = APP_OPTIONS_FLAGS_IS_BUILTIN;
 
@@ -456,7 +456,7 @@ active_open_attach (void)
   options[APP_OPTIONS_TX_FIFO_SIZE] = pm->fifo_size;
   options[APP_OPTIONS_PRIVATE_SEGMENT_COUNT] = pm->private_segment_count;
   options[APP_OPTIONS_PREALLOC_FIFO_PAIRS] =
-    pm->prealloc_fifos ? pm->prealloc_fifos : 1;
+    pm->prealloc_fifos ? pm->prealloc_fifos : 0;
 
   options[APP_OPTIONS_FLAGS] = APP_OPTIONS_FLAGS_IS_BUILTIN
     | APP_OPTIONS_FLAGS_IS_PROXY;
index cfba31e..6559e31 100644 (file)
@@ -414,7 +414,7 @@ stream_session_no_space (transport_connection_t * tc, u32 thread_index,
 }
 
 u32
-stream_session_tx_fifo_max_dequeue (transport_connection_t * tc)
+session_tx_fifo_max_dequeue (transport_connection_t * tc)
 {
   stream_session_t *s = session_get (tc->s_index, tc->thread_index);
   if (!s->server_tx_fifo)
@@ -452,12 +452,10 @@ session_enqueue_notify (stream_session_t * s, u8 block)
   session_fifo_event_t evt;
   svm_queue_t *q;
 
-  if (PREDICT_FALSE (s->session_state >= SESSION_STATE_CLOSING))
+  if (PREDICT_FALSE (s->session_state == SESSION_STATE_CLOSED))
     {
       /* Session is closed so app will never clean up. Flush rx fifo */
-      u32 to_dequeue = svm_fifo_max_dequeue (s->server_rx_fifo);
-      if (to_dequeue)
-       svm_fifo_dequeue_drop (s->server_rx_fifo, to_dequeue);
+      svm_fifo_dequeue_drop_all (s->server_rx_fifo);
       return 0;
     }
 
@@ -727,10 +725,13 @@ stream_session_disconnect_notify (transport_connection_t * tc)
   s = session_get (tc->s_index, tc->thread_index);
   server = application_get (s->app_index);
   server->cb_fns.session_disconnect_callback (s);
+  s->session_state = SESSION_STATE_CLOSING;
 }
 
 /**
  * Cleans up session and lookup table.
+ *
+ * Transport connection must still be valid.
  */
 void
 stream_session_delete (stream_session_t * s)
@@ -1065,13 +1066,23 @@ stream_session_disconnect (stream_session_t * s)
   session_manager_main_t *smm = &session_manager_main;
   session_fifo_event_t *evt;
 
-  if (!s || s->session_state >= SESSION_STATE_CLOSING)
+  if (!s)
     return;
+
+  if (s->session_state >= SESSION_STATE_CLOSING)
+    {
+      /* Session already closed. Clear the tx fifo */
+      if (s->session_state == SESSION_STATE_CLOSED)
+       svm_fifo_dequeue_drop_all (s->server_tx_fifo);
+      return;
+    }
+
   s->session_state = SESSION_STATE_CLOSING;
 
   /* If we are in the handler thread, or being called with the worker barrier
-   * held (api/cli), just append a new event pending disconnects vector. */
-  if (thread_index > 0 || !vlib_get_current_process (vlib_get_main ()))
+   * held (api/cli), just append a new event to pending disconnects vector. */
+  if ((thread_index == 0 && !vlib_get_current_process (vlib_get_main ()))
+      || thread_index == s->thread_index)
     {
       ASSERT (s->thread_index == thread_index || thread_index == 0);
       vec_add2 (smm->pending_disconnects[s->thread_index], evt, 1);
@@ -1103,23 +1114,24 @@ stream_session_disconnect_transport (stream_session_t * s)
 /**
  * Cleanup transport and session state.
  *
- * Notify transport of the cleanup, wait for a delete notify to actually
- * remove the session state.
+ * Notify transport of the cleanup and free the session. This should
+ * be called only if transport reported some error and is already
+ * closed.
  */
 void
 stream_session_cleanup (stream_session_t * s)
 {
-  int rv;
-
   s->session_state = SESSION_STATE_CLOSED;
 
-  /* Delete from the main lookup table to avoid more enqueues */
-  rv = session_lookup_del_session (s);
-  if (rv)
-    clib_warning ("hash delete error, rv %d", rv);
-
+  /* Delete from main lookup table before we axe the the transport */
+  session_lookup_del_session (s);
   tp_vfts[session_get_transport_proto (s)].cleanup (s->connection_index,
                                                    s->thread_index);
+  /* Since we called cleanup, no delete notification will come. So, make
+   * sure the session is properly freed. */
+  segment_manager_dealloc_fifos (s->svm_segment_index, s->server_rx_fifo,
+                                s->server_tx_fifo);
+  session_free (s);
 }
 
 transport_service_type_t
index 6908a56..b57053c 100644 (file)
@@ -499,7 +499,7 @@ session_clone_safe (u32 session_index, u32 thread_index)
 
 transport_connection_t *session_get_transport (stream_session_t * s);
 
-u32 stream_session_tx_fifo_max_dequeue (transport_connection_t * tc);
+u32 session_tx_fifo_max_dequeue (transport_connection_t * tc);
 
 int
 session_enqueue_stream_connection (transport_connection_t * tc,
index d6fcd91..2eea30b 100644 (file)
@@ -799,7 +799,8 @@ skip_dequeue:
            }
          break;
        case FIFO_EVENT_DISCONNECT:
-         /* Make sure stream disconnects run after the pending list is drained */
+         /* Make sure stream disconnects run after the pending list is
+          * drained */
          s0 = session_get_from_handle (e0->session_handle);
          if (!e0->postponed)
            {
@@ -807,11 +808,9 @@ skip_dequeue:
              vec_add1 (smm->pending_disconnects[thread_index], *e0);
              continue;
            }
-         /* If tx queue is still not empty, wait a bit */
-         if (svm_fifo_max_dequeue (s0->server_tx_fifo)
-             && e0->postponed < 200)
+         /* If tx queue is still not empty, wait */
+         if (svm_fifo_max_dequeue (s0->server_tx_fifo))
            {
-             e0->postponed += 1;
              vec_add1 (smm->pending_disconnects[thread_index], *e0);
              continue;
            }
index 15ac7d3..2a696f1 100644 (file)
@@ -277,19 +277,19 @@ tcp_connection_reset (tcp_connection_t * tc)
       tcp_connection_cleanup (tc);
       break;
     case TCP_STATE_ESTABLISHED:
+      tcp_connection_timers_reset (tc);
+      /* Set the cleanup timer, in case the session layer/app don't
+       * cleanly close the connection */
+      tcp_timer_update (tc, TCP_TIMER_WAITCLOSE, TCP_CLEANUP_TIME);
       stream_session_reset_notify (&tc->connection);
-      /* fall through */
+      break;
     case TCP_STATE_CLOSE_WAIT:
     case TCP_STATE_FIN_WAIT_1:
     case TCP_STATE_FIN_WAIT_2:
     case TCP_STATE_CLOSING:
       tc->state = TCP_STATE_CLOSED;
       TCP_EVT_DBG (TCP_EVT_STATE_CHANGE, tc);
-
-      /* Make sure all timers are cleared */
       tcp_connection_timers_reset (tc);
-
-      /* Wait for cleanup from session layer but not forever */
       tcp_timer_update (tc, TCP_TIMER_WAITCLOSE, TCP_CLEANUP_TIME);
       break;
     case TCP_STATE_CLOSED:
@@ -326,17 +326,22 @@ tcp_connection_close (tcp_connection_t * tc)
       tc->state = TCP_STATE_FIN_WAIT_1;
       break;
     case TCP_STATE_ESTABLISHED:
-      if (!stream_session_tx_fifo_max_dequeue (&tc->connection))
+      if (!session_tx_fifo_max_dequeue (&tc->connection))
        tcp_send_fin (tc);
       else
        tc->flags |= TCP_CONN_FINPNDG;
       tc->state = TCP_STATE_FIN_WAIT_1;
       break;
     case TCP_STATE_CLOSE_WAIT:
-      tcp_send_fin (tc);
-      tcp_connection_timers_reset (tc);
-      tc->state = TCP_STATE_LAST_ACK;
-      tcp_timer_update (tc, TCP_TIMER_WAITCLOSE, TCP_2MSL_TIME);
+      if (!session_tx_fifo_max_dequeue (&tc->connection))
+       {
+         tcp_send_fin (tc);
+         tcp_connection_timers_reset (tc);
+         tc->state = TCP_STATE_LAST_ACK;
+         tcp_timer_update (tc, TCP_TIMER_WAITCLOSE, TCP_2MSL_TIME);
+       }
+      else
+       tc->flags |= TCP_CONN_FINPNDG;
       break;
     case TCP_STATE_FIN_WAIT_1:
       tcp_timer_update (tc, TCP_TIMER_WAITCLOSE, TCP_2MSL_TIME);
@@ -367,11 +372,9 @@ tcp_session_cleanup (u32 conn_index, u32 thread_index)
   tcp_connection_t *tc;
   tc = tcp_connection_get (conn_index, thread_index);
   tcp_connection_timers_reset (tc);
-
-  /* Wait for the session tx events to clear */
   tc->state = TCP_STATE_CLOSED;
   TCP_EVT_DBG (TCP_EVT_STATE_CHANGE, tc);
-  tcp_timer_update (tc, TCP_TIMER_WAITCLOSE, TCP_CLEANUP_TIME);
+  tcp_connection_cleanup (tc);
 }
 
 /**
index 10aa721..3a31234 100644 (file)
@@ -787,7 +787,7 @@ tcp_timer_is_active (tcp_connection_t * tc, tcp_timers_e timer)
 
 #define tcp_validate_txf_size(_tc, _a)                                         \
   ASSERT(_tc->state != TCP_STATE_ESTABLISHED                           \
-        || stream_session_tx_fifo_max_dequeue (&_tc->connection) >= _a)
+        || session_tx_fifo_max_dequeue (&_tc->connection) >= _a)
 
 void
 scoreboard_remove_hole (sack_scoreboard_t * sb,
index 04612f8..f77d484 100644 (file)
@@ -2474,7 +2474,7 @@ tcp46_rcv_process_inline (vlib_main_t * vm, vlib_node_runtime_t * node,
              if (tc0->flags & TCP_CONN_FINPNDG)
                {
                  /* TX fifo finally drained */
-                 if (!stream_session_tx_fifo_max_dequeue (&tc0->connection))
+                 if (!session_tx_fifo_max_dequeue (&tc0->connection))
                    tcp_send_fin (tc0);
                }
              /* If FIN is ACKed */
@@ -2507,6 +2507,18 @@ tcp46_rcv_process_inline (vlib_main_t * vm, vlib_node_runtime_t * node,
                  tcp_maybe_inc_counter (rcv_process, error0, 1);
                  goto drop;
                }
+             if (tc0->flags & TCP_CONN_FINPNDG)
+               {
+                 /* TX fifo finally drained */
+                 if (!session_tx_fifo_max_dequeue (&tc0->connection))
+                   {
+                     tcp_send_fin (tc0);
+                     tcp_connection_timers_reset (tc0);
+                     tc0->state = TCP_STATE_LAST_ACK;
+                     tcp_timer_update (tc0, TCP_TIMER_WAITCLOSE,
+                                       TCP_2MSL_TIME);
+                   }
+               }
              break;
            case TCP_STATE_CLOSING:
              /* In addition to the processing for the ESTABLISHED state, if
@@ -2545,13 +2557,9 @@ tcp46_rcv_process_inline (vlib_main_t * vm, vlib_node_runtime_t * node,
 
              tc0->state = TCP_STATE_CLOSED;
              TCP_EVT_DBG (TCP_EVT_STATE_CHANGE, tc0);
-             tcp_connection_timers_reset (tc0);
-
-             /* Don't delete the connection/session yet. Instead, wait a
-              * reasonable amount of time until the pipes are cleared. In
-              * particular, this makes sure that we won't have dead sessions
-              * when processing events on the tx path */
-             tcp_timer_set (tc0, TCP_TIMER_WAITCLOSE, TCP_CLEANUP_TIME);
+             /* Delete the connection/session since the pipes should be
+              * clear by now */
+             tcp_connection_del (tc0);
 
              goto drop;
 
index 91c0e90..8a88bf8 100644 (file)
@@ -390,6 +390,7 @@ tcp_make_options (tcp_connection_t * tc, tcp_options_t * opts,
     case TCP_STATE_ESTABLISHED:
     case TCP_STATE_FIN_WAIT_1:
     case TCP_STATE_CLOSED:
+    case TCP_STATE_CLOSE_WAIT:
       return tcp_make_established_options (tc, opts);
     case TCP_STATE_SYN_RCVD:
       return tcp_make_synack_options (tc, opts);
@@ -1213,7 +1214,7 @@ tcp_prepare_retransmit_segment (tcp_connection_t * tc, u32 offset,
   /*
    * Make sure we can retransmit something
    */
-  available_bytes = stream_session_tx_fifo_max_dequeue (&tc->connection);
+  available_bytes = session_tx_fifo_max_dequeue (&tc->connection);
   ASSERT (available_bytes >= offset);
   available_bytes -= offset;
   if (!available_bytes)
@@ -1554,7 +1555,7 @@ tcp_timer_persist_handler (u32 index)
       || tc->snd_wnd > tc->snd_mss || tcp_in_recovery (tc))
     return;
 
-  available_bytes = stream_session_tx_fifo_max_dequeue (&tc->connection);
+  available_bytes = session_tx_fifo_max_dequeue (&tc->connection);
   offset = tc->snd_una_max - tc->snd_una;
 
   /* Reprogram persist if no new bytes available to send. We may have data