hsa: vcl test client epoll worker loop 68/35968/13
authorFlorin Coras <fcoras@cisco.com>
Fri, 15 Apr 2022 01:19:42 +0000 (18:19 -0700)
committerFlorin Coras <fcoras@cisco.com>
Fri, 22 Apr 2022 01:17:11 +0000 (18:17 -0700)
Supports more connections and track connect time. Can be used to measure
CPS. Only works in unidirectional mode for now.

Type: improvement

Signed-off-by: Florin Coras <fcoras@cisco.com>
Change-Id: I70bc6a271996407dd16a96115f509bd680a0f302

src/plugins/hs_apps/vcl/vcl_test.h
src/plugins/hs_apps/vcl/vcl_test_client.c
src/plugins/hs_apps/vcl/vcl_test_server.c

index ac94979..e2a19e1 100644 (file)
@@ -65,7 +65,8 @@
 #define VCL_TEST_CFG_TXBUF_SIZE_DEF    8192
 #define VCL_TEST_CFG_RXBUF_SIZE_DEF    (64*VCL_TEST_CFG_TXBUF_SIZE_DEF)
 #define VCL_TEST_CFG_BUF_SIZE_MIN      128
-#define VCL_TEST_CFG_MAX_TEST_SESS     512
+#define VCL_TEST_CFG_MAX_TEST_SESS     ((uint32_t) 1e6)
+#define VCL_TEST_CFG_MAX_SELECT_SESS   512
 #define VCL_TEST_CFG_MAX_EPOLL_EVENTS  16
 
 #define VCL_TEST_CTRL_LISTENER         (~0 - 1)
@@ -140,6 +141,7 @@ typedef struct vcl_test_session
   vcl_test_stats_t stats;
   vcl_test_stats_t old_stats;
   int session_index;
+  struct vcl_test_session *next;
   vppcom_endpt_t endpt;
   uint8_t ip[16];
   vppcom_data_segment_t ds[2];
index 1b149b2..427dd82 100644 (file)
@@ -35,9 +35,21 @@ struct vtc_worker_
   vcl_test_session_t *qsessions;
   uint32_t n_sessions;
   uint32_t wrk_index;
-  fd_set wr_fdset;
-  fd_set rd_fdset;
-  int max_fd_index;
+  union
+  {
+    struct
+    {
+      fd_set wr_fdset;
+      fd_set rd_fdset;
+      int max_fd_index;
+    };
+    struct
+    {
+      uint32_t epoll_sh;
+      struct epoll_event ep_evts[VCL_TEST_CFG_MAX_EPOLL_EVENTS];
+      vcl_test_session_t *next_to_send;
+    };
+  };
   pthread_t thread_handle;
   vtc_worker_run_fn *wrk_run_fn;
   vcl_test_cfg_t cfg;
@@ -296,6 +308,19 @@ vtc_worker_start_transfer (vcl_test_client_worker_t *wrk)
     }
 }
 
