2 * Copyright (c) 2017-2019 Cisco and/or its affiliates.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at:
7 * http://www.apache.org/licenses/LICENSE-2.0
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
16 #include <vnet/vnet.h>
17 #include <vlibmemory/api.h>
18 #include <vnet/session/application.h>
19 #include <vnet/session/application_interface.h>
20 #include <hs_apps/proxy.h>
21 #include <vnet/tcp/tcp.h>
/* Single global instance of the proxy state; every callback in this file
 * reaches it through &proxy_main. */
23 proxy_main_t proxy_main;
/* Tail of the proxy_connect_args_t typedef.  Its member declarations are
 * not visible in this view; code below accesses api_context, app_index
 * and uri on it. */
32 } proxy_connect_args_t;
/*
 * RPC handler handed to vl_api_rpc_call_main_thread () by
 * proxy_call_main_thread ().
 *
 * data      pointer to a proxy_connect_args_t blob
 * data_len  size of the blob; not referenced in the visible lines
 *
 * Rebuilds a zeroed vnet_connect_args_t from the blob and calls
 * vnet_connect_uri () with it.  The result of that call is not checked.
 *
 * NOTE(review): only api_context and app_index are visibly copied here.
 * The assignment of a.uri is not visible in this view -- confirm it is set
 * before vnet_connect_uri () runs.
 */
35 proxy_cb_fn (void *data, u32 data_len)
37 proxy_connect_args_t *pa = (proxy_connect_args_t *) data;
38 vnet_connect_args_t a;
40 memset (&a, 0, sizeof (a));
41 a.api_context = pa->api_context;
42 a.app_index = pa->app_index;
44 vnet_connect_uri (&a);
/*
 * Start the connect described by `a`, branching on whether the caller is
 * running on thread 0.
 *
 * The visible lines cover the hand-off path: api_context, app_index and the
 * uri bytes are packed into a stack proxy_connect_args_t and passed by
 * value-copy to vl_api_rpc_call_main_thread (), with proxy_cb_fn as the
 * handler.  What happens when the thread index is 0 is not visible here.
 *
 * NOTE(review): the copy length is vec_len (a->uri), so a->uri is assumed
 * to be a vppinfra vector, and no bound check against the size of args.uri
 * is visible.  args is also not zeroed first, so the copied bytes must
 * carry their own terminator -- confirm both against the uri producers.
 */
48 proxy_call_main_thread (vnet_connect_args_t * a)
50 if (vlib_get_thread_index () == 0)
56 proxy_connect_args_t args;
57 args.api_context = a->api_context;
58 args.app_index = a->app_index;
59 clib_memcpy (args.uri, a->uri, vec_len (a->uri));
60 vl_api_rpc_call_main_thread (proxy_cb_fn, (u8 *) & args, sizeof (args));
/*
 * Tear down the proxy session that `s` belongs to, together with the
 * sessions on both sides of it.
 *
 * s               session on which the disconnect/reset was reported
 * is_active_open  non-zero when `s` is the active-open (outbound) session,
 *                 zero when it is the server (accepted) session
 *
 * The proxy session index is looked up by the handle of `s` in one of the
 * two handle hashes.  A missing entry is reported with a warning.  When the
 * index is still live in the pool, the peer session is resolved from the
 * handle stored in the proxy session (unless that handle is ~0).  The
 * pool entry is then poisoned and released, and the sessions found are
 * unhooked from their hash and disconnected.
 *
 * NOTE(review): several lines of this function (branch headers, braces and
 * some guards) are not visible in this view, so the exact nesting of the
 * statements below must be confirmed against the full file.
 */
