session: use msg queue for events
[vpp.git] / src / vnet / session / session_node.c
index aec353d..350282b 100644 (file)
@@ -575,15 +575,14 @@ session_queue_node_fn (vlib_main_t * vm, vlib_node_runtime_t * node,
                       vlib_frame_t * frame)
 {
   session_manager_main_t *smm = vnet_get_session_manager_main ();
+  u32 thread_index = vm->thread_index, n_to_dequeue, n_events;
   session_fifo_event_t *pending_events, *e;
   session_fifo_event_t *fifo_events;
-  u32 n_to_dequeue, n_events;
-  svm_queue_t *q;
-  application_t *app;
-  int n_tx_packets = 0;
-  u32 thread_index = vm->thread_index;
-  int i, rv;
+  svm_msg_q_msg_t _msg, *msg = &_msg;
   f64 now = vlib_time_now (vm);
+  int n_tx_packets = 0, i, rv;
+  application_t *app;
+  svm_msg_q_t *mq;
   void (*fp) (void *);
 
   SESSION_EVT_DBG (SESSION_EVT_POLL_GAP_TRACK, smm, thread_index);
@@ -594,16 +593,11 @@ session_queue_node_fn (vlib_main_t * vm, vlib_node_runtime_t * node,
   transport_update_time (now, thread_index);
 
   /*
-   * Get vpp queue events
+   * Get vpp queue events that we can dequeue without blocking
    */
-  q = smm->vpp_event_queues[thread_index];
-  if (PREDICT_FALSE (q == 0))
-    return 0;
-
+  mq = smm->vpp_event_queues[thread_index];
   fifo_events = smm->free_event_vector[thread_index];
-
-  /* min number of events we can dequeue without blocking */
-  n_to_dequeue = q->cursize;
+  n_to_dequeue = svm_msg_q_size (mq);
   pending_events = smm->pending_event_vector[thread_index];
 
   if (!n_to_dequeue && !vec_len (pending_events)
@@ -624,21 +618,19 @@ session_queue_node_fn (vlib_main_t * vm, vlib_node_runtime_t * node,
     }
 
   /* See you in the next life, don't be late
-   * XXX: we may need priorities here
-   */
-  if (pthread_mutex_trylock (&q->mutex))
+   * XXX: we may need priorities here */
+  if (svm_msg_q_try_lock (mq))
     return 0;
 
   for (i = 0; i < n_to_dequeue; i++)
     {
       vec_add2 (fifo_events, e, 1);
-      svm_queue_sub_raw (q, (u8 *) e);
+      svm_msg_q_sub_w_lock (mq, msg);
+      clib_memcpy (e, svm_msg_q_msg_data (mq, msg), sizeof (*e));
+      svm_msg_q_free_msg (mq, msg);
     }
 
-  /* The other side of the connection is not polling */
-  if (q->cursize < (q->maxsize / 8))
-    (void) pthread_cond_broadcast (&q->condvar);
-  pthread_mutex_unlock (&q->mutex);
+  svm_msg_q_unlock (mq);
 
   vec_append (fifo_events, pending_events);
   vec_append (fifo_events, smm->pending_disconnects[thread_index]);
@@ -653,6 +645,7 @@ skip_dequeue:
     {
       stream_session_t *s;     /* $$$ prefetch 1 ahead maybe */
       session_fifo_event_t *e;
+      u32 to_dequeue;
 
       e = &fifo_events[i];
       switch (e->event_type)
@@ -671,6 +664,7 @@ skip_dequeue:
              clib_warning ("It's dead, Jim!");
              continue;
            }
+         to_dequeue = svm_fifo_max_dequeue (s->server_tx_fifo);
 
          /* Spray packets in per session type frames, since they go to
           * different nodes */
@@ -678,7 +672,10 @@ skip_dequeue:
                                                       &n_tx_packets);
          if (PREDICT_TRUE (rv == SESSION_TX_OK))
            {
-             session_dequeue_notify (s);
+             /* Notify app there's tx space if not polling */
+             if (PREDICT_FALSE (to_dequeue == s->server_tx_fifo->nitems
+                                && !svm_fifo_has_event (s->server_tx_fifo)))
+               session_dequeue_notify (s);
            }
          else if (PREDICT_FALSE (rv == SESSION_TX_NO_BUFFERS))
            {
@@ -755,19 +752,20 @@ dump_thread_0_event_queue (void)
   vlib_main_t *vm = &vlib_global_main;
   u32 my_thread_index = vm->thread_index;
   session_fifo_event_t _e, *e = &_e;
+  svm_msg_q_ring_t *ring;
   stream_session_t *s0;
+  svm_msg_q_msg_t *msg;
+  svm_msg_q_t *mq;
   int i, index;
-  i8 *headp;
-
-  svm_queue_t *q;
-  q = smm->vpp_event_queues[my_thread_index];
 
-  index = q->head;
+  mq = smm->vpp_event_queues[my_thread_index];
+  index = mq->q->head;
 
-  for (i = 0; i < q->cursize; i++)
+  for (i = 0; i < mq->q->cursize; i++)
     {
-      headp = (i8 *) (&q->data[0] + q->elsize * index);
-      clib_memcpy (e, headp, q->elsize);
+      msg = (svm_msg_q_msg_t *) (&mq->q->data[0] + mq->q->elsize * index);
+      ring = svm_msg_q_ring (mq, msg->ring_index);
+      clib_memcpy (e, svm_msg_q_msg_data (mq, msg), ring->elsize);
 
       switch (e->event_type)
        {
@@ -800,7 +798,7 @@ dump_thread_0_event_queue (void)
 
       index++;
 
-      if (index == q->maxsize)
+      if (index == mq->q->maxsize)
        index = 0;
     }
 }
@@ -839,10 +837,11 @@ u8
 session_node_lookup_fifo_event (svm_fifo_t * f, session_fifo_event_t * e)
 {
   session_manager_main_t *smm = vnet_get_session_manager_main ();
-  svm_queue_t *q;
+  svm_msg_q_t *mq;
   session_fifo_event_t *pending_event_vector, *evt;
   int i, index, found = 0;
-  i8 *headp;
+  svm_msg_q_msg_t *msg;
+  svm_msg_q_ring_t *ring;
   u8 thread_index;
 
   ASSERT (e);
@@ -850,16 +849,17 @@ session_node_lookup_fifo_event (svm_fifo_t * f, session_fifo_event_t * e)
   /*
    * Search evt queue
    */
-  q = smm->vpp_event_queues[thread_index];
-  index = q->head;
-  for (i = 0; i < q->cursize; i++)
+  mq = smm->vpp_event_queues[thread_index];
+  index = mq->q->head;
+  for (i = 0; i < mq->q->cursize; i++)
     {
-      headp = (i8 *) (&q->data[0] + q->elsize * index);
-      clib_memcpy (e, headp, q->elsize);
+      msg = (svm_msg_q_msg_t *) (&mq->q->data[0] + mq->q->elsize * index);
+      ring = svm_msg_q_ring (mq, msg->ring_index);
+      clib_memcpy (e, svm_msg_q_msg_data (mq, msg), ring->elsize);
       found = session_node_cmp_event (e, f);
       if (found)
        return 1;
-      if (++index == q->maxsize)
+      if (++index == mq->q->maxsize)
        index = 0;
     }
   /*