session: Add session_sapi_enable_disable
[vpp.git] / src / vnet / session / session.c
index 1fac5ed..f33dbea 100644 (file)
@@ -97,9 +97,10 @@ session_send_io_evt_to_thread_custom (void *data, u32 thread_index,
 int
 session_send_ctrl_evt_to_thread (session_t * s, session_evt_type_t evt_type)
 {
-  /* only events supported are disconnect and reset */
-  ASSERT (evt_type == SESSION_CTRL_EVT_CLOSE
-         || evt_type == SESSION_CTRL_EVT_RESET);
+  /* only events supported are disconnect, shutdown and reset */
+  ASSERT (evt_type == SESSION_CTRL_EVT_CLOSE ||
+         evt_type == SESSION_CTRL_EVT_HALF_CLOSE ||
+         evt_type == SESSION_CTRL_EVT_RESET);
   return session_send_evt_to_thread (s, 0, s->thread_index, evt_type);
 }
 
@@ -302,21 +303,89 @@ session_delete (session_t * s)
 void
 session_cleanup_half_open (session_handle_t ho_handle)
 {
-  session_t *s = session_get_from_handle (ho_handle);
-  transport_cleanup_half_open (session_get_transport_proto (s),
-                              s->connection_index);
+  session_t *ho = session_get_from_handle (ho_handle);
+
+  /* App transports can migrate their half-opens */
+  if (ho->flags & SESSION_F_IS_MIGRATING)
+    {
+      /* Session still migrating, move to closed state to signal that the
+       * session should be removed. */
+      if (ho->connection_index == ~0)
+       {
+         ho->session_state = SESSION_STATE_CLOSED;
+         return;
+       }
+      /* Migrated transports are no longer half-opens */
+      transport_cleanup (session_get_transport_proto (ho),
+                        ho->connection_index, ho->app_index /* overloaded */);
+    }
+  else
+    transport_cleanup_half_open (session_get_transport_proto (ho),
+                                ho->connection_index);
+  session_free (ho);
+}
+
+static void
+session_half_open_free (session_t *ho)
+{
+  app_worker_t *app_wrk;
+
+  ASSERT (vlib_get_thread_index () <= 1);
+  app_wrk = app_worker_get (ho->app_wrk_index);
+  app_worker_del_half_open (app_wrk, ho);
+  session_free (ho);
+}
+
+static void
+session_half_open_free_rpc (void *args)
+{
+  session_t *ho = ho_session_get (pointer_to_uword (args));
+  session_half_open_free (ho);
 }
 
 void
 session_half_open_delete_notify (transport_connection_t *tc)
 {
-  app_worker_t *app_wrk;
-  session_t *s;
+  /* Notification from ctrl thread accepted without rpc */
+  if (!tc->thread_index)
+    {
+      session_half_open_free (ho_session_get (tc->s_index));
+    }
+  else
+    {
+      void *args = uword_to_pointer ((uword) tc->s_index, void *);
+      session_send_rpc_evt_to_thread_force (0, session_half_open_free_rpc,
+                                           args);
+    }
+}
 
-  s = session_get (tc->s_index, tc->thread_index);
-  app_wrk = app_worker_get (s->app_wrk_index);
-  app_worker_del_half_open (app_wrk, s->ho_index);
-  session_free (s);
+void
+session_half_open_migrate_notify (transport_connection_t *tc)
+{
+  session_t *ho;
+
+  ho = ho_session_get (tc->s_index);
+  ho->flags |= SESSION_F_IS_MIGRATING;
+  ho->connection_index = ~0;
+}
+
+int
+session_half_open_migrated_notify (transport_connection_t *tc)
+{
+  session_t *ho;
+
+  ho = ho_session_get (tc->s_index);
+
+  /* App probably detached so the half-open must be cleaned up */
+  if (ho->session_state == SESSION_STATE_CLOSED)
+    {
+      session_half_open_delete_notify (tc);
+      return -1;
+    }
+  ho->connection_index = tc->c_index;
+  /* Overload app index for half-open with new thread */
+  ho->app_index = tc->thread_index;
+  return 0;
 }
 
 session_t *
@@ -338,7 +407,7 @@ session_alloc_for_connection (transport_connection_t * tc)
   return s;
 }
 
