session: add wrk context 16/15616/5
authorFlorin Coras <fcoras@cisco.com>
Tue, 30 Oct 2018 19:01:48 +0000 (12:01 -0700)
committerMarco Varlese <marco.varlese@suse.de>
Wed, 31 Oct 2018 08:17:33 +0000 (08:17 +0000)
Change-Id: I66ca0ddea872948507d078e405eb90f9f3a0e897
Signed-off-by: Florin Coras <fcoras@cisco.com>
src/plugins/unittest/tcp_test.c
src/vnet/session-apps/echo_server.c
src/vnet/session/application_interface.c
src/vnet/session/session.c
src/vnet/session/session.h
src/vnet/session/session_cli.c
src/vnet/session/session_node.c

index 3bde805..0bc1153 100644 (file)
@@ -1646,9 +1646,9 @@ tcp_test_lookup (vlib_main_t * vm, unformat_input_t * input)
   /*
    * Allocate fake session and connection 1
    */
-  pool_get (smm->sessions[0], s);
+  pool_get (smm->wrk[0].sessions, s);
   clib_memset (s, 0, sizeof (*s));
-  s->session_index = sidx = s - smm->sessions[0];
+  s->session_index = sidx = s - smm->wrk[0].sessions;
 
   pool_get (tm->connections[0], tc);
   clib_memset (tc, 0, sizeof (*tc));
@@ -1667,9 +1667,9 @@ tcp_test_lookup (vlib_main_t * vm, unformat_input_t * input)
   /*
    * Allocate fake session and connection 2
    */
-  pool_get (session_manager_main.sessions[0], s);
+  pool_get (smm->wrk[0].sessions, s);
   clib_memset (s, 0, sizeof (*s));
-  s->session_index = s - smm->sessions[0];
+  s->session_index = s - smm->wrk[0].sessions;
 
   pool_get (tm->connections[0], tc);
   clib_memset (tc, 0, sizeof (*tc));
@@ -1689,7 +1689,7 @@ tcp_test_lookup (vlib_main_t * vm, unformat_input_t * input)
    * Confirm that connection lookup works
    */
 
