svm: allow mq attachments at random offsets
[vpp.git] / src / vnet / session / session_node.c
index 346b9f4..49e4e5a 100644 (file)
@@ -472,8 +472,8 @@ session_mq_worker_update_handler (void *data)
   evt->event_type = SESSION_CTRL_EVT_WORKER_UPDATE_REPLY;
   rmp = (session_worker_update_reply_msg_t *) evt->data;
   rmp->handle = mp->handle;
-  rmp->rx_fifo = pointer_to_uword (s->rx_fifo);
-  rmp->tx_fifo = pointer_to_uword (s->tx_fifo);
+  rmp->rx_fifo = pointer_to_uword (s->rx_fifo->shr);
+  rmp->tx_fifo = pointer_to_uword (s->tx_fifo->shr);
   rmp->segment_handle = session_segment_handle (s);
   svm_msg_q_add_and_unlock (app_wrk->event_queue, msg);
 
@@ -569,19 +569,23 @@ session_tx_trace_frame (vlib_main_t * vm, vlib_node_runtime_t * node,
                        u32 next_index, u32 * to_next, u16 n_segs,
                        session_t * s, u32 n_trace)
 {
-  session_queue_trace_t *t;
-  vlib_buffer_t *b;
-  int i;
-
-  for (i = 0; i < clib_min (n_trace, n_segs); i++)
+  while (n_trace && n_segs)
     {
-      b = vlib_get_buffer (vm, to_next[i]);
-      vlib_trace_buffer (vm, node, next_index, b, 1 /* follow_chain */ );
-      t = vlib_add_trace (vm, node, b, sizeof (*t));
-      t->session_index = s->session_index;
-      t->server_thread_index = s->thread_index;
+      vlib_buffer_t *b = vlib_get_buffer (vm, to_next[0]);
+      if (PREDICT_TRUE
+         (vlib_trace_buffer
+          (vm, node, next_index, b, 1 /* follow_chain */ )))
+       {
+         session_queue_trace_t *t =
+           vlib_add_trace (vm, node, b, sizeof (*t));
+         t->session_index = s->session_index;
+         t->server_thread_index = s->thread_index;
+         n_trace--;
+       }
+      to_next++;
+      n_segs--;
     }
-  vlib_set_trace_count (vm, node, n_trace - i);
+  vlib_set_trace_count (vm, node, n_trace);
 }
 
 always_inline void
@@ -946,7 +950,7 @@ session_tx_fifo_read_and_snd_i (session_worker_t * wrk,
     }
 
   next_index = smm->session_type_to_next[ctx->s->session_type];
-  max_burst = VLIB_FRAME_SIZE - *n_tx_packets;
+  max_burst = SESSION_NODE_FRAME_SIZE - *n_tx_packets;
 
   tp = session_get_transport_proto (ctx->s);
   ctx->transport_vft = transport_protocol_get_vft (tp);
@@ -1033,7 +1037,8 @@ session_tx_fifo_read_and_snd_i (session_worker_t * wrk,
       return SESSION_TX_NO_BUFFERS;
     }
 
-  transport_connection_update_tx_bytes (ctx->tc, ctx->max_len_to_snd);
+  if (transport_connection_is_tx_paced (ctx->tc))
+    transport_connection_tx_pacer_update_bytes (ctx->tc, ctx->max_len_to_snd);
 
   ctx->left_to_snd = ctx->max_len_to_snd;
   n_left = ctx->n_segs_per_evt;
@@ -1123,7 +1128,10 @@ session_tx_fifo_read_and_snd_i (session_worker_t * wrk,
 
   if (!peek_data)
     {
-      if (svm_fifo_needs_deq_ntf (ctx->s->tx_fifo, ctx->max_len_to_snd))
+      u32 n_dequeued = ctx->max_len_to_snd;
+      if (ctx->transport_vft->transport_options.tx_type == TRANSPORT_TX_DGRAM)
+       n_dequeued += ctx->n_segs_per_evt * SESSION_CONN_HDR_LEN;
+      if (svm_fifo_needs_deq_ntf (ctx->s->tx_fifo, n_dequeued))
        session_dequeue_notify (ctx->s);
     }
   return SESSION_TX_OK;
@@ -1160,7 +1168,7 @@ session_tx_fifo_dequeue_internal (session_worker_t * wrk,
   /* Clear custom-tx flag used to request reschedule for tx */
   s->flags &= ~SESSION_F_CUSTOM_TX;
 
-  sp->max_burst_size = clib_min (VLIB_FRAME_SIZE - *n_tx_packets,
+  sp->max_burst_size = clib_min (SESSION_NODE_FRAME_SIZE - *n_tx_packets,
                                 TRANSPORT_PACER_MAX_BURST_PKTS);
 
   n_packets = transport_custom_tx (session_get_transport_proto (s), s, sp);
@@ -1455,7 +1463,7 @@ session_queue_node_fn (vlib_main_t * vm, vlib_node_runtime_t * node,
   old_ti = clib_llist_prev_index (old_he, evt_list);
 
   ei = clib_llist_next_index (new_he, evt_list);
-  while (ei != wrk->new_head && n_tx_packets < VLIB_FRAME_SIZE)
+  while (ei != wrk->new_head && n_tx_packets < SESSION_NODE_FRAME_SIZE)
     {
       elt = pool_elt_at_index (wrk->event_elts, ei);
       ei = clib_llist_next_index (elt, evt_list);
@@ -1474,7 +1482,7 @@ session_queue_node_fn (vlib_main_t * vm, vlib_node_runtime_t * node,
       old_he = pool_elt_at_index (wrk->event_elts, wrk->old_head);
       ei = clib_llist_next_index (old_he, evt_list);
 
-      while (n_tx_packets < VLIB_FRAME_SIZE)
+      while (n_tx_packets < SESSION_NODE_FRAME_SIZE)
        {
          elt = pool_elt_at_index (wrk->event_elts, ei);
          next_ei = clib_llist_next_index (elt, evt_list);
@@ -1564,6 +1572,8 @@ session_queue_process (vlib_main_t * vm, vlib_node_runtime_t * rt,
          session_queue_run_on_main (vm);
          break;
        case SESSION_Q_PROCESS_STOP:
+         vlib_node_set_state (vm, session_queue_process_node.index,
+                              VLIB_NODE_STATE_DISABLED);
          timeout = 100000.0;
          break;
        case ~0: