session: separate ctrl, new and old events 64/20764/20
author: Florin Coras <fcoras@cisco.com>
Mon, 22 Jul 2019 02:23:46 +0000 (19:23 -0700)
committer: Florin Coras <fcoras@cisco.com>
Fri, 26 Jul 2019 23:18:50 +0000 (16:18 -0700)
Type: feature

Change-Id: I5e030b23943c012d8191ff657165055d33ec87a2
Signed-off-by: Florin Coras <fcoras@cisco.com>
src/vnet/session/session.c
src/vnet/session/session.h
src/vnet/session/session_node.c
src/vppinfra/llist.h

index 1c8b7fb..318e01d 100644 (file)
@@ -155,11 +155,10 @@ session_program_transport_close (session_t * s)
   if (vlib_thread_is_main_w_barrier () || thread_index == s->thread_index)
     {
       wrk = session_main_get_worker (s->thread_index);
-      elt = session_evt_elt_alloc (wrk);
+      elt = session_evt_alloc_ctrl (wrk);
       clib_memset (&elt->evt, 0, sizeof (session_event_t));
       elt->evt.session_handle = session_handle (s);
       elt->evt.event_type = SESSION_CTRL_EVT_CLOSE;
-      session_evt_add_pending_disconnects (wrk, elt);
     }
   else
     session_send_ctrl_evt_to_thread (s, SESSION_CTRL_EVT_CLOSE);
@@ -1404,11 +1403,9 @@ session_manager_main_enable (vlib_main_t * vm)
   for (i = 0; i < num_threads; i++)
     {
       wrk = &smm->wrk[i];
+      wrk->ctrl_head = clib_llist_make_head (wrk->event_elts, evt_list);
       wrk->new_head = clib_llist_make_head (wrk->event_elts, evt_list);
       wrk->old_head = clib_llist_make_head (wrk->event_elts, evt_list);
-      wrk->postponed_head = clib_llist_make_head (wrk->event_elts, evt_list);
-      wrk->disconnects_head = clib_llist_make_head (wrk->event_elts,
-                                                   evt_list);
       wrk->vm = vlib_mains[i];
       wrk->last_vlib_time = vlib_time_now (vlib_mains[i]);
 
index 8f7bd6c..5af824a 100644 (file)
@@ -96,18 +96,15 @@ typedef struct session_worker_
   /** Pool of session event list elements */
   session_evt_elt_t *event_elts;
 
+  /** Head of control events list */
+  clib_llist_index_t ctrl_head;
+
   /** Head of list of elements */
   clib_llist_index_t new_head;
 
   /** Head of list of pending events */
   clib_llist_index_t old_head;
 
-  /** Head of list of postponed events */
-  clib_llist_index_t postponed_head;
-
-  /** Head of list of disconnect events */
-  clib_llist_index_t disconnects_head;
-
   /** Peekers rw lock */
   clib_rwlock_t peekers_rw_locks;
 
@@ -203,44 +200,21 @@ session_evt_elt_free (session_worker_t * wrk, session_evt_elt_t * elt)
   pool_put (wrk->event_elts, elt);
 }
 
-static inline session_evt_elt_t *
-session_evt_old_head (session_worker_t * wrk)
-{
-  return pool_elt_at_index (wrk->event_elts, wrk->old_head);
-}
-
-static inline session_evt_elt_t *
-session_evt_postponed_head (session_worker_t * wrk)
-{
-  return pool_elt_at_index (wrk->event_elts, wrk->postponed_head);
-}
-
-static inline session_evt_elt_t *
-session_evt_pending_disconnects_head (session_worker_t * wrk)
-{
-  return pool_elt_at_index (wrk->event_elts, wrk->disconnects_head);
-}
-
 static inline void
 session_evt_add_old (session_worker_t * wrk, session_evt_elt_t * elt)
 {
   clib_llist_add_tail (wrk->event_elts, evt_list, elt,
-                      session_evt_old_head (wrk));
-}
-
-static inline void
-session_evt_add_postponed (session_worker_t * wrk, session_evt_elt_t * elt)
-{
-  clib_llist_add_tail (wrk->event_elts, evt_list, elt,
-                      session_evt_postponed_head (wrk));
+                      pool_elt_at_index (wrk->event_elts, wrk->old_head));
 }
 
