X-Git-Url: https://gerrit.fd.io/r/gitweb?a=blobdiff_plain;f=src%2Fplugins%2Fhs_apps%2Fproxy.c;h=ca46aef2ae0d7f66e8eb65f92df9bea279c89145;hb=d8f48e2166747b529aa23762ac314dd686590c89;hp=c4ddd67643638dd0082bad949009e4fedada0124;hpb=39d69112fcec114fde34955ceb41555221d3ba11;p=vpp.git diff --git a/src/plugins/hs_apps/proxy.c b/src/plugins/hs_apps/proxy.c index c4ddd676436..ca46aef2ae0 100644 --- a/src/plugins/hs_apps/proxy.c +++ b/src/plugins/hs_apps/proxy.c @@ -18,9 +18,12 @@ #include #include #include +#include proxy_main_t proxy_main; +#define TCP_MSS 1460 + typedef struct { char uri[128]; @@ -69,13 +72,12 @@ delete_proxy_session (session_t * s, int is_active_open) uword *p; u64 handle; + clib_spinlock_lock_if_init (&pm->sessions_lock); + handle = session_handle (s); - clib_spinlock_lock_if_init (&pm->sessions_lock); if (is_active_open) { - active_open_session = s; - p = hash_get (pm->proxy_session_by_active_open_handle, handle); if (p == 0) { @@ -85,17 +87,14 @@ delete_proxy_session (session_t * s, int is_active_open) } else if (!pool_is_free_index (pm->sessions, p[0])) { + active_open_session = s; ps = pool_elt_at_index (pm->sessions, p[0]); if (ps->vpp_server_handle != ~0) server_session = session_get_from_handle (ps->vpp_server_handle); - else - server_session = 0; } } else { - server_session = s; - p = hash_get (pm->proxy_session_by_server_handle, handle); if (p == 0) { @@ -105,12 +104,11 @@ delete_proxy_session (session_t * s, int is_active_open) } else if (!pool_is_free_index (pm->sessions, p[0])) { + server_session = s; ps = pool_elt_at_index (pm->sessions, p[0]); if (ps->vpp_active_open_handle != ~0) active_open_session = session_get_from_handle (ps->vpp_active_open_handle); - else - active_open_session = 0; } } @@ -121,8 +119,6 @@ delete_proxy_session (session_t * s, int is_active_open) pool_put (pm->sessions, ps); } - clib_spinlock_unlock_if_init (&pm->sessions_lock); - if (active_open_session) { a->handle = session_handle (active_open_session); @@ -140,6 +136,52 @@ delete_proxy_session (session_t * s, int is_active_open) session_handle (server_session)); vnet_disconnect_session (a); } + + clib_spinlock_unlock_if_init (&pm->sessions_lock); +} + +static int +common_fifo_tuning_callback (session_t * s, svm_fifo_t * f, + session_ft_action_t act, u32 bytes) +{ + proxy_main_t *pm = &proxy_main; + + segment_manager_t *sm = segment_manager_get (f->segment_manager); + fifo_segment_t *fs = segment_manager_get_segment (sm, f->segment_index); + + u8 seg_usage = fifo_segment_get_mem_usage (fs); + u32 fifo_in_use = svm_fifo_max_dequeue_prod (f); + u32 fifo_size = svm_fifo_size (f); + u8 fifo_usage = fifo_in_use * 100 / fifo_size; + u8 update_size = 0; + + ASSERT (act < SESSION_FT_ACTION_N_ACTIONS); + + if (act == SESSION_FT_ACTION_ENQUEUED) + { + if (seg_usage < pm->low_watermark && fifo_usage > 50) + update_size = fifo_in_use; + else if (seg_usage < pm->high_watermark && fifo_usage > 80) + update_size = fifo_in_use; + + update_size = clib_min (update_size, sm->max_fifo_size - fifo_size); + if (update_size) + svm_fifo_set_size (f, fifo_size + update_size); + } + else /* dequeued */ + { + if (seg_usage > pm->high_watermark || fifo_usage < 20) + update_size = bytes; + else if (seg_usage > pm->low_watermark && fifo_usage < 50) + update_size = (bytes / 2); + + ASSERT (fifo_size >= 4096); + update_size = clib_min (update_size, fifo_size - 4096); + if (update_size) + svm_fifo_set_size (f, fifo_size - update_size); + } + + return 0; } static int @@ -194,7 +236,7 @@ proxy_rx_callback (session_t * s) proxy_session_t *ps; int proxy_index; uword *p; - svm_fifo_t *active_open_tx_fifo; + svm_fifo_t *ao_tx_fifo; ASSERT (s->thread_index == thread_index); @@ -204,20 +246,23 @@ proxy_rx_callback (session_t * s) if (PREDICT_TRUE (p != 0)) { clib_spinlock_unlock_if_init (&pm->sessions_lock); - active_open_tx_fifo = s->rx_fifo; + ao_tx_fifo = s->rx_fifo; /* * Send event for active open tx fifo */ - if (svm_fifo_set_event (active_open_tx_fifo)) + if (svm_fifo_set_event (ao_tx_fifo)) { - u32 ao_thread_index = active_open_tx_fifo->master_thread_index; - u32 ao_session_index = active_open_tx_fifo->master_session_index; + u32 ao_thread_index = ao_tx_fifo->master_thread_index; + u32 ao_session_index = ao_tx_fifo->master_session_index; if (session_send_io_evt_to_thread_custom (&ao_session_index, ao_thread_index, SESSION_IO_EVT_TX)) clib_warning ("failed to enqueue tx evt"); } + + if (svm_fifo_max_enqueue (ao_tx_fifo) <= TCP_MSS) + svm_fifo_add_want_deq_ntf (ao_tx_fifo, SVM_FIFO_WANT_DEQ_NOTIF); } else { @@ -232,6 +277,7 @@ proxy_rx_callback (session_t * s) if (PREDICT_FALSE (max_dequeue == 0)) return 0; + max_dequeue = clib_min (pm->rcv_buffer_size, max_dequeue); actual_transfer = svm_fifo_peek (rx_fifo, 0 /* relative_offset */ , max_dequeue, pm->rx_buf[thread_index]); @@ -239,7 +285,6 @@ proxy_rx_callback (session_t * s) clib_memset (a, 0, sizeof (*a)); - clib_spinlock_lock_if_init (&pm->sessions_lock); pool_get (pm->sessions, ps); clib_memset (ps, 0, sizeof (*ps)); ps->server_rx_fifo = rx_fifo; @@ -262,13 +307,58 @@ proxy_rx_callback (session_t * s) return 0; } +static int +proxy_tx_callback (session_t * proxy_s) +{ + proxy_main_t *pm = &proxy_main; + transport_connection_t *tc; + session_handle_t handle; + proxy_session_t *ps; + session_t *ao_s; + u32 min_free; + uword *p; + + min_free = clib_min (svm_fifo_size (proxy_s->tx_fifo) >> 3, 128 << 10); + if (svm_fifo_max_enqueue (proxy_s->tx_fifo) < min_free) + { + svm_fifo_add_want_deq_ntf (proxy_s->tx_fifo, SVM_FIFO_WANT_DEQ_NOTIF); + return 0; + } + + clib_spinlock_lock_if_init (&pm->sessions_lock); + + handle = session_handle (proxy_s); + p = hash_get (pm->proxy_session_by_server_handle, handle); + if (!p) + return 0; + + if (pool_is_free_index (pm->sessions, p[0])) + return 0; + + ps = pool_elt_at_index (pm->sessions, p[0]); + if (ps->vpp_active_open_handle == ~0) + return 0; + + ao_s = session_get_from_handle (ps->vpp_active_open_handle); + + /* Force ack on active open side to update rcv wnd */ + tc = session_get_transport (ao_s); + tcp_send_ack ((tcp_connection_t *) tc); + + clib_spinlock_unlock_if_init (&pm->sessions_lock); + + return 0; +} + static session_cb_vft_t proxy_session_cb_vft = { .session_accept_callback = proxy_accept_callback, .session_disconnect_callback = proxy_disconnect_callback, .session_connected_callback = proxy_connected_callback, .add_segment_callback = proxy_add_segment_callback, .builtin_app_rx_callback = proxy_rx_callback, - .session_reset_callback = proxy_reset_callback + .builtin_app_tx_callback = proxy_tx_callback, + .session_reset_callback = proxy_reset_callback, + .fifo_tuning_callback = common_fifo_tuning_callback }; static int @@ -311,6 +401,9 @@ active_open_connected_callback (u32 app_index, u32 opaque, s->tx_fifo->refcnt++; s->rx_fifo->refcnt++; + svm_fifo_init_ooo_lookup (s->tx_fifo, 1 /* deq ooo */ ); + svm_fifo_init_ooo_lookup (s->rx_fifo, 0 /* enq ooo */ ); + hash_set (pm->proxy_session_by_active_open_handle, ps->vpp_active_open_handle, opaque); @@ -363,6 +456,52 @@ active_open_rx_callback (session_t * s) SESSION_IO_EVT_TX); } + if (svm_fifo_max_enqueue (proxy_tx_fifo) <= TCP_MSS) + svm_fifo_add_want_deq_ntf (proxy_tx_fifo, SVM_FIFO_WANT_DEQ_NOTIF); + + return 0; +} + +static int +active_open_tx_callback (session_t * ao_s) +{ + proxy_main_t *pm = &proxy_main; + transport_connection_t *tc; + session_handle_t handle; + proxy_session_t *ps; + session_t *proxy_s; + u32 min_free; + uword *p; + + min_free = clib_min (svm_fifo_size (ao_s->tx_fifo) >> 3, 128 << 10); + if (svm_fifo_max_enqueue (ao_s->tx_fifo) < min_free) + { + svm_fifo_add_want_deq_ntf (ao_s->tx_fifo, SVM_FIFO_WANT_DEQ_NOTIF); + return 0; + } + + clib_spinlock_lock_if_init (&pm->sessions_lock); + + handle = session_handle (ao_s); + p = hash_get (pm->proxy_session_by_active_open_handle, handle); + if (!p) + return 0; + + if (pool_is_free_index (pm->sessions, p[0])) + return 0; + + ps = pool_elt_at_index (pm->sessions, p[0]); + if (ps->vpp_server_handle == ~0) + return 0; + + proxy_s = session_get_from_handle (ps->vpp_server_handle); + + /* Force ack on proxy side to update rcv wnd */ + tc = session_get_transport (proxy_s); + tcp_send_ack ((tcp_connection_t *) tc); + + clib_spinlock_unlock_if_init (&pm->sessions_lock); + return 0; } @@ -372,26 +511,12 @@ static session_cb_vft_t active_open_clients = { .session_connected_callback = active_open_connected_callback, .session_accept_callback = active_open_create_callback, .session_disconnect_callback = active_open_disconnect_callback, - .builtin_app_rx_callback = active_open_rx_callback + .builtin_app_rx_callback = active_open_rx_callback, + .builtin_app_tx_callback = active_open_tx_callback, + .fifo_tuning_callback = common_fifo_tuning_callback }; /* *INDENT-ON* */ - -static void -create_api_loopbacks (vlib_main_t * vm) -{ - proxy_main_t *pm = &proxy_main; - api_main_t *am = vlibapi_get_main (); - vl_shmem_hdr_t *shmem_hdr; - - shmem_hdr = am->shmem_hdr; - pm->vl_input_queue = shmem_hdr->vl_input_queue; - pm->server_client_index = - vl_api_memclnt_create_internal ("proxy_server", pm->vl_input_queue); - pm->active_open_client_index = - vl_api_memclnt_create_internal ("proxy_active_open", pm->vl_input_queue); -} - static int proxy_server_attach () { @@ -405,12 +530,16 @@ proxy_server_attach () if (pm->private_segment_size) segment_size = pm->private_segment_size; + a->name = format (0, "proxy-server"); a->api_client_index = pm->server_client_index; a->session_cb_vft = &proxy_session_cb_vft; a->options = options; a->options[APP_OPTIONS_SEGMENT_SIZE] = segment_size; a->options[APP_OPTIONS_RX_FIFO_SIZE] = pm->fifo_size; a->options[APP_OPTIONS_TX_FIFO_SIZE] = pm->fifo_size; + a->options[APP_OPTIONS_MAX_FIFO_SIZE] = pm->max_fifo_size; + a->options[APP_OPTIONS_HIGH_WATERMARK] = (u64) pm->high_watermark; + a->options[APP_OPTIONS_LOW_WATERMARK] = (u64) pm->low_watermark; a->options[APP_OPTIONS_PRIVATE_SEGMENT_COUNT] = pm->private_segment_count; a->options[APP_OPTIONS_PREALLOC_FIFO_PAIRS] = pm->prealloc_fifos ? pm->prealloc_fifos : 0; @@ -424,6 +553,7 @@ proxy_server_attach () } pm->server_app_index = a->app_index; + vec_free (a->name); return 0; } @@ -432,18 +562,22 @@ active_open_attach (void) { proxy_main_t *pm = &proxy_main; vnet_app_attach_args_t _a, *a = &_a; - u64 options[16]; + u64 options[APP_OPTIONS_N_OPTIONS]; clib_memset (a, 0, sizeof (*a)); clib_memset (options, 0, sizeof (options)); a->api_client_index = pm->active_open_client_index; a->session_cb_vft = &active_open_clients; + a->name = format (0, "proxy-active-open"); options[APP_OPTIONS_ACCEPT_COOKIE] = 0x12345678; options[APP_OPTIONS_SEGMENT_SIZE] = 512 << 20; options[APP_OPTIONS_RX_FIFO_SIZE] = pm->fifo_size; options[APP_OPTIONS_TX_FIFO_SIZE] = pm->fifo_size; + options[APP_OPTIONS_MAX_FIFO_SIZE] = pm->max_fifo_size; + options[APP_OPTIONS_HIGH_WATERMARK] = (u64) pm->high_watermark; + options[APP_OPTIONS_LOW_WATERMARK] = (u64) pm->low_watermark; options[APP_OPTIONS_PRIVATE_SEGMENT_COUNT] = pm->private_segment_count; options[APP_OPTIONS_PREALLOC_FIFO_PAIRS] = pm->prealloc_fifos ? pm->prealloc_fifos : 0; @@ -458,6 +592,8 @@ active_open_attach (void) pm->active_open_app_index = a->app_index; + vec_free (a->name); + return 0; } @@ -480,9 +616,6 @@ proxy_server_create (vlib_main_t * vm) u32 num_threads; int i; - if (pm->server_client_index == (u32) ~ 0) - create_api_loopbacks (vm); - num_threads = 1 /* main thread */ + vtm->n_threads; vec_validate (proxy_main.server_event_queue, num_threads - 1); vec_validate (proxy_main.active_open_event_queue, num_threads - 1); @@ -526,20 +659,32 @@ proxy_server_create_command_fn (vlib_main_t * vm, unformat_input_t * input, proxy_main_t *pm = &proxy_main; char *default_server_uri = "tcp://0.0.0.0/23"; char *default_client_uri = "tcp://6.0.2.2/23"; - int rv; - u64 tmp; + int rv, tmp32; + u64 tmp64; pm->fifo_size = 64 << 10; + pm->max_fifo_size = 128 << 20; + pm->high_watermark = 80; + pm->low_watermark = 50; pm->rcv_buffer_size = 1024; pm->prealloc_fifos = 0; pm->private_segment_count = 0; pm->private_segment_size = 0; pm->server_uri = 0; + pm->client_uri = 0; while (unformat_check_input (input) != UNFORMAT_END_OF_INPUT) { - if (unformat (input, "fifo-size %d", &pm->fifo_size)) - pm->fifo_size <<= 10; + if (unformat (input, "fifo-size %U", + unformat_memory_size, &pm->fifo_size)) + ; + else if (unformat (input, "max-fifo-size %U", + unformat_memory_size, &pm->max_fifo_size)) + ; + else if (unformat (input, "high-watermark %d", &tmp32)) + pm->high_watermark = (u8) tmp32; + else if (unformat (input, "low-watermark %d", &tmp32)) + pm->low_watermark = (u8) tmp32; else if (unformat (input, "rcv-buf-size %d", &pm->rcv_buffer_size)) ; else if (unformat (input, "prealloc-fifos %d", &pm->prealloc_fifos)) @@ -548,17 +693,17 @@ proxy_server_create_command_fn (vlib_main_t * vm, unformat_input_t * input, &pm->private_segment_count)) ; else if (unformat (input, "private-segment-size %U", - unformat_memory_size, &tmp)) + unformat_memory_size, &tmp64)) { - if (tmp >= 0x100000000ULL) + if (tmp64 >= 0x100000000ULL) return clib_error_return - (0, "private segment size %lld (%llu) too large", tmp, tmp); - pm->private_segment_size = tmp; + (0, "private segment size %lld (%llu) too large", tmp64, tmp64); + pm->private_segment_size = tmp64; } else if (unformat (input, "server-uri %s", &pm->server_uri)) - ; + vec_add1 (pm->server_uri, 0); else if (unformat (input, "client-uri %s", &pm->client_uri)) - pm->client_uri = format (0, "%s%c", pm->client_uri, 0); + vec_add1 (pm->client_uri, 0); else return clib_error_return (0, "unknown input `%U'", format_unformat_error, input); @@ -596,9 +741,10 @@ VLIB_CLI_COMMAND (proxy_create_command, static) = { .path = "test proxy server", .short_help = "test proxy server [server-uri ]" - "[client-uri ][fifo-size ][rcv-buf-size ]" - "[prealloc-fifos ][private-segment-size ]" - "[private-segment-count ]", + "[client-uri ][fifo-size [k|m]]" + "[max-fifo-size [k|m]][high-watermark ]" + "[low-watermark ][rcv-buf-size ][prealloc-fifos ]" + "[private-segment-size ][private-segment-count ]", .function = proxy_server_create_command_fn, }; /* *INDENT-ON* */