hsa: vcl test client incremental stats
[vpp.git] / src / plugins / hs_apps / vcl / vcl_test_client.c
index 0aff98e..d16d62c 100644 (file)
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2017-2019 Cisco and/or its affiliates.
+ * Copyright (c) 2017-2021 Cisco and/or its affiliates.
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at:
 #include <arpa/inet.h>
 #include <hs_apps/vcl/vcl_test.h>
 #include <pthread.h>
+#include <signal.h>
 
 typedef struct
 {
   vcl_test_session_t *sessions;
   vcl_test_session_t *qsessions;
   uint32_t n_sessions;
-  uint32_t n_qsessions;
   uint32_t wrk_index;
   fd_set wr_fdset;
   fd_set rd_fdset;
@@ -42,27 +42,26 @@ typedef struct
 typedef struct
 {
   vcl_test_client_worker_t *workers;
+  vcl_test_session_t ctrl_session;
   vppcom_endpt_t server_endpt;
   uint32_t cfg_seq_num;
-  vcl_test_session_t quic_session;
-  vcl_test_session_t ctrl_session;
-  vcl_test_session_t *sessions;
   uint8_t dump_cfg;
   vcl_test_t post_test;
   uint8_t proto;
+  uint8_t incremental_stats;
   uint32_t n_workers;
-  uint32_t ckpair_index;
   volatile int active_workers;
+  volatile int test_running;
   struct sockaddr_storage server_addr;
 } vcl_test_client_main_t;
 
-static __thread int __wrk_index = 0;
-
 vcl_test_client_main_t vcl_client_main;
 
 #define vtc_min(a, b) (a < b ? a : b)
 #define vtc_max(a, b) (a > b ? a : b)
 
+vcl_test_main_t vcl_test_main;
+
 static int
 vtc_cfg_sync (vcl_test_session_t * ts)
 {
@@ -75,16 +74,14 @@ vtc_cfg_sync (vcl_test_session_t * ts)
       vtinf ("(fd %d): Sending config to server.", ts->fd);
       vcl_test_cfg_dump (&ts->cfg, 1 /* is_client */ );
     }
-  tx_bytes = vcl_test_write (ts->fd, (uint8_t *) & ts->cfg,
-                            sizeof (ts->cfg), NULL, ts->cfg.verbose);
+  tx_bytes = ts->write (ts, &ts->cfg, sizeof (ts->cfg));
   if (tx_bytes < 0)
     {
       vtwrn ("(fd %d): write test cfg failed (%d)!", ts->fd, tx_bytes);
       return tx_bytes;
     }
 
-  rx_bytes = vcl_test_read (ts->fd, (uint8_t *) ts->rxbuf,
-                           sizeof (vcl_test_cfg_t), NULL);
+  rx_bytes = ts->read (ts, ts->rxbuf, sizeof (vcl_test_cfg_t));
   if (rx_bytes < 0)
     return rx_bytes;
 
@@ -119,121 +116,16 @@ vtc_cfg_sync (vcl_test_session_t * ts)
   return 0;
 }
 
