linux-cp: set severity of noisy message to debug
[vpp.git] / src / plugins / linux-cp / lcp_nl.c
index 1c0ca0d..8f2bffd 100644 (file)
@@ -31,6 +31,7 @@
 #include <vlib/vlib.h>
 #include <vlib/unix/unix.h>
 #include <vppinfra/error.h>
+#include <vppinfra/linux/netns.h>
 
 #include <vnet/fib/fib_table.h>
 
 
 #include <plugins/linux-cp/lcp_interface.h>
 
+typedef enum nl_status_t_
+{
+  NL_STATUS_NOTIF_PROC,
+  NL_STATUS_SYNC,
+} nl_status_t;
+
+typedef enum nl_sock_type_t_
+{
+  NL_SOCK_TYPE_LINK,
+  NL_SOCK_TYPE_ADDR,
+  NL_SOCK_TYPE_NEIGH,
+  NL_SOCK_TYPE_ROUTE,
+} nl_sock_type_t;
+
+#define NL_SOCK_TYPES_N (NL_SOCK_TYPE_ROUTE + 1)
+
+/* Socket type, message type, type name, function subname */
+#define foreach_sock_type                                                     \
+  _ (NL_SOCK_TYPE_LINK, RTM_GETLINK, "link", link)                            \
+  _ (NL_SOCK_TYPE_ADDR, RTM_GETADDR, "address", link_addr)                    \
+  _ (NL_SOCK_TYPE_NEIGH, RTM_GETNEIGH, "neighbor", neigh)                     \
+  _ (NL_SOCK_TYPE_ROUTE, RTM_GETROUTE, "route", route)
+
 typedef enum nl_event_type_t_
 {
   NL_EVENT_READ,
@@ -47,7 +71,10 @@ typedef enum nl_event_type_t_
 typedef struct nl_main
 {
 
+  nl_status_t nl_status;
+
   struct nl_sock *sk_route;
+  struct nl_sock *sk_route_sync[NL_SOCK_TYPES_N];
   vlib_log_class_t nl_logger;
   nl_vft_t *nl_vfts;
   struct nl_cache *nl_caches[LCP_NL_N_OBJS];
@@ -59,6 +86,10 @@ typedef struct nl_main
   u32 batch_size;
   u32 batch_delay_ms;
 
+  u32 sync_batch_limit;
+  u32 sync_batch_delay_ms;
+  u32 sync_attempt_delay_ms;
+
 } nl_main_t;
 
 #define NL_RX_BUF_SIZE_DEF    (1 << 27) /* 128 MB */
@@ -66,11 +97,18 @@ typedef struct nl_main
 #define NL_BATCH_SIZE_DEF     (1 << 11) /* 2048 */
 #define NL_BATCH_DELAY_MS_DEF 50       /* 50 ms, max 20 batch/s */
 
+#define NL_SYNC_BATCH_LIMIT_DEF             (1 << 10) /* 1024 */
+#define NL_SYNC_BATCH_DELAY_MS_DEF   20               /* 20ms, max 50 batch/s */
+#define NL_SYNC_ATTEMPT_DELAY_MS_DEF 2000      /* 2s */
+
 static nl_main_t nl_main = {
   .rx_buf_size = NL_RX_BUF_SIZE_DEF,
   .tx_buf_size = NL_TX_BUF_SIZE_DEF,
   .batch_size = NL_BATCH_SIZE_DEF,
   .batch_delay_ms = NL_BATCH_DELAY_MS_DEF,
+  .sync_batch_limit = NL_SYNC_BATCH_LIMIT_DEF,
+  .sync_batch_delay_ms = NL_SYNC_BATCH_DELAY_MS_DEF,
+  .sync_attempt_delay_ms = NL_SYNC_ATTEMPT_DELAY_MS_DEF,
 };
 
 /* #define foreach_nl_nft_proto  \ */
@@ -103,6 +141,25 @@ static nl_main_t nl_main = {
       }                                                                       \
   }
 
