svm: rename fifo tx notifications to reflect use
[vpp.git] / src / vnet / session / session_node.c
index 8167947..8abfb2f 100644 (file)
@@ -82,14 +82,14 @@ session_mq_accepted_reply_handler (void *data)
     {
       old_state = s->session_state;
       s->session_state = SESSION_STATE_READY;
-      if (!svm_fifo_is_empty (s->rx_fifo))
+
+      if (!svm_fifo_is_empty_prod (s->rx_fifo))
        app_worker_lock_and_send_event (app_wrk, s, SESSION_IO_EVT_RX);
 
       /* Closed while waiting for app to reply. Resend disconnect */
       if (old_state >= SESSION_STATE_TRANSPORT_CLOSING)
        {
-         application_t *app = application_get (app_wrk->app_index);
-         app->cb_fns.session_disconnect_callback (s);
+         app_worker_close_notify (app_wrk, s);
          s->session_state = old_state;
          return;
        }
@@ -289,7 +289,7 @@ session_mq_worker_update_handler (void *data)
     app_worker_lock_and_send_event (app_wrk, s, SESSION_IO_EVT_RX);
 
   if (s->session_state >= SESSION_STATE_TRANSPORT_CLOSING)
-    app->cb_fns.session_disconnect_callback (s);
+    app_worker_close_notify (app_wrk, s);
 }
 
 vlib_node_registration_t session_queue_node;
@@ -395,7 +395,8 @@ session_tx_fifo_chain_tail (vlib_main_t * vm, session_tx_context_t * ctx,
        }
       else
        {
-         if (ctx->transport_vft->tx_type == TRANSPORT_TX_DGRAM)
+         if (ctx->transport_vft->transport_options.tx_type ==
+             TRANSPORT_TX_DGRAM)
            {
              svm_fifo_t *f = ctx->s->tx_fifo;
              session_dgram_hdr_t *hdr = &ctx->hdr;
@@ -414,8 +415,8 @@ session_tx_fifo_chain_tail (vlib_main_t * vm, session_tx_context_t * ctx,
                }
            }
          else
-           n_bytes_read = svm_fifo_dequeue_nowait (ctx->s->tx_fifo,
-                                                   len_to_deq, data);
+           n_bytes_read = svm_fifo_dequeue (ctx->s->tx_fifo,
+                                            len_to_deq, data);
        }
       ASSERT (n_bytes_read == len_to_deq);
       chain_b->current_length = n_bytes_read;
