d0e3bc49b4268f92eb1c0b3413436fb2e444de49
[vpp.git] / src / plugins / hs_apps / proxy.c
1 /*
2 * Copyright (c) 2017-2019 Cisco and/or its affiliates.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at:
6 *
7 *     http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include <vnet/vnet.h>
17 #include <vlibmemory/api.h>
18 #include <vnet/session/application.h>
19 #include <vnet/session/application_interface.h>
20 #include <hs_apps/proxy.h>
21 #include <vnet/tcp/tcp.h>
22
23 proxy_main_t proxy_main;
24
25 #define TCP_MSS 1460
26
27 typedef struct
28 {
29   char uri[128];
30   u32 app_index;
31   u32 api_context;
32 } proxy_connect_args_t;
33
34 static void
35 proxy_cb_fn (void *data, u32 data_len)
36 {
37   proxy_connect_args_t *pa = (proxy_connect_args_t *) data;
38   vnet_connect_args_t a;
39
40   memset (&a, 0, sizeof (a));
41   a.api_context = pa->api_context;
42   a.app_index = pa->app_index;
43   a.uri = pa->uri;
44   vnet_connect_uri (&a);
45 }
46
47 static void
48 proxy_call_main_thread (vnet_connect_args_t * a)
49 {
50   if (vlib_get_thread_index () == 0)
51     {
52       vnet_connect_uri (a);
53     }
54   else
55     {
56       proxy_connect_args_t args;
57       args.api_context = a->api_context;
58       args.app_index = a->app_index;
59       clib_memcpy (args.uri, a->uri, vec_len (a->uri));
60       vl_api_rpc_call_main_thread (proxy_cb_fn, (u8 *) & args, sizeof (args));
61     }
62 }
63
64 static void
65 delete_proxy_session (session_t * s, int is_active_open)
66 {
67   proxy_main_t *pm = &proxy_main;
68   proxy_session_t *ps = 0;
69   vnet_disconnect_args_t _a, *a = &_a;
70   session_t *active_open_session = 0;
71   session_t *server_session = 0;
72   uword *p;
73   u64 handle;
74
75   clib_spinlock_lock_if_init (&pm->sessions_lock);
76
77   handle = session_handle (s);
78
79   if (is_active_open)
80     {
81       p = hash_get (pm->proxy_session_by_active_open_handle, handle);
82       if (p == 0)
83         {
84           clib_warning ("proxy session for %s handle %lld (%llx) AWOL",
85                         is_active_open ? "active open" : "server",
86                         handle, handle);
87         }
88       else if (!pool_is_free_index (pm->sessions, p[0]))
89         {
90           active_open_session = s;
91           ps = pool_elt_at_index (pm->sessions, p[0]);
92           if (ps->vpp_server_handle != ~0)
93             server_session = session_get_from_handle (ps->vpp_server_handle);
94         }
95     }
96   else
97     {
98       p = hash_get (pm->proxy_session_by_server_handle, handle);
99       if (p == 0)
100         {
101           clib_warning ("proxy session for %s handle %lld (%llx) AWOL",
102                         is_active_open ? "active open" : "server",
103                         handle, handle);
104         }
105       else if (!pool_is_free_index (pm->sessions, p[0]))
106         {
107           server_session = s;
108           ps = pool_elt_at_index (pm->sessions, p[0]);
109           if (ps->vpp_active_open_handle != ~0)
110             active_open_session = session_get_from_handle
111               (ps->vpp_active_open_handle);
112         }
113     }
114
115   if (ps)
116     {
117       if (CLIB_DEBUG > 0)
118         clib_memset (ps, 0xFE, sizeof (*ps));
119       pool_put (pm->sessions, ps);
120     }
121
122   if (active_open_session)
123     {
124       a->handle = session_handle (active_open_session);
125       a->app_index = pm->active_open_app_index;
126       hash_unset (pm->proxy_session_by_active_open_handle,
127                   session_handle (active_open_session));
128       vnet_disconnect_session (a);
129     }
130
131   if (server_session)
132     {
133       a->handle = session_handle (server_session);
134       a->app_index = pm->server_app_index;
135       hash_unset (pm->proxy_session_by_server_handle,
136                   session_handle (server_session));
137       vnet_disconnect_session (a);
138     }
139
140   clib_spinlock_unlock_if_init (&pm->sessions_lock);
141 }
142
143 static int
144 proxy_accept_callback (session_t * s)
145 {
146   proxy_main_t *pm = &proxy_main;
147
148   s->session_state = SESSION_STATE_READY;
149
150   clib_spinlock_lock_if_init (&pm->sessions_lock);
151
152   return 0;
153 }
154
155 static void
156 proxy_disconnect_callback (session_t * s)
157 {
158   delete_proxy_session (s, 0 /* is_active_open */ );
159 }
160
161 static void
162 proxy_reset_callback (session_t * s)
163 {
164   clib_warning ("Reset session %U", format_session, s, 2);
165   delete_proxy_session (s, 0 /* is_active_open */ );
166 }
167
168 static int
169 proxy_connected_callback (u32 app_index, u32 api_context,
170                           session_t * s, u8 is_fail)
171 {
172   clib_warning ("called...");
173   return -1;
174 }
175
176 static int
177 proxy_add_segment_callback (u32 client_index, u64 segment_handle)
178 {
179   clib_warning ("called...");
180   return -1;
181 }
182
183 static int
184 proxy_rx_callback (session_t * s)
185 {
186   u32 max_dequeue;
187   int actual_transfer __attribute__ ((unused));
188   svm_fifo_t *tx_fifo, *rx_fifo;
189   proxy_main_t *pm = &proxy_main;
190   u32 thread_index = vlib_get_thread_index ();
191   vnet_connect_args_t _a, *a = &_a;
192   proxy_session_t *ps;
193   int proxy_index;
194   uword *p;
195   svm_fifo_t *ao_tx_fifo;
196
197   ASSERT (s->thread_index == thread_index);
198
199   clib_spinlock_lock_if_init (&pm->sessions_lock);
200   p = hash_get (pm->proxy_session_by_server_handle, session_handle (s));
201
202   if (PREDICT_TRUE (p != 0))
203     {
204       clib_spinlock_unlock_if_init (&pm->sessions_lock);
205       ao_tx_fifo = s->rx_fifo;
206
207       /*
208        * Send event for active open tx fifo
209        */
210       if (svm_fifo_set_event (ao_tx_fifo))
211         {
212           u32 ao_thread_index = ao_tx_fifo->master_thread_index;
213           u32 ao_session_index = ao_tx_fifo->master_session_index;
214           if (session_send_io_evt_to_thread_custom (&ao_session_index,
215                                                     ao_thread_index,
216                                                     SESSION_IO_EVT_TX))
217             clib_warning ("failed to enqueue tx evt");
218         }
219
220       if (svm_fifo_max_enqueue (ao_tx_fifo) <= TCP_MSS)
221         svm_fifo_add_want_deq_ntf (ao_tx_fifo, SVM_FIFO_WANT_DEQ_NOTIF);
222     }
223   else
224     {
225       rx_fifo = s->rx_fifo;
226       tx_fifo = s->tx_fifo;
227
228       ASSERT (rx_fifo->master_thread_index == thread_index);
229       ASSERT (tx_fifo->master_thread_index == thread_index);
230
231       max_dequeue = svm_fifo_max_dequeue_cons (s->rx_fifo);
232
233       if (PREDICT_FALSE (max_dequeue == 0))
234         return 0;
235
236       max_dequeue = clib_min (pm->rcv_buffer_size, max_dequeue);
237       actual_transfer = svm_fifo_peek (rx_fifo, 0 /* relative_offset */ ,
238                                        max_dequeue, pm->rx_buf[thread_index]);
239
240       /* $$$ your message in this space: parse url, etc. */
241
242       clib_memset (a, 0, sizeof (*a));
243
244       pool_get (pm->sessions, ps);
245       clib_memset (ps, 0, sizeof (*ps));
246       ps->server_rx_fifo = rx_fifo;
247       ps->server_tx_fifo = tx_fifo;
248       ps->vpp_server_handle = session_handle (s);
249
250       proxy_index = ps - pm->sessions;
251
252       hash_set (pm->proxy_session_by_server_handle, ps->vpp_server_handle,
253                 proxy_index);
254
255       clib_spinlock_unlock_if_init (&pm->sessions_lock);
256
257       a->uri = (char *) pm->client_uri;
258       a->api_context = proxy_index;
259       a->app_index = pm->active_open_app_index;
260       proxy_call_main_thread (a);
261     }
262
263   return 0;
264 }
265
266 static int
267 proxy_tx_callback (session_t * proxy_s)
268 {
269   proxy_main_t *pm = &proxy_main;
270   transport_connection_t *tc;
271   session_handle_t handle;
272   proxy_session_t *ps;
273   session_t *ao_s;
274   u32 min_free;
275   uword *p;
276
277   min_free = clib_min (svm_fifo_size (proxy_s->tx_fifo) >> 3, 128 << 10);
278   if (svm_fifo_max_enqueue (proxy_s->tx_fifo) < min_free)
279     {
280       svm_fifo_add_want_deq_ntf (proxy_s->tx_fifo, SVM_FIFO_WANT_DEQ_NOTIF);
281       return 0;
282     }
283
284   clib_spinlock_lock_if_init (&pm->sessions_lock);
285
286   handle = session_handle (proxy_s);
287   p = hash_get (pm->proxy_session_by_server_handle, handle);
288   if (!p)
289     return 0;
290
291   if (pool_is_free_index (pm->sessions, p[0]))
292     return 0;
293
294   ps = pool_elt_at_index (pm->sessions, p[0]);
295   if (ps->vpp_active_open_handle == ~0)
296     return 0;
297
298   ao_s = session_get_from_handle (ps->vpp_active_open_handle);
299
300   /* Force ack on active open side to update rcv wnd */
301   tc = session_get_transport (ao_s);
302   tcp_send_ack ((tcp_connection_t *) tc);
303
304   clib_spinlock_unlock_if_init (&pm->sessions_lock);
305
306   return 0;
307 }
308
309 static session_cb_vft_t proxy_session_cb_vft = {
310   .session_accept_callback = proxy_accept_callback,
311   .session_disconnect_callback = proxy_disconnect_callback,
312   .session_connected_callback = proxy_connected_callback,
313   .add_segment_callback = proxy_add_segment_callback,
314   .builtin_app_rx_callback = proxy_rx_callback,
315   .builtin_app_tx_callback = proxy_tx_callback,
316   .session_reset_callback = proxy_reset_callback
317 };
318
319 static int
320 active_open_connected_callback (u32 app_index, u32 opaque,
321                                 session_t * s, u8 is_fail)
322 {
323   proxy_main_t *pm = &proxy_main;
324   proxy_session_t *ps;
325   u8 thread_index = vlib_get_thread_index ();
326
327   if (is_fail)
328     {
329       clib_warning ("connection %d failed!", opaque);
330       return 0;
331     }
332
333   /*
334    * Setup proxy session handle.
335    */
336   clib_spinlock_lock_if_init (&pm->sessions_lock);
337
338   ps = pool_elt_at_index (pm->sessions, opaque);
339   ps->vpp_active_open_handle = session_handle (s);
340
341   s->tx_fifo = ps->server_rx_fifo;
342   s->rx_fifo = ps->server_tx_fifo;
343
344   /*
345    * Reset the active-open tx-fifo master indices so the active-open session
346    * will receive data, etc.
347    */
348   s->tx_fifo->master_session_index = s->session_index;
349   s->tx_fifo->master_thread_index = s->thread_index;
350
351   /*
352    * Account for the active-open session's use of the fifos
353    * so they won't disappear until the last session which uses
354    * them disappears
355    */
356   s->tx_fifo->refcnt++;
357   s->rx_fifo->refcnt++;
358
359   svm_fifo_init_ooo_lookup (s->tx_fifo, 1 /* deq ooo */ );
360   svm_fifo_init_ooo_lookup (s->rx_fifo, 0 /* enq ooo */ );
361
362   hash_set (pm->proxy_session_by_active_open_handle,
363             ps->vpp_active_open_handle, opaque);
364
365   clib_spinlock_unlock_if_init (&pm->sessions_lock);
366
367   /*
368    * Send event for active open tx fifo
369    */
370   ASSERT (s->thread_index == thread_index);
371   if (svm_fifo_set_event (s->tx_fifo))
372     session_send_io_evt_to_thread (s->tx_fifo, SESSION_IO_EVT_TX);
373
374   return 0;
375 }
376
377 static void
378 active_open_reset_callback (session_t * s)
379 {
380   delete_proxy_session (s, 1 /* is_active_open */ );
381 }
382
383 static int
384 active_open_create_callback (session_t * s)
385 {
386   return 0;
387 }
388
389 static void
390 active_open_disconnect_callback (session_t * s)
391 {
392   delete_proxy_session (s, 1 /* is_active_open */ );
393 }
394
395 static int
396 active_open_rx_callback (session_t * s)
397 {
398   svm_fifo_t *proxy_tx_fifo;
399
400   proxy_tx_fifo = s->rx_fifo;
401
402   /*
403    * Send event for server tx fifo
404    */
405   if (svm_fifo_set_event (proxy_tx_fifo))
406     {
407       u8 thread_index = proxy_tx_fifo->master_thread_index;
408       u32 session_index = proxy_tx_fifo->master_session_index;
409       return session_send_io_evt_to_thread_custom (&session_index,
410                                                    thread_index,
411                                                    SESSION_IO_EVT_TX);
412     }
413
414   if (svm_fifo_max_enqueue (proxy_tx_fifo) <= TCP_MSS)
415     svm_fifo_add_want_deq_ntf (proxy_tx_fifo, SVM_FIFO_WANT_DEQ_NOTIF);
416
417   return 0;
418 }
419
420 static int
421 active_open_tx_callback (session_t * ao_s)
422 {
423   proxy_main_t *pm = &proxy_main;
424   transport_connection_t *tc;
425   session_handle_t handle;
426   proxy_session_t *ps;
427   session_t *proxy_s;
428   u32 min_free;
429   uword *p;
430
431   min_free = clib_min (svm_fifo_size (ao_s->tx_fifo) >> 3, 128 << 10);
432   if (svm_fifo_max_enqueue (ao_s->tx_fifo) < min_free)
433     {
434       svm_fifo_add_want_deq_ntf (ao_s->tx_fifo, SVM_FIFO_WANT_DEQ_NOTIF);
435       return 0;
436     }
437
438   clib_spinlock_lock_if_init (&pm->sessions_lock);
439
440   handle = session_handle (ao_s);
441   p = hash_get (pm->proxy_session_by_active_open_handle, handle);
442   if (!p)
443     return 0;
444
445   if (pool_is_free_index (pm->sessions, p[0]))
446     return 0;
447
448   ps = pool_elt_at_index (pm->sessions, p[0]);
449   if (ps->vpp_server_handle == ~0)
450     return 0;
451
452   proxy_s = session_get_from_handle (ps->vpp_server_handle);
453
454   /* Force ack on proxy side to update rcv wnd */
455   tc = session_get_transport (proxy_s);
456   tcp_send_ack ((tcp_connection_t *) tc);
457
458   clib_spinlock_unlock_if_init (&pm->sessions_lock);
459
460   return 0;
461 }
462
463 /* *INDENT-OFF* */
464 static session_cb_vft_t active_open_clients = {
465   .session_reset_callback = active_open_reset_callback,
466   .session_connected_callback = active_open_connected_callback,
467   .session_accept_callback = active_open_create_callback,
468   .session_disconnect_callback = active_open_disconnect_callback,
469   .builtin_app_rx_callback = active_open_rx_callback,
470   .builtin_app_tx_callback = active_open_tx_callback,
471 };
472 /* *INDENT-ON* */
473
474 static int
475 proxy_server_attach ()
476 {
477   proxy_main_t *pm = &proxy_main;
478   u64 options[APP_OPTIONS_N_OPTIONS];
479   vnet_app_attach_args_t _a, *a = &_a;
480   u32 segment_size = 512 << 20;
481
482   clib_memset (a, 0, sizeof (*a));
483   clib_memset (options, 0, sizeof (options));
484
485   if (pm->private_segment_size)
486     segment_size = pm->private_segment_size;
487   a->name = format (0, "proxy-server");
488   a->api_client_index = pm->server_client_index;
489   a->session_cb_vft = &proxy_session_cb_vft;
490   a->options = options;
491   a->options[APP_OPTIONS_SEGMENT_SIZE] = segment_size;
492   a->options[APP_OPTIONS_RX_FIFO_SIZE] = pm->fifo_size;
493   a->options[APP_OPTIONS_TX_FIFO_SIZE] = pm->fifo_size;
494   a->options[APP_OPTIONS_PRIVATE_SEGMENT_COUNT] = pm->private_segment_count;
495   a->options[APP_OPTIONS_PREALLOC_FIFO_PAIRS] =
496     pm->prealloc_fifos ? pm->prealloc_fifos : 0;
497
498   a->options[APP_OPTIONS_FLAGS] = APP_OPTIONS_FLAGS_IS_BUILTIN;
499
500   if (vnet_application_attach (a))
501     {
502       clib_warning ("failed to attach server");
503       return -1;
504     }
505   pm->server_app_index = a->app_index;
506
507   vec_free (a->name);
508   return 0;
509 }
510
511 static int
512 active_open_attach (void)
513 {
514   proxy_main_t *pm = &proxy_main;
515   vnet_app_attach_args_t _a, *a = &_a;
516   u64 options[16];
517
518   clib_memset (a, 0, sizeof (*a));
519   clib_memset (options, 0, sizeof (options));
520
521   a->api_client_index = pm->active_open_client_index;
522   a->session_cb_vft = &active_open_clients;
523   a->name = format (0, "proxy-active-open");
524
525   options[APP_OPTIONS_ACCEPT_COOKIE] = 0x12345678;
526   options[APP_OPTIONS_SEGMENT_SIZE] = 512 << 20;
527   options[APP_OPTIONS_RX_FIFO_SIZE] = pm->fifo_size;
528   options[APP_OPTIONS_TX_FIFO_SIZE] = pm->fifo_size;
529   options[APP_OPTIONS_PRIVATE_SEGMENT_COUNT] = pm->private_segment_count;
530   options[APP_OPTIONS_PREALLOC_FIFO_PAIRS] =
531     pm->prealloc_fifos ? pm->prealloc_fifos : 0;
532
533   options[APP_OPTIONS_FLAGS] = APP_OPTIONS_FLAGS_IS_BUILTIN
534     | APP_OPTIONS_FLAGS_IS_PROXY;
535
536   a->options = options;
537
538   if (vnet_application_attach (a))
539     return -1;
540
541   pm->active_open_app_index = a->app_index;
542
543   vec_free (a->name);
544
545   return 0;
546 }
547
548 static int
549 proxy_server_listen ()
550 {
551   proxy_main_t *pm = &proxy_main;
552   vnet_listen_args_t _a, *a = &_a;
553   clib_memset (a, 0, sizeof (*a));
554   a->app_index = pm->server_app_index;
555   a->uri = (char *) pm->server_uri;
556   return vnet_bind_uri (a);
557 }
558
559 static int
560 proxy_server_create (vlib_main_t * vm)
561 {
562   proxy_main_t *pm = &proxy_main;
563   vlib_thread_main_t *vtm = vlib_get_thread_main ();
564   u32 num_threads;
565   int i;
566
567   num_threads = 1 /* main thread */  + vtm->n_threads;
568   vec_validate (proxy_main.server_event_queue, num_threads - 1);
569   vec_validate (proxy_main.active_open_event_queue, num_threads - 1);
570   vec_validate (pm->rx_buf, num_threads - 1);
571
572   for (i = 0; i < num_threads; i++)
573     vec_validate (pm->rx_buf[i], pm->rcv_buffer_size);
574
575   if (proxy_server_attach ())
576     {
577       clib_warning ("failed to attach server app");
578       return -1;
579     }
580   if (proxy_server_listen ())
581     {
582       clib_warning ("failed to start listening");
583       return -1;
584     }
585   if (active_open_attach ())
586     {
587       clib_warning ("failed to attach active open app");
588       return -1;
589     }
590
591   for (i = 0; i < num_threads; i++)
592     {
593       pm->active_open_event_queue[i] = session_main_get_vpp_event_queue (i);
594
595       ASSERT (pm->active_open_event_queue[i]);
596
597       pm->server_event_queue[i] = session_main_get_vpp_event_queue (i);
598     }
599
600   return 0;
601 }
602
603 static clib_error_t *
604 proxy_server_create_command_fn (vlib_main_t * vm, unformat_input_t * input,
605                                 vlib_cli_command_t * cmd)
606 {
607   proxy_main_t *pm = &proxy_main;
608   char *default_server_uri = "tcp://0.0.0.0/23";
609   char *default_client_uri = "tcp://6.0.2.2/23";
610   int rv;
611   u64 tmp;
612
613   pm->fifo_size = 64 << 10;
614   pm->rcv_buffer_size = 1024;
615   pm->prealloc_fifos = 0;
616   pm->private_segment_count = 0;
617   pm->private_segment_size = 0;
618   pm->server_uri = 0;
619   pm->client_uri = 0;
620
621   while (unformat_check_input (input) != UNFORMAT_END_OF_INPUT)
622     {
623       if (unformat (input, "fifo-size %d", &pm->fifo_size))
624         pm->fifo_size <<= 10;
625       else if (unformat (input, "rcv-buf-size %d", &pm->rcv_buffer_size))
626         ;
627       else if (unformat (input, "prealloc-fifos %d", &pm->prealloc_fifos))
628         ;
629       else if (unformat (input, "private-segment-count %d",
630                          &pm->private_segment_count))
631         ;
632       else if (unformat (input, "private-segment-size %U",
633                          unformat_memory_size, &tmp))
634         {
635           if (tmp >= 0x100000000ULL)
636             return clib_error_return
637               (0, "private segment size %lld (%llu) too large", tmp, tmp);
638           pm->private_segment_size = tmp;
639         }
640       else if (unformat (input, "server-uri %s", &pm->server_uri))
641         vec_add1 (pm->server_uri, 0);
642       else if (unformat (input, "client-uri %s", &pm->client_uri))
643         vec_add1 (pm->client_uri, 0);
644       else
645         return clib_error_return (0, "unknown input `%U'",
646                                   format_unformat_error, input);
647     }
648
649   if (!pm->server_uri)
650     {
651       clib_warning ("No server-uri provided, Using default: %s",
652                     default_server_uri);
653       pm->server_uri = format (0, "%s%c", default_server_uri, 0);
654     }
655   if (!pm->client_uri)
656     {
657       clib_warning ("No client-uri provided, Using default: %s",
658                     default_client_uri);
659       pm->client_uri = format (0, "%s%c", default_client_uri, 0);
660     }
661
662   vnet_session_enable_disable (vm, 1 /* turn on session and transport */ );
663
664   rv = proxy_server_create (vm);
665   switch (rv)
666     {
667     case 0:
668       break;
669     default:
670       return clib_error_return (0, "server_create returned %d", rv);
671     }
672
673   return 0;
674 }
675
676 /* *INDENT-OFF* */
677 VLIB_CLI_COMMAND (proxy_create_command, static) =
678 {
679   .path = "test proxy server",
680   .short_help = "test proxy server [server-uri <tcp://ip/port>]"
681       "[client-uri <tcp://ip/port>][fifo-size <nn>][rcv-buf-size <nn>]"
682       "[prealloc-fifos <nn>][private-segment-size <mem>]"
683       "[private-segment-count <nn>]",
684   .function = proxy_server_create_command_fn,
685 };
686 /* *INDENT-ON* */
687
688 clib_error_t *
689 proxy_main_init (vlib_main_t * vm)
690 {
691   proxy_main_t *pm = &proxy_main;
692   pm->server_client_index = ~0;
693   pm->active_open_client_index = ~0;
694   pm->proxy_session_by_active_open_handle = hash_create (0, sizeof (uword));
695   pm->proxy_session_by_server_handle = hash_create (0, sizeof (uword));
696
697   return 0;
698 }
699
700 VLIB_INIT_FUNCTION (proxy_main_init);
701
702 /*
703 * fd.io coding-style-patch-verification: ON
704 *
705 * Local Variables:
706 * eval: (c-set-style "gnu")
707 * End:
708 */