vcl: add read/write udp support
[vpp.git] / src / vnet / session-apps / echo_server.c
index 85e6c29..14ab36d 100644 (file)
@@ -23,7 +23,7 @@ typedef struct
   /*
    * Server app parameters
    */
-  svm_queue_t **vpp_queue;
+  svm_msg_q_t **vpp_queue;
   svm_queue_t *vl_input_queue; /**< Sever's event queue */
 
   u32 app_index;               /**< Server app index */
@@ -63,6 +63,7 @@ echo_server_session_accept_callback (stream_session_t * s)
     session_manager_get_vpp_event_queue (s->thread_index);
   s->session_state = SESSION_STATE_READY;
   esm->byte_index = 0;
+  ASSERT (vec_len (esm->rx_retries) > s->thread_index);
   vec_validate (esm->rx_retries[s->thread_index], s->session_index);
   esm->rx_retries[s->thread_index][s->session_index] = 0;
   return 0;
@@ -144,10 +145,8 @@ echo_server_rx_callback (stream_session_t * s)
   int actual_transfer;
   svm_fifo_t *tx_fifo, *rx_fifo;
   echo_server_main_t *esm = &echo_server_main;
-  session_fifo_event_t evt;
   u32 thread_index = vlib_get_thread_index ();
   app_session_transport_t at;
-  svm_queue_t *q;
 
   ASSERT (s->thread_index == thread_index);
 
@@ -169,8 +168,9 @@ echo_server_rx_callback (stream_session_t * s)
       max_dequeue = ph.data_length - ph.data_offset;
       if (!esm->vpp_queue[s->thread_index])
        {
-         q = session_manager_get_vpp_event_queue (s->thread_index);
-         esm->vpp_queue[s->thread_index] = q;
+         svm_msg_q_t *mq;
+         mq = session_manager_get_vpp_event_queue (s->thread_index);
+         esm->vpp_queue[s->thread_index] = mq;
        }
       max_enqueue -= sizeof (session_dgram_hdr_t);
     }
@@ -190,13 +190,7 @@ echo_server_rx_callback (stream_session_t * s)
       /* Program self-tap to retry */
       if (svm_fifo_set_event (rx_fifo))
        {
-         evt.fifo = rx_fifo;
-         evt.event_type = FIFO_EVENT_BUILTIN_RX;
-
-         q = esm->vpp_queue[s->thread_index];
-         if (PREDICT_FALSE (q->cursize == q->maxsize))
-           clib_warning ("out of event queue space");
-         else if (svm_queue_add (q, (u8 *) & evt, 0))
+         if (session_send_io_evt_to_thread (rx_fifo, FIFO_EVENT_BUILTIN_RX))
            clib_warning ("failed to enqueue self-tap");
 
          vec_validate (esm->rx_retries[s->thread_index], s->session_index);
@@ -217,14 +211,16 @@ echo_server_rx_callback (stream_session_t * s)
       actual_transfer = app_recv_stream_raw (rx_fifo,
                                             esm->rx_buf[thread_index],
                                             max_transfer,
-                                            0 /* don't clear event */ );
+                                            0 /* don't clear event */ ,
+                                            0 /* peek */ );
     }
   else
     {
       actual_transfer = app_recv_dgram_raw (rx_fifo,
                                            esm->rx_buf[thread_index],
                                            max_transfer, &at,
-                                           0 /* don't clear event */ );
+                                           0 /* don't clear event */ ,
+                                           0 /* peek */ );
     }
   ASSERT (actual_transfer == max_transfer);
   /* test_bytes (esm, actual_transfer); */
@@ -238,14 +234,14 @@ echo_server_rx_callback (stream_session_t * s)
       n_written = app_send_stream_raw (tx_fifo,
                                       esm->vpp_queue[thread_index],
                                       esm->rx_buf[thread_index],
-                                      actual_transfer, 0);
+                                      actual_transfer, FIFO_EVENT_APP_TX, 0);
     }
   else
     {
       n_written = app_send_dgram_raw (tx_fifo, &at,
                                      esm->vpp_queue[s->thread_index],
                                      esm->rx_buf[thread_index],
-                                     actual_transfer, 0);
+                                     actual_transfer, FIFO_EVENT_APP_TX, 0);
     }
 
   if (n_written != max_transfer)
@@ -391,7 +387,9 @@ echo_server_create (vlib_main_t * vm, u8 * appns_id, u64 appns_flags,
   vec_validate (echo_server_main.vpp_queue, num_threads - 1);
   vec_validate (esm->rx_buf, num_threads - 1);
   vec_validate (esm->rx_retries, num_threads - 1);
-
+  for (i = 0; i < vec_len (esm->rx_retries); i++)
+    vec_validate (esm->rx_retries[i],
+                 pool_elts (session_manager_main.sessions[i]));
   esm->rcv_buffer_size = clib_max (esm->rcv_buffer_size, esm->fifo_size);
   for (i = 0; i < num_threads; i++)
     vec_validate (esm->rx_buf[i], esm->rcv_buffer_size);