session: handle close before app accept reply
[vpp.git] / src / vnet / session / session.c
index 3dd80ad..b48459d 100644 (file)
@@ -194,7 +194,6 @@ session_alloc_for_connection (transport_connection_t * tc)
 
   s = session_alloc (thread_index);
   s->session_type = session_type_from_proto_and_ip (tc->proto, tc->is_ip4);
-  s->session_state = SESSION_STATE_CONNECTING;
   s->enqueue_epoch = (u64) ~ 0;
 
   /* Attach transport to session and vice versa */
@@ -629,6 +628,7 @@ session_stream_connect_notify (transport_connection_t * tc, u8 is_fail)
        }
       else
        {
+         new_s->session_state = SESSION_STATE_CONNECTING;
          new_s->app_wrk_index = app_wrk->wrk_index;
          new_si = new_s->session_index;
          new_ti = new_s->thread_index;
@@ -723,7 +723,7 @@ session_dgram_connect_notify (transport_connection_t * tc,
   return 0;
 }
 
-void
+int
 stream_session_accept_notify (transport_connection_t * tc)
 {
   app_worker_t *app_wrk;
@@ -733,9 +733,9 @@ stream_session_accept_notify (transport_connection_t * tc)
   s = session_get (tc->s_index, tc->thread_index);
   app_wrk = app_worker_get_if_valid (s->app_wrk_index);
   if (!app_wrk)
-    return;
+    return -1;
   app = application_get (app_wrk->app_index);
-  app->cb_fns.session_accept_callback (s);
+  return app->cb_fns.session_accept_callback (s);
 }
 
 /**
@@ -805,7 +805,10 @@ stream_session_delete_notify (transport_connection_t * tc)
     case SESSION_STATE_TRANSPORT_CLOSING:
       /* If transport finishes or times out before we get a reply
        * from the app, do the whole disconnect since we might still
-       * have lingering events */
+       * have lingering events. Cleanup session table in advance
+       * because transport will soon be closed and closed sessions
+       * are assumed to have been removed from the lookup table */
+      session_lookup_del_session (s);
       stream_session_disconnect (s);
       s->session_state = SESSION_STATE_CLOSED;
       break;
@@ -819,8 +822,7 @@ stream_session_delete_notify (transport_connection_t * tc)
       stream_session_delete (s);
       break;
     default:
-      /* Assume connection was not yet added the lookup table */
-      session_free_w_fifos (s);
+      stream_session_delete (s);
       break;
     }
 }
@@ -835,7 +837,10 @@ stream_session_reset_notify (transport_connection_t * tc)
   app_worker_t *app_wrk;
   application_t *app;
   s = session_get (tc->s_index, tc->thread_index);
-  s->session_state = SESSION_STATE_CLOSED;
+  svm_fifo_dequeue_drop_all (s->server_tx_fifo);
+  if (s->session_state >= SESSION_STATE_TRANSPORT_CLOSING)
+    return;
+  s->session_state = SESSION_STATE_TRANSPORT_CLOSING;
   app_wrk = app_worker_get (s->app_wrk_index);
   app = application_get (app_wrk->app_index);
   app->cb_fns.session_reset_callback (s);
@@ -869,7 +874,7 @@ stream_session_accept (transport_connection_t * tc, u32 listener_index,
   if (notify)
     {
       application_t *app = application_get (app_wrk->app_index);
-      app->cb_fns.session_accept_callback (s);
+      return app->cb_fns.session_accept_callback (s);
     }
 
   return 0;
@@ -1081,7 +1086,7 @@ stream_session_disconnect (stream_session_t * s)
    * held, just append a new event to pending disconnects vector. */
   if (vlib_thread_is_main_w_barrier () || thread_index == s->thread_index)
     {
-      wrk = session_manager_get_worker (thread_index);
+      wrk = session_manager_get_worker (s->thread_index);
       vec_add2 (wrk->pending_disconnects, evt, 1);
       clib_memset (evt, 0, sizeof (*evt));
       evt->session_handle = session_handle (s);
@@ -1204,11 +1209,10 @@ session_vpp_event_queues_allocate (session_manager_main_t * smm)
   for (i = 0; i < vec_len (smm->wrk); i++)
     {
       svm_msg_q_cfg_t _cfg, *cfg = &_cfg;
-      u32 notif_q_size = clib_max (16, evt_q_length >> 4);
       svm_msg_q_ring_cfg_t rc[SESSION_MQ_N_RINGS] = {
        {evt_q_length, evt_size, 0}
        ,
-       {notif_q_size, 256, 0}
+       {evt_q_length << 1, 256, 0}
       };
       cfg->consumer_pid = 0;
       cfg->n_rings = 2;
@@ -1366,6 +1370,7 @@ session_manager_main_enable (vlib_main_t * vm)
       _vec_len (wrk->postponed_event_vector) = 0;
 
       wrk->last_vlib_time = vlib_time_now (vlib_mains[i]);
+      wrk->dispatch_period = 500e-6;
 
       if (num_threads > 1)
        clib_rwlock_init (&smm->wrk[i].peekers_rw_locks);