Address bfd rpc scale issues 79/13679/11
authorDave Barach <dave@barachs.net>
Wed, 25 Jul 2018 12:30:27 +0000 (08:30 -0400)
committerJohn Lo <loj@cisco.com>
Wed, 29 Aug 2018 22:41:35 +0000 (22:41 +0000)
Remove the expensive RPC call for every received packet and replace it with
lock-protected direct calls. Reinstate RPC for the less frequent
notification traffic.

Adjust the wakeup event sending logic to minimize the number of events
sent, by measuring the time it takes from sending the event to processing
it, and subsequently not sending the event if the pending wake-up time
is within 2x or the event propagation delay.

Eventually: remove oingo / oingoes.

Change-Id: I0b3d33c5d029527b54867a97ab07f35f346aaa3d
Signed-off-by: Dave Barach <dave@barachs.net>
Signed-off-by: Andrew Yourtchenko <ayourtch@gmail.com>
Signed-off-by: Steve Shin <jonshin@cisco.com>
Signed-off-by: Andrew Yourtchenko <ayourtch@gmail.com>
src/vnet/bfd/bfd_main.c
src/vnet/bfd/bfd_main.h
src/vnet/bfd/bfd_udp.c

index 28ece31..6e6be9e 100644 (file)
@@ -25,6 +25,7 @@
 #include <x86intrin.h>
 #endif
 
+#include <vlibmemory/api.h>
 #include <vppinfra/random.h>
 #include <vppinfra/error.h>
 #include <vppinfra/hash.h>
 #include <vnet/bfd/bfd_main.h>
 #include <vlib/log.h>
 