-static inline void
-session_evt_add_pending_disconnects (session_worker_t * wrk,
-                                    session_evt_elt_t * elt)
+static inline session_evt_elt_t *
+session_evt_alloc_ctrl (session_worker_t * wrk)
 {
+  session_evt_elt_t *elt;
+  elt = session_evt_elt_alloc (wrk);
   clib_llist_add_tail (wrk->event_elts, evt_list, elt,
-                      session_evt_pending_disconnects_head (wrk));
+                      pool_elt_at_index (wrk->event_elts, wrk->ctrl_head));
+  return elt;
 }
 
 static inline session_evt_elt_t *
index 1bb5cb2..35704b7 100644 (file)
@@ -665,7 +665,7 @@ session_tx_fifo_read_and_snd_i (session_worker_t * wrk,
     }
 
   ctx->snd_space = transport_connection_snd_space (ctx->tc,
-                                                  wrk->vm->clib_time.
+                                                  vm->clib_time.
                                                   last_cpu_time,
                                                   ctx->snd_mss);
 
@@ -693,6 +693,8 @@ session_tx_fifo_read_and_snd_i (session_worker_t * wrk,
       if (n_bufs)
        vlib_buffer_free (vm, wrk->tx_buffers, n_bufs);
       session_evt_add_old (wrk, elt);
+      vlib_node_increment_counter (wrk->vm, node->node_index,
+                                  SESSION_QUEUE_ERROR_NO_BUFFER, 1);
       return SESSION_TX_NO_BUFFERS;
     }
 