-static session_t *
+session_t *
 session_alloc_for_half_open (transport_connection_t *tc)
 {
   session_t *s;
@@ -1087,7 +1156,9 @@ session_transport_closed_notify (transport_connection_t * tc)
     return;
 
   /* Transport thinks that app requested close but it actually didn't.
-   * Can happen for tcp if fin and rst are received in close succession. */
+   * Can happen for tcp:
+   * 1)if fin and rst are received in close succession.
+   * 2)if app shutdown the connection.  */
   if (s->session_state == SESSION_STATE_READY)
     {
       session_transport_closing_notify (tc);
@@ -1216,13 +1287,11 @@ session_dgram_accept (transport_connection_t * tc, u32 listener_index,
       return rv;
     }
 
-  s->session_state = SESSION_STATE_READY;
-
   return 0;
 }
 
 int
-session_open_cl (u32 app_wrk_index, session_endpoint_t * rmt, u32 opaque)
+session_open_cl (session_endpoint_cfg_t *rmt, session_handle_t *rsh)
 {
   transport_connection_t *tc;
   transport_endpoint_cfg_t *tep;
@@ -1242,7 +1311,7 @@ session_open_cl (u32 app_wrk_index, session_endpoint_t * rmt, u32 opaque)
   tc = transport_get_half_open (rmt->transport_proto, (u32) rv);
 
   /* For dgram type of service, allocate session and fifos now */
-  app_wrk = app_worker_get (app_wrk_index);
+  app_wrk = app_worker_get (rmt->app_wrk_index);
   s = session_alloc_for_connection (tc);
   s->app_wrk_index = app_wrk->wrk_index;
   s->session_state = SESSION_STATE_OPENED;
@@ -1253,17 +1322,19 @@ session_open_cl (u32 app_wrk_index, session_endpoint_t * rmt, u32 opaque)
     }
 
   sh = session_handle (s);
+  *rsh = sh;
+
   session_lookup_add_connection (tc, sh);
-  return app_worker_connect_notify (app_wrk, s, SESSION_E_NONE, opaque);
+  return app_worker_connect_notify (app_wrk, s, SESSION_E_NONE, rmt->opaque);
 }
 
 int
-session_open_vc (u32 app_wrk_index, session_endpoint_t * rmt, u32 opaque)
+session_open_vc (session_endpoint_cfg_t *rmt, session_handle_t *rsh)
 {
   transport_connection_t *tc;
   transport_endpoint_cfg_t *tep;
   app_worker_t *app_wrk;
-  session_t *s;
+  session_t *ho;
   int rv;
 
   tep = session_endpoint_to_transport_cfg (rmt);
@@ -1276,7 +1347,7 @@ session_open_vc (u32 app_wrk_index, session_endpoint_t * rmt, u32 opaque)
 
   tc = transport_get_half_open (rmt->transport_proto, (u32) rv);
 
-  app_wrk = app_worker_get (app_wrk_index);
+  app_wrk = app_worker_get (rmt->app_wrk_index);
 
   /* If transport offers a vc service, only allocate established
    * session once the connection has been established.
@@ -1286,29 +1357,30 @@ session_open_vc (u32 app_wrk_index, session_endpoint_t * rmt, u32 opaque)
    * session on transport notification, and to cleanup the half-open
    * session if the app detaches before connection establishment.
    */
-  s = session_alloc_for_half_open (tc);
-  s->app_wrk_index = app_wrk->wrk_index;
-  s->ho_index = app_worker_add_half_open (app_wrk, session_handle (s));
-  s->opaque = opaque;
+  ho = session_alloc_for_half_open (tc);
+  ho->app_wrk_index = app_wrk->wrk_index;
+  ho->ho_index = app_worker_add_half_open (app_wrk, session_handle (ho));
+  ho->opaque = rmt->opaque;
+  *rsh = session_handle (ho);
 
-  session_lookup_add_half_open (tc, tc->c_index);
+  if (!(tc->flags & TRANSPORT_CONNECTION_F_NO_LOOKUP))
+    session_lookup_add_half_open (tc, tc->c_index);
 
   return 0;
 }
 
 int
-session_open_app (u32 app_wrk_index, session_endpoint_t * rmt, u32 opaque)
+session_open_app (session_endpoint_cfg_t *rmt, session_handle_t *rsh)
 {
-  session_endpoint_cfg_t *sep = (session_endpoint_cfg_t *) rmt;
-  transport_endpoint_cfg_t *tep_cfg = session_endpoint_to_transport_cfg (sep);
-
-  sep->app_wrk_index = app_wrk_index;
-  sep->opaque = opaque;
+  transport_endpoint_cfg_t *tep_cfg = session_endpoint_to_transport_cfg (rmt);
 
+  /* Not supported for now */
+  *rsh = SESSION_INVALID_HANDLE;
   return transport_connect (rmt->transport_proto, tep_cfg);
 }
 
