udp: refactor udp code
[vpp.git] / src / uri / uri_udp_test.c
index d559d57..27e70cf 100644 (file)
@@ -50,6 +50,7 @@
 typedef enum
 {
   STATE_START,
+  STATE_BOUND,
   STATE_READY,
   STATE_FAILED,
   STATE_DISCONNECTING,
@@ -97,6 +98,7 @@ typedef struct
 
   /* $$$$ hack: cut-through session index */
   volatile u32 cut_through_session_index;
+  volatile u32 connected_session;
 
   /* unique segment name counter */
   u32 unique_segment_index;
@@ -123,6 +125,7 @@ typedef struct
   /* convenience */
   svm_fifo_segment_main_t *segment_main;
 
+  u8 *connect_test_data;
 } uri_udp_test_main_t;
 
 #if CLIB_DEBUG > 0
@@ -163,7 +166,7 @@ void
 application_send_attach (uri_udp_test_main_t * utm)
 {
   vl_api_application_attach_t *bmp;
-  u32 fifo_size = 3 << 20;
+  u32 fifo_size = 1 << 20;
   bmp = vl_msg_api_alloc (sizeof (*bmp));
   memset (bmp, 0, sizeof (*bmp));
 
@@ -172,11 +175,12 @@ application_send_attach (uri_udp_test_main_t * utm)
   bmp->context = ntohl (0xfeedface);
   bmp->options[APP_OPTIONS_FLAGS] =
     APP_OPTIONS_FLAGS_ACCEPT_REDIRECT | APP_OPTIONS_FLAGS_ADD_SEGMENT;
-  bmp->options[APP_OPTIONS_PREALLOC_FIFO_PAIRS] = 16;
+  bmp->options[APP_OPTIONS_PREALLOC_FIFO_PAIRS] = 2;
   bmp->options[SESSION_OPTIONS_RX_FIFO_SIZE] = fifo_size;
   bmp->options[SESSION_OPTIONS_TX_FIFO_SIZE] = fifo_size;
   bmp->options[SESSION_OPTIONS_ADD_SEGMENT_SIZE] = 128 << 20;
   bmp->options[SESSION_OPTIONS_SEGMENT_SIZE] = 256 << 20;
+  bmp->options[APP_EVT_QUEUE_SIZE] = 16768;
   vl_msg_api_send_shmem (utm->vl_input_queue, (u8 *) & bmp);
 }
 
@@ -348,7 +352,7 @@ udp_client_connect (uri_udp_test_main_t * utm)
 }
 
 static void
-client_send (uri_udp_test_main_t * utm, session_t * session)
+client_send_cut_through (uri_udp_test_main_t * utm, session_t * session)
 {
   int i;
   u8 *test_data = 0;
@@ -391,7 +395,6 @@ client_send (uri_udp_test_main_t * utm, session_t * session)
        }
 
       bytes_to_read = svm_fifo_max_dequeue (rx_fifo);
-
       bytes_to_read = vec_len (utm->rx_buf) > bytes_to_read ?
        bytes_to_read : vec_len (utm->rx_buf);
 
@@ -451,7 +454,114 @@ client_send (uri_udp_test_main_t * utm, session_t * session)
 }
 
 static void
