session: use llist in session node evt handling 73/20673/14
authorFlorin Coras <fcoras@cisco.com>
Mon, 15 Jul 2019 20:15:18 +0000 (13:15 -0700)
committerDave Barach <openvpp@barachs.net>
Wed, 17 Jul 2019 12:20:23 +0000 (12:20 +0000)
Type: refactor

Change-Id: I24159e0a848f552b4e27acfb5fe6f2cd91b50a19
Signed-off-by: Florin Coras <fcoras@cisco.com>
src/vnet/session/session.c
src/vnet/session/session.h
src/vnet/session/session_node.c
src/vppinfra/llist.h

index e7542ae..9769c01 100644 (file)
@@ -122,18 +122,19 @@ static void
 session_program_transport_close (session_t * s)
 {
   u32 thread_index = vlib_get_thread_index ();
+  session_evt_elt_t *elt;
   session_worker_t *wrk;
-  session_event_t *evt;
 
   /* If we are in the handler thread, or being called with the worker barrier
    * held, just append a new event to pending disconnects vector. */
   if (vlib_thread_is_main_w_barrier () || thread_index == s->thread_index)
     {
       wrk = session_main_get_worker (s->thread_index);
-      vec_add2 (wrk->pending_disconnects, evt, 1);
-      clib_memset (evt, 0, sizeof (*evt));
-      evt->session_handle = session_handle (s);
-      evt->event_type = SESSION_CTRL_EVT_CLOSE;
+      elt = session_evt_elt_alloc (wrk);
+      clib_memset (&elt->evt, 0, sizeof (session_event_t));
+      elt->evt.session_handle = session_handle (s);
+      elt->evt.event_type = SESSION_CTRL_EVT_CLOSE;
+      session_evt_add_pending_disconnects (wrk, elt);
     }
   else
     session_send_ctrl_evt_to_thread (s, SESSION_CTRL_EVT_CLOSE);
@@ -1380,15 +1381,11 @@ session_manager_main_enable (vlib_main_t * vm)
   for (i = 0; i < num_threads; i++)
     {
       wrk = &smm->wrk[i];
-      vec_validate (wrk->free_event_vector, 128);
-      _vec_len (wrk->free_event_vector) = 0;
-      vec_validate (wrk->pending_event_vector, 128);
-      _vec_len (wrk->pending_event_vector) = 0;
-      vec_validate (wrk->pending_disconnects, 128);
-      _vec_len (wrk->pending_disconnects) = 0;
-      vec_validate (wrk->postponed_event_vector, 128);
-      _vec_len (wrk->postponed_event_vector) = 0;
-
+      wrk->new_head = clib_llist_make_head (wrk->event_elts, evt_list);
+      wrk->pending_head = clib_llist_make_head (wrk->event_elts, evt_list);
+      wrk->postponed_head = clib_llist_make_head (wrk->event_elts, evt_list);
+      wrk->disconnects_head = clib_llist_make_head (wrk->event_elts,
+                                                   evt_list);
       wrk->last_vlib_time = vlib_time_now (vlib_mains[i]);
       wrk->dispatch_period = 500e-6;
 
index d126818..73c6dc8 100644 (file)
@@ -15,6 +15,7 @@
 #ifndef __included_session_h__
 #define __included_session_h__
 
+#include <vppinfra/llist.h>
 #include <vnet/session/session_types.h>
 #include <vnet/session/session_lookup.h>
 #include <vnet/session/session_debug.h>
@@ -61,6 +62,12 @@ typedef struct session_tx_context_
   session_dgram_hdr_t hdr;
 } session_tx_context_t;
 
+typedef struct session_evt_elt
+{
+  clib_llist_anchor_t evt_list;
+  session_event_t evt;
+} session_evt_elt_t;
+
 typedef struct session_worker_
 {
   CLIB_CACHE_LINE_ALIGN_MARK (cacheline0);
@@ -86,17 +93,20 @@ typedef struct session_worker_
   /** Vector of tx buffer free lists */
   u32 *tx_buffers;
 
-  /** Vector of partially read events */
-  session_event_t *free_event_vector;
+  /** Pool of session event list elements */
+  session_evt_elt_t *event_elts;
 
-  /** Vector of active event vectors */
-  session_event_t *pending_event_vector;
+  /** Head of list of elements */
+  clib_llist_index_t new_head;
 
-  /** Vector of postponed disconnects */
-  session_event_t *pending_disconnects;
+  /** Head of list of pending events */
+  clib_llist_index_t pending_head;
 
-  /** Vector of postponed events */
-  session_event_t *postponed_event_vector;
+  /** Head of list of postponed events */
+  clib_llist_index_t postponed_head;
+
+  /** Head of list of disconnect events */
+  clib_llist_index_t disconnects_head;
 
   /** Peekers rw lock */
   clib_rwlock_t peekers_rw_locks;
@@ -108,7 +118,7 @@ typedef struct session_worker_
 typedef int (session_fifo_rx_fn) (vlib_main_t * vm,
                                  vlib_node_runtime_t * node,
                                  session_worker_t * wrk,
-                                 session_event_t * e, int *n_tx_pkts);
+                                 session_evt_elt_t * e, int *n_tx_pkts);
 
 extern session_fifo_rx_fn session_tx_fifo_peek_and_snd;
 extern session_fifo_rx_fn session_tx_fifo_dequeue_and_snd;
@@ -186,6 +196,60 @@ extern vlib_node_registration_t session_queue_pre_input_node;
 #define SESSION_Q_PROCESS_FLUSH_FRAMES 1
 #define SESSION_Q_PROCESS_STOP         2
 
+static inline session_evt_elt_t *
+session_evt_elt_alloc (session_worker_t * wrk)
+{
+  session_evt_elt_t *elt;
+  pool_get (wrk->event_elts, elt);
+  return elt;
+}
+
+static inline void
+session_evt_elt_free (session_worker_t * wrk, session_evt_elt_t * elt)
+{
+  pool_put (wrk->event_elts, elt);
+}
+
+static inline session_evt_elt_t *
+session_evt_pending_head (session_worker_t * wrk)
+{
+  return pool_elt_at_index (wrk->event_elts, wrk->pending_head);
+}
+
+static inline session_evt_elt_t *
+session_evt_postponed_head (session_worker_t * wrk)
+{
+  return pool_elt_at_index (wrk->event_elts, wrk->postponed_head);
+}
+
+static inline session_evt_elt_t *
+session_evt_pending_disconnects_head (session_worker_t * wrk)
+{
+  return pool_elt_at_index (wrk->event_elts, wrk->disconnects_head);
+}
+
+static inline void
+session_evt_add_pending (session_worker_t * wrk, session_evt_elt_t * elt)
+{
+  clib_llist_add_tail (wrk->event_elts, evt_list, elt,
+                      session_evt_pending_head (wrk));
+}
+
+static inline void
+session_evt_add_postponed (session_worker_t * wrk, session_evt_elt_t * elt)
+{
+  clib_llist_add_tail (wrk->event_elts, evt_list, elt,
+                      session_evt_postponed_head (wrk));
+}
+
+static inline void
+session_evt_add_pending_disconnects (session_worker_t * wrk,
+                                    session_evt_elt_t * elt)
+{
+  clib_llist_add_tail (wrk->event_elts, evt_list, elt,
+                      session_evt_pending_disconnects_head (wrk));
+}
+
 always_inline u8
 session_is_valid (u32 si, u8 thread_index)
 {
index 6c43207..ca6663c 100644 (file)
@@ -624,13 +624,14 @@ session_tx_set_dequeue_params (vlib_main_t * vm, session_tx_context_t * ctx,
 always_inline int
 session_tx_fifo_read_and_snd_i (vlib_main_t * vm, vlib_node_runtime_t * node,
                                session_worker_t * wrk,
-                               session_event_t * e, int *n_tx_packets,
+                               session_evt_elt_t * elt, int *n_tx_packets,
                                u8 peek_data)
 {
   u32 next_index, next0, next1, *to_next, n_left_to_next, n_left, pbi;
   u32 n_trace = vlib_get_trace_count (vm, node), n_bufs_needed = 0;
   session_main_t *smm = &session_main;
   session_tx_context_t *ctx = &wrk->ctx;
+  session_event_t *e = &elt->evt;
   transport_proto_t tp;
   vlib_buffer_t *pb;
   u16 n_bufs, rv;
@@ -638,7 +639,7 @@ session_tx_fifo_read_and_snd_i (vlib_main_t * vm, vlib_node_runtime_t * node,
   if (PREDICT_FALSE ((rv = session_tx_not_ready (ctx->s, peek_data))))
     {
       if (rv < 2)
-       vec_add1 (wrk->pending_event_vector, *e);
+       session_evt_add_pending (wrk, elt);
       return SESSION_TX_NO_DATA;
     }
 
@@ -662,7 +663,7 @@ session_tx_fifo_read_and_snd_i (vlib_main_t * vm, vlib_node_runtime_t * node,
                                                   ctx->snd_mss);
   if (ctx->snd_space == 0 || ctx->snd_mss == 0)
     {
-      vec_add1 (wrk->pending_event_vector, *e);
+      session_evt_add_pending (wrk, elt);
       return SESSION_TX_NO_DATA;
     }
 
@@ -684,7 +685,7 @@ session_tx_fifo_read_and_snd_i (vlib_main_t * vm, vlib_node_runtime_t * node,
     {
       if (n_bufs)
        vlib_buffer_free (vm, wrk->tx_buffers, n_bufs);
-      vec_add1 (wrk->pending_event_vector, *e);
+      session_evt_add_pending (wrk, elt);
       return SESSION_TX_NO_BUFFERS;
     }
 
@@ -780,7 +781,7 @@ session_tx_fifo_read_and_snd_i (vlib_main_t * vm, vlib_node_runtime_t * node,
   ASSERT (ctx->left_to_snd == 0);
   if (ctx->max_len_to_snd < ctx->max_dequeue)
     if (svm_fifo_set_event (ctx->s->tx_fifo))
-      vec_add1 (wrk->pending_event_vector, *e);
+      session_evt_add_pending (wrk, elt);
 
   if (!peek_data
       && ctx->transport_vft->transport_options.tx_type == TRANSPORT_TX_DGRAM)
@@ -792,7 +793,7 @@ session_tx_fifo_read_and_snd_i (vlib_main_t * vm, vlib_node_runtime_t * node,
       /* More data needs to be read */
       else if (svm_fifo_max_dequeue_cons (ctx->s->tx_fifo) > 0)
        if (svm_fifo_set_event (ctx->s->tx_fifo))
-         vec_add1 (wrk->pending_event_vector, *e);
+         session_evt_add_pending (wrk, elt);
 
       if (svm_fifo_needs_deq_ntf (ctx->s->tx_fifo, ctx->max_len_to_snd))
        session_dequeue_notify (ctx->s);
@@ -803,7 +804,7 @@ session_tx_fifo_read_and_snd_i (vlib_main_t * vm, vlib_node_runtime_t * node,
 int
 session_tx_fifo_peek_and_snd (vlib_main_t * vm, vlib_node_runtime_t * node,
                              session_worker_t * wrk,
-                             session_event_t * e, int *n_tx_pkts)
+                             session_evt_elt_t * e, int *n_tx_pkts)
 {
   return session_tx_fifo_read_and_snd_i (vm, node, wrk, e, n_tx_pkts, 1);
 }
@@ -811,7 +812,7 @@ session_tx_fifo_peek_and_snd (vlib_main_t * vm, vlib_node_runtime_t * node,
 int
 session_tx_fifo_dequeue_and_snd (vlib_main_t * vm, vlib_node_runtime_t * node,
                                 session_worker_t * wrk,
-                                session_event_t * e, int *n_tx_pkts)
+                                session_evt_elt_t * e, int *n_tx_pkts)
 {
   return session_tx_fifo_read_and_snd_i (vm, node, wrk, e, n_tx_pkts, 0);
 }
@@ -820,7 +821,7 @@ int
 session_tx_fifo_dequeue_internal (vlib_main_t * vm,
                                  vlib_node_runtime_t * node,
                                  session_worker_t * wrk,
-                                 session_event_t * e, int *n_tx_pkts)
+                                 session_evt_elt_t * e, int *n_tx_pkts)
 {
   session_t *s = wrk->ctx.s;
 
@@ -853,9 +854,10 @@ session_queue_node_fn (vlib_main_t * vm, vlib_node_runtime_t * node,
                       vlib_frame_t * frame)
 {
   session_main_t *smm = vnet_get_session_main ();
-  u32 thread_index = vm->thread_index, n_to_dequeue, n_events;
+  u32 thread_index = vm->thread_index, n_to_dequeue;
   session_worker_t *wrk = &smm->wrk[thread_index];
-  session_event_t *e, *fifo_events;
+  session_evt_elt_t *elt, *new_he, *new_te, *pending_he;
+  session_evt_elt_t *disconnects_he, *postponed_he;
   svm_msg_q_msg_t _msg, *msg = &_msg;
   f64 now = vlib_time_now (vm);
   int n_tx_packets = 0, i, rv;
@@ -874,9 +876,11 @@ session_queue_node_fn (vlib_main_t * vm, vlib_node_runtime_t * node,
   SESSION_EVT_DBG (SESSION_EVT_DEQ_NODE, 0);
 
   /* Make sure postponed events are handled first */
-  fifo_events = wrk->free_event_vector;
-  vec_append (fifo_events, wrk->postponed_event_vector);
-  _vec_len (wrk->postponed_event_vector) = 0;
+  new_he = pool_elt_at_index (wrk->event_elts, wrk->new_head);
+  new_te = clib_llist_prev (wrk->event_elts, evt_list, new_he);
+
+  postponed_he = pool_elt_at_index (wrk->event_elts, wrk->postponed_head);
+  clib_llist_splice (wrk->event_elts, evt_list, new_te, postponed_he);
 
   /* Try to dequeue what is available. Don't wait for lock.
    * XXX: we may need priorities here */
@@ -886,33 +890,38 @@ session_queue_node_fn (vlib_main_t * vm, vlib_node_runtime_t * node,
     {
       for (i = 0; i < n_to_dequeue; i++)
        {
-         vec_add2 (fifo_events, e, 1);
+         elt = session_evt_elt_alloc (wrk);
          svm_msg_q_sub_w_lock (mq, msg);
          /* Works because reply messages are smaller than a session evt.
           * If we ever need to support bigger messages this needs to be
           * fixed */
-         clib_memcpy_fast (e, svm_msg_q_msg_data (mq, msg), sizeof (*e));
+         clib_memcpy_fast (&elt->evt, svm_msg_q_msg_data (mq, msg),
+                           sizeof (elt->evt));
          svm_msg_q_free_msg (mq, msg);
+         new_he = pool_elt_at_index (wrk->event_elts, wrk->new_head);
+         clib_llist_add_tail (wrk->event_elts, evt_list, elt, new_he);
        }
       svm_msg_q_unlock (mq);
     }
 
-  vec_append (fifo_events, wrk->pending_event_vector);
-  vec_append (fifo_events, wrk->pending_disconnects);
-
-  _vec_len (wrk->pending_event_vector) = 0;
-  _vec_len (wrk->pending_disconnects) = 0;
+  pending_he = pool_elt_at_index (wrk->event_elts, wrk->pending_head);
+  postponed_he = pool_elt_at_index (wrk->event_elts, wrk->postponed_head);
+  disconnects_he = pool_elt_at_index (wrk->event_elts, wrk->disconnects_head);
 
-  n_events = vec_len (fifo_events);
-  if (PREDICT_FALSE (!n_events))
-    return 0;
+  new_te = clib_llist_prev (wrk->event_elts, evt_list, new_he);
+  clib_llist_splice (wrk->event_elts, evt_list, new_te, pending_he);
+  new_te = clib_llist_prev (wrk->event_elts, evt_list, new_he);
+  clib_llist_splice (wrk->event_elts, evt_list, new_te, disconnects_he);
 
-  for (i = 0; i < n_events; i++)
+  while (!clib_llist_is_empty (wrk->event_elts, evt_list, new_he))
     {
+      clib_llist_index_t ei;
       session_event_t *e;
       session_t *s;
 
-      e = &fifo_events[i];
+      clib_llist_pop_first (wrk->event_elts, evt_list, elt, new_he);
+      ei = clib_llist_entry_index (wrk->event_elts, elt);
+      e = &elt->evt;
       switch (e->event_type)
        {
        case SESSION_IO_EVT_TX_FLUSH:
@@ -920,27 +929,27 @@ session_queue_node_fn (vlib_main_t * vm, vlib_node_runtime_t * node,
          /* Don't try to send more that one frame per dispatch cycle */
          if (n_tx_packets == VLIB_FRAME_SIZE)
            {
-             vec_add1 (wrk->postponed_event_vector, *e);
-             break;
+             session_evt_add_postponed (wrk, elt);
+             continue;
            }
 
          s = session_event_get_session (e, thread_index);
          if (PREDICT_FALSE (!s))
            {
              clib_warning ("session was freed!");
-             continue;
+             break;
            }
          CLIB_PREFETCH (s->tx_fifo, 2 * CLIB_CACHE_LINE_BYTES, LOAD);
          wrk->ctx.s = s;
          /* Spray packets in per session type frames, since they go to
           * different nodes */
-         rv = (smm->session_tx_fns[s->session_type]) (vm, node, wrk, e,
+         rv = (smm->session_tx_fns[s->session_type]) (vm, node, wrk, elt,
                                                       &n_tx_packets);
          if (PREDICT_FALSE (rv == SESSION_TX_NO_BUFFERS))
            {
              vlib_node_increment_counter (vm, node->node_index,
                                           SESSION_QUEUE_ERROR_NO_BUFFER, 1);
-             continue;
+             break;
            }
          break;
        case SESSION_IO_EVT_RX:
@@ -964,7 +973,7 @@ session_queue_node_fn (vlib_main_t * vm, vlib_node_runtime_t * node,
                  && svm_fifo_max_dequeue_cons (s->tx_fifo)))
            {
              e->postponed += 1;
-             vec_add1 (wrk->pending_disconnects, *e);
+             session_evt_add_pending (wrk, elt);
              continue;
            }
 
@@ -973,7 +982,7 @@ session_queue_node_fn (vlib_main_t * vm, vlib_node_runtime_t * node,
        case SESSION_IO_EVT_BUILTIN_RX:
          s = session_event_get_session (e, thread_index);
          if (PREDICT_FALSE (!s || s->session_state >= SESSION_STATE_CLOSING))
-           continue;
+           break;
          svm_fifo_unset_event (s->rx_fifo);
          app_wrk = app_worker_get (s->app_wrk_index);
          app_worker_builtin_rx (app_wrk, s);
@@ -982,7 +991,7 @@ session_queue_node_fn (vlib_main_t * vm, vlib_node_runtime_t * node,
          s = session_get_from_handle_if_valid (e->session_handle);
          wrk->ctx.s = s;
          if (PREDICT_TRUE (s != 0))
-           session_tx_fifo_dequeue_internal (vm, node, wrk, e,
+           session_tx_fifo_dequeue_internal (vm, node, wrk, elt,
                                              &n_tx_packets);
          break;
        case SESSION_CTRL_EVT_RPC:
@@ -1009,10 +1018,15 @@ session_queue_node_fn (vlib_main_t * vm, vlib_node_runtime_t * node,
        default:
          clib_warning ("unhandled event type %d", e->event_type);
        }
+
+      /* Regrab elements in case pool moved */
+      elt = pool_elt_at_index (wrk->event_elts, ei);
+      if (!clib_llist_elt_is_linked (elt, evt_list))
+       session_evt_elt_free (wrk, elt);
+
+      new_he = pool_elt_at_index (wrk->event_elts, wrk->new_head);
     }
 
-  _vec_len (fifo_events) = 0;
-  wrk->free_event_vector = fifo_events;
   wrk->last_tx_packets = n_tx_packets;
 
   vlib_node_increment_counter (vm, session_queue_node.index,
@@ -1128,7 +1142,7 @@ session_node_cmp_event (session_event_t * e, svm_fifo_t * f)
 u8
 session_node_lookup_fifo_event (svm_fifo_t * f, session_event_t * e)
 {
-  session_event_t *pending_event_vector, *evt;
+  session_evt_elt_t *elt;
   session_worker_t *wrk;
   int i, index, found = 0;
   svm_msg_q_msg_t *msg;
@@ -1159,16 +1173,20 @@ session_node_lookup_fifo_event (svm_fifo_t * f, session_event_t * e)
   /*
    * Search pending events vector
    */
-  pending_event_vector = wrk->pending_event_vector;
-  vec_foreach (evt, pending_event_vector)
-  {
-    found = session_node_cmp_event (evt, f);
+
+  /* *INDENT-OFF* */
+  clib_llist_foreach (wrk->event_elts, evt_list,
+                      session_evt_pending_head (wrk), elt, ({
+    found = session_node_cmp_event (&elt->evt, f);
     if (found)
       {
-       clib_memcpy_fast (e, evt, sizeof (*evt));
+       clib_memcpy_fast (e, &elt->evt, sizeof (*e));
        break;
       }
-  }
+
+  }));
+  /* *INDENT-ON* */
+
   return found;
 }
 
index 1648021..d521a72 100644 (file)
@@ -102,7 +102,17 @@ do {                                                                       \
  * @param H    list head
  * @return     1 if sentinel is the only node part of the list, 0 otherwise
  */
-#define clib_llist_is_empty(LP,name,H) ((H) == clib_llist_next((LP),name,(H)))
+#define clib_llist_is_empty(LP,name,H)                                         \
+  (clib_llist_entry_index (LP,H) == (H)->name.next)
+/**
+ * Check if element is linked in a list
+ *
+ * @param E    list element
+ * @param name list anchor name
+ */
+#define clib_llist_elt_is_linked(E,name)                               \
+  ((E)->name.next != CLIB_LLIST_INVALID_INDEX                          \
+   && (E)->name.prev != CLIB_LLIST_INVALID_INDEX)
 /**
  * Insert entry between previous and next
  *
@@ -175,7 +185,22 @@ do {                                                                       \
   _lprev (_ll_var (N),name) = _lprev (E,name);                         \
   _lnext (E,name) = _lprev (E,name) = CLIB_LLIST_INVALID_INDEX;                \
 }while (0)
-
+/**
+ * Removes and returns the first element in the list.
+ *
+ * The element is not freed. It's the responsability of the caller to
+ * free it.
+ *
+ * @param LP   linked list pool
+ * @param name list anchor name
+ * @param E    storage the first entry
+ * @param H    list head entry
+ */
+#define clib_llist_pop_first(LP,name,E,H)                              \
+do {                                                                   \
+  E = clib_llist_next (LP,name,H);                                     \
+  clib_llist_remove (LP,name,E);                                       \
+} while (0)
 /**
  * Splice two lists at a given position
  *