a520110dce81a4eb1850eb7bbf0c9237d83373a6
[vpp.git] / src / plugins / hs_apps / proxy.c
1 /*
2 * Copyright (c) 2017-2019 Cisco and/or its affiliates.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at:
6 *
7 *     http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include <vnet/vnet.h>
17 #include <vlibmemory/api.h>
18 #include <vnet/session/application.h>
19 #include <vnet/session/application_interface.h>
20 #include <hs_apps/proxy.h>
21 #include <vnet/tcp/tcp.h>
22
23 proxy_main_t proxy_main;
24
25 #define TCP_MSS 1460
26
27 typedef struct
28 {
29   char uri[128];
30   u32 app_index;
31   u32 api_context;
32 } proxy_connect_args_t;
33
34 static void
35 proxy_cb_fn (void *data, u32 data_len)
36 {
37   proxy_connect_args_t *pa = (proxy_connect_args_t *) data;
38   vnet_connect_args_t a;
39
40   memset (&a, 0, sizeof (a));
41   a.api_context = pa->api_context;
42   a.app_index = pa->app_index;
43   a.uri = pa->uri;
44   vnet_connect_uri (&a);
45 }
46
47 static void
48 proxy_call_main_thread (vnet_connect_args_t * a)
49 {
50   if (vlib_get_thread_index () == 0)
51     {
52       vnet_connect_uri (a);
53     }
54   else
55     {
56       proxy_connect_args_t args;
57       args.api_context = a->api_context;
58       args.app_index = a->app_index;
59       clib_memcpy (args.uri, a->uri, vec_len (a->uri));
60       vl_api_rpc_call_main_thread (proxy_cb_fn, (u8 *) & args, sizeof (args));
61     }
62 }
63
64 static void
65 delete_proxy_session (session_t * s, int is_active_open)
66 {
67   proxy_main_t *pm = &proxy_main;
68   proxy_session_t *ps = 0;
69   vnet_disconnect_args_t _a, *a = &_a;
70   session_t *active_open_session = 0;
71   session_t *server_session = 0;
72   uword *p;
73   u64 handle;
74
75   clib_spinlock_lock_if_init (&pm->sessions_lock);
76
77   handle = session_handle (s);
78
79   if (is_active_open)
80     {
81       p = hash_get (pm->proxy_session_by_active_open_handle, handle);
82       if (p == 0)
83         {
84           clib_warning ("proxy session for %s handle %lld (%llx) AWOL",
85                         is_active_open ? "active open" : "server",
86                         handle, handle);
87         }
88       else if (!pool_is_free_index (pm->sessions, p[0]))
89         {
90           active_open_session = s;
91           ps = pool_elt_at_index (pm->sessions, p[0]);
92           if (ps->vpp_server_handle != ~0)
93             server_session = session_get_from_handle (ps->vpp_server_handle);
94         }
95     }
96   else
97     {
98       p = hash_get (pm->proxy_session_by_server_handle, handle);
99       if (p == 0)
100         {
101           clib_warning ("proxy session for %s handle %lld (%llx) AWOL",
102                         is_active_open ? "active open" : "server",
103                         handle, handle);
104         }
105       else if (!pool_is_free_index (pm->sessions, p[0]))
106         {
107           server_session = s;
108           ps = pool_elt_at_index (pm->sessions, p[0]);
109           if (ps->vpp_active_open_handle != ~0)
110             active_open_session = session_get_from_handle
111               (ps->vpp_active_open_handle);
112         }
113     }
114
115   if (ps)
116     {
117       if (CLIB_DEBUG > 0)
118         clib_memset (ps, 0xFE, sizeof (*ps));
119       pool_put (pm->sessions, ps);
120     }
121
122   if (active_open_session)
123     {
124       a->handle = session_handle (active_open_session);
125       a->app_index = pm->active_open_app_index;
126       hash_unset (pm->proxy_session_by_active_open_handle,
127                   session_handle (active_open_session));
128       vnet_disconnect_session (a);
129     }
130
131   if (server_session)
132     {
133       a->handle = session_handle (server_session);
134       a->app_index = pm->server_app_index;
135       hash_unset (pm->proxy_session_by_server_handle,
136                   session_handle (server_session));
137       vnet_disconnect_session (a);
138     }
139
140   clib_spinlock_unlock_if_init (&pm->sessions_lock);
141 }
142
143 static int
144 proxy_accept_callback (session_t * s)
145 {
146   proxy_main_t *pm = &proxy_main;
147
148   s->session_state = SESSION_STATE_READY;
149
150   clib_spinlock_lock_if_init (&pm->sessions_lock);
151
152   return 0;
153 }
154
155 static void
156 proxy_disconnect_callback (session_t * s)
157 {
158   delete_proxy_session (s, 0 /* is_active_open */ );
159 }
160
161 static void
162 proxy_reset_callback (session_t * s)
163 {
164   clib_warning ("Reset session %U", format_session, s, 2);
165   delete_proxy_session (s, 0 /* is_active_open */ );
166 }
167
168 static int
169 proxy_connected_callback (u32 app_index, u32 api_context,
170                           session_t * s, u8 is_fail)
171 {
172   clib_warning ("called...");
173   return -1;
174 }
175
176 static int
177 proxy_add_segment_callback (u32 client_index, u64 segment_handle)
178 {
179   clib_warning ("called...");
180   return -1;
181 }
182
183 static int
184 proxy_rx_callback (session_t * s)
185 {
186   u32 max_dequeue;
187   int actual_transfer __attribute__ ((unused));
188   svm_fifo_t *tx_fifo, *rx_fifo;
189   proxy_main_t *pm = &proxy_main;
190   u32 thread_index = vlib_get_thread_index ();
191   vnet_connect_args_t _a, *a = &_a;
192   proxy_session_t *ps;
193   int proxy_index;
194   uword *p;
195   svm_fifo_t *ao_tx_fifo;
196
197   ASSERT (s->thread_index == thread_index);
198
199   clib_spinlock_lock_if_init (&pm->sessions_lock);
200   p = hash_get (pm->proxy_session_by_server_handle, session_handle (s));
201
202   if (PREDICT_TRUE (p != 0))
203     {
204       clib_spinlock_unlock_if_init (&pm->sessions_lock);
205       ao_tx_fifo = s->rx_fifo;
206
207       /*
208        * Send event for active open tx fifo
209        */
210       if (svm_fifo_set_event (ao_tx_fifo))
211         {
212           u32 ao_thread_index = ao_tx_fifo->master_thread_index;
213           u32 ao_session_index = ao_tx_fifo->master_session_index;
214           if (session_send_io_evt_to_thread_custom (&ao_session_index,
215                                                     ao_thread_index,
216                                                     SESSION_IO_EVT_TX))
217             clib_warning ("failed to enqueue tx evt");
218         }
219
220       if (svm_fifo_max_enqueue (ao_tx_fifo) <= TCP_MSS)
221         svm_fifo_add_want_deq_ntf (ao_tx_fifo, SVM_FIFO_WANT_DEQ_NOTIF);
222     }
223   else
224     {
225       rx_fifo = s->rx_fifo;
226       tx_fifo = s->tx_fifo;
227
228       ASSERT (rx_fifo->master_thread_index == thread_index);
229       ASSERT (tx_fifo->master_thread_index == thread_index);
230
231       max_dequeue = svm_fifo_max_dequeue_cons (s->rx_fifo);
232
233       if (PREDICT_FALSE (max_dequeue == 0))
234         return 0;
235
236       max_dequeue = clib_min (pm->rcv_buffer_size, max_dequeue);
237       actual_transfer = svm_fifo_peek (rx_fifo, 0 /* relative_offset */ ,
238                                        max_dequeue, pm->rx_buf[thread_index]);
239
240       /* $$$ your message in this space: parse url, etc. */
241
242       clib_memset (a, 0, sizeof (*a));
243
244       pool_get (pm->sessions, ps);
245       clib_memset (ps, 0, sizeof (*ps));
246       ps->server_rx_fifo = rx_fifo;
247       ps->server_tx_fifo = tx_fifo;
248       ps->vpp_server_handle = session_handle (s);
249
250       proxy_index = ps - pm->sessions;
251
252       hash_set (pm->proxy_session_by_server_handle, ps->vpp_server_handle,
253                 proxy_index);
254
255       clib_spinlock_unlock_if_init (&pm->sessions_lock);
256
257       a->uri = (char *) pm->client_uri;
258       a->api_context = proxy_index;
259       a->app_index = pm->active_open_app_index;
260       proxy_call_main_thread (a);
261     }
262
263   return 0;
264 }
265
266 static int
267 proxy_tx_callback (session_t * proxy_s)
268 {
269   proxy_main_t *pm = &proxy_main;
270   transport_connection_t *tc;
271   session_handle_t handle;
272   proxy_session_t *ps;
273   session_t *ao_s;
274   u32 min_free;
275   uword *p;
276
277   min_free = clib_min (proxy_s->tx_fifo->nitems >> 3, 128 << 10);
278   if (svm_fifo_max_enqueue (proxy_s->tx_fifo) < min_free)
279     {
280       svm_fifo_add_want_deq_ntf (proxy_s->tx_fifo, SVM_FIFO_WANT_DEQ_NOTIF);
281       return 0;
282     }
283
284   clib_spinlock_lock_if_init (&pm->sessions_lock);
285
286   handle = session_handle (proxy_s);
287   p = hash_get (pm->proxy_session_by_server_handle, handle);
288   if (!p)
289     return 0;
290
291   if (pool_is_free_index (pm->sessions, p[0]))
292     return 0;
293
294   ps = pool_elt_at_index (pm->sessions, p[0]);
295   if (ps->vpp_active_open_handle == ~0)
296     return 0;
297
298   ao_s = session_get_from_handle (ps->vpp_active_open_handle);
299
300   /* Force ack on active open side to update rcv wnd */
301   tc = session_get_transport (ao_s);
302   tcp_send_ack ((tcp_connection_t *) tc);
303
304   clib_spinlock_unlock_if_init (&pm->sessions_lock);
305
306   return 0;
307 }
308
309 static session_cb_vft_t proxy_session_cb_vft = {
310   .session_accept_callback = proxy_accept_callback,
311   .session_disconnect_callback = proxy_disconnect_callback,
312   .session_connected_callback = proxy_connected_callback,
313   .add_segment_callback = proxy_add_segment_callback,
314   .builtin_app_rx_callback = proxy_rx_callback,
315   .builtin_app_tx_callback = proxy_tx_callback,
316   .session_reset_callback = proxy_reset_callback
317 };
318
319 static int
320 active_open_connected_callback (u32 app_index, u32 opaque,
321                                 session_t * s, u8 is_fail)
322 {
323   proxy_main_t *pm = &proxy_main;
324   proxy_session_t *ps;
325   u8 thread_index = vlib_get_thread_index ();
326
327   if (is_fail)
328     {
329       clib_warning ("connection %d failed!", opaque);
330       return 0;
331     }
332
333   /*
334    * Setup proxy session handle.
335    */
336   clib_spinlock_lock_if_init (&pm->sessions_lock);
337
338   ps = pool_elt_at_index (pm->sessions, opaque);
339   ps->vpp_active_open_handle = session_handle (s);
340
341   s->tx_fifo = ps->server_rx_fifo;
342   s->rx_fifo = ps->server_tx_fifo;
343
344   /*
345    * Reset the active-open tx-fifo master indices so the active-open session
346    * will receive data, etc.
347    */
348   s->tx_fifo->master_session_index = s->session_index;
349   s->tx_fifo->master_thread_index = s->thread_index;
350
351   /*
352    * Account for the active-open session's use of the fifos
353    * so they won't disappear until the last session which uses
354    * them disappears
355    */
356   s->tx_fifo->refcnt++;
357   s->rx_fifo->refcnt++;
358
359   hash_set (pm->proxy_session_by_active_open_handle,
360             ps->vpp_active_open_handle, opaque);
361
362   clib_spinlock_unlock_if_init (&pm->sessions_lock);
363
364   /*
365    * Send event for active open tx fifo
366    */
367   ASSERT (s->thread_index == thread_index);
368   if (svm_fifo_set_event (s->tx_fifo))
369     session_send_io_evt_to_thread (s->tx_fifo, SESSION_IO_EVT_TX);
370
371   return 0;
372 }
373
374 static void
375 active_open_reset_callback (session_t * s)
376 {
377   delete_proxy_session (s, 1 /* is_active_open */ );
378 }
379
380 static int
381 active_open_create_callback (session_t * s)
382 {
383   return 0;
384 }
385
386 static void
387 active_open_disconnect_callback (session_t * s)
388 {
389   delete_proxy_session (s, 1 /* is_active_open */ );
390 }
391
392 static int
393 active_open_rx_callback (session_t * s)
394 {
395   svm_fifo_t *proxy_tx_fifo;
396
397   proxy_tx_fifo = s->rx_fifo;
398
399   /*
400    * Send event for server tx fifo
401    */
402   if (svm_fifo_set_event (proxy_tx_fifo))
403     {
404       u8 thread_index = proxy_tx_fifo->master_thread_index;
405       u32 session_index = proxy_tx_fifo->master_session_index;
406       return session_send_io_evt_to_thread_custom (&session_index,
407                                                    thread_index,
408                                                    SESSION_IO_EVT_TX);
409     }
410
411   if (svm_fifo_max_enqueue (proxy_tx_fifo) <= TCP_MSS)
412     svm_fifo_add_want_deq_ntf (proxy_tx_fifo, SVM_FIFO_WANT_DEQ_NOTIF);
413
414   return 0;
415 }
416
417 static int
418 active_open_tx_callback (session_t * ao_s)
419 {
420   proxy_main_t *pm = &proxy_main;
421   transport_connection_t *tc;
422   session_handle_t handle;
423   proxy_session_t *ps;
424   session_t *proxy_s;
425   u32 min_free;
426   uword *p;
427
428   min_free = clib_min (ao_s->tx_fifo->nitems >> 3, 128 << 10);
429   if (svm_fifo_max_enqueue (ao_s->tx_fifo) < min_free)
430     {
431       svm_fifo_add_want_deq_ntf (ao_s->tx_fifo, SVM_FIFO_WANT_DEQ_NOTIF);
432       return 0;
433     }
434
435   clib_spinlock_lock_if_init (&pm->sessions_lock);
436
437   handle = session_handle (ao_s);
438   p = hash_get (pm->proxy_session_by_active_open_handle, handle);
439   if (!p)
440     return 0;
441
442   if (pool_is_free_index (pm->sessions, p[0]))
443     return 0;
444
445   ps = pool_elt_at_index (pm->sessions, p[0]);
446   if (ps->vpp_server_handle == ~0)
447     return 0;
448
449   proxy_s = session_get_from_handle (ps->vpp_server_handle);
450
451   /* Force ack on proxy side to update rcv wnd */
452   tc = session_get_transport (proxy_s);
453   tcp_send_ack ((tcp_connection_t *) tc);
454
455   clib_spinlock_unlock_if_init (&pm->sessions_lock);
456
457   return 0;
458 }
459
460 /* *INDENT-OFF* */
461 static session_cb_vft_t active_open_clients = {
462   .session_reset_callback = active_open_reset_callback,
463   .session_connected_callback = active_open_connected_callback,
464   .session_accept_callback = active_open_create_callback,
465   .session_disconnect_callback = active_open_disconnect_callback,
466   .builtin_app_rx_callback = active_open_rx_callback,
467   .builtin_app_tx_callback = active_open_tx_callback,
468 };
469 /* *INDENT-ON* */
470
471 static int
472 proxy_server_attach ()
473 {
474   proxy_main_t *pm = &proxy_main;
475   u64 options[APP_OPTIONS_N_OPTIONS];
476   vnet_app_attach_args_t _a, *a = &_a;
477   u32 segment_size = 512 << 20;
478
479   clib_memset (a, 0, sizeof (*a));
480   clib_memset (options, 0, sizeof (options));
481
482   if (pm->private_segment_size)
483     segment_size = pm->private_segment_size;
484   a->name = format (0, "proxy-server");
485   a->api_client_index = pm->server_client_index;
486   a->session_cb_vft = &proxy_session_cb_vft;
487   a->options = options;
488   a->options[APP_OPTIONS_SEGMENT_SIZE] = segment_size;
489   a->options[APP_OPTIONS_RX_FIFO_SIZE] = pm->fifo_size;
490   a->options[APP_OPTIONS_TX_FIFO_SIZE] = pm->fifo_size;
491   a->options[APP_OPTIONS_PRIVATE_SEGMENT_COUNT] = pm->private_segment_count;
492   a->options[APP_OPTIONS_PREALLOC_FIFO_PAIRS] =
493     pm->prealloc_fifos ? pm->prealloc_fifos : 0;
494
495   a->options[APP_OPTIONS_FLAGS] = APP_OPTIONS_FLAGS_IS_BUILTIN;
496
497   if (vnet_application_attach (a))
498     {
499       clib_warning ("failed to attach server");
500       return -1;
501     }
502   pm->server_app_index = a->app_index;
503
504   vec_free (a->name);
505   return 0;
506 }
507
508 static int
509 active_open_attach (void)
510 {
511   proxy_main_t *pm = &proxy_main;
512   vnet_app_attach_args_t _a, *a = &_a;
513   u64 options[16];
514
515   clib_memset (a, 0, sizeof (*a));
516   clib_memset (options, 0, sizeof (options));
517
518   a->api_client_index = pm->active_open_client_index;
519   a->session_cb_vft = &active_open_clients;
520   a->name = format (0, "proxy-active-open");
521
522   options[APP_OPTIONS_ACCEPT_COOKIE] = 0x12345678;
523   options[APP_OPTIONS_SEGMENT_SIZE] = 512 << 20;
524   options[APP_OPTIONS_RX_FIFO_SIZE] = pm->fifo_size;
525   options[APP_OPTIONS_TX_FIFO_SIZE] = pm->fifo_size;
526   options[APP_OPTIONS_PRIVATE_SEGMENT_COUNT] = pm->private_segment_count;
527   options[APP_OPTIONS_PREALLOC_FIFO_PAIRS] =
528     pm->prealloc_fifos ? pm->prealloc_fifos : 0;
529
530   options[APP_OPTIONS_FLAGS] = APP_OPTIONS_FLAGS_IS_BUILTIN
531     | APP_OPTIONS_FLAGS_IS_PROXY;
532
533   a->options = options;
534
535   if (vnet_application_attach (a))
536     return -1;
537
538   pm->active_open_app_index = a->app_index;
539
540   vec_free (a->name);
541
542   return 0;
543 }
544
545 static int
546 proxy_server_listen ()
547 {
548   proxy_main_t *pm = &proxy_main;
549   vnet_listen_args_t _a, *a = &_a;
550   clib_memset (a, 0, sizeof (*a));
551   a->app_index = pm->server_app_index;
552   a->uri = (char *) pm->server_uri;
553   return vnet_bind_uri (a);
554 }
555
556 static int
557 proxy_server_create (vlib_main_t * vm)
558 {
559   proxy_main_t *pm = &proxy_main;
560   vlib_thread_main_t *vtm = vlib_get_thread_main ();
561   u32 num_threads;
562   int i;
563
564   num_threads = 1 /* main thread */  + vtm->n_threads;
565   vec_validate (proxy_main.server_event_queue, num_threads - 1);
566   vec_validate (proxy_main.active_open_event_queue, num_threads - 1);
567   vec_validate (pm->rx_buf, num_threads - 1);
568
569   for (i = 0; i < num_threads; i++)
570     vec_validate (pm->rx_buf[i], pm->rcv_buffer_size);
571
572   if (proxy_server_attach ())
573     {
574       clib_warning ("failed to attach server app");
575       return -1;
576     }
577   if (proxy_server_listen ())
578     {
579       clib_warning ("failed to start listening");
580       return -1;
581     }
582   if (active_open_attach ())
583     {
584       clib_warning ("failed to attach active open app");
585       return -1;
586     }
587
588   for (i = 0; i < num_threads; i++)
589     {
590       pm->active_open_event_queue[i] = session_main_get_vpp_event_queue (i);
591
592       ASSERT (pm->active_open_event_queue[i]);
593
594       pm->server_event_queue[i] = session_main_get_vpp_event_queue (i);
595     }
596
597   return 0;
598 }
599
600 static clib_error_t *
601 proxy_server_create_command_fn (vlib_main_t * vm, unformat_input_t * input,
602                                 vlib_cli_command_t * cmd)
603 {
604   proxy_main_t *pm = &proxy_main;
605   char *default_server_uri = "tcp://0.0.0.0/23";
606   char *default_client_uri = "tcp://6.0.2.2/23";
607   int rv;
608   u64 tmp;
609
610   pm->fifo_size = 64 << 10;
611   pm->rcv_buffer_size = 1024;
612   pm->prealloc_fifos = 0;
613   pm->private_segment_count = 0;
614   pm->private_segment_size = 0;
615   pm->server_uri = 0;
616   pm->client_uri = 0;
617
618   while (unformat_check_input (input) != UNFORMAT_END_OF_INPUT)
619     {
620       if (unformat (input, "fifo-size %d", &pm->fifo_size))
621         pm->fifo_size <<= 10;
622       else if (unformat (input, "rcv-buf-size %d", &pm->rcv_buffer_size))
623         ;
624       else if (unformat (input, "prealloc-fifos %d", &pm->prealloc_fifos))
625         ;
626       else if (unformat (input, "private-segment-count %d",
627                          &pm->private_segment_count))
628         ;
629       else if (unformat (input, "private-segment-size %U",
630                          unformat_memory_size, &tmp))
631         {
632           if (tmp >= 0x100000000ULL)
633             return clib_error_return
634               (0, "private segment size %lld (%llu) too large", tmp, tmp);
635           pm->private_segment_size = tmp;
636         }
637       else if (unformat (input, "server-uri %s", &pm->server_uri))
638         vec_add1 (pm->server_uri, 0);
639       else if (unformat (input, "client-uri %s", &pm->client_uri))
640         vec_add1 (pm->client_uri, 0);
641       else
642         return clib_error_return (0, "unknown input `%U'",
643                                   format_unformat_error, input);
644     }
645
646   if (!pm->server_uri)
647     {
648       clib_warning ("No server-uri provided, Using default: %s",
649                     default_server_uri);
650       pm->server_uri = format (0, "%s%c", default_server_uri, 0);
651     }
652   if (!pm->client_uri)
653     {
654       clib_warning ("No client-uri provided, Using default: %s",
655                     default_client_uri);
656       pm->client_uri = format (0, "%s%c", default_client_uri, 0);
657     }
658
659   vnet_session_enable_disable (vm, 1 /* turn on session and transport */ );
660
661   rv = proxy_server_create (vm);
662   switch (rv)
663     {
664     case 0:
665       break;
666     default:
667       return clib_error_return (0, "server_create returned %d", rv);
668     }
669
670   return 0;
671 }
672
673 /* *INDENT-OFF* */
674 VLIB_CLI_COMMAND (proxy_create_command, static) =
675 {
676   .path = "test proxy server",
677   .short_help = "test proxy server [server-uri <tcp://ip/port>]"
678       "[client-uri <tcp://ip/port>][fifo-size <nn>][rcv-buf-size <nn>]"
679       "[prealloc-fifos <nn>][private-segment-size <mem>]"
680       "[private-segment-count <nn>]",
681   .function = proxy_server_create_command_fn,
682 };
683 /* *INDENT-ON* */
684
685 clib_error_t *
686 proxy_main_init (vlib_main_t * vm)
687 {
688   proxy_main_t *pm = &proxy_main;
689   pm->server_client_index = ~0;
690   pm->active_open_client_index = ~0;
691   pm->proxy_session_by_active_open_handle = hash_create (0, sizeof (uword));
692   pm->proxy_session_by_server_handle = hash_create (0, sizeof (uword));
693
694   return 0;
695 }
696
697 VLIB_INIT_FUNCTION (proxy_main_init);
698
699 /*
700 * fd.io coding-style-patch-verification: ON
701 *
702 * Local Variables:
703 * eval: (c-set-style "gnu")
704 * End:
705 */