+#define FOREACH_VFT_NO_ARG(__func)                                            \
+  {                                                                           \
+    nl_main_t *nm = &nl_main;                                                 \
+    nl_vft_t *__nv;                                                           \
+    vec_foreach (__nv, nm->nl_vfts)                                           \
+      {                                                                       \
+       if (!__nv->__func.cb)                                                 \
+         continue;                                                           \
+                                                                              \
+       if (!__nv->__func.is_mp_safe)                                         \
+         vlib_worker_thread_barrier_sync (vlib_get_main ());                 \
+                                                                              \
+       __nv->__func.cb ();                                                   \
+                                                                              \
+       if (!__nv->__func.is_mp_safe)                                         \
+         vlib_worker_thread_barrier_release (vlib_get_main ());              \
+      }                                                                       \
+  }
+
 #define FOREACH_VFT_CTX(__func, __arg, __ctx)                                 \
   {                                                                           \
     nl_main_t *nm = &nl_main;                                                 \
@@ -136,6 +193,8 @@ nl_register_vft (const nl_vft_t *nv)
 
 static void lcp_nl_open_socket (void);
 static void lcp_nl_close_socket (void);
+static void lcp_nl_open_sync_socket (nl_sock_type_t sock_type);
+static void lcp_nl_close_sync_socket (nl_sock_type_t sock_type);
 
 static void
 nl_route_del (struct rtnl_route *rr, void *arg)
@@ -149,6 +208,18 @@ nl_route_add (struct rtnl_route *rr, void *arg)
   FOREACH_VFT (nvl_rt_route_add, rr);
 }
 
+static void
+nl_route_sync_begin (void)
+{
+  FOREACH_VFT_NO_ARG (nvl_rt_route_sync_begin);
+}
+
+static void
+nl_route_sync_end (void)
+{
+  FOREACH_VFT_NO_ARG (nvl_rt_route_sync_end);
+}
+
 static void
 nl_neigh_del (struct rtnl_neigh *rn, void *arg)
 {
@@ -161,6 +232,18 @@ nl_neigh_add (struct rtnl_neigh *rn, void *arg)
   FOREACH_VFT (nvl_rt_neigh_add, rn);
 }
 
+static void
+nl_neigh_sync_begin (void)
+{
+  FOREACH_VFT_NO_ARG (nvl_rt_neigh_sync_begin);
+}
+
+static void
+nl_neigh_sync_end (void)
+{
+  FOREACH_VFT_NO_ARG (nvl_rt_neigh_sync_end);
+}
+
 static void
 nl_link_addr_del (struct rtnl_addr *rla, void *arg)
 {
@@ -173,6 +256,18 @@ nl_link_addr_add (struct rtnl_addr *rla, void *arg)
   FOREACH_VFT (nvl_rt_addr_add, rla);
 }
 
+static void
+nl_link_addr_sync_begin (void)
+{
+  FOREACH_VFT_NO_ARG (nvl_rt_addr_sync_begin);
+}
+
+static void
+nl_link_addr_sync_end (void)
+{
+  FOREACH_VFT_NO_ARG (nvl_rt_addr_sync_end);
+}
+
 static void
 nl_link_del (struct rtnl_link *rl, void *arg)
 {
@@ -185,6 +280,18 @@ nl_link_add (struct rtnl_link *rl, void *arg)
   FOREACH_VFT_CTX (nvl_rt_link_add, rl, arg);
 }
 