-  s1 = pool_elt_at_index (smm->sessions[0], sidx);
+  s1 = pool_elt_at_index (smm->wrk[0].sessions, sidx);
   session_lookup_add_connection (tc1, session_handle (s1));
   tconn = session_lookup_connection_wt4 (0, &tc1->lcl_ip.ip4,
                                         &tc1->rmt_ip.ip4,
index 3ee33ea..7771075 100644 (file)
@@ -389,7 +389,7 @@ echo_server_create (vlib_main_t * vm, u8 * appns_id, u64 appns_flags,
   vec_validate (esm->rx_retries, num_threads - 1);
   for (i = 0; i < vec_len (esm->rx_retries); i++)
     vec_validate (esm->rx_retries[i],
-                 pool_elts (session_manager_main.sessions[i]));
+                 pool_elts (session_manager_main.wrk[i].sessions));
   esm->rcv_buffer_size = clib_max (esm->rcv_buffer_size, esm->fifo_size);
   for (i = 0; i < num_threads; i++)
     vec_validate (esm->rx_buf[i], esm->rcv_buffer_size);
index 1f5c6ff..89f7462 100644 (file)
@@ -125,10 +125,10 @@ api_parse_session_handle (u64 handle, u32 * session_index, u32 * thread_index)
   *thread_index = handle & 0xFFFFFFFF;
   *session_index = handle >> 32;
 
-  if (*thread_index >= vec_len (smm->sessions))
+  if (*thread_index >= vec_len (smm->wrk))
     return VNET_API_ERROR_INVALID_VALUE;
 
-  pool = smm->sessions[*thread_index];
+  pool = smm->wrk[*thread_index].sessions;
 
   if (pool_is_free_index (pool, *session_index))
     return VNET_API_ERROR_INVALID_VALUE_2;
index 9c246a1..1802c0e 100644 (file)
@@ -121,26 +121,24 @@ session_send_rpc_evt_to_thread (u32 thread_index, void *fp, void *rpc_args)
 stream_session_t *
 session_alloc (u32 thread_index)
 {
-  session_manager_main_t *smm = &session_manager_main;
+  session_manager_worker_t *wrk = &session_manager_main.wrk[thread_index];
   stream_session_t *s;
   u8 will_expand = 0;
-  pool_get_aligned_will_expand (smm->sessions[thread_index], will_expand,
+  pool_get_aligned_will_expand (wrk->sessions, will_expand,
                                CLIB_CACHE_LINE_BYTES);
   /* If we have peekers, let them finish */
   if (PREDICT_FALSE (will_expand && vlib_num_workers ()))
     {
-      clib_rwlock_writer_lock (&smm->peekers_rw_locks[thread_index]);
-      pool_get_aligned (session_manager_main.sessions[thread_index], s,
-                       CLIB_CACHE_LINE_BYTES);
-      clib_rwlock_writer_unlock (&smm->peekers_rw_locks[thread_index]);
+      clib_rwlock_writer_lock (&wrk->peekers_rw_locks);
+      pool_get_aligned (wrk->sessions, s, CLIB_CACHE_LINE_BYTES);
+      clib_rwlock_writer_unlock (&wrk->peekers_rw_locks);
     }
   else
     {
-      pool_get_aligned (session_manager_main.sessions[thread_index], s,
-                       CLIB_CACHE_LINE_BYTES);
+      pool_get_aligned (wrk->sessions, s, CLIB_CACHE_LINE_BYTES);
     }
   clib_memset (s, 0, sizeof (*s));
-  s->session_index = s - session_manager_main.sessions[thread_index];
+  s->session_index = s - wrk->sessions;
   s->thread_index = thread_index;
   return s;
 }
@@ -148,7 +146,7 @@ session_alloc (u32 thread_index)
 void
 session_free (stream_session_t * s)
 {
-  pool_put (session_manager_main.sessions[s->thread_index], s);
+  pool_put (session_manager_main.wrk[s->thread_index].sessions, s);
   if (CLIB_DEBUG)
     clib_memset (s, 0xFA, sizeof (*s));
 }
@@ -391,22 +389,19 @@ session_enqueue_stream_connection (transport_connection_t * tc,
     {
       /* Queue RX event on this fifo. Eventually these will need to be flushed
        * by calling stream_server_flush_enqueue_events () */
-      session_manager_main_t *smm = vnet_get_session_manager_main ();
-      u32 thread_index = s->thread_index;
-      u64 enqueue_epoch = smm->current_enqueue_epoch[tc->proto][thread_index];
+      session_manager_worker_t *wrk;
 
-      if (s->enqueue_epoch != enqueue_epoch)
+      wrk = session_manager_get_worker (s->thread_index);
+      if (s->enqueue_epoch != wrk->current_enqueue_epoch[tc->proto])
        {
-         s->enqueue_epoch = enqueue_epoch;
-         vec_add1 (smm->session_to_enqueue[tc->proto][thread_index],
-                   s->session_index);
+         s->enqueue_epoch = wrk->current_enqueue_epoch[tc->proto];
+         vec_add1 (wrk->session_to_enqueue[tc->proto], s->session_index);
        }
     }
 
   return enqueued;
 }
 
-
 int
 session_enqueue_dgram_connection (stream_session_t * s,
                                  session_dgram_hdr_t * hdr,
@@ -432,15 +427,13 @@ session_enqueue_dgram_connection (stream_session_t * s,
     {
       /* Queue RX event on this fifo. Eventually these will need to be flushed
        * by calling stream_server_flush_enqueue_events () */
-      session_manager_main_t *smm = vnet_get_session_manager_main ();
-      u32 thread_index = s->thread_index;
-      u64 enqueue_epoch = smm->current_enqueue_epoch[proto][thread_index];
+      session_manager_worker_t *wrk;
 
-      if (s->enqueue_epoch != enqueue_epoch)
+      wrk = session_manager_get_worker (s->thread_index);
+      if (s->enqueue_epoch != wrk->current_enqueue_epoch[proto])
        {
-         s->enqueue_epoch = enqueue_epoch;
-         vec_add1 (smm->session_to_enqueue[proto][thread_index],
-                   s->session_index);
+         s->enqueue_epoch = wrk->current_enqueue_epoch[proto];
+         vec_add1 (wrk->session_to_enqueue[proto], s->session_index);
        }
     }
   return enqueued;
@@ -539,12 +532,12 @@ session_dequeue_notify (stream_session_t * s)
 int
 session_manager_flush_enqueue_events (u8 transport_proto, u32 thread_index)
 {
-  session_manager_main_t *smm = &session_manager_main;
+  session_manager_worker_t *wrk = session_manager_get_worker (thread_index);
   stream_session_t *s;
   int i, errors = 0;
   u32 *indices;
 
-  indices = smm->session_to_enqueue[transport_proto][thread_index];
+  indices = wrk->session_to_enqueue[transport_proto];
 
   for (i = 0; i < vec_len (indices); i++)
     {
@@ -559,8 +552,8 @@ session_manager_flush_enqueue_events (u8 transport_proto, u32 thread_index)
     }
 
   vec_reset_length (indices);
-  smm->session_to_enqueue[transport_proto][thread_index] = indices;
-  smm->current_enqueue_epoch[transport_proto][thread_index]++;
+  wrk->session_to_enqueue[transport_proto] = indices;
+  wrk->current_enqueue_epoch[transport_proto]++;
 
   return errors;
 }
@@ -1068,7 +1061,7 @@ void
 stream_session_disconnect (stream_session_t * s)
 {
   u32 thread_index = vlib_get_thread_index ();
-  session_manager_main_t *smm = &session_manager_main;
+  session_manager_worker_t *wrk;
   session_event_t *evt;
 
   if (!s)
@@ -1088,7 +1081,8 @@ stream_session_disconnect (stream_session_t * s)
    * held, just append a new event to pending disconnects vector. */
   if (vlib_thread_is_main_w_barrier () || thread_index == s->thread_index)
     {
-      vec_add2 (smm->pending_disconnects[s->thread_index], evt, 1);
+      wrk = session_manager_get_worker (thread_index);
+      vec_add2 (wrk->pending_disconnects, evt, 1);
       clib_memset (evt, 0, sizeof (*evt));
       evt->session_handle = session_handle (s);
       evt->event_type = FIFO_EVENT_DISCONNECT;
@@ -1207,7 +1201,7 @@ session_vpp_event_queues_allocate (session_manager_main_t * smm)
   else
     oldheap = svm_push_data_heap (am->vlib_rp);
 
-  for (i = 0; i < vec_len (smm->vpp_event_queues); i++)
+  for (i = 0; i < vec_len (smm->wrk); i++)
     {
       svm_msg_q_cfg_t _cfg, *cfg = &_cfg;
       u32 notif_q_size = clib_max (16, evt_q_length >> 4);
@@ -1220,10 +1214,10 @@ session_vpp_event_queues_allocate (session_manager_main_t * smm)
       cfg->n_rings = 2;
       cfg->q_nitems = evt_q_length;
       cfg->ring_cfgs = rc;
-      smm->vpp_event_queues[i] = svm_msg_q_alloc (cfg);
+      smm->wrk[i].vpp_event_queue = svm_msg_q_alloc (cfg);
       if (smm->evt_qs_use_memfd_seg)
        {
-         if (svm_msg_q_alloc_consumer_eventfd (smm->vpp_event_queues[i]))
+         if (svm_msg_q_alloc_consumer_eventfd (smm->wrk[i].vpp_event_queue))
            clib_warning ("eventfd returned");
        }
     }
@@ -1342,6 +1336,7 @@ session_manager_main_enable (vlib_main_t * vm)
   session_manager_main_t *smm = &session_manager_main;
   vlib_thread_main_t *vtm = vlib_get_thread_main ();
   u32 num_threads, preallocated_sessions_per_worker;
+  session_manager_worker_t *wrk;
   int i, j;
 
   num_threads = 1 /* main thread */  + vtm->n_threads;
@@ -1350,38 +1345,28 @@ session_manager_main_enable (vlib_main_t * vm)
     return clib_error_return (0, "n_thread_stacks not set");
 
   /* configure per-thread ** vectors */
-  vec_validate (smm->sessions, num_threads - 1);
-  vec_validate (smm->tx_buffers, num_threads - 1);
-  vec_validate (smm->pending_event_vector, num_threads - 1);
-  vec_validate (smm->pending_disconnects, num_threads - 1);
-  vec_validate (smm->free_event_vector, num_threads - 1);
-  vec_validate (smm->vpp_event_queues, num_threads - 1);
-  vec_validate (smm->peekers_rw_locks, num_threads - 1);
-  vec_validate (smm->dispatch_period, num_threads - 1);
-  vec_validate (smm->last_vlib_time, num_threads - 1);
-  vec_validate_aligned (smm->ctx, num_threads - 1, CLIB_CACHE_LINE_BYTES);
+  vec_validate_aligned (smm->wrk, num_threads - 1, CLIB_CACHE_LINE_BYTES);
 
   for (i = 0; i < TRANSPORT_N_PROTO; i++)
     {
-      vec_validate (smm->current_enqueue_epoch[i], num_threads - 1);
-      vec_validate (smm->session_to_enqueue[i], num_threads - 1);
       for (j = 0; j < num_threads; j++)
-       smm->current_enqueue_epoch[i][j] = 1;
+       smm->wrk[j].current_enqueue_epoch[i] = 1;
     }
 
   for (i = 0; i < num_threads; i++)
     {
-      vec_validate (smm->free_event_vector[i], 0);
-      _vec_len (smm->free_event_vector[i]) = 0;
-      vec_validate (smm->pending_event_vector[i], 0);
-      _vec_len (smm->pending_event_vector[i]) = 0;
-      vec_validate (smm->pending_disconnects[i], 0);
-      _vec_len (smm->pending_disconnects[i]) = 0;
+      wrk = &smm->wrk[i];
+      vec_validate (wrk->free_event_vector, 0);
+      _vec_len (wrk->free_event_vector) = 0;
+      vec_validate (wrk->pending_event_vector, 0);
+      _vec_len (wrk->pending_event_vector) = 0;
+      vec_validate (wrk->pending_disconnects, 0);
+      _vec_len (wrk->pending_disconnects) = 0;
 
-      smm->last_vlib_time[i] = vlib_time_now (vlib_mains[i]);
+      wrk->last_vlib_time = vlib_time_now (vlib_mains[i]);
 
       if (num_threads > 1)
-       clib_rwlock_init (&smm->peekers_rw_locks[i]);
+       clib_rwlock_init (&smm->wrk[i].peekers_rw_locks);
     }
 
 #if SESSION_DEBUG
@@ -1401,7 +1386,7 @@ session_manager_main_enable (vlib_main_t * vm)
     {
       if (num_threads == 1)
        {
-         pool_init_fixed (smm->sessions[0], smm->preallocated_sessions);
+         pool_init_fixed (smm->wrk[0].sessions, smm->preallocated_sessions);
        }
       else
        {
@@ -1412,7 +1397,7 @@ session_manager_main_enable (vlib_main_t * vm)
 
          for (j = 1; j < num_threads; j++)
            {
-             pool_init_fixed (smm->sessions[j],
+             pool_init_fixed (smm->wrk[j].sessions,
                               preallocated_sessions_per_worker);
            }
        }
index 4d46596..a989861 100644 (file)
@@ -186,43 +186,52 @@ extern session_fifo_rx_fn session_tx_fifo_dequeue_internal;
 
 u8 session_node_lookup_fifo_event (svm_fifo_t * f, session_event_t * e);
 
-struct _session_manager_main
+typedef struct session_manager_worker_
 {
-  /** Per worker thread session pools */
-  stream_session_t **sessions;
+  CLIB_CACHE_LINE_ALIGN_MARK (cacheline0);
 
-  /** Per worker-thread session pool peekers rw locks */
-  clib_rwlock_t *peekers_rw_locks;
+  /** Worker session pool */
+  stream_session_t *sessions;
 
-  /** Per-proto, per-worker enqueue epoch counters */
-  u64 *current_enqueue_epoch[TRANSPORT_N_PROTO];
+  /** vpp event message queue for worker */
+  svm_msg_q_t *vpp_event_queue;
 
-  /** Per-proto, per-worker thread vector of sessions to enqueue */
-  u32 **session_to_enqueue[TRANSPORT_N_PROTO];
+  /** Our approximation of a "complete" dispatch loop period */
+  f64 dispatch_period;
 
-  /** per-worker tx buffer free lists */
-  u32 **tx_buffers;
+  /** vlib_time_now last time around the track */
+  f64 last_vlib_time;
 
-  /** Per worker-thread vector of partially read events */
-  session_event_t **free_event_vector;
+  /** Per-proto enqueue epoch counters */
+  u64 current_enqueue_epoch[TRANSPORT_N_PROTO];
 
-  /** per-worker active event vectors */
-  session_event_t **pending_event_vector;
+  /** Per-proto vector of sessions to enqueue */
+  u32 *session_to_enqueue[TRANSPORT_N_PROTO];
 
-  /** per-worker postponed disconnects */
-  session_event_t **pending_disconnects;
+  /** Context for session tx */
+  session_tx_context_t ctx;
 
-  /** per-worker session context */
-  session_tx_context_t *ctx;
+  /** Vector of tx buffer free lists */
+  u32 *tx_buffers;
 
-  /** Our approximation of a "complete" dispatch loop period */
-  f64 *dispatch_period;
+  /** Vector of partially read events */
+  session_event_t *free_event_vector;
 
-  /** vlib_time_now last time around the track */
-  f64 *last_vlib_time;
+  /** Vector of active event vectors */
+  session_event_t *pending_event_vector;
+
+  /** Vector of postponed disconnects */
+  session_event_t *pending_disconnects;
 
-  /** vpp fifo event queue */
-  svm_msg_q_t **vpp_event_queues;
+  /** Peekers rw lock */
+  clib_rwlock_t peekers_rw_locks;
+
+} session_manager_worker_t;
+
+struct _session_manager_main
+{
+  /** Worker contexts */
+  session_manager_worker_t *wrk;
 
   /** Event queues memfd segment initialized only if so configured */
   ssvm_private_t evt_qs_segment;
@@ -238,16 +247,16 @@ struct _session_manager_main
    * Trade memory for speed, for now */
   u32 *session_type_to_next;
 
+  /*
+   * Config parameters
+   */
+
   /** Session manager is enabled */
   u8 is_enabled;
 
   /** vpp fifo event queue configured length */
   u32 configured_event_queue_length;
 
-  /*
-   * Config parameters
-   */
-
   /** Session ssvm segment configs*/
   uword session_baseva;
   uword session_va_space_size;
@@ -297,11 +306,17 @@ vnet_get_session_manager_main ()
   return &session_manager_main;
 }
 
+always_inline session_manager_worker_t *
+session_manager_get_worker (u32 thread_index)
+{
+  return &session_manager_main.wrk[thread_index];
+}
+
 always_inline u8
 stream_session_is_valid (u32 si, u8 thread_index)
 {
   stream_session_t *s;
-  s = pool_elt_at_index (session_manager_main.sessions[thread_index], si);
+  s = pool_elt_at_index (session_manager_main.wrk[thread_index].sessions, si);
   if (s->thread_index != thread_index || s->session_index != si
       /* || s->server_rx_fifo->master_session_index != si
          || s->server_tx_fifo->master_session_index != si
@@ -320,20 +335,23 @@ always_inline stream_session_t *
 session_get (u32 si, u32 thread_index)
 {
   ASSERT (stream_session_is_valid (si, thread_index));
-  return pool_elt_at_index (session_manager_main.sessions[thread_index], si);
+  return pool_elt_at_index (session_manager_main.wrk[thread_index].sessions,
+                           si);
 }
 
 always_inline stream_session_t *
 session_get_if_valid (u64 si, u32 thread_index)
 {
-  if (thread_index >= vec_len (session_manager_main.sessions))
+  if (thread_index >= vec_len (session_manager_main.wrk))
     return 0;
 
-  if (pool_is_free_index (session_manager_main.sessions[thread_index], si))
+  if (pool_is_free_index (session_manager_main.wrk[thread_index].sessions,
+                         si))
     return 0;
 
   ASSERT (stream_session_is_valid (si, thread_index));
-  return pool_elt_at_index (session_manager_main.sessions[thread_index], si);
+  return pool_elt_at_index (session_manager_main.wrk[thread_index].sessions,
+                           si);
 }
 
 always_inline session_handle_t
@@ -368,7 +386,7 @@ session_get_from_handle (session_handle_t handle)
   session_manager_main_t *smm = &session_manager_main;
   u32 session_index, thread_index;
   session_parse_handle (handle, &session_index, &thread_index);
-  return pool_elt_at_index (smm->sessions[thread_index], session_index);
+  return pool_elt_at_index (smm->wrk[thread_index].sessions, session_index);
 }
 
 always_inline stream_session_t *
@@ -441,19 +459,19 @@ u8 session_tx_is_dgram (stream_session_t * s);
 always_inline void
 session_pool_add_peeker (u32 thread_index)
 {
-  session_manager_main_t *smm = &session_manager_main;
+  session_manager_worker_t *wrk = &session_manager_main.wrk[thread_index];
   if (thread_index == vlib_get_thread_index ())
     return;
-  clib_rwlock_reader_lock (&smm->peekers_rw_locks[thread_index]);
+  clib_rwlock_reader_lock (&wrk->peekers_rw_locks);
 }
 
 always_inline void
 session_pool_remove_peeker (u32 thread_index)
 {
-  session_manager_main_t *smm = &session_manager_main;
+  session_manager_worker_t *wrk = &session_manager_main.wrk[thread_index];
   if (thread_index == vlib_get_thread_index ())
     return;
-  clib_rwlock_reader_unlock (&smm->peekers_rw_locks[thread_index]);
+  clib_rwlock_reader_unlock (&wrk->peekers_rw_locks);
 }
 
 /**
@@ -464,18 +482,19 @@ session_pool_remove_peeker (u32 thread_index)
 always_inline stream_session_t *
 session_get_from_handle_safe (u64 handle)
 {
-  session_manager_main_t *smm = &session_manager_main;
   u32 thread_index = session_thread_from_handle (handle);
+  session_manager_worker_t *wrk = &session_manager_main.wrk[thread_index];
+
   if (thread_index == vlib_get_thread_index ())
     {
-      return pool_elt_at_index (smm->sessions[thread_index],
+      return pool_elt_at_index (wrk->sessions,
                                session_index_from_handle (handle));
     }
   else
     {
       session_pool_add_peeker (thread_index);
       /* Don't use pool_elt_at index. See @ref session_pool_add_peeker */
-      return smm->sessions[thread_index] + session_index_from_handle (handle);
+      return wrk->sessions + session_index_from_handle (handle);
     }
 }
 
@@ -503,19 +522,19 @@ transport_tx_fifo_size (transport_connection_t * tc)
 always_inline f64
 transport_dispatch_period (u32 thread_index)
 {
-  return session_manager_main.dispatch_period[thread_index];
+  return session_manager_main.wrk[thread_index].dispatch_period;
 }
 
 always_inline f64
 transport_time_now (u32 thread_index)
 {
-  return session_manager_main.last_vlib_time[thread_index];
+  return session_manager_main.wrk[thread_index].last_vlib_time;
 }
 
 always_inline u32
 session_get_index (stream_session_t * s)
 {
-  return (s - session_manager_main.sessions[s->thread_index]);
+  return (s - session_manager_main.wrk[s->thread_index].sessions);
 }
 
 always_inline stream_session_t *
@@ -531,7 +550,7 @@ session_clone_safe (u32 session_index, u32 thread_index)
    */
   session_pool_add_peeker (thread_index);
   new_s = session_alloc (current_thread_index);
-  old_s = session_manager_main.sessions[thread_index] + session_index;
+  old_s = session_manager_main.wrk[thread_index].sessions + session_index;
   clib_memcpy (new_s, old_s, sizeof (*new_s));
   session_pool_remove_peeker (thread_index);
   new_s->thread_index = current_thread_index;
@@ -607,7 +626,7 @@ clib_error_t *vnet_session_enable_disable (vlib_main_t * vm, u8 is_en);
 always_inline svm_msg_q_t *
 session_manager_get_vpp_event_queue (u32 thread_index)
 {
-  return session_manager_main.vpp_event_queues[thread_index];
+  return session_manager_main.wrk[thread_index].vpp_event_queue;
 }
 
 int session_manager_flush_enqueue_events (u8 proto, u32 thread_index);
index 6ca090a..d6350c6 100755 (executable)
@@ -244,7 +244,7 @@ show_session_command_fn (vlib_main_t * vm, unformat_input_t * input,
       sst = session_type_from_proto_and_ip (transport_proto, 1);
       vlib_cli_output (vm, "%-40s%-24s", "Listener", "App");
       /* *INDENT-OFF* */
-      pool_foreach (s, smm->sessions[0], ({
+      pool_foreach (s, smm->wrk[0].sessions, ({
        if (s->session_state != SESSION_STATE_LISTENING
            || s->session_type != sst)
          continue;
@@ -257,10 +257,10 @@ show_session_command_fn (vlib_main_t * vm, unformat_input_t * input,
       return 0;
     }
 
-  for (i = 0; i < vec_len (smm->sessions); i++)
+  for (i = 0; i < vec_len (smm->wrk); i++)
     {
       u32 once_per_pool;
-      pool = smm->sessions[i];
+      pool = smm->wrk[0].sessions;
 
       once_per_pool = 1;
 
@@ -323,8 +323,9 @@ clear_session_command_fn (vlib_main_t * vm, unformat_input_t * input,
 {
   session_manager_main_t *smm = &session_manager_main;
   u32 thread_index = 0, clear_all = 0;
+  session_manager_worker_t *wrk;
   u32 session_index = ~0;
-  stream_session_t **pool, *session;
+  stream_session_t *session;
 
   if (!smm->is_enabled)
     {
@@ -359,9 +360,9 @@ clear_session_command_fn (vlib_main_t * vm, unformat_input_t * input,
   if (clear_all)
     {
       /* *INDENT-OFF* */
-      vec_foreach (pool, smm->sessions)
+      vec_foreach (wrk, smm->wrk)
        {
-         pool_foreach(session, *pool, ({
+         pool_foreach(session, wrk->sessions, ({
            clear_session (session);
          }));
        };
index eb97439..04a5622 100644 (file)
@@ -265,12 +265,13 @@ always_inline void
 session_tx_fifo_chain_tail (vlib_main_t * vm, session_tx_context_t * ctx,
                            vlib_buffer_t * b, u16 * n_bufs, u8 peek_data)
 {
-  session_manager_main_t *smm = &session_manager_main;
   vlib_buffer_t *chain_b, *prev_b;
   u32 chain_bi0, to_deq, left_from_seg;
+  session_manager_worker_t *wrk;
   u16 len_to_deq, n_bytes_read;
   u8 *data, j;
 
+  wrk = session_manager_get_worker (ctx->s->thread_index);
   b->flags |= VLIB_BUFFER_TOTAL_LENGTH_VALID;
   b->total_length_not_including_first_buffer = 0;
 
@@ -284,8 +285,8 @@ session_tx_fifo_chain_tail (vlib_main_t * vm, session_tx_context_t * ctx,
       len_to_deq = clib_min (to_deq, ctx->deq_per_buf);
 
       *n_bufs -= 1;
-      chain_bi0 = smm->tx_buffers[ctx->s->thread_index][*n_bufs];
-      _vec_len (smm->tx_buffers[ctx->s->thread_index]) = *n_bufs;
+      chain_bi0 = wrk->tx_buffers[*n_bufs];
+      _vec_len (wrk->tx_buffers) = *n_bufs;
 
       chain_b = vlib_get_buffer (vm, chain_bi0);
       chain_b->current_data = 0;
@@ -342,16 +343,15 @@ session_tx_fifo_chain_tail (vlib_main_t * vm, session_tx_context_t * ctx,
 
 always_inline int
 session_output_try_get_buffers (vlib_main_t * vm,
-                               session_manager_main_t * smm,
+                               session_manager_worker_t * wrk,
                                u32 thread_index, u16 * n_bufs, u32 wanted)
 {
   u32 n_alloc;
-  vec_validate_aligned (smm->tx_buffers[thread_index], wanted - 1,
-                       CLIB_CACHE_LINE_BYTES);
-  n_alloc = vlib_buffer_alloc (vm, &smm->tx_buffers[thread_index][*n_bufs],
+  vec_validate_aligned (wrk->tx_buffers, wanted - 1, CLIB_CACHE_LINE_BYTES);
+  n_alloc = vlib_buffer_alloc (vm, &wrk->tx_buffers[*n_bufs],
                               wanted - *n_bufs);
   *n_bufs += n_alloc;
-  _vec_len (smm->tx_buffers[thread_index]) = *n_bufs;
+  _vec_len (wrk->tx_buffers) = *n_bufs;
   return n_alloc;
 }
 
@@ -548,7 +548,8 @@ session_tx_fifo_read_and_snd_i (vlib_main_t * vm, vlib_node_runtime_t * node,
   u32 n_trace = vlib_get_trace_count (vm, node), n_bufs_needed = 0;
   u32 thread_index = s->thread_index, n_left, pbi;
   session_manager_main_t *smm = &session_manager_main;
-  session_tx_context_t *ctx = &smm->ctx[thread_index];
+  session_manager_worker_t *wrk = &smm->wrk[thread_index];
+  session_tx_context_t *ctx = &wrk->ctx;
   transport_proto_t tp;
   vlib_buffer_t *pb;
   u16 n_bufs, rv;
@@ -556,7 +557,7 @@ session_tx_fifo_read_and_snd_i (vlib_main_t * vm, vlib_node_runtime_t * node,
   if (PREDICT_FALSE ((rv = session_tx_not_ready (s, peek_data))))
     {
       if (rv < 2)
-       vec_add1 (smm->pending_event_vector[thread_index], *e);
+       vec_add1 (wrk->pending_event_vector, *e);
       return SESSION_TX_NO_DATA;
     }
 
@@ -572,7 +573,7 @@ session_tx_fifo_read_and_snd_i (vlib_main_t * vm, vlib_node_runtime_t * node,
     transport_connection_max_tx_burst (ctx->tc, vm->clib_time.last_cpu_time);
   if (ctx->snd_space == 0 || ctx->snd_mss == 0)
     {
-      vec_add1 (smm->pending_event_vector[thread_index], *e);
+      vec_add1 (wrk->pending_event_vector, *e);
       return SESSION_TX_NO_DATA;
     }
 
@@ -586,7 +587,7 @@ session_tx_fifo_read_and_snd_i (vlib_main_t * vm, vlib_node_runtime_t * node,
   if (PREDICT_FALSE (!ctx->max_len_to_snd))
     return SESSION_TX_NO_DATA;
 
-  n_bufs = vec_len (smm->tx_buffers[thread_index]);
+  n_bufs = vec_len (wrk->tx_buffers);
   n_bufs_needed = ctx->n_segs_per_evt * ctx->n_bufs_per_seg;
 
   /*
@@ -594,11 +595,11 @@ session_tx_fifo_read_and_snd_i (vlib_main_t * vm, vlib_node_runtime_t * node,
    */
   if (n_bufs < n_bufs_needed)
     {
-      session_output_try_get_buffers (vm, smm, thread_index, &n_bufs,
+      session_output_try_get_buffers (vm, wrk, thread_index, &n_bufs,
                                      ctx->n_bufs_per_seg * VLIB_FRAME_SIZE);
       if (PREDICT_FALSE (n_bufs < n_bufs_needed))
        {
-         vec_add1 (smm->pending_event_vector[thread_index], *e);
+         vec_add1 (wrk->pending_event_vector, *e);
          return SESSION_TX_NO_BUFFERS;
        }
     }
@@ -620,15 +621,15 @@ session_tx_fifo_read_and_snd_i (vlib_main_t * vm, vlib_node_runtime_t * node,
       vlib_buffer_t *b0, *b1;
       u32 bi0, bi1;
 
-      pbi = smm->tx_buffers[thread_index][n_bufs - 3];
+      pbi = wrk->tx_buffers[n_bufs - 3];
       pb = vlib_get_buffer (vm, pbi);
       vlib_prefetch_buffer_header (pb, STORE);
-      pbi = smm->tx_buffers[thread_index][n_bufs - 4];
+      pbi = wrk->tx_buffers[n_bufs - 4];
       pb = vlib_get_buffer (vm, pbi);
       vlib_prefetch_buffer_header (pb, STORE);
 
-      to_next[0] = bi0 = smm->tx_buffers[thread_index][--n_bufs];
-      to_next[1] = bi1 = smm->tx_buffers[thread_index][--n_bufs];
+      to_next[0] = bi0 = wrk->tx_buffers[--n_bufs];
+      to_next[1] = bi1 = wrk->tx_buffers[--n_bufs];
 
       b0 = vlib_get_buffer (vm, bi0);
       b1 = vlib_get_buffer (vm, bi1);
@@ -657,12 +658,12 @@ session_tx_fifo_read_and_snd_i (vlib_main_t * vm, vlib_node_runtime_t * node,
 
       if (n_left > 1)
        {
-         pbi = smm->tx_buffers[thread_index][n_bufs - 2];
+         pbi = wrk->tx_buffers[n_bufs - 2];
          pb = vlib_get_buffer (vm, pbi);
          vlib_prefetch_buffer_header (pb, STORE);
        }
 
-      to_next[0] = bi0 = smm->tx_buffers[thread_index][--n_bufs];
+      to_next[0] = bi0 = wrk->tx_buffers[--n_bufs];
       b0 = vlib_get_buffer (vm, bi0);
       session_tx_fill_buffer (vm, ctx, b0, &n_bufs, peek_data);
 
@@ -684,7 +685,7 @@ session_tx_fifo_read_and_snd_i (vlib_main_t * vm, vlib_node_runtime_t * node,
     session_tx_trace_frame (vm, node, next_index, to_next,
                            ctx->n_segs_per_evt, s, n_trace);
 
-  _vec_len (smm->tx_buffers[thread_index]) = n_bufs;
+  _vec_len (wrk->tx_buffers) = n_bufs;
   *n_tx_packets += ctx->n_segs_per_evt;
   transport_connection_update_tx_stats (ctx->tc, ctx->max_len_to_snd);
   vlib_put_next_frame (vm, node, next_index, n_left_to_next);
@@ -693,7 +694,7 @@ session_tx_fifo_read_and_snd_i (vlib_main_t * vm, vlib_node_runtime_t * node,
   ASSERT (ctx->left_to_snd == 0);
   if (ctx->max_len_to_snd < ctx->max_dequeue)
     if (svm_fifo_set_event (s->server_tx_fifo))
-      vec_add1 (smm->pending_event_vector[thread_index], *e);
+      vec_add1 (wrk->pending_event_vector, *e);
 
   if (!peek_data && ctx->transport_vft->tx_type == TRANSPORT_TX_DGRAM)
     {
@@ -704,7 +705,7 @@ session_tx_fifo_read_and_snd_i (vlib_main_t * vm, vlib_node_runtime_t * node,
       /* More data needs to be read */
       else if (svm_fifo_max_dequeue (s->server_tx_fifo) > 0)
        if (svm_fifo_set_event (s->server_tx_fifo))
-         vec_add1 (smm->pending_event_vector[thread_index], *e);
+         vec_add1 (wrk->pending_event_vector, *e);
     }
   return SESSION_TX_OK;
 }
@@ -746,14 +747,14 @@ session_event_get_session (session_event_t * e, u8 thread_index)
 }
 
 static void
-session_update_dispatch_period (session_manager_main_t * smm, f64 now,
+session_update_dispatch_period (session_manager_worker_t * wrk, f64 now,
                                u32 thread_index)
 {
-  f64 sample, prev_period = smm->dispatch_period[thread_index], a = 0.8;
+  f64 sample, prev_period = wrk->dispatch_period, a = 0.8;
 
-  sample = now - smm->last_vlib_time[thread_index];
-  smm->dispatch_period[thread_index] = a * sample + (1 - a) * prev_period;
-  smm->last_vlib_time[thread_index] = now;
+  sample = now - wrk->last_vlib_time;
+  wrk->dispatch_period = a * sample + (1 - a) * prev_period;
+  wrk->last_vlib_time = now;
 }
 
 static uword
@@ -762,6 +763,7 @@ session_queue_node_fn (vlib_main_t * vm, vlib_node_runtime_t * node,
 {
   session_manager_main_t *smm = vnet_get_session_manager_main ();
   u32 thread_index = vm->thread_index, n_to_dequeue, n_events;
+  session_manager_worker_t *wrk = &smm->wrk[thread_index];
   session_event_t *pending_events, *e;
   session_event_t *fifo_events;
   svm_msg_q_msg_t _msg, *msg = &_msg;
@@ -777,19 +779,19 @@ session_queue_node_fn (vlib_main_t * vm, vlib_node_runtime_t * node,
   /*
    *  Update transport time
    */
-  session_update_dispatch_period (smm, now, thread_index);
+  session_update_dispatch_period (wrk, now, thread_index);
   transport_update_time (now, thread_index);
 
   /*
    * Get vpp queue events that we can dequeue without blocking
    */
-  mq = smm->vpp_event_queues[thread_index];
-  fifo_events = smm->free_event_vector[thread_index];
+  mq = wrk->vpp_event_queue;
+  fifo_events = wrk->free_event_vector;
   n_to_dequeue = svm_msg_q_size (mq);
-  pending_events = smm->pending_event_vector[thread_index];
+  pending_events = wrk->pending_event_vector;
 
   if (!n_to_dequeue && !vec_len (pending_events)
-      && !vec_len (smm->pending_disconnects[thread_index]))
+      && !vec_len (wrk->pending_disconnects))
     return 0;
 
   SESSION_EVT_DBG (SESSION_EVT_DEQ_NODE, 0);
@@ -821,11 +823,11 @@ session_queue_node_fn (vlib_main_t * vm, vlib_node_runtime_t * node,
   svm_msg_q_unlock (mq);
 
   vec_append (fifo_events, pending_events);
-  vec_append (fifo_events, smm->pending_disconnects[thread_index]);
+  vec_append (fifo_events, wrk->pending_disconnects);
 
   _vec_len (pending_events) = 0;
-  smm->pending_event_vector[thread_index] = pending_events;
-  _vec_len (smm->pending_disconnects[thread_index]) = 0;
+  wrk->pending_event_vector = pending_events;
+  _vec_len (wrk->pending_disconnects) = 0;
 
 skip_dequeue:
   n_events = vec_len (fifo_events);
@@ -842,7 +844,7 @@ skip_dequeue:
          /* Don't try to send more that one frame per dispatch cycle */
          if (n_tx_packets == VLIB_FRAME_SIZE)
            {
-             vec_add1 (smm->pending_event_vector[thread_index], *e);
+             vec_add1 (wrk->pending_event_vector, *e);
              break;
            }
 
@@ -880,13 +882,13 @@ skip_dequeue:
          if (!e->postponed)
            {
              e->postponed = 1;
-             vec_add1 (smm->pending_disconnects[thread_index], *e);
+             vec_add1 (wrk->pending_disconnects, *e);
              continue;
            }
          /* If tx queue is still not empty, wait */
          if (svm_fifo_max_dequeue (s->server_tx_fifo))
            {
-             vec_add1 (smm->pending_disconnects[thread_index], *e);
+             vec_add1 (wrk->pending_disconnects, *e);
              continue;
            }
 
@@ -930,7 +932,7 @@ skip_dequeue:
     }
 
   _vec_len (fifo_events) = 0;
-  smm->free_event_vector[thread_index] = fifo_events;
+  wrk->free_event_vector = fifo_events;
 
   vlib_node_increment_counter (vm, session_queue_node.index,
                               SESSION_QUEUE_ERROR_TX, n_tx_packets);
@@ -966,7 +968,7 @@ dump_thread_0_event_queue (void)
   svm_msg_q_t *mq;
   int i, index;
 
-  mq = smm->vpp_event_queues[my_thread_index];
+  mq = smm->wrk[my_thread_index].vpp_event_queue;
   index = mq->q->head;
 
   for (i = 0; i < mq->q->cursize; i++)
@@ -1044,20 +1046,22 @@ session_node_cmp_event (session_event_t * e, svm_fifo_t * f)
 u8
 session_node_lookup_fifo_event (svm_fifo_t * f, session_event_t * e)
 {
-  session_manager_main_t *smm = vnet_get_session_manager_main ();
-  svm_msg_q_t *mq;
   session_event_t *pending_event_vector, *evt;
+  session_manager_worker_t *wrk;
   int i, index, found = 0;
   svm_msg_q_msg_t *msg;
   svm_msg_q_ring_t *ring;
+  svm_msg_q_t *mq;
   u8 thread_index;
 
   ASSERT (e);
   thread_index = f->master_thread_index;
+  wrk = session_manager_get_worker (thread_index);
+
   /*
    * Search evt queue
    */
-  mq = smm->vpp_event_queues[thread_index];
+  mq = wrk->vpp_event_queue;
   index = mq->q->head;
   for (i = 0; i < mq->q->cursize; i++)
     {
@@ -1073,7 +1077,7 @@ session_node_lookup_fifo_event (svm_fifo_t * f, session_event_t * e)
   /*
    * Search pending events vector
    */
-  pending_event_vector = smm->pending_event_vector[thread_index];
+  pending_event_vector = wrk->pending_event_vector;
   vec_foreach (evt, pending_event_vector)
   {
     found = session_node_cmp_event (evt, f);