session: add wrk context
[vpp.git] / src / vnet / session / session.c
index 9c246a1..1802c0e 100644 (file)
@@ -121,26 +121,24 @@ session_send_rpc_evt_to_thread (u32 thread_index, void *fp, void *rpc_args)
 stream_session_t *
 session_alloc (u32 thread_index)
 {
-  session_manager_main_t *smm = &session_manager_main;
+  session_manager_worker_t *wrk = &session_manager_main.wrk[thread_index];
   stream_session_t *s;
   u8 will_expand = 0;
-  pool_get_aligned_will_expand (smm->sessions[thread_index], will_expand,
+  pool_get_aligned_will_expand (wrk->sessions, will_expand,
                                CLIB_CACHE_LINE_BYTES);
   /* If we have peekers, let them finish */
   if (PREDICT_FALSE (will_expand && vlib_num_workers ()))
     {
-      clib_rwlock_writer_lock (&smm->peekers_rw_locks[thread_index]);
-      pool_get_aligned (session_manager_main.sessions[thread_index], s,
-                       CLIB_CACHE_LINE_BYTES);
-      clib_rwlock_writer_unlock (&smm->peekers_rw_locks[thread_index]);
+      clib_rwlock_writer_lock (&wrk->peekers_rw_locks);
+      pool_get_aligned (wrk->sessions, s, CLIB_CACHE_LINE_BYTES);
+      clib_rwlock_writer_unlock (&wrk->peekers_rw_locks);
     }
   else
     {
-      pool_get_aligned (session_manager_main.sessions[thread_index], s,
-                       CLIB_CACHE_LINE_BYTES);
+      pool_get_aligned (wrk->sessions, s, CLIB_CACHE_LINE_BYTES);
     }
   clib_memset (s, 0, sizeof (*s));
-  s->session_index = s - session_manager_main.sessions[thread_index];
+  s->session_index = s - wrk->sessions;
   s->thread_index = thread_index;
   return s;
 }
@@ -148,7 +146,7 @@ session_alloc (u32 thread_index)
 void
 session_free (stream_session_t * s)
 {
-  pool_put (session_manager_main.sessions[s->thread_index], s);
+  pool_put (session_manager_main.wrk[s->thread_index].sessions, s);
   if (CLIB_DEBUG)
     clib_memset (s, 0xFA, sizeof (*s));
 }
@@ -391,22 +389,19 @@ session_enqueue_stream_connection (transport_connection_t * tc,
     {
       /* Queue RX event on this fifo. Eventually these will need to be flushed
        * by calling stream_server_flush_enqueue_events () */
-      session_manager_main_t *smm = vnet_get_session_manager_main ();
-      u32 thread_index = s->thread_index;
-      u64 enqueue_epoch = smm->current_enqueue_epoch[tc->proto][thread_index];
+      session_manager_worker_t *wrk;
 
-      if (s->enqueue_epoch != enqueue_epoch)
+      wrk = session_manager_get_worker (s->thread_index);
+      if (s->enqueue_epoch != wrk->current_enqueue_epoch[tc->proto])
        {
-         s->enqueue_epoch = enqueue_epoch;
-         vec_add1 (smm->session_to_enqueue[tc->proto][thread_index],
-                   s->session_index);
+         s->enqueue_epoch = wrk->current_enqueue_epoch[tc->proto];
+         vec_add1 (wrk->session_to_enqueue[tc->proto], s->session_index);
        }
     }
 
   return enqueued;
 }
 
-
 int
 session_enqueue_dgram_connection (stream_session_t * s,
                                  session_dgram_hdr_t * hdr,
@@ -432,15 +427,13 @@ session_enqueue_dgram_connection (stream_session_t * s,
     {
       /* Queue RX event on this fifo. Eventually these will need to be flushed
        * by calling stream_server_flush_enqueue_events () */
-      session_manager_main_t *smm = vnet_get_session_manager_main ();
-      u32 thread_index = s->thread_index;
-      u64 enqueue_epoch = smm->current_enqueue_epoch[proto][thread_index];
+      session_manager_worker_t *wrk;
 
-      if (s->enqueue_epoch != enqueue_epoch)
+      wrk = session_manager_get_worker (s->thread_index);
+      if (s->enqueue_epoch != wrk->current_enqueue_epoch[proto])
        {
-         s->enqueue_epoch = enqueue_epoch;
-         vec_add1 (smm->session_to_enqueue[proto][thread_index],
-                   s->session_index);
+         s->enqueue_epoch = wrk->current_enqueue_epoch[proto];
+         vec_add1 (wrk->session_to_enqueue[proto], s->session_index);
        }
     }
   return enqueued;
@@ -539,12 +532,12 @@ session_dequeue_notify (stream_session_t * s)
 int
 session_manager_flush_enqueue_events (u8 transport_proto, u32 thread_index)
 {
-  session_manager_main_t *smm = &session_manager_main;
+  session_manager_worker_t *wrk = session_manager_get_worker (thread_index);
   stream_session_t *s;
   int i, errors = 0;
   u32 *indices;
 
-  indices = smm->session_to_enqueue[transport_proto][thread_index];
+  indices = wrk->session_to_enqueue[transport_proto];
 
   for (i = 0; i < vec_len (indices); i++)
     {
@@ -559,8 +552,8 @@ session_manager_flush_enqueue_events (u8 transport_proto, u32 thread_index)
     }
 
   vec_reset_length (indices);
-  smm->session_to_enqueue[transport_proto][thread_index] = indices;
-  smm->current_enqueue_epoch[transport_proto][thread_index]++;
+  wrk->session_to_enqueue[transport_proto] = indices;
+  wrk->current_enqueue_epoch[transport_proto]++;
 
   return errors;
 }
@@ -1068,7 +1061,7 @@ void
 stream_session_disconnect (stream_session_t * s)
 {
   u32 thread_index = vlib_get_thread_index ();
-  session_manager_main_t *smm = &session_manager_main;
+  session_manager_worker_t *wrk;
   session_event_t *evt;
 
   if (!s)
@@ -1088,7 +1081,8 @@ stream_session_disconnect (stream_session_t * s)
    * held, just append a new event to pending disconnects vector. */
   if (vlib_thread_is_main_w_barrier () || thread_index == s->thread_index)
     {
-      vec_add2 (smm->pending_disconnects[s->thread_index], evt, 1);
+      wrk = session_manager_get_worker (thread_index);
+      vec_add2 (wrk->pending_disconnects, evt, 1);
       clib_memset (evt, 0, sizeof (*evt));
       evt->session_handle = session_handle (s);
       evt->event_type = FIFO_EVENT_DISCONNECT;
@@ -1207,7 +1201,7 @@ session_vpp_event_queues_allocate (session_manager_main_t * smm)
   else
     oldheap = svm_push_data_heap (am->vlib_rp);
 
-  for (i = 0; i < vec_len (smm->vpp_event_queues); i++)
+  for (i = 0; i < vec_len (smm->wrk); i++)
     {
       svm_msg_q_cfg_t _cfg, *cfg = &_cfg;
       u32 notif_q_size = clib_max (16, evt_q_length >> 4);
@@ -1220,10 +1214,10 @@ session_vpp_event_queues_allocate (session_manager_main_t * smm)
       cfg->n_rings = 2;
       cfg->q_nitems = evt_q_length;
       cfg->ring_cfgs = rc;
-      smm->vpp_event_queues[i] = svm_msg_q_alloc (cfg);
+      smm->wrk[i].vpp_event_queue = svm_msg_q_alloc (cfg);
       if (smm->evt_qs_use_memfd_seg)
        {
-         if (svm_msg_q_alloc_consumer_eventfd (smm->vpp_event_queues[i]))
+         if (svm_msg_q_alloc_consumer_eventfd (smm->wrk[i].vpp_event_queue))
            clib_warning ("eventfd returned");
        }
     }
@@ -1342,6 +1336,7 @@ session_manager_main_enable (vlib_main_t * vm)
   session_manager_main_t *smm = &session_manager_main;
   vlib_thread_main_t *vtm = vlib_get_thread_main ();
   u32 num_threads, preallocated_sessions_per_worker;
+  session_manager_worker_t *wrk;
   int i, j;
 
   num_threads = 1 /* main thread */  + vtm->n_threads;
@@ -1350,38 +1345,28 @@ session_manager_main_enable (vlib_main_t * vm)
     return clib_error_return (0, "n_thread_stacks not set");
 
   /* configure per-thread ** vectors */
-  vec_validate (smm->sessions, num_threads - 1);
-  vec_validate (smm->tx_buffers, num_threads - 1);
-  vec_validate (smm->pending_event_vector, num_threads - 1);
-  vec_validate (smm->pending_disconnects, num_threads - 1);
-  vec_validate (smm->free_event_vector, num_threads - 1);
-  vec_validate (smm->vpp_event_queues, num_threads - 1);
-  vec_validate (smm->peekers_rw_locks, num_threads - 1);
-  vec_validate (smm->dispatch_period, num_threads - 1);
-  vec_validate (smm->last_vlib_time, num_threads - 1);
-  vec_validate_aligned (smm->ctx, num_threads - 1, CLIB_CACHE_LINE_BYTES);
+  vec_validate_aligned (smm->wrk, num_threads - 1, CLIB_CACHE_LINE_BYTES);
 
   for (i = 0; i < TRANSPORT_N_PROTO; i++)
     {
-      vec_validate (smm->current_enqueue_epoch[i], num_threads - 1);
-      vec_validate (smm->session_to_enqueue[i], num_threads - 1);
       for (j = 0; j < num_threads; j++)
-       smm->current_enqueue_epoch[i][j] = 1;
+       smm->wrk[j].current_enqueue_epoch[i] = 1;
     }
 
   for (i = 0; i < num_threads; i++)
     {
-      vec_validate (smm->free_event_vector[i], 0);
-      _vec_len (smm->free_event_vector[i]) = 0;
-      vec_validate (smm->pending_event_vector[i], 0);
-      _vec_len (smm->pending_event_vector[i]) = 0;
-      vec_validate (smm->pending_disconnects[i], 0);
-      _vec_len (smm->pending_disconnects[i]) = 0;
+      wrk = &smm->wrk[i];
+      vec_validate (wrk->free_event_vector, 0);
+      _vec_len (wrk->free_event_vector) = 0;
+      vec_validate (wrk->pending_event_vector, 0);
+      _vec_len (wrk->pending_event_vector) = 0;
+      vec_validate (wrk->pending_disconnects, 0);
+      _vec_len (wrk->pending_disconnects) = 0;
 
-      smm->last_vlib_time[i] = vlib_time_now (vlib_mains[i]);
+      wrk->last_vlib_time = vlib_time_now (vlib_mains[i]);
 
       if (num_threads > 1)
-       clib_rwlock_init (&smm->peekers_rw_locks[i]);
+       clib_rwlock_init (&smm->wrk[i].peekers_rw_locks);
     }
 
 #if SESSION_DEBUG
@@ -1401,7 +1386,7 @@ session_manager_main_enable (vlib_main_t * vm)
     {
       if (num_threads == 1)
        {
-         pool_init_fixed (smm->sessions[0], smm->preallocated_sessions);
+         pool_init_fixed (smm->wrk[0].sessions, smm->preallocated_sessions);
        }
       else
        {
@@ -1412,7 +1397,7 @@ session_manager_main_enable (vlib_main_t * vm)
 
          for (j = 1; j < num_threads; j++)
            {
-             pool_init_fixed (smm->sessions[j],
+             pool_init_fixed (smm->wrk[j].sessions,
                               preallocated_sessions_per_worker);
            }
        }