-uri_udp_client_test (uri_udp_test_main_t * utm)
+send_test_chunk (uri_udp_test_main_t * utm, svm_fifo_t * tx_fifo, int mypid,
+                u32 bytes)
+{
+  u8 *test_data = utm->connect_test_data;
+  u64 bytes_sent = 0;
+  int test_buf_offset = 0;
+  u32 bytes_to_snd;
+  u32 queue_max_chunk = 128 << 10, actual_write;
+  session_fifo_event_t evt;
+  int rv;
+
+  bytes_to_snd = (bytes == 0) ? vec_len (test_data) : bytes;
+  if (bytes_to_snd > vec_len (test_data))
+    bytes_to_snd = vec_len (test_data);
+
+  while (bytes_to_snd > 0 && !utm->time_to_stop)
+    {
+      actual_write = (bytes_to_snd > queue_max_chunk) ?
+       queue_max_chunk : bytes_to_snd;
+      rv = svm_fifo_enqueue_nowait (tx_fifo, actual_write,
+                                   test_data + test_buf_offset);
+
+      if (rv > 0)
+       {
+         bytes_to_snd -= rv;
+         test_buf_offset += rv;
+         bytes_sent += rv;
+
+         if (svm_fifo_set_event (tx_fifo))
+           {
+             /* Fabricate TX event, send to vpp */
+             evt.fifo = tx_fifo;
+             evt.event_type = FIFO_EVENT_APP_TX;
+
+             unix_shared_memory_queue_add (utm->vpp_event_queue,
+                                           (u8 *) & evt,
+                                           0 /* do wait for mutex */ );
+           }
+       }
+    }
+}
+
+static void
+recv_test_chunk (uri_udp_test_main_t * utm, session_t * session)
+{
+  svm_fifo_t *rx_fifo;
+  int buffer_offset, bytes_to_read = 0, rv;
+
+  rx_fifo = session->server_rx_fifo;
+  bytes_to_read = svm_fifo_max_dequeue (rx_fifo);
+  bytes_to_read =
+    vec_len (utm->rx_buf) > bytes_to_read ?
+    bytes_to_read : vec_len (utm->rx_buf);
+
+  buffer_offset = 0;
+  while (bytes_to_read > 0)
+    {
+      rv = svm_fifo_dequeue_nowait (rx_fifo, bytes_to_read,
+                                   utm->rx_buf + buffer_offset);
+      if (rv > 0)
+       {
+         bytes_to_read -= rv;
+         buffer_offset += rv;
+       }
+    }
+}
+
+void
+client_send_data (uri_udp_test_main_t * utm)
+{
+  u8 *test_data;
+  int mypid = getpid ();
+  session_t *session;
+  svm_fifo_t *tx_fifo;
+  u32 n_iterations;
+  int i;
+
+  vec_validate (utm->connect_test_data, 64 * 1024 - 1);
+  for (i = 0; i < vec_len (utm->connect_test_data); i++)
+    utm->connect_test_data[i] = i & 0xff;
+
+  test_data = utm->connect_test_data;
+  session = pool_elt_at_index (utm->sessions, utm->connected_session);
+  tx_fifo = session->server_tx_fifo;
+
+  ASSERT (vec_len (test_data) > 0);
+
+  vec_validate (utm->rx_buf, vec_len (test_data) - 1);
+  n_iterations = NITER;
+
+  for (i = 0; i < n_iterations; i++)
+    {
+      send_test_chunk (utm, tx_fifo, mypid, 0);
+      recv_test_chunk (utm, session);
+      if (utm->time_to_stop)
+       break;
+    }
+
+  f64 timeout = clib_time_now (&utm->clib_time) + 5;
+  while (clib_time_now (&utm->clib_time) < timeout)
+    {
+      recv_test_chunk (utm, session);
+    }
+
+}
+
+static void
+client_test (uri_udp_test_main_t * utm)
 {
   session_t *session;
 
@@ -464,10 +574,18 @@ uri_udp_client_test (uri_udp_test_main_t * utm)
       return;
     }
 
-  /* Only works with cut through sessions */
-  session = pool_elt_at_index (utm->sessions, utm->cut_through_session_index);
+  if (utm->cut_through_session_index != ~0)
+    {
+      session = pool_elt_at_index (utm->sessions,
+                                  utm->cut_through_session_index);
+      client_send_cut_through (utm, session);
+    }
+  else
+    {
+      session = pool_elt_at_index (utm->sessions, utm->connected_session);
+      client_send_data (utm);
+    }
 
-  client_send (utm, session);
   application_detach (utm);
 }
 
@@ -483,7 +601,7 @@ vl_api_bind_uri_reply_t_handler (vl_api_bind_uri_reply_t * mp)
       return;
     }
 
-  utm->state = STATE_READY;
+  utm->state = STATE_BOUND;
 }
 
 static void
@@ -492,6 +610,7 @@ vl_api_map_another_segment_t_handler (vl_api_map_another_segment_t * mp)
   svm_fifo_segment_create_args_t _a, *a = &_a;
   int rv;
 
+  memset (a, 0, sizeof (*a));
   a->segment_name = (char *) mp->segment_name;
   a->segment_size = mp->segment_size;
   /* Attach to the segment vpp created */
@@ -625,8 +744,6 @@ vl_api_accept_session_t_handler (vl_api_accept_session_t * mp)
   hash_set (utm->session_index_by_vpp_handles, mp->handle,
            session - utm->sessions);
 
-  utm->state = STATE_READY;
-
   if (pool_elts (utm->sessions) && (pool_elts (utm->sessions) % 20000) == 0)
     {
       f64 now = clib_time_now (&utm->clib_time);
@@ -639,7 +756,11 @@ vl_api_accept_session_t_handler (vl_api_accept_session_t * mp)
   memset (rmp, 0, sizeof (*rmp));
   rmp->_vl_msg_id = ntohs (VL_API_ACCEPT_SESSION_REPLY);
   rmp->handle = mp->handle;
+  rmp->context = mp->context;
   vl_msg_api_send_shmem (utm->vl_input_queue, (u8 *) & rmp);
+
+  CLIB_MEMORY_BARRIER ();
+  utm->state = STATE_READY;
 }
 
 static void