@@ -466,7 +467,7 @@ session_tx_fill_buffer (vlib_main_t * vm, session_tx_context_t * ctx,
     }
   else
     {
-      if (ctx->transport_vft->tx_type == TRANSPORT_TX_DGRAM)
+      if (ctx->transport_vft->transport_options.tx_type == TRANSPORT_TX_DGRAM)
        {
          session_dgram_hdr_t *hdr = &ctx->hdr;
          svm_fifo_t *f = ctx->s->tx_fifo;
@@ -494,8 +495,8 @@ session_tx_fill_buffer (vlib_main_t * vm, session_tx_context_t * ctx,
        }
       else
        {
-         n_bytes_read = svm_fifo_dequeue_nowait (ctx->s->tx_fifo,
-                                                 len_to_deq, data0);
+         n_bytes_read = svm_fifo_dequeue (ctx->s->tx_fifo,
+                                          len_to_deq, data0);
          ASSERT (n_bytes_read > 0);
        }
     }
@@ -558,7 +559,7 @@ session_tx_set_dequeue_params (vlib_main_t * vm, session_tx_context_t * ctx,
                               u32 max_segs, u8 peek_data)
 {
   u32 n_bytes_per_buf, n_bytes_per_seg;
-  ctx->max_dequeue = svm_fifo_max_dequeue (ctx->s->tx_fifo);
+  ctx->max_dequeue = svm_fifo_max_dequeue_cons (ctx->s->tx_fifo);
   if (peek_data)
     {
       /* Offset in rx fifo from where to peek data */
@@ -572,7 +573,7 @@ session_tx_set_dequeue_params (vlib_main_t * vm, session_tx_context_t * ctx,
     }
   else
     {
-      if (ctx->transport_vft->tx_type == TRANSPORT_TX_DGRAM)
+      if (ctx->transport_vft->transport_options.tx_type == TRANSPORT_TX_DGRAM)
        {
          if (ctx->max_dequeue <= sizeof (ctx->hdr))
            {
@@ -782,16 +783,20 @@ session_tx_fifo_read_and_snd_i (vlib_main_t * vm, vlib_node_runtime_t * node,
     if (svm_fifo_set_event (ctx->s->tx_fifo))
       vec_add1 (wrk->pending_event_vector, *e);
 
-  if (!peek_data && ctx->transport_vft->tx_type == TRANSPORT_TX_DGRAM)
+  if (!peek_data
+      && ctx->transport_vft->transport_options.tx_type == TRANSPORT_TX_DGRAM)
     {
       /* Fix dgram pre header */
       if (ctx->max_len_to_snd < ctx->max_dequeue)
        svm_fifo_overwrite_head (ctx->s->tx_fifo, (u8 *) & ctx->hdr,
                                 sizeof (session_dgram_pre_hdr_t));
       /* More data needs to be read */
-      else if (svm_fifo_max_dequeue (ctx->s->tx_fifo) > 0)
+      else if (svm_fifo_max_dequeue_cons (ctx->s->tx_fifo) > 0)
        if (svm_fifo_set_event (ctx->s->tx_fifo))
          vec_add1 (wrk->pending_event_vector, *e);
+
+      if (svm_fifo_needs_deq_ntf (ctx->s->tx_fifo, ctx->max_len_to_snd))
+       session_dequeue_notify (ctx->s);
     }
   return SESSION_TX_OK;
 }
@@ -905,9 +910,8 @@ session_queue_node_fn (vlib_main_t * vm, vlib_node_runtime_t * node,
 
   for (i = 0; i < n_events; i++)
     {
-      session_t *s;            /* $$$ prefetch 1 ahead maybe */
       session_event_t *e;
-      u8 need_tx_ntf;
+      session_t *s;
 
       e = &fifo_events[i];
       switch (e->event_type)
@@ -933,20 +937,20 @@ session_queue_node_fn (vlib_main_t * vm, vlib_node_runtime_t * node,
           * different nodes */
          rv = (smm->session_tx_fns[s->session_type]) (vm, node, wrk, e,
                                                       &n_tx_packets);
-         if (PREDICT_TRUE (rv == SESSION_TX_OK))
-           {
-             need_tx_ntf = svm_fifo_needs_tx_ntf (s->tx_fifo,
-                                                  wrk->ctx.max_len_to_snd);
-             if (PREDICT_FALSE (need_tx_ntf))
-               session_dequeue_notify (s);
-           }
-         else if (PREDICT_FALSE (rv == SESSION_TX_NO_BUFFERS))
+         if (PREDICT_FALSE (rv == SESSION_TX_NO_BUFFERS))
            {
              vlib_node_increment_counter (vm, node->node_index,
                                           SESSION_QUEUE_ERROR_NO_BUFFER, 1);
              continue;
            }
          break;
+       case SESSION_IO_EVT_RX:
+         s = session_event_get_session (e, thread_index);
+         if (!s)
+           break;
+         transport_app_rx_evt (session_get_transport_proto (s),
+                               s->connection_index, s->thread_index);
+         break;
        case SESSION_CTRL_EVT_CLOSE:
          s = session_get_from_handle_if_valid (e->session_handle);
          if (PREDICT_FALSE (!s))
@@ -957,7 +961,8 @@ session_queue_node_fn (vlib_main_t * vm, vlib_node_runtime_t * node,
           * and the tx queue is still not empty, try to wait for some
           * dispatch cycles */
          if (!e->postponed
-             || (e->postponed < 200 && svm_fifo_max_dequeue (s->tx_fifo)))
+             || (e->postponed < 200
+                 && svm_fifo_max_dequeue_cons (s->tx_fifo)))
            {
              e->postponed += 1;
              vec_add1 (wrk->pending_disconnects, *e);