dev: move bus code to bus/
[vpp.git] / src / vnet / session / application_local.c
index 192c22b..3cb743d 100644 (file)
@@ -194,6 +194,12 @@ ct_set_invalid_app_wrk (ct_connection_t *ct, u8 is_client)
     }
 }
 
+static inline u64
+ct_client_seg_handle (u64 server_sh, u32 client_wrk_index)
+{
+  return (((u64) client_wrk_index << 56) | server_sh);
+}
+
 static void
 ct_session_dealloc_fifos (ct_connection_t *ct, svm_fifo_t *rx_fifo,
                          svm_fifo_t *tx_fifo)
@@ -314,7 +320,8 @@ ct_session_dealloc_fifos (ct_connection_t *ct, svm_fifo_t *rx_fifo,
       segment_manager_t *csm;
       csm = app_worker_get_connect_segment_manager (app_wrk);
       if (!segment_manager_app_detached (csm))
-       app_worker_del_segment_notify (app_wrk, ct->segment_handle);
+       app_worker_del_segment_notify (
+         app_wrk, ct_client_seg_handle (ct->segment_handle, ct->client_wrk));
     }
 
   /* Notify server app and free segment */
@@ -379,6 +386,7 @@ ct_session_connect_notify (session_t *ss, session_error_t err)
   session_set_state (cs, SESSION_STATE_CONNECTING);
   cs->app_wrk_index = client_wrk->wrk_index;
   cs->connection_index = cct->c_c_index;
+  cs->opaque = opaque;
   cct->c_s_index = cs->session_index;
 
   /* This will allocate fifos for the session. They won't be used for
@@ -454,11 +462,11 @@ ct_alloc_segment (ct_main_t *cm, app_worker_t *server_wrk, u64 table_handle,
                  segment_manager_t *sm, u32 client_wrk_index)
 {
   u32 seg_ctx_index = ~0, sm_index, pair_bytes;
+  u64 seg_size, seg_handle, client_seg_handle;
   segment_manager_props_t *props;
   const u32 margin = 16 << 10;
   ct_segments_ctx_t *seg_ctx;
   app_worker_t *client_wrk;
-  u64 seg_size, seg_handle;
   application_t *server;
   ct_segment_t *ct_seg;
   uword *spp;
@@ -520,7 +528,11 @@ ct_alloc_segment (ct_main_t *cm, app_worker_t *server_wrk, u64 table_handle,
     goto error;
 
   client_wrk = app_worker_get (client_wrk_index);
-  if (app_worker_add_segment_notify (client_wrk, seg_handle))
+  /* Make sure client workers do not have overlapping segment handles.
+   * Ideally, we should attach fs to client worker segment manager and
+   * create a new handle but that's not currently possible. */
+  client_seg_handle = ct_client_seg_handle (seg_handle, client_wrk_index);
+  if (app_worker_add_segment_notify (client_wrk, client_seg_handle))
     {
       app_worker_del_segment_notify (server_wrk, seg_handle);
       goto error;
@@ -737,7 +749,8 @@ ct_accept_one (u32 thread_index, u32 ho_index)
   cct->client_tx_fifo = ss->rx_fifo;
   cct->client_rx_fifo->refcnt++;
   cct->client_tx_fifo->refcnt++;
-  cct->segment_handle = sct->segment_handle;
+  cct->segment_handle =
+    ct_client_seg_handle (sct->segment_handle, cct->client_wrk);
 
   session_set_state (ss, SESSION_STATE_ACCEPTING);
   if (app_worker_accept_notify (server_wrk, ss))
@@ -982,7 +995,7 @@ ct_session_connect (transport_endpoint_cfg_t * tep)
     goto global_scope;
 
   ll = listen_session_get_from_handle (lh);
-  al = app_listener_get_w_session (ll);
+  al = app_listener_get (ll->al_index);
 
   /*
    * Break loop if rule in local table points to connecting app. This
@@ -1011,8 +1024,12 @@ global_scope:
   ll = session_lookup_listener_wildcard (table_index, sep);
 
   /* Avoid connecting app to own listener */
-  if (ll && ll->app_index != app->app_index)
-    return ct_connect (app_wrk, ll, sep_ext);
+  if (ll)
+    {
+      al = app_listener_get (ll->al_index);
+      if (al->app_index != app->app_index)
+       return ct_connect (app_wrk, ll, sep_ext);
+    }
 
   /* Failed to connect but no error */
   return SESSION_E_LOCAL_CONNECT;
@@ -1021,6 +1038,8 @@ global_scope:
 static inline int
 ct_close_is_reset (ct_connection_t *ct, session_t *s)
 {
+  if (ct->flags & CT_CONN_F_RESET)
+    return 1;
   if (ct->flags & CT_CONN_F_CLIENT)
     return (svm_fifo_max_dequeue (ct->client_rx_fifo) > 0);
   else
@@ -1113,10 +1132,10 @@ ct_handle_cleanups (void *args)
       clib_fifo_sub2 (wrk->pending_cleanups, req);
       ct = ct_connection_get (req->ct_index, thread_index);
       s = session_get (ct->c_s_index, ct->c_thread_index);
-      if (!svm_fifo_has_event (s->tx_fifo))
-       ct_session_postponed_cleanup (ct);
-      else
+      if (svm_fifo_has_event (s->tx_fifo) || (s->flags & SESSION_F_RX_EVT))
        clib_fifo_add1 (wrk->pending_cleanups, *req);
+      else
+       ct_session_postponed_cleanup (ct);
       n_to_handle -= 1;
     }
 
@@ -1181,6 +1200,15 @@ ct_session_close (u32 ct_index, u32 thread_index)
   ct_program_cleanup (ct);
 }
 
+static void
+ct_session_reset (u32 ct_index, u32 thread_index)
+{
+  ct_connection_t *ct;
+  ct = ct_connection_get (ct_index, thread_index);
+  ct->flags |= CT_CONN_F_RESET;
+  ct_session_close (ct_index, thread_index);
+}
+
 static transport_connection_t *
 ct_session_get (u32 ct_index, u32 thread_index)
 {
@@ -1333,7 +1361,6 @@ ct_enable_disable (vlib_main_t * vm, u8 is_en)
   return 0;
 }
 
-/* *INDENT-OFF* */
 static const transport_proto_vft_t cut_thru_proto = {
   .enable = ct_enable_disable,
   .start_listen = ct_start_listen,
@@ -1345,6 +1372,7 @@ static const transport_proto_vft_t cut_thru_proto = {
   .cleanup_ho = ct_cleanup_ho,
   .connect = ct_session_connect,
   .close = ct_session_close,
+  .reset = ct_session_reset,
   .custom_tx = ct_custom_tx,
   .app_rx_evt = ct_app_rx_evt,
   .format_listener = format_ct_listener,
@@ -1357,7 +1385,6 @@ static const transport_proto_vft_t cut_thru_proto = {
     .service_type = TRANSPORT_SERVICE_VC,
   },
 };
-/* *INDENT-ON* */
 
 static inline int
 ct_session_can_tx (session_t *s)
@@ -1382,6 +1409,7 @@ ct_session_tx (session_t * s)
   peer_s = session_get (peer_ct->c_s_index, peer_ct->c_thread_index);
   if (peer_s->session_state >= SESSION_STATE_TRANSPORT_CLOSING)
     return 0;
+  peer_s->flags |= SESSION_F_RX_EVT;
   return session_enqueue_notify (peer_s);
 }