session: basic support for interrupt mode
[vpp.git] / src / vnet / session / session_node.c
index 8a350d4..d7adbb5 100644 (file)
@@ -24,6 +24,7 @@
 #include <vnet/session/application_local.h>
 #include <vnet/session/session_debug.h>
 #include <svm/queue.h>
+#include <sys/timerfd.h>
 
 #define app_check_thread_and_barrier(_fn, _arg)                                \
   if (!vlib_thread_is_main_w_barrier ())                               \
@@ -472,8 +473,10 @@ session_mq_worker_update_handler (void *data)
   evt->event_type = SESSION_CTRL_EVT_WORKER_UPDATE_REPLY;
   rmp = (session_worker_update_reply_msg_t *) evt->data;
   rmp->handle = mp->handle;
-  rmp->rx_fifo = pointer_to_uword (s->rx_fifo);
-  rmp->tx_fifo = pointer_to_uword (s->tx_fifo);
+  if (s->rx_fifo)
+    rmp->rx_fifo = fifo_segment_fifo_offset (s->rx_fifo);
+  if (s->tx_fifo)
+    rmp->tx_fifo = fifo_segment_fifo_offset (s->tx_fifo);
   rmp->segment_handle = session_segment_handle (s);
   svm_msg_q_add_and_unlock (app_wrk->event_queue, msg);
 