-typedef int (*session_open_service_fn) (u32, session_endpoint_t *, u32);
+typedef int (*session_open_service_fn) (session_endpoint_cfg_t *,
+                                       session_handle_t *);
 
 /* *INDENT-OFF* */
 static session_open_service_fn session_open_srv_fns[TRANSPORT_N_SERVICES] = {
@@ -1332,11 +1404,11 @@ static session_open_service_fn session_open_srv_fns[TRANSPORT_N_SERVICES] = {
  *              on open completion.
  */
 int
-session_open (u32 app_wrk_index, session_endpoint_t * rmt, u32 opaque)
+session_open (session_endpoint_cfg_t *rmt, session_handle_t *rsh)
 {
   transport_service_type_t tst;
   tst = transport_protocol_service_type (rmt->transport_proto);
-  return session_open_srv_fns[tst] (app_wrk_index, rmt, opaque);
+  return session_open_srv_fns[tst](rmt, rsh);
 }
 
 /**
@@ -1398,6 +1470,20 @@ session_stop_listen (session_t * s)
   return 0;
 }
 
+/**
+ * Initialize session half-closing procedure.
+ *
+ * Note that half-closing will not change the state of the session.
+ */
+void
+session_half_close (session_t *s)
+{
+  if (!s)
+    return;
+
+  session_program_transport_ctrl_evt (s, SESSION_CTRL_EVT_HALF_CLOSE);
+}
+
 /**
  * Initialize session closing procedure.
  *
@@ -1438,6 +1524,24 @@ session_reset (session_t * s)
   session_program_transport_ctrl_evt (s, SESSION_CTRL_EVT_RESET);
 }
 
+/**
+ * Notify transport the session can be half-disconnected.
+ *
+ * Must be called from the session's thread.
+ */
+void
+session_transport_half_close (session_t *s)
+{
+  /* Only READY session can be half-closed */
+  if (s->session_state != SESSION_STATE_READY)
+    {
+      return;
+    }
+
+  transport_half_close (session_get_transport_proto (s), s->connection_index,
+                       s->thread_index);
+}
+
 /**
  * Notify transport the session can be disconnected. This should eventually
  * result in a delete notification that allows us to cleanup session state.
@@ -1711,9 +1815,11 @@ session_manager_main_enable (vlib_main_t * vm)
       wrk->ctrl_head = clib_llist_make_head (wrk->event_elts, evt_list);
       wrk->new_head = clib_llist_make_head (wrk->event_elts, evt_list);
       wrk->old_head = clib_llist_make_head (wrk->event_elts, evt_list);
+      wrk->pending_connects = clib_llist_make_head (wrk->event_elts, evt_list);
       wrk->vm = vlib_get_main_by_index (i);
       wrk->last_vlib_time = vlib_time_now (vm);
       wrk->last_vlib_us_time = wrk->last_vlib_time * CLIB_US_TIME_FREQ;
+      wrk->timerfd = -1;
       vec_validate (wrk->session_to_enqueue, smm->last_transport_proto_type);
 
       if (num_threads > 1)
@@ -1793,6 +1899,7 @@ session_node_enable_disable (u8 is_en)
          vlib_node_set_state (vm, session_queue_node.index, mstate);
          if (is_en)
            {
+             session_main_get_worker (0)->state = SESSION_WRK_INTERRUPT;
              vlib_node_set_state (vm, session_queue_process_node.index,
                                   state);
              n = vlib_get_node (vm, session_queue_process_node.index);
@@ -1856,7 +1963,7 @@ session_main_init (vlib_main_t * vm)
   smm->evt_qs_segment_size = 1 << 20;
 #endif
 
-  smm->last_transport_proto_type = TRANSPORT_PROTO_DTLS;
+  smm->last_transport_proto_type = TRANSPORT_PROTO_SRTP;
 
   return 0;
 }
@@ -1962,7 +2069,7 @@ session_config_fn (vlib_main_t * vm, unformat_input_t * input)
       else if (unformat (input, "segment-baseva 0x%lx", &smm->session_baseva))
        ;
       else if (unformat (input, "use-app-socket-api"))
-       appns_sapi_enable ();
+       (void) appns_sapi_enable_disable (1 /* is_enable */);
       else if (unformat (input, "poll-main"))
        smm->poll_main = 1;
       else if (unformat (input, "use-private-rx-mqs"))