-static int
-vtc_quic_connect_test_sessions (vcl_test_client_worker_t * wrk)
-{
-  vcl_test_client_main_t *vcm = &vcl_client_main;
-  vcl_test_session_t *ts, *tq;
-  uint32_t i, flags, flen;
-  int rv;
-
-  if (wrk->cfg.num_test_sessions < 1 || wrk->cfg.num_test_sessions_perq < 1)
-    {
-      errno = EINVAL;
-      return -1;
-    }
-
-  if (wrk->n_sessions >= wrk->cfg.num_test_sessions)
-    goto done;
-
-  /* Connect Qsessions */
-
-  if (wrk->n_qsessions)
-    wrk->qsessions =
-      realloc (wrk->qsessions,
-              wrk->cfg.num_test_qsessions * sizeof (vcl_test_session_t));
-  else
-    wrk->qsessions =
-      calloc (wrk->cfg.num_test_qsessions, sizeof (vcl_test_session_t));
-
-  if (!wrk->qsessions)
-    {
-      vterr ("failed to alloc Qsessions", -errno);
-      return errno;
-    }
-
-
-  for (i = 0; i < wrk->cfg.num_test_qsessions; i++)
-    {
-      tq = &wrk->qsessions[i];
-      tq->fd = vppcom_session_create (vcm->proto, 0 /* is_nonblocking */ );
-      tq->session_index = i;
-      if (tq->fd < 0)
-       {
-         vterr ("vppcom_session_create()", tq->fd);
-         return tq->fd;
-       }
-
-      /* Connect is blocking */
-      rv = vppcom_session_connect (tq->fd, &vcm->server_endpt);
-      if (rv < 0)
-       {
-         vterr ("vppcom_session_connect()", rv);
-         return rv;
-       }
-      flags = O_NONBLOCK;
-      flen = sizeof (flags);
-      vppcom_session_attr (tq->fd, VPPCOM_ATTR_SET_FLAGS, &flags, &flen);
-      vtinf ("Test Qsession %d (fd %d) connected.", i, tq->fd);
-    }
-  wrk->n_qsessions = wrk->cfg.num_test_qsessions;
-
-  /* Connect Stream sessions */
-
-  if (wrk->n_sessions)
-    wrk->sessions =
-      realloc (wrk->sessions,
-              wrk->cfg.num_test_sessions * sizeof (vcl_test_session_t));
-  else
-    wrk->sessions =
-      calloc (wrk->cfg.num_test_sessions, sizeof (vcl_test_session_t));
-
-  if (!wrk->sessions)
-    {
-      vterr ("failed to alloc sessions", -errno);
-      return errno;
-    }
-
-  for (i = 0; i < wrk->cfg.num_test_sessions; i++)
-    {
-      tq = &wrk->qsessions[i / wrk->cfg.num_test_sessions_perq];
-      ts = &wrk->sessions[i];
-      ts->fd = vppcom_session_create (vcm->proto, 1 /* is_nonblocking */ );
-      ts->session_index = i;
-      if (ts->fd < 0)
-       {
-         vterr ("vppcom_session_create()", ts->fd);
-         return ts->fd;
-       }
-
-      rv = vppcom_session_stream_connect (ts->fd, tq->fd);
-      if (rv < 0)
-       {
-         vterr ("vppcom_session_stream_connect()", rv);
-         return rv;
-       }
-
-      vtinf ("Test session %d (fd %d) connected.", i, ts->fd);
-    }
-  wrk->n_sessions = wrk->cfg.num_test_sessions;
-
-done:
-  vtinf ("All test sessions (%d) connected!", wrk->cfg.num_test_sessions);
-  return 0;
-}
-
 static int
 vtc_connect_test_sessions (vcl_test_client_worker_t * wrk)
 {
   vcl_test_client_main_t *vcm = &vcl_client_main;
+  vcl_test_main_t *vt = &vcl_test_main;
+  const vcl_test_proto_vft_t *tp;
   vcl_test_session_t *ts;
   uint32_t n_test_sessions;
-  uint32_t flags, flen;
   int i, rv;
 
-  if (vcm->proto == VPPCOM_PROTO_QUIC)
-    return vtc_quic_connect_test_sessions (wrk);
-
   n_test_sessions = wrk->cfg.num_test_sessions;
   if (n_test_sessions < 1)
     {
@@ -256,34 +148,18 @@ vtc_connect_test_sessions (vcl_test_client_worker_t * wrk)
       return errno;
     }
 
+  tp = vt->protos[vcm->proto];
+
   for (i = 0; i < n_test_sessions; i++)
     {
       ts = &wrk->sessions[i];
-      ts->fd = vppcom_session_create (vcm->proto, 0 /* is_nonblocking */ );
-      if (ts->fd < 0)
-       {
-         vterr ("vppcom_session_create()", ts->fd);
-         return ts->fd;
-       }
-
-      if (vcm->proto == VPPCOM_PROTO_TLS)
-       {
-         uint32_t ckp_len = sizeof (vcm->ckpair_index);
-         vppcom_session_attr (ts->fd, VPPCOM_ATTR_SET_CKPAIR,
-                              &vcm->ckpair_index, &ckp_len);
-       }
-
-      /* Connect is blocking */
-      rv = vppcom_session_connect (ts->fd, &vcm->server_endpt);
+      memset (ts, 0, sizeof (*ts));
+      ts->session_index = i;
+      ts->cfg = wrk->cfg;
+      vcl_test_session_buf_alloc (ts);
+      rv = tp->open (&wrk->sessions[i], &vcm->server_endpt);
       if (rv < 0)
-       {
-         vterr ("vppcom_session_connect()", rv);
-         return rv;
-       }
-      flags = O_NONBLOCK;
-      flen = sizeof (flags);
-      vppcom_session_attr (ts->fd, VPPCOM_ATTR_SET_FLAGS, &flags, &flen);
-      vtinf ("Test session %d (fd %d) connected.", i, ts->fd);
+       return rv;
     }
   wrk->n_sessions = n_test_sessions;
 
@@ -295,32 +171,31 @@ done:
 static int
 vtc_worker_test_setup (vcl_test_client_worker_t * wrk)
 {
-  vcl_test_client_main_t *vcm = &vcl_client_main;
-  vcl_test_session_t *ctrl = &vcm->ctrl_session;
   vcl_test_cfg_t *cfg = &wrk->cfg;
   vcl_test_session_t *ts;
+  struct timespec now;
   uint32_t sidx;
   int i, j;
 
   FD_ZERO (&wrk->wr_fdset);
   FD_ZERO (&wrk->rd_fdset);
 
+  clock_gettime (CLOCK_REALTIME, &now);
+
   for (i = 0; i < cfg->num_test_sessions; i++)
     {
       ts = &wrk->sessions[i];
-      ts->cfg = wrk->cfg;
-      vcl_test_session_buf_alloc (ts);
+      ts->old_stats.stop = now;
 
       switch (cfg->test)
        {
-       case VCL_TEST_TYPE_ECHO:
-         memcpy (ts->txbuf, ctrl->txbuf, cfg->total_bytes);
-         break;
        case VCL_TEST_TYPE_UNI:
        case VCL_TEST_TYPE_BI:
          for (j = 0; j < ts->txbuf_size; j++)
            ts->txbuf[j] = j & 0xff;
          break;
+       default:
+         break;
        }
 
       FD_SET (vppcom_session_index (ts->fd), &wrk->wr_fdset);
@@ -337,9 +212,6 @@ static int
 vtc_worker_init (vcl_test_client_worker_t * wrk)
 {
   vcl_test_client_main_t *vcm = &vcl_client_main;
-  vcl_test_cfg_t *cfg = &wrk->cfg;
-  vcl_test_session_t *ts;
-  uint32_t n;
   int rv;
 
   __wrk_index = wrk->wrk_index;
@@ -365,16 +237,6 @@ vtc_worker_init (vcl_test_client_worker_t * wrk)
   if (vtc_worker_test_setup (wrk))
     return -1;
 
-  vtinf ("Sending config to server on all sessions ...");
-
-  for (n = 0; n < cfg->num_test_sessions; n++)
-    {
-      ts = &wrk->sessions[n];
-      if (vtc_cfg_sync (ts))
-       return -1;
-      memset (&ts->stats, 0, sizeof (ts->stats));
-    }
-
   return 0;
 }
 
@@ -418,35 +280,41 @@ vtc_accumulate_stats (vcl_test_client_worker_t * wrk,
 static void
 vtc_worker_sessions_exit (vcl_test_client_worker_t * wrk)
 {
-  vcl_test_client_main_t *vcm = &vcl_client_main;
-  vcl_test_session_t *ctrl = &vcm->ctrl_session;
   vcl_test_session_t *ts;
-  int i, verbose = ctrl->cfg.verbose;
+  int i;
 
   for (i = 0; i < wrk->cfg.num_test_sessions; i++)
     {
       ts = &wrk->sessions[i];
-      ts->cfg.test = VCL_TEST_TYPE_EXIT;
-
-      if (verbose)
-       {
-         vtinf ("(fd %d): Sending exit cfg to server...", ts->fd);
-         vcl_test_cfg_dump (&ts->cfg, 1 /* is_client */ );
-       }
-      (void) vcl_test_write (ts->fd, (uint8_t *) & ts->cfg,
-                            sizeof (ts->cfg), &ts->stats, verbose);
+      vppcom_session_close (ts->fd);
+      vcl_test_session_buf_free (ts);
     }
 
   wrk->n_sessions = 0;
 }
 
+static void
+vtc_inc_stats_check (vcl_test_session_t *ts)
+{
+  /* Avoid checking time too often because of syscall cost */
+  if (ts->stats.tx_bytes - ts->old_stats.tx_bytes < 1 << 20)
+    return;
+
+  clock_gettime (CLOCK_REALTIME, &ts->stats.stop);
+  if (vcl_test_time_diff (&ts->old_stats.stop, &ts->stats.stop) > 1)
+    {
+      vcl_test_stats_dump_inc (&ts->old_stats, &ts->stats);
+      ts->old_stats = ts->stats;
+    }
+}
+
 static void *
 vtc_worker_loop (void *arg)
 {
   vcl_test_client_main_t *vcm = &vcl_client_main;
   vcl_test_session_t *ctrl = &vcm->ctrl_session;
   vcl_test_client_worker_t *wrk = arg;
-  uint32_t n_active_sessions, n_bytes;
+  uint32_t n_active_sessions;
   fd_set _wfdset, *wfdset = &_wfdset;
   fd_set _rfdset, *rfdset = &_rfdset;
   vcl_test_session_t *ts;
@@ -466,7 +334,7 @@ vtc_worker_loop (void *arg)
 
   check_rx = wrk->cfg.test != VCL_TEST_TYPE_UNI;
   n_active_sessions = wrk->cfg.num_test_sessions;
-  while (n_active_sessions)
+  while (n_active_sessions && vcm->test_running)
     {
       _wfdset = wrk->wr_fdset;
       _rfdset = wrk->rd_fdset;
@@ -484,44 +352,42 @@ vtc_worker_loop (void *arg)
       for (i = 0; i < wrk->cfg.num_test_sessions; i++)
        {
          ts = &wrk->sessions[i];
-         if (!((ts->stats.stop.tv_sec == 0) &&
-               (ts->stats.stop.tv_nsec == 0)))
+         if (ts->is_done)
            continue;
 
          if (FD_ISSET (vppcom_session_index (ts->fd), rfdset)
              && ts->stats.rx_bytes < ts->cfg.total_bytes)
            {
-             (void) vcl_test_read (ts->fd, (uint8_t *) ts->rxbuf,
-                                   ts->rxbuf_size, &ts->stats);
+             rv = ts->read (ts, ts->rxbuf, ts->rxbuf_size);
+             if (rv < 0)
+               goto exit;
            }
 
          if (FD_ISSET (vppcom_session_index (ts->fd), wfdset)
              && ts->stats.tx_bytes < ts->cfg.total_bytes)
            {
-             n_bytes = ts->cfg.txbuf_size;
-             if (ts->cfg.test == VCL_TEST_TYPE_ECHO)
-               n_bytes = strlen (ctrl->txbuf) + 1;
-             rv = vcl_test_write (ts->fd, (uint8_t *) ts->txbuf,
-                                  n_bytes, &ts->stats, ts->cfg.verbose);
+             rv = ts->write (ts, ts->txbuf, ts->cfg.txbuf_size);
              if (rv < 0)
                {
                  vtwrn ("vppcom_test_write (%d) failed -- aborting test",
                         ts->fd);
                  goto exit;
                }
+             if (vcm->incremental_stats)
+               vtc_inc_stats_check (ts);
            }
          if ((!check_rx && ts->stats.tx_bytes >= ts->cfg.total_bytes)
              || (check_rx && ts->stats.rx_bytes >= ts->cfg.total_bytes))
            {
              clock_gettime (CLOCK_REALTIME, &ts->stats.stop);
+             ts->is_done = 1;
              n_active_sessions--;
            }
        }
     }
 exit:
   vtinf ("Worker %d done ...", wrk->wrk_index);
-  if (wrk->cfg.test != VCL_TEST_TYPE_ECHO)
-    vtc_accumulate_stats (wrk, ctrl);
+  vtc_accumulate_stats (wrk, ctrl);
   sleep (VCL_TEST_DELAY_DISCONNECT);
   vtc_worker_sessions_exit (wrk);
   if (wrk->wrk_index)
@@ -570,25 +436,21 @@ vtc_print_stats (vcl_test_session_t * ctrl)
 static void
 vtc_echo_client (vcl_test_client_main_t * vcm)
 {
-  vcl_test_client_worker_t *wrk;
   vcl_test_session_t *ctrl = &vcm->ctrl_session;
   vcl_test_cfg_t *cfg = &ctrl->cfg;
+  int rv;
 
   cfg->total_bytes = strlen (ctrl->txbuf) + 1;
   memset (&ctrl->stats, 0, sizeof (ctrl->stats));
 
-  /* Echo works with only one worker */
-  wrk = vcm->workers;
-  wrk->wrk_index = 0;
-  wrk->cfg = *cfg;
-
-  vtc_worker_loop (wrk);
+  rv = ctrl->write (ctrl, ctrl->txbuf, cfg->total_bytes);
+  if (rv < 0)
+    {
+      vtwrn ("vppcom_test_write (%d) failed ", ctrl->fd);
+      return;
+    }
 
-  /* Not relevant for echo test
-     clock_gettime (CLOCK_REALTIME, &ctrl->stats.stop);
-     vtc_accumulate_stats (wrk, ctrl);
-     vtc_print_stats (ctrl);
-   */
+  (void) ctrl->read (ctrl, ctrl->rxbuf, ctrl->rxbuf_size);
 }
 
 static void
@@ -602,15 +464,9 @@ vtc_stream_client (vcl_test_client_main_t * vcm)
   vtinf ("%s-directional Stream Test Starting!",
         ctrl->cfg.test == VCL_TEST_TYPE_BI ? "Bi" : "Uni");
 
+  memset (&ctrl->stats, 0, sizeof (vcl_test_stats_t));
   cfg->total_bytes = cfg->num_writes * cfg->txbuf_size;
-  cfg->ctrl_handle = ~0;
-  if (vtc_cfg_sync (ctrl))
-    {
-      vtwrn ("test cfg sync failed -- aborting!");
-      return;
-    }
-  cfg->ctrl_handle = ((vcl_test_cfg_t *) ctrl->rxbuf)->ctrl_handle;
-  memset (&ctrl->stats, 0, sizeof (ctrl->stats));
+  cfg->ctrl_handle = ctrl->fd;
 
   n_conn = cfg->num_test_sessions;
   n_conn_per_wrk = n_conn / vcm->n_workers;
@@ -623,6 +479,14 @@ vtc_stream_client (vcl_test_client_main_t * vcm)
       n_conn -= wrk->cfg.num_test_sessions;
     }
 
+  vcm->test_running = 1;
+  ctrl->cfg.cmd = VCL_TEST_CMD_START;
+  if (vtc_cfg_sync (ctrl))
+    {
+      vtwrn ("test cfg sync failed -- aborting!");
+      return;
+    }
+
   for (i = 1; i < vcm->n_workers; i++)
     {
       wrk = &vcm->workers[i];
@@ -635,6 +499,7 @@ vtc_stream_client (vcl_test_client_main_t * vcm)
     ;
 
   vtinf ("Sending config on ctrl session (fd %d) for stats...", ctrl->fd);
+  ctrl->cfg.cmd = VCL_TEST_CMD_STOP;
   if (vtc_cfg_sync (ctrl))
     {
       vtwrn ("test cfg sync failed -- aborting!");
@@ -643,6 +508,7 @@ vtc_stream_client (vcl_test_client_main_t * vcm)
 
   vtc_print_stats (ctrl);
 
+  ctrl->cfg.cmd = VCL_TEST_CMD_SYNC;
   ctrl->cfg.test = VCL_TEST_TYPE_ECHO;
   ctrl->cfg.total_bytes = 0;
   if (vtc_cfg_sync (ctrl))
@@ -798,27 +664,29 @@ parse_input ()
 void
 print_usage_and_exit (void)
 {
-  fprintf (stderr,
-          "vcl_test_client [OPTIONS] <ipaddr> <port>\n"
-          "  OPTIONS\n"
-          "  -h               Print this message and exit.\n"
-          "  -6               Use IPv6\n"
-          "  -c               Print test config before test.\n"
-          "  -w <dir>         Write test results to <dir>.\n"
-          "  -X               Exit after running test.\n"
-          "  -p <proto>       Use <proto> transport layer\n"
-          "  -D               Use UDP transport layer\n"
-          "  -L               Use TLS transport layer\n"
-          "  -E               Run Echo test.\n"
-          "  -N <num-writes>  Test Cfg: number of writes.\n"
-          "  -R <rxbuf-size>  Test Cfg: rx buffer size.\n"
-          "  -T <txbuf-size>  Test Cfg: tx buffer size.\n"
-          "  -U               Run Uni-directional test.\n"
-          "  -B               Run Bi-directional test.\n"
-          "  -V               Verbose mode.\n"
-          "  -I <N>           Use N sessions.\n"
-          "  -s <N>           Use N sessions.\n"
-          "  -q <n>           QUIC : use N Ssessions on top of n Qsessions\n");
+  fprintf (
+    stderr,
+    "vcl_test_client [OPTIONS] <ipaddr> <port>\n"
+    "  OPTIONS\n"
+    "  -h               Print this message and exit.\n"
+    "  -6               Use IPv6\n"
+    "  -c               Print test config before test.\n"
+    "  -w <dir>         Write test results to <dir>.\n"
+    "  -X               Exit after running test.\n"
+    "  -p <proto>       Use <proto> transport layer\n"
+    "  -D               Use UDP transport layer\n"
+    "  -L               Use TLS transport layer\n"
+    "  -E               Run Echo test.\n"
+    "  -N <num-writes>  Test Cfg: number of writes.\n"
+    "  -R <rxbuf-size>  Test Cfg: rx buffer size.\n"
+    "  -T <txbuf-size>  Test Cfg: tx buffer size.\n"
+    "  -U               Run Uni-directional test.\n"
+    "  -B               Run Bi-directional test.\n"
+    "  -V               Verbose mode.\n"
+    "  -I <N>           Use N sessions.\n"
+    "  -s <N>           Use N sessions.\n"
+    "  -S              Print incremental stats per session.\n"
+    "  -q <n>           QUIC : use N Ssessions on top of n Qsessions\n");
   exit (1);
 }
 
@@ -829,7 +697,7 @@ vtc_process_opts (vcl_test_client_main_t * vcm, int argc, char **argv)
   int c, v;
 
   opterr = 0;
-  while ((c = getopt (argc, argv, "chnp:w:XE:I:N:R:T:UBV6DLs:q:")) != -1)
+  while ((c = getopt (argc, argv, "chnp:w:XE:I:N:R:T:UBV6DLs:q:S")) != -1)
     switch (c)
       {
       case 'c':
@@ -984,6 +852,10 @@ vtc_process_opts (vcl_test_client_main_t * vcm, int argc, char **argv)
        vcm->proto = VPPCOM_PROTO_TLS;
        break;
 
+      case 'S':
+       vcm->incremental_stats = 1;
+       break;
+
       case '?':
        switch (optopt)
          {
@@ -1073,14 +945,74 @@ vtc_ctrl_session_exit (void)
   int verbose = ctrl->cfg.verbose;
 
   ctrl->cfg.test = VCL_TEST_TYPE_EXIT;
+  vtinf ("(fd %d): Sending exit cfg to server...", ctrl->fd);
   if (verbose)
+    vcl_test_cfg_dump (&ctrl->cfg, 1 /* is_client */);
+  (void) vcl_test_write (ctrl, (uint8_t *) &ctrl->cfg, sizeof (ctrl->cfg));
+  sleep (1);
+}
+
+static int
+vtc_ctrl_session_init (vcl_test_client_main_t *vcm, vcl_test_session_t *ctrl)
+{
+  int rv;
+
+  ctrl->fd = vppcom_session_create (VPPCOM_PROTO_TCP, 0 /* is_nonblocking */);
+  if (ctrl->fd < 0)
     {
-      vtinf ("(fd %d): Sending exit cfg to server...", ctrl->fd);
-      vcl_test_cfg_dump (&ctrl->cfg, 1 /* is_client */ );
+      vterr ("vppcom_session_create()", ctrl->fd);
+      return ctrl->fd;
+    }
+
+  vtinf ("Connecting to server...");
+  rv = vppcom_session_connect (ctrl->fd, &vcm->server_endpt);
+  if (rv)
+    {
+      vterr ("vppcom_session_connect()", rv);
+      return rv;
+    }
+  vtinf ("Control session (fd %d) connected.", ctrl->fd);
+
+  ctrl->read = vcl_test_read;
+  ctrl->write = vcl_test_write;
+
+  ctrl->cfg.cmd = VCL_TEST_CMD_SYNC;
+  rv = vtc_cfg_sync (ctrl);
+  if (rv)
+    {
+      vterr ("vtc_cfg_sync()", rv);
+      return rv;
+    }
+
+  ctrl->cfg.ctrl_handle = ((vcl_test_cfg_t *) ctrl->rxbuf)->ctrl_handle;
+  memset (&ctrl->stats, 0, sizeof (ctrl->stats));
+
+  return 0;
+}
+
+static void
+vt_sigs_handler (int signum, siginfo_t *si, void *uc)
+{
+  vcl_test_client_main_t *vcm = &vcl_client_main;
+  vcl_test_session_t *ctrl = &vcm->ctrl_session;
+
+  vcm->test_running = 0;
+  clock_gettime (CLOCK_REALTIME, &ctrl->stats.stop);
+}
+
+static void
+vt_incercept_sigs (void)
+{
+  struct sigaction sa;
+
+  memset (&sa, 0, sizeof (sa));
+  sa.sa_sigaction = vt_sigs_handler;
+  sa.sa_flags = SA_SIGINFO;
+  if (sigaction (SIGINT, &sa, 0))
+    {
+      vtwrn ("couldn't intercept sigint");
+      exit (-1);
     }
-  (void) vcl_test_write (ctrl->fd, (uint8_t *) & ctrl->cfg,
-                        sizeof (ctrl->cfg), &ctrl->stats, verbose);
-  sleep (1);
 }
 
 int
@@ -1088,69 +1020,31 @@ main (int argc, char **argv)
 {
   vcl_test_client_main_t *vcm = &vcl_client_main;
   vcl_test_session_t *ctrl = &vcm->ctrl_session;
-  vcl_test_session_t *quic_session = &vcm->quic_session;
+  vcl_test_main_t *vt = &vcl_test_main;
   int rv;
 
   vcm->n_workers = 1;
   vcl_test_cfg_init (&ctrl->cfg);
   vcl_test_session_buf_alloc (ctrl);
   vtc_process_opts (vcm, argc, argv);
+  vt_incercept_sigs ();
 
   vcm->workers = calloc (vcm->n_workers, sizeof (vcl_test_client_worker_t));
+  vt->wrk = calloc (vcm->n_workers, sizeof (vcl_test_wrk_t));
+
   rv = vppcom_app_create ("vcl_test_client");
   if (rv < 0)
     vtfail ("vppcom_app_create()", rv);
 
-  ctrl->fd = vppcom_session_create (vcm->proto, 0 /* is_nonblocking */ );
-  if (ctrl->fd < 0)
-    vtfail ("vppcom_session_create()", ctrl->fd);
-
-  if (vcm->proto == VPPCOM_PROTO_TLS || vcm->proto == VPPCOM_PROTO_QUIC)
-    {
-      vppcom_cert_key_pair_t ckpair;
-      uint32_t ckp_len;
-      int ckp_index;
-
-      vtinf ("Adding tls certs ...");
-      ckpair.cert = vcl_test_crt_rsa;
-      ckpair.key = vcl_test_key_rsa;
-      ckpair.cert_len = vcl_test_crt_rsa_len;
-      ckpair.key_len = vcl_test_key_rsa_len;
-      ckp_index = vppcom_add_cert_key_pair (&ckpair);
-      if (ckp_index < 0)
-       vtfail ("vppcom_add_cert_key_pair()", ckp_index);
-
-      vcm->ckpair_index = ckp_index;
-      ckp_len = sizeof (ckp_index);
-      vppcom_session_attr (ctrl->fd, VPPCOM_ATTR_SET_CKPAIR, &ckp_index,
-                          &ckp_len);
-    }
-
-  vtinf ("Connecting to server...");
-  if (vcm->proto == VPPCOM_PROTO_QUIC)
-    {
-      quic_session->fd = vppcom_session_create (vcm->proto,
-                                               0 /* is_nonblocking */ );
-      if (quic_session->fd < 0)
-       vtfail ("vppcom_session_create()", quic_session->fd);
-      rv = vppcom_session_connect (quic_session->fd, &vcm->server_endpt);
-      if (rv)
-       vtfail ("vppcom_session_connect()", rv);
-      vtinf ("Connecting to stream...");
-      rv = vppcom_session_stream_connect (ctrl->fd, quic_session->fd);
-    }
-  else
-    rv = vppcom_session_connect (ctrl->fd, &vcm->server_endpt);
-  if (rv)
-    vtfail ("vppcom_session_connect()", rv);
-  vtinf ("Control session (fd %d) connected.", ctrl->fd);
+  /* Protos like tls/dtls/quic need init */
+  if (vt->protos[vcm->proto]->init)
+    vt->protos[vcm->proto]->init (&ctrl->cfg);
 
-  rv = vtc_cfg_sync (ctrl);
-  if (rv)
-    vtfail ("vtc_cfg_sync()", rv);
+  if ((rv = vtc_ctrl_session_init (vcm, ctrl)))
+    vtfail ("vppcom_session_create() ctrl session", rv);
 
-  ctrl->cfg.ctrl_handle = ((vcl_test_cfg_t *) ctrl->rxbuf)->ctrl_handle;
-  memset (&ctrl->stats, 0, sizeof (ctrl->stats));
+  /* Update ctrl port to data port */
+  vcm->server_endpt.port += 1;
 
   while (ctrl->cfg.test != VCL_TEST_TYPE_EXIT)
     {
@@ -1211,8 +1105,6 @@ main (int argc, char **argv)
     }
 
   vtc_ctrl_session_exit ();
-  if (quic_session)
-    vppcom_session_close (quic_session->fd);
   vppcom_app_destroy ();
   free (vcm->workers);
   return 0;