session: basic fifo-tuning-logic 72/24472/26
authorRyujiro Shibuya <ryujiro.shibuya@owmobility.com>
Wed, 22 Jan 2020 12:11:42 +0000 (12:11 +0000)
committerDave Barach <openvpp@barachs.net>
Tue, 25 Feb 2020 19:18:49 +0000 (19:18 +0000)
- Allowing application to register custom fifo-tuning-logic.
- Adding an example custom fifo-tuning-logic in hs_app/proxy.

Type: feature

Signed-off-by: Ryujiro Shibuya <ryujiro.shibuya@owmobility.com>
Signed-off-by: Florin Coras <fcoras@cisco.com>
Change-Id: I2aca14d1f23d5c3c9debb7f4c46aca3a15a8d1b9

src/plugins/hs_apps/proxy.c
src/plugins/hs_apps/proxy.h
src/vnet/session/application.c
src/vnet/session/application.h
src/vnet/session/application_interface.h
src/vnet/session/application_worker.c
src/vnet/session/segment_manager.c
src/vnet/session/segment_manager.h
src/vnet/session/session.c
src/vnet/session/session_types.h

index d0e3bc4..ca46aef 100644 (file)
@@ -140,6 +140,50 @@ delete_proxy_session (session_t * s, int is_active_open)
   clib_spinlock_unlock_if_init (&pm->sessions_lock);
 }
 
