session svm: non blocking mq
[vpp.git] / src / plugins / hs_apps / sapi / vpp_echo.c
index a47a4d4..816b7d4 100644 (file)
@@ -556,16 +556,14 @@ session_accepted_handler (session_accepted_msg_t * mp)
   session = echo_session_new (em);
 
   if (echo_attach_session (mp->segment_handle, mp->server_rx_fifo,
-                          mp->server_tx_fifo, session))
+                          mp->server_tx_fifo, mp->vpp_event_queue_address,
+                          session))
     {
       ECHO_FAIL (ECHO_FAIL_ACCEPTED_WAIT_FOR_SEG_ALLOC,
                 "accepted wait_for_segment_allocation errored");
       return;
     }
 
-  session->vpp_evt_q =
-    uword_to_pointer (mp->vpp_event_queue_address, svm_msg_q_t *);
-
   session->vpp_session_handle = mp->handle;
 
   /* session->transport needed by app_send_dgram */
@@ -617,14 +615,14 @@ session_connected_handler (session_connected_msg_t * mp)
   session = echo_session_new (em);
 
   if (echo_attach_session (mp->segment_handle, mp->server_rx_fifo,
-                          mp->server_tx_fifo, session))
+                          mp->server_tx_fifo, mp->vpp_event_queue_address,
+                          session))
     {
       ECHO_FAIL (ECHO_FAIL_CONNECTED_WAIT_FOR_SEG_ALLOC,
                 "connected wait_for_segment_allocation errored");
       return;
     }
-  session->vpp_evt_q = uword_to_pointer (mp->vpp_event_queue_address,
-                                        svm_msg_q_t *);
+
   session->vpp_session_handle = mp->handle;
   session->start = clib_time_now (&em->clib_time);
   session->listener_index = listener_index;
@@ -806,19 +804,16 @@ echo_process_rpcs (echo_main_t * em)
 {
   echo_rpc_msg_t *rpc;
   svm_msg_q_msg_t msg;
-  svm_msg_q_t *mq = em->rpc_msq_queue;
+  svm_msg_q_t *mq = &em->rpc_msq_queue;
 
   while (em->state < STATE_DATA_DONE && !em->time_to_stop)
     {
-      svm_msg_q_lock (mq);
       if (svm_msg_q_is_empty (mq) && svm_msg_q_timedwait (mq, 1))
        {
-         svm_msg_q_unlock (mq);
          continue;
        }
-      svm_msg_q_sub_w_lock (mq, &msg);
+      svm_msg_q_sub_raw (mq, &msg);
       rpc = svm_msg_q_msg_data (mq, &msg);
-      svm_msg_q_unlock (mq);
       ((echo_rpc_t) rpc->fp) (em, &rpc->args);
       svm_msg_q_free_msg (mq, &msg);
     }
@@ -878,18 +873,15 @@ echo_mq_thread_fn (void *arg)
       if (em->periodic_stats_delta)
        echo_print_periodic_stats (em);
 
-      svm_msg_q_lock (mq);
       if (svm_msg_q_is_empty (mq) && svm_msg_q_timedwait (mq, 1))
        {
-         svm_msg_q_unlock (mq);
          continue;
        }
       for (i = 0; i < svm_msg_q_size (mq); i++)
        {
          vec_add2 (msg_vec, msg, 1);
-         svm_msg_q_sub_w_lock (mq, msg);
+         svm_msg_q_sub_raw (mq, msg);
        }
-      svm_msg_q_unlock (mq);
 
       for (i = 0; i < vec_len (msg_vec); i++)
        {
@@ -1321,7 +1313,7 @@ main (int argc, char **argv)
   cfg->n_rings = 1;
   cfg->q_nitems = rpc_queue_size;
   cfg->ring_cfgs = rc;
-  em->rpc_msq_queue = svm_msg_q_alloc (cfg);
+  svm_msg_q_attach (&em->rpc_msq_queue, svm_msg_q_alloc (cfg));
 
   signal (SIGINT, stop_signal);
   signal (SIGQUIT, stop_signal);