@@ -630,16 +633,18 @@ session_tx_fifo_chain_tail (vlib_main_t * vm, session_tx_context_t * ctx,
              svm_fifo_t *f = ctx->s->tx_fifo;
              session_dgram_hdr_t *hdr = &ctx->hdr;
              u16 deq_now;
+             u32 offset;
+
              deq_now = clib_min (hdr->data_length - hdr->data_offset,
                                  len_to_deq);
-             n_bytes_read = svm_fifo_peek (f, hdr->data_offset, deq_now,
-                                           data);
+             offset = hdr->data_offset + SESSION_CONN_HDR_LEN;
+             n_bytes_read = svm_fifo_peek (f, offset, deq_now, data);
              ASSERT (n_bytes_read > 0);
 
              hdr->data_offset += n_bytes_read;
              if (hdr->data_offset == hdr->data_length)
                {
-                 u32 offset = hdr->data_length + SESSION_CONN_HDR_LEN;
+                 offset = hdr->data_length + SESSION_CONN_HDR_LEN;
                  svm_fifo_dequeue_drop (f, offset);
                  if (ctx->left_to_snd > n_bytes_read)
                    svm_fifo_peek (ctx->s->tx_fifo, 0, sizeof (ctx->hdr),
@@ -1037,7 +1042,8 @@ session_tx_fifo_read_and_snd_i (session_worker_t * wrk,
       return SESSION_TX_NO_BUFFERS;
     }
 
-  transport_connection_update_tx_bytes (ctx->tc, ctx->max_len_to_snd);
+  if (transport_connection_is_tx_paced (ctx->tc))
+    transport_connection_tx_pacer_update_bytes (ctx->tc, ctx->max_len_to_snd);
 
   ctx->left_to_snd = ctx->max_len_to_snd;
   n_left = ctx->n_segs_per_evt;
@@ -1127,7 +1133,10 @@ session_tx_fifo_read_and_snd_i (session_worker_t * wrk,
 
   if (!peek_data)
     {
-      if (svm_fifo_needs_deq_ntf (ctx->s->tx_fifo, ctx->max_len_to_snd))
+      u32 n_dequeued = ctx->max_len_to_snd;
+      if (ctx->transport_vft->transport_options.tx_type == TRANSPORT_TX_DGRAM)
+       n_dequeued += ctx->n_segs_per_evt * SESSION_CONN_HDR_LEN;
+      if (svm_fifo_needs_deq_ntf (ctx->s->tx_fifo, n_dequeued))
        session_dequeue_notify (ctx->s);
     }
   return SESSION_TX_OK;
@@ -1182,6 +1191,10 @@ session_tx_fifo_dequeue_internal (session_worker_t * wrk,
          session_evt_add_head_old (wrk, elt);
     }
 
+  if (sp->max_burst_size &&
+      svm_fifo_needs_deq_ntf (s->tx_fifo, sp->max_burst_size))
+    session_dequeue_notify (s);
+
   return n_packets;
 }
 
@@ -1387,24 +1400,112 @@ session_flush_pending_tx_buffers (session_worker_t * wrk,
   vec_reset_length (wrk->pending_tx_nexts);
 }
 
+/**
+ * Dequeue the messages currently sitting on @a mq and pass each event to
+ * session_evt_add_to_list() for the given worker.
+ *
+ * The queue size is sampled once before the loop, so at most that many
+ * messages are consumed per call.
+ *
+ * @param wrk  session worker the events are added to
+ * @param mq   message queue to dequeue from
+ * @return     number of messages dequeued
+ */
+int
+session_wrk_handle_mq (session_worker_t *wrk, svm_msg_q_t *mq)
+{
+  svm_msg_q_msg_t msg_buf, *m = &msg_buf;
+  session_event_t *e;
+  u32 n_msgs, n_left;
+
+  n_msgs = svm_msg_q_size (mq);
+
+  for (n_left = n_msgs; n_left > 0; n_left--)
+    {
+      svm_msg_q_sub_raw (mq, m);
+      e = svm_msg_q_msg_data (mq, m);
+      /* The message is freed only after its event has been passed on */
+      session_evt_add_to_list (wrk, e);
+      svm_msg_q_free_msg (mq, m);
+    }
+
+  return n_msgs;
+}
+
+/**
+ * (Re)program the worker's timerfd as a periodic timer.
+ *
+ * The same value is used for the initial expiration and for the interval.
+ * The timeout is split into seconds and nanoseconds because
+ * timerfd_settime(2) rejects a tv_nsec of one second or more with EINVAL.
+ * Per timerfd_settime(2), a timeout of 0 disarms the timer.
+ *
+ * @param wrk      session worker owning the timerfd
+ * @param time_ns  timer period in nanoseconds
+ */
+static void
+session_wrk_timerfd_update (session_worker_t *wrk, u64 time_ns)
+{
+  const u64 ns_per_sec = 1000000000ULL;
+  struct itimerspec its;
+
+  its.it_value.tv_sec = time_ns / ns_per_sec;
+  its.it_value.tv_nsec = time_ns % ns_per_sec;
+  its.it_interval = its.it_value;
+
+  /* Relative timer (flags == 0); failure is only logged */
+  if (timerfd_settime (wrk->timerfd, 0, &its, NULL) == -1)
+    clib_warning ("timerfd_settime");
+}
+
+/**
+ * Pick the timerfd period, in nanoseconds, for a worker state.
+ *
+ * Thread index 0 uses 500ms when vlib_num_workers () is non-zero. Otherwise
+ * the period is 1ms in interrupt state and 100ms in idle state. Any other
+ * state yields 0.
+ *
+ * @param state         worker state the timer is being programmed for
+ * @param thread_index  index of the thread owning the worker
+ * @return              timer period in nanoseconds
+ */
+always_inline u64
+session_wrk_tfd_timeout (session_wrk_state_t state, u32 thread_index)
+{
+  u64 period_ns;
+
+  if (state == SESSION_WRK_INTERRUPT)
+    period_ns = 1000000;
+  else if (state == SESSION_WRK_IDLE)
+    period_ns = 100000000;
+  else
+    return 0;
+
+  /* Only thread 0 consults the worker count */
+  if (thread_index == 0 && vlib_num_workers ())
+    return 500000000;
+
+  return period_ns;
+}
+
+/**
+ * Record a new worker state and reprogram the worker's timerfd with the
+ * period that session_wrk_tfd_timeout () returns for that state.
+ *
+ * @param wrk    session worker to update
+ * @param state  state to switch the worker to
+ */
+static inline void
+session_wrk_state_update (session_worker_t *wrk, session_wrk_state_t state)
+{
+  wrk->state = state;
+  session_wrk_timerfd_update (
+    wrk, session_wrk_tfd_timeout (state, wrk->vm->thread_index));
+}
+
+/**
+ * Re-evaluate the worker's adaptive state at the end of a dispatch.
+ *
+ * - polling:   moves to interrupt when event_elts holds exactly the baseline
+ *              number of elements and the last main loop handled less than
+ *              one vector; the session queue node is put in interrupt state.
+ * - interrupt: moves back to polling (node state included) when event_elts
+ *              grows past the baseline or the last main loop handled more
+ *              than one vector; otherwise moves to idle when the worker has
+ *              no sessions.
+ * - otherwise: moves to interrupt when event_elts is not empty.
+ *
+ * @param wrk  session worker to update
+ */
+static void
+session_wrk_update_state (session_worker_t *wrk)
+{
+  /* NOTE(review): 3 presumably counts the list heads (ctrl/new/old) that are
+   * permanently allocated from event_elts, i.e. "no events pending" - confirm
+   */
+  const u32 n_base_elts = 3;
+  vlib_main_t *vm = wrk->vm;
+
+  switch (wrk->state)
+    {
+    case SESSION_WRK_POLLING:
+      if (pool_elts (wrk->event_elts) == n_base_elts &&
+          vlib_last_vectors_per_main_loop (vm) < 1)
+        {
+          session_wrk_state_update (wrk, SESSION_WRK_INTERRUPT);
+          vlib_node_set_state (vm, session_queue_node.index,
+                               VLIB_NODE_STATE_INTERRUPT);
+        }
+      break;
+
+    case SESSION_WRK_INTERRUPT:
+      if (pool_elts (wrk->event_elts) > n_base_elts ||
+          vlib_last_vectors_per_main_loop (vm) > 1)
+        {
+          session_wrk_state_update (wrk, SESSION_WRK_POLLING);
+          vlib_node_set_state (vm, session_queue_node.index,
+                               VLIB_NODE_STATE_POLLING);
+        }
+      else if (PREDICT_FALSE (!pool_elts (wrk->sessions)))
+        {
+          session_wrk_state_update (wrk, SESSION_WRK_IDLE);
+        }
+      break;
+
+    default:
+      /* NOTE(review): if the baseline elements never leave event_elts this
+       * test is always true - confirm it is the intended wake-up condition */
+      if (pool_elts (wrk->event_elts))
+        session_wrk_state_update (wrk, SESSION_WRK_INTERRUPT);
+      break;
+    }
+}
+
 static uword
 session_queue_node_fn (vlib_main_t * vm, vlib_node_runtime_t * node,
                       vlib_frame_t * frame)
 {
+  u32 thread_index = vm->thread_index, __clib_unused n_evts;
+  session_evt_elt_t *elt, *ctrl_he, *new_he, *old_he;
   session_main_t *smm = vnet_get_session_main ();
-  u32 thread_index = vm->thread_index, n_to_dequeue;
   session_worker_t *wrk = &smm->wrk[thread_index];
-  session_evt_elt_t *elt, *ctrl_he, *new_he, *old_he;
   clib_llist_index_t ei, next_ei, old_ti;
-  svm_msg_q_msg_t _msg, *msg = &_msg;
-  int i = 0, n_tx_packets;
-  session_event_t *evt;
-  svm_msg_q_t *mq;
+  int n_tx_packets;
 
   SESSION_EVT (SESSION_EVT_DISPATCH_START, wrk);
 
-  wrk->last_vlib_time = vlib_time_now (vm);
-  wrk->last_vlib_us_time = wrk->last_vlib_time * CLIB_US_TIME_FREQ;
+  session_wrk_update_time (wrk, vlib_time_now (vm));
 
   /*
    *  Update transport time
@@ -1414,26 +1515,11 @@ session_queue_node_fn (vlib_main_t * vm, vlib_node_runtime_t * node,
   SESSION_EVT (SESSION_EVT_DSP_CNTRS, UPDATE_TIME, wrk);
 
   /*
-   *  Dequeue and handle new events
+   *  Dequeue new internal mq events
    */
 
-  /* Try to dequeue what is available. Don't wait for lock.
-   * XXX: we may need priorities here */
-  mq = wrk->vpp_event_queue;
-  n_to_dequeue = svm_msg_q_size (mq);
-  if (n_to_dequeue && svm_msg_q_try_lock (mq) == 0)
-    {
-      for (i = 0; i < n_to_dequeue; i++)
-       {
-         svm_msg_q_sub_w_lock (mq, msg);
-         evt = svm_msg_q_msg_data (mq, msg);
-         session_evt_add_to_list (wrk, evt);
-         svm_msg_q_free_msg (mq, msg);
-       }
-      svm_msg_q_unlock (mq);
-    }
-
-  SESSION_EVT (SESSION_EVT_DSP_CNTRS, MQ_DEQ, wrk, n_to_dequeue, !i);
+  n_evts = session_wrk_handle_mq (wrk, wrk->vpp_event_queue);
+  SESSION_EVT (SESSION_EVT_DSP_CNTRS, MQ_DEQ, wrk, n_evts);
 
   /*
    * Handle control events
@@ -1441,12 +1527,10 @@ session_queue_node_fn (vlib_main_t * vm, vlib_node_runtime_t * node,
 
   ctrl_he = pool_elt_at_index (wrk->event_elts, wrk->ctrl_head);
 
-  /* *INDENT-OFF* */
   clib_llist_foreach_safe (wrk->event_elts, evt_list, ctrl_he, elt, ({
     clib_llist_remove (wrk->event_elts, evt_list, elt);
     session_event_dispatch_ctrl (wrk, elt);
   }));
-  /* *INDENT-ON* */
 
   SESSION_EVT (SESSION_EVT_DSP_CNTRS, CTRL_EVTS, wrk);
 
@@ -1503,6 +1587,9 @@ session_queue_node_fn (vlib_main_t * vm, vlib_node_runtime_t * node,
 
   SESSION_EVT (SESSION_EVT_DISPATCH_END, wrk, n_tx_packets);
 
+  if (wrk->flags & SESSION_WRK_F_ADAPTIVE)
+    session_wrk_update_state (wrk);
+
   return n_tx_packets;
 }
 
@@ -1520,10 +1607,50 @@ VLIB_REGISTER_NODE (session_queue_node) =
 };
 /* *INDENT-ON* */
 
+/**
+ * Read-ready callback for a worker's timerfd.
+ *
+ * Flags the session queue node as interrupt pending on the worker's vlib
+ * main, then reads the 8-byte value from the timerfd. A failed read is
+ * logged unless errno is EAGAIN.
+ *
+ * @param cf  file entry; private_data holds the worker's thread index
+ * @return    always 0
+ */
+static clib_error_t *
+session_wrk_tfd_read_ready (clib_file_t *cf)
+{
+  session_worker_t *wrk = session_main_get_worker (cf->private_data);
+  u64 tfd_value;
+
+  vlib_node_set_interrupt_pending (wrk->vm, session_queue_node.index);
+
+  if (read (wrk->timerfd, &tfd_value, sizeof (tfd_value)) < 0 &&
+      errno != EAGAIN)
+    clib_unix_warning ("failed");
+
+  return 0;
+}
+
+/**
+ * Write-ready callback for a worker's timerfd. Does nothing.
+ *
+ * @param cf  file entry (unused)
+ * @return    always NULL, i.e. no error
+ */
+static clib_error_t *
+session_wrk_tfd_write_ready (clib_file_t *cf)
+{
+  return NULL;
+}
+
+/**
+ * Enable adaptive mode on a session worker.
+ *
+ * Creates a non-blocking CLOCK_MONOTONIC timerfd, registers it with the file
+ * poller on the worker's own thread and sets SESSION_WRK_F_ADAPTIVE.
+ *
+ * If the timerfd cannot be created, a warning is logged and the worker is
+ * left unchanged: no file is registered and the adaptive flag is not set.
+ *
+ * @param wrk  session worker to switch to adaptive mode
+ */
+void
+session_wrk_enable_adaptive_mode (session_worker_t *wrk)
+{
+  u32 thread_index = wrk->vm->thread_index;
+  clib_file_t template = { 0 };
+
+  wrk->timerfd = timerfd_create (CLOCK_MONOTONIC, TFD_NONBLOCK);
+  if (wrk->timerfd < 0)
+    {
+      /* The timerfd callback is what marks the session node interrupt
+       * pending, so without it the adaptive flag must stay clear and no
+       * invalid descriptor may be handed to the file poller. */
+      clib_warning ("timerfd_create");
+      return;
+    }
+
+  template.read_function = session_wrk_tfd_read_ready;
+  template.write_function = session_wrk_tfd_write_ready;
+  template.file_descriptor = wrk->timerfd;
+  /* The read callback uses private_data to look the worker up again */
+  template.private_data = thread_index;
+  template.polling_thread_index = thread_index;
+  template.description = format (0, "session-wrk-tfd-%u", thread_index);
+
+  wrk->timerfd_file = clib_file_add (&file_main, &template);
+  wrk->flags |= SESSION_WRK_F_ADAPTIVE;
+}
+
 static clib_error_t *
 session_queue_exit (vlib_main_t * vm)
 {
-  if (vec_len (vlib_mains) < 2)
+  if (vlib_get_n_threads () < 2)
     return 0;
 
   /*