+static int
+common_fifo_tuning_callback (session_t * s, svm_fifo_t * f,
+                            session_ft_action_t act, u32 bytes)
+{
+  proxy_main_t *pm = &proxy_main;
+
+  segment_manager_t *sm = segment_manager_get (f->segment_manager);
+  fifo_segment_t *fs = segment_manager_get_segment (sm, f->segment_index);
+
+  u8 seg_usage = fifo_segment_get_mem_usage (fs);
+  u32 fifo_in_use = svm_fifo_max_dequeue_prod (f);
+  u32 fifo_size = svm_fifo_size (f);
+  u8 fifo_usage = fifo_in_use * 100 / fifo_size;
+  u8 update_size = 0;
+
+  ASSERT (act < SESSION_FT_ACTION_N_ACTIONS);
+
+  if (act == SESSION_FT_ACTION_ENQUEUED)
+    {
+      if (seg_usage < pm->low_watermark && fifo_usage > 50)
+       update_size = fifo_in_use;
+      else if (seg_usage < pm->high_watermark && fifo_usage > 80)
+       update_size = fifo_in_use;
+
+      update_size = clib_min (update_size, sm->max_fifo_size - fifo_size);
+      if (update_size)
+       svm_fifo_set_size (f, fifo_size + update_size);
+    }
+  else                         /* dequeued */
+    {
+      if (seg_usage > pm->high_watermark || fifo_usage < 20)
+       update_size = bytes;
+      else if (seg_usage > pm->low_watermark && fifo_usage < 50)
+       update_size = (bytes / 2);
+
+      ASSERT (fifo_size >= 4096);
+      update_size = clib_min (update_size, fifo_size - 4096);
+      if (update_size)
+       svm_fifo_set_size (f, fifo_size - update_size);
+    }
+
+  return 0;
+}
+
 static int
 proxy_accept_callback (session_t * s)
 {
@@ -313,7 +357,8 @@ static session_cb_vft_t proxy_session_cb_vft = {
   .add_segment_callback = proxy_add_segment_callback,
   .builtin_app_rx_callback = proxy_rx_callback,
   .builtin_app_tx_callback = proxy_tx_callback,
-  .session_reset_callback = proxy_reset_callback
+  .session_reset_callback = proxy_reset_callback,
+  .fifo_tuning_callback = common_fifo_tuning_callback
 };
 
 static int
@@ -468,6 +513,7 @@ static session_cb_vft_t active_open_clients = {
   .session_disconnect_callback = active_open_disconnect_callback,
   .builtin_app_rx_callback = active_open_rx_callback,
   .builtin_app_tx_callback = active_open_tx_callback,
+  .fifo_tuning_callback = common_fifo_tuning_callback
 };
 /* *INDENT-ON* */
 
@@ -491,6 +537,9 @@ proxy_server_attach ()
   a->options[APP_OPTIONS_SEGMENT_SIZE] = segment_size;
   a->options[APP_OPTIONS_RX_FIFO_SIZE] = pm->fifo_size;
   a->options[APP_OPTIONS_TX_FIFO_SIZE] = pm->fifo_size;
+  a->options[APP_OPTIONS_MAX_FIFO_SIZE] = pm->max_fifo_size;
+  a->options[APP_OPTIONS_HIGH_WATERMARK] = (u64) pm->high_watermark;
+  a->options[APP_OPTIONS_LOW_WATERMARK] = (u64) pm->low_watermark;
   a->options[APP_OPTIONS_PRIVATE_SEGMENT_COUNT] = pm->private_segment_count;
   a->options[APP_OPTIONS_PREALLOC_FIFO_PAIRS] =
     pm->prealloc_fifos ? pm->prealloc_fifos : 0;
@@ -513,7 +562,7 @@ active_open_attach (void)
 {
   proxy_main_t *pm = &proxy_main;
   vnet_app_attach_args_t _a, *a = &_a;
-  u64 options[16];
+  u64 options[APP_OPTIONS_N_OPTIONS];
 
   clib_memset (a, 0, sizeof (*a));
   clib_memset (options, 0, sizeof (options));
@@ -526,6 +575,9 @@ active_open_attach (void)
   options[APP_OPTIONS_SEGMENT_SIZE] = 512 << 20;
   options[APP_OPTIONS_RX_FIFO_SIZE] = pm->fifo_size;
   options[APP_OPTIONS_TX_FIFO_SIZE] = pm->fifo_size;
+  options[APP_OPTIONS_MAX_FIFO_SIZE] = pm->max_fifo_size;
+  options[APP_OPTIONS_HIGH_WATERMARK] = (u64) pm->high_watermark;
+  options[APP_OPTIONS_LOW_WATERMARK] = (u64) pm->low_watermark;
   options[APP_OPTIONS_PRIVATE_SEGMENT_COUNT] = pm->private_segment_count;
   options[APP_OPTIONS_PREALLOC_FIFO_PAIRS] =
     pm->prealloc_fifos ? pm->prealloc_fifos : 0;
@@ -607,10 +659,13 @@ proxy_server_create_command_fn (vlib_main_t * vm, unformat_input_t * input,
   proxy_main_t *pm = &proxy_main;
   char *default_server_uri = "tcp://0.0.0.0/23";
   char *default_client_uri = "tcp://6.0.2.2/23";
-  int rv;
-  u64 tmp;
+  int rv, tmp32;
+  u64 tmp64;
 
   pm->fifo_size = 64 << 10;
+  pm->max_fifo_size = 128 << 20;
+  pm->high_watermark = 80;
+  pm->low_watermark = 50;
   pm->rcv_buffer_size = 1024;
   pm->prealloc_fifos = 0;
   pm->private_segment_count = 0;
@@ -620,8 +675,16 @@ proxy_server_create_command_fn (vlib_main_t * vm, unformat_input_t * input,
 
   while (unformat_check_input (input) != UNFORMAT_END_OF_INPUT)
     {
-      if (unformat (input, "fifo-size %d", &pm->fifo_size))
-       pm->fifo_size <<= 10;
+      if (unformat (input, "fifo-size %U",
+                   unformat_memory_size, &pm->fifo_size))
+       ;
+      else if (unformat (input, "max-fifo-size %U",
+                        unformat_memory_size, &pm->max_fifo_size))
+       ;
+      else if (unformat (input, "high-watermark %d", &tmp32))
+       pm->high_watermark = (u8) tmp32;
+      else if (unformat (input, "low-watermark %d", &tmp32))
+       pm->low_watermark = (u8) tmp32;
       else if (unformat (input, "rcv-buf-size %d", &pm->rcv_buffer_size))
        ;
       else if (unformat (input, "prealloc-fifos %d", &pm->prealloc_fifos))
@@ -630,12 +693,12 @@ proxy_server_create_command_fn (vlib_main_t * vm, unformat_input_t * input,
                         &pm->private_segment_count))
        ;
       else if (unformat (input, "private-segment-size %U",
-                        unformat_memory_size, &tmp))
+                        unformat_memory_size, &tmp64))
        {
-         if (tmp >= 0x100000000ULL)
+         if (tmp64 >= 0x100000000ULL)
            return clib_error_return
-             (0, "private segment size %lld (%llu) too large", tmp, tmp);
-         pm->private_segment_size = tmp;
+             (0, "private segment size %lld (%llu) too large", tmp64, tmp64);
+         pm->private_segment_size = tmp64;
        }
       else if (unformat (input, "server-uri %s", &pm->server_uri))
        vec_add1 (pm->server_uri, 0);
@@ -678,9 +741,10 @@ VLIB_CLI_COMMAND (proxy_create_command, static) =
 {
   .path = "test proxy server",
   .short_help = "test proxy server [server-uri <tcp://ip/port>]"
-      "[client-uri <tcp://ip/port>][fifo-size <nn>][rcv-buf-size <nn>]"
-      "[prealloc-fifos <nn>][private-segment-size <mem>]"
-      "[private-segment-count <nn>]",
+      "[client-uri <tcp://ip/port>][fifo-size <nn>[k|m]]"
+      "[max-fifo-size <nn>[k|m]][high-watermark <nn>]"
+      "[low-watermark <nn>][rcv-buf-size <nn>][prealloc-fifos <nn>]"
+      "[private-segment-size <mem>][private-segment-count <nn>]",
   .function = proxy_server_create_command_fn,
 };
 /* *INDENT-ON* */
index 9b80b3d..87a9618 100644 (file)
@@ -57,7 +57,10 @@ typedef struct
    */
   u8 *connect_uri;                     /**< URI for slave's connect */
   u32 configured_segment_size;
-  u32 fifo_size;
+  u32 fifo_size;                       /**< initial fifo size */
+  u32 max_fifo_size;                   /**< max fifo size */
+  u8 high_watermark;                   /**< high watermark (%) */
+  u8 low_watermark;                    /**< low watermark (%) */
   u32 private_segment_count;           /**< Number of private fifo segs */
   u32 private_segment_size;            /**< size of private fifo segs */
   int rcv_buffer_size;
index 0dcb3e6..7777c72 100644 (file)
@@ -550,6 +550,8 @@ application_alloc_and_init (app_init_args_t * a)
     props->use_mq_eventfd = 1;
   if (options[APP_OPTIONS_TLS_ENGINE])
     app->tls_engine = options[APP_OPTIONS_TLS_ENGINE];
+  if (options[APP_OPTIONS_MAX_FIFO_SIZE])
+    props->max_fifo_size = options[APP_OPTIONS_MAX_FIFO_SIZE];
   if (options[APP_OPTIONS_HIGH_WATERMARK])
     props->high_watermark = options[APP_OPTIONS_HIGH_WATERMARK];
   if (options[APP_OPTIONS_LOW_WATERMARK])
index 9943576..90b5a5a 100644 (file)
@@ -261,6 +261,9 @@ int app_worker_migrate_notify (app_worker_t * app_wrk, session_t * s,
                               session_handle_t new_sh);
 int app_worker_builtin_rx (app_worker_t * app_wrk, session_t * s);
 int app_worker_builtin_tx (app_worker_t * app_wrk, session_t * s);
+int app_worker_session_fifo_tuning (app_worker_t * app_wrk, session_t * s,
+                                   svm_fifo_t * f,
+                                   session_ft_action_t act, u32 len);
 segment_manager_t *app_worker_get_listen_segment_manager (app_worker_t *,
                                                          session_t *);
 segment_manager_t *app_worker_get_connect_segment_manager (app_worker_t *);
index d03994f..2ecabb0 100644 (file)
@@ -68,6 +68,10 @@ typedef struct session_cb_vft_
   /** Cert and key pair delete notification */
   int (*app_cert_key_pair_delete_callback) (app_cert_key_pair_t * ckpair);
 
+  /** Delegate fifo-tuning-logic to application */
+  int (*fifo_tuning_callback) (session_t * s, svm_fifo_t * f,
+                              session_ft_action_t act, u32 bytes);
+
 } session_cb_vft_t;
 
 #define foreach_app_init_args                  \
@@ -201,6 +205,7 @@ typedef enum
   APP_OPTIONS_PROXY_TRANSPORT,
   APP_OPTIONS_ACCEPT_COOKIE,
   APP_OPTIONS_TLS_ENGINE,
+  APP_OPTIONS_MAX_FIFO_SIZE,
   APP_OPTIONS_HIGH_WATERMARK,
   APP_OPTIONS_LOW_WATERMARK,
   APP_OPTIONS_N_OPTIONS
index 0b67d29..1793998 100644 (file)
@@ -275,11 +275,16 @@ app_worker_init_accepted (session_t * s)
   app_worker_t *app_wrk;
   segment_manager_t *sm;
   session_t *listener;
+  application_t *app;
 
   listener = listen_session_get_from_handle (s->listener_handle);
   app_wrk = application_listener_select_worker (listener);
   s->app_wrk_index = app_wrk->wrk_index;
 
+  app = application_get (app_wrk->app_index);
+  if (app->cb_fns.fifo_tuning_callback)
+    s->flags |= SESSION_F_CUSTOM_FIFO_TUNING;
+
   sm = app_worker_get_listen_segment_manager (app_wrk, listener);
   if (app_worker_alloc_session_fifos (sm, s))
     return -1;
@@ -307,6 +312,10 @@ app_worker_init_connected (app_worker_t * app_wrk, session_t * s)
       if (app_worker_alloc_session_fifos (sm, s))
        return -1;
     }
