session: session enable in multiworker
[vpp.git] / src / vnet / session / session.c
index 7b53a47..163f4d2 100644 (file)
@@ -41,17 +41,11 @@ session_send_evt_to_thread (void *data, void *args, u32 thread_index,
       svm_msg_q_unlock (mq);
       return -2;
     }
-  msg = svm_msg_q_alloc_msg_w_ring (mq, SESSION_MQ_IO_EVT_RING);
-  if (PREDICT_FALSE (svm_msg_q_msg_is_invalid (&msg)))
-    {
-      svm_msg_q_unlock (mq);
-      return -2;
-    }
-  evt = (session_event_t *) svm_msg_q_msg_data (mq, &msg);
-  evt->event_type = evt_type;
   switch (evt_type)
     {
     case SESSION_CTRL_EVT_RPC:
+      msg = svm_msg_q_alloc_msg_w_ring (mq, SESSION_MQ_IO_EVT_RING);
+      evt = (session_event_t *) svm_msg_q_msg_data (mq, &msg);
       evt->rpc_args.fp = data;
       evt->rpc_args.arg = args;
       break;
@@ -59,10 +53,15 @@ session_send_evt_to_thread (void *data, void *args, u32 thread_index,
     case SESSION_IO_EVT_TX:
     case SESSION_IO_EVT_TX_FLUSH:
     case SESSION_IO_EVT_BUILTIN_RX:
+      msg = svm_msg_q_alloc_msg_w_ring (mq, SESSION_MQ_IO_EVT_RING);
+      evt = (session_event_t *) svm_msg_q_msg_data (mq, &msg);
       evt->session_index = *(u32 *) data;
       break;
     case SESSION_IO_EVT_BUILTIN_TX:
     case SESSION_CTRL_EVT_CLOSE:
+    case SESSION_CTRL_EVT_RESET:
+      msg = svm_msg_q_alloc_msg_w_ring (mq, SESSION_MQ_IO_EVT_RING);
+      evt = (session_event_t *) svm_msg_q_msg_data (mq, &msg);
       evt->session_handle = session_handle ((session_t *) data);
       break;
     default:
@@ -70,6 +69,7 @@ session_send_evt_to_thread (void *data, void *args, u32 thread_index,
       svm_msg_q_unlock (mq);
       return -1;
     }
+  evt->event_type = evt_type;
 
   svm_msg_q_add_and_unlock (mq, &msg);
   return 0;
@@ -92,10 +92,10 @@ session_send_io_evt_to_thread_custom (void *data, u32 thread_index,
 int
 session_send_ctrl_evt_to_thread (session_t * s, session_evt_type_t evt_type)
 {
-  /* only event supported for now is disconnect */
-  ASSERT (evt_type == SESSION_CTRL_EVT_CLOSE);
-  return session_send_evt_to_thread (s, 0, s->thread_index,
-                                    SESSION_CTRL_EVT_CLOSE);
+  /* only events supported are disconnect and reset */
+  ASSERT (evt_type == SESSION_CTRL_EVT_CLOSE
+         || evt_type == SESSION_CTRL_EVT_RESET);
+  return session_send_evt_to_thread (s, 0, s->thread_index, evt_type);
 }
 
 void
@@ -125,6 +125,7 @@ session_add_self_custom_tx_evt (transport_connection_t * tc, u8 has_prio)
 
   s = session_get (tc->s_index, tc->thread_index);
   ASSERT (s->thread_index == vlib_get_thread_index ());
+  ASSERT (s->session_state != SESSION_STATE_TRANSPORT_DELETED);
   if (!(s->flags & SESSION_F_CUSTOM_TX))
     {
       s->flags |= SESSION_F_CUSTOM_TX;
@@ -144,7 +145,7 @@ session_add_self_custom_tx_evt (transport_connection_t * tc, u8 has_prio)
 }
 
 static void
-session_program_transport_close (session_t * s)
+session_program_transport_ctrl_evt (session_t * s, session_evt_type_t evt)
 {
   u32 thread_index = vlib_get_thread_index ();
   session_evt_elt_t *elt;
@@ -158,10 +159,10 @@ session_program_transport_close (session_t * s)
       elt = session_evt_alloc_ctrl (wrk);
       clib_memset (&elt->evt, 0, sizeof (session_event_t));
       elt->evt.session_handle = session_handle (s);
-      elt->evt.event_type = SESSION_CTRL_EVT_CLOSE;
+      elt->evt.event_type = evt;
     }
   else
-    session_send_ctrl_evt_to_thread (s, SESSION_CTRL_EVT_CLOSE);
+    session_send_ctrl_evt_to_thread (s, evt);
 }
 
 session_t *
@@ -568,10 +569,10 @@ session_enqueue_notify (session_t * s)
 static void
 session_enqueue_notify_rpc (void *arg)
 {
-  session_handle_t sh = (session_handle_t) arg;
+  u32 session_index = pointer_to_uword (arg);
   session_t *s;
 
-  s = session_get_from_handle_if_valid (sh);
+  s = session_get_if_valid (session_index, vlib_get_thread_index ());
   if (!s)
     return;
 
@@ -586,8 +587,15 @@ void
 session_enqueue_notify_thread (session_handle_t sh)
 {
   u32 thread_index = session_thread_from_handle (sh);
+  u32 session_index = session_index_from_handle (sh);
+
+  /*
+   * Pass session index (u32) as opposed to handle (u64) in case pointers
+   * are not 64-bit.
+   */
   session_send_rpc_evt_to_thread (thread_index,
-                                 session_enqueue_notify_rpc, (void *) sh);
+                                 session_enqueue_notify_rpc,
+                                 uword_to_pointer (session_index, void *));
 }
 
 int
@@ -595,6 +603,8 @@ session_dequeue_notify (session_t * s)
 {
   app_worker_t *app_wrk;
 
+  svm_fifo_clear_deq_ntf (s->tx_fifo);
+
   app_wrk = app_worker_get_if_valid (s->app_wrk_index);
   if (PREDICT_FALSE (!app_wrk))
     return -1;
@@ -607,8 +617,6 @@ session_dequeue_notify (session_t * s)
     return session_notify_subscribers (app_wrk->app_index, s,
                                       s->tx_fifo, SESSION_IO_EVT_TX);
 
-  svm_fifo_clear_deq_ntf (s->tx_fifo);
-
   return 0;
 }
 
@@ -791,6 +799,7 @@ session_dgram_connect_notify (transport_connection_t * tc,
   new_s->rx_fifo->master_session_index = new_s->session_index;
   new_s->rx_fifo->master_thread_index = new_s->thread_index;
   new_s->session_state = SESSION_STATE_READY;
+  new_s->flags |= SESSION_F_IS_MIGRATING;
   session_lookup_add_connection (tc, session_handle (new_s));
 
   /*
@@ -855,7 +864,8 @@ session_transport_delete_notify (transport_connection_t * tc)
       /* Session was created but accept notification was not yet sent to the
        * app. Cleanup everything. */
       session_lookup_del_session (s);
-      session_free_w_fifos (s);
+      segment_manager_dealloc_fifos (s->rx_fifo, s->tx_fifo);
+      session_free (s);
       break;
     case SESSION_STATE_ACCEPTING:
     case SESSION_STATE_TRANSPORT_CLOSING:
@@ -878,10 +888,10 @@ session_transport_delete_notify (transport_connection_t * tc)
        * session is just removed because both transport and app have
        * confirmed the close*/
       session_lookup_del_session (s);
-      s->session_state = SESSION_STATE_CLOSED;
+      s->session_state = SESSION_STATE_TRANSPORT_DELETED;
       session_cleanup_notify (s, SESSION_CLEANUP_TRANSPORT);
       svm_fifo_dequeue_drop_all (s->tx_fifo);
-      session_program_transport_close (s);
+      session_program_transport_ctrl_evt (s, SESSION_CTRL_EVT_CLOSE);
       break;
     case SESSION_STATE_TRANSPORT_DELETED:
       break;
@@ -1187,12 +1197,26 @@ session_close (session_t * s)
        * acknowledge the close */
       if (s->session_state == SESSION_STATE_TRANSPORT_CLOSED
          || s->session_state == SESSION_STATE_TRANSPORT_DELETED)
-       session_program_transport_close (s);
+       session_program_transport_ctrl_evt (s, SESSION_CTRL_EVT_CLOSE);
       return;
     }
 
   s->session_state = SESSION_STATE_CLOSING;
-  session_program_transport_close (s);
+  session_program_transport_ctrl_evt (s, SESSION_CTRL_EVT_CLOSE);
+}
+
+/**
+ * Force a close without waiting for data to be flushed
+ */
+void
+session_reset (session_t * s)
+{
+  if (s->session_state >= SESSION_STATE_CLOSING)
+    return;
+  /* Drop all outstanding tx data */
+  svm_fifo_dequeue_drop_all (s->tx_fifo);
+  s->session_state = SESSION_STATE_CLOSING;
+  session_program_transport_ctrl_evt (s, SESSION_CTRL_EVT_RESET);
 }
 
 /**
@@ -1227,6 +1251,26 @@ session_transport_close (session_t * s)
                   s->thread_index);
 }
 
+/**
+ * Force transport close
+ */
+void
+session_transport_reset (session_t * s)
+{
+  if (s->session_state >= SESSION_STATE_APP_CLOSED)
+    {
+      if (s->session_state == SESSION_STATE_TRANSPORT_CLOSED)
+       s->session_state = SESSION_STATE_CLOSED;
+      else if (s->session_state >= SESSION_STATE_TRANSPORT_DELETED)
+       session_free_w_fifos (s);
+      return;
+    }
+
+  s->session_state = SESSION_STATE_APP_CLOSED;
+  transport_reset (session_get_transport_proto (s), s->connection_index,
+                  s->thread_index);
+}
+
 /**
  * Cleanup transport and session state.
  *
@@ -1237,12 +1281,11 @@ session_transport_close (session_t * s)
 void
 session_transport_cleanup (session_t * s)
 {
-  s->session_state = SESSION_STATE_CLOSED;
-
   /* Delete from main lookup table before we axe the the transport */
   session_lookup_del_session (s);
-  transport_cleanup (session_get_transport_proto (s), s->connection_index,
-                    s->thread_index);
+  if (s->session_state != SESSION_STATE_TRANSPORT_DELETED)
+    transport_cleanup (session_get_transport_proto (s), s->connection_index,
+                      s->thread_index);
   /* Since we called cleanup, no delete notification will come. So, make
    * sure the session is properly freed. */
   session_free_w_fifos (s);
@@ -1451,6 +1494,7 @@ session_manager_main_enable (vlib_main_t * vm)
       wrk->old_head = clib_llist_make_head (wrk->event_elts, evt_list);
       wrk->vm = vlib_mains[i];
       wrk->last_vlib_time = vlib_time_now (vlib_mains[i]);
+      wrk->last_vlib_us_time = wrk->last_vlib_time * CLIB_US_TIME_FREQ;
 
       if (num_threads > 1)
        clib_rwlock_init (&smm->wrk[i].peekers_rw_locks);
@@ -1494,7 +1538,6 @@ session_manager_main_enable (vlib_main_t * vm)
 
   /* Enable transports */
   transport_enable_disable (vm, 1);
-  transport_init_tx_pacers_period ();
   return 0;
 }
 