+static void
+nl_link_sync_begin (void)
+{
+  FOREACH_VFT_NO_ARG (nvl_rt_link_sync_begin);
+}
+
+static void
+nl_link_sync_end (void)
+{
+  FOREACH_VFT_NO_ARG (nvl_rt_link_sync_end);
+}
+
 static void
 nl_route_dispatch (struct nl_object *obj, void *arg)
 {
@@ -246,7 +353,171 @@ nl_route_process_msgs (void)
   if (n_msgs)
     vec_delete (nm->nl_msg_queue, n_msgs, 0);
 
-  NL_INFO ("Processed %u messages", n_msgs);
+  NL_DBG ("Processed %u messages", n_msgs);
+
+  return n_msgs;
+}
+
+static int
+lcp_nl_route_discard_msgs (void)
+{
+  nl_main_t *nm = &nl_main;
+  nl_msg_info_t *msg_info;
+  int n_msgs;
+
+  n_msgs = vec_len (nm->nl_msg_queue);
+  if (n_msgs == 0)
+    return 0;
+
+  vec_foreach (msg_info, nm->nl_msg_queue)
+    {
+      nlmsg_free (msg_info->msg);
+    }
+
+  vec_reset_length (nm->nl_msg_queue);
+
+  NL_INFO ("Discarded %u messages", n_msgs);
+
+  return n_msgs;
+}
+
+static int
+lcp_nl_route_send_dump_req (nl_sock_type_t sock_type, int msg_type)
+{
+  nl_main_t *nm = &nl_main;
+  struct nl_sock *sk_route = nm->sk_route_sync[sock_type];
+  int err;
+  struct rtgenmsg rt_hdr = {
+    .rtgen_family = AF_UNSPEC,
+  };
+
+  err =
+    nl_send_simple (sk_route, msg_type, NLM_F_DUMP, &rt_hdr, sizeof (rt_hdr));
+
+  if (err < 0)
+    {
+      NL_ERROR ("Unable to send a dump request: %s", nl_geterror (err));
+    }
+  else
+    NL_INFO ("Dump request sent via socket %d of type %d",
+            nl_socket_get_fd (sk_route), sock_type);
+
+  return err;
+}
+
+static int
+lcp_nl_route_dump_cb (struct nl_msg *msg, void *arg)
+{
+  int err;
+
+  if ((err = nl_msg_parse (msg, nl_route_dispatch, NULL)) < 0)
+    NL_ERROR ("Unable to parse object: %s", nl_geterror (err));
+
+  return NL_OK;
+}
+
+static int
+lcp_nl_recv_dump_replies (nl_sock_type_t sock_type, int msg_limit,
+                         int *is_done_rcvd)
+{
+  nl_main_t *nm = &nl_main;
+  struct nl_sock *sk_route = nm->sk_route_sync[sock_type];
+  struct sockaddr_nl nla;
+  uint8_t *buf = NULL;
+  int n_bytes;
+  struct nlmsghdr *hdr;
+  struct nl_msg *msg = NULL;
+  int err = 0;
+  int done = 0;
+  int n_msgs = 0;
+
+continue_reading:
+  n_bytes = nl_recv (sk_route, &nla, &buf, /* creds */ NULL);
+  if (n_bytes <= 0)
+    return n_bytes;
+
+  hdr = (struct nlmsghdr *) buf;
+  while (nlmsg_ok (hdr, n_bytes))
+    {
+      nlmsg_free (msg);
+      msg = nlmsg_convert (hdr);
+      if (!msg)
+       {
+         err = -NLE_NOMEM;
+         goto out;
+       }
+
+      n_msgs++;
+
+      nlmsg_set_proto (msg, NETLINK_ROUTE);
+      nlmsg_set_src (msg, &nla);
+
+      /* Message that terminates a multipart message. Finish parsing and signal
+       * the caller that all dump replies have been received
+       */
+      if (hdr->nlmsg_type == NLMSG_DONE)
+       {
+         done = 1;
+         goto out;
+       }
+      /* Message to be ignored. Continue parsing */
+      else if (hdr->nlmsg_type == NLMSG_NOOP)
+       ;
+      /* Message that indicates data was lost. Finish parsing and return an
+       * error
+       */
+      else if (hdr->nlmsg_type == NLMSG_OVERRUN)
+       {
+         err = -NLE_MSG_OVERFLOW;
+         goto out;
+       }
+      /* Message that indicates an error. Finish parsing, extract the error
+       * code, and return it */
+      else if (hdr->nlmsg_type == NLMSG_ERROR)
+       {
+         struct nlmsgerr *e = nlmsg_data (hdr);
+
+         if (hdr->nlmsg_len < nlmsg_size (sizeof (*e)))
+           {
+             err = -NLE_MSG_TRUNC;
+             goto out;
+           }
+         else if (e->error)
+           {
+             err = -nl_syserr2nlerr (e->error);
+             goto out;
+           }
+         /* Message is an acknowledgement (err_code = 0). Continue parsing */
+         else
+           ;
+       }
+      /* Message that contains the requested data. Pass it for processing and
+       * continue parsing
+       */
+      else
+       {
+         lcp_nl_route_dump_cb (msg, NULL);
+       }
+
+      hdr = nlmsg_next (hdr, &n_bytes);
+    }
+
+  nlmsg_free (msg);
+  free (buf);
+  msg = NULL;
+  buf = NULL;
+
+  if (!done && n_msgs < msg_limit)
+    goto continue_reading;
+
+out:
+  nlmsg_free (msg);
+  free (buf);
+
+  if (err)
+    return err;
+
+  *is_done_rcvd = done;
 
   return n_msgs;
 }
