session: improve event logging
[vpp.git] / src / vnet / session / session_node.c
index db5123b..a847888 100644 (file)
@@ -82,14 +82,14 @@ session_mq_accepted_reply_handler (void *data)
     {
       old_state = s->session_state;
       s->session_state = SESSION_STATE_READY;
-      if (!svm_fifo_is_empty (s->rx_fifo))
+
+      if (!svm_fifo_is_empty_prod (s->rx_fifo))
        app_worker_lock_and_send_event (app_wrk, s, SESSION_IO_EVT_RX);
 
       /* Closed while waiting for app to reply. Resend disconnect */
       if (old_state >= SESSION_STATE_TRANSPORT_CLOSING)
        {
-         application_t *app = application_get (app_wrk->app_index);
-         app->cb_fns.session_disconnect_callback (s);
+         app_worker_close_notify (app_wrk, s);
          s->session_state = old_state;
          return;
        }
@@ -289,7 +289,7 @@ session_mq_worker_update_handler (void *data)
     app_worker_lock_and_send_event (app_wrk, s, SESSION_IO_EVT_RX);
 
   if (s->session_state >= SESSION_STATE_TRANSPORT_CLOSING)
-    app->cb_fns.session_disconnect_callback (s);
+    app_worker_close_notify (app_wrk, s);
 }
 
 vlib_node_registration_t session_queue_node;
@@ -395,7 +395,8 @@ session_tx_fifo_chain_tail (vlib_main_t * vm, session_tx_context_t * ctx,
        }
       else
        {
-         if (ctx->transport_vft->tx_type == TRANSPORT_TX_DGRAM)
+         if (ctx->transport_vft->transport_options.tx_type ==
+             TRANSPORT_TX_DGRAM)
            {
              svm_fifo_t *f = ctx->s->tx_fifo;
              session_dgram_hdr_t *hdr = &ctx->hdr;
@@ -414,8 +415,8 @@ session_tx_fifo_chain_tail (vlib_main_t * vm, session_tx_context_t * ctx,
                }
            }
          else
-           n_bytes_read = svm_fifo_dequeue_nowait (ctx->s->tx_fifo,
-                                                   len_to_deq, data);
+           n_bytes_read = svm_fifo_dequeue (ctx->s->tx_fifo,
+                                            len_to_deq, data);
        }
       ASSERT (n_bytes_read == len_to_deq);
       chain_b->current_length = n_bytes_read;