+
+  if (app->cb_fns.fifo_tuning_callback)
+    s->flags |= SESSION_F_CUSTOM_FIFO_TUNING;
+
   return 0;
 }
 
@@ -432,6 +441,15 @@ app_worker_connect_session (app_worker_t * app, session_endpoint_t * sep,
   return 0;
 }
 
+int
+app_worker_session_fifo_tuning (app_worker_t * app_wrk, session_t * s,
+                               svm_fifo_t * f,
+                               session_ft_action_t act, u32 len)
+{
+  application_t *app = application_get (app_wrk->app_index);
+  return app->cb_fns.fifo_tuning_callback (s, f, act, len);
+}
+
 int
 app_worker_alloc_connects_segment_manager (app_worker_t * app_wrk)
 {
index 25fbd6f..eb3f63c 100644 (file)
@@ -26,11 +26,12 @@ typedef struct segment_manager_main_
   /*
    * Configuration
    */
-  u32 default_fifo_size;               /**< default rx/tx fifo size */
-  u32 default_segment_size;            /**< default fifo segment size */
-  u32 default_app_mq_size;             /**< default app msg q size */
-  u8 default_high_watermark;           /**< default high watermark % */
-  u8 default_low_watermark;            /**< default low watermark % */
+  u32 default_fifo_size;       /**< default rx/tx fifo size */
+  u32 default_segment_size;    /**< default fifo segment size */
+  u32 default_app_mq_size;     /**< default app msg q size */
+  u32 default_max_fifo_size;   /**< default max fifo size */
+  u8 default_high_watermark;   /**< default high watermark % */
+  u8 default_low_watermark;    /**< default low watermark % */
 } segment_manager_main_t;
 
 static segment_manager_main_t sm_main;
