hsa: add setting desired throughput option for test echo client 34/42834/8
authorSemir Sionek <[email protected]>
Thu, 17 Apr 2025 13:46:50 +0000 (13:46 +0000)
committerFlorin Coras <[email protected]>
Fri, 25 Apr 2025 19:21:29 +0000 (19:21 +0000)
Introduce the throughput setting for the echo client, which enables
pacing out the sending of data.

Type: improvement
Change-Id: I337d6d7e3995faf246776edc88c2121cf0ff1b59
Signed-off-by: Semir Sionek <[email protected]>
extras/hs-test/echo_test.go
extras/hs-test/infra/hst_suite.go
src/plugins/hs_apps/echo_client.c
src/plugins/hs_apps/echo_client.h

index 8d69c00..27de77c 100644 (file)
@@ -1,11 +1,14 @@
 package main
 
 import (
+       "regexp"
+       "strconv"
+
        . "fd.io/hs-test/infra"
 )
 
 func init() {
-       RegisterVethTests(EchoBuiltinTest)
+       RegisterVethTests(EchoBuiltinTest, EchoBuiltinBandwidthTest)
        RegisterSoloVethTests(TcpWithLossTest)
 }
 
@@ -24,6 +27,35 @@ func EchoBuiltinTest(s *VethsSuite) {
        s.AssertNotContains(o, "failed:")
 }
 
