vcl: unset fifo events only in read/write 98/14298/5
authorFlorin Coras <fcoras@cisco.com>
Thu, 16 Aug 2018 21:57:31 +0000 (14:57 -0700)
committerDave Barach <openvpp@barachs.net>
Fri, 17 Aug 2018 14:46:57 +0000 (14:46 +0000)
Avoids accumulation of io event messages when the receiver is slow to
read.

Change-Id: I3a713d339a6ac0781c010be1fbad5a7963ab02c6
Signed-off-by: Florin Coras <fcoras@cisco.com>
src/vcl/vppcom.c

index 8572b69..29936aa 100644 (file)
@@ -1339,12 +1339,11 @@ vppcom_session_read_internal (uint32_t session_index, void *buf, int n,
              continue;
            }
          svm_fifo_unset_event (rx_fifo);
-         if (svm_fifo_is_empty (rx_fifo))
-           {
-             svm_msg_q_free_msg (mq, &msg);
-             continue;
-           }
          svm_msg_q_free_msg (mq, &msg);
+         if (PREDICT_FALSE (s->session_state == STATE_CLOSE_ON_EMPTY))
+           return 0;
+         if (svm_fifo_is_empty (rx_fifo))
+           continue;
          break;
        }
     }
@@ -1483,14 +1482,9 @@ vppcom_session_write (uint32_t session_index, void *buf, size_t n)
          rv = VPPCOM_EWOULDBLOCK;
          goto done;
        }
-      while (1)
+      while (svm_fifo_is_full (tx_fifo))
        {
          svm_msg_q_lock (mq);
-         if (!svm_fifo_is_full (tx_fifo))
-           {
-             svm_msg_q_unlock (mq);
-             break;
-           }
          while (svm_msg_q_is_empty (mq) && svm_msg_q_timedwait (mq, 10e-6))
            ;
          svm_msg_q_sub_w_lock (mq, &msg);
@@ -1499,18 +1493,8 @@ vppcom_session_write (uint32_t session_index, void *buf, size_t n)
 
          if (!vcl_is_tx_evt_for_session (e, session_index,
                                          s->our_evt_q != 0))
-           {
-             vcl_handle_mq_ctrl_event (e);
-             svm_msg_q_free_msg (mq, &msg);
-             continue;
-           }
-         if (svm_fifo_is_full (tx_fifo))
-           {
-             svm_msg_q_free_msg (mq, &msg);
-             continue;
-           }
+           vcl_handle_mq_ctrl_event (e);
          svm_msg_q_free_msg (mq, &msg);
-         break;
        }
     }
 