@@ -56,6 +57,7 @@ segment_manager_props_init (segment_manager_props_t * props)
   props->rx_fifo_size = sm_main.default_fifo_size;
   props->tx_fifo_size = sm_main.default_fifo_size;
   props->evt_q_size = sm_main.default_app_mq_size;
+  props->max_fifo_size = sm_main.default_max_fifo_size;
   props->high_watermark = sm_main.default_high_watermark;
   props->low_watermark = sm_main.default_low_watermark;
   props->n_slices = vlib_num_workers () + 1;
@@ -335,6 +337,10 @@ segment_manager_init (segment_manager_t * sm)
                             sm_main.default_segment_size);
   prealloc_fifo_pairs = props->prealloc_fifos;
 
+  sm->max_fifo_size = props->max_fifo_size ?
+    props->max_fifo_size : sm_main.default_max_fifo_size;
+  sm->max_fifo_size = clib_max (sm->max_fifo_size, 4096);
+
   segment_manager_set_watermarks (sm,
                                  props->high_watermark,
                                  props->low_watermark);
@@ -832,6 +838,7 @@ segment_manager_main_init (segment_manager_main_init_args_t * a)
   sm->default_fifo_size = 1 << 12;
   sm->default_segment_size = 1 << 20;
   sm->default_app_mq_size = 128;