+func EchoBuiltinBandwidthTest(s *VethsSuite) {
+       regex := regexp.MustCompile(`gbytes\) in (\d+\.\d+) seconds`)
+       serverVpp := s.Containers.ServerVpp.VppInstance
+
+       serverVpp.Vppctl("test echo server " +
+               " uri tcp://" + s.Interfaces.Server.Ip4AddressString() + "/1234")
+
+       clientVpp := s.Containers.ClientVpp.VppInstance
+
+       o := clientVpp.Vppctl("test echo client nclients 4 bytes 8m throughput 16m" +
+               " uri tcp://" + s.Interfaces.Server.Ip4AddressString() + "/1234")
+       s.Log(o)
+       s.AssertContains(o, "Test started")
+       s.AssertContains(o, "Test finished")
+       if regex.MatchString(o) {
+               matches := regex.FindStringSubmatch(o)
+               if len(matches) != 0 {
+                       seconds, _ := strconv.ParseFloat(matches[1], 32)
+                       // Make sure that we are within 0.1 of the targeted
+                       // 2 seconds of runtime
+                       s.AssertEqualWithinThreshold(seconds, 2, 0.1)
+               } else {
+                       s.AssertEmpty("invalid echo test client output")
+               }
+       } else {
+               s.AssertEmpty("invalid echo test client output")
+       }
+}
+
 func TcpWithLossTest(s *VethsSuite) {
        serverVpp := s.Containers.ServerVpp.VppInstance
 
index ee6c6c2..b3ad70c 100644 (file)
@@ -375,6 +375,10 @@ func (s *HstSuite) AssertGreaterThan(actual, expected interface{}, msgAndArgs ..
        ExpectWithOffset(2, actual).Should(BeNumerically(">=", expected), msgAndArgs...)
 }
 
+func (s *HstSuite) AssertEqualWithinThreshold(actual, expected, threshold interface{}, msgAndArgs ...interface{}) {
+       ExpectWithOffset(2, actual).Should(BeNumerically("~", expected, threshold), msgAndArgs...)
+}
+
 func (s *HstSuite) AssertTimeEqualWithinThreshold(actual, expected time.Time, threshold time.Duration, msgAndArgs ...interface{}) {
        ExpectWithOffset(2, actual).Should(BeTemporally("~", expected, threshold), msgAndArgs...)
 }
index 1731244..1c0e49c 100644 (file)
@@ -92,6 +92,8 @@ send_data_chunk (ec_main_t *ecm, ec_session_t *es)
     bytes_to_send = clib_min (svm_fifo_max_enqueue_prod (f), max_burst);
   else
     bytes_to_send = clib_min (es->bytes_to_send, max_burst);
+  if (ecm->throughput)
+    bytes_to_send = clib_min (es->bytes_paced_current, bytes_to_send);
   test_buf_offset = es->bytes_sent % test_buf_len;
 
   bytes_this_chunk = clib_min (test_buf_len - test_buf_offset, bytes_to_send);
@@ -141,7 +143,8 @@ send_data_chunk (ec_main_t *ecm, ec_session_t *es)
       else
        {
          bytes_this_chunk = clib_min (bytes_this_chunk, max_enqueue);
-         bytes_this_chunk = clib_min (bytes_this_chunk, 1460);
+         if (!ecm->throughput)
+           bytes_this_chunk = clib_min (bytes_this_chunk, 1460);
          rv =
            app_send_dgram ((app_session_t *) es, test_data + test_buf_offset,
                            bytes_this_chunk, 0);
@@ -157,6 +160,11 @@ send_data_chunk (ec_main_t *ecm, ec_session_t *es)
        es->bytes_to_receive += rv;
       else
        es->bytes_to_send -= rv;
+      if (ecm->throughput)
+       {
+         es->bytes_paced_current -= rv;
+         es->bytes_paced_current += es->bytes_paced_target;
+       }
 
       if (ecm->cfg.verbose)
        {
@@ -236,6 +244,7 @@ ec_node_fn (vlib_main_t *vm, vlib_node_runtime_t *node, vlib_frame_t *frame)
 {
   u32 *conn_indices, *conns_this_batch, nconns_this_batch;
   int thread_index = vm->thread_index, i, delete_session;
+  f64 time_now;
   ec_main_t *ecm = &ec_main;
   ec_worker_t *wrk;
   ec_session_t *es;
@@ -285,18 +294,23 @@ ec_node_fn (vlib_main_t *vm, vlib_node_runtime_t *node, vlib_frame_t *frame)
       ecm->repeats = 0;
     }
 
+  time_now = vlib_time_now (ecm->vlib_main);
   /*
    * Handle connections in this batch
    */
   for (i = 0; i < vec_len (conns_this_batch); i++)
     {
       es = ec_session_get (wrk, conns_this_batch[i]);
+      if (ecm->throughput && time_now < es->time_to_send)
+       continue;
 
       delete_session = 1;
 
       if (es->bytes_to_send > 0)
        {
          send_data_chunk (ecm, es);
+         if (ecm->throughput)
+           es->time_to_send += ecm->pacing_window_len;
          delete_session = 0;
        }
 
@@ -334,6 +348,8 @@ ec_node_fn (vlib_main_t *vm, vlib_node_runtime_t *node, vlib_frame_t *frame)
              signal_evt_to_cli (EC_CLI_TEST_DONE);
            }
        }
+      if (ecm->throughput)
+       time_now = vlib_time_now (vm);
     }
 
   wrk->conn_indices = conn_indices;
@@ -378,6 +394,8 @@ ec_reset_runtime_config (ec_main_t *ecm)
   ecm->syn_timeout = 20.0;
   ecm->test_timeout = 20.0;
   ecm->run_time = 0;
+  ecm->throughput = 0;
+  ecm->pacing_window_len = 1;
   vec_free (ecm->connect_uri);
 }
 
@@ -484,7 +502,8 @@ ec_cleanup (ec_main_t *ecm)
 
   vec_free (ecm->connect_uri);
   vec_free (ecm->appns_id);
-
+  if (ecm->throughput)
+    ecm->pacing_window_len = 1;
   if (ecm->barrier_acq_needed)
     vlib_worker_thread_barrier_sync (ecm->vlib_main);
 }
@@ -626,6 +645,41 @@ quic_ec_session_connected_callback (u32 app_index, u32 api_context,
   return 0;
 }
 
+static void
+ec_calc_tput (ec_main_t *ecm)
+{
+  vlib_main_t *vm = vlib_get_main ();
+  ec_worker_t *wrk;
+  ec_session_t *sess;
+  f64 pacing_base;
+  u64 bytes_paced_target;
+  /* periodic writes larger than this clog up the fifo */
+  const u64 target_size_threshold = 4344;
+
+  /* find a suitable pacing window length & data chunk size */
+  bytes_paced_target =
+    ecm->throughput * ecm->pacing_window_len / ecm->n_clients;
+  while (bytes_paced_target > target_size_threshold)
+    {
+      ecm->pacing_window_len /= 2;
+      bytes_paced_target /= 2;
+    }
+
+  /* order sessions to shoot out data sequentially */
+  pacing_base = vlib_time_now (vm) - ecm->pacing_window_len;
+  vec_foreach (wrk, ecm->wrk)
+    {
+      vec_foreach (sess, wrk->sessions)
+       {
+         sess->time_to_send =
+           pacing_base + ecm->pacing_window_len / ecm->n_clients;
+         pacing_base = sess->time_to_send;
+         sess->bytes_paced_target = bytes_paced_target;
+         sess->bytes_paced_current = bytes_paced_target;
+       }
+    }
+}
+
 static int
 ec_session_connected_callback (u32 app_index, u32 api_context, session_t *s,
                               session_error_t err)
