session: minimize number of tx events
[vpp.git] / src / vnet / session / session_node.c
index bc22cc0..44dc9cc 100644 (file)
@@ -814,6 +814,19 @@ session_tx_set_dequeue_params (vlib_main_t * vm, session_tx_context_t * ctx,
                                     TRANSPORT_MAX_HDRS_LEN);
 }
 
+always_inline void
+session_tx_maybe_reschedule (session_worker_t * wrk,
+                            session_tx_context_t * ctx,
+                            session_evt_elt_t * elt, u8 is_peek)
+{
+  session_t *s = ctx->s;
+
+  svm_fifo_unset_event (s->tx_fifo);
+  if (svm_fifo_max_dequeue_cons (s->tx_fifo) > is_peek ? ctx->tx_offset : 0)
+    if (svm_fifo_set_event (s->tx_fifo))
+      session_evt_add_head_old (wrk, elt);
+}
+
 always_inline int
 session_tx_fifo_read_and_snd_i (session_worker_t * wrk,
                                vlib_node_runtime_t * node,
@@ -897,15 +910,13 @@ session_tx_fifo_read_and_snd_i (session_worker_t * wrk,
        snd_space - snd_space % ctx->snd_mss : snd_space;
     }
 
-  /* Allow enqueuing of a new event */
-  svm_fifo_unset_event (ctx->s->tx_fifo);
-
   /* Check how much we can pull. */
   session_tx_set_dequeue_params (vm, ctx, max_burst, peek_data);
 
   if (PREDICT_FALSE (!ctx->max_len_to_snd))
     {
       transport_connection_tx_pacer_reset_bucket (ctx->tc, 0);
+      session_tx_maybe_reschedule (wrk, ctx, elt, peek_data);
       return SESSION_TX_NO_DATA;
     }
 
@@ -917,8 +928,7 @@ session_tx_fifo_read_and_snd_i (session_worker_t * wrk,
     {
       if (n_bufs)
        vlib_buffer_free (vm, wrk->tx_buffers, n_bufs);
-      if (svm_fifo_set_event (ctx->s->tx_fifo))
-       session_evt_add_head_old (wrk, elt);
+      session_evt_add_head_old (wrk, elt);
       vlib_node_increment_counter (wrk->vm, node->node_index,
                                   SESSION_QUEUE_ERROR_NO_BUFFER, 1);
       return SESSION_TX_NO_BUFFERS;
@@ -1002,11 +1012,14 @@ session_tx_fifo_read_and_snd_i (session_worker_t * wrk,
   SESSION_EVT (SESSION_EVT_DEQ, ctx->s, ctx->max_len_to_snd, ctx->max_dequeue,
               ctx->s->tx_fifo->has_event, wrk->last_vlib_time);
 
-  /* If we couldn't dequeue all bytes mark as partially read */
   ASSERT (ctx->left_to_snd == 0);
+
+  /* If we couldn't dequeue all bytes reschedule as old flow. Otherwise,
+   * check if application enqueued more data and reschedule accordingly */
   if (ctx->max_len_to_snd < ctx->max_dequeue)
-    if (svm_fifo_set_event (ctx->s->tx_fifo))
-      session_evt_add_old (wrk, elt);
+    session_evt_add_old (wrk, elt);
+  else
+    session_tx_maybe_reschedule (wrk, ctx, elt, peek_data);
 
   if (!peek_data
       && ctx->transport_vft->transport_options.tx_type == TRANSPORT_TX_DGRAM)
@@ -1257,8 +1270,8 @@ session_queue_node_fn (vlib_main_t * vm, vlib_node_runtime_t * node,
   u32 thread_index = vm->thread_index, n_to_dequeue;
   session_worker_t *wrk = &smm->wrk[thread_index];
   session_evt_elt_t *elt, *ctrl_he, *new_he, *old_he;
+  clib_llist_index_t ei, next_ei, old_ti;
   svm_msg_q_msg_t _msg, *msg = &_msg;
-  clib_llist_index_t old_ti;
   int i, n_tx_packets = 0;
   session_event_t *evt;
   svm_msg_q_t *mq;
@@ -1314,24 +1327,14 @@ session_queue_node_fn (vlib_main_t * vm, vlib_node_runtime_t * node,
   old_he = pool_elt_at_index (wrk->event_elts, wrk->old_head);
   old_ti = clib_llist_prev_index (old_he, evt_list);
 
-  /* *INDENT-OFF* */
-  clib_llist_foreach_safe (wrk->event_elts, evt_list, new_he, elt, ({
-    session_evt_type_t et;
-
-    et = elt->evt.event_type;
-    clib_llist_remove (wrk->event_elts, evt_list, elt);
-
-    /* Postpone tx events if we can't handle them this dispatch cycle */
-    if (n_tx_packets >= VLIB_FRAME_SIZE
-       && (et == SESSION_IO_EVT_TX || et == SESSION_IO_EVT_TX_FLUSH))
-      {
-       clib_llist_add (wrk->event_elts, evt_list, elt, new_he);
-       continue;
-      }
-
-    session_event_dispatch_io (wrk, node, elt, thread_index, &n_tx_packets);
-  }));
-  /* *INDENT-ON* */
+  ei = clib_llist_next_index (new_he, evt_list);
+  while (ei != wrk->new_head && n_tx_packets < VLIB_FRAME_SIZE)
+    {
+      elt = pool_elt_at_index (wrk->event_elts, ei);
+      ei = clib_llist_next_index (elt, evt_list);
+      clib_llist_remove (wrk->event_elts, evt_list, elt);
+      session_event_dispatch_io (wrk, node, elt, thread_index, &n_tx_packets);
+    }
 
   /*
    * Handle the old io events, if we had any prior to processing the new ones
@@ -1339,8 +1342,6 @@ session_queue_node_fn (vlib_main_t * vm, vlib_node_runtime_t * node,
 
   if (old_ti != wrk->old_head)
     {
-      clib_llist_index_t ei, next_ei;
-
       old_he = pool_elt_at_index (wrk->event_elts, wrk->old_head);
       ei = clib_llist_next_index (old_he, evt_list);