session: use safe realloc for pools
[vpp.git] / src / vnet / session / session.c
index f063548..f1d1a4e 100644 (file)
@@ -197,29 +197,31 @@ session_program_transport_ctrl_evt (session_t * s, session_evt_type_t evt)
     session_send_ctrl_evt_to_thread (s, evt);
 }
 
+static void
+session_pool_realloc_rpc (void *rpc_args)
+{
+  session_worker_t *wrk;
+  u32 thread_index;
+
+  thread_index = pointer_to_uword (rpc_args);
+  wrk = &session_main.wrk[thread_index];
+
+  pool_realloc_safe_aligned (wrk->sessions, CLIB_CACHE_LINE_BYTES);
+}
+
 session_t *
 session_alloc (u32 thread_index)
 {
   session_worker_t *wrk = &session_main.wrk[thread_index];
   session_t *s;
-  u8 will_expand = 0;
-  pool_get_aligned_will_expand (wrk->sessions, will_expand,
-                               CLIB_CACHE_LINE_BYTES);
-  /* If we have peekers, let them finish */
-  if (PREDICT_FALSE (will_expand && vlib_num_workers ()))
-    {
-      clib_rwlock_writer_lock (&wrk->peekers_rw_locks);
-      pool_get_aligned (wrk->sessions, s, CLIB_CACHE_LINE_BYTES);
-      clib_rwlock_writer_unlock (&wrk->peekers_rw_locks);
-    }
-  else
-    {
-      pool_get_aligned (wrk->sessions, s, CLIB_CACHE_LINE_BYTES);
-    }
+
+  pool_get_aligned_safe (wrk->sessions, s, thread_index,
+                        session_pool_realloc_rpc, CLIB_CACHE_LINE_BYTES);
   clib_memset (s, 0, sizeof (*s));
   s->session_index = s - wrk->sessions;
   s->thread_index = thread_index;
   s->app_index = APP_INVALID_INDEX;
+
   return s;
 }
 
@@ -1099,6 +1101,15 @@ session_transport_closing_notify (transport_connection_t * tc)
   s = session_get (tc->s_index, tc->thread_index);
   if (s->session_state >= SESSION_STATE_TRANSPORT_CLOSING)
     return;
+
+  /* Wait for reply from app before sending notification as the
+   * accept might be rejected */
+  if (s->session_state == SESSION_STATE_ACCEPTING)
+    {
+      s->session_state = SESSION_STATE_TRANSPORT_CLOSING;
+      return;
+    }
+
   s->session_state = SESSION_STATE_TRANSPORT_CLOSING;
   app_wrk = app_worker_get (s->app_wrk_index);
   app_worker_close_notify (app_wrk, s);
@@ -1225,6 +1236,11 @@ session_transport_reset_notify (transport_connection_t * tc)
   svm_fifo_dequeue_drop_all (s->tx_fifo);
   if (s->session_state >= SESSION_STATE_TRANSPORT_CLOSING)
     return;
+  if (s->session_state == SESSION_STATE_ACCEPTING)
+    {
+      s->session_state = SESSION_STATE_TRANSPORT_CLOSING;
+      return;
+    }
   s->session_state = SESSION_STATE_TRANSPORT_CLOSING;
   app_wrk = app_worker_get (s->app_wrk_index);
   app_worker_reset_notify (app_wrk, s);
@@ -1454,12 +1470,12 @@ session_open (session_endpoint_cfg_t *rmt, session_handle_t *rsh)
 int
 session_listen (session_t * ls, session_endpoint_cfg_t * sep)
 {
-  transport_endpoint_t *tep;
+  transport_endpoint_cfg_t *tep;
   int tc_index;
   u32 s_index;
 
   /* Transport bind/listen */
-  tep = session_endpoint_to_transport (sep);
+  tep = session_endpoint_to_transport_cfg (sep);
   s_index = ls->session_index;
   tc_index = transport_start_listen (session_get_transport_proto (ls),
                                     s_index, tep);
@@ -1538,6 +1554,8 @@ session_close (session_t * s)
       return;
     }
 
+  /* App closed so stop propagating dequeue notifications */
+  svm_fifo_clear_deq_ntf (s->tx_fifo);
   s->session_state = SESSION_STATE_CLOSING;
   session_program_transport_ctrl_evt (s, SESSION_CTRL_EVT_CLOSE);
 }
@@ -1752,6 +1770,39 @@ session_register_transport (transport_proto_t transport_proto,
     session_tx_fns[vft->transport_options.tx_type];
 }
 
+void
+session_register_update_time_fn (session_update_time_fn fn, u8 is_add)
+{
+  session_main_t *smm = &session_main;
+  session_update_time_fn *fi;
+  u32 fi_pos = ~0;
+  u8 found = 0;
+
+  vec_foreach (fi, smm->update_time_fns)
+    {
+      if (*fi == fn)
+       {
+         fi_pos = fi - smm->update_time_fns;
+         found = 1;
+         break;
+       }
+    }
+
+  if (is_add)
+    {
+      if (found)
+       {
+         clib_warning ("update time fn %p already registered", fn);
+         return;
+       }
+      vec_add1 (smm->update_time_fns, fn);
+    }
+  else
+    {
+      vec_del1 (smm->update_time_fns, fi_pos);
+    }
+}
+
 transport_proto_t
 session_add_transport_proto (void)
 {
@@ -1839,6 +1890,7 @@ session_manager_main_enable (vlib_main_t * vm)
 
   /* Allocate cache line aligned worker contexts */
   vec_validate_aligned (smm->wrk, num_threads - 1, CLIB_CACHE_LINE_BYTES);
+  clib_spinlock_init (&session_main.pool_realloc_lock);
 
   for (i = 0; i < num_threads; i++)
     {
@@ -1847,15 +1899,14 @@ session_manager_main_enable (vlib_main_t * vm)
       wrk->new_head = clib_llist_make_head (wrk->event_elts, evt_list);
       wrk->old_head = clib_llist_make_head (wrk->event_elts, evt_list);
       wrk->pending_connects = clib_llist_make_head (wrk->event_elts, evt_list);
+      wrk->evts_pending_main =
+       clib_llist_make_head (wrk->event_elts, evt_list);
       wrk->vm = vlib_get_main_by_index (i);
       wrk->last_vlib_time = vlib_time_now (vm);
       wrk->last_vlib_us_time = wrk->last_vlib_time * CLIB_US_TIME_FREQ;
       wrk->timerfd = -1;
       vec_validate (wrk->session_to_enqueue, smm->last_transport_proto_type);
 
-      if (num_threads > 1)
-       clib_rwlock_init (&smm->wrk[i].peekers_rw_locks);
-
       if (!smm->no_adaptive && smm->use_private_rx_mqs)
        session_wrk_enable_adaptive_mode (wrk);
     }
@@ -1984,7 +2035,7 @@ session_main_init (vlib_main_t * vm)
   smm->poll_main = 0;
   smm->use_private_rx_mqs = 0;
   smm->no_adaptive = 0;
-  smm->last_transport_proto_type = TRANSPORT_PROTO_SRTP;
+  smm->last_transport_proto_type = TRANSPORT_PROTO_HTTP;
 
   return 0;
 }