+  sm->default_max_fifo_size = 4 << 20;
   sm->default_high_watermark = 80;
   sm->default_low_watermark = 50;
 }
@@ -844,6 +851,9 @@ segment_manager_show_fn (vlib_main_t * vm, unformat_input_t * input,
   u8 show_segments = 0, verbose = 0;
   segment_manager_t *sm;
   fifo_segment_t *seg;
+  app_worker_t *app_wrk;
+  application_t *app;
+  u8 custom_logic;
 
   while (unformat_check_input (input) != UNFORMAT_END_OF_INPUT)
     {
@@ -859,13 +869,22 @@ segment_manager_show_fn (vlib_main_t * vm, unformat_input_t * input,
                   pool_elts (smm->segment_managers));
   if (verbose && pool_elts (smm->segment_managers))
     {
-      vlib_cli_output (vm, "%-10s%=15s%=12s", "Index", "App Index",
-                      "Segments");
+      vlib_cli_output (vm, "%-6s%=10s%=10s%=13s%=11s%=11s%=12s",
+                      "Index", "AppIndex", "Segments", "MaxFifoSize",
+                      "HighWater", "LowWater", "FifoTuning");
 
       /* *INDENT-OFF* */
       pool_foreach (sm, smm->segment_managers, ({
-       vlib_cli_output (vm, "%-10d%=15d%=12d", segment_manager_index (sm),
-                          sm->app_wrk_index, pool_elts (sm->segments));
+        app_wrk = app_worker_get_if_valid (sm->app_wrk_index);
+        app = app_wrk ? application_get (app_wrk->app_index) : 0;
+        custom_logic = (app && (app->cb_fns.fifo_tuning_callback)) ? 1 : 0;
+
+       vlib_cli_output (vm, "%-6d%=10d%=10d%=13U%=11d%=11d%=12s",
+                         segment_manager_index (sm),
+                        sm->app_wrk_index, pool_elts (sm->segments),
+                         format_memory_size, sm->max_fifo_size,
+                         sm->high_watermark, sm->low_watermark,
+                         custom_logic ? "custom" : "none");
       }));
       /* *INDENT-ON* */
 
index cad9550..688711a 100644 (file)
@@ -19,6 +19,7 @@
 #include <vppinfra/lock.h>
 #include <vppinfra/valloc.h>
 #include <svm/fifo_segment.h>
+#include <vnet/session/session_types.h>
 
 typedef struct _segment_manager_props
 {
@@ -34,6 +35,7 @@ typedef struct _segment_manager_props
   u8 n_slices;                         /**< number of fs slices/threads */
   ssvm_segment_type_t segment_type;    /**< seg type: if set to SSVM_N_TYPES,
                                             private segments are used */
+  u32 max_fifo_size;                   /**< max fifo size */
   u8 high_watermark;                   /**< memory usage high watermark % */
   u8 low_watermark;                    /**< memory usage low watermark % */
 } segment_manager_props_t;
@@ -61,6 +63,7 @@ typedef struct _segment_manager
    */
   svm_msg_q_t *event_queue;
 
+  u32 max_fifo_size;
   u8 high_watermark;
   u8 low_watermark;
 } segment_manager_t;
index b006cfa..e9cda36 100644 (file)
@@ -396,6 +396,24 @@ session_enqueue_chain_tail (session_t * s, vlib_buffer_t * b,
   return 0;
 }
 
+void
+session_fifo_tuning (session_t * s, svm_fifo_t * f,
+                    session_ft_action_t act, u32 len)
+{
+  if (s->flags & SESSION_F_CUSTOM_FIFO_TUNING)
+    {
+      app_worker_t *app_wrk = app_worker_get (s->app_wrk_index);
+      app_worker_session_fifo_tuning (app_wrk, s, f, act, len);
+      if (CLIB_ASSERT_ENABLE)
+       {
+         segment_manager_t *sm;
+         sm = segment_manager_get (f->segment_manager);
+         ASSERT (f->size >= 4096);
+         ASSERT (f->size <= sm->max_fifo_size);
+       }
+    }
+}
+
 /*
  * Enqueue data for delivery to session peer. Does not notify peer of enqueue
  * event but on request can queue notification events for later delivery by
@@ -458,6 +476,8 @@ session_enqueue_stream_connection (transport_connection_t * tc,
          s->flags |= SESSION_F_RX_EVT;
          vec_add1 (wrk->session_to_enqueue[tc->proto], s->session_index);
        }
+
+      session_fifo_tuning (s, s->rx_fifo, SESSION_FT_ACTION_ENQUEUED, 0);
     }
 
   return enqueued;
@@ -495,6 +515,8 @@ session_enqueue_dgram_connection (session_t * s,
          s->flags |= SESSION_F_RX_EVT;
          vec_add1 (wrk->session_to_enqueue[proto], s->session_index);
        }
+
+      session_fifo_tuning (s, s->rx_fifo, SESSION_FT_ACTION_ENQUEUED, 0);
     }
   return enqueued;
 }
@@ -514,6 +536,7 @@ session_tx_fifo_dequeue_drop (transport_connection_t * tc, u32 max_bytes)
   u32 rv;
 
   rv = svm_fifo_dequeue_drop (s->tx_fifo, max_bytes);
+  session_fifo_tuning (s, s->tx_fifo, SESSION_FT_ACTION_DEQUEUED, rv);
 
   if (svm_fifo_needs_deq_ntf (s->tx_fifo, max_bytes))
     session_dequeue_notify (s);
@@ -674,6 +697,9 @@ session_main_flush_enqueue_events (u8 transport_proto, u32 thread_index)
          continue;
        }
 
+      session_fifo_tuning (s, s->rx_fifo, SESSION_FT_ACTION_ENQUEUED,
+                          0 /* TODO/not needed */ );
+
       if (PREDICT_FALSE (session_enqueue_notify_inline (s)))
        errors++;
     }
index be8ad9c..aa0e865 100644 (file)
@@ -123,6 +123,13 @@ typedef enum
   SESSION_CLEANUP_SESSION,
 } session_cleanup_ntf_t;
 
+typedef enum session_ft_action_
+{
+  SESSION_FT_ACTION_ENQUEUED,
+  SESSION_FT_ACTION_DEQUEUED,
+  SESSION_FT_ACTION_N_ACTIONS
+} session_ft_action_t;
+
 /*
  * Session states
  */
@@ -154,6 +161,7 @@ typedef enum
   _(CUSTOM_TX, "custom-tx")                            \
   _(IS_MIGRATING, "migrating")                         \
   _(UNIDIRECTIONAL, "unidirectional")                  \
+  _(CUSTOM_FIFO_TUNING, "custom-fifo-tuning")          \
 
 typedef enum session_flags_bits_
 {