65 delete_proxy_session (session_t * s, int is_active_open)
67 proxy_main_t *pm = &proxy_main;
68 proxy_session_t *ps = 0;
69 vnet_disconnect_args_t _a, *a = &_a;
70 session_t *active_open_session = 0;
71 session_t *server_session = 0;
/* Pool and hash accesses below happen with sessions_lock held. */
75 clib_spinlock_lock_if_init (&pm->sessions_lock);
77 handle = session_handle (s);
/* Lookup keyed by the active-open session handle. */
81 p = hash_get (pm->proxy_session_by_active_open_handle, handle);
84 clib_warning ("proxy session for %s handle %lld (%llx) AWOL",
85 is_active_open ? "active open" : "server",
88 else if (!pool_is_free_index (pm->sessions, p[0]))
90 active_open_session = s;
91 ps = pool_elt_at_index (pm->sessions, p[0]);
92 if (ps->vpp_server_handle != ~0)
93 server_session = session_get_from_handle (ps->vpp_server_handle);
/* Lookup keyed by the server session handle. */
98 p = hash_get (pm->proxy_session_by_server_handle, handle);
101 clib_warning ("proxy session for %s handle %lld (%llx) AWOL",
102 is_active_open ? "active open" : "server",
105 else if (!pool_is_free_index (pm->sessions, p[0]))
108 ps = pool_elt_at_index (pm->sessions, p[0]);
109 if (ps->vpp_active_open_handle != ~0)
110 active_open_session = session_get_from_handle
111 (ps->vpp_active_open_handle);
/* Fill the entry with 0xFE before handing it back to the pool, so stale
 * use of it is easier to spot. */
118 clib_memset (ps, 0xFE, sizeof (*ps));
119 pool_put (pm->sessions, ps);
/* Active-open side: drop the hash entry and disconnect using the
 * active-open application index. */
122 if (active_open_session)
124 a->handle = session_handle (active_open_session);
125 a->app_index = pm->active_open_app_index;
126 hash_unset (pm->proxy_session_by_active_open_handle,
127 session_handle (active_open_session));
128 vnet_disconnect_session (a);
/* Server side: same steps with the server hash and application index.
 * NOTE(review): the guard on server_session is not visible here. */
133 a->handle = session_handle (server_session);
134 a->app_index = pm->server_app_index;
135 hash_unset (pm->proxy_session_by_server_handle,
136 session_handle (server_session));
137 vnet_disconnect_session (a);
140 clib_spinlock_unlock_if_init (&pm->sessions_lock);
/*
 * Fifo tuning callback installed as .fifo_tuning_callback in both callback
 * tables of this file.
 *
 * s      session owning the fifo (not referenced in the visible lines)
 * f      fifo being tuned
 * act    action that triggered the callback
 * bytes  byte count associated with the action; used on the shrink path
 *
 * Computes the segment memory usage and the fifo fill level as a
 * percentage, then resizes the fifo with svm_fifo_set_size ():
 *  - on SESSION_FT_ACTION_ENQUEUED the fifo may grow by the amount of data
 *    currently in it, capped so the size does not exceed sm->max_fifo_size;
 *  - the remaining lines shrink it by `bytes` or `bytes / 2`, never below
 *    4096 bytes.
 * Thresholds come from pm->low_watermark / pm->high_watermark for the
 * segment and from fixed percentages (20/50/80) for the fifo.
 *
 * NOTE(review): `fifo_in_use * 100` is evaluated in u32 and wraps once
 * fifo_in_use exceeds UINT32_MAX / 100 (about 42.9 MB); the default
 * max_fifo_size configured in this file is 128 << 20, so the computed
 * percentage can be wrong for large fifos.  Widening to u64 before the
 * multiply would avoid it.
 * NOTE(review): the declaration of update_size and the guards around the
 * svm_fifo_set_size () calls are not visible in this view; also confirm
 * that sm->max_fifo_size - fifo_size cannot underflow.
 */
144 common_fifo_tuning_callback (session_t * s, svm_fifo_t * f,
145 session_ft_action_t act, u32 bytes)
147 proxy_main_t *pm = &proxy_main;
149 segment_manager_t *sm = segment_manager_get (f->segment_manager);
150 fifo_segment_t *fs = segment_manager_get_segment (sm, f->segment_index);
152 u8 seg_usage = fifo_segment_get_mem_usage (fs);
153 u32 fifo_in_use = svm_fifo_max_dequeue_prod (f);
154 u32 fifo_size = svm_fifo_size (f);
155 u8 fifo_usage = fifo_in_use * 100 / fifo_size;
158 ASSERT (act < SESSION_FT_ACTION_N_ACTIONS);
160 if (act == SESSION_FT_ACTION_ENQUEUED)
162 if (seg_usage < pm->low_watermark && fifo_usage > 50)
163 update_size = fifo_in_use;
164 else if (seg_usage < pm->high_watermark && fifo_usage > 80)
165 update_size = fifo_in_use;
167 update_size = clib_min (update_size, sm->max_fifo_size - fifo_size);
169 svm_fifo_set_size (f, fifo_size + update_size);
173 if (seg_usage > pm->high_watermark || fifo_usage < 20)
175 else if (seg_usage > pm->low_watermark && fifo_usage < 50)
176 update_size = (bytes / 2);
178 ASSERT (fifo_size >= 4096);
179 update_size = clib_min (update_size, fifo_size - 4096);
181 svm_fifo_set_size (f, fifo_size - update_size);
/*
 * Accept callback of the server application (.session_accept_callback in
 * proxy_session_cb_vft).  Marks the accepted session as
 * SESSION_STATE_READY and takes sessions_lock.
 *
 * NOTE(review): the rest of the body, including the matching unlock and
 * the return value, is not visible in this view.
 */
188 proxy_accept_callback (session_t * s)
190 proxy_main_t *pm = &proxy_main;
192 s->session_state = SESSION_STATE_READY;
194 clib_spinlock_lock_if_init (&pm->sessions_lock);
/* Disconnect callback of the server application: tears down the proxy
 * session pair, identifying `s` as the server-side session. */
200 proxy_disconnect_callback (session_t * s)
202 delete_proxy_session (s, 0 /* is_active_open */ );
/* Reset callback of the server application: logs the session that was
 * reset, then tears down the proxy session pair exactly as the disconnect
 * callback does. */
206 proxy_reset_callback (session_t * s)
208 clib_warning ("Reset session %U", format_session, s, 2);
209 delete_proxy_session (s, 0 /* is_active_open */ );
/* Connected callback of the server application.  The visible body only
 * logs that it was invoked; presumably the server side never initiates
 * connects, so this is not expected to fire -- TODO confirm.  The return
 * statement is not visible in this view. */
213 proxy_connected_callback (u32 app_index, u32 api_context,
214 session_t * s, u8 is_fail)
216 clib_warning ("called...");
/* Add-segment callback of the server application.  The visible body only
 * logs that it was invoked; neither argument is used in the visible
 * lines. */
221 proxy_add_segment_callback (u32 client_index, u64 segment_handle)
223 clib_warning ("called...");
/*
 * Rx callback of the server application (.builtin_app_rx_callback in
 * proxy_session_cb_vft), invoked for server session `s`.
 *
 * Two cases are distinguished by looking `s` up in
 * proxy_session_by_server_handle under sessions_lock:
 *
 *  - Entry found (expected common case): the server rx fifo doubles as the
 *    active-open tx fifo, so a tx io event is posted to the thread and
 *    session recorded in that fifo's master indices.  If the fifo has
 *    TCP_MSS bytes or less of room left, a dequeue notification is
 *    requested.
 *
 *  - No entry: the pending data is peeked (not dequeued) into the
 *    per-thread rx buffer, a proxy session is allocated and registered
 *    under the server handle, and a connect to pm->client_uri is started
 *    through proxy_call_main_thread () with the proxy session index as
 *    api_context.
 *
 * NOTE(review): the peeked bytes are not consumed in the visible lines and
 * actual_transfer is marked unused.  Return statements and the branch
 * delimiters are not visible in this view.
 */
228 proxy_rx_callback (session_t * s)
231 int actual_transfer __attribute__ ((unused));
232 svm_fifo_t *tx_fifo, *rx_fifo;
233 proxy_main_t *pm = &proxy_main;
234 u32 thread_index = vlib_get_thread_index ();
235 vnet_connect_args_t _a, *a = &_a;
239 svm_fifo_t *ao_tx_fifo;
241 ASSERT (s->thread_index == thread_index);
243 clib_spinlock_lock_if_init (&pm->sessions_lock);
244 p = hash_get (pm->proxy_session_by_server_handle, session_handle (s));
246 if (PREDICT_TRUE (p != 0))
248 clib_spinlock_unlock_if_init (&pm->sessions_lock);
249 ao_tx_fifo = s->rx_fifo;
252 * Send event for active open tx fifo
254 if (svm_fifo_set_event (ao_tx_fifo))
256 u32 ao_thread_index = ao_tx_fifo->master_thread_index;
257 u32 ao_session_index = ao_tx_fifo->master_session_index;
258 if (session_send_io_evt_to_thread_custom (&ao_session_index,
261 clib_warning ("failed to enqueue tx evt");
264 if (svm_fifo_max_enqueue (ao_tx_fifo) <= TCP_MSS)
265 svm_fifo_add_want_deq_ntf (ao_tx_fifo, SVM_FIFO_WANT_DEQ_NOTIF);
/* The lines from here on appear to handle the first rx on a server session
 * (no hash entry yet); sessions_lock is still held until the unlock
 * further down. */
269 rx_fifo = s->rx_fifo;
270 tx_fifo = s->tx_fifo;
272 ASSERT (rx_fifo->master_thread_index == thread_index);
273 ASSERT (tx_fifo->master_thread_index == thread_index);
275 max_dequeue = svm_fifo_max_dequeue_cons (s->rx_fifo);
277 if (PREDICT_FALSE (max_dequeue == 0))
280 max_dequeue = clib_min (pm->rcv_buffer_size, max_dequeue);
281 actual_transfer = svm_fifo_peek (rx_fifo, 0 /* relative_offset */ ,
282 max_dequeue, pm->rx_buf[thread_index]);
284 /* $$$ your message in this space: parse url, etc. */
286 clib_memset (a, 0, sizeof (*a));
288 pool_get (pm->sessions, ps);
289 clib_memset (ps, 0, sizeof (*ps));
290 ps->server_rx_fifo = rx_fifo;
291 ps->server_tx_fifo = tx_fifo;
292 ps->vpp_server_handle = session_handle (s);
294 proxy_index = ps - pm->sessions;
296 hash_set (pm->proxy_session_by_server_handle, ps->vpp_server_handle,
299 clib_spinlock_unlock_if_init (&pm->sessions_lock);
/* The connect is issued after the lock is released.  The proxy session
 * index travels as api_context and comes back as `opaque` in the
 * active-open connected callback. */
301 a->uri = (char *) pm->client_uri;
302 a->api_context = proxy_index;
303 a->app_index = pm->active_open_app_index;
304 proxy_call_main_thread (a);
/*
 * Tx callback of the server application (.builtin_app_tx_callback in
 * proxy_session_cb_vft), invoked for server session `proxy_s`.
 *
 * While the free space in the server tx fifo is below
 * min (fifo size / 8, 128 KB), a dequeue notification is requested.
 * Otherwise the proxy session is looked up by server handle under
 * sessions_lock, the active-open session is resolved from it, and an ACK
 * is forced on the active-open TCP connection so that its receive window
 * is updated.
 *
 * NOTE(review): the transport connection is cast to tcp_connection_t
 * without a visible check of the transport type.  The NULL check on `p`
 * and the early-exit statements (which must also release sessions_lock)
 * are not visible in this view -- confirm against the full file.
 */
311 proxy_tx_callback (session_t * proxy_s)
313 proxy_main_t *pm = &proxy_main;
314 transport_connection_t *tc;
315 session_handle_t handle;
321 min_free = clib_min (svm_fifo_size (proxy_s->tx_fifo) >> 3, 128 << 10);
322 if (svm_fifo_max_enqueue (proxy_s->tx_fifo) < min_free)
324 svm_fifo_add_want_deq_ntf (proxy_s->tx_fifo, SVM_FIFO_WANT_DEQ_NOTIF);
328 clib_spinlock_lock_if_init (&pm->sessions_lock);
330 handle = session_handle (proxy_s);
331 p = hash_get (pm->proxy_session_by_server_handle, handle);
335 if (pool_is_free_index (pm->sessions, p[0]))
338 ps = pool_elt_at_index (pm->sessions, p[0]);
339 if (ps->vpp_active_open_handle == ~0)
342 ao_s = session_get_from_handle (ps->vpp_active_open_handle);
344 /* Force ack on active open side to update rcv wnd */
345 tc = session_get_transport (ao_s);
346 tcp_send_ack ((tcp_connection_t *) tc);
348 clib_spinlock_unlock_if_init (&pm->sessions_lock);
/* Callback table of the server (listening) application; handed to the
 * session layer through a->session_cb_vft in proxy_server_attach (). */
353 static session_cb_vft_t proxy_session_cb_vft = {
354 .session_accept_callback = proxy_accept_callback,
355 .session_disconnect_callback = proxy_disconnect_callback,
356 .session_connected_callback = proxy_connected_callback,
357 .add_segment_callback = proxy_add_segment_callback,
358 .builtin_app_rx_callback = proxy_rx_callback,
359 .builtin_app_tx_callback = proxy_tx_callback,
360 .session_reset_callback = proxy_reset_callback,
361 .fifo_tuning_callback = common_fifo_tuning_callback
/*
 * Connected callback of the active-open application.
 *
 * app_index  application index (not referenced in the visible lines)
 * opaque     proxy session pool index; it is the api_context value that
 *            proxy_rx_callback () put into the connect request
 * s          the newly connected active-open session
 * is_fail    failure indication; the visible failure handling only logs
 *
 * On success the active-open session is cross-wired onto the server
 * session's fifos (server rx becomes active-open tx, server tx becomes
 * active-open rx), the tx fifo's master indices are pointed at `s`, both
 * fifo reference counts are bumped, and the session is registered in
 * proxy_session_by_active_open_handle.  A tx io event is then posted in
 * case data is already waiting in the fifo.
 *
 * NOTE(review): thread_index is declared u8 here while proxy_rx_callback ()
 * keeps the same value in a u32; the narrowing also feeds the ASSERT
 * below.  pool_elt_at_index () is applied to `opaque` with no visible check
 * that the entry is still allocated.  What follows the failure warning is
 * not visible in this view.
 */
365 active_open_connected_callback (u32 app_index, u32 opaque,
366 session_t * s, u8 is_fail)
368 proxy_main_t *pm = &proxy_main;
370 u8 thread_index = vlib_get_thread_index ();
374 clib_warning ("connection %d failed!", opaque);
379 * Setup proxy session handle.
381 clib_spinlock_lock_if_init (&pm->sessions_lock);
383 ps = pool_elt_at_index (pm->sessions, opaque);
384 ps->vpp_active_open_handle = session_handle (s);
386 s->tx_fifo = ps->server_rx_fifo;
387 s->rx_fifo = ps->server_tx_fifo;
390 * Reset the active-open tx-fifo master indices so the active-open session
391 * will receive data, etc.
393 s->tx_fifo->master_session_index = s->session_index;
394 s->tx_fifo->master_thread_index = s->thread_index;
397 * Account for the active-open session's use of the fifos
398 * so they won't disappear until the last session which uses
401 s->tx_fifo->refcnt++;
402 s->rx_fifo->refcnt++;
404 svm_fifo_init_ooo_lookup (s->tx_fifo, 1 /* deq ooo */ );
405 svm_fifo_init_ooo_lookup (s->rx_fifo, 0 /* enq ooo */ );
407 hash_set (pm->proxy_session_by_active_open_handle,
408 ps->vpp_active_open_handle, opaque);
410 clib_spinlock_unlock_if_init (&pm->sessions_lock);
413 * Send event for active open tx fifo
415 ASSERT (s->thread_index == thread_index);
416 if (svm_fifo_set_event (s->tx_fifo))
417 session_send_io_evt_to_thread (s->tx_fifo, SESSION_IO_EVT_TX);
/* Reset callback of the active-open application: tears down the proxy
 * session pair, identifying `s` as the active-open session. */
423 active_open_reset_callback (session_t * s)
425 delete_proxy_session (s, 1 /* is_active_open */ );
/* Installed as .session_accept_callback of the active-open application.
 * Its body is not visible in this view. */
429 active_open_create_callback (session_t * s)
/* Disconnect callback of the active-open application: tears down the proxy
 * session pair, identifying `s` as the active-open session. */
435 active_open_disconnect_callback (session_t * s)
437 delete_proxy_session (s, 1 /* is_active_open */ );
/*
 * Rx callback of the active-open application.
 *
 * The active-open rx fifo is the server session's tx fifo (wired up in
 * active_open_connected_callback ()), so incoming data only needs a tx io
 * event posted to the thread and session recorded in the fifo's master
 * indices.  When that event is posted, its result is returned directly.
 * If the fifo has TCP_MSS bytes or less of room left, a dequeue
 * notification is requested.
 *
 * NOTE(review): the thread index is held in a u8 here; the final return
 * statement is not visible in this view.
 */
441 active_open_rx_callback (session_t * s)
443 svm_fifo_t *proxy_tx_fifo;
445 proxy_tx_fifo = s->rx_fifo;
448 * Send event for server tx fifo
450 if (svm_fifo_set_event (proxy_tx_fifo))
452 u8 thread_index = proxy_tx_fifo->master_thread_index;
453 u32 session_index = proxy_tx_fifo->master_session_index;
454 return session_send_io_evt_to_thread_custom (&session_index,
459 if (svm_fifo_max_enqueue (proxy_tx_fifo) <= TCP_MSS)
460 svm_fifo_add_want_deq_ntf (proxy_tx_fifo, SVM_FIFO_WANT_DEQ_NOTIF);
/*
 * Tx callback of the active-open application; mirror image of
 * proxy_tx_callback ().
 *
 * While the free space in the active-open tx fifo is below
 * min (fifo size / 8, 128 KB), a dequeue notification is requested.
 * Otherwise the proxy session is looked up by active-open handle under
 * sessions_lock, the server session is resolved from it, and an ACK is
 * forced on the server-side TCP connection so that its receive window is
 * updated.
 *
 * NOTE(review): the transport connection is cast to tcp_connection_t
 * without a visible check of the transport type.  The NULL check on `p`
 * and the early-exit statements (which must also release sessions_lock)
 * are not visible in this view -- confirm against the full file.
 */
466 active_open_tx_callback (session_t * ao_s)
468 proxy_main_t *pm = &proxy_main;
469 transport_connection_t *tc;
470 session_handle_t handle;
476 min_free = clib_min (svm_fifo_size (ao_s->tx_fifo) >> 3, 128 << 10);
477 if (svm_fifo_max_enqueue (ao_s->tx_fifo) < min_free)
479 svm_fifo_add_want_deq_ntf (ao_s->tx_fifo, SVM_FIFO_WANT_DEQ_NOTIF);
483 clib_spinlock_lock_if_init (&pm->sessions_lock);
485 handle = session_handle (ao_s);
486 p = hash_get (pm->proxy_session_by_active_open_handle, handle);
490 if (pool_is_free_index (pm->sessions, p[0]))
493 ps = pool_elt_at_index (pm->sessions, p[0]);
494 if (ps->vpp_server_handle == ~0)
497 proxy_s = session_get_from_handle (ps->vpp_server_handle);
499 /* Force ack on proxy side to update rcv wnd */
500 tc = session_get_transport (proxy_s);
501 tcp_send_ack ((tcp_connection_t *) tc);
503 clib_spinlock_unlock_if_init (&pm->sessions_lock);
/* Callback table of the active-open (outbound) application; handed to the
 * session layer through a->session_cb_vft in active_open_attach ().  It
 * shares common_fifo_tuning_callback with the server table. */
509 static session_cb_vft_t active_open_clients = {
510 .session_reset_callback = active_open_reset_callback,
511 .session_connected_callback = active_open_connected_callback,
512 .session_accept_callback = active_open_create_callback,
513 .session_disconnect_callback = active_open_disconnect_callback,
514 .builtin_app_rx_callback = active_open_rx_callback,
515 .builtin_app_tx_callback = active_open_tx_callback,
516 .fifo_tuning_callback = common_fifo_tuning_callback
/*
 * Attach the server side of the proxy to the session layer as a builtin
 * application named "proxy-server".
 *
 * Options are filled from proxy_main: fifo sizes, maximum fifo size,
 * watermarks, private segment count and preallocated fifo pairs.  The
 * segment size defaults to 512 MB and is overridden by
 * pm->private_segment_size when that is non-zero.  On success the
 * resulting application index is stored in pm->server_app_index; on
 * failure a warning is logged.
 *
 * NOTE(review): the vector returned by format () for a->name is not
 * released in the visible lines -- confirm ownership.  The ternary on
 * prealloc_fifos evaluates to pm->prealloc_fifos in both cases.  The
 * return statements are not visible in this view.
 */
521 proxy_server_attach ()
523 proxy_main_t *pm = &proxy_main;
524 u64 options[APP_OPTIONS_N_OPTIONS];
525 vnet_app_attach_args_t _a, *a = &_a;
526 u32 segment_size = 512 << 20;
528 clib_memset (a, 0, sizeof (*a));
529 clib_memset (options, 0, sizeof (options));
531 if (pm->private_segment_size)
532 segment_size = pm->private_segment_size;
533 a->name = format (0, "proxy-server");
534 a->api_client_index = pm->server_client_index;
535 a->session_cb_vft = &proxy_session_cb_vft;
536 a->options = options;
537 a->options[APP_OPTIONS_SEGMENT_SIZE] = segment_size;
538 a->options[APP_OPTIONS_RX_FIFO_SIZE] = pm->fifo_size;
539 a->options[APP_OPTIONS_TX_FIFO_SIZE] = pm->fifo_size;
540 a->options[APP_OPTIONS_MAX_FIFO_SIZE] = pm->max_fifo_size;
541 a->options[APP_OPTIONS_HIGH_WATERMARK] = (u64) pm->high_watermark;
542 a->options[APP_OPTIONS_LOW_WATERMARK] = (u64) pm->low_watermark;
543 a->options[APP_OPTIONS_PRIVATE_SEGMENT_COUNT] = pm->private_segment_count;
544 a->options[APP_OPTIONS_PREALLOC_FIFO_PAIRS] =
545 pm->prealloc_fifos ? pm->prealloc_fifos : 0;
547 a->options[APP_OPTIONS_FLAGS] = APP_OPTIONS_FLAGS_IS_BUILTIN;
549 if (vnet_application_attach (a))
551 clib_warning ("failed to attach server");
554 pm->server_app_index = a->app_index;
/*
 * Attach the active-open (outbound) side of the proxy to the session layer
 * as a builtin application named "proxy-active-open", flagged
 * APP_OPTIONS_FLAGS_IS_PROXY in addition to IS_BUILTIN.
 *
 * Options mirror those of proxy_server_attach (), plus a fixed accept
 * cookie.  On success the application index is stored in
 * pm->active_open_app_index.
 *
 * NOTE(review): the segment size is hard-coded to 512 << 20 here, whereas
 * proxy_server_attach () honours pm->private_segment_size -- confirm the
 * difference is intended.  The vector returned by format () for a->name is
 * not released in the visible lines.  The failure handling and return
 * statements are not visible in this view.
 */
561 active_open_attach (void)
563 proxy_main_t *pm = &proxy_main;
564 vnet_app_attach_args_t _a, *a = &_a;
565 u64 options[APP_OPTIONS_N_OPTIONS];
567 clib_memset (a, 0, sizeof (*a));
568 clib_memset (options, 0, sizeof (options));
570 a->api_client_index = pm->active_open_client_index;
571 a->session_cb_vft = &active_open_clients;
572 a->name = format (0, "proxy-active-open");
574 options[APP_OPTIONS_ACCEPT_COOKIE] = 0x12345678;
575 options[APP_OPTIONS_SEGMENT_SIZE] = 512 << 20;
576 options[APP_OPTIONS_RX_FIFO_SIZE] = pm->fifo_size;
577 options[APP_OPTIONS_TX_FIFO_SIZE] = pm->fifo_size;
578 options[APP_OPTIONS_MAX_FIFO_SIZE] = pm->max_fifo_size;
579 options[APP_OPTIONS_HIGH_WATERMARK] = (u64) pm->high_watermark;
580 options[APP_OPTIONS_LOW_WATERMARK] = (u64) pm->low_watermark;
581 options[APP_OPTIONS_PRIVATE_SEGMENT_COUNT] = pm->private_segment_count;
582 options[APP_OPTIONS_PREALLOC_FIFO_PAIRS] =
583 pm->prealloc_fifos ? pm->prealloc_fifos : 0;
585 options[APP_OPTIONS_FLAGS] = APP_OPTIONS_FLAGS_IS_BUILTIN
586 | APP_OPTIONS_FLAGS_IS_PROXY;
588 a->options = options;
590 if (vnet_application_attach (a))
593 pm->active_open_app_index = a->app_index;
/* Start listening on pm->server_uri on behalf of the server application.
 * Returns the result of vnet_bind_uri () unchanged; the caller treats a
 * non-zero value as failure. */
601 proxy_server_listen ()
603 proxy_main_t *pm = &proxy_main;
604 vnet_listen_args_t _a, *a = &_a;
605 clib_memset (a, 0, sizeof (*a));
606 a->app_index = pm->server_app_index;
607 a->uri = (char *) pm->server_uri;
608 return vnet_bind_uri (a);
/*
 * Bring the proxy up: size the per-thread state, attach the server
 * application, start listening, attach the active-open application, and
 * cache the session-layer event queue of every thread for both sides.
 *
 * Per-thread vectors are sized for the main thread plus vtm->n_threads
 * entries, and each thread gets an rx buffer sized from
 * pm->rcv_buffer_size.  Each failing step logs a warning; `vm` is not
 * referenced in the visible lines.
 *
 * NOTE(review): the return statements after each warning are not visible
 * in this view.  vec_validate () is called with pm->rcv_buffer_size as the
 * index, which presumably yields rcv_buffer_size + 1 bytes -- harmless,
 * but confirm it is intended.
 */
612 proxy_server_create (vlib_main_t * vm)
614 proxy_main_t *pm = &proxy_main;
615 vlib_thread_main_t *vtm = vlib_get_thread_main ();
619 num_threads = 1 /* main thread */ + vtm->n_threads;
620 vec_validate (proxy_main.server_event_queue, num_threads - 1);
621 vec_validate (proxy_main.active_open_event_queue, num_threads - 1);
622 vec_validate (pm->rx_buf, num_threads - 1);
624 for (i = 0; i < num_threads; i++)
625 vec_validate (pm->rx_buf[i], pm->rcv_buffer_size);
627 if (proxy_server_attach ())
629 clib_warning ("failed to attach server app");
632 if (proxy_server_listen ())
634 clib_warning ("failed to start listening");
637 if (active_open_attach ())
639 clib_warning ("failed to attach active open app");
/* Both queue vectors are filled from the same per-thread session-layer
 * event queue. */
643 for (i = 0; i < num_threads; i++)
645 pm->active_open_event_queue[i] = session_main_get_vpp_event_queue (i);
647 ASSERT (pm->active_open_event_queue[i]);
649 pm->server_event_queue[i] = session_main_get_vpp_event_queue (i);
/*
 * CLI handler for "test proxy server".
 *
 * Resets the tunables in proxy_main to their defaults (64 KB fifos, 128 MB
 * maximum fifo size, 80/50 high/low watermarks, 1024-byte receive buffer,
 * no preallocated fifos, no private segments), then overrides them from
 * the command line.  Missing server/client URIs fall back to the
 * defaults declared below, with a warning.  Finally the session layer is
 * enabled and proxy_server_create () is called; a failure there is
 * reported as a CLI error carrying its return value.
 *
 * Unknown input and a private segment size of 4 GB or more are rejected
 * with an error.
 *
 * NOTE(review): watermark values are parsed into a 32-bit temporary and
 * cast to u8 with no range check, so values above 255 are silently
 * truncated.  Defaults are re-applied on every invocation, but the
 * conditions guarding the URI defaults are not visible in this view, so
 * whether previously set URIs are reused or leaked must be confirmed.
 */
655 static clib_error_t *
656 proxy_server_create_command_fn (vlib_main_t * vm, unformat_input_t * input,
657 vlib_cli_command_t * cmd)
659 proxy_main_t *pm = &proxy_main;
660 char *default_server_uri = "tcp://0.0.0.0/23";
661 char *default_client_uri = "tcp://6.0.2.2/23";
665 pm->fifo_size = 64 << 10;
666 pm->max_fifo_size = 128 << 20;
667 pm->high_watermark = 80;
668 pm->low_watermark = 50;
669 pm->rcv_buffer_size = 1024;
670 pm->prealloc_fifos = 0;
671 pm->private_segment_count = 0;
672 pm->private_segment_size = 0;
676 while (unformat_check_input (input) != UNFORMAT_END_OF_INPUT)
678 if (unformat (input, "fifo-size %U",
679 unformat_memory_size, &pm->fifo_size))
681 else if (unformat (input, "max-fifo-size %U",
682 unformat_memory_size, &pm->max_fifo_size))
684 else if (unformat (input, "high-watermark %d", &tmp32))
685 pm->high_watermark = (u8) tmp32;
686 else if (unformat (input, "low-watermark %d", &tmp32))
687 pm->low_watermark = (u8) tmp32;
688 else if (unformat (input, "rcv-buf-size %d", &pm->rcv_buffer_size))
690 else if (unformat (input, "prealloc-fifos %d", &pm->prealloc_fifos))
692 else if (unformat (input, "private-segment-count %d",
693 &pm->private_segment_count))
695 else if (unformat (input, "private-segment-size %U",
696 unformat_memory_size, &tmp64))
698 if (tmp64 >= 0x100000000ULL)
699 return clib_error_return
700 (0, "private segment size %lld (%llu) too large", tmp64, tmp64);
701 pm->private_segment_size = tmp64;
/* A terminating 0 byte is appended so the URI vectors can also be used as
 * C strings. */
703 else if (unformat (input, "server-uri %s", &pm->server_uri))
704 vec_add1 (pm->server_uri, 0);
705 else if (unformat (input, "client-uri %s", &pm->client_uri))
706 vec_add1 (pm->client_uri, 0);
708 return clib_error_return (0, "unknown input `%U'",
709 format_unformat_error, input);
714 clib_warning ("No server-uri provided, Using default: %s",
716 pm->server_uri = format (0, "%s%c", default_server_uri, 0);
720 clib_warning ("No client-uri provided, Using default: %s",
722 pm->client_uri = format (0, "%s%c", default_client_uri, 0);
725 vnet_session_enable_disable (vm, 1 /* turn on session and transport */ );
727 rv = proxy_server_create (vm);
733 return clib_error_return (0, "server_create returned %d", rv);
/* Registers the "test proxy server" CLI command and binds it to
 * proxy_server_create_command_fn.
 * NOTE(review): the adjacent string literals of short_help are joined
 * without separating spaces, so the help text renders as one run of
 * bracketed options. */
740 VLIB_CLI_COMMAND (proxy_create_command, static) =
742 .path = "test proxy server",
743 .short_help = "test proxy server [server-uri <tcp://ip/port>]"
744 "[client-uri <tcp://ip/port>][fifo-size <nn>[k|m]]"
745 "[max-fifo-size <nn>[k|m]][high-watermark <nn>]"
746 "[low-watermark <nn>][rcv-buf-size <nn>][prealloc-fifos <nn>]"
747 "[private-segment-size <mem>][private-segment-count <nn>]",
748 .function = proxy_server_create_command_fn,
/* Init function, registered through VLIB_INIT_FUNCTION below.  Marks both
 * API client indices as unset (~0) and creates the two hashes that map
 * session handles to proxy session pool indices.  `vm` is not referenced
 * in the visible lines; the return statement is not visible in this
 * view. */
753 proxy_main_init (vlib_main_t * vm)
755 proxy_main_t *pm = &proxy_main;
756 pm->server_client_index = ~0;
757 pm->active_open_client_index = ~0;
758 pm->proxy_session_by_active_open_handle = hash_create (0, sizeof (uword));
759 pm->proxy_session_by_server_handle = hash_create (0, sizeof (uword));
764 VLIB_INIT_FUNCTION (proxy_main_init);
767 * fd.io coding-style-patch-verification: ON
770 * eval: (c-set-style "gnu")