@@ -261,38 +532,150 @@ nl_route_process (vlib_main_t *vm, vlib_node_runtime_t *node,
   uword event_type;
   uword *event_data = 0;
   f64 wait_time = DAY_F64;
+  int n_msgs;
+  int is_done;
 
   while (1)
     {
-      /* If we process a batch of messages and stop because we reached the
-       * batch size limit, we want to wake up after the batch delay and
-       * process more. Otherwise we just want to wait for a read event.
-       */
-      vlib_process_wait_for_event_or_clock (vm, wait_time);
-      event_type = vlib_process_get_events (vm, &event_data);
-
-      switch (event_type)
+      if (nm->nl_status == NL_STATUS_NOTIF_PROC)
        {
-       /* process batch of queued messages on timeout or read event signal */
-       case ~0:
-       case NL_EVENT_READ:
-         nl_route_process_msgs ();
-         wait_time = (vec_len (nm->nl_msg_queue) != 0) ?
-                       nm->batch_delay_ms * 1e-3 :
-                       DAY_F64;
-         break;
-
-       /* reopen the socket if there was an error polling/reading it */
-       case NL_EVENT_ERR:
+         /* If we process a batch of messages and stop because we reached the
+          * batch size limit, we want to wake up after the batch delay and
+          * process more. Otherwise we just want to wait for a read event.
+          */
+         vlib_process_wait_for_event_or_clock (vm, wait_time);
+         event_type = vlib_process_get_events (vm, &event_data);
+         vec_reset_length (event_data);
+
+         switch (event_type)
+           {
+           /* Process batch of queued messages on timeout or read event
+            * signal
+            */
+           case ~0:
+           case NL_EVENT_READ:
+             nl_route_process_msgs ();
+             wait_time = (vec_len (nm->nl_msg_queue) != 0) ?
+                           nm->batch_delay_ms * 1e-3 :
+                           DAY_F64;
+             break;
+
+           /* Initiate synchronization if there was an error polling or
+            * reading the notification socket
+            */
+           case NL_EVENT_ERR:
+             nm->nl_status = NL_STATUS_SYNC;
+             break;
+
+           default:
+             NL_ERROR ("Unknown event type: %u", (u32) event_type);
+           }
+       }
+      else if (nm->nl_status == NL_STATUS_SYNC)
+       {
+         /* Stop processing notifications - close the notification socket and
+          * discard all messages that are currently in the queue
+          */
          lcp_nl_close_socket ();
+         lcp_nl_route_discard_msgs ();
+
+         /* Wait some time before next synchronization attempt. Allows to
+          * reduce the number of failed attempts that stall the main thread by
+          * waiting out the notification storm
+          */
+         NL_INFO ("Wait before next synchronization attempt for %ums",
+                  nm->sync_attempt_delay_ms);
+         vlib_process_suspend (vm, nm->sync_attempt_delay_ms * 1e-3);
+
+         /* Open netlink synchronization socket, one for every data type of
+          * interest: link, address, neighbor, and route. That is needed to
+          * be able to send dump requests for every data type simultaneously.
+          * If send a dump request while the previous one is in progress,
+          * the request will fail and EBUSY returned
+          */
+#define _(stype, mtype, tname, fn) lcp_nl_open_sync_socket (stype);
+         foreach_sock_type
+#undef _
+
+         /* Start reading notifications and enqueueing them for further
+          * processing. The notifications will serve as a difference between
+          * the snapshot made after the dump request and the actual state at
+          * the moment. Once all the dump replies are processed, the
+          * notifications will be processed
+          */
          lcp_nl_open_socket ();
-         break;
 
-       default:
-         NL_ERROR ("Unknown event type: %u", (u32) event_type);
+         /* Request the current entry set from the kernel for every data type
+          * of interest. Thus requesting a snapshot of the current routing
+          * state that the kernel will make and then reply with
+          */
+#define _(stype, mtype, tname, fn) lcp_nl_route_send_dump_req (stype, mtype);
+         foreach_sock_type
+#undef _
+
+         /* Process all the dump replies */
+#define _(stype, mtype, tname, fn)                                            \
+  nl_##fn##_sync_begin ();                                                    \
+  is_done = 0;                                                                \
+  do                                                                          \
+    {                                                                         \
+      n_msgs =                                                                \
+       lcp_nl_recv_dump_replies (stype, nm->sync_batch_limit, &is_done);     \
+      if (n_msgs < 0)                                                         \
+       {                                                                     \
+         NL_ERROR ("Error receiving dump replies of type " tname             \
+                   ": %s (%d)",                                              \
+                   nl_geterror (n_msgs), n_msgs);                            \
+         break;                                                              \
+       }                                                                     \
+      else if (n_msgs == 0)                                                   \
+       {                                                                     \
+         NL_ERROR ("EOF while receiving dump replies of type " tname);       \
+         break;                                                              \
+       }                                                                     \
+      else                                                                    \
+       NL_INFO ("Processed %u dump replies of type " tname, n_msgs);         \
+                                                                              \
+      /* Suspend the processing loop and wait until event signal is           \
+       * received or timeout expires. During synchronization, only            \
+       * error event is expected because read event is suppressed.            \
+       * Allows not to stall the main thread and detect errors on the         \
+       * notification socket that will make synchronization                   \
+       * incomplete                                                           \
+       */                                                                     \
+      vlib_process_wait_for_event_or_clock (vm,                               \
+                                           nm->sync_batch_delay_ms * 1e-3);  \
+      event_type = vlib_process_get_events (vm, &event_data);                 \
+      vec_reset_length (event_data);                                          \
+                                                                              \
+      /* If error event received, stop synchronization and repeat an          \
+       * attempt later                                                        \
+       */                                                                     \
+      if (event_type == NL_EVENT_ERR)                                         \
+       goto sync_later;                                                      \
+    }                                                                         \
+  while (!is_done);                                                           \
+  nl_##fn##_sync_end ();
+
+           foreach_sock_type
+#undef _
+
+             /* Start processing notifications */
+             nm->nl_status = NL_STATUS_NOTIF_PROC;
+
+         /* Trigger messages processing if there are notifications received
+          * during synchronization
+          */
+         wait_time = (vec_len (nm->nl_msg_queue) != 0) ? 1e-3 : DAY_F64;
+
+       sync_later:
+         /* Close netlink synchronization sockets */
+#define _(stype, mtype, tname, fn) lcp_nl_close_sync_socket (stype);
+         foreach_sock_type
+#undef _
        }
-
-      vec_reset_length (event_data);
+      else
+       NL_ERROR ("Unknown status: %d", nm->nl_status);
     }
   return frame->n_vectors;
 }