@@ -666,12 +720,16 @@ ec_session_connected_callback (u32 app_index, u32 api_context, session_t *s,
   es->bytes_to_receive = ecm->echo_bytes ? ecm->bytes_to_send : 0ULL;
   es->vpp_session_handle = session_handle (s);
   es->vpp_session_index = s->session_index;
+  es->bytes_paced_target = ~0;
+  es->bytes_paced_current = ~0;
   s->opaque = es->session_index;
 
   vec_add1 (wrk->conn_indices, es->session_index);
   clib_atomic_fetch_add (&ecm->ready_connections, 1);
   if (ecm->ready_connections == ecm->expected_connections)
     {
+      if (ecm->throughput)
+       ec_calc_tput (ecm);
       ecm->run_test = EC_RUNNING;
       /* Signal the CLI process that the action is starting... */
       signal_evt_to_cli (EC_CLI_CONNECTS_DONE);
@@ -1129,6 +1187,9 @@ ec_command_fn (vlib_main_t *vm, unformat_input_t *input,
       else if (unformat (line_input, "private-segment-size %U",
                         unformat_memory_size, &ecm->private_segment_size))
        ;
+      else if (unformat (line_input, "throughput %U", unformat_memory_size,
+                        &ecm->throughput))
+       ;
       else if (unformat (line_input, "preallocate-fifos"))
        ecm->prealloc_fifos = 1;
       else if (unformat (line_input, "preallocate-sessions"))
@@ -1248,7 +1309,6 @@ parse_config:
        clib_error_return (0, "failed: unexpected event(2): %d", event_type);
       goto stop_test;
     }
-
   /* Testing officially starts now */
   ecm->test_start_time = vlib_time_now (ecm->vlib_main);
   ec_cli ("Test started at %.6f", ecm->test_start_time);
@@ -1362,7 +1422,7 @@ VLIB_CLI_COMMAND (ec_command, static) = {
     "[run-time <time>][syn-timeout <time>][echo-bytes][fifo-size <size>]"
     "[private-segment-count <count>][private-segment-size <bytes>[m|g]]"
     "[preallocate-fifos][preallocate-sessions][client-batch <batch-size>]"
-    "[uri <tcp://ip/port>][test-bytes][verbose]",
+    "[throughput <bytes>[m|g]][uri <tcp://ip/port>][test-bytes][verbose]",
   .function = ec_command_fn,
   .is_mp_safe = 1,
 };
index b8744fe..d928a4e 100644 (file)
@@ -35,6 +35,9 @@ typedef struct ec_session_
   u64 bytes_to_receive;
   u64 bytes_received;
   u64 vpp_session_handle;
+  f64 time_to_send;
+  u64 bytes_paced_target;
+  u64 bytes_paced_current;
 } ec_session_t;
 
 typedef struct ec_worker_
@@ -65,6 +68,8 @@ typedef struct
   u32 prev_conns;
   u32 repeats;
 
+  f64
+    pacing_window_len; /**< Time between data chunk sends when limiting tput */
   u32 connect_conn_index; /**< Connects attempted progress */
 
   /*
@@ -89,6 +94,7 @@ typedef struct
   u32 connections_per_batch;           /**< Connections to rx/tx at once */
   u32 private_segment_count;           /**< Number of private fifo segs */
   u64 private_segment_size;            /**< size of private fifo segs */
+  u64 throughput;                      /**< Target bytes per second */
   u32 tls_engine;                      /**< TLS engine mbedtls/openssl */
   u32 no_copy;                         /**< Don't memcpy data to tx fifo */
   u32 quic_streams;                    /**< QUIC streams per connection */