session: basic support for interrupt mode
author Florin Coras <fcoras@cisco.com>
Thu, 18 Mar 2021 22:04:34 +0000 (15:04 -0700)
committer Damjan Marion <dmarion@me.com>
Tue, 6 Apr 2021 11:15:38 +0000 (11:15 +0000)
Experimental support for session layer interrupt mode. When enabled
(use-private-rx-mqs must be set), the session queue node switches to
interrupt state whenever it is lightly loaded, i.e., it has no pending
events and handles less than one vector per dispatch.
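
For reference, a minimal startup.conf sketch of how one might enable
this mode, assuming the usual session stanza; the keywords below are
the ones parsed by the session_config_fn changes in this patch, and
any other required configuration is assumed to be done elsewhere:

session {
  use-private-rx-mqs
  # add "no-adaptive" to keep the session queue node in polling mode
}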

Because transport protocols require periodic time updates, while in
interrupt state the session queue node workers register a timerfd with
the unix-epoll-input node; when the timer fires, it signals, i.e.,
wakes up, the queue node. Under light load the timer is set to fire
every 1ms, whereas once no sessions are allocated the worker moves to
idle state and the timeout is increased to 100ms.
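
To make the mechanism concrete, below is a minimal, self-contained C
sketch of the underlying Linux timerfd + epoll pattern (not part of
the patch; VPP instead registers the fd via clib_file_add so that the
unix-epoll-input node does the waiting, as session_node.c shows
below):

/*
 * Standalone sketch of the wakeup mechanism described above, using the
 * raw Linux timerfd/epoll APIs directly instead of VPP's clib_file and
 * unix-epoll-input machinery.  The 1ms period matches the interrupt
 * state timeout; everything else here is purely illustrative.
 */
#include <stdio.h>
#include <stdint.h>
#include <time.h>
#include <unistd.h>
#include <sys/epoll.h>
#include <sys/timerfd.h>

int
main (void)
{
  struct itimerspec its = { 0 };
  struct epoll_event reg = { 0 }, ev;
  uint64_t n_expirations;
  int tfd, epfd, i;

  /* Non-blocking timerfd on the same clock the patch uses */
  tfd = timerfd_create (CLOCK_MONOTONIC, TFD_NONBLOCK);

  /* Arm a periodic 1ms (1e6 ns) timer, the interrupt-state period */
  its.it_value.tv_nsec = 1000000;
  its.it_interval.tv_nsec = 1000000;
  timerfd_settime (tfd, 0, &its, NULL);

  /* Register the timerfd with an epoll instance; unix-epoll-input does
     the equivalent for descriptors added via clib_file_add */
  epfd = epoll_create1 (0);
  reg.events = EPOLLIN;
  reg.data.fd = tfd;
  epoll_ctl (epfd, EPOLL_CTL_ADD, tfd, &reg);

  for (i = 0; i < 5; i++)
    {
      /* Sleep until the timer fires, then drain the fd, much like
         session_wrk_tfd_read_ready does before marking the session
         queue node interrupt pending */
      epoll_wait (epfd, &ev, 1, -1 /* no timeout */);
      if (read (tfd, &n_expirations, sizeof (n_expirations)) > 0)
        printf ("woken up after %lu expiration(s)\n",
                (unsigned long) n_expirations);
    }

  close (tfd);
  close (epfd);
  return 0;
}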

Type: feature

Signed-off-by: Florin Coras <fcoras@cisco.com>
Change-Id: I905b00777fbc025faf9c4074fce4c516cd139387

src/vnet/session/application.c
src/vnet/session/session.c
src/vnet/session/session.h
src/vnet/session/session_node.c

diff --git a/src/vnet/session/application.c b/src/vnet/session/application.c
index 16acc9c..a93e4b9 100644
@@ -497,6 +497,9 @@ VLIB_NODE_FN (appsl_rx_mqs_input_node)
   if (aw->pending_rx_mqs)
     vlib_node_set_interrupt_pending (vm, appsl_rx_mqs_input_node.index);
 
+  if (n_msgs && wrk->state == SESSION_WRK_INTERRUPT)
+    vlib_node_set_interrupt_pending (vm, session_queue_node.index);
+
   return n_msgs;
 }
 
diff --git a/src/vnet/session/session.c b/src/vnet/session/session.c
index c24a95f..7513aa3 100644
@@ -28,11 +28,12 @@ static inline int
 session_send_evt_to_thread (void *data, void *args, u32 thread_index,
                            session_evt_type_t evt_type)
 {
+  session_worker_t *wrk = session_main_get_worker (thread_index);
   session_event_t *evt;
   svm_msg_q_msg_t msg;
   svm_msg_q_t *mq;
 
-  mq = session_main_get_vpp_event_queue (thread_index);
+  mq = wrk->vpp_event_queue;
   if (PREDICT_FALSE (svm_msg_q_lock (mq)))
     return -1;
   if (PREDICT_FALSE (svm_msg_q_is_full (mq)
@@ -72,6 +73,10 @@ session_send_evt_to_thread (void *data, void *args, u32 thread_index,
   evt->event_type = evt_type;
 
   svm_msg_q_add_and_unlock (mq, &msg);
+
+  if (PREDICT_FALSE (wrk->state == SESSION_WRK_INTERRUPT))
+    vlib_node_set_interrupt_pending (wrk->vm, session_queue_node.index);
+
   return 0;
 }
 
@@ -121,19 +126,20 @@ session_send_rpc_evt_to_thread (u32 thread_index, void *fp, void *rpc_args)
 void
 session_add_self_custom_tx_evt (transport_connection_t * tc, u8 has_prio)
 {
-  session_t *s;
+  session_t *s = session_get (tc->s_index, tc->thread_index);
 
-  s = session_get (tc->s_index, tc->thread_index);
   ASSERT (s->thread_index == vlib_get_thread_index ());
   ASSERT (s->session_state != SESSION_STATE_TRANSPORT_DELETED);
+
   if (!(s->flags & SESSION_F_CUSTOM_TX))
     {
       s->flags |= SESSION_F_CUSTOM_TX;
       if (svm_fifo_set_event (s->tx_fifo)
          || transport_connection_is_descheduled (tc))
        {
-         session_worker_t *wrk;
          session_evt_elt_t *elt;
+         session_worker_t *wrk;
+
          wrk = session_main_get_worker (tc->thread_index);
          if (has_prio)
            elt = session_evt_alloc_new (wrk);
@@ -142,6 +148,10 @@ session_add_self_custom_tx_evt (transport_connection_t * tc, u8 has_prio)
          elt->evt.session_index = tc->s_index;
          elt->evt.event_type = SESSION_IO_EVT_TX;
          tc->flags &= ~TRANSPORT_CONNECTION_F_DESCHED;
+
+         if (PREDICT_FALSE (wrk->state == SESSION_WRK_INTERRUPT))
+           vlib_node_set_interrupt_pending (wrk->vm,
+                                            session_queue_node.index);
        }
     }
 }
@@ -157,6 +167,9 @@ sesssion_reschedule_tx (transport_connection_t * tc)
   elt = session_evt_alloc_new (wrk);
   elt->evt.session_index = tc->s_index;
   elt->evt.event_type = SESSION_IO_EVT_TX;
+
+  if (PREDICT_FALSE (wrk->state == SESSION_WRK_INTERRUPT))
+    vlib_node_set_interrupt_pending (wrk->vm, session_queue_node.index);
 }
 
 static void
@@ -175,6 +188,9 @@ session_program_transport_ctrl_evt (session_t * s, session_evt_type_t evt)
       clib_memset (&elt->evt, 0, sizeof (session_event_t));
       elt->evt.session_handle = session_handle (s);
       elt->evt.event_type = evt;
+
+      if (PREDICT_FALSE (wrk->state == SESSION_WRK_INTERRUPT))
+       vlib_node_set_interrupt_pending (wrk->vm, session_queue_node.index);
     }
   else
     session_send_ctrl_evt_to_thread (s, evt);
@@ -1693,6 +1709,9 @@ session_manager_main_enable (vlib_main_t * vm)
 
       if (num_threads > 1)
        clib_rwlock_init (&smm->wrk[i].peekers_rw_locks);
+
+      if (!smm->no_adaptive && smm->use_private_rx_mqs)
+       session_wrk_enable_adaptive_mode (wrk);
     }
 
   /* Allocate vpp event queues segment and queue */
@@ -1817,6 +1836,7 @@ session_main_init (vlib_main_t * vm)
   smm->session_enable_asap = 0;
   smm->poll_main = 0;
   smm->use_private_rx_mqs = 0;
+  smm->no_adaptive = 0;
   smm->session_baseva = HIGH_SEGMENT_BASEVA;
 
 #if (HIGH_SEGMENT_BASEVA > (4ULL << 30))
@@ -1938,6 +1958,8 @@ session_config_fn (vlib_main_t * vm, unformat_input_t * input)
        smm->poll_main = 1;
       else if (unformat (input, "use-private-rx-mqs"))
        smm->use_private_rx_mqs = 1;
+      else if (unformat (input, "no-adaptive"))
+       smm->no_adaptive = 1;
       else
        return clib_error_return (0, "unknown input `%U'",
                                  format_unformat_error, input);
diff --git a/src/vnet/session/session.h b/src/vnet/session/session.h
index 5586316..93278d6 100644
@@ -69,6 +69,18 @@ typedef struct session_ctrl_evt_data_
   u8 data[SESSION_CTRL_MSG_MAX_SIZE];
 } session_evt_ctrl_data_t;
 
+typedef enum session_wrk_state_
+{
+  SESSION_WRK_POLLING,
+  SESSION_WRK_INTERRUPT,
+  SESSION_WRK_IDLE,
+} __clib_packed session_wrk_state_t;
+
+typedef enum session_wrk_flags_
+{
+  SESSION_WRK_F_ADAPTIVE = 1 << 0,
+} __clib_packed session_wrk_flag_t;
+
 typedef struct session_worker_
 {
   CLIB_CACHE_LINE_ALIGN_MARK (cacheline0);
@@ -91,6 +103,15 @@ typedef struct session_worker_
   /** Per-proto vector of sessions to enqueue */
   u32 **session_to_enqueue;
 
+  /** Timerfd used to periodically signal wrk session queue node */
+  u32 timerfd;
+
+  /** Worker flags */
+  session_wrk_flag_t flags;
+
+  /** Worker state */
+  session_wrk_state_t state;
+
   /** Context for session tx */
   session_tx_context_t ctx;
 
@@ -121,6 +142,9 @@ typedef struct session_worker_
   /** Vector of nexts for the pending tx buffers */
   u16 *pending_tx_nexts;
 
+  /** Clib file for timerfd. Used only if adaptive mode is on */
+  uword timerfd_file;
+
 #if SESSION_DEBUG
   /** last event poll time by thread */
   clib_time_type_t last_event_poll;
@@ -177,6 +201,9 @@ typedef struct session_main_
   /** Allocate private rx mqs for external apps */
   u8 use_private_rx_mqs;
 
+  /** Do not enable session queue node adaptive mode */
+  u8 no_adaptive;
+
   /** vpp fifo event queue configured length */
   u32 configured_event_queue_length;
 
@@ -682,6 +709,8 @@ session_add_pending_tx_buffer (u32 thread_index, u32 bi, u32 next_node)
   session_worker_t *wrk = session_main_get_worker (thread_index);
   vec_add1 (wrk->pending_tx_buffers, bi);
   vec_add1 (wrk->pending_tx_nexts, next_node);
+  if (PREDICT_FALSE (wrk->state == SESSION_WRK_INTERRUPT))
+    vlib_node_set_interrupt_pending (wrk->vm, session_queue_node.index);
 }
 
 always_inline void
@@ -691,6 +720,7 @@ session_wrk_update_time (session_worker_t *wrk, f64 now)
   wrk->last_vlib_us_time = wrk->last_vlib_time * CLIB_US_TIME_FREQ;
 }
 
+void session_wrk_enable_adaptive_mode (session_worker_t *wrk);
 fifo_segment_t *session_main_get_evt_q_segment (void);
 void session_node_enable_disable (u8 is_en);
 clib_error_t *vnet_session_enable_disable (vlib_main_t * vm, u8 is_en);
diff --git a/src/vnet/session/session_node.c b/src/vnet/session/session_node.c
index f8157cc..d7adbb5 100644
@@ -24,6 +24,7 @@
 #include <vnet/session/application_local.h>
 #include <vnet/session/session_debug.h>
 #include <svm/queue.h>
+#include <sys/timerfd.h>
 
 #define app_check_thread_and_barrier(_fn, _arg)                                \
   if (!vlib_thread_is_main_w_barrier ())                               \
@@ -1418,6 +1419,79 @@ session_wrk_handle_mq (session_worker_t *wrk, svm_msg_q_t *mq)
   return n_to_dequeue;
 }
 
+static void
+session_wrk_timerfd_update (session_worker_t *wrk, u64 time_ns)
+{
+  struct itimerspec its;
+
+  its.it_value.tv_sec = 0;
+  its.it_value.tv_nsec = time_ns;
+  its.it_interval.tv_sec = 0;
+  its.it_interval.tv_nsec = its.it_value.tv_nsec;
+
+  if (timerfd_settime (wrk->timerfd, 0, &its, NULL) == -1)
+    clib_warning ("timerfd_settime");
+}
+
+always_inline u64
+session_wrk_tfd_timeout (session_wrk_state_t state, u32 thread_index)
+{
+  if (state == SESSION_WRK_INTERRUPT)
+    return thread_index ? 1e6 : vlib_num_workers () ? 5e8 : 1e6;
+  else if (state == SESSION_WRK_IDLE)
+    return thread_index ? 1e8 : vlib_num_workers () ? 5e8 : 1e8;
+  else
+    return 0;
+}
+
+static inline void
+session_wrk_state_update (session_worker_t *wrk, session_wrk_state_t state)
+{
+  u64 time_ns;
+
+  wrk->state = state;
+  time_ns = session_wrk_tfd_timeout (state, wrk->vm->thread_index);
+  session_wrk_timerfd_update (wrk, time_ns);
+}
+
+static void
+session_wrk_update_state (session_worker_t *wrk)
+{
+  vlib_main_t *vm = wrk->vm;
+
+  if (wrk->state == SESSION_WRK_POLLING)
+    {
+      if (pool_elts (wrk->event_elts) == 3 &&
+         vlib_last_vectors_per_main_loop (vm) < 1)
+       {
+         session_wrk_state_update (wrk, SESSION_WRK_INTERRUPT);
+         vlib_node_set_state (vm, session_queue_node.index,
+                              VLIB_NODE_STATE_INTERRUPT);
+       }
+    }
+  else if (wrk->state == SESSION_WRK_INTERRUPT)
+    {
+      if (pool_elts (wrk->event_elts) > 3 ||
+         vlib_last_vectors_per_main_loop (vm) > 1)
+       {
+         session_wrk_state_update (wrk, SESSION_WRK_POLLING);
+         vlib_node_set_state (vm, session_queue_node.index,
+                              VLIB_NODE_STATE_POLLING);
+       }
+      else if (PREDICT_FALSE (!pool_elts (wrk->sessions)))
+       {
+         session_wrk_state_update (wrk, SESSION_WRK_IDLE);
+       }
+    }
+  else
+    {
+      if (pool_elts (wrk->event_elts))
+       {
+         session_wrk_state_update (wrk, SESSION_WRK_INTERRUPT);
+       }
+    }
+}
+
 static uword
 session_queue_node_fn (vlib_main_t * vm, vlib_node_runtime_t * node,
                       vlib_frame_t * frame)
@@ -1513,6 +1587,9 @@ session_queue_node_fn (vlib_main_t * vm, vlib_node_runtime_t * node,
 
   SESSION_EVT (SESSION_EVT_DISPATCH_END, wrk, n_tx_packets);
 
+  if (wrk->flags & SESSION_WRK_F_ADAPTIVE)
+    session_wrk_update_state (wrk);
+
   return n_tx_packets;
 }
 
@@ -1530,6 +1607,46 @@ VLIB_REGISTER_NODE (session_queue_node) =
 };
 /* *INDENT-ON* */
 
+static clib_error_t *
+session_wrk_tfd_read_ready (clib_file_t *cf)
+{
+  session_worker_t *wrk = session_main_get_worker (cf->private_data);
+  u64 buf;
+  int rv;
+
+  vlib_node_set_interrupt_pending (wrk->vm, session_queue_node.index);
+  rv = read (wrk->timerfd, &buf, sizeof (buf));
+  if (rv < 0 && errno != EAGAIN)
+    clib_unix_warning ("failed");
+  return 0;
+}
+
+static clib_error_t *
+session_wrk_tfd_write_ready (clib_file_t *cf)
+{
+  return 0;
+}
+
+void
+session_wrk_enable_adaptive_mode (session_worker_t *wrk)
+{
+  u32 thread_index = wrk->vm->thread_index;
+  clib_file_t template = { 0 };
+
+  if ((wrk->timerfd = timerfd_create (CLOCK_MONOTONIC, TFD_NONBLOCK)) < 0)
+    clib_warning ("timerfd_create");
+
+  template.read_function = session_wrk_tfd_read_ready;
+  template.write_function = session_wrk_tfd_write_ready;
+  template.file_descriptor = wrk->timerfd;
+  template.private_data = thread_index;
+  template.polling_thread_index = thread_index;
+  template.description = format (0, "session-wrk-tfd-%u", thread_index);
+
+  wrk->timerfd_file = clib_file_add (&file_main, &template);
+  wrk->flags |= SESSION_WRK_F_ADAPTIVE;
+}
+
 static clib_error_t *
 session_queue_exit (vlib_main_t * vm)
 {