@@ -318,10 +701,6 @@ nl_route_cb (struct nl_msg *msg, void *arg)
   msg_info->msg = msg;
   nlmsg_get (msg);
 
-  /* notify process node */
-  vlib_process_signal_event (vlib_get_main (), nl_route_process_node.index,
-                            NL_EVENT_READ, 0);
-
   return 0;
 }
 
@@ -331,21 +710,23 @@ lcp_nl_drain_messages (void)
   int err;
   nl_main_t *nm = &nl_main;
 
-  /* Read until there's an error. Unless the error is ENOBUFS, which means
-   * the kernel couldn't send a message due to socket buffer overflow.
-   * Continue reading when that happens.
-   *
-   * libnl translates both ENOBUFS and ENOMEM to NLE_NOMEM. So we need to
-   * check return status and errno to make sure we should keep going.
-   */
-  while ((err = nl_recvmsgs_default (nm->sk_route)) > -1 ||
-        (err == -NLE_NOMEM && errno == ENOBUFS))
+  /* Read until there's an error */
+  while ((err = nl_recvmsgs_default (nm->sk_route)) > -1)
     ;
 
   /* If there was an error other then EAGAIN, signal process node */
   if (err != -NLE_AGAIN)
     vlib_process_signal_event (vlib_get_main (), nl_route_process_node.index,
                               NL_EVENT_ERR, 0);