@@ -677,16 +798,22 @@ static void
 vl_api_connect_session_reply_t_handler (vl_api_connect_session_reply_t * mp)
 {
   uri_udp_test_main_t *utm = &uri_udp_test_main;
+  session_t *session;
 
   ASSERT (utm->i_am_master == 0);
 
+  if (mp->retval)
+    {
+      clib_warning ("failed connect");
+      return;
+    }
+
   /* We've been redirected */
   if (mp->segment_name_length > 0)
     {
       svm_fifo_segment_main_t *sm = &svm_fifo_segment_main;
       svm_fifo_segment_create_args_t _a, *a = &_a;
       u32 segment_index;
-      session_t *session;
       svm_fifo_segment_private_t *seg;
       int rv;
 
@@ -707,20 +834,24 @@ vl_api_connect_session_reply_t_handler (vl_api_connect_session_reply_t * mp)
       vec_add2 (utm->seg, seg, 1);
       memcpy (seg, sm->segments + segment_index, sizeof (*seg));
       sleep (1);
-
-      pool_get (utm->sessions, session);
-      utm->cut_through_session_index = session - utm->sessions;
-
-      session->server_rx_fifo = uword_to_pointer (mp->server_rx_fifo,
-                                                 svm_fifo_t *);
-      ASSERT (session->server_rx_fifo);
-      session->server_tx_fifo = uword_to_pointer (mp->server_tx_fifo,
-                                                 svm_fifo_t *);
-      ASSERT (session->server_tx_fifo);
     }
 
-  /* security: could unlink /dev/shm/<mp->segment_name> here, maybe */
+  pool_get (utm->sessions, session);
+  session->server_rx_fifo = uword_to_pointer (mp->server_rx_fifo,
+                                             svm_fifo_t *);
+  ASSERT (session->server_rx_fifo);
+  session->server_tx_fifo = uword_to_pointer (mp->server_tx_fifo,
+                                             svm_fifo_t *);
+  ASSERT (session->server_tx_fifo);
 
+  if (mp->segment_name_length > 0)
+    utm->cut_through_session_index = session - utm->sessions;
+  else
+    {
+      utm->connected_session = session - utm->sessions;
+      utm->vpp_event_queue = uword_to_pointer (mp->vpp_event_queue_address,
+                                              unix_shared_memory_queue_t *);
+    }
   utm->state = STATE_READY;
 }
 
@@ -789,13 +920,13 @@ server_handle_fifo_event_rx (uri_udp_test_main_t * utm,
 {
   svm_fifo_t *rx_fifo, *tx_fifo;
   int nbytes;
-
   session_fifo_event_t evt;
   unix_shared_memory_queue_t *q;
   int rv;
 
   rx_fifo = e->fifo;
   tx_fifo = utm->sessions[rx_fifo->client_session_index].server_tx_fifo;
+  svm_fifo_unset_event (rx_fifo);
 
   do
     {
@@ -809,13 +940,11 @@ server_handle_fifo_event_rx (uri_udp_test_main_t * utm,
     }
   while (rv == -2);
 
-  /* Fabricate TX event, send to vpp */
-  evt.fifo = tx_fifo;
-  evt.event_type = FIFO_EVENT_APP_TX;
-  evt.event_id = e->event_id;
-
   if (svm_fifo_set_event (tx_fifo))
     {
+      /* Fabricate TX event, send to vpp */
+      evt.fifo = tx_fifo;
+      evt.event_type = FIFO_EVENT_APP_TX;
       q = utm->vpp_event_queue;
       unix_shared_memory_queue_add (q, (u8 *) & evt,
                                    0 /* do wait for mutex */ );
@@ -827,6 +956,9 @@ server_handle_event_queue (uri_udp_test_main_t * utm)
 {
   session_fifo_event_t _e, *e = &_e;
 
+  while (utm->state != STATE_READY)
+    sleep (5);
+
   while (1)
     {
       unix_shared_memory_queue_sub (utm->our_event_queue, (u8 *) e,
@@ -845,7 +977,7 @@ server_handle_event_queue (uri_udp_test_main_t * utm)
          break;
        }
       if (PREDICT_FALSE (utm->time_to_stop == 1))
-       break;
+       return;
       if (PREDICT_FALSE (utm->time_to_print_stats == 1))
        {
          utm->time_to_print_stats = 0;
@@ -869,7 +1001,7 @@ server_unbind (uri_udp_test_main_t * utm)
 }
 
 static void
-server_listen (uri_udp_test_main_t * utm)
+server_bind (uri_udp_test_main_t * utm)
 {
   vl_api_bind_uri_t *bmp;
 
@@ -890,11 +1022,11 @@ udp_server_test (uri_udp_test_main_t * utm)
   application_send_attach (utm);
 
   /* Bind to uri */
-  server_listen (utm);
+  server_bind (utm);
 
-  if (wait_for_state_change (utm, STATE_READY))
+  if (wait_for_state_change (utm, STATE_BOUND))
     {
-      clib_warning ("timeout waiting for STATE_READY");
+      clib_warning ("timeout waiting for STATE_BOUND");
       return;
     }
 
@@ -976,7 +1108,7 @@ main (int argc, char **argv)
   utm->i_am_master = i_am_master;
   utm->segment_main = &svm_fifo_segment_main;
 
-  utm->connect_uri = format (0, "udp://6.0.0.1/1234%c", 0);
+  utm->connect_uri = format (0, "udp://6.0.1.2/1234%c", 0);
 
   setup_signal_handlers ();
 
@@ -991,7 +1123,7 @@ main (int argc, char **argv)
 
   if (i_am_master == 0)
     {
-      uri_udp_client_test (utm);
+      client_test (utm);
       exit (0);
     }