hsa: proxy rcv wnd update acks after full fifos
[vpp.git] / src / plugins / hs_apps / proxy.c
1 /*
2 * Copyright (c) 2017-2019 Cisco and/or its affiliates.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at:
6 *
7 *     http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include <vnet/vnet.h>
17 #include <vlibmemory/api.h>
18 #include <vnet/session/application.h>
19 #include <vnet/session/application_interface.h>
20 #include <hs_apps/proxy.h>
21 #include <vnet/tcp/tcp.h>
22
23 proxy_main_t proxy_main;
24
25 #define TCP_MSS 1460
26
27 typedef struct
28 {
29   char uri[128];
30   u32 app_index;
31   u32 api_context;
32 } proxy_connect_args_t;
33
34 static void
35 proxy_cb_fn (void *data, u32 data_len)
36 {
37   proxy_connect_args_t *pa = (proxy_connect_args_t *) data;
38   vnet_connect_args_t a;
39
40   memset (&a, 0, sizeof (a));
41   a.api_context = pa->api_context;
42   a.app_index = pa->app_index;
43   a.uri = pa->uri;
44   vnet_connect_uri (&a);
45 }
46
47 static void
48 proxy_call_main_thread (vnet_connect_args_t * a)
49 {
50   if (vlib_get_thread_index () == 0)
51     {
52       vnet_connect_uri (a);
53     }
54   else
55     {
56       proxy_connect_args_t args;
57       args.api_context = a->api_context;
58       args.app_index = a->app_index;
59       clib_memcpy (args.uri, a->uri, vec_len (a->uri));
60       vl_api_rpc_call_main_thread (proxy_cb_fn, (u8 *) & args, sizeof (args));
61     }
62 }
63
64 static void
65 delete_proxy_session (session_t * s, int is_active_open)
66 {
67   proxy_main_t *pm = &proxy_main;
68   proxy_session_t *ps = 0;
69   vnet_disconnect_args_t _a, *a = &_a;
70   session_t *active_open_session = 0;
71   session_t *server_session = 0;
72   uword *p;
73   u64 handle;
74
75   clib_spinlock_lock_if_init (&pm->sessions_lock);
76
77   handle = session_handle (s);
78
79   if (is_active_open)
80     {
81       p = hash_get (pm->proxy_session_by_active_open_handle, handle);
82       if (p == 0)
83         {
84           clib_warning ("proxy session for %s handle %lld (%llx) AWOL",
85                         is_active_open ? "active open" : "server",
86                         handle, handle);
87         }
88       else if (!pool_is_free_index (pm->sessions, p[0]))
89         {
90           active_open_session = s;
91           ps = pool_elt_at_index (pm->sessions, p[0]);
92           if (ps->vpp_server_handle != ~0)
93             server_session = session_get_from_handle (ps->vpp_server_handle);
94         }
95     }
96   else
97     {
98       p = hash_get (pm->proxy_session_by_server_handle, handle);
99       if (p == 0)
100         {
101           clib_warning ("proxy session for %s handle %lld (%llx) AWOL",
102                         is_active_open ? "active open" : "server",
103                         handle, handle);
104         }
105       else if (!pool_is_free_index (pm->sessions, p[0]))
106         {
107           server_session = s;
108           ps = pool_elt_at_index (pm->sessions, p[0]);
109           if (ps->vpp_active_open_handle != ~0)
110             active_open_session = session_get_from_handle
111               (ps->vpp_active_open_handle);
112         }
113     }
114
115   if (ps)
116     {
117       if (CLIB_DEBUG > 0)
118         clib_memset (ps, 0xFE, sizeof (*ps));
119       pool_put (pm->sessions, ps);
120     }
121
122   if (active_open_session)
123     {
124       a->handle = session_handle (active_open_session);
125       a->app_index = pm->active_open_app_index;
126       hash_unset (pm->proxy_session_by_active_open_handle,
127                   session_handle (active_open_session));
128       vnet_disconnect_session (a);
129     }
130
131   if (server_session)
132     {
133       a->handle = session_handle (server_session);
134       a->app_index = pm->server_app_index;
135       hash_unset (pm->proxy_session_by_server_handle,
136                   session_handle (server_session));
137       vnet_disconnect_session (a);
138     }
139
140   clib_spinlock_unlock_if_init (&pm->sessions_lock);
141 }
142
143 static int
144 proxy_accept_callback (session_t * s)
145 {
146   proxy_main_t *pm = &proxy_main;
147
148   s->session_state = SESSION_STATE_READY;
149
150   clib_spinlock_lock_if_init (&pm->sessions_lock);
151
152   return 0;
153 }
154
155 static void
156 proxy_disconnect_callback (session_t * s)
157 {
158   delete_proxy_session (s, 0 /* is_active_open */ );
159 }
160
161 static void
162 proxy_reset_callback (session_t * s)
163 {
164   clib_warning ("Reset session %U", format_session, s, 2);
165   delete_proxy_session (s, 0 /* is_active_open */ );
166 }
167
168 static int
169 proxy_connected_callback (u32 app_index, u32 api_context,
170                           session_t * s, u8 is_fail)
171 {
172   clib_warning ("called...");
173   return -1;
174 }
175
176 static int
177 proxy_add_segment_callback (u32 client_index, u64 segment_handle)
178 {
179   clib_warning ("called...");
180   return -1;
181 }
182
183 static int
184 proxy_rx_callback (session_t * s)
185 {
186   u32 max_dequeue;
187   int actual_transfer __attribute__ ((unused));
188   svm_fifo_t *tx_fifo, *rx_fifo;
189   proxy_main_t *pm = &proxy_main;
190   u32 thread_index = vlib_get_thread_index ();
191   vnet_connect_args_t _a, *a = &_a;
192   proxy_session_t *ps;
193   int proxy_index;
194   uword *p;
195   svm_fifo_t *ao_tx_fifo;
196
197   ASSERT (s->thread_index == thread_index);
198
199   clib_spinlock_lock_if_init (&pm->sessions_lock);
200   p = hash_get (pm->proxy_session_by_server_handle, session_handle (s));
201
202   if (PREDICT_TRUE (p != 0))
203     {
204       clib_spinlock_unlock_if_init (&pm->sessions_lock);
205       ao_tx_fifo = s->rx_fifo;
206
207       /*
208        * Send event for active open tx fifo
209        */
210       if (svm_fifo_set_event (ao_tx_fifo))
211         {
212           u32 ao_thread_index = ao_tx_fifo->master_thread_index;
213           u32 ao_session_index = ao_tx_fifo->master_session_index;
214           if (session_send_io_evt_to_thread_custom (&ao_session_index,
215                                                     ao_thread_index,
216                                                     SESSION_IO_EVT_TX))
217             clib_warning ("failed to enqueue tx evt");
218         }
219
220       if (svm_fifo_max_enqueue (ao_tx_fifo) <= TCP_MSS)
221         svm_fifo_add_want_deq_ntf (ao_tx_fifo, SVM_FIFO_WANT_DEQ_NOTIF);
222     }
223   else
224     {
225       rx_fifo = s->rx_fifo;
226       tx_fifo = s->tx_fifo;
227
228       ASSERT (rx_fifo->master_thread_index == thread_index);
229       ASSERT (tx_fifo->master_thread_index == thread_index);
230
231       max_dequeue = svm_fifo_max_dequeue_cons (s->rx_fifo);
232
233       if (PREDICT_FALSE (max_dequeue == 0))
234         return 0;
235
236       max_dequeue = clib_min (pm->rcv_buffer_size, max_dequeue);
237       actual_transfer = svm_fifo_peek (rx_fifo, 0 /* relative_offset */ ,
238                                        max_dequeue, pm->rx_buf[thread_index]);
239
240       /* $$$ your message in this space: parse url, etc. */
241
242       clib_memset (a, 0, sizeof (*a));
243
244       pool_get (pm->sessions, ps);
245       clib_memset (ps, 0, sizeof (*ps));
246       ps->server_rx_fifo = rx_fifo;
247       ps->server_tx_fifo = tx_fifo;
248       ps->vpp_server_handle = session_handle (s);
249
250       proxy_index = ps - pm->sessions;
251
252       hash_set (pm->proxy_session_by_server_handle, ps->vpp_server_handle,
253                 proxy_index);
254
255       clib_spinlock_unlock_if_init (&pm->sessions_lock);
256
257       a->uri = (char *) pm->client_uri;
258       a->api_context = proxy_index;
259       a->app_index = pm->active_open_app_index;
260       proxy_call_main_thread (a);
261     }
262
263   return 0;
264 }
265
266 static int
267 proxy_tx_callback (session_t * proxy_s)
268 {
269   proxy_main_t *pm = &proxy_main;
270   transport_connection_t *tc;
271   session_handle_t handle;
272   proxy_session_t *ps;
273   session_t *ao_s;
274   uword *p;
275
276   clib_spinlock_lock_if_init (&pm->sessions_lock);
277
278   handle = session_handle (proxy_s);
279   p = hash_get (pm->proxy_session_by_server_handle, handle);
280   if (!p)
281     return 0;
282
283   if (pool_is_free_index (pm->sessions, p[0]))
284     return 0;
285
286   ps = pool_elt_at_index (pm->sessions, p[0]);
287   if (ps->vpp_active_open_handle == ~0)
288     return 0;
289
290   ao_s = session_get_from_handle (ps->vpp_active_open_handle);
291
292   /* Force ack on active open side to update rcv wnd */
293   tc = session_get_transport (ao_s);
294   tcp_send_ack ((tcp_connection_t *) tc);
295
296   clib_spinlock_unlock_if_init (&pm->sessions_lock);
297
298   return 0;
299 }
300
301 static session_cb_vft_t proxy_session_cb_vft = {
302   .session_accept_callback = proxy_accept_callback,
303   .session_disconnect_callback = proxy_disconnect_callback,
304   .session_connected_callback = proxy_connected_callback,
305   .add_segment_callback = proxy_add_segment_callback,
306   .builtin_app_rx_callback = proxy_rx_callback,
307   .builtin_app_tx_callback = proxy_tx_callback,
308   .session_reset_callback = proxy_reset_callback
309 };
310
311 static int
312 active_open_connected_callback (u32 app_index, u32 opaque,
313                                 session_t * s, u8 is_fail)
314 {
315   proxy_main_t *pm = &proxy_main;
316   proxy_session_t *ps;
317   u8 thread_index = vlib_get_thread_index ();
318
319   if (is_fail)
320     {
321       clib_warning ("connection %d failed!", opaque);
322       return 0;
323     }
324
325   /*
326    * Setup proxy session handle.
327    */
328   clib_spinlock_lock_if_init (&pm->sessions_lock);
329
330   ps = pool_elt_at_index (pm->sessions, opaque);
331   ps->vpp_active_open_handle = session_handle (s);
332
333   s->tx_fifo = ps->server_rx_fifo;
334   s->rx_fifo = ps->server_tx_fifo;
335
336   /*
337    * Reset the active-open tx-fifo master indices so the active-open session
338    * will receive data, etc.
339    */
340   s->tx_fifo->master_session_index = s->session_index;
341   s->tx_fifo->master_thread_index = s->thread_index;
342
343   /*
344    * Account for the active-open session's use of the fifos
345    * so they won't disappear until the last session which uses
346    * them disappears
347    */
348   s->tx_fifo->refcnt++;
349   s->rx_fifo->refcnt++;
350
351   hash_set (pm->proxy_session_by_active_open_handle,
352             ps->vpp_active_open_handle, opaque);
353
354   clib_spinlock_unlock_if_init (&pm->sessions_lock);
355
356   /*
357    * Send event for active open tx fifo
358    */
359   ASSERT (s->thread_index == thread_index);
360   if (svm_fifo_set_event (s->tx_fifo))
361     session_send_io_evt_to_thread (s->tx_fifo, SESSION_IO_EVT_TX);
362
363   return 0;
364 }
365
366 static void
367 active_open_reset_callback (session_t * s)
368 {
369   delete_proxy_session (s, 1 /* is_active_open */ );
370 }
371
372 static int
373 active_open_create_callback (session_t * s)
374 {
375   return 0;
376 }
377
378 static void
379 active_open_disconnect_callback (session_t * s)
380 {
381   delete_proxy_session (s, 1 /* is_active_open */ );
382 }
383
384 static int
385 active_open_rx_callback (session_t * s)
386 {
387   svm_fifo_t *proxy_tx_fifo;
388
389   proxy_tx_fifo = s->rx_fifo;
390
391   /*
392    * Send event for server tx fifo
393    */
394   if (svm_fifo_set_event (proxy_tx_fifo))
395     {
396       u8 thread_index = proxy_tx_fifo->master_thread_index;
397       u32 session_index = proxy_tx_fifo->master_session_index;
398       return session_send_io_evt_to_thread_custom (&session_index,
399                                                    thread_index,
400                                                    SESSION_IO_EVT_TX);
401     }
402
403   if (svm_fifo_max_enqueue (proxy_tx_fifo) <= TCP_MSS)
404     svm_fifo_add_want_deq_ntf (proxy_tx_fifo, SVM_FIFO_WANT_DEQ_NOTIF);
405
406   return 0;
407 }
408
409 static int
410 active_open_tx_callback (session_t * ao_s)
411 {
412   proxy_main_t *pm = &proxy_main;
413   transport_connection_t *tc;
414   session_handle_t handle;
415   proxy_session_t *ps;
416   session_t *proxy_s;
417   uword *p;
418
419   clib_spinlock_lock_if_init (&pm->sessions_lock);
420
421   handle = session_handle (ao_s);
422   p = hash_get (pm->proxy_session_by_active_open_handle, handle);
423   if (!p)
424     return 0;
425
426   if (pool_is_free_index (pm->sessions, p[0]))
427     return 0;
428
429   ps = pool_elt_at_index (pm->sessions, p[0]);
430   if (ps->vpp_server_handle == ~0)
431     return 0;
432
433   proxy_s = session_get_from_handle (ps->vpp_server_handle);
434
435   /* Force ack on proxy side to update rcv wnd */
436   tc = session_get_transport (proxy_s);
437   tcp_send_ack ((tcp_connection_t *) tc);
438
439   clib_spinlock_unlock_if_init (&pm->sessions_lock);
440
441   return 0;
442 }
443
444 /* *INDENT-OFF* */
445 static session_cb_vft_t active_open_clients = {
446   .session_reset_callback = active_open_reset_callback,
447   .session_connected_callback = active_open_connected_callback,
448   .session_accept_callback = active_open_create_callback,
449   .session_disconnect_callback = active_open_disconnect_callback,
450   .builtin_app_rx_callback = active_open_rx_callback,
451   .builtin_app_tx_callback = active_open_tx_callback,
452 };
453 /* *INDENT-ON* */
454
455 static int
456 proxy_server_attach ()
457 {
458   proxy_main_t *pm = &proxy_main;
459   u64 options[APP_OPTIONS_N_OPTIONS];
460   vnet_app_attach_args_t _a, *a = &_a;
461   u32 segment_size = 512 << 20;
462
463   clib_memset (a, 0, sizeof (*a));
464   clib_memset (options, 0, sizeof (options));
465
466   if (pm->private_segment_size)
467     segment_size = pm->private_segment_size;
468   a->name = format (0, "proxy-server");
469   a->api_client_index = pm->server_client_index;
470   a->session_cb_vft = &proxy_session_cb_vft;
471   a->options = options;
472   a->options[APP_OPTIONS_SEGMENT_SIZE] = segment_size;
473   a->options[APP_OPTIONS_RX_FIFO_SIZE] = pm->fifo_size;
474   a->options[APP_OPTIONS_TX_FIFO_SIZE] = pm->fifo_size;
475   a->options[APP_OPTIONS_PRIVATE_SEGMENT_COUNT] = pm->private_segment_count;
476   a->options[APP_OPTIONS_PREALLOC_FIFO_PAIRS] =
477     pm->prealloc_fifos ? pm->prealloc_fifos : 0;
478
479   a->options[APP_OPTIONS_FLAGS] = APP_OPTIONS_FLAGS_IS_BUILTIN;
480
481   if (vnet_application_attach (a))
482     {
483       clib_warning ("failed to attach server");
484       return -1;
485     }
486   pm->server_app_index = a->app_index;
487
488   vec_free (a->name);
489   return 0;
490 }
491
492 static int
493 active_open_attach (void)
494 {
495   proxy_main_t *pm = &proxy_main;
496   vnet_app_attach_args_t _a, *a = &_a;
497   u64 options[16];
498
499   clib_memset (a, 0, sizeof (*a));
500   clib_memset (options, 0, sizeof (options));
501
502   a->api_client_index = pm->active_open_client_index;
503   a->session_cb_vft = &active_open_clients;
504   a->name = format (0, "proxy-active-open");
505
506   options[APP_OPTIONS_ACCEPT_COOKIE] = 0x12345678;
507   options[APP_OPTIONS_SEGMENT_SIZE] = 512 << 20;
508   options[APP_OPTIONS_RX_FIFO_SIZE] = pm->fifo_size;
509   options[APP_OPTIONS_TX_FIFO_SIZE] = pm->fifo_size;
510   options[APP_OPTIONS_PRIVATE_SEGMENT_COUNT] = pm->private_segment_count;
511   options[APP_OPTIONS_PREALLOC_FIFO_PAIRS] =
512     pm->prealloc_fifos ? pm->prealloc_fifos : 0;
513
514   options[APP_OPTIONS_FLAGS] = APP_OPTIONS_FLAGS_IS_BUILTIN
515     | APP_OPTIONS_FLAGS_IS_PROXY;
516
517   a->options = options;
518
519   if (vnet_application_attach (a))
520     return -1;
521
522   pm->active_open_app_index = a->app_index;
523
524   vec_free (a->name);
525
526   return 0;
527 }
528
529 static int
530 proxy_server_listen ()
531 {
532   proxy_main_t *pm = &proxy_main;
533   vnet_listen_args_t _a, *a = &_a;
534   clib_memset (a, 0, sizeof (*a));
535   a->app_index = pm->server_app_index;
536   a->uri = (char *) pm->server_uri;
537   return vnet_bind_uri (a);
538 }
539
540 static int
541 proxy_server_create (vlib_main_t * vm)
542 {
543   proxy_main_t *pm = &proxy_main;
544   vlib_thread_main_t *vtm = vlib_get_thread_main ();
545   u32 num_threads;
546   int i;
547
548   num_threads = 1 /* main thread */  + vtm->n_threads;
549   vec_validate (proxy_main.server_event_queue, num_threads - 1);
550   vec_validate (proxy_main.active_open_event_queue, num_threads - 1);
551   vec_validate (pm->rx_buf, num_threads - 1);
552
553   for (i = 0; i < num_threads; i++)
554     vec_validate (pm->rx_buf[i], pm->rcv_buffer_size);
555
556   if (proxy_server_attach ())
557     {
558       clib_warning ("failed to attach server app");
559       return -1;
560     }
561   if (proxy_server_listen ())
562     {
563       clib_warning ("failed to start listening");
564       return -1;
565     }
566   if (active_open_attach ())
567     {
568       clib_warning ("failed to attach active open app");
569       return -1;
570     }
571
572   for (i = 0; i < num_threads; i++)
573     {
574       pm->active_open_event_queue[i] = session_main_get_vpp_event_queue (i);
575
576       ASSERT (pm->active_open_event_queue[i]);
577
578       pm->server_event_queue[i] = session_main_get_vpp_event_queue (i);
579     }
580
581   return 0;
582 }
583
584 static clib_error_t *
585 proxy_server_create_command_fn (vlib_main_t * vm, unformat_input_t * input,
586                                 vlib_cli_command_t * cmd)
587 {
588   proxy_main_t *pm = &proxy_main;
589   char *default_server_uri = "tcp://0.0.0.0/23";
590   char *default_client_uri = "tcp://6.0.2.2/23";
591   int rv;
592   u64 tmp;
593
594   pm->fifo_size = 64 << 10;
595   pm->rcv_buffer_size = 1024;
596   pm->prealloc_fifos = 0;
597   pm->private_segment_count = 0;
598   pm->private_segment_size = 0;
599   pm->server_uri = 0;
600   pm->client_uri = 0;
601
602   while (unformat_check_input (input) != UNFORMAT_END_OF_INPUT)
603     {
604       if (unformat (input, "fifo-size %d", &pm->fifo_size))
605         pm->fifo_size <<= 10;
606       else if (unformat (input, "rcv-buf-size %d", &pm->rcv_buffer_size))
607         ;
608       else if (unformat (input, "prealloc-fifos %d", &pm->prealloc_fifos))
609         ;
610       else if (unformat (input, "private-segment-count %d",
611                          &pm->private_segment_count))
612         ;
613       else if (unformat (input, "private-segment-size %U",
614                          unformat_memory_size, &tmp))
615         {
616           if (tmp >= 0x100000000ULL)
617             return clib_error_return
618               (0, "private segment size %lld (%llu) too large", tmp, tmp);
619           pm->private_segment_size = tmp;
620         }
621       else if (unformat (input, "server-uri %s", &pm->server_uri))
622         vec_add1 (pm->server_uri, 0);
623       else if (unformat (input, "client-uri %s", &pm->client_uri))
624         vec_add1 (pm->client_uri, 0);
625       else
626         return clib_error_return (0, "unknown input `%U'",
627                                   format_unformat_error, input);
628     }
629
630   if (!pm->server_uri)
631     {
632       clib_warning ("No server-uri provided, Using default: %s",
633                     default_server_uri);
634       pm->server_uri = format (0, "%s%c", default_server_uri, 0);
635     }
636   if (!pm->client_uri)
637     {
638       clib_warning ("No client-uri provided, Using default: %s",
639                     default_client_uri);
640       pm->client_uri = format (0, "%s%c", default_client_uri, 0);
641     }
642
643   vnet_session_enable_disable (vm, 1 /* turn on session and transport */ );
644
645   rv = proxy_server_create (vm);
646   switch (rv)
647     {
648     case 0:
649       break;
650     default:
651       return clib_error_return (0, "server_create returned %d", rv);
652     }
653
654   return 0;
655 }
656
657 /* *INDENT-OFF* */
658 VLIB_CLI_COMMAND (proxy_create_command, static) =
659 {
660   .path = "test proxy server",
661   .short_help = "test proxy server [server-uri <tcp://ip/port>]"
662       "[client-uri <tcp://ip/port>][fifo-size <nn>][rcv-buf-size <nn>]"
663       "[prealloc-fifos <nn>][private-segment-size <mem>]"
664       "[private-segment-count <nn>]",
665   .function = proxy_server_create_command_fn,
666 };
667 /* *INDENT-ON* */
668
669 clib_error_t *
670 proxy_main_init (vlib_main_t * vm)
671 {
672   proxy_main_t *pm = &proxy_main;
673   pm->server_client_index = ~0;
674   pm->active_open_client_index = ~0;
675   pm->proxy_session_by_active_open_handle = hash_create (0, sizeof (uword));
676   pm->proxy_session_by_server_handle = hash_create (0, sizeof (uword));
677
678   return 0;
679 }
680
681 VLIB_INIT_FUNCTION (proxy_main_init);
682
683 /*
684 * fd.io coding-style-patch-verification: ON
685 *
686 * Local Variables:
687 * eval: (c-set-style "gnu")
688 * End:
689 */