+  else
+    {
+      /* If netlink notification processing is active, signal process node
+       * there were notifications read
+       */
+      if (nm->nl_status == NL_STATUS_NOTIF_PROC)
+       vlib_process_signal_event (
+         vlib_get_main (), nl_route_process_node.index, NL_EVENT_READ, 0);
+    }
 
   return err;
 }
@@ -543,6 +924,64 @@ lcp_nl_open_socket (void)
   NL_INFO ("Opened netlink socket %d", nl_socket_get_fd (nm->sk_route));
 }
 
+static void
+lcp_nl_open_sync_socket (nl_sock_type_t sock_type)
+{
+  nl_main_t *nm = &nl_main;
+  int dest_ns_fd, curr_ns_fd;
+  struct nl_sock *sk_route;
+
+  /* Allocate a new blocking socket for routes that will be used for dump
+   * requests. Buffer sizes are left default because replies to dump requests
+   * are flow-controlled and the kernel will not overflow the socket by sending
+   * these
+   */
+
+  nm->sk_route_sync[sock_type] = sk_route = nl_socket_alloc ();
+
+  dest_ns_fd = lcp_get_default_ns_fd ();
+  if (dest_ns_fd > 0)
+    {
+      curr_ns_fd = clib_netns_open (NULL /* self */);
+      if (clib_setns (dest_ns_fd) == -1)
+       NL_ERROR ("Cannot set destination ns");
+    }
+
+  nl_connect (sk_route, NETLINK_ROUTE);
+
+  if (dest_ns_fd > 0)
+    {
+      if (curr_ns_fd == -1)
+       {
+         NL_ERROR ("No previous ns to set");
+       }
+      else
+       {
+         if (clib_setns (curr_ns_fd) == -1)
+           NL_ERROR ("Cannot set previous ns");
+         close (curr_ns_fd);
+       }
+    }
+
+  NL_INFO ("Opened netlink synchronization socket %d of type %d",
+          nl_socket_get_fd (sk_route), sock_type);
+}
+
+static void
+lcp_nl_close_sync_socket (nl_sock_type_t sock_type)
+{
+  nl_main_t *nm = &nl_main;
+  struct nl_sock *sk_route = nm->sk_route_sync[sock_type];
+
+  if (sk_route)
+    {
+      NL_INFO ("Closing netlink synchronization socket %d of type %d",
+              nl_socket_get_fd (sk_route), sock_type);
+      nl_socket_free (sk_route);
+      nm->sk_route_sync[sock_type] = NULL;
+    }
+}
+
 #include <vnet/plugin/plugin.h>
 clib_error_t *
 lcp_nl_init (vlib_main_t *vm)
@@ -552,6 +991,7 @@ lcp_nl_init (vlib_main_t *vm)
     .pair_add_fn = lcp_nl_pair_add_cb,
   };
 
+  nm->nl_status = NL_STATUS_NOTIF_PROC;
   nm->clib_file_index = ~0;
   nm->nl_logger = vlib_log_register_class ("nl", "nl");