init / exit function ordering
[vpp.git] / src / vlib / unix / input.c
index ecf659b..43bb206 100644 (file)
@@ -40,6 +40,7 @@
 #include <vlib/vlib.h>
 #include <vlib/unix/unix.h>
 #include <signal.h>
+#include <unistd.h>
 #include <vppinfra/tw_timer_1t_3w_1024sl_ov.h>
 
 /* FIXME autoconf */
 
 typedef struct
 {
+  CLIB_CACHE_LINE_ALIGN_MARK (cacheline0);
   int epoll_fd;
   struct epoll_event *epoll_events;
+  int n_epoll_fds;
 
   /* Statistics. */
   u64 epoll_files_ready;
   u64 epoll_waits;
 } linux_epoll_main_t;
 
-static linux_epoll_main_t linux_epoll_main;
+static linux_epoll_main_t *linux_epoll_mains = 0;
 
 static void
 linux_epoll_file_update (clib_file_t * f, clib_file_update_type_t update_type)
 {
   clib_file_main_t *fm = &file_main;
-  linux_epoll_main_t *em = &linux_epoll_main;
-  struct epoll_event e;
-  int op;
-
-  memset (&e, 0, sizeof (e));
+  linux_epoll_main_t *em = vec_elt_at_index (linux_epoll_mains,
+                                            f->polling_thread_index);
+  struct epoll_event e = { 0 };
+  int op, add_del = 0;
 
   e.events = EPOLLIN;
   if (f->flags & UNIX_FILE_DATA_AVAILABLE_TO_WRITE)
@@ -84,6 +86,7 @@ linux_epoll_file_update (clib_file_t * f, clib_file_update_type_t update_type)
     {
     case UNIX_FILE_UPDATE_ADD:
       op = EPOLL_CTL_ADD;
+      add_del = 1;
       break;
 
     case UNIX_FILE_UPDATE_MODIFY:
@@ -92,6 +95,7 @@ linux_epoll_file_update (clib_file_t * f, clib_file_update_type_t update_type)
 
     case UNIX_FILE_UPDATE_DELETE:
       op = EPOLL_CTL_DEL;
+      add_del = -1;
       break;
 
     default:
@@ -99,30 +103,76 @@ linux_epoll_file_update (clib_file_t * f, clib_file_update_type_t update_type)
       return;
     }
 
+  /* worker threads open epoll fd only if needed */
+  if (update_type == UNIX_FILE_UPDATE_ADD && em->epoll_fd == -1)
+    {
+      em->epoll_fd = epoll_create (1);
+      if (em->epoll_fd < 0)
+       {
+         clib_unix_warning ("epoll_create");
+         return;
+       }
+      em->n_epoll_fds = 0;
+    }
+
   if (epoll_ctl (em->epoll_fd, op, f->file_descriptor, &e) < 0)
-    clib_unix_warning ("epoll_ctl");
+    {
+      clib_unix_warning ("epoll_ctl");
+      return;
+    }
+
+  em->n_epoll_fds += add_del;
+
+  if (em->n_epoll_fds == 0)
+    {
+      close (em->epoll_fd);
+      em->epoll_fd = -1;
+    }
 }
 
