hsa: proxy app fixes
[vpp.git] / src / plugins / hs_apps / proxy.c
1 /*
2 * Copyright (c) 2017-2019 Cisco and/or its affiliates.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at:
6 *
7 *     http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include <vnet/vnet.h>
17 #include <vlibmemory/api.h>
18 #include <vnet/session/application.h>
19 #include <vnet/session/application_interface.h>
20 #include <hs_apps/proxy.h>
21
22 proxy_main_t proxy_main;
23
24 typedef struct
25 {
26   char uri[128];
27   u32 app_index;
28   u32 api_context;
29 } proxy_connect_args_t;
30
31 static void
32 proxy_cb_fn (void *data, u32 data_len)
33 {
34   proxy_connect_args_t *pa = (proxy_connect_args_t *) data;
35   vnet_connect_args_t a;
36
37   memset (&a, 0, sizeof (a));
38   a.api_context = pa->api_context;
39   a.app_index = pa->app_index;
40   a.uri = pa->uri;
41   vnet_connect_uri (&a);
42 }
43
44 static void
45 proxy_call_main_thread (vnet_connect_args_t * a)
46 {
47   if (vlib_get_thread_index () == 0)
48     {
49       vnet_connect_uri (a);
50     }
51   else
52     {
53       proxy_connect_args_t args;
54       args.api_context = a->api_context;
55       args.app_index = a->app_index;
56       clib_memcpy (args.uri, a->uri, vec_len (a->uri));
57       vl_api_rpc_call_main_thread (proxy_cb_fn, (u8 *) & args, sizeof (args));
58     }
59 }
60
61 static void
62 delete_proxy_session (session_t * s, int is_active_open)
63 {
64   proxy_main_t *pm = &proxy_main;
65   proxy_session_t *ps = 0;
66   vnet_disconnect_args_t _a, *a = &_a;
67   session_t *active_open_session = 0;
68   session_t *server_session = 0;
69   uword *p;
70   u64 handle;
71
72   clib_spinlock_lock_if_init (&pm->sessions_lock);
73
74   handle = session_handle (s);
75
76   if (is_active_open)
77     {
78       p = hash_get (pm->proxy_session_by_active_open_handle, handle);
79       if (p == 0)
80         {
81           clib_warning ("proxy session for %s handle %lld (%llx) AWOL",
82                         is_active_open ? "active open" : "server",
83                         handle, handle);
84         }
85       else if (!pool_is_free_index (pm->sessions, p[0]))
86         {
87           active_open_session = s;
88           ps = pool_elt_at_index (pm->sessions, p[0]);
89           if (ps->vpp_server_handle != ~0)
90             server_session = session_get_from_handle (ps->vpp_server_handle);
91         }
92     }
93   else
94     {
95       p = hash_get (pm->proxy_session_by_server_handle, handle);
96       if (p == 0)
97         {
98           clib_warning ("proxy session for %s handle %lld (%llx) AWOL",
99                         is_active_open ? "active open" : "server",
100                         handle, handle);
101         }
102       else if (!pool_is_free_index (pm->sessions, p[0]))
103         {
104           server_session = s;
105           ps = pool_elt_at_index (pm->sessions, p[0]);
106           if (ps->vpp_active_open_handle != ~0)
107             active_open_session = session_get_from_handle
108               (ps->vpp_active_open_handle);
109         }
110     }
111
112   if (ps)
113     {
114       if (CLIB_DEBUG > 0)
115         clib_memset (ps, 0xFE, sizeof (*ps));
116       pool_put (pm->sessions, ps);
117     }
118
119   if (active_open_session)
120     {
121       a->handle = session_handle (active_open_session);
122       a->app_index = pm->active_open_app_index;
123       hash_unset (pm->proxy_session_by_active_open_handle,
124                   session_handle (active_open_session));
125       vnet_disconnect_session (a);
126     }
127
128   if (server_session)
129     {
130       a->handle = session_handle (server_session);
131       a->app_index = pm->server_app_index;
132       hash_unset (pm->proxy_session_by_server_handle,
133                   session_handle (server_session));
134       vnet_disconnect_session (a);
135     }
136
137   clib_spinlock_unlock_if_init (&pm->sessions_lock);
138 }
139
140 static int
141 proxy_accept_callback (session_t * s)
142 {
143   proxy_main_t *pm = &proxy_main;
144
145   s->session_state = SESSION_STATE_READY;
146
147   clib_spinlock_lock_if_init (&pm->sessions_lock);
148
149   return 0;
150 }
151
152 static void
153 proxy_disconnect_callback (session_t * s)
154 {
155   delete_proxy_session (s, 0 /* is_active_open */ );
156 }
157
158 static void
159 proxy_reset_callback (session_t * s)
160 {
161   clib_warning ("Reset session %U", format_session, s, 2);
162   delete_proxy_session (s, 0 /* is_active_open */ );
163 }
164
165 static int
166 proxy_connected_callback (u32 app_index, u32 api_context,
167                           session_t * s, u8 is_fail)
168 {
169   clib_warning ("called...");
170   return -1;
171 }
172
173 static int
174 proxy_add_segment_callback (u32 client_index, u64 segment_handle)
175 {
176   clib_warning ("called...");
177   return -1;
178 }
179
180 static int
181 proxy_rx_callback (session_t * s)
182 {
183   u32 max_dequeue;
184   int actual_transfer __attribute__ ((unused));
185   svm_fifo_t *tx_fifo, *rx_fifo;
186   proxy_main_t *pm = &proxy_main;
187   u32 thread_index = vlib_get_thread_index ();
188   vnet_connect_args_t _a, *a = &_a;
189   proxy_session_t *ps;
190   int proxy_index;
191   uword *p;
192   svm_fifo_t *active_open_tx_fifo;
193
194   ASSERT (s->thread_index == thread_index);
195
196   clib_spinlock_lock_if_init (&pm->sessions_lock);
197   p = hash_get (pm->proxy_session_by_server_handle, session_handle (s));
198
199   if (PREDICT_TRUE (p != 0))
200     {
201       clib_spinlock_unlock_if_init (&pm->sessions_lock);
202       active_open_tx_fifo = s->rx_fifo;
203
204       /*
205        * Send event for active open tx fifo
206        */
207       if (svm_fifo_set_event (active_open_tx_fifo))
208         {
209           u32 ao_thread_index = active_open_tx_fifo->master_thread_index;
210           u32 ao_session_index = active_open_tx_fifo->master_session_index;
211           if (session_send_io_evt_to_thread_custom (&ao_session_index,
212                                                     ao_thread_index,
213                                                     SESSION_IO_EVT_TX))
214             clib_warning ("failed to enqueue tx evt");
215         }
216     }
217   else
218     {
219       rx_fifo = s->rx_fifo;
220       tx_fifo = s->tx_fifo;
221
222       ASSERT (rx_fifo->master_thread_index == thread_index);
223       ASSERT (tx_fifo->master_thread_index == thread_index);
224
225       max_dequeue = svm_fifo_max_dequeue_cons (s->rx_fifo);
226
227       if (PREDICT_FALSE (max_dequeue == 0))
228         return 0;
229
230       max_dequeue = clib_min (pm->rcv_buffer_size, max_dequeue);
231       actual_transfer = svm_fifo_peek (rx_fifo, 0 /* relative_offset */ ,
232                                        max_dequeue, pm->rx_buf[thread_index]);
233
234       /* $$$ your message in this space: parse url, etc. */
235
236       clib_memset (a, 0, sizeof (*a));
237
238       pool_get (pm->sessions, ps);
239       clib_memset (ps, 0, sizeof (*ps));
240       ps->server_rx_fifo = rx_fifo;
241       ps->server_tx_fifo = tx_fifo;
242       ps->vpp_server_handle = session_handle (s);
243
244       proxy_index = ps - pm->sessions;
245
246       hash_set (pm->proxy_session_by_server_handle, ps->vpp_server_handle,
247                 proxy_index);
248
249       clib_spinlock_unlock_if_init (&pm->sessions_lock);
250
251       a->uri = (char *) pm->client_uri;
252       a->api_context = proxy_index;
253       a->app_index = pm->active_open_app_index;
254       proxy_call_main_thread (a);
255     }
256
257   return 0;
258 }
259
260 static session_cb_vft_t proxy_session_cb_vft = {
261   .session_accept_callback = proxy_accept_callback,
262   .session_disconnect_callback = proxy_disconnect_callback,
263   .session_connected_callback = proxy_connected_callback,
264   .add_segment_callback = proxy_add_segment_callback,
265   .builtin_app_rx_callback = proxy_rx_callback,
266   .session_reset_callback = proxy_reset_callback
267 };
268
269 static int
270 active_open_connected_callback (u32 app_index, u32 opaque,
271                                 session_t * s, u8 is_fail)
272 {
273   proxy_main_t *pm = &proxy_main;
274   proxy_session_t *ps;
275   u8 thread_index = vlib_get_thread_index ();
276
277   if (is_fail)
278     {
279       clib_warning ("connection %d failed!", opaque);
280       return 0;
281     }
282
283   /*
284    * Setup proxy session handle.
285    */
286   clib_spinlock_lock_if_init (&pm->sessions_lock);
287
288   ps = pool_elt_at_index (pm->sessions, opaque);
289   ps->vpp_active_open_handle = session_handle (s);
290
291   s->tx_fifo = ps->server_rx_fifo;
292   s->rx_fifo = ps->server_tx_fifo;
293
294   /*
295    * Reset the active-open tx-fifo master indices so the active-open session
296    * will receive data, etc.
297    */
298   s->tx_fifo->master_session_index = s->session_index;
299   s->tx_fifo->master_thread_index = s->thread_index;
300
301   /*
302    * Account for the active-open session's use of the fifos
303    * so they won't disappear until the last session which uses
304    * them disappears
305    */
306   s->tx_fifo->refcnt++;
307   s->rx_fifo->refcnt++;
308
309   hash_set (pm->proxy_session_by_active_open_handle,
310             ps->vpp_active_open_handle, opaque);
311
312   clib_spinlock_unlock_if_init (&pm->sessions_lock);
313
314   /*
315    * Send event for active open tx fifo
316    */
317   ASSERT (s->thread_index == thread_index);
318   if (svm_fifo_set_event (s->tx_fifo))
319     session_send_io_evt_to_thread (s->tx_fifo, SESSION_IO_EVT_TX);
320
321   return 0;
322 }
323
324 static void
325 active_open_reset_callback (session_t * s)
326 {
327   delete_proxy_session (s, 1 /* is_active_open */ );
328 }
329
330 static int
331 active_open_create_callback (session_t * s)
332 {
333   return 0;
334 }
335
336 static void
337 active_open_disconnect_callback (session_t * s)
338 {
339   delete_proxy_session (s, 1 /* is_active_open */ );
340 }
341
342 static int
343 active_open_rx_callback (session_t * s)
344 {
345   svm_fifo_t *proxy_tx_fifo;
346
347   proxy_tx_fifo = s->rx_fifo;
348
349   /*
350    * Send event for server tx fifo
351    */
352   if (svm_fifo_set_event (proxy_tx_fifo))
353     {
354       u8 thread_index = proxy_tx_fifo->master_thread_index;
355       u32 session_index = proxy_tx_fifo->master_session_index;
356       return session_send_io_evt_to_thread_custom (&session_index,
357                                                    thread_index,
358                                                    SESSION_IO_EVT_TX);
359     }
360
361   return 0;
362 }
363
364 /* *INDENT-OFF* */
365 static session_cb_vft_t active_open_clients = {
366   .session_reset_callback = active_open_reset_callback,
367   .session_connected_callback = active_open_connected_callback,
368   .session_accept_callback = active_open_create_callback,
369   .session_disconnect_callback = active_open_disconnect_callback,
370   .builtin_app_rx_callback = active_open_rx_callback
371 };
372 /* *INDENT-ON* */
373
374 static int
375 proxy_server_attach ()
376 {
377   proxy_main_t *pm = &proxy_main;
378   u64 options[APP_OPTIONS_N_OPTIONS];
379   vnet_app_attach_args_t _a, *a = &_a;
380   u32 segment_size = 512 << 20;
381
382   clib_memset (a, 0, sizeof (*a));
383   clib_memset (options, 0, sizeof (options));
384
385   if (pm->private_segment_size)
386     segment_size = pm->private_segment_size;
387   a->name = format (0, "proxy-server");
388   a->api_client_index = pm->server_client_index;
389   a->session_cb_vft = &proxy_session_cb_vft;
390   a->options = options;
391   a->options[APP_OPTIONS_SEGMENT_SIZE] = segment_size;
392   a->options[APP_OPTIONS_RX_FIFO_SIZE] = pm->fifo_size;
393   a->options[APP_OPTIONS_TX_FIFO_SIZE] = pm->fifo_size;
394   a->options[APP_OPTIONS_PRIVATE_SEGMENT_COUNT] = pm->private_segment_count;
395   a->options[APP_OPTIONS_PREALLOC_FIFO_PAIRS] =
396     pm->prealloc_fifos ? pm->prealloc_fifos : 0;
397
398   a->options[APP_OPTIONS_FLAGS] = APP_OPTIONS_FLAGS_IS_BUILTIN;
399
400   if (vnet_application_attach (a))
401     {
402       clib_warning ("failed to attach server");
403       return -1;
404     }
405   pm->server_app_index = a->app_index;
406
407   vec_free (a->name);
408   return 0;
409 }
410
411 static int
412 active_open_attach (void)
413 {
414   proxy_main_t *pm = &proxy_main;
415   vnet_app_attach_args_t _a, *a = &_a;
416   u64 options[16];
417
418   clib_memset (a, 0, sizeof (*a));
419   clib_memset (options, 0, sizeof (options));
420
421   a->api_client_index = pm->active_open_client_index;
422   a->session_cb_vft = &active_open_clients;
423   a->name = format (0, "proxy-active-open");
424
425   options[APP_OPTIONS_ACCEPT_COOKIE] = 0x12345678;
426   options[APP_OPTIONS_SEGMENT_SIZE] = 512 << 20;
427   options[APP_OPTIONS_RX_FIFO_SIZE] = pm->fifo_size;
428   options[APP_OPTIONS_TX_FIFO_SIZE] = pm->fifo_size;
429   options[APP_OPTIONS_PRIVATE_SEGMENT_COUNT] = pm->private_segment_count;
430   options[APP_OPTIONS_PREALLOC_FIFO_PAIRS] =
431     pm->prealloc_fifos ? pm->prealloc_fifos : 0;
432
433   options[APP_OPTIONS_FLAGS] = APP_OPTIONS_FLAGS_IS_BUILTIN
434     | APP_OPTIONS_FLAGS_IS_PROXY;
435
436   a->options = options;
437
438   if (vnet_application_attach (a))
439     return -1;
440
441   pm->active_open_app_index = a->app_index;
442
443   vec_free (a->name);
444
445   return 0;
446 }
447
448 static int
449 proxy_server_listen ()
450 {
451   proxy_main_t *pm = &proxy_main;
452   vnet_listen_args_t _a, *a = &_a;
453   clib_memset (a, 0, sizeof (*a));
454   a->app_index = pm->server_app_index;
455   a->uri = (char *) pm->server_uri;
456   return vnet_bind_uri (a);
457 }
458
459 static int
460 proxy_server_create (vlib_main_t * vm)
461 {
462   proxy_main_t *pm = &proxy_main;
463   vlib_thread_main_t *vtm = vlib_get_thread_main ();
464   u32 num_threads;
465   int i;
466
467   num_threads = 1 /* main thread */  + vtm->n_threads;
468   vec_validate (proxy_main.server_event_queue, num_threads - 1);
469   vec_validate (proxy_main.active_open_event_queue, num_threads - 1);
470   vec_validate (pm->rx_buf, num_threads - 1);
471
472   for (i = 0; i < num_threads; i++)
473     vec_validate (pm->rx_buf[i], pm->rcv_buffer_size);
474
475   if (proxy_server_attach ())
476     {
477       clib_warning ("failed to attach server app");
478       return -1;
479     }
480   if (proxy_server_listen ())
481     {
482       clib_warning ("failed to start listening");
483       return -1;
484     }
485   if (active_open_attach ())
486     {
487       clib_warning ("failed to attach active open app");
488       return -1;
489     }
490
491   for (i = 0; i < num_threads; i++)
492     {
493       pm->active_open_event_queue[i] = session_main_get_vpp_event_queue (i);
494
495       ASSERT (pm->active_open_event_queue[i]);
496
497       pm->server_event_queue[i] = session_main_get_vpp_event_queue (i);
498     }
499
500   return 0;
501 }
502
503 static clib_error_t *
504 proxy_server_create_command_fn (vlib_main_t * vm, unformat_input_t * input,
505                                 vlib_cli_command_t * cmd)
506 {
507   proxy_main_t *pm = &proxy_main;
508   char *default_server_uri = "tcp://0.0.0.0/23";
509   char *default_client_uri = "tcp://6.0.2.2/23";
510   int rv;
511   u64 tmp;
512
513   pm->fifo_size = 64 << 10;
514   pm->rcv_buffer_size = 1024;
515   pm->prealloc_fifos = 0;
516   pm->private_segment_count = 0;
517   pm->private_segment_size = 0;
518   pm->server_uri = 0;
519   pm->client_uri = 0;
520
521   while (unformat_check_input (input) != UNFORMAT_END_OF_INPUT)
522     {
523       if (unformat (input, "fifo-size %d", &pm->fifo_size))
524         pm->fifo_size <<= 10;
525       else if (unformat (input, "rcv-buf-size %d", &pm->rcv_buffer_size))
526         ;
527       else if (unformat (input, "prealloc-fifos %d", &pm->prealloc_fifos))
528         ;
529       else if (unformat (input, "private-segment-count %d",
530                          &pm->private_segment_count))
531         ;
532       else if (unformat (input, "private-segment-size %U",
533                          unformat_memory_size, &tmp))
534         {
535           if (tmp >= 0x100000000ULL)
536             return clib_error_return
537               (0, "private segment size %lld (%llu) too large", tmp, tmp);
538           pm->private_segment_size = tmp;
539         }
540       else if (unformat (input, "server-uri %s", &pm->server_uri))
541         vec_add1 (pm->server_uri, 0);
542       else if (unformat (input, "client-uri %s", &pm->client_uri))
543         vec_add1 (pm->client_uri, 0);
544       else
545         return clib_error_return (0, "unknown input `%U'",
546                                   format_unformat_error, input);
547     }
548
549   if (!pm->server_uri)
550     {
551       clib_warning ("No server-uri provided, Using default: %s",
552                     default_server_uri);
553       pm->server_uri = format (0, "%s%c", default_server_uri, 0);
554     }
555   if (!pm->client_uri)
556     {
557       clib_warning ("No client-uri provided, Using default: %s",
558                     default_client_uri);
559       pm->client_uri = format (0, "%s%c", default_client_uri, 0);
560     }
561
562   vnet_session_enable_disable (vm, 1 /* turn on session and transport */ );
563
564   rv = proxy_server_create (vm);
565   switch (rv)
566     {
567     case 0:
568       break;
569     default:
570       return clib_error_return (0, "server_create returned %d", rv);
571     }
572
573   return 0;
574 }
575
576 /* *INDENT-OFF* */
577 VLIB_CLI_COMMAND (proxy_create_command, static) =
578 {
579   .path = "test proxy server",
580   .short_help = "test proxy server [server-uri <tcp://ip/port>]"
581       "[client-uri <tcp://ip/port>][fifo-size <nn>][rcv-buf-size <nn>]"
582       "[prealloc-fifos <nn>][private-segment-size <mem>]"
583       "[private-segment-count <nn>]",
584   .function = proxy_server_create_command_fn,
585 };
586 /* *INDENT-ON* */
587
588 clib_error_t *
589 proxy_main_init (vlib_main_t * vm)
590 {
591   proxy_main_t *pm = &proxy_main;
592   pm->server_client_index = ~0;
593   pm->active_open_client_index = ~0;
594   pm->proxy_session_by_active_open_handle = hash_create (0, sizeof (uword));
595   pm->proxy_session_by_server_handle = hash_create (0, sizeof (uword));
596
597   return 0;
598 }
599
600 VLIB_INIT_FUNCTION (proxy_main_init);
601
602 /*
603 * fd.io coding-style-patch-verification: ON
604 *
605 * Local Variables:
606 * eval: (c-set-style "gnu")
607 * End:
608 */