@@ -1566,10 +1609,25 @@ session_manager_main_init (vlib_main_t * vm)
   smm->evt_qs_segment_size = 1 << 20;
 #endif
   smm->is_enabled = 0;
+  smm->session_enable_asap = 0;
+  return 0;
+}
+
+static clib_error_t *
+session_main_init (vlib_main_t * vm)
+{
+  session_main_t *smm = &session_main;
+  if (smm->session_enable_asap)
+    {
+      vlib_worker_thread_barrier_sync (vm);
+      vnet_session_enable_disable (vm, 1 /* is_en */ );
+      vlib_worker_thread_barrier_release (vm);
+    }
   return 0;
 }
 
 VLIB_INIT_FUNCTION (session_manager_main_init);
+VLIB_MAIN_LOOP_ENTER_FUNCTION (session_main_init);
 
 static clib_error_t *
 session_config_fn (vlib_main_t * vm, unformat_input_t * input)
@@ -1650,6 +1708,8 @@ session_config_fn (vlib_main_t * vm, unformat_input_t * input)
       else if (unformat (input, "evt_qs_seg_size %U", unformat_memory_size,
                         &smm->evt_qs_segment_size))
        ;
+      else if (unformat (input, "enable"))
+       smm->session_enable_asap = 1;
       else
        return clib_error_return (0, "unknown input `%U'",
                                  format_unformat_error, input);