hsa: unify echo test setup
[vpp.git] / src / plugins / hs_apps / vcl / vcl_test_server.c
index a36801b..6ce9130 100644 (file)
@@ -58,6 +58,7 @@ typedef struct
   volatile int worker_fails;
   volatile int active_workers;
   u8 use_ds;
+  u8 incremental_stats;
 } vcl_test_server_main_t;
 
 vcl_test_main_t vcl_test_main;
@@ -101,11 +102,11 @@ again:
       if (!wrk->conn_pool[i].is_alloc)
        {
          conn = &wrk->conn_pool[i];
+         memset (conn, 0, sizeof (*conn));
          conn->endpt.ip = wrk->conn_pool[i].ip;
          conn->is_alloc = 1;
          conn->session_index = i;
-         memset (&conn->stats, 0, sizeof (vcl_test_stats_t));
-         vcl_test_cfg_init (&conn->cfg);
+         hs_test_cfg_init (&conn->cfg);
          return (&wrk->conn_pool[i]);
        }
     }
@@ -129,7 +130,7 @@ conn_pool_free (vcl_test_session_t *ts)
 }
 
 static inline void
-sync_config_and_reply (vcl_test_session_t *conn, vcl_test_cfg_t *rx_cfg)
+sync_config_and_reply (vcl_test_session_t *conn, hs_test_cfg_t *rx_cfg)
 {
   conn->cfg = *rx_cfg;
   vcl_test_buf_alloc (&conn->cfg, 1 /* is_rxbuf */, (uint8_t **) &conn->rxbuf,
@@ -139,7 +140,7 @@ sync_config_and_reply (vcl_test_session_t *conn, vcl_test_cfg_t *rx_cfg)
   if (conn->cfg.verbose)
     {
       vtinf ("(fd %d): Replying to cfg message!\n", conn->fd);
-      vcl_test_cfg_dump (&conn->cfg, 0 /* is_client */ );
+      hs_test_cfg_dump (&conn->cfg, 0 /* is_client */);
     }
   (void) vcl_test_write (conn, &conn->cfg, sizeof (conn->cfg));
 }
@@ -184,14 +185,14 @@ vts_wrk_cleanup_all (vcl_test_server_worker_t *wrk)
 
 static void
 vts_test_cmd (vcl_test_server_worker_t *wrk, vcl_test_session_t *conn,
-             vcl_test_cfg_t *rx_cfg)
+             hs_test_cfg_t *rx_cfg)
 {
-  u8 is_bi = rx_cfg->test == VCL_TEST_TYPE_BI;
+  u8 is_bi = rx_cfg->test == HS_TEST_TYPE_BI;
   vcl_test_session_t *tc;
   char buf[64];
   int i;
 
-  if (rx_cfg->cmd == VCL_TEST_CMD_STOP)
+  if (rx_cfg->cmd == HS_TEST_CMD_STOP)
     {
       struct timespec stop;
       clock_gettime (CLOCK_REALTIME, &stop);
@@ -231,25 +232,25 @@ vts_test_cmd (vcl_test_server_worker_t *wrk, vcl_test_session_t *conn,
 
       vcl_test_stats_dump ("SERVER RESULTS", &conn->stats, 1 /* show_rx */ ,
                           is_bi /* show_tx */ , conn->cfg.verbose);
-      vcl_test_cfg_dump (&conn->cfg, 0 /* is_client */ );
+      hs_test_cfg_dump (&conn->cfg, 0 /* is_client */);
       if (conn->cfg.verbose)
        {
-         vtinf ("  vcl server main\n" VCL_TEST_SEPARATOR_STRING
+         vtinf ("  vcl server main\n" HS_TEST_SEPARATOR_STRING
                 "       buf:  %p\n"
-                "  buf size:  %u (0x%08x)\n" VCL_TEST_SEPARATOR_STRING,
+                "  buf size:  %u (0x%08x)\n" HS_TEST_SEPARATOR_STRING,
                 conn->rxbuf, conn->rxbuf_size, conn->rxbuf_size);
        }
 
       sync_config_and_reply (conn, rx_cfg);
       memset (&conn->stats, 0, sizeof (conn->stats));
     }
-  else if (rx_cfg->cmd == VCL_TEST_CMD_SYNC)
+  else if (rx_cfg->cmd == HS_TEST_CMD_SYNC)
     {
       rx_cfg->ctrl_handle = conn->fd;
       vtinf ("Set control fd %d for test!", conn->fd);
       sync_config_and_reply (conn, rx_cfg);
     }
-  else if (rx_cfg->cmd == VCL_TEST_CMD_START)
+  else if (rx_cfg->cmd == HS_TEST_CMD_START)
     {
       vtinf ("Starting %s-directional Stream Test (fd %d)!",
             is_bi ? "Bi" : "Uni", conn->fd);
@@ -267,7 +268,7 @@ vts_server_process_rx (vcl_test_session_t *conn, int rx_bytes)
 {
   vcl_test_server_main_t *vsm = &vcl_server_main;
 
-  if (conn->cfg.test == VCL_TEST_TYPE_BI)
+  if (conn->cfg.test == HS_TEST_TYPE_BI)
     {
       if (vsm->use_ds)
        {
@@ -305,6 +306,48 @@ vts_server_echo (vcl_test_session_t *conn, int rx_bytes)
     vtinf ("(fd %d): TX (%d bytes) - '%s'", conn->fd, tx_bytes, conn->rxbuf);
 }
 
+static vcl_test_session_t *
+vts_accept_ctrl (vcl_test_server_worker_t *wrk, int listen_fd)
+{
+  vcl_test_server_main_t *vsm = &vcl_server_main;
+  const vcl_test_proto_vft_t *tp;
+  vcl_test_session_t *conn;
+  struct epoll_event ev;
+  int rv;
+
+  conn = conn_pool_alloc (wrk);
+  if (!conn)
+    {
+      vtwrn ("No free connections!");
+      return 0;
+    }
+
+  if (vsm->ctrl)
+    conn->cfg = vsm->ctrl->cfg;
+  vcl_test_session_buf_alloc (conn);
+  clock_gettime (CLOCK_REALTIME, &conn->old_stats.stop);
+
+  tp = vcl_test_main.protos[VPPCOM_PROTO_TCP];
+  if (tp->accept (listen_fd, conn))
+    return 0;
+
+  vtinf ("CTRL accepted fd = %d (0x%08x) on listener fd = %d (0x%08x)",
+        conn->fd, conn->fd, listen_fd, listen_fd);
+
+  ev.events = EPOLLET | EPOLLIN;
+  ev.data.u64 = conn - wrk->conn_pool;
+  rv = vppcom_epoll_ctl (wrk->epfd, EPOLL_CTL_ADD, conn->fd, &ev);
+  if (rv < 0)
+    {
+      vterr ("vppcom_epoll_ctl()", rv);
+      return 0;
+    }
+
+  wrk->nfds++;
+
+  return conn;
+}
+
 static vcl_test_session_t *
 vts_accept_client (vcl_test_server_worker_t *wrk, int listen_fd)
 {
@@ -324,15 +367,17 @@ vts_accept_client (vcl_test_server_worker_t *wrk, int listen_fd)
   if (vsm->ctrl)
     conn->cfg = vsm->ctrl->cfg;
   vcl_test_session_buf_alloc (conn);
+  clock_gettime (CLOCK_REALTIME, &conn->old_stats.stop);
 
   tp = vcl_test_main.protos[vsm->server_cfg.proto];
   if (tp->accept (listen_fd, conn))
     return 0;
 
-  vtinf ("Got a connection -- fd = %d (0x%08x) on listener fd = %d (0x%08x)",
-        conn->fd, conn->fd, listen_fd, listen_fd);
+  if (conn->cfg.num_test_sessions < VCL_TEST_CFG_MAX_SELECT_SESS)
+    vtinf ("Got a connection -- fd = %d (0x%08x) on listener fd = %d (0x%08x)",
+          conn->fd, conn->fd, listen_fd, listen_fd);
 
-  ev.events = EPOLLIN;
+  ev.events = EPOLLET | EPOLLIN;
   ev.data.u64 = conn - wrk->conn_pool;
   rv = vppcom_epoll_ctl (wrk->epfd, EPOLL_CTL_ADD, conn->fd, &ev);
   if (rv < 0)
@@ -348,15 +393,15 @@ vts_accept_client (vcl_test_server_worker_t *wrk, int listen_fd)
 static void
 print_usage_and_exit (void)
 {
-  fprintf (stderr,
-          "vcl_test_server [OPTIONS] <port>\n"
-          "  OPTIONS\n"
-          "  -h               Print this message and exit.\n"
-          "  -6               Use IPv6\n"
-          "  -w <num>         Number of workers\n"
-          "  -p <PROTO>       Use <PROTO> transport layer\n"
-          "  -D               Use UDP transport layer\n"
-          "  -L               Use TLS transport layer\n");
+  fprintf (stderr, "vcl_test_server [OPTIONS] <port>\n"
+                  "  OPTIONS\n"
+                  "  -h               Print this message and exit.\n"
+                  "  -6               Use IPv6\n"
+                  "  -w <num>         Number of workers\n"
+                  "  -p <PROTO>       Use <PROTO> transport layer\n"
+                  "  -D               Use UDP transport layer\n"
+                  "  -L               Use TLS transport layer\n"
+                  "  -S               Incremental stats\n");
   exit (1);
 }
 
@@ -406,7 +451,7 @@ vcl_test_server_process_opts (vcl_test_server_main_t * vsm, int argc,
   vsm->server_cfg.proto = VPPCOM_PROTO_TCP;
 
   opterr = 0;
-  while ((c = getopt (argc, argv, "6DLsw:hp:")) != -1)
+  while ((c = getopt (argc, argv, "6DLsw:hp:S")) != -1)
     switch (c)
       {
       case '6':
@@ -436,6 +481,9 @@ vcl_test_server_process_opts (vcl_test_server_main_t * vsm, int argc,
       case 's':
        vsm->use_ds = 1;
        break;
+      case 'S':
+       vsm->incremental_stats = 1;
+       break;
       case '?':
        switch (optopt)
          {
@@ -455,31 +503,33 @@ vcl_test_server_process_opts (vcl_test_server_main_t * vsm, int argc,
        print_usage_and_exit ();
       }
 
-  if (argc < (optind + 1))
+  if (argc > (optind + 1))
     {
-      fprintf (stderr, "SERVER: ERROR: Insufficient number of arguments!\n");
+      fprintf (stderr, "Incorrect number of arguments!\n");
       print_usage_and_exit ();
     }
-
-  if (sscanf (argv[optind], "%d", &v) == 1)
-    vsm->server_cfg.port = (uint16_t) v;
-  else
+  else if (argc > 1 && argc == (optind + 1))
     {
-      fprintf (stderr, "SERVER: ERROR: Invalid port (%s)!\n", argv[optind]);
-      print_usage_and_exit ();
+      if (sscanf (argv[optind], "%d", &v) == 1)
+       vsm->server_cfg.port = (uint16_t) v;
+      else
+       {
+         fprintf (stderr, "Invalid port (%s)!\n", argv[optind]);
+         print_usage_and_exit ();
+       }
     }
 
   vcl_test_init_endpoint_addr (vsm);
 }
 
 int
-vts_handle_ctrl_cfg (vcl_test_server_worker_t *wrk, vcl_test_cfg_t *rx_cfg,
+vts_handle_ctrl_cfg (vcl_test_server_worker_t *wrk, hs_test_cfg_t *rx_cfg,
                     vcl_test_session_t *conn, int rx_bytes)
 {
   if (rx_cfg->verbose)
     {
       vtinf ("(fd %d): Received a cfg msg!", conn->fd);
-      vcl_test_cfg_dump (rx_cfg, 0 /* is_client */ );
+      hs_test_cfg_dump (rx_cfg, 0 /* is_client */);
     }
 
   if (rx_bytes != sizeof (*rx_cfg))
@@ -491,7 +541,7 @@ vts_handle_ctrl_cfg (vcl_test_server_worker_t *wrk, vcl_test_cfg_t *rx_cfg,
       if (conn->cfg.verbose)
        {
          vtinf ("(fd %d): Replying to cfg msg", conn->fd);
-         vcl_test_cfg_dump (rx_cfg, 0 /* is_client */ );
+         hs_test_cfg_dump (rx_cfg, 0 /* is_client */);
        }
       conn->write (conn, &conn->cfg, sizeof (conn->cfg));
       return -1;
@@ -499,27 +549,28 @@ vts_handle_ctrl_cfg (vcl_test_server_worker_t *wrk, vcl_test_cfg_t *rx_cfg,
 
   switch (rx_cfg->test)
     {
-    case VCL_TEST_TYPE_NONE:
-    case VCL_TEST_TYPE_ECHO:
+    case HS_TEST_TYPE_NONE:
+    case HS_TEST_TYPE_ECHO:
       sync_config_and_reply (conn, rx_cfg);
       break;
 
-    case VCL_TEST_TYPE_BI:
-    case VCL_TEST_TYPE_UNI:
+    case HS_TEST_TYPE_BI:
+    case HS_TEST_TYPE_UNI:
       vts_test_cmd (wrk, conn, rx_cfg);
       break;
 
-    case VCL_TEST_TYPE_EXIT:
+    case HS_TEST_TYPE_EXIT:
       vtinf ("Ctrl session fd %d closing!", conn->fd);
       vts_session_cleanup (conn);
       wrk->nfds--;
       if (wrk->nfds)
        vts_wrk_cleanup_all (wrk);
+      vcl_server_main.ctrl = 0;
       break;
 
     default:
       vtwrn ("Unknown test type %d", rx_cfg->test);
-      vcl_test_cfg_dump (rx_cfg, 0 /* is_client */ );
+      hs_test_cfg_dump (rx_cfg, 0 /* is_client */);
       break;
     }
 
@@ -539,7 +590,7 @@ vts_worker_init (vcl_test_server_worker_t * wrk)
 
   vtinf ("Initializing worker ...");
 
-  conn_pool_expand (wrk, VCL_TEST_CFG_MAX_TEST_SESS + 1);
+  conn_pool_expand (wrk, VCL_TEST_CFG_INIT_TEST_SESS + 1);
   if (wrk->wrk_index)
     if (vppcom_worker_register ())
       vtfail ("vppcom_worker_register()", 1);
@@ -556,7 +607,7 @@ vts_worker_init (vcl_test_server_worker_t * wrk)
        vtfail ("vppcom_epoll_create()", wrk->epfd);
     }
 
-  listen_ev.events = EPOLLIN;
+  listen_ev.events = EPOLLET | EPOLLIN;
   listen_ev.data.u32 = VCL_TEST_DATA_LISTENER;
   rv =
     vppcom_epoll_ctl (wrk->epfd, EPOLL_CTL_ADD, wrk->listener.fd, &listen_ev);
@@ -578,6 +629,21 @@ vts_conn_read (vcl_test_session_t *conn)
     return conn->read (conn, conn->rxbuf, conn->rxbuf_size);
 }
 
+static void
+vts_inc_stats_check (vcl_test_session_t *ts)
+{
+  /* Avoid checking time too often because of syscall cost */
+  if (ts->stats.rx_bytes - ts->old_stats.rx_bytes < 1 << 20)
+    return;
+
+  clock_gettime (CLOCK_REALTIME, &ts->stats.stop);
+  if (vcl_test_time_diff (&ts->old_stats.stop, &ts->stats.stop) > 1)
+    {
+      vcl_test_stats_dump_inc (ts, 1 /* is_rx */);
+      ts->old_stats = ts->stats;
+    }
+}
+
 static void *
 vts_worker_loop (void *arg)
 {
@@ -586,7 +652,7 @@ vts_worker_loop (void *arg)
   vcl_test_server_worker_t *wrk = arg;
   vcl_test_session_t *conn;
   int i, rx_bytes, num_ev;
-  vcl_test_cfg_t *rx_cfg;
+  hs_test_cfg_t *rx_cfg;
 
   if (wrk->wrk_index)
     vts_worker_init (wrk);
@@ -613,13 +679,13 @@ vts_worker_loop (void *arg)
           */
          if (ep_evts[i].events & (EPOLLHUP | EPOLLRDHUP))
            {
-             vts_session_cleanup (conn);
-             wrk->nfds--;
-             if (!wrk->nfds)
+             if (conn == vsm->ctrl)
                {
-                 vtinf ("All client connections closed\n");
-                 goto done;
+                 vtinf ("ctrl session went away");
+                 vsm->ctrl = 0;
                }
+             vts_session_cleanup (conn);
+             wrk->nfds--;
              continue;
            }
 
@@ -634,9 +700,13 @@ vts_worker_loop (void *arg)
                  vtwrn ("ctrl already exists");
                  continue;
                }
-             vsm->ctrl = vts_accept_client (wrk, vsm->ctrl_listen_fd);
+             vsm->ctrl = vts_accept_ctrl (wrk, vsm->ctrl_listen_fd);
              continue;
            }
+
+         /* at this point ctrl session must be valid */
+         ASSERT (vsm->ctrl);
+
          if (ep_evts[i].data.u32 == VCL_TEST_DATA_LISTENER)
            {
              conn = vts_accept_client (wrk, wrk->listener.fd);
@@ -656,8 +726,8 @@ vts_worker_loop (void *arg)
          if (!wrk->wrk_index && conn->fd == vsm->ctrl->fd)
            {
              rx_bytes = conn->read (conn, conn->rxbuf, conn->rxbuf_size);
-             rx_cfg = (vcl_test_cfg_t *) conn->rxbuf;
-             if (rx_cfg->magic == VCL_TEST_CFG_CTRL_MAGIC)
+             rx_cfg = (hs_test_cfg_t *) conn->rxbuf;
+             if (rx_cfg->magic == HS_TEST_CFG_CTRL_MAGIC)
                {
                  vts_handle_ctrl_cfg (wrk, rx_cfg, conn, rx_bytes);
                  if (!wrk->nfds)
@@ -700,6 +770,8 @@ vts_worker_loop (void *arg)
              if (vppcom_session_attr (conn->fd, VPPCOM_ATTR_GET_NREAD, 0, 0) >
                  0)
                goto read_again;
+             if (vsm->incremental_stats)
+               vts_inc_stats_check (conn);
              continue;
            }
          else
@@ -747,7 +819,7 @@ vts_ctrl_session_init (vcl_test_server_worker_t *wrk)
   if (wrk->epfd < 0)
     vtfail ("vppcom_epoll_create()", wrk->epfd);
 
-  listen_ev.events = EPOLLIN;
+  listen_ev.events = EPOLLET | EPOLLIN;
   listen_ev.data.u32 = VCL_TEST_CTRL_LISTENER;
   rv = vppcom_epoll_ctl (wrk->epfd, EPOLL_CTL_ADD, vsm->ctrl_listen_fd,
                         &listen_ev);
@@ -783,13 +855,15 @@ main (int argc, char **argv)
   vts_ctrl_session_init (&vsm->workers[0]);
 
   /* Update ctrl port to data port */
-  vsm->server_cfg.endpt.port += 1;
+  vsm->server_cfg.endpt.port = hs_make_data_port (vsm->server_cfg.endpt.port);
   vts_worker_init (&vsm->workers[0]);
   for (i = 1; i < vsm->server_cfg.workers; i++)
     {
       vsm->workers[i].wrk_index = i;
       rv = pthread_create (&vsm->workers[i].thread_handle, NULL,
                           vts_worker_loop, (void *) &vsm->workers[i]);
+      if (rv)
+       vtfail ("pthread_create()", rv);
     }
 
   vts_worker_loop (&vsm->workers[0]);