vcl: register workers in order
[vpp.git] / src / vcl / vppcom.c
index 60d5eb3..6455508 100644 (file)
@@ -700,6 +700,7 @@ vppcom_app_create (char *app_name)
       svm_fifo_segment_main_init (vcl_cfg->segment_baseva,
                                  20 /* timeout in secs */ );
       pool_init_fixed (vcm->workers, vcl_cfg->max_workers);
+      clib_spinlock_init (&vcm->workers_lock);
       vcl_worker_alloc_and_init ();
     }
 
@@ -1580,6 +1581,14 @@ vcl_mq_dequeue_batch (vcl_worker_t * wrk, svm_msg_q_t * mq)
   return n_msgs;
 }
 
+#define vcl_fifo_rx_evt_valid_or_break(_fifo)                  \
+if (PREDICT_FALSE (svm_fifo_is_empty (_fifo)))                 \
+  {                                                            \
+    svm_fifo_unset_event (_fifo);                              \
+    if (svm_fifo_is_empty (_fifo))                             \
+       break;                                                  \
+  }                                                            \
+
 static int
 vcl_select_handle_mq (vcl_worker_t * wrk, svm_msg_q_t * mq,
                      unsigned long n_bits, unsigned long *read_map,
@@ -1633,8 +1642,11 @@ vcl_select_handle_mq (vcl_worker_t * wrk, svm_msg_q_t * mq,
       switch (e->event_type)
        {
        case FIFO_EVENT_APP_RX:
+         vcl_fifo_rx_evt_valid_or_break (e->fifo);
          sid = e->fifo->client_session_index;
          session = vcl_session_get (wrk, sid);
+         if (!session)
+           break;
          if (sid < n_bits && read_map)
            {
              clib_bitmap_set_no_check (read_map, sid, 1);
@@ -1653,13 +1665,10 @@ vcl_select_handle_mq (vcl_worker_t * wrk, svm_msg_q_t * mq,
            }
          break;
        case SESSION_IO_EVT_CT_TX:
-         if (svm_fifo_is_empty (e->fifo))
-           {
-             svm_fifo_unset_event (e->fifo);
-             if (svm_fifo_is_empty (e->fifo))
-               break;
-           }
+         vcl_fifo_rx_evt_valid_or_break (e->fifo);
          session = vcl_ct_session_get_from_fifo (wrk, e->fifo, 0);
+         if (!session)
+           break;
          sid = session->session_index;
          if (sid < n_bits && read_map)
            {
@@ -1669,9 +1678,9 @@ vcl_select_handle_mq (vcl_worker_t * wrk, svm_msg_q_t * mq,
          break;
        case SESSION_IO_EVT_CT_RX:
          session = vcl_ct_session_get_from_fifo (wrk, e->fifo, 1);
-         sid = session->session_index;
          if (!session)
            break;
+         sid = session->session_index;
          if (sid < n_bits && write_map)
            {
              clib_bitmap_set_no_check (write_map, sid, 1);
@@ -2224,6 +2233,7 @@ vcl_epoll_wait_handle_mq (vcl_worker_t * wrk, svm_msg_q_t * mq,
       switch (e->event_type)
        {
        case FIFO_EVENT_APP_RX:
+         vcl_fifo_rx_evt_valid_or_break (e->fifo);
          sid = e->fifo->client_session_index;
          session = vcl_session_get (wrk, sid);
          session_events = session->vep.ev.events;
@@ -2244,6 +2254,7 @@ vcl_epoll_wait_handle_mq (vcl_worker_t * wrk, svm_msg_q_t * mq,
          session_evt_data = session->vep.ev.data.u64;
          break;
        case SESSION_IO_EVT_CT_TX:
+         vcl_fifo_rx_evt_valid_or_break (e->fifo);
          session = vcl_ct_session_get_from_fifo (wrk, e->fifo, 0);
          sid = session->session_index;
          session_events = session->vep.ev.events;
@@ -2417,6 +2428,9 @@ vppcom_epoll_wait (uint32_t vep_handle, struct epoll_event *events,
     }
 
   vep_session = vcl_session_get_w_handle (wrk, vep_handle);
+  if (!vep_session)
+    return VPPCOM_EBADFD;
+
   if (PREDICT_FALSE (!vep_session->is_vep))
     {
       clib_warning ("VCL<%d>: ERROR: vep_idx (%u) is not a vep!",
@@ -3149,7 +3163,7 @@ vppcom_session_index (uint32_t session_handle)
 int
 vppcom_worker_register (void)
 {
-  if (!vcl_worker_alloc_and_init ())
+  if (vcl_worker_alloc_and_init ())
     return VPPCOM_OK;
   return VPPCOM_EEXIST;
 }