api: multiple connections per process
[vpp.git] / src / vnet / session / session.c
index cc9f4a8..3369106 100644 (file)
@@ -36,7 +36,8 @@ session_send_evt_to_thread (void *data, void *args, u32 thread_index,
   mq = session_main_get_vpp_event_queue (thread_index);
   if (PREDICT_FALSE (svm_msg_q_lock (mq)))
     return -1;
-  if (PREDICT_FALSE (svm_msg_q_ring_is_full (mq, SESSION_MQ_IO_EVT_RING)))
+  if (PREDICT_FALSE (svm_msg_q_is_full (mq)
+                    || svm_msg_q_ring_is_full (mq, SESSION_MQ_IO_EVT_RING)))
     {
       svm_msg_q_unlock (mq);
       return -2;
@@ -125,7 +126,7 @@ session_add_self_custom_tx_evt (transport_connection_t * tc, u8 has_prio)
 
   s = session_get (tc->s_index, tc->thread_index);
   ASSERT (s->thread_index == vlib_get_thread_index ());
-  ASSERT (s->session_state < SESSION_STATE_TRANSPORT_DELETED);
+  ASSERT (s->session_state != SESSION_STATE_TRANSPORT_DELETED);
   if (!(s->flags & SESSION_F_CUSTOM_TX))
     {
       s->flags |= SESSION_F_CUSTOM_TX;
@@ -205,6 +206,32 @@ session_free (session_t * s)
   pool_put (session_main.wrk[s->thread_index].sessions, s);
 }
 
+u8
+session_is_valid (u32 si, u8 thread_index)
+{
+  session_t *s;
+  transport_connection_t *tc;
+
+  s = pool_elt_at_index (session_main.wrk[thread_index].sessions, si);
+
+  if (!s)
+    return 1;
+
+  if (s->thread_index != thread_index || s->session_index != si)
+    return 0;
+
+  if (s->session_state == SESSION_STATE_TRANSPORT_DELETED
+      || s->session_state <= SESSION_STATE_LISTENING)
+    return 1;
+
+  tc = session_get_transport (s);
+  if (s->connection_index != tc->c_index
+      || s->thread_index != tc->thread_index || tc->s_index != si)
+    return 0;
+
+  return 1;
+}
+
 static void
 session_cleanup_notify (session_t * s, session_cleanup_ntf_t ntf)
 {
@@ -799,6 +826,7 @@ session_dgram_connect_notify (transport_connection_t * tc,
   new_s->rx_fifo->master_session_index = new_s->session_index;
   new_s->rx_fifo->master_thread_index = new_s->thread_index;
   new_s->session_state = SESSION_STATE_READY;
+  new_s->flags |= SESSION_F_IS_MIGRATING;
   session_lookup_add_connection (tc, session_handle (new_s));
 
   /*
@@ -863,7 +891,8 @@ session_transport_delete_notify (transport_connection_t * tc)
       /* Session was created but accept notification was not yet sent to the
        * app. Cleanup everything. */
       session_lookup_del_session (s);
-      session_free_w_fifos (s);
+      segment_manager_dealloc_fifos (s->rx_fifo, s->tx_fifo);
+      session_free (s);
       break;
     case SESSION_STATE_ACCEPTING:
     case SESSION_STATE_TRANSPORT_CLOSING:
@@ -886,7 +915,7 @@ session_transport_delete_notify (transport_connection_t * tc)
        * session is just removed because both transport and app have
        * confirmed the close*/
       session_lookup_del_session (s);
-      s->session_state = SESSION_STATE_CLOSED;
+      s->session_state = SESSION_STATE_TRANSPORT_DELETED;
       session_cleanup_notify (s, SESSION_CLEANUP_TRANSPORT);
       svm_fifo_dequeue_drop_all (s->tx_fifo);
       session_program_transport_ctrl_evt (s, SESSION_CTRL_EVT_CLOSE);
@@ -1279,8 +1308,6 @@ session_transport_reset (session_t * s)
 void
 session_transport_cleanup (session_t * s)
 {
-  s->session_state = SESSION_STATE_CLOSED;
-
   /* Delete from main lookup table before we axe the the transport */
   session_lookup_del_session (s);
   if (s->session_state != SESSION_STATE_TRANSPORT_DELETED)
@@ -1288,7 +1315,8 @@ session_transport_cleanup (session_t * s)
                       s->thread_index);
   /* Since we called cleanup, no delete notification will come. So, make
    * sure the session is properly freed. */
-  session_free_w_fifos (s);
+  segment_manager_dealloc_fifos (s->rx_fifo, s->tx_fifo);
+  session_free (s);
 }
 
 /**
@@ -1305,7 +1333,7 @@ session_vpp_event_queues_allocate (session_main_t * smm)
 {
   u32 evt_q_length = 2048, evt_size = sizeof (session_event_t);
   ssvm_private_t *eqs = &smm->evt_qs_segment;
-  api_main_t *am = &api_main;
+  api_main_t *am = vlibapi_get_main ();
   uword eqs_size = 64 << 20;
   pid_t vpp_pid = getpid ();
   void *oldheap;
@@ -1494,6 +1522,7 @@ session_manager_main_enable (vlib_main_t * vm)
       wrk->old_head = clib_llist_make_head (wrk->event_elts, evt_list);
       wrk->vm = vlib_mains[i];
       wrk->last_vlib_time = vlib_time_now (vlib_mains[i]);
+      wrk->last_vlib_us_time = wrk->last_vlib_time * CLIB_US_TIME_FREQ;
 
       if (num_threads > 1)
        clib_rwlock_init (&smm->wrk[i].peekers_rw_locks);
@@ -1537,7 +1566,6 @@ session_manager_main_enable (vlib_main_t * vm)
 
   /* Enable transports */
   transport_enable_disable (vm, 1);
-  transport_init_tx_pacers_period ();
   return 0;
 }
 
@@ -1609,10 +1637,25 @@ session_manager_main_init (vlib_main_t * vm)
   smm->evt_qs_segment_size = 1 << 20;
 #endif
   smm->is_enabled = 0;
+  smm->session_enable_asap = 0;
+  return 0;
+}
+
+static clib_error_t *
+session_main_init (vlib_main_t * vm)
+{
+  session_main_t *smm = &session_main;
+  if (smm->session_enable_asap)
+    {
+      vlib_worker_thread_barrier_sync (vm);
+      vnet_session_enable_disable (vm, 1 /* is_en */ );
+      vlib_worker_thread_barrier_release (vm);
+    }
   return 0;
 }
 
 VLIB_INIT_FUNCTION (session_manager_main_init);
+VLIB_MAIN_LOOP_ENTER_FUNCTION (session_main_init);
 
 static clib_error_t *
 session_config_fn (vlib_main_t * vm, unformat_input_t * input)
@@ -1694,7 +1737,7 @@ session_config_fn (vlib_main_t * vm, unformat_input_t * input)
                         &smm->evt_qs_segment_size))
        ;
       else if (unformat (input, "enable"))
-       vnet_session_enable_disable (vm, 1 /* is_en */ );
+       smm->session_enable_asap = 1;
       else
        return clib_error_return (0, "unknown input `%U'",
                                  format_unformat_error, input);