session svm: non blocking mq
[vpp.git] / src / plugins / hs_apps / sapi / vpp_echo.c
index 4cce41e..816b7d4 100644 (file)
@@ -69,9 +69,9 @@ echo_session_dequeue_notify (echo_session_t * s)
   int rv;
   if (!svm_fifo_set_event (s->rx_fifo))
     return;
-  if ((rv =
-       app_send_io_evt_to_vpp (s->vpp_evt_q, s->rx_fifo->master_session_index,
-                              SESSION_IO_EVT_RX, SVM_Q_WAIT)))
+  if ((rv = app_send_io_evt_to_vpp (s->vpp_evt_q,
+                                   s->rx_fifo->shr->master_session_index,
+                                   SESSION_IO_EVT_RX, SVM_Q_WAIT)))
     ECHO_FAIL (ECHO_FAIL_SEND_IO_EVT, "app_send_io_evt_to_vpp errored %d",
               rv);
   svm_fifo_clear_deq_ntf (s->rx_fifo);
@@ -542,7 +542,6 @@ session_accepted_handler (session_accepted_msg_t * mp)
 {
   app_session_evt_t _app_evt, *app_evt = &_app_evt;
   session_accepted_reply_msg_t *rmp;
-  svm_fifo_t *rx_fifo, *tx_fifo;
   echo_main_t *em = &echo_main;
   echo_session_t *session, *ls;
 
@@ -552,25 +551,21 @@ session_accepted_handler (session_accepted_msg_t * mp)
                 "Unknown listener handle 0x%lx", mp->listener_handle);
       return;
     }
-  if (echo_segment_lookup (mp->segment_handle) == ~0)
+
+  /* Allocate local session and set it up */
+  session = echo_session_new (em);
+
+  if (echo_attach_session (mp->segment_handle, mp->server_rx_fifo,
+                          mp->server_tx_fifo, mp->vpp_event_queue_address,
+                          session))
     {
       ECHO_FAIL (ECHO_FAIL_ACCEPTED_WAIT_FOR_SEG_ALLOC,
                 "accepted wait_for_segment_allocation errored");
       return;
     }
 
-  /* Allocate local session and set it up */
-  session = echo_session_new (em);
   session->vpp_session_handle = mp->handle;
 
-  rx_fifo = uword_to_pointer (mp->server_rx_fifo, svm_fifo_t *);
-  rx_fifo->client_session_index = session->session_index;
-  tx_fifo = uword_to_pointer (mp->server_tx_fifo, svm_fifo_t *);
-  tx_fifo->client_session_index = session->session_index;
-
-  session->rx_fifo = rx_fifo;
-  session->tx_fifo = tx_fifo;
-
   /* session->transport needed by app_send_dgram */
   clib_memcpy_fast (&session->transport.rmt_ip, &mp->rmt.ip,
                    sizeof (ip46_address_t));
@@ -581,10 +576,8 @@ session_accepted_handler (session_accepted_msg_t * mp)
   session->transport.lcl_port = em->uri_elts.port;
 
   session->vpp_session_handle = mp->handle;
-  session->start = clib_time_now (&em->clib_time);
-  session->vpp_evt_q = uword_to_pointer (mp->vpp_event_queue_address,
-                                        svm_msg_q_t *);
   session->listener_index = ls->session_index;
+  session->start = clib_time_now (&em->clib_time);
 
   /* Add it to lookup table */
   ECHO_LOG (2, "Accepted session 0x%lx S[%u] -> 0x%lx S[%u]",
@@ -607,7 +600,6 @@ session_connected_handler (session_connected_msg_t * mp)
   echo_main_t *em = &echo_main;
   echo_session_t *session;
   u32 listener_index = htonl (mp->context);
-  svm_fifo_t *rx_fifo, *tx_fifo;
 
   clib_atomic_add_fetch (&em->max_sim_connects, 1);
 
@@ -621,24 +613,18 @@ session_connected_handler (session_connected_msg_t * mp)
     }
 
   session = echo_session_new (em);
-  if (echo_segment_lookup (mp->segment_handle) == ~0)
+
+  if (echo_attach_session (mp->segment_handle, mp->server_rx_fifo,
+                          mp->server_tx_fifo, mp->vpp_event_queue_address,
+                          session))
     {
       ECHO_FAIL (ECHO_FAIL_CONNECTED_WAIT_FOR_SEG_ALLOC,
                 "connected wait_for_segment_allocation errored");
       return;
     }
 
-  rx_fifo = uword_to_pointer (mp->server_rx_fifo, svm_fifo_t *);
-  rx_fifo->client_session_index = session->session_index;
-  tx_fifo = uword_to_pointer (mp->server_tx_fifo, svm_fifo_t *);
-  tx_fifo->client_session_index = session->session_index;
-
-  session->rx_fifo = rx_fifo;
-  session->tx_fifo = tx_fifo;
   session->vpp_session_handle = mp->handle;
   session->start = clib_time_now (&em->clib_time);
-  session->vpp_evt_q = uword_to_pointer (mp->vpp_event_queue_address,
-                                        svm_msg_q_t *);
   session->listener_index = listener_index;
   /* session->transport needed by app_send_dgram */
   clib_memcpy_fast (&session->transport.lcl_ip, &mp->lcl.ip,
@@ -818,19 +804,16 @@ echo_process_rpcs (echo_main_t * em)
 {
   echo_rpc_msg_t *rpc;
   svm_msg_q_msg_t msg;
-  svm_msg_q_t *mq = em->rpc_msq_queue;
+  svm_msg_q_t *mq = &em->rpc_msq_queue;
 
   while (em->state < STATE_DATA_DONE && !em->time_to_stop)
     {
-      svm_msg_q_lock (mq);
       if (svm_msg_q_is_empty (mq) && svm_msg_q_timedwait (mq, 1))
        {
-         svm_msg_q_unlock (mq);
          continue;
        }
-      svm_msg_q_sub_w_lock (mq, &msg);
+      svm_msg_q_sub_raw (mq, &msg);
       rpc = svm_msg_q_msg_data (mq, &msg);
-      svm_msg_q_unlock (mq);
       ((echo_rpc_t) rpc->fp) (em, &rpc->args);
       svm_msg_q_free_msg (mq, &msg);
     }
@@ -890,18 +873,15 @@ echo_mq_thread_fn (void *arg)
       if (em->periodic_stats_delta)
        echo_print_periodic_stats (em);
 
-      svm_msg_q_lock (mq);
       if (svm_msg_q_is_empty (mq) && svm_msg_q_timedwait (mq, 1))
        {
-         svm_msg_q_unlock (mq);
          continue;
        }
       for (i = 0; i < svm_msg_q_size (mq); i++)
        {
          vec_add2 (msg_vec, msg, 1);
-         svm_msg_q_sub_w_lock (mq, msg);
+         svm_msg_q_sub_raw (mq, msg);
        }
-      svm_msg_q_unlock (mq);
 
       for (i = 0; i < vec_len (msg_vec); i++)
        {
@@ -1333,7 +1313,7 @@ main (int argc, char **argv)
   cfg->n_rings = 1;
   cfg->q_nitems = rpc_queue_size;
   cfg->ring_cfgs = rc;
-  em->rpc_msq_queue = svm_msg_q_alloc (cfg);
+  svm_msg_q_attach (&em->rpc_msq_queue, svm_msg_q_alloc (cfg));
 
   signal (SIGINT, stop_signal);
   signal (SIGQUIT, stop_signal);