@@ -466,7 +467,7 @@ session_tx_fill_buffer (vlib_main_t * vm, session_tx_context_t * ctx,
     }
   else
     {
-      if (ctx->transport_vft->tx_type == TRANSPORT_TX_DGRAM)
+      if (ctx->transport_vft->transport_options.tx_type == TRANSPORT_TX_DGRAM)
        {
          session_dgram_hdr_t *hdr = &ctx->hdr;
          svm_fifo_t *f = ctx->s->tx_fifo;
@@ -494,8 +495,8 @@ session_tx_fill_buffer (vlib_main_t * vm, session_tx_context_t * ctx,
        }
       else
        {
-         n_bytes_read = svm_fifo_dequeue_nowait (ctx->s->tx_fifo,
-                                                 len_to_deq, data0);
+         n_bytes_read = svm_fifo_dequeue (ctx->s->tx_fifo,
+                                          len_to_deq, data0);
          ASSERT (n_bytes_read > 0);
        }
     }
@@ -507,15 +508,6 @@ session_tx_fill_buffer (vlib_main_t * vm, session_tx_context_t * ctx,
    */
   if (PREDICT_FALSE (ctx->n_bufs_per_seg > 1 && ctx->left_to_snd))
     session_tx_fifo_chain_tail (vm, ctx, b, n_bufs, peek_data);
-
-  /* *INDENT-OFF* */
-  SESSION_EVT_DBG(SESSION_EVT_DEQ, ctx->s, ({
-       ed->data[0] = SESSION_IO_EVT_TX;
-       ed->data[1] = ctx->max_dequeue;
-       ed->data[2] = len_to_deq;
-       ed->data[3] = ctx->left_to_snd;
-  }));
-  /* *INDENT-ON* */
 }
 
 always_inline u8
@@ -558,7 +550,7 @@ session_tx_set_dequeue_params (vlib_main_t * vm, session_tx_context_t * ctx,
                               u32 max_segs, u8 peek_data)
 {
   u32 n_bytes_per_buf, n_bytes_per_seg;
-  ctx->max_dequeue = svm_fifo_max_dequeue (ctx->s->tx_fifo);
+  ctx->max_dequeue = svm_fifo_max_dequeue_cons (ctx->s->tx_fifo);
   if (peek_data)
     {
       /* Offset in rx fifo from where to peek data */
@@ -572,7 +564,7 @@ session_tx_set_dequeue_params (vlib_main_t * vm, session_tx_context_t * ctx,
     }
   else
     {
-      if (ctx->transport_vft->tx_type == TRANSPORT_TX_DGRAM)
+      if (ctx->transport_vft->transport_options.tx_type == TRANSPORT_TX_DGRAM)
        {
          if (ctx->max_dequeue <= sizeof (ctx->hdr))
            {
@@ -623,13 +615,14 @@ session_tx_set_dequeue_params (vlib_main_t * vm, session_tx_context_t * ctx,
 always_inline int
 session_tx_fifo_read_and_snd_i (vlib_main_t * vm, vlib_node_runtime_t * node,
                                session_worker_t * wrk,
-                               session_event_t * e, int *n_tx_packets,
+                               session_evt_elt_t * elt, int *n_tx_packets,
                                u8 peek_data)
 {
   u32 next_index, next0, next1, *to_next, n_left_to_next, n_left, pbi;
   u32 n_trace = vlib_get_trace_count (vm, node), n_bufs_needed = 0;
   session_main_t *smm = &session_main;
   session_tx_context_t *ctx = &wrk->ctx;
+  session_event_t *e = &elt->evt;
   transport_proto_t tp;
   vlib_buffer_t *pb;
   u16 n_bufs, rv;
@@ -637,7 +630,7 @@ session_tx_fifo_read_and_snd_i (vlib_main_t * vm, vlib_node_runtime_t * node,
   if (PREDICT_FALSE ((rv = session_tx_not_ready (ctx->s, peek_data))))
     {
       if (rv < 2)
-       vec_add1 (wrk->pending_event_vector, *e);
+       session_evt_add_pending (wrk, elt);
       return SESSION_TX_NO_DATA;
     }
 
@@ -661,7 +654,7 @@ session_tx_fifo_read_and_snd_i (vlib_main_t * vm, vlib_node_runtime_t * node,
                                                   ctx->snd_mss);
   if (ctx->snd_space == 0 || ctx->snd_mss == 0)
     {
-      vec_add1 (wrk->pending_event_vector, *e);
+      session_evt_add_pending (wrk, elt);
       return SESSION_TX_NO_DATA;
     }
 
@@ -683,7 +676,7 @@ session_tx_fifo_read_and_snd_i (vlib_main_t * vm, vlib_node_runtime_t * node,
     {
       if (n_bufs)
        vlib_buffer_free (vm, wrk->tx_buffers, n_bufs);
-      vec_add1 (wrk->pending_event_vector, *e);
+      session_evt_add_pending (wrk, elt);
       return SESSION_TX_NO_BUFFERS;
     }
 
@@ -769,29 +762,35 @@ session_tx_fifo_read_and_snd_i (vlib_main_t * vm, vlib_node_runtime_t * node,
                            ctx->n_segs_per_evt, ctx->s, n_trace);
   if (PREDICT_FALSE (n_bufs))
     {
-      clib_warning ("not all buffers consumed");
       vlib_buffer_free (vm, wrk->tx_buffers, n_bufs);
     }
   *n_tx_packets += ctx->n_segs_per_evt;
   transport_connection_update_tx_stats (ctx->tc, ctx->max_len_to_snd);
   vlib_put_next_frame (vm, node, next_index, n_left_to_next);
 
+  SESSION_EVT (SESSION_EVT_DEQ, ctx->s, ctx->max_len_to_snd, ctx->max_dequeue,
+              ctx->s->tx_fifo->has_event, wrk->last_vlib_time);
+
   /* If we couldn't dequeue all bytes mark as partially read */
   ASSERT (ctx->left_to_snd == 0);
   if (ctx->max_len_to_snd < ctx->max_dequeue)
     if (svm_fifo_set_event (ctx->s->tx_fifo))
-      vec_add1 (wrk->pending_event_vector, *e);
+      session_evt_add_pending (wrk, elt);
 
-  if (!peek_data && ctx->transport_vft->tx_type == TRANSPORT_TX_DGRAM)
+  if (!peek_data
+      && ctx->transport_vft->transport_options.tx_type == TRANSPORT_TX_DGRAM)
     {
       /* Fix dgram pre header */
       if (ctx->max_len_to_snd < ctx->max_dequeue)
        svm_fifo_overwrite_head (ctx->s->tx_fifo, (u8 *) & ctx->hdr,
                                 sizeof (session_dgram_pre_hdr_t));
       /* More data needs to be read */
-      else if (svm_fifo_max_dequeue (ctx->s->tx_fifo) > 0)
+      else if (svm_fifo_max_dequeue_cons (ctx->s->tx_fifo) > 0)
        if (svm_fifo_set_event (ctx->s->tx_fifo))
-         vec_add1 (wrk->pending_event_vector, *e);
+         session_evt_add_pending (wrk, elt);
+
+      if (svm_fifo_needs_deq_ntf (ctx->s->tx_fifo, ctx->max_len_to_snd))
+       session_dequeue_notify (ctx->s);
     }
   return SESSION_TX_OK;
 }
@@ -799,7 +798,7 @@ session_tx_fifo_read_and_snd_i (vlib_main_t * vm, vlib_node_runtime_t * node,
 int
 session_tx_fifo_peek_and_snd (vlib_main_t * vm, vlib_node_runtime_t * node,
                              session_worker_t * wrk,
-                             session_event_t * e, int *n_tx_pkts)
+                             session_evt_elt_t * e, int *n_tx_pkts)
 {
   return session_tx_fifo_read_and_snd_i (vm, node, wrk, e, n_tx_pkts, 1);
 }
@@ -807,7 +806,7 @@ session_tx_fifo_peek_and_snd (vlib_main_t * vm, vlib_node_runtime_t * node,
 int
 session_tx_fifo_dequeue_and_snd (vlib_main_t * vm, vlib_node_runtime_t * node,
                                 session_worker_t * wrk,
-                                session_event_t * e, int *n_tx_pkts)
+                                session_evt_elt_t * e, int *n_tx_pkts)
 {
   return session_tx_fifo_read_and_snd_i (vm, node, wrk, e, n_tx_pkts, 0);
 }
@@ -816,22 +815,20 @@ int
 session_tx_fifo_dequeue_internal (vlib_main_t * vm,
                                  vlib_node_runtime_t * node,
                                  session_worker_t * wrk,
-                                 session_event_t * e, int *n_tx_pkts)
+                                 session_evt_elt_t * e, int *n_tx_pkts)
 {
   session_t *s = wrk->ctx.s;
-  application_t *app;
 
-  if (PREDICT_FALSE (s->session_state == SESSION_STATE_CLOSED))
+  if (PREDICT_FALSE (s->session_state >= SESSION_STATE_TRANSPORT_CLOSED))
     return 0;
-  app = application_get (s->t_app_index);
   svm_fifo_unset_event (s->tx_fifo);
-  return app->cb_fns.builtin_app_tx_callback (s);
+  return transport_custom_tx (session_get_transport_proto (s), s);
 }
 
 always_inline session_t *
 session_event_get_session (session_event_t * e, u8 thread_index)
 {
-  return session_get_if_valid (e->fifo->master_session_index, thread_index);
+  return session_get_if_valid (e->session_index, thread_index);
 }
 
 static void
@@ -851,18 +848,18 @@ session_queue_node_fn (vlib_main_t * vm, vlib_node_runtime_t * node,
                       vlib_frame_t * frame)
 {
   session_main_t *smm = vnet_get_session_main ();
-  u32 thread_index = vm->thread_index, n_to_dequeue, n_events;
+  u32 thread_index = vm->thread_index, n_to_dequeue;
   session_worker_t *wrk = &smm->wrk[thread_index];
-  session_event_t *e, *fifo_events;
+  session_evt_elt_t *elt, *new_he, *new_te, *pending_he;
+  session_evt_elt_t *disconnects_he, *postponed_he;
   svm_msg_q_msg_t _msg, *msg = &_msg;
   f64 now = vlib_time_now (vm);
   int n_tx_packets = 0, i, rv;
   app_worker_t *app_wrk;
-  application_t *app;
   svm_msg_q_t *mq;
   void (*fp) (void *);
 
-  SESSION_EVT_DBG (SESSION_EVT_POLL_GAP_TRACK, smm, thread_index);
+  SESSION_EVT (SESSION_EVT_DISPATCH_START, wrk);
 
   /*
    *  Update transport time
@@ -870,12 +867,12 @@ session_queue_node_fn (vlib_main_t * vm, vlib_node_runtime_t * node,
   session_update_dispatch_period (wrk, now, thread_index);
   transport_update_time (now, thread_index);
 
-  SESSION_EVT_DBG (SESSION_EVT_DEQ_NODE, 0);
-
   /* Make sure postponed events are handled first */
-  fifo_events = wrk->free_event_vector;
-  vec_append (fifo_events, wrk->postponed_event_vector);
-  _vec_len (wrk->postponed_event_vector) = 0;
+  new_he = pool_elt_at_index (wrk->event_elts, wrk->new_head);
+  new_te = clib_llist_prev (wrk->event_elts, evt_list, new_he);
+
+  postponed_he = pool_elt_at_index (wrk->event_elts, wrk->postponed_head);
+  clib_llist_splice (wrk->event_elts, evt_list, new_te, postponed_he);
 
   /* Try to dequeue what is available. Don't wait for lock.
    * XXX: we may need priorities here */
@@ -885,34 +882,38 @@ session_queue_node_fn (vlib_main_t * vm, vlib_node_runtime_t * node,
     {
       for (i = 0; i < n_to_dequeue; i++)
        {
-         vec_add2 (fifo_events, e, 1);
+         elt = session_evt_elt_alloc (wrk);
          svm_msg_q_sub_w_lock (mq, msg);
          /* Works because reply messages are smaller than a session evt.
           * If we ever need to support bigger messages this needs to be
           * fixed */
-         clib_memcpy_fast (e, svm_msg_q_msg_data (mq, msg), sizeof (*e));
+         clib_memcpy_fast (&elt->evt, svm_msg_q_msg_data (mq, msg),
+                           sizeof (elt->evt));
          svm_msg_q_free_msg (mq, msg);
+         new_he = pool_elt_at_index (wrk->event_elts, wrk->new_head);
+         clib_llist_add_tail (wrk->event_elts, evt_list, elt, new_he);
        }
       svm_msg_q_unlock (mq);
     }
 
-  vec_append (fifo_events, wrk->pending_event_vector);
-  vec_append (fifo_events, wrk->pending_disconnects);
+  pending_he = pool_elt_at_index (wrk->event_elts, wrk->pending_head);
+  postponed_he = pool_elt_at_index (wrk->event_elts, wrk->postponed_head);
+  disconnects_he = pool_elt_at_index (wrk->event_elts, wrk->disconnects_head);
 
-  _vec_len (wrk->pending_event_vector) = 0;
-  _vec_len (wrk->pending_disconnects) = 0;
-
-  n_events = vec_len (fifo_events);
-  if (PREDICT_FALSE (!n_events))
-    return 0;
+  new_te = clib_llist_prev (wrk->event_elts, evt_list, new_he);
+  clib_llist_splice (wrk->event_elts, evt_list, new_te, pending_he);
+  new_te = clib_llist_prev (wrk->event_elts, evt_list, new_he);
+  clib_llist_splice (wrk->event_elts, evt_list, new_te, disconnects_he);
 
-  for (i = 0; i < n_events; i++)
+  while (!clib_llist_is_empty (wrk->event_elts, evt_list, new_he))
     {
-      session_t *s;            /* $$$ prefetch 1 ahead maybe */
+      clib_llist_index_t ei;
       session_event_t *e;
-      u8 need_tx_ntf;
+      session_t *s;
 
-      e = &fifo_events[i];
+      clib_llist_pop_first (wrk->event_elts, evt_list, elt, new_he);
+      ei = clib_llist_entry_index (wrk->event_elts, elt);
+      e = &elt->evt;
       switch (e->event_type)
        {
        case SESSION_IO_EVT_TX_FLUSH:
@@ -920,35 +921,36 @@ session_queue_node_fn (vlib_main_t * vm, vlib_node_runtime_t * node,
          /* Don't try to send more that one frame per dispatch cycle */
          if (n_tx_packets == VLIB_FRAME_SIZE)
            {
-             vec_add1 (wrk->postponed_event_vector, *e);
-             break;
+             session_evt_add_postponed (wrk, elt);
+             continue;
            }
 
          s = session_event_get_session (e, thread_index);
          if (PREDICT_FALSE (!s))
            {
              clib_warning ("session was freed!");
-             continue;
+             break;
            }
+         CLIB_PREFETCH (s->tx_fifo, 2 * CLIB_CACHE_LINE_BYTES, LOAD);
          wrk->ctx.s = s;
          /* Spray packets in per session type frames, since they go to
           * different nodes */
-         rv = (smm->session_tx_fns[s->session_type]) (vm, node, wrk, e,
+         rv = (smm->session_tx_fns[s->session_type]) (vm, node, wrk, elt,
                                                       &n_tx_packets);
-         if (PREDICT_TRUE (rv == SESSION_TX_OK))
-           {
-             need_tx_ntf = svm_fifo_needs_tx_ntf (s->tx_fifo,
-                                                  wrk->ctx.max_len_to_snd);
-             if (PREDICT_FALSE (need_tx_ntf))
-               session_dequeue_notify (s);
-           }
-         else if (PREDICT_FALSE (rv == SESSION_TX_NO_BUFFERS))
+         if (PREDICT_FALSE (rv == SESSION_TX_NO_BUFFERS))
            {
              vlib_node_increment_counter (vm, node->node_index,
                                           SESSION_QUEUE_ERROR_NO_BUFFER, 1);
-             continue;
+             break;
            }
          break;
+       case SESSION_IO_EVT_RX:
+         s = session_event_get_session (e, thread_index);
+         if (!s)
+           break;
+         transport_app_rx_evt (session_get_transport_proto (s),
+                               s->connection_index, s->thread_index);
+         break;
        case SESSION_CTRL_EVT_CLOSE:
          s = session_get_from_handle_if_valid (e->session_handle);
          if (PREDICT_FALSE (!s))
@@ -959,10 +961,11 @@ session_queue_node_fn (vlib_main_t * vm, vlib_node_runtime_t * node,
           * and the tx queue is still not empty, try to wait for some
           * dispatch cycles */
          if (!e->postponed
-             || (e->postponed < 200 && svm_fifo_max_dequeue (s->tx_fifo)))
+             || (e->postponed < 200
+                 && svm_fifo_max_dequeue_cons (s->tx_fifo)))
            {
              e->postponed += 1;
-             vec_add1 (wrk->pending_disconnects, *e);
+             session_evt_add_pending (wrk, elt);
              continue;
            }
 
@@ -971,17 +974,16 @@ session_queue_node_fn (vlib_main_t * vm, vlib_node_runtime_t * node,
        case SESSION_IO_EVT_BUILTIN_RX:
          s = session_event_get_session (e, thread_index);
          if (PREDICT_FALSE (!s || s->session_state >= SESSION_STATE_CLOSING))
-           continue;
+           break;
          svm_fifo_unset_event (s->rx_fifo);
          app_wrk = app_worker_get (s->app_wrk_index);
-         app = application_get (app_wrk->app_index);
-         app->cb_fns.builtin_app_rx_callback (s);
+         app_worker_builtin_rx (app_wrk, s);
          break;
        case SESSION_IO_EVT_BUILTIN_TX:
          s = session_get_from_handle_if_valid (e->session_handle);
          wrk->ctx.s = s;
          if (PREDICT_TRUE (s != 0))
-           session_tx_fifo_dequeue_internal (vm, node, wrk, e,
+           session_tx_fifo_dequeue_internal (vm, node, wrk, elt,
                                              &n_tx_packets);
          break;
        case SESSION_CTRL_EVT_RPC:
@@ -1008,16 +1010,21 @@ session_queue_node_fn (vlib_main_t * vm, vlib_node_runtime_t * node,
        default:
          clib_warning ("unhandled event type %d", e->event_type);
        }
+
+      /* Regrab elements in case pool moved */
+      elt = pool_elt_at_index (wrk->event_elts, ei);
+      if (!clib_llist_elt_is_linked (elt, evt_list))
+       session_evt_elt_free (wrk, elt);
+
+      new_he = pool_elt_at_index (wrk->event_elts, wrk->new_head);
     }
 
-  _vec_len (fifo_events) = 0;
-  wrk->free_event_vector = fifo_events;
   wrk->last_tx_packets = n_tx_packets;
 
   vlib_node_increment_counter (vm, session_queue_node.index,
                               SESSION_QUEUE_ERROR_TX, n_tx_packets);
 
-  SESSION_EVT_DBG (SESSION_EVT_DISPATCH_END, smm, thread_index);
+  SESSION_EVT (SESSION_EVT_DISPATCH_END, wrk, n_tx_packets);
 
   return n_tx_packets;
 }
@@ -1103,7 +1110,7 @@ session_node_cmp_event (session_event_t * e, svm_fifo_t * f)
     case SESSION_IO_EVT_RX:
     case SESSION_IO_EVT_TX:
     case SESSION_IO_EVT_BUILTIN_RX:
-      if (e->fifo == f)
+      if (e->session_index == f->master_session_index)
        return 1;
       break;
     case SESSION_CTRL_EVT_CLOSE:
@@ -1127,7 +1134,7 @@ session_node_cmp_event (session_event_t * e, svm_fifo_t * f)
 u8
 session_node_lookup_fifo_event (svm_fifo_t * f, session_event_t * e)
 {
-  session_event_t *pending_event_vector, *evt;
+  session_evt_elt_t *elt;
   session_worker_t *wrk;
   int i, index, found = 0;
   svm_msg_q_msg_t *msg;
@@ -1158,16 +1165,20 @@ session_node_lookup_fifo_event (svm_fifo_t * f, session_event_t * e)
   /*
    * Search pending events vector
    */
-  pending_event_vector = wrk->pending_event_vector;
-  vec_foreach (evt, pending_event_vector)
-  {
-    found = session_node_cmp_event (evt, f);
+
+  /* *INDENT-OFF* */
+  clib_llist_foreach (wrk->event_elts, evt_list,
+                      session_evt_pending_head (wrk), elt, ({
+    found = session_node_cmp_event (&elt->evt, f);
     if (found)
       {
-       clib_memcpy_fast (e, evt, sizeof (*evt));
+       clib_memcpy_fast (e, &elt->evt, sizeof (*e));
        break;
       }
-  }
+
+  }));
+  /* *INDENT-ON* */
+
   return found;
 }