session: separate ctrl, new and old events
[vpp.git] / src / vnet / session / session_node.c
index 1bb5cb2..35704b7 100644 (file)
@@ -665,7 +665,7 @@ session_tx_fifo_read_and_snd_i (session_worker_t * wrk,
     }
 
   ctx->snd_space = transport_connection_snd_space (ctx->tc,
-                                                  wrk->vm->clib_time.
+                                                  vm->clib_time.
                                                   last_cpu_time,
                                                   ctx->snd_mss);
 
@@ -693,6 +693,8 @@ session_tx_fifo_read_and_snd_i (session_worker_t * wrk,
       if (n_bufs)
        vlib_buffer_free (vm, wrk->tx_buffers, n_bufs);
       session_evt_add_old (wrk, elt);
+      vlib_node_increment_counter (wrk->vm, node->node_index,
+                                  SESSION_QUEUE_ERROR_NO_BUFFER, 1);
       return SESSION_TX_NO_BUFFERS;
     }
 
@@ -854,41 +856,29 @@ session_event_dispatch (session_worker_t * wrk, vlib_node_runtime_t * node,
 {
   session_main_t *smm = &session_main;
   app_worker_t *app_wrk;
+  clib_llist_index_t ei;
   void (*fp) (void *);
   session_event_t *e;
   session_t *s;
-  int rv;
 
+  ei = clib_llist_entry_index (wrk->event_elts, elt);
   e = &elt->evt;
+
   switch (e->event_type)
     {
     case SESSION_IO_EVT_TX_FLUSH:
     case SESSION_IO_EVT_TX:
-      /* Don't try to send more that one frame per dispatch cycle */
-      if (*n_tx_packets == VLIB_FRAME_SIZE)
-       {
-         session_evt_add_postponed (wrk, elt);
-         return;
-       }
-
       s = session_event_get_session (e, thread_index);
       if (PREDICT_FALSE (!s))
        {
-         clib_warning ("session was freed!");
+         clib_warning ("session %u was freed!", e->session_index);
          break;
        }
       CLIB_PREFETCH (s->tx_fifo, 2 * CLIB_CACHE_LINE_BYTES, LOAD);
       wrk->ctx.s = s;
       /* Spray packets in per session type frames, since they go to
        * different nodes */
-      rv = (smm->session_tx_fns[s->session_type]) (wrk, node, elt,
-                                                  n_tx_packets);
-      if (PREDICT_FALSE (rv == SESSION_TX_NO_BUFFERS))
-       {
-         vlib_node_increment_counter (wrk->vm, node->node_index,
-                                      SESSION_QUEUE_ERROR_NO_BUFFER, 1);
-         break;
-       }
+      (smm->session_tx_fns[s->session_type]) (wrk, node, elt, n_tx_packets);
       break;
     case SESSION_IO_EVT_RX:
       s = session_event_get_session (e, thread_index);
@@ -941,6 +931,11 @@ session_event_dispatch (session_worker_t * wrk, vlib_node_runtime_t * node,
     default:
       clib_warning ("unhandled event type %d", e->event_type);
     }
+
+  /* Regrab elements in case pool moved */
+  elt = pool_elt_at_index (wrk->event_elts, ei);
+  if (!clib_llist_elt_is_linked (elt, evt_list))
+    session_evt_elt_free (wrk, elt);
 }
 
 static uword
@@ -950,10 +945,11 @@ session_queue_node_fn (vlib_main_t * vm, vlib_node_runtime_t * node,
   session_main_t *smm = vnet_get_session_main ();
   u32 thread_index = vm->thread_index, n_to_dequeue;
   session_worker_t *wrk = &smm->wrk[thread_index];
-  session_evt_elt_t *elt, *new_he, *new_te, *old_he;
-  session_evt_elt_t *disconnects_he, *postponed_he;
+  session_evt_elt_t *elt, *ctrl_he, *new_he, *old_he;
   svm_msg_q_msg_t _msg, *msg = &_msg;
+  clib_llist_index_t old_ti;
   int i, n_tx_packets = 0;
+  session_event_t *evt;
   svm_msg_q_t *mq;
 
   SESSION_EVT (SESSION_EVT_DISPATCH_START, wrk);
@@ -965,12 +961,9 @@ session_queue_node_fn (vlib_main_t * vm, vlib_node_runtime_t * node,
    */
   transport_update_time (wrk->last_vlib_time, thread_index);
 
-  /* Make sure postponed events are handled first */
-  new_he = pool_elt_at_index (wrk->event_elts, wrk->new_head);
-  new_te = clib_llist_prev (wrk->event_elts, evt_list, new_he);
-
-  postponed_he = pool_elt_at_index (wrk->event_elts, wrk->postponed_head);
-  clib_llist_splice (wrk->event_elts, evt_list, new_te, postponed_he);
+  /*
+   *  Dequeue and handle new events
+   */
 
   /* Try to dequeue what is available. Don't wait for lock.
    * XXX: we may need priorities here */
@@ -980,44 +973,78 @@ session_queue_node_fn (vlib_main_t * vm, vlib_node_runtime_t * node,
     {
       for (i = 0; i < n_to_dequeue; i++)
        {
-         elt = session_evt_elt_alloc (wrk);
          svm_msg_q_sub_w_lock (mq, msg);
+         evt = svm_msg_q_msg_data (mq, msg);
+         if (evt->event_type > SESSION_IO_EVT_BUILTIN_TX)
+           elt = session_evt_alloc_ctrl (wrk);
+         else
+           elt = session_evt_alloc_new (wrk);
          /* Works because reply messages are smaller than a session evt.
           * If we ever need to support bigger messages this needs to be
           * fixed */
-         clib_memcpy_fast (&elt->evt, svm_msg_q_msg_data (mq, msg),
-                           sizeof (elt->evt));
+         clib_memcpy_fast (&elt->evt, evt, sizeof (elt->evt));
          svm_msg_q_free_msg (mq, msg);
-         new_he = pool_elt_at_index (wrk->event_elts, wrk->new_head);
-         clib_llist_add_tail (wrk->event_elts, evt_list, elt, new_he);
        }
       svm_msg_q_unlock (mq);
     }
 
-  old_he = pool_elt_at_index (wrk->event_elts, wrk->old_head);
-  disconnects_he = pool_elt_at_index (wrk->event_elts, wrk->disconnects_head);
+  /*
+   * Handle control events
+   */
 
-  new_te = clib_llist_prev (wrk->event_elts, evt_list, new_he);
-  clib_llist_splice (wrk->event_elts, evt_list, new_te, old_he);
-  new_te = clib_llist_prev (wrk->event_elts, evt_list, new_he);
-  clib_llist_splice (wrk->event_elts, evt_list, new_te, disconnects_he);
+  ctrl_he = pool_elt_at_index (wrk->event_elts, wrk->ctrl_head);
 
-  while (!clib_llist_is_empty (wrk->event_elts, evt_list, new_he))
+  /* *INDENT-OFF* */
+  clib_llist_foreach_safe (wrk->event_elts, evt_list, ctrl_he, elt, ({
+    clib_llist_remove (wrk->event_elts, evt_list, elt);
+    session_event_dispatch (wrk, node, elt, thread_index, &n_tx_packets);
+  }));
+  /* *INDENT-ON* */
+
+  /*
+   * Handle the new io events.
+   */
+
+  new_he = pool_elt_at_index (wrk->event_elts, wrk->new_head);
+
+  /* *INDENT-OFF* */
+  clib_llist_foreach_safe (wrk->event_elts, evt_list, new_he, elt, ({
+    session_evt_type_t et;
+
+    et = elt->evt.event_type;
+    clib_llist_remove (wrk->event_elts, evt_list, elt);
+
+    /* Postpone tx events if we can't handle them this dispatch cycle */
+    if (n_tx_packets >= VLIB_FRAME_SIZE
+       && (et == SESSION_IO_EVT_TX || et == SESSION_IO_EVT_TX_FLUSH))
+      {
+       clib_llist_add (wrk->event_elts, evt_list, elt, new_he);
+       continue;
+      }
+
+    session_event_dispatch (wrk, node, elt, thread_index, &n_tx_packets);
+  }));
+  /* *INDENT-ON* */
+
+  /*
+   * Handle the old io events
+   */
+
+  old_he = pool_elt_at_index (wrk->event_elts, wrk->old_head);
+  old_ti = clib_llist_prev_index (old_he, evt_list);
+
+  while (!clib_llist_is_empty (wrk->event_elts, evt_list, old_he))
     {
       clib_llist_index_t ei;
 
-      clib_llist_pop_first (wrk->event_elts, evt_list, elt, new_he);
+      clib_llist_pop_first (wrk->event_elts, evt_list, elt, old_he);
       ei = clib_llist_entry_index (wrk->event_elts, elt);
-
       session_event_dispatch (wrk, node, elt, thread_index, &n_tx_packets);
 
-      /* Regrab elements in case pool moved */
-      elt = pool_elt_at_index (wrk->event_elts, ei);
-      if (!clib_llist_elt_is_linked (elt, evt_list))
-       session_evt_elt_free (wrk, elt);
-
-      new_he = pool_elt_at_index (wrk->event_elts, wrk->new_head);
-    }
+      old_he = pool_elt_at_index (wrk->event_elts, wrk->old_head);
+      if (n_tx_packets >= VLIB_FRAME_SIZE || ei == old_ti)
+       break;
+    };
 
   vlib_node_increment_counter (vm, session_queue_node.index,
                               SESSION_QUEUE_ERROR_TX, n_tx_packets);
@@ -1166,7 +1193,8 @@ session_node_lookup_fifo_event (svm_fifo_t * f, session_event_t * e)
 
   /* *INDENT-OFF* */
   clib_llist_foreach (wrk->event_elts, evt_list,
-                      session_evt_old_head (wrk), elt, ({
+                      pool_elt_at_index (wrk->event_elts, wrk->old_head),
+                      elt, ({
     found = session_node_cmp_event (&elt->evt, f);
     if (found)
       {