session: cleanup part 1
[vpp.git] / src / vnet / session / session_api.c
index 2d20d5f..6a0d77e 100755 (executable)
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2015-2016 Cisco and/or its affiliates.
+ * Copyright (c) 2015-2019 Cisco and/or its affiliates.
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at:
@@ -198,14 +198,14 @@ send_app_cut_through_registration_add (u32 api_client_index,
 }
 
 static int
-send_session_accept_callback (stream_session_t * s)
+send_session_accept_callback (session_t * s)
 {
   app_worker_t *server_wrk = app_worker_get (s->app_wrk_index);
   transport_proto_vft_t *tp_vft;
   vl_api_accept_session_t *mp;
   vl_api_registration_t *reg;
   transport_connection_t *tc;
-  stream_session_t *listener;
+  session_t *listener;
   svm_msg_q_t *vpp_queue;
   application_t *server;
 
@@ -223,8 +223,8 @@ send_session_accept_callback (stream_session_t * s)
 
   mp->_vl_msg_id = clib_host_to_net_u16 (VL_API_ACCEPT_SESSION);
   mp->context = server_wrk->wrk_index;
-  mp->server_rx_fifo = pointer_to_uword (s->server_rx_fifo);
-  mp->server_tx_fifo = pointer_to_uword (s->server_tx_fifo);
+  mp->server_rx_fifo = pointer_to_uword (s->rx_fifo);
+  mp->server_tx_fifo = pointer_to_uword (s->tx_fifo);
 
   if (session_has_transport (s))
     {
@@ -283,7 +283,7 @@ send_session_accept_callback (stream_session_t * s)
 }
 
 static void
-send_session_disconnect_callback (stream_session_t * s)
+send_session_disconnect_callback (session_t * s)
 {
   app_worker_t *app_wrk = app_worker_get (s->app_wrk_index);
   vl_api_disconnect_session_t *mp;
@@ -305,7 +305,7 @@ send_session_disconnect_callback (stream_session_t * s)
 }
 
 static void
-send_session_reset_callback (stream_session_t * s)
+send_session_reset_callback (session_t * s)
 {
   app_worker_t *app_wrk = app_worker_get (s->app_wrk_index);
   vl_api_registration_t *reg;
@@ -327,7 +327,7 @@ send_session_reset_callback (stream_session_t * s)
 
 int
 send_session_connected_callback (u32 app_wrk_index, u32 api_context,
-                                stream_session_t * s, u8 is_fail)
+                                session_t * s, u8 is_fail)
 {
   vl_api_connect_session_reply_t *mp;
   transport_connection_t *tc;
@@ -365,8 +365,8 @@ send_session_connected_callback (u32 app_wrk_index, u32 api_context,
       clib_memcpy_fast (mp->lcl_ip, &tc->lcl_ip, sizeof (tc->lcl_ip));
       mp->is_ip4 = tc->is_ip4;
       mp->lcl_port = tc->lcl_port;
-      mp->server_rx_fifo = pointer_to_uword (s->server_rx_fifo);
-      mp->server_tx_fifo = pointer_to_uword (s->server_tx_fifo);
+      mp->server_rx_fifo = pointer_to_uword (s->rx_fifo);
+      mp->server_tx_fifo = pointer_to_uword (s->tx_fifo);
     }
   else
     {
@@ -375,8 +375,8 @@ send_session_connected_callback (u32 app_wrk_index, u32 api_context,
       mp->lcl_port = ls->port;
       mp->vpp_event_queue_address = ls->server_evt_q;
       mp->client_event_queue_address = ls->client_evt_q;
-      mp->server_rx_fifo = pointer_to_uword (s->server_tx_fifo);
-      mp->server_tx_fifo = pointer_to_uword (s->server_rx_fifo);
+      mp->server_rx_fifo = pointer_to_uword (s->tx_fifo);
+      mp->server_tx_fifo = pointer_to_uword (s->rx_fifo);
     }
 
 done:
@@ -415,14 +415,14 @@ mq_try_lock_and_alloc_msg (svm_msg_q_t * app_mq, svm_msg_q_msg_t * msg)
 }
 
 static int
-mq_send_session_accepted_cb (stream_session_t * s)
+mq_send_session_accepted_cb (session_t * s)
 {
   app_worker_t *app_wrk = app_worker_get (s->app_wrk_index);
   svm_msg_q_msg_t _msg, *msg = &_msg;
   svm_msg_q_t *vpp_queue, *app_mq;
   transport_proto_vft_t *tp_vft;
   transport_connection_t *tc;
-  stream_session_t *listener;
+  session_t *listener;
   session_accepted_msg_t *mp;
   session_event_t *evt;
   application_t *app;
@@ -436,9 +436,10 @@ mq_send_session_accepted_cb (stream_session_t * s)
   clib_memset (evt, 0, sizeof (*evt));
   evt->event_type = SESSION_CTRL_EVT_ACCEPTED;
   mp = (session_accepted_msg_t *) evt->data;
+  clib_memset (mp, 0, sizeof (*mp));
   mp->context = app->app_index;
-  mp->server_rx_fifo = pointer_to_uword (s->server_rx_fifo);
-  mp->server_tx_fifo = pointer_to_uword (s->server_tx_fifo);
+  mp->server_rx_fifo = pointer_to_uword (s->rx_fifo);
+  mp->server_tx_fifo = pointer_to_uword (s->tx_fifo);
   mp->segment_handle = session_segment_handle (s);
 
   if (session_has_transport (s))
@@ -505,10 +506,10 @@ mq_send_session_accepted_cb (stream_session_t * s)
   return 0;
 }
 
-static void
-mq_send_session_disconnected_cb (stream_session_t * s)
+static inline void
+mq_send_session_close_evt (app_worker_t * app_wrk, session_handle_t sh,
+                          session_evt_type_t evt_type)
 {
-  app_worker_t *app_wrk = app_worker_get (s->app_wrk_index);
   svm_msg_q_msg_t _msg, *msg = &_msg;
   session_disconnected_msg_t *mp;
   svm_msg_q_t *app_mq;
@@ -519,58 +520,77 @@ mq_send_session_disconnected_cb (stream_session_t * s)
     return;
   evt = svm_msg_q_msg_data (app_mq, msg);
   clib_memset (evt, 0, sizeof (*evt));
-  evt->event_type = SESSION_CTRL_EVT_DISCONNECTED;
+  evt->event_type = evt_type;
   mp = (session_disconnected_msg_t *) evt->data;
-  mp->handle = session_handle (s);
+  mp->handle = sh;
   mp->context = app_wrk->api_client_index;
   svm_msg_q_add_and_unlock (app_mq, msg);
 }
 
+static inline void
+mq_notify_close_subscribers (u32 app_index, session_handle_t sh,
+                            svm_fifo_t * f, session_evt_type_t evt_type)
+{
+  app_worker_t *app_wrk;
+  application_t *app;
+  int i;
+
+  app = application_get (app_index);
+  if (!app)
+    return;
+
+  for (i = 0; i < f->n_subscribers; i++)
+    {
+      if (!(app_wrk = application_get_worker (app, f->subscribers[i])))
+       continue;
+      mq_send_session_close_evt (app_wrk, sh, SESSION_CTRL_EVT_DISCONNECTED);
+    }
+}
+
+static void
+mq_send_session_disconnected_cb (session_t * s)
+{
+  app_worker_t *app_wrk = app_worker_get (s->app_wrk_index);
+  session_handle_t sh = session_handle (s);
+
+  mq_send_session_close_evt (app_wrk, session_handle (s),
+                            SESSION_CTRL_EVT_DISCONNECTED);
+
+  if (svm_fifo_n_subscribers (s->rx_fifo))
+    mq_notify_close_subscribers (app_wrk->app_index, sh, s->rx_fifo,
+                                SESSION_CTRL_EVT_DISCONNECTED);
+}
+
 void
 mq_send_local_session_disconnected_cb (u32 app_wrk_index,
                                       local_session_t * ls)
 {
   app_worker_t *app_wrk = app_worker_get (app_wrk_index);
-  svm_msg_q_msg_t _msg, *msg = &_msg;
-  session_disconnected_msg_t *mp;
-  svm_msg_q_t *app_mq;
-  session_event_t *evt;
+  session_handle_t sh = application_local_session_handle (ls);
 
-  app_mq = app_wrk->event_queue;
-  if (mq_try_lock_and_alloc_msg (app_mq, msg))
-    return;
-  evt = svm_msg_q_msg_data (app_mq, msg);
-  clib_memset (evt, 0, sizeof (*evt));
-  evt->event_type = SESSION_CTRL_EVT_DISCONNECTED;
-  mp = (session_disconnected_msg_t *) evt->data;
-  mp->handle = application_local_session_handle (ls);
-  mp->context = app_wrk->api_client_index;
-  svm_msg_q_add_and_unlock (app_mq, msg);
+  mq_send_session_close_evt (app_wrk, sh, SESSION_CTRL_EVT_DISCONNECTED);
+
+  if (svm_fifo_n_subscribers (ls->rx_fifo))
+    mq_notify_close_subscribers (app_wrk->app_index, sh, ls->rx_fifo,
+                                SESSION_CTRL_EVT_DISCONNECTED);
 }
 
 static void
-mq_send_session_reset_cb (stream_session_t * s)
+mq_send_session_reset_cb (session_t * s)
 {
-  app_worker_t *app = app_worker_get (s->app_wrk_index);
-  svm_msg_q_msg_t _msg, *msg = &_msg;
-  session_reset_msg_t *mp;
-  svm_msg_q_t *app_mq;
-  session_event_t *evt;
+  app_worker_t *app_wrk = app_worker_get (s->app_wrk_index);
+  session_handle_t sh = session_handle (s);
 
-  app_mq = app->event_queue;
-  if (mq_try_lock_and_alloc_msg (app_mq, msg))
-    return;
-  evt = svm_msg_q_msg_data (app_mq, msg);
-  clib_memset (evt, 0, sizeof (*evt));
-  evt->event_type = SESSION_CTRL_EVT_RESET;
-  mp = (session_reset_msg_t *) evt->data;
-  mp->handle = session_handle (s);
-  svm_msg_q_add_and_unlock (app_mq, msg);
+  mq_send_session_close_evt (app_wrk, sh, SESSION_CTRL_EVT_RESET);
+
+  if (svm_fifo_n_subscribers (s->rx_fifo))
+    mq_notify_close_subscribers (app_wrk->app_index, sh, s->rx_fifo,
+                                SESSION_CTRL_EVT_RESET);
 }
 
 static int
 mq_send_session_connected_cb (u32 app_wrk_index, u32 api_context,
-                             stream_session_t * s, u8 is_fail)
+                             session_t * s, u8 is_fail)
 {
   svm_msg_q_msg_t _msg, *msg = &_msg;
   session_connected_msg_t *mp;
@@ -595,6 +615,7 @@ mq_send_session_connected_cb (u32 app_wrk_index, u32 api_context,
   clib_memset (evt, 0, sizeof (*evt));
   evt->event_type = SESSION_CTRL_EVT_CONNECTED;
   mp = (session_connected_msg_t *) evt->data;
+  clib_memset (mp, 0, sizeof (*mp));
   mp->context = api_context;
 
   if (is_fail)
@@ -617,8 +638,8 @@ mq_send_session_connected_cb (u32 app_wrk_index, u32 api_context,
       clib_memcpy_fast (mp->lcl_ip, &tc->lcl_ip, sizeof (tc->lcl_ip));
       mp->is_ip4 = tc->is_ip4;
       mp->lcl_port = tc->lcl_port;
-      mp->server_rx_fifo = pointer_to_uword (s->server_rx_fifo);
-      mp->server_tx_fifo = pointer_to_uword (s->server_tx_fifo);
+      mp->server_rx_fifo = pointer_to_uword (s->rx_fifo);
+      mp->server_tx_fifo = pointer_to_uword (s->tx_fifo);
     }
   else
     {
@@ -636,8 +657,8 @@ mq_send_session_connected_cb (u32 app_wrk_index, u32 api_context,
       mp->vpp_event_queue_address = pointer_to_uword (vpp_mq);
       mp->client_event_queue_address = ls->client_evt_q;
       mp->server_event_queue_address = ls->server_evt_q;
-      mp->server_rx_fifo = pointer_to_uword (s->server_tx_fifo);
-      mp->server_tx_fifo = pointer_to_uword (s->server_rx_fifo);
+      mp->server_rx_fifo = pointer_to_uword (s->tx_fifo);
+      mp->server_tx_fifo = pointer_to_uword (s->rx_fifo);
     }
 
 done:
@@ -655,7 +676,7 @@ mq_send_session_bound_cb (u32 app_wrk_index, u32 api_context,
   svm_msg_q_msg_t _msg, *msg = &_msg;
   svm_msg_q_t *app_mq, *vpp_evt_q;
   transport_connection_t *tc;
-  stream_session_t *ls = 0;
+  session_t *ls = 0;
   session_bound_msg_t *mp;
   app_worker_t *app_wrk;
   session_event_t *evt;
@@ -700,12 +721,13 @@ mq_send_session_bound_cb (u32 app_wrk_index, u32 api_context,
       mp->lcl_is_ip4 = session_type_is_ip4 (local->session_type);
     }
 
+  vpp_evt_q = session_manager_get_vpp_event_queue (0);
+  mp->vpp_evt_q = pointer_to_uword (vpp_evt_q);
+
   if (ls && session_transport_service_type (ls) == TRANSPORT_SERVICE_CL)
     {
-      mp->rx_fifo = pointer_to_uword (ls->server_rx_fifo);
-      mp->tx_fifo = pointer_to_uword (ls->server_tx_fifo);
-      vpp_evt_q = session_manager_get_vpp_event_queue (0);
-      mp->vpp_evt_q = pointer_to_uword (vpp_evt_q);
+      mp->rx_fifo = pointer_to_uword (ls->rx_fifo);
+      mp->tx_fifo = pointer_to_uword (ls->tx_fifo);
     }
 
 done:
@@ -870,7 +892,7 @@ vl_api_bind_uri_t_handler (vl_api_bind_uri_t * mp)
   transport_connection_t *tc = 0;
   vnet_bind_args_t _a, *a = &_a;
   vl_api_bind_uri_reply_t *rmp;
-  stream_session_t *s;
+  session_t *s;
   application_t *app = 0;
   svm_msg_q_t *vpp_evt_q;
   app_worker_t *app_wrk;
@@ -911,8 +933,8 @@ done:
               clib_memcpy_fast (rmp->lcl_ip, &tc->lcl_ip, sizeof(tc->lcl_ip));
               if (session_transport_service_type (s) == TRANSPORT_SERVICE_CL)
                 {
-                  rmp->rx_fifo = pointer_to_uword (s->server_rx_fifo);
-                  rmp->tx_fifo = pointer_to_uword (s->server_tx_fifo);
+                  rmp->rx_fifo = pointer_to_uword (s->rx_fifo);
+                  rmp->tx_fifo = pointer_to_uword (s->tx_fifo);
                   vpp_evt_q = session_manager_get_vpp_event_queue (0);
                   rmp->vpp_evt_q = pointer_to_uword (vpp_evt_q);
                 }
@@ -1067,7 +1089,7 @@ vl_api_reset_session_reply_t_handler (vl_api_reset_session_reply_t * mp)
   vnet_disconnect_args_t _a = { 0 }, *a = &_a;
   app_worker_t *app_wrk;
   application_t *app;
-  stream_session_t *s;
+  session_t *s;
   u32 index, thread_index;
 
   app = application_lookup (mp->context);
@@ -1109,7 +1131,7 @@ vl_api_accept_session_reply_t_handler (vl_api_accept_session_reply_t * mp)
 {
   vnet_disconnect_args_t _a = { 0 }, *a = &_a;
   local_session_t *ls;
-  stream_session_t *s;
+  session_t *s;
 
   /* Server isn't interested, kill the session */
   if (mp->retval)
@@ -1166,7 +1188,7 @@ vl_api_bind_sock_t_handler (vl_api_bind_sock_t * mp)
   clib_error_t *error;
   application_t *app = 0;
   app_worker_t *app_wrk;
-  stream_session_t *s;
+  session_t *s;
   transport_connection_t *tc = 0;
   ip46_address_t *ip46;
   svm_msg_q_t *vpp_evt_q;
@@ -1216,8 +1238,8 @@ done:
             clib_memcpy_fast (rmp->lcl_ip, &tc->lcl_ip, sizeof (tc->lcl_ip));
             if (session_transport_service_type (s) == TRANSPORT_SERVICE_CL)
               {
-               rmp->rx_fifo = pointer_to_uword (s->server_rx_fifo);
-               rmp->tx_fifo = pointer_to_uword (s->server_tx_fifo);
+               rmp->rx_fifo = pointer_to_uword (s->rx_fifo);
+               rmp->tx_fifo = pointer_to_uword (s->tx_fifo);
                vpp_evt_q = session_manager_get_vpp_event_queue (0);
                rmp->vpp_evt_q = pointer_to_uword (vpp_evt_q);
               }