@@ -1680,9 +1664,6 @@ vcl_select_handle_mq (svm_msg_q_t * mq, unsigned long n_bits,
        case FIFO_EVENT_APP_RX:
          sid = e->fifo->client_session_index;
          session = vcl_session_get (sid);
-         svm_fifo_unset_event (session->rx_fifo);
-         if (svm_fifo_is_empty (session->rx_fifo))
-           break;
          if (sid < n_bits && read_map)
            {
              clib_bitmap_set_no_check (read_map, sid, 1);
@@ -1692,7 +1673,7 @@ vcl_select_handle_mq (svm_msg_q_t * mq, unsigned long n_bits,
        case FIFO_EVENT_APP_TX:
          sid = e->fifo->client_session_index;
          session = vcl_session_get (sid);
-         if (!session || svm_fifo_is_full (session->tx_fifo))
+         if (!session)
            break;
          if (sid < n_bits && write_map)
            {
@@ -1703,9 +1684,6 @@ vcl_select_handle_mq (svm_msg_q_t * mq, unsigned long n_bits,
        case SESSION_IO_EVT_CT_TX:
          session = vcl_ct_session_get_from_fifo (e->fifo, 0);
          sid = vcl_session_index (session);
-         svm_fifo_unset_event (session->rx_fifo);
-         if (svm_fifo_is_empty (session->rx_fifo))
-           break;
          if (sid < n_bits && read_map)
            {
              clib_bitmap_set_no_check (read_map, sid, 1);
@@ -1716,7 +1694,7 @@ vcl_select_handle_mq (svm_msg_q_t * mq, unsigned long n_bits,
        case SESSION_IO_EVT_CT_RX:
          session = vcl_ct_session_get_from_fifo (e->fifo, 1);
          sid = vcl_session_index (session);
-         if (!session || svm_fifo_is_full (session->tx_fifo))
+         if (!session)
            break;
          if (sid < n_bits && write_map)
            {
@@ -2267,7 +2245,7 @@ vcl_epoll_wait_handle_mq (svm_msg_q_t * mq, struct epoll_event *events,
        }
       else
        {
-         if (svm_msg_q_timedwait (mq, wait_for_time / 1e3) < 0)
+         if (svm_msg_q_timedwait (mq, wait_for_time / 1e3))
            {
              svm_msg_q_unlock (mq);
              return 0;
@@ -2290,13 +2268,9 @@ vcl_epoll_wait_handle_mq (svm_msg_q_t * mq, struct epoll_event *events,
          session_events = session->vep.ev.events;
          if (!(EPOLLIN & session->vep.ev.events))
            break;
-         svm_fifo_unset_event (session->rx_fifo);
-         if (!svm_fifo_is_empty (session->rx_fifo))
-           {
-             add_event = 1;
-             events[*num_ev].events |= EPOLLIN;
-             session_evt_data = session->vep.ev.data.u64;
-           }
+         add_event = 1;
+         events[*num_ev].events |= EPOLLIN;
+         session_evt_data = session->vep.ev.data.u64;
          break;
        case FIFO_EVENT_APP_TX:
          sid = e->fifo->client_session_index;
@@ -2304,12 +2278,9 @@ vcl_epoll_wait_handle_mq (svm_msg_q_t * mq, struct epoll_event *events,
          session_events = session->vep.ev.events;
          if (!(EPOLLOUT & session_events))
            break;
-         if (!svm_fifo_is_full (session->tx_fifo))
-           {
-             add_event = 1;
-             events[*num_ev].events |= EPOLLOUT;
-             session_evt_data = session->vep.ev.data.u64;
-           }
+         add_event = 1;
+         events[*num_ev].events |= EPOLLOUT;
+         session_evt_data = session->vep.ev.data.u64;
          break;
        case SESSION_IO_EVT_CT_TX:
          session = vcl_ct_session_get_from_fifo (e->fifo, 0);
@@ -2317,13 +2288,9 @@ vcl_epoll_wait_handle_mq (svm_msg_q_t * mq, struct epoll_event *events,
          session_events = session->vep.ev.events;
          if (!(EPOLLIN & session->vep.ev.events))
            break;
-         svm_fifo_unset_event (session->rx_fifo);
-         if (!svm_fifo_is_empty (session->rx_fifo))
-           {
-             add_event = 1;
-             events[*num_ev].events |= EPOLLIN;
-             session_evt_data = session->vep.ev.data.u64;
-           }
+         add_event = 1;
+         events[*num_ev].events |= EPOLLIN;
+         session_evt_data = session->vep.ev.data.u64;
          break;
        case SESSION_IO_EVT_CT_RX:
          session = vcl_ct_session_get_from_fifo (e->fifo, 1);
@@ -2331,12 +2298,9 @@ vcl_epoll_wait_handle_mq (svm_msg_q_t * mq, struct epoll_event *events,
          session_events = session->vep.ev.events;
          if (!(EPOLLOUT & session_events))
            break;
-         if (!svm_fifo_is_full (session->tx_fifo))
-           {
-             add_event = 1;
-             events[*num_ev].events |= EPOLLOUT;
-             session_evt_data = session->vep.ev.data.u64;
-           }
+         add_event = 1;
+         events[*num_ev].events |= EPOLLOUT;
+         session_evt_data = session->vep.ev.data.u64;
          break;
        case SESSION_CTRL_EVT_ACCEPTED:
          accepted_msg = (session_accepted_msg_t *) e->data;