+u32 oingoes;
+
+void
+oingo (void)
+{
+  oingoes++;
+}
+
 static u64
 bfd_calc_echo_checksum (u32 discriminator, u64 expire_time, u32 secret)
 {
@@ -315,6 +324,7 @@ bfd_set_timer (bfd_main_t * bm, bfd_session_t * bs, u64 now,
   if (next && (now + bm->wheel_inaccuracy > bs->wheel_time_clocks ||
               next < bs->wheel_time_clocks || !bs->wheel_time_clocks))
     {
+      int send_signal = 0;
       bs->wheel_time_clocks = next;
       BFD_DBG ("timing_wheel_insert(%p, %lu (%ld clocks/%.2fs in the "
               "future), %u);",
@@ -322,12 +332,41 @@ bfd_set_timer (bfd_main_t * bm, bfd_session_t * bs, u64 now,
               (i64) bs->wheel_time_clocks - clib_cpu_time_now (),
               (i64) (bs->wheel_time_clocks - clib_cpu_time_now ()) /
               bm->cpu_cps, bs->bs_idx);
+      bfd_lock (bm);
       timing_wheel_insert (&bm->wheel, bs->wheel_time_clocks, bs->bs_idx);
+
       if (!handling_wakeup)
        {
-         vlib_process_signal_event (bm->vlib_main,
-                                    bm->bfd_process_node_index,
-                                    BFD_EVENT_RESCHEDULE, bs->bs_idx);
+
+         /* Send only if it is earlier than current awaited wakeup time */
+         send_signal =
+           (bs->wheel_time_clocks < bm->bfd_process_next_wakeup_clocks) &&
+           /*
+            * If the wake-up time is within 2x the delay of the event propagation delay,
+            * avoid the expense of sending the event. The 2x multiplier is to workaround the race whereby
+            * simultaneous event + expired timer create one recurring bogus wakeup/suspend instance,
+            * due to double scheduling of the node on the pending list.
+            */
+           (bm->bfd_process_next_wakeup_clocks - bs->wheel_time_clocks >
+            2 * bm->bfd_process_wakeup_event_delay_clocks) &&
+           /* Must be no events in flight to send an event */
+           (!bm->bfd_process_wakeup_events_in_flight);
+
+         /* If we do send the signal, note this down along with the start timestamp */
+         if (send_signal)
+           {
+             bm->bfd_process_wakeup_events_in_flight++;
+             bm->bfd_process_wakeup_event_start_clocks = now;
+           }
+       }
+      bfd_unlock (bm);
+
+      /* Use the multithreaded event sending so the workers can send events too */
+      if (send_signal)
+       {
+         vlib_process_signal_event_mt (bm->vlib_main,
+                                       bm->bfd_process_node_index,
+                                       BFD_EVENT_RESCHEDULE, ~0);
        }
     }
 }
@@ -508,12 +547,98 @@ bfd_input_format_trace (u8 * s, va_list * args)
   return s;
 }
 
+typedef struct
+{
+  u32 bs_idx;
+} bfd_rpc_event_t;
+
+static void
+bfd_rpc_event_cb (const bfd_rpc_event_t * a)
+{
+  bfd_main_t *bm = &bfd_main;
+  u32 bs_idx = a->bs_idx;
+  u32 valid_bs = 0;
+  bfd_session_t session_data;
+
+  bfd_lock (bm);
+  if (!pool_is_free_index (bm->sessions, bs_idx))
+    {
+      bfd_session_t *bs = pool_elt_at_index (bm->sessions, bs_idx);
+      clib_memcpy (&session_data, bs, sizeof (bfd_session_t));
+      valid_bs = 1;
+    }
+  else
+    {
+      BFD_DBG ("Ignoring event RPC for non-existent session index %u",
+              bs_idx);
+    }
+  bfd_unlock (bm);
+
+  if (valid_bs)
+    bfd_event (bm, &session_data);
+}
+
+static void
+bfd_event_rpc (u32 bs_idx)
+{
+  const u32 data_size = sizeof (bfd_rpc_event_t);
+  u8 data[data_size];
+  bfd_rpc_event_t *event = (bfd_rpc_event_t *) data;
+
+  event->bs_idx = bs_idx;
+  vl_api_rpc_call_main_thread (bfd_rpc_event_cb, data, data_size);
+}
+
+typedef struct
+{
+  u32 bs_idx;
+} bfd_rpc_notify_listeners_t;
+
+static void
+bfd_rpc_notify_listeners_cb (const bfd_rpc_notify_listeners_t * a)
+{
+  bfd_main_t *bm = &bfd_main;
+  u32 bs_idx = a->bs_idx;
+  bfd_lock (bm);
+  if (!pool_is_free_index (bm->sessions, bs_idx))
+    {
+      bfd_session_t *bs = pool_elt_at_index (bm->sessions, bs_idx);
+      bfd_notify_listeners (bm, BFD_LISTEN_EVENT_UPDATE, bs);
+    }
+  else
+    {
+      BFD_DBG ("Ignoring notify RPC for non-existent session index %u",
+              bs_idx);
+    }
+  bfd_unlock (bm);
+}
+
+static void
+bfd_notify_listeners_rpc (u32 bs_idx)
+{
+  const u32 data_size = sizeof (bfd_rpc_notify_listeners_t);
+  u8 data[data_size];
+  bfd_rpc_notify_listeners_t *notify = (bfd_rpc_notify_listeners_t *) data;
+  notify->bs_idx = bs_idx;
+  vl_api_rpc_call_main_thread (bfd_rpc_notify_listeners_cb, data, data_size);
+}
+
 static void
 bfd_on_state_change (bfd_main_t * bm, bfd_session_t * bs, u64 now,
                     int handling_wakeup)
 {
   BFD_DBG ("\nState changed: %U", format_bfd_session, bs);
-  bfd_event (bm, bs);
+
+  if (vlib_get_thread_index () == 0)
+    {
+      bfd_event (bm, bs);
+    }
+  else
+    {
+      /* without RPC - a REGRESSION: BFD event are not propagated */
+      bfd_event_rpc (bs->bs_idx);
+    }
+
   switch (bs->local_state)
     {
     case BFD_STATE_admin_down:
@@ -553,7 +678,15 @@ bfd_on_state_change (bfd_main_t * bm, bfd_session_t * bs, u64 now,
       bfd_set_timer (bm, bs, now, handling_wakeup);
       break;
     }
-  bfd_notify_listeners (bm, BFD_LISTEN_EVENT_UPDATE, bs);
+  if (vlib_get_thread_index () == 0)
+    {
+      bfd_notify_listeners (bm, BFD_LISTEN_EVENT_UPDATE, bs);
+    }
+  else
+    {
+      /* without RPC - a REGRESSION: state changes are not propagated */
+      bfd_notify_listeners_rpc (bs->bs_idx);
+    }
 }
 
 static void
@@ -1011,9 +1144,13 @@ bfd_process (vlib_main_t * vm, vlib_node_runtime_t * rt, vlib_frame_t * f)
   while (1)
     {
       u64 now = clib_cpu_time_now ();
+      bfd_lock (bm);
       u64 next_expire = timing_wheel_next_expiring_elt_time (&bm->wheel);
       BFD_DBG ("timing_wheel_next_expiring_elt_time(%p) returns %lu",
               &bm->wheel, next_expire);
+      bm->bfd_process_next_wakeup_clocks =
+       (i64) next_expire >= 0 ? next_expire : ~0;
+      bfd_unlock (bm);
       if ((i64) next_expire < 0)
        {
          BFD_DBG ("wait for event without timeout");
@@ -1042,10 +1179,16 @@ bfd_process (vlib_main_t * vm, vlib_node_runtime_t * rt, vlib_frame_t * f)
          /* nothing to do here */
          break;
        case BFD_EVENT_RESCHEDULE:
+         bfd_lock (bm);
+         bm->bfd_process_wakeup_event_delay_clocks =
+           now - bm->bfd_process_wakeup_event_start_clocks;
+         bm->bfd_process_wakeup_events_in_flight--;
+         bfd_unlock (bm);
          /* nothing to do here - reschedule is done automatically after
           * each event or timeout */
          break;
        case BFD_EVENT_NEW_SESSION:
+         bfd_lock (bm);
          if (!pool_is_free_index (bm->sessions, *event_data))
            {
              bfd_session_t *bs =
@@ -1058,8 +1201,10 @@ bfd_process (vlib_main_t * vm, vlib_node_runtime_t * rt, vlib_frame_t * f)
              BFD_DBG ("Ignoring event for non-existent session index %u",
                       (u32) * event_data);
            }
+         bfd_unlock (bm);
          break;
        case BFD_EVENT_CONFIG_CHANGED:
+         bfd_lock (bm);
          if (!pool_is_free_index (bm->sessions, *event_data))
            {
              bfd_session_t *bs =
@@ -1071,6 +1216,7 @@ bfd_process (vlib_main_t * vm, vlib_node_runtime_t * rt, vlib_frame_t * f)
              BFD_DBG ("Ignoring event for non-existent session index %u",
                       (u32) * event_data);
            }
+         bfd_unlock (bm);
          break;
        default:
          vlib_log_err (bm->log_class, "BUG: event type 0x%wx", event_type);
@@ -1079,6 +1225,7 @@ bfd_process (vlib_main_t * vm, vlib_node_runtime_t * rt, vlib_frame_t * f)
       BFD_DBG ("advancing wheel, now is %lu", now);
       BFD_DBG ("timing_wheel_advance (%p, %lu, %p, 0);", &bm->wheel, now,
               expired);
+      bfd_lock (bm);
       expired = timing_wheel_advance (&bm->wheel, now, expired, 0);
       BFD_DBG ("Expired %d elements", vec_len (expired));
       u32 *p = NULL;
@@ -1092,6 +1239,7 @@ bfd_process (vlib_main_t * vm, vlib_node_runtime_t * rt, vlib_frame_t * f)
            bfd_set_timer (bm, bs, now, 1);
          }
       }
+      bfd_unlock (bm);
       if (expired)
        {
          _vec_len (expired) = 0;
@@ -1159,6 +1307,8 @@ bfd_register_listener (bfd_notify_fn_t fn)
 static clib_error_t *
 bfd_main_init (vlib_main_t * vm)
 {
+  vlib_thread_main_t *tm = &vlib_thread_main;
+  u32 n_vlib_mains = tm->n_vlib_mains;
 #if BFD_DEBUG
   setbuf (stdout, NULL);
 #endif
@@ -1178,6 +1328,9 @@ bfd_main_init (vlib_main_t * vm)
   bm->wheel_inaccuracy = 2 << bm->wheel.log2_clocks_per_bin;
   bm->log_class = vlib_log_register_class ("bfd", 0);
   vlib_log_debug (bm->log_class, "initialized");
+  bm->owner_thread_index = ~0;
+  if (n_vlib_mains > 1)
+    clib_spinlock_init (&bm->lock);
   return 0;
 }
 
@@ -1187,6 +1340,9 @@ bfd_session_t *
 bfd_get_session (bfd_main_t * bm, bfd_transport_e t)
 {
   bfd_session_t *result;
+
+  bfd_lock (bm);
+
   pool_get (bm->sessions, result);
   memset (result, 0, sizeof (*result));
   result->bs_idx = result - bm->sessions;
@@ -1202,6 +1358,7 @@ bfd_get_session (bfd_main_t * bm, bfd_transport_e t)
                         "couldn't allocate unused session discriminator even "
                         "after %u tries!", limit);
          pool_put (bm->sessions, result);
+         bfd_unlock (bm);
          return NULL;
        }
       ++counter;
@@ -1209,12 +1366,15 @@ bfd_get_session (bfd_main_t * bm, bfd_transport_e t)
   while (hash_get (bm->session_by_disc, result->local_discr));
   bfd_set_defaults (bm, result);
   hash_set (bm->session_by_disc, result->local_discr, result->bs_idx);
+  bfd_unlock (bm);
   return result;
 }
 
 void
 bfd_put_session (bfd_main_t * bm, bfd_session_t * bs)
 {
+  bfd_lock (bm);
+
   vlib_log_info (bm->log_class, "delete session: %U",
                 format_bfd_session_brief, bs);
   bfd_notify_listeners (bm, BFD_LISTEN_EVENT_DELETE, bs);
@@ -1228,11 +1388,13 @@ bfd_put_session (bfd_main_t * bm, bfd_session_t * bs)
     }
   hash_unset (bm->session_by_disc, bs->local_discr);
   pool_put (bm->sessions, bs);
+  bfd_unlock (bm);
 }
 
 bfd_session_t *
 bfd_find_session_by_idx (bfd_main_t * bm, uword bs_idx)
 {
+  bfd_lock_check (bm);
   if (!pool_is_free_index (bm->sessions, bs_idx))
     {
       return pool_elt_at_index (bm->sessions, bs_idx);
@@ -1243,6 +1405,7 @@ bfd_find_session_by_idx (bfd_main_t * bm, uword bs_idx)
 bfd_session_t *
 bfd_find_session_by_disc (bfd_main_t * bm, u32 disc)
 {
+  bfd_lock_check (bm);
   uword *p = hash_get (bfd_main.session_by_disc, disc);
   if (p)
     {
@@ -1620,6 +1783,8 @@ bfd_verify_pkt_auth (const bfd_pkt_t * pkt, u16 pkt_size, bfd_session_t * bs)
 void
 bfd_consume_pkt (bfd_main_t * bm, const bfd_pkt_t * pkt, u32 bs_idx)
 {
+  bfd_lock_check (bm);
+
   bfd_session_t *bs = bfd_find_session_by_idx (bm, bs_idx);
   if (!bs || (pkt->your_disc && pkt->your_disc != bs->local_discr))
     {
index 9e2a12e..7808834 100644 (file)
@@ -24,6 +24,7 @@
 #include <vnet/bfd/bfd_protocol.h>
 #include <vnet/bfd/bfd_udp.h>
 #include <vlib/log.h>
+#include <vppinfra/os.h>
 
 #define foreach_bfd_mode(F) \
   F (asynchronous)          \
@@ -260,6 +261,23 @@ typedef void (*bfd_notify_fn_t) (bfd_listen_event_e, const bfd_session_t *);
 
 typedef struct
 {
+  /** lock to protect data structures */
+  clib_spinlock_t lock;
+  int lock_recursion_count;
+  uword owner_thread_index;
+
+  /** Number of event wakeup RPCs in flight. Should be 0 or 1 */
+  int bfd_process_wakeup_events_in_flight;
+
+  /** The timestamp of last wakeup event being sent */
+  u64 bfd_process_wakeup_event_start_clocks;
+
+  /** The time it took the last wakeup event to make it to handling */
+  u64 bfd_process_wakeup_event_delay_clocks;
+
+  /** When the bfd process is supposed to wake up next */
+  u64 bfd_process_next_wakeup_clocks;
+
   /** pool of bfd sessions context data */
   bfd_session_t *sessions;
 
@@ -346,6 +364,51 @@ typedef CLIB_PACKED (struct {
 }) bfd_echo_pkt_t;
 /* *INDENT-ON* */
 
+static inline void
+bfd_lock (bfd_main_t * bm)
+{
+  uword my_thread_index = __os_thread_index;
+
+  if (bm->owner_thread_index == my_thread_index
+      && bm->lock_recursion_count > 0)
+    {
+      bm->lock_recursion_count++;
+      return;
+    }
+
+  clib_spinlock_lock_if_init (&bm->lock);
+  bm->lock_recursion_count = 1;
+  bm->owner_thread_index = my_thread_index;
+}
+
+static inline void
+bfd_unlock (bfd_main_t * bm)
+{
+  uword my_thread_index = __os_thread_index;
+  ASSERT (bm->owner_thread_index == my_thread_index);
+
+  if (bm->lock_recursion_count > 1)
+    {
+      bm->lock_recursion_count--;
+      return;
+    }
+  bm->lock_recursion_count = 0;
+  bm->owner_thread_index = ~0;
+  clib_spinlock_unlock_if_init (&bm->lock);
+}
+
+void oingo (void);
+
+static inline void
+bfd_lock_check (bfd_main_t * bm)
+{
+  if (PREDICT_FALSE (bm->lock_recursion_count < 1))
+    {
+      clib_warning ("lock check failure");
+      oingo ();
+    }
+}
+
 u8 *bfd_input_format_trace (u8 * s, va_list * args);
 bfd_session_t *bfd_get_session (bfd_main_t * bm, bfd_transport_e t);
 void bfd_put_session (bfd_main_t * bm, bfd_session_t * bs);
index e05f10f..ab530ed 100644 (file)
@@ -670,6 +670,9 @@ bfd_udp_add_session (u32 sw_if_index, const ip46_address_t * local_addr,
                     u8 detect_mult, u8 is_authenticated, u32 conf_key_id,
                     u8 bfd_key_id)
 {
+  bfd_main_t *bm = &bfd_main;
+  bfd_lock (bm);
+
   vnet_api_error_t rv =
     bfd_api_verify_common (sw_if_index, desired_min_tx_usec,
                           required_min_rx_usec, detect_mult,
@@ -703,6 +706,7 @@ bfd_udp_add_session (u32 sw_if_index, const ip46_address_t * local_addr,
       bfd_session_start (bfd_udp_main.bfd_main, bs);
     }
 
+  bfd_unlock (bm);
   return rv;
 }
 
@@ -714,17 +718,23 @@ bfd_udp_mod_session (u32 sw_if_index,
                     u32 required_min_rx_usec, u8 detect_mult)
 {
   bfd_session_t *bs = NULL;
+  bfd_main_t *bm = &bfd_main;
+  vnet_api_error_t error;
+  bfd_lock (bm);
   vnet_api_error_t rv =
     bfd_udp_find_session_by_api_input (sw_if_index, local_addr, peer_addr,
                                       &bs);
   if (rv)
     {
+      bfd_unlock (bm);
       return rv;
     }
 
-  return bfd_session_set_params (bfd_udp_main.bfd_main, bs,
-                                desired_min_tx_usec, required_min_rx_usec,
-                                detect_mult);
+  error = bfd_session_set_params (bfd_udp_main.bfd_main, bs,
+                                 desired_min_tx_usec, required_min_rx_usec,
+                                 detect_mult);
+  bfd_unlock (bm);
+  return error;
 }
 
 vnet_api_error_t
@@ -733,14 +743,18 @@ bfd_udp_del_session (u32 sw_if_index,
                     const ip46_address_t * peer_addr)
 {
   bfd_session_t *bs = NULL;
+  bfd_main_t *bm = &bfd_main;
+  bfd_lock (bm);
   vnet_api_error_t rv =
     bfd_udp_find_session_by_api_input (sw_if_index, local_addr, peer_addr,
                                       &bs);
   if (rv)
     {
+      bfd_unlock (bm);
       return rv;
     }
   bfd_udp_del_session_internal (bs);
+  bfd_unlock (bm);
   return 0;
 }
 
@@ -750,14 +764,18 @@ bfd_udp_session_set_flags (u32 sw_if_index,
                           const ip46_address_t * peer_addr, u8 admin_up_down)
 {
   bfd_session_t *bs = NULL;
+  bfd_main_t *bm = &bfd_main;
+  bfd_lock (bm);
   vnet_api_error_t rv =
     bfd_udp_find_session_by_api_input (sw_if_index, local_addr, peer_addr,
                                       &bs);
   if (rv)
     {
+      bfd_unlock (bm);
       return rv;
     }
   bfd_session_set_flags (bs, admin_up_down);
+  bfd_unlock (bm);
   return 0;
 }
 
@@ -767,6 +785,10 @@ bfd_udp_auth_activate (u32 sw_if_index,
                       const ip46_address_t * peer_addr,
                       u32 conf_key_id, u8 key_id, u8 is_delayed)
 {
+  bfd_main_t *bm = &bfd_main;
+  bfd_lock (bm);
+  vnet_api_error_t error;
+
 #if WITH_LIBSSL > 0
   bfd_session_t *bs = NULL;
   vnet_api_error_t rv =
@@ -774,12 +796,16 @@ bfd_udp_auth_activate (u32 sw_if_index,
                                       &bs);
   if (rv)
     {
+      bfd_unlock (bm);
       return rv;
     }
-  return bfd_auth_activate (bs, conf_key_id, key_id, is_delayed);
+  error = bfd_auth_activate (bs, conf_key_id, key_id, is_delayed);
+  bfd_unlock (bm);
+  return error;
 #else
   vlib_log_err (bfd_udp_main->log_class,
                "SSL missing, cannot activate BFD authentication");
+  bfd_unlock (bm);
   return VNET_API_ERROR_BFD_NOTSUPP;
 #endif
 }
@@ -789,15 +815,21 @@ bfd_udp_auth_deactivate (u32 sw_if_index,
                         const ip46_address_t * local_addr,
                         const ip46_address_t * peer_addr, u8 is_delayed)
 {
+  bfd_main_t *bm = &bfd_main;
+  vnet_api_error_t error;
+  bfd_lock (bm);
   bfd_session_t *bs = NULL;
   vnet_api_error_t rv =
     bfd_udp_find_session_by_api_input (sw_if_index, local_addr, peer_addr,
                                       &bs);
   if (rv)
     {
+      bfd_unlock (bm);
       return rv;
     }
-  return bfd_auth_deactivate (bs, is_delayed);
+  error = bfd_auth_deactivate (bs, is_delayed);
+  bfd_unlock (bm);
+  return error;
 }
 
 typedef enum
@@ -919,23 +951,13 @@ typedef struct
   bfd_pkt_t pkt;
 } bfd_rpc_update_t;
 
-static void
-bfd_rpc_update_session_cb (const bfd_rpc_update_t * a)
-{
-  bfd_consume_pkt (bfd_udp_main.bfd_main, &a->pkt, a->bs_idx);
-}
-
 static void
 bfd_rpc_update_session (u32 bs_idx, const bfd_pkt_t * pkt)
 {
-  /* packet length was already verified to be correct by the caller */
-  const u32 data_size = sizeof (bfd_rpc_update_t) -
-    STRUCT_SIZE_OF (bfd_rpc_update_t, pkt) + pkt->head.length;
-  u8 data[data_size];
-  bfd_rpc_update_t *update = (bfd_rpc_update_t *) data;
-  update->bs_idx = bs_idx;
-  clib_memcpy (&update->pkt, pkt, pkt->head.length);
-  vl_api_rpc_call_main_thread (bfd_rpc_update_session_cb, data, data_size);
+  bfd_main_t *bm = &bfd_main;
+  bfd_lock (bm);
+  bfd_consume_pkt (bm, pkt, bs_idx);
+  bfd_unlock (bm);
 }
 
 static bfd_udp_error_t
@@ -1165,6 +1187,7 @@ bfd_udp_input (vlib_main_t * vm, vlib_node_runtime_t * rt,
 {
   u32 n_left_from, *from;
   bfd_input_trace_t *t0;
+  bfd_main_t *bm = &bfd_main;
 
   from = vlib_frame_vector_args (f);   /* array of buffer indices */
   n_left_from = f->n_vectors;  /* number of buffer indices */
@@ -1192,6 +1215,7 @@ bfd_udp_input (vlib_main_t * vm, vlib_node_runtime_t * rt,
        }
 
       /* scan this bfd pkt. error0 is the counter index to bmp */
+      bfd_lock (bm);
       if (is_ipv6)
        {
          error0 = bfd_udp6_scan (vm, rt, b0, &bs);
@@ -1244,6 +1268,7 @@ bfd_udp_input (vlib_main_t * vm, vlib_node_runtime_t * rt,
                }
            }
        }
+      bfd_unlock (bm);
       vlib_set_next_frame_buffer (vm, rt, next0, bi0);
 
       from += 1;
@@ -1322,6 +1347,7 @@ bfd_udp_echo_input (vlib_main_t * vm, vlib_node_runtime_t * rt,
 {
   u32 n_left_from, *from;
   bfd_input_trace_t *t0;
+  bfd_main_t *bm = &bfd_main;
 
   from = vlib_frame_vector_args (f);   /* array of buffer indices */
   n_left_from = f->n_vectors;  /* number of buffer indices */
@@ -1346,6 +1372,7 @@ bfd_udp_echo_input (vlib_main_t * vm, vlib_node_runtime_t * rt,
          clib_memcpy (t0->data, vlib_buffer_get_current (b0), len);
        }
 
+      bfd_lock (bm);
       if (bfd_consume_echo_pkt (bfd_udp_main.bfd_main, b0))
        {
          b0->error = rt->errors[BFD_UDP_ERROR_NONE];
@@ -1368,6 +1395,7 @@ bfd_udp_echo_input (vlib_main_t * vm, vlib_node_runtime_t * rt,
          next0 = BFD_UDP_INPUT_NEXT_REPLY_REWRITE;
        }
 
+      bfd_unlock (bm);
       vlib_set_next_frame_buffer (vm, rt, next0, bi0);
 
       from += 1;