@@ -854,41 +856,29 @@ session_event_dispatch (session_worker_t * wrk, vlib_node_runtime_t * node,
 {
   session_main_t *smm = &session_main;
   app_worker_t *app_wrk;
+  clib_llist_index_t ei;
   void (*fp) (void *);
   session_event_t *e;
   session_t *s;
-  int rv;
 
+  ei = clib_llist_entry_index (wrk->event_elts, elt);
   e = &elt->evt;
+
   switch (e->event_type)
     {
     case SESSION_IO_EVT_TX_FLUSH:
     case SESSION_IO_EVT_TX:
-      /* Don't try to send more that one frame per dispatch cycle */
-      if (*n_tx_packets == VLIB_FRAME_SIZE)
-       {
-         session_evt_add_postponed (wrk, elt);
-         return;
-       }
-
       s = session_event_get_session (e, thread_index);
       if (PREDICT_FALSE (!s))
        {
-         clib_warning ("session was freed!");
+         clib_warning ("session %u was freed!", e->session_index);
          break;
        }
       CLIB_PREFETCH (s->tx_fifo, 2 * CLIB_CACHE_LINE_BYTES, LOAD);
       wrk->ctx.s = s;
       /* Spray packets in per session type frames, since they go to
        * different nodes */
-      rv = (smm->session_tx_fns[s->session_type]) (wrk, node, elt,
-                                                  n_tx_packets);
-      if (PREDICT_FALSE (rv == SESSION_TX_NO_BUFFERS))
-       {
-         vlib_node_increment_counter (wrk->vm, node->node_index,
-                                      SESSION_QUEUE_ERROR_NO_BUFFER, 1);
-         break;
-       }
+      (smm->session_tx_fns[s->session_type]) (wrk, node, elt, n_tx_packets);
       break;
     case SESSION_IO_EVT_RX:
       s = session_event_get_session (e, thread_index);
@@ -941,6 +931,11 @@ session_event_dispatch (session_worker_t * wrk, vlib_node_runtime_t * node,
     default:
       clib_warning ("unhandled event type %d", e->event_type);
     }
+
+  /* Regrab elements in case pool moved */
+  elt = pool_elt_at_index (wrk->event_elts, ei);
+  if (!clib_llist_elt_is_linked (elt, evt_list))
+    session_evt_elt_free (wrk, elt);
 }
 
 static uword
@@ -950,10 +945,11 @@ session_queue_node_fn (vlib_main_t * vm, vlib_node_runtime_t * node,
   session_main_t *smm = vnet_get_session_main ();
   u32 thread_index = vm->thread_index, n_to_dequeue;
   session_worker_t *wrk = &smm->wrk[thread_index];
-  session_evt_elt_t *elt, *new_he, *new_te, *old_he;
-  session_evt_elt_t *disconnects_he, *postponed_he;
+  session_evt_elt_t *elt, *ctrl_he, *new_he, *old_he;
   svm_msg_q_msg_t _msg, *msg = &_msg;
+  clib_llist_index_t old_ti;
   int i, n_tx_packets = 0;
+  session_event_t *evt;
   svm_msg_q_t *mq;
 
   SESSION_EVT (SESSION_EVT_DISPATCH_START, wrk);
@@ -965,12 +961,9 @@ session_queue_node_fn (vlib_main_t * vm, vlib_node_runtime_t * node,
    */
   transport_update_time (wrk->last_vlib_time, thread_index);
 
-  /* Make sure postponed events are handled first */
-  new_he = pool_elt_at_index (wrk->event_elts, wrk->new_head);
-  new_te = clib_llist_prev (wrk->event_elts, evt_list, new_he);
-
-  postponed_he = pool_elt_at_index (wrk->event_elts, wrk->postponed_head);
-  clib_llist_splice (wrk->event_elts, evt_list, new_te, postponed_he);
+  /*
+   *  Dequeue and handle new events
+   */
 
   /* Try to dequeue what is available. Don't wait for lock.
    * XXX: we may need priorities here */
@@ -980,44 +973,78 @@ session_queue_node_fn (vlib_main_t * vm, vlib_node_runtime_t * node,
     {
       for (i = 0; i < n_to_dequeue; i++)
        {
-         elt = session_evt_elt_alloc (wrk);
          svm_msg_q_sub_w_lock (mq, msg);
+         evt = svm_msg_q_msg_data (mq, msg);
+         if (evt->event_type > SESSION_IO_EVT_BUILTIN_TX)
+           elt = session_evt_alloc_ctrl (wrk);
+         else
+           elt = session_evt_alloc_new (wrk);
          /* Works because reply messages are smaller than a session evt.
           * If we ever need to support bigger messages this needs to be
           * fixed */
-         clib_memcpy_fast (&elt->evt, svm_msg_q_msg_data (mq, msg),
-                           sizeof (elt->evt));
+         clib_memcpy_fast (&elt->evt, evt, sizeof (elt->evt));
          svm_msg_q_free_msg (mq, msg);
-         new_he = pool_elt_at_index (wrk->event_elts, wrk->new_head);
-         clib_llist_add_tail (wrk->event_elts, evt_list, elt, new_he);
        }
       svm_msg_q_unlock (mq);
     }
 
-  old_he = pool_elt_at_index (wrk->event_elts, wrk->old_head);
-  disconnects_he = pool_elt_at_index (wrk->event_elts, wrk->disconnects_head);
+  /*
+   * Handle control events
+   */
 
-  new_te = clib_llist_prev (wrk->event_elts, evt_list, new_he);
-  clib_llist_splice (wrk->event_elts, evt_list, new_te, old_he);
-  new_te = clib_llist_prev (wrk->event_elts, evt_list, new_he);
-  clib_llist_splice (wrk->event_elts, evt_list, new_te, disconnects_he);
+  ctrl_he = pool_elt_at_index (wrk->event_elts, wrk->ctrl_head);
 
-  while (!clib_llist_is_empty (wrk->event_elts, evt_list, new_he))
+  /* *INDENT-OFF* */
+  clib_llist_foreach_safe (wrk->event_elts, evt_list, ctrl_he, elt, ({
+    clib_llist_remove (wrk->event_elts, evt_list, elt);
+    session_event_dispatch (wrk, node, elt, thread_index, &n_tx_packets);
+  }));
+  /* *INDENT-ON* */
+
+  /*
+   * Handle the new io events.
+   */
+
+  new_he = pool_elt_at_index (wrk->event_elts, wrk->new_head);
+
+  /* *INDENT-OFF* */
+  clib_llist_foreach_safe (wrk->event_elts, evt_list, new_he, elt, ({
+    session_evt_type_t et;
+
+    et = elt->evt.event_type;
+    clib_llist_remove (wrk->event_elts, evt_list, elt);
+
+    /* Postpone tx events if we can't handle them this dispatch cycle */
+    if (n_tx_packets >= VLIB_FRAME_SIZE
+       && (et == SESSION_IO_EVT_TX || et == SESSION_IO_EVT_TX_FLUSH))
+      {
+       clib_llist_add (wrk->event_elts, evt_list, elt, new_he);
+       continue;
+      }
+
+    session_event_dispatch (wrk, node, elt, thread_index, &n_tx_packets);
+  }));
+  /* *INDENT-ON* */
+
+  /*
+   * Handle the old io events
+   */
+
+  old_he = pool_elt_at_index (wrk->event_elts, wrk->old_head);
+  old_ti = clib_llist_prev_index (old_he, evt_list);
+
+  while (!clib_llist_is_empty (wrk->event_elts, evt_list, old_he))
     {
       clib_llist_index_t ei;
 
-      clib_llist_pop_first (wrk->event_elts, evt_list, elt, new_he);
+      clib_llist_pop_first (wrk->event_elts, evt_list, elt, old_he);
       ei = clib_llist_entry_index (wrk->event_elts, elt);
-
       session_event_dispatch (wrk, node, elt, thread_index, &n_tx_packets);
 
-      /* Regrab elements in case pool moved */
-      elt = pool_elt_at_index (wrk->event_elts, ei);
-      if (!clib_llist_elt_is_linked (elt, evt_list))
-       session_evt_elt_free (wrk, elt);
-
-      new_he = pool_elt_at_index (wrk->event_elts, wrk->new_head);
-    }
+      old_he = pool_elt_at_index (wrk->event_elts, wrk->old_head);
+      if (n_tx_packets >= VLIB_FRAME_SIZE || ei == old_ti)
+       break;
+    };
 
   vlib_node_increment_counter (vm, session_queue_node.index,
                               SESSION_QUEUE_ERROR_TX, n_tx_packets);
@@ -1166,7 +1193,8 @@ session_node_lookup_fifo_event (svm_fifo_t * f, session_event_t * e)
 
   /* *INDENT-OFF* */
   clib_llist_foreach (wrk->event_elts, evt_list,
-                      session_evt_old_head (wrk), elt, ({
+                      pool_elt_at_index (wrk->event_elts, wrk->old_head),
+                      elt, ({
     found = session_node_cmp_event (&elt->evt, f);
     if (found)
       {
index d521a72..25fb95f 100644 (file)
@@ -51,6 +51,22 @@ typedef struct clib_llist_anchor
  * @return     pool entry index
  */
 #define clib_llist_entry_index(LP,E) ((E) - (LP))
+/**
+ * Get prev list entry index
+ *
+ * @param E    pool entry
+ * @param name list anchor name
+ * @return     previous index
+ */
+#define clib_llist_prev_index(E,name) _lprev(E,name)
+/**
+ * Get next list entry index
+ *
+ * @param E    pool entry
+ * @param name list anchor name
+ * @return     next index
+ */
+#define clib_llist_next_index(E,name) _lnext(E,name)
 /**
  * Get next pool entry
  *
@@ -247,6 +263,28 @@ do {                                                                       \
       (E) = _ll_var (n);                                               \
     }                                                                  \
 } while (0)
+/**
+ * Walk list starting at head safe
+ *
+ * @param LP   linked list pool
+ * @param name list anchor name
+ * @param H    head entry
+ * @param E    entry iterator
+ * @param body code to be executed
+ */
+#define clib_llist_foreach_safe(LP,name,H,E,body)                      \
+do {                                                                   \
+  clib_llist_index_t _ll_var (HI) = clib_llist_entry_index (LP, H);    \
+  clib_llist_index_t _ll_var (EI), _ll_var (NI);                       \
+  _ll_var (EI) = _lnext ((H),name);                                    \
+  while (_ll_var (EI) != _ll_var (HI))                                 \
+    {                                                                  \
+      (E) = pool_elt_at_index (LP, _ll_var (EI));                      \
+      _ll_var (NI) = _lnext ((E),name);                                        \
+      do { body; } while (0);                                          \
+      _ll_var (EI) = _ll_var (NI);                                     \
+    }                                                                  \
+} while (0)
 /**
  * Walk list starting at head in reverse order
  *