+static int
+vtc_session_check_is_done (vcl_test_session_t *ts, uint8_t check_rx)
+{
+  if ((!check_rx && ts->stats.tx_bytes >= ts->cfg.total_bytes) ||
+      (check_rx && ts->stats.rx_bytes >= ts->cfg.total_bytes))
+    {
+      clock_gettime (CLOCK_REALTIME, &ts->stats.stop);
+      ts->is_done = 1;
+      return 1;
+    }
+  return 0;
+}
+
 static int
 vtc_worker_connect_sessions_select (vcl_test_client_worker_t *wrk)
 {
@@ -374,16 +399,16 @@ vtc_worker_run_select (vcl_test_client_worker_t *wrk)
          if (ts->is_done)
            continue;
 
-         if (FD_ISSET (vppcom_session_index (ts->fd), rfdset)
-             && ts->stats.rx_bytes < ts->cfg.total_bytes)
+         if (FD_ISSET (vppcom_session_index (ts->fd), rfdset) &&
+             ts->stats.rx_bytes < ts->cfg.total_bytes)
            {
              rv = ts->read (ts, ts->rxbuf, ts->rxbuf_size);
              if (rv < 0)
                break;
            }
 
-         if (FD_ISSET (vppcom_session_index (ts->fd), wfdset)
-             && ts->stats.tx_bytes < ts->cfg.total_bytes)
+         if (FD_ISSET (vppcom_session_index (ts->fd), wfdset) &&
+             ts->stats.tx_bytes < ts->cfg.total_bytes)
            {
              rv = ts->write (ts, ts->txbuf, ts->cfg.txbuf_size);
              if (rv < 0)
@@ -395,8 +420,8 @@ vtc_worker_run_select (vcl_test_client_worker_t *wrk)
              if (vcm->incremental_stats)
                vtc_inc_stats_check (ts);
            }
-         if ((!check_rx && ts->stats.tx_bytes >= ts->cfg.total_bytes)
-             || (check_rx && ts->stats.rx_bytes >= ts->cfg.total_bytes))
+         if ((!check_rx && ts->stats.tx_bytes >= ts->cfg.total_bytes) ||
+             (check_rx && ts->stats.rx_bytes >= ts->cfg.total_bytes))
            {
              clock_gettime (CLOCK_REALTIME, &ts->stats.stop);
              ts->is_done = 1;
@@ -408,6 +433,260 @@ vtc_worker_run_select (vcl_test_client_worker_t *wrk)
   return 0;
 }
 
+static void
+vtc_worker_epoll_send_add (vcl_test_client_worker_t *wrk,
+                          vcl_test_session_t *ts)
+{
+  if (!wrk->next_to_send)
+    {
+      wrk->next_to_send = ts;
+    }
+  else
+    {
+      ts->next = wrk->next_to_send;
+      wrk->next_to_send = ts->next;
+    }
+}
+
+static void
+vtc_worker_epoll_send_del (vcl_test_client_worker_t *wrk,
+                          vcl_test_session_t *ts, vcl_test_session_t *prev)
+{
+  if (!prev)
+    {
+      wrk->next_to_send = ts->next;
+    }
+  else
+    {
+      prev->next = ts->next;
+    }
+}
+
+static int
+vtc_worker_connect_sessions_epoll (vcl_test_client_worker_t *wrk)
+{
+  vcl_test_client_main_t *vcm = &vcl_client_main;
+  vcl_test_main_t *vt = &vcl_test_main;
+  const vcl_test_proto_vft_t *tp;
+  struct timespec start, end;
+  uint32_t n_connected = 0;
+  vcl_test_session_t *ts;
+  struct epoll_event ev;
+  int i, ci = 0, rv, n_ev;
+  double diff;
+
+  tp = vt->protos[vcm->proto];
+  wrk->epoll_sh = vppcom_epoll_create ();
+
+  ev.events = EPOLLET | EPOLLOUT;
+
+  clock_gettime (CLOCK_REALTIME, &start);
+
+  while (n_connected < wrk->cfg.num_test_sessions)
+    {
+      /*
+       * Try to connect more sessions if under pending threshold
+       */
+      while ((ci - n_connected) < 16 && ci < wrk->cfg.num_test_sessions)
+       {
+         ts = &wrk->sessions[ci];
+         ts->noblk_connect = 1;
+         rv = tp->open (&wrk->sessions[ci], &vcm->server_endpt);
+         if (rv < 0)
+           {
+             vtwrn ("open: %d", rv);
+             return rv;
+           }
+
+         ev.data.u64 = ci;
+         rv = vppcom_epoll_ctl (wrk->epoll_sh, EPOLL_CTL_ADD, ts->fd, &ev);
+         if (rv < 0)
+           {
+             vtwrn ("vppcom_epoll_ctl: %d", rv);
+             return rv;
+           }
+         ci += 1;
+       }
+
+      /*
+       * Handle connected events
+       */
+      n_ev =
+       vppcom_epoll_wait (wrk->epoll_sh, wrk->ep_evts,
+                          VCL_TEST_CFG_MAX_EPOLL_EVENTS, 0 /* timeout */);
+      if (n_ev < 0)
+       {
+         vterr ("vppcom_epoll_wait() returned", n_ev);
+         return -1;
+       }
+      else if (n_ev == 0)
+       {
+         continue;
+       }
+
+      for (i = 0; i < n_ev; i++)
+       {
+         ts = &wrk->sessions[wrk->ep_evts[i].data.u32];
+         if (!(wrk->ep_evts[i].events & EPOLLOUT))
+           {
+             vtwrn ("connect failed");
+             return -1;
+           }
+         if (ts->is_open)
+           {
+             vtwrn ("connection already open?");
+             return -1;
+           }
+         ts->is_open = 1;
+         n_connected += 1;
+       }
+    }
+
+  clock_gettime (CLOCK_REALTIME, &end);
+
+  diff = vcl_test_time_diff (&start, &end);
+  vtinf ("Connected (%u) connected in %.2f seconds (%u CPS)!",
+        wrk->cfg.num_test_sessions, diff,
+        (uint32_t) ((double) wrk->cfg.num_test_sessions / diff));
+
+  ev.events = EPOLLET | EPOLLIN | EPOLLOUT;
+
+  for (i = 0; i < wrk->cfg.num_test_sessions; i++)
+    {
+      ts = &wrk->sessions[i];
+
+      /* No data to be sent */
+      if (ts->cfg.total_bytes == 0)
+       {
+         n_connected -= 1;
+         clock_gettime (CLOCK_REALTIME, &ts->stats.stop);
+         ts->is_done = 1;
+         continue;
+       }
+
+      ev.data.u64 = i;
+      rv = vppcom_epoll_ctl (wrk->epoll_sh, EPOLL_CTL_MOD, ts->fd, &ev);
+      if (rv < 0)
+       {
+         vtwrn ("vppcom_epoll_ctl: %d", rv);
+         return rv;
+       }
+      vtc_worker_epoll_send_add (wrk, ts);
+    }
+
+  return n_connected;
+}
+
+static int
+vtc_worker_run_epoll (vcl_test_client_worker_t *wrk)
+{
+  vcl_test_client_main_t *vcm = &vcl_client_main;
+  uint32_t n_active_sessions, max_writes = 16, n_writes = 0;
+  vcl_test_session_t *ts, *prev = 0;
+  int i, rv, check_rx = 0, n_ev;
+
+  rv = vtc_worker_connect_sessions_epoll (wrk);
+  if (rv < 0)
+    {
+      vterr ("vtc_worker_connect_sessions()", rv);
+      return rv;
+    }
+
+  n_active_sessions = rv;
+  check_rx = wrk->cfg.test != VCL_TEST_TYPE_UNI;
+
+  vtc_worker_start_transfer (wrk);
+  ts = wrk->next_to_send;
+
+  while (n_active_sessions && vcm->test_running)
+    {
+      /*
+       * Try to write
+       */
+      if (!ts)
+       {
+         ts = wrk->next_to_send;
+         if (!ts)
+           goto get_epoll_evts;
+       }
+
+      rv = ts->write (ts, ts->txbuf, ts->cfg.txbuf_size);
+      if (rv > 0)
+       {
+         if (vcm->incremental_stats)
+           vtc_inc_stats_check (ts);
+         if (vtc_session_check_is_done (ts, check_rx))
+           n_active_sessions -= 1;
+       }
+      else if (rv == 0)
+       {
+         vtc_worker_epoll_send_del (wrk, ts, prev);
+       }
+      else
+       {
+         vtwrn ("vppcom_test_write (%d) failed -- aborting test", ts->fd);
+         return -1;
+       }
+      prev = ts;
+      ts = ts->next;
+      n_writes += 1;
+
+      if (rv > 0 && n_writes < max_writes)
+       continue;
+
+    get_epoll_evts:
+
+      /*
+       * Grab new events
+       */
+
+      n_ev =
+       vppcom_epoll_wait (wrk->epoll_sh, wrk->ep_evts,
+                          VCL_TEST_CFG_MAX_EPOLL_EVENTS, 0 /* timeout */);
+      if (n_ev < 0)
+       {
+         vterr ("vppcom_epoll_wait()", n_ev);
+         break;
+       }
+      else if (n_ev == 0)
+       {
+         continue;
+       }
+
+      for (i = 0; i < n_ev; i++)
+       {
+         ts = &wrk->sessions[wrk->ep_evts[i].data.u32];
+
+         if (ts->is_done)
+           continue;
+
+         if (wrk->ep_evts[i].events & (EPOLLERR | EPOLLHUP | EPOLLRDHUP))
+           {
+             vtinf ("%u finished before reading all data?", ts->fd);
+             break;
+           }
+         if ((wrk->ep_evts[i].events & EPOLLIN) &&
+             ts->stats.rx_bytes < ts->cfg.total_bytes)
+           {
+             rv = ts->read (ts, ts->rxbuf, ts->rxbuf_size);
+             if (rv < 0)
+               break;
+             if (vtc_session_check_is_done (ts, check_rx))
+               n_active_sessions -= 1;
+           }
+         if ((wrk->ep_evts[i].events & EPOLLOUT) &&
+             ts->stats.tx_bytes < ts->cfg.total_bytes)
+           {
+             vtc_worker_epoll_send_add (wrk, ts);
+           }
+       }
+
+      n_writes = 0;
+    }
+
+  return 0;
+}
+
 static inline int
 vtc_worker_run (vcl_test_client_worker_t *wrk)
 {
@@ -1078,14 +1357,18 @@ static void
 vtc_alloc_workers (vcl_test_client_main_t *vcm)
 {
   vcl_test_main_t *vt = &vcl_test_main;
+  vtc_worker_run_fn *run_fn;
 
   vcm->workers = calloc (vcm->n_workers, sizeof (vcl_test_client_worker_t));
   vt->wrk = calloc (vcm->n_workers, sizeof (vcl_test_wrk_t));
 
+  if (vcm->ctrl_session.cfg.num_test_sessions > VCL_TEST_CFG_MAX_SELECT_SESS)
+    run_fn = vtc_worker_run_epoll;
+  else
+    run_fn = vtc_worker_run_select;
+
   for (int i = 0; i < vcm->n_workers; i++)
-    {
-      vcm->workers[i].wrk_run_fn = vtc_worker_run_select;
-    }
+    vcm->workers[i].wrk_run_fn = run_fn;
 }
 
 int
index d1700d4..f5c81ce 100644 (file)
@@ -373,8 +373,9 @@ vts_accept_client (vcl_test_server_worker_t *wrk, int listen_fd)
   if (tp->accept (listen_fd, conn))
     return 0;
 
-  vtinf ("Got a connection -- fd = %d (0x%08x) on listener fd = %d (0x%08x)",
-        conn->fd, conn->fd, listen_fd, listen_fd);
+  if (conn->cfg.num_test_sessions < VCL_TEST_CFG_MAX_SELECT_SESS)
+    vtinf ("Got a connection -- fd = %d (0x%08x) on listener fd = %d (0x%08x)",
+          conn->fd, conn->fd, listen_fd, listen_fd);
 
   ev.events = EPOLLET | EPOLLIN;
   ev.data.u64 = conn - wrk->conn_pool;