-static uword
-linux_epoll_input (vlib_main_t * vm,
-                  vlib_node_runtime_t * node, vlib_frame_t * frame)
+static_always_inline uword
+linux_epoll_input_inline (vlib_main_t * vm, vlib_node_runtime_t * node,
+                         vlib_frame_t * frame, u32 thread_index)
 {
   unix_main_t *um = &unix_main;
   clib_file_main_t *fm = &file_main;
-  linux_epoll_main_t *em = &linux_epoll_main;
+  linux_epoll_main_t *em = vec_elt_at_index (linux_epoll_mains, thread_index);
   struct epoll_event *e;
   int n_fds_ready;
+  int is_main = (thread_index == 0);
 
   {
     vlib_node_main_t *nm = &vm->node_main;
     u32 ticks_until_expiration;
     f64 timeout;
+    f64 now;
     int timeout_ms = 0, max_timeout_ms = 10;
     f64 vector_rate = vlib_last_vectors_per_main_loop (vm);
 
+    if (is_main == 0)
+      now = vlib_time_now (vm);
+
+    /*
+     * If we've been asked for a fixed-sleep between main loop polls,
+     * do so right away.
+     */
+    if (PREDICT_FALSE (is_main && um->poll_sleep_usec))
+      {
+       struct timespec ts, tsrem;
+       timeout = 0;
+       timeout_ms = 0;
+       node->input_main_loops_per_call = 0;
+       ts.tv_sec = 0;
+       ts.tv_nsec = 1000 * um->poll_sleep_usec;
+
+       while (nanosleep (&ts, &tsrem) < 0)
+         {
+           ts = tsrem;
+         }
+      }
     /* If we're not working very hard, decide how long to sleep */
-    if (vector_rate < 2 && vm->api_queue_nonempty == 0
-       && nm->input_node_counts_by_state[VLIB_NODE_STATE_POLLING] == 0)
+    else if (is_main && vector_rate < 2 && vm->api_queue_nonempty == 0
+            && nm->input_node_counts_by_state[VLIB_NODE_STATE_POLLING] == 0)
       {
        ticks_until_expiration = TW (tw_timer_first_expires_in_ticks)
          ((TWT (tw_timer_wheel) *) nm->timing_wheel);
@@ -148,6 +198,14 @@ linux_epoll_input (vlib_main_t * vm,
          }
        node->input_main_loops_per_call = 0;
       }
+    else if (is_main == 0 && vector_rate < 2
+            && (vlib_global_main.time_last_barrier_release + 0.5 < now)
+            && nm->input_node_counts_by_state[VLIB_NODE_STATE_POLLING] == 0)
+      {
+       timeout = 10e-3;
+       timeout_ms = max_timeout_ms;
+       node->input_main_loops_per_call = 0;
+      }
     else                       /* busy */
       {
        /* Don't come back for a respectable number of dispatch cycles */
@@ -155,21 +213,48 @@ linux_epoll_input (vlib_main_t * vm,
       }
 
     /* Allow any signal to wakeup our sleep. */
-    {
-      static sigset_t unblock_all_signals;
-      n_fds_ready = epoll_pwait (em->epoll_fd,
-                                em->epoll_events,
-                                vec_len (em->epoll_events),
-                                timeout_ms, &unblock_all_signals);
-
-      /* This kludge is necessary to run over absurdly old kernels */
-      if (n_fds_ready < 0 && errno == ENOSYS)
-       {
-         n_fds_ready = epoll_wait (em->epoll_fd,
-                                   em->epoll_events,
-                                   vec_len (em->epoll_events), timeout_ms);
-       }
-    }
+    if (is_main || em->epoll_fd != -1)
+      {
+       static sigset_t unblock_all_signals;
+       n_fds_ready = epoll_pwait (em->epoll_fd,
+                                  em->epoll_events,
+                                  vec_len (em->epoll_events),
+                                  timeout_ms, &unblock_all_signals);
+
+       /* This kludge is necessary to run over absurdly old kernels */
+       if (n_fds_ready < 0 && errno == ENOSYS)
+         {
+           n_fds_ready = epoll_wait (em->epoll_fd,
+                                     em->epoll_events,
+                                     vec_len (em->epoll_events), timeout_ms);
+         }
+
+      }
+    else
+      {
+       /*
+        * Worker thread, no epoll fd's, sleep for 100us at a time
+        * and check for a barrier sync request
+        */
+       if (timeout_ms)
+         {
+           struct timespec ts, tsrem;
+           f64 limit = now + (f64) timeout_ms * 1e-3;
+
+           while (vlib_time_now (vm) < limit)
+             {
+               /* Sleep for 100us at a time */
+               ts.tv_sec = 0;
+               ts.tv_nsec = 1000 * 100;
+
+               while (nanosleep (&ts, &tsrem) < 0)
+                 ts = tsrem;
+               if (*vlib_worker_threads->wait_at_barrier)
+                 goto done;
+             }
+         }
+       goto done;
+      }
   }
 
   if (n_fds_ready < 0)
@@ -178,7 +263,7 @@ linux_epoll_input (vlib_main_t * vm,
        vlib_panic_with_error (vm, clib_error_return_unix (0, "epoll_wait"));
 
       /* non fatal error (e.g. EINTR). */
-      return 0;
+      goto done;
     }
 
   em->epoll_waits += 1;
@@ -191,15 +276,50 @@ linux_epoll_input (vlib_main_t * vm,
       clib_error_t *errors[4];
       int n_errors = 0;
 
-      if (PREDICT_TRUE (!(e->events & EPOLLERR)))
+      if (PREDICT_FALSE (pool_is_free (fm->file_pool, f)))
+       {
+         /*
+          * Under rare scenerop, epoll may still post us events for the
+          * deleted file descriptor. We just deal with it and throw away the
+          * events for the corresponding file descriptor.
+          */
+         if (e->events & EPOLLIN)
+           {
+             errors[n_errors] =
+               clib_error_return (0, "epoll event EPOLLIN dropped due "
+                                  "to free index %u", i);
+             n_errors++;
+           }
+         if (e->events & EPOLLOUT)
+           {
+             errors[n_errors] =
+               clib_error_return (0, "epoll event EPOLLOUT dropped due "
+                                  "to free index %u", i);
+             n_errors++;
+           }
+         if (e->events & EPOLLERR)
+           {
+             errors[n_errors] =
+               clib_error_return (0, "epoll event EPOLLERR dropped due "
+                                  "to free index %u", i);
+             n_errors++;
+           }
+       }
+      else if (PREDICT_TRUE (!(e->events & EPOLLERR)))
        {
          if (e->events & EPOLLIN)
            {
+             f->read_events++;
              errors[n_errors] = f->read_function (f);
+             /* Make sure f is valid if the file pool moves */
+             if (pool_is_free_index (fm->file_pool, i))
+               continue;
+             f = pool_elt_at_index (fm->file_pool, i);
              n_errors += errors[n_errors] != 0;
            }
          if (e->events & EPOLLOUT)
            {
+             f->write_events++;
              errors[n_errors] = f->write_function (f);
              n_errors += errors[n_errors] != 0;
            }
@@ -208,6 +328,7 @@ linux_epoll_input (vlib_main_t * vm,
        {
          if (f->error_function)
            {
+             f->error_events++;
              errors[n_errors] = f->error_function (f);
              n_errors += errors[n_errors] != 0;
            }
@@ -222,9 +343,28 @@ linux_epoll_input (vlib_main_t * vm,
        }
     }
 
+done:
+  if (PREDICT_FALSE (vm->cpu_id != clib_get_current_cpu_id ()))
+    {
+      vm->cpu_id = clib_get_current_cpu_id ();
+      vm->numa_node = clib_get_current_numa_node ();
+    }
+
   return 0;
 }
 
+static uword
+linux_epoll_input (vlib_main_t * vm,
+                  vlib_node_runtime_t * node, vlib_frame_t * frame)
+{
+  u32 thread_index = vlib_get_thread_index ();
+
+  if (thread_index == 0)
+    return linux_epoll_input_inline (vm, node, frame, 0);
+  else
+    return linux_epoll_input_inline (vm, node, frame, thread_index);
+}
+
 /* *INDENT-OFF* */
 VLIB_REGISTER_NODE (linux_epoll_input_node,static) = {
   .function = linux_epoll_input,
@@ -236,15 +376,28 @@ VLIB_REGISTER_NODE (linux_epoll_input_node,static) = {
 clib_error_t *
 linux_epoll_input_init (vlib_main_t * vm)
 {
-  linux_epoll_main_t *em = &linux_epoll_main;
+  linux_epoll_main_t *em;
   clib_file_main_t *fm = &file_main;
+  vlib_thread_main_t *tm = vlib_get_thread_main ();
+
+
+  vec_validate_aligned (linux_epoll_mains, tm->n_vlib_mains,
+                       CLIB_CACHE_LINE_BYTES);
 
-  /* Allocate some events. */
-  vec_resize (em->epoll_events, VLIB_FRAME_SIZE);
+  vec_foreach (em, linux_epoll_mains)
+  {
+    /* Allocate some events. */
+    vec_resize (em->epoll_events, VLIB_FRAME_SIZE);
 
-  em->epoll_fd = epoll_create (vec_len (em->epoll_events));
-  if (em->epoll_fd < 0)
-    return clib_error_return_unix (0, "epoll_create");
+    if (linux_epoll_mains == em)
+      {
+       em->epoll_fd = epoll_create (1);
+       if (em->epoll_fd < 0)
+         return clib_error_return_unix (0, "epoll_create");
+      }
+    else
+      em->epoll_fd = -1;
+  }
 
   fm->file_update = linux_epoll_file_update;
 
@@ -258,10 +411,15 @@ VLIB_INIT_FUNCTION (linux_epoll_input_init);
 static clib_error_t *
 unix_input_init (vlib_main_t * vm)
 {
-  return vlib_call_init_function (vm, linux_epoll_input_init);
+  return 0;
 }
 
-VLIB_INIT_FUNCTION (unix_input_init);
+/* *INDENT-OFF* */
+VLIB_INIT_FUNCTION (unix_input_init) =
+{
+  .runs_before = VLIB_INITS ("linux_epoll_input_init"),
+};
+/* *INDENT-ON* */
 
 /*
  * fd.io coding-style-patch-verification: ON