tcp: cleanup connection/session fixes
[vpp.git] / src / vnet / session-apps / proxy.c
1 /*
2 * Copyright (c) 2015-2017 Cisco and/or its affiliates.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at:
6 *
7 *     http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include <vnet/vnet.h>
17 #include <vlibmemory/api.h>
18 #include <vnet/session/application.h>
19 #include <vnet/session/application_interface.h>
20 #include <vnet/session-apps/proxy.h>
21
22 proxy_main_t proxy_main;
23
24 typedef struct
25 {
26   char uri[128];
27   u32 app_index;
28   u32 api_context;
29 } proxy_connect_args_t;
30
31 static void
32 proxy_cb_fn (void *data, u32 data_len)
33 {
34   proxy_connect_args_t *pa = (proxy_connect_args_t *) data;
35   vnet_connect_args_t a;
36
37   a.api_context = pa->api_context;
38   a.app_index = pa->app_index;
39   a.uri = pa->uri;
40   vnet_connect_uri (&a);
41 }
42
43 static void
44 proxy_call_main_thread (vnet_connect_args_t * a)
45 {
46   if (vlib_get_thread_index () == 0)
47     {
48       vnet_connect_uri (a);
49     }
50   else
51     {
52       proxy_connect_args_t args;
53       args.api_context = a->api_context;
54       args.app_index = a->app_index;
55       clib_memcpy (args.uri, a->uri, vec_len (a->uri));
56       vl_api_rpc_call_main_thread (proxy_cb_fn, (u8 *) & args, sizeof (args));
57     }
58 }
59
60 static void
61 delete_proxy_session (stream_session_t * s, int is_active_open)
62 {
63   proxy_main_t *pm = &proxy_main;
64   proxy_session_t *ps = 0;
65   vnet_disconnect_args_t _a, *a = &_a;
66   stream_session_t *active_open_session = 0;
67   stream_session_t *server_session = 0;
68   uword *p;
69   u64 handle;
70
71   handle = session_handle (s);
72
73   clib_spinlock_lock_if_init (&pm->sessions_lock);
74   if (is_active_open)
75     {
76       active_open_session = s;
77
78       p = hash_get (pm->proxy_session_by_active_open_handle, handle);
79       if (p == 0)
80         {
81           clib_warning ("proxy session for %s handle %lld (%llx) AWOL",
82                         is_active_open ? "active open" : "server",
83                         handle, handle);
84         }
85       else if (!pool_is_free_index (pm->sessions, p[0]))
86         {
87           ps = pool_elt_at_index (pm->sessions, p[0]);
88           if (ps->vpp_server_handle != ~0)
89             server_session = session_get_from_handle (ps->vpp_server_handle);
90           else
91             server_session = 0;
92         }
93     }
94   else
95     {
96       server_session = s;
97
98       p = hash_get (pm->proxy_session_by_server_handle, handle);
99       if (p == 0)
100         {
101           clib_warning ("proxy session for %s handle %lld (%llx) AWOL",
102                         is_active_open ? "active open" : "server",
103                         handle, handle);
104         }
105       else if (!pool_is_free_index (pm->sessions, p[0]))
106         {
107           ps = pool_elt_at_index (pm->sessions, p[0]);
108           if (ps->vpp_active_open_handle != ~0)
109             active_open_session = session_get_from_handle
110               (ps->vpp_active_open_handle);
111           else
112             active_open_session = 0;
113         }
114     }
115
116   if (ps)
117     {
118       if (CLIB_DEBUG > 0)
119         memset (ps, 0xFE, sizeof (*ps));
120       pool_put (pm->sessions, ps);
121     }
122
123   clib_spinlock_unlock_if_init (&pm->sessions_lock);
124
125   if (active_open_session)
126     {
127       a->handle = session_handle (active_open_session);
128       a->app_index = pm->active_open_app_index;
129       hash_unset (pm->proxy_session_by_active_open_handle,
130                   session_handle (active_open_session));
131       vnet_disconnect_session (a);
132     }
133
134   if (server_session)
135     {
136       a->handle = session_handle (server_session);
137       a->app_index = pm->server_app_index;
138       hash_unset (pm->proxy_session_by_server_handle,
139                   session_handle (server_session));
140       vnet_disconnect_session (a);
141     }
142 }
143
144 static int
145 proxy_accept_callback (stream_session_t * s)
146 {
147   proxy_main_t *pm = &proxy_main;
148
149   s->session_state = SESSION_STATE_READY;
150
151   clib_spinlock_lock_if_init (&pm->sessions_lock);
152
153   return 0;
154 }
155
156 static void
157 proxy_disconnect_callback (stream_session_t * s)
158 {
159   delete_proxy_session (s, 0 /* is_active_open */ );
160 }
161
162 static void
163 proxy_reset_callback (stream_session_t * s)
164 {
165   clib_warning ("Reset session %U", format_stream_session, s, 2);
166   delete_proxy_session (s, 0 /* is_active_open */ );
167 }
168
169 static int
170 proxy_connected_callback (u32 app_index, u32 api_context,
171                           stream_session_t * s, u8 is_fail)
172 {
173   clib_warning ("called...");
174   return -1;
175 }
176
177 static int
178 proxy_add_segment_callback (u32 client_index, const ssvm_private_t * sp)
179 {
180   clib_warning ("called...");
181   return -1;
182 }
183
184 static int
185 proxy_rx_callback (stream_session_t * s)
186 {
187   u32 max_dequeue;
188   int actual_transfer __attribute__ ((unused));
189   svm_fifo_t *tx_fifo, *rx_fifo;
190   proxy_main_t *pm = &proxy_main;
191   u32 thread_index = vlib_get_thread_index ();
192   vnet_connect_args_t _a, *a = &_a;
193   proxy_session_t *ps;
194   int proxy_index;
195   uword *p;
196   svm_fifo_t *active_open_tx_fifo;
197   session_fifo_event_t evt;
198
199   ASSERT (s->thread_index == thread_index);
200
201   clib_spinlock_lock_if_init (&pm->sessions_lock);
202   p = hash_get (pm->proxy_session_by_server_handle, session_handle (s));
203
204   if (PREDICT_TRUE (p != 0))
205     {
206       clib_spinlock_unlock_if_init (&pm->sessions_lock);
207       active_open_tx_fifo = s->server_rx_fifo;
208
209       /*
210        * Send event for active open tx fifo
211        */
212       if (svm_fifo_set_event (active_open_tx_fifo))
213         {
214           u32 ao_thread_index = active_open_tx_fifo->master_thread_index;
215           evt.fifo = active_open_tx_fifo;
216           evt.event_type = FIFO_EVENT_APP_TX;
217           if (svm_queue_add (pm->active_open_event_queue[ao_thread_index],
218                              (u8 *) & evt, 0 /* do wait for mutex */ ))
219             clib_warning ("failed to enqueue tx evt");
220         }
221     }
222   else
223     {
224       rx_fifo = s->server_rx_fifo;
225       tx_fifo = s->server_tx_fifo;
226
227       ASSERT (rx_fifo->master_thread_index == thread_index);
228       ASSERT (tx_fifo->master_thread_index == thread_index);
229
230       max_dequeue = svm_fifo_max_dequeue (s->server_rx_fifo);
231
232       if (PREDICT_FALSE (max_dequeue == 0))
233         return 0;
234
235       actual_transfer = svm_fifo_peek (rx_fifo, 0 /* relative_offset */ ,
236                                        max_dequeue, pm->rx_buf[thread_index]);
237
238       /* $$$ your message in this space: parse url, etc. */
239
240       memset (a, 0, sizeof (*a));
241
242       clib_spinlock_lock_if_init (&pm->sessions_lock);
243       pool_get (pm->sessions, ps);
244       memset (ps, 0, sizeof (*ps));
245       ps->server_rx_fifo = rx_fifo;
246       ps->server_tx_fifo = tx_fifo;
247       ps->vpp_server_handle = session_handle (s);
248
249       proxy_index = ps - pm->sessions;
250
251       hash_set (pm->proxy_session_by_server_handle, ps->vpp_server_handle,
252                 proxy_index);
253
254       clib_spinlock_unlock_if_init (&pm->sessions_lock);
255
256       a->uri = (char *) pm->client_uri;
257       a->api_context = proxy_index;
258       a->app_index = pm->active_open_app_index;
259       proxy_call_main_thread (a);
260     }
261
262   return 0;
263 }
264
265 static session_cb_vft_t proxy_session_cb_vft = {
266   .session_accept_callback = proxy_accept_callback,
267   .session_disconnect_callback = proxy_disconnect_callback,
268   .session_connected_callback = proxy_connected_callback,
269   .add_segment_callback = proxy_add_segment_callback,
270   .builtin_app_rx_callback = proxy_rx_callback,
271   .session_reset_callback = proxy_reset_callback
272 };
273
274 static int
275 active_open_connected_callback (u32 app_index, u32 opaque,
276                                 stream_session_t * s, u8 is_fail)
277 {
278   proxy_main_t *pm = &proxy_main;
279   proxy_session_t *ps;
280   u8 thread_index = vlib_get_thread_index ();
281   session_fifo_event_t evt;
282
283   if (is_fail)
284     {
285       clib_warning ("connection %d failed!", opaque);
286       return 0;
287     }
288
289   /*
290    * Setup proxy session handle.
291    */
292   clib_spinlock_lock_if_init (&pm->sessions_lock);
293
294   ps = pool_elt_at_index (pm->sessions, opaque);
295   ps->vpp_active_open_handle = session_handle (s);
296
297   s->server_tx_fifo = ps->server_rx_fifo;
298   s->server_rx_fifo = ps->server_tx_fifo;
299
300   /*
301    * Reset the active-open tx-fifo master indices so the active-open session
302    * will receive data, etc.
303    */
304   s->server_tx_fifo->master_session_index = s->session_index;
305   s->server_tx_fifo->master_thread_index = s->thread_index;
306
307   /*
308    * Account for the active-open session's use of the fifos
309    * so they won't disappear until the last session which uses
310    * them disappears
311    */
312   s->server_tx_fifo->refcnt++;
313   s->server_rx_fifo->refcnt++;
314
315   hash_set (pm->proxy_session_by_active_open_handle,
316             ps->vpp_active_open_handle, opaque);
317
318   clib_spinlock_unlock_if_init (&pm->sessions_lock);
319
320   /*
321    * Send event for active open tx fifo
322    */
323   if (svm_fifo_set_event (s->server_tx_fifo))
324     {
325       evt.fifo = s->server_tx_fifo;
326       evt.event_type = FIFO_EVENT_APP_TX;
327       if (svm_queue_add
328           (pm->active_open_event_queue[thread_index], (u8 *) & evt,
329            0 /* do wait for mutex */ ))
330         clib_warning ("failed to enqueue tx evt");
331     }
332
333   return 0;
334 }
335
336 static void
337 active_open_reset_callback (stream_session_t * s)
338 {
339   delete_proxy_session (s, 1 /* is_active_open */ );
340 }
341
342 static int
343 active_open_create_callback (stream_session_t * s)
344 {
345   return 0;
346 }
347
348 static void
349 active_open_disconnect_callback (stream_session_t * s)
350 {
351   delete_proxy_session (s, 1 /* is_active_open */ );
352 }
353
354 static int
355 active_open_rx_callback (stream_session_t * s)
356 {
357   proxy_main_t *pm = &proxy_main;
358   session_fifo_event_t evt;
359   svm_fifo_t *proxy_tx_fifo;
360
361   proxy_tx_fifo = s->server_rx_fifo;
362
363   /*
364    * Send event for server tx fifo
365    */
366   if (svm_fifo_set_event (proxy_tx_fifo))
367     {
368       u32 p_thread_index = proxy_tx_fifo->master_thread_index;
369       evt.fifo = proxy_tx_fifo;
370       evt.event_type = FIFO_EVENT_APP_TX;
371       if (svm_queue_add (pm->server_event_queue[p_thread_index], (u8 *) & evt,
372                          0 /* do wait for mutex */ ))
373         clib_warning ("failed to enqueue server rx evt");
374     }
375
376   return 0;
377 }
378
379 /* *INDENT-OFF* */
380 static session_cb_vft_t active_open_clients = {
381   .session_reset_callback = active_open_reset_callback,
382   .session_connected_callback = active_open_connected_callback,
383   .session_accept_callback = active_open_create_callback,
384   .session_disconnect_callback = active_open_disconnect_callback,
385   .builtin_app_rx_callback = active_open_rx_callback
386 };
387 /* *INDENT-ON* */
388
389
390 static void
391 create_api_loopbacks (vlib_main_t * vm)
392 {
393   proxy_main_t *pm = &proxy_main;
394   api_main_t *am = &api_main;
395   vl_shmem_hdr_t *shmem_hdr;
396
397   shmem_hdr = am->shmem_hdr;
398   pm->vl_input_queue = shmem_hdr->vl_input_queue;
399   pm->server_client_index =
400     vl_api_memclnt_create_internal ("proxy_server", pm->vl_input_queue);
401   pm->active_open_client_index =
402     vl_api_memclnt_create_internal ("proxy_active_open", pm->vl_input_queue);
403 }
404
405 static int
406 proxy_server_attach ()
407 {
408   proxy_main_t *pm = &proxy_main;
409   u64 options[APP_OPTIONS_N_OPTIONS];
410   vnet_app_attach_args_t _a, *a = &_a;
411   u32 segment_size = 512 << 20;
412
413   memset (a, 0, sizeof (*a));
414   memset (options, 0, sizeof (options));
415
416   if (pm->private_segment_size)
417     segment_size = pm->private_segment_size;
418   a->api_client_index = pm->server_client_index;
419   a->session_cb_vft = &proxy_session_cb_vft;
420   a->options = options;
421   a->options[APP_OPTIONS_SEGMENT_SIZE] = segment_size;
422   a->options[APP_OPTIONS_RX_FIFO_SIZE] = pm->fifo_size;
423   a->options[APP_OPTIONS_TX_FIFO_SIZE] = pm->fifo_size;
424   a->options[APP_OPTIONS_PRIVATE_SEGMENT_COUNT] = pm->private_segment_count;
425   a->options[APP_OPTIONS_PREALLOC_FIFO_PAIRS] =
426     pm->prealloc_fifos ? pm->prealloc_fifos : 0;
427
428   a->options[APP_OPTIONS_FLAGS] = APP_OPTIONS_FLAGS_IS_BUILTIN;
429
430   if (vnet_application_attach (a))
431     {
432       clib_warning ("failed to attach server");
433       return -1;
434     }
435   pm->server_app_index = a->app_index;
436
437   return 0;
438 }
439
440 static int
441 active_open_attach (void)
442 {
443   proxy_main_t *pm = &proxy_main;
444   vnet_app_attach_args_t _a, *a = &_a;
445   u64 options[16];
446
447   memset (a, 0, sizeof (*a));
448   memset (options, 0, sizeof (options));
449
450   a->api_client_index = pm->active_open_client_index;
451   a->session_cb_vft = &active_open_clients;
452
453   options[APP_OPTIONS_ACCEPT_COOKIE] = 0x12345678;
454   options[APP_OPTIONS_SEGMENT_SIZE] = 512 << 20;
455   options[APP_OPTIONS_RX_FIFO_SIZE] = pm->fifo_size;
456   options[APP_OPTIONS_TX_FIFO_SIZE] = pm->fifo_size;
457   options[APP_OPTIONS_PRIVATE_SEGMENT_COUNT] = pm->private_segment_count;
458   options[APP_OPTIONS_PREALLOC_FIFO_PAIRS] =
459     pm->prealloc_fifos ? pm->prealloc_fifos : 0;
460
461   options[APP_OPTIONS_FLAGS] = APP_OPTIONS_FLAGS_IS_BUILTIN
462     | APP_OPTIONS_FLAGS_IS_PROXY;
463
464   a->options = options;
465
466   if (vnet_application_attach (a))
467     return -1;
468
469   pm->active_open_app_index = a->app_index;
470
471   return 0;
472 }
473
474 static int
475 proxy_server_listen ()
476 {
477   proxy_main_t *pm = &proxy_main;
478   vnet_bind_args_t _a, *a = &_a;
479   memset (a, 0, sizeof (*a));
480   a->app_index = pm->server_app_index;
481   a->uri = (char *) pm->server_uri;
482   return vnet_bind_uri (a);
483 }
484
485 static int
486 proxy_server_create (vlib_main_t * vm)
487 {
488   proxy_main_t *pm = &proxy_main;
489   vlib_thread_main_t *vtm = vlib_get_thread_main ();
490   u32 num_threads;
491   int i;
492
493   if (pm->server_client_index == (u32) ~ 0)
494     create_api_loopbacks (vm);
495
496   num_threads = 1 /* main thread */  + vtm->n_threads;
497   vec_validate (proxy_main.server_event_queue, num_threads - 1);
498   vec_validate (proxy_main.active_open_event_queue, num_threads - 1);
499   vec_validate (pm->rx_buf, num_threads - 1);
500
501   for (i = 0; i < num_threads; i++)
502     vec_validate (pm->rx_buf[i], pm->rcv_buffer_size);
503
504   if (proxy_server_attach ())
505     {
506       clib_warning ("failed to attach server app");
507       return -1;
508     }
509   if (proxy_server_listen ())
510     {
511       clib_warning ("failed to start listening");
512       return -1;
513     }
514   if (active_open_attach ())
515     {
516       clib_warning ("failed to attach active open app");
517       return -1;
518     }
519
520   for (i = 0; i < num_threads; i++)
521     {
522       pm->active_open_event_queue[i] =
523         session_manager_get_vpp_event_queue (i);
524
525       ASSERT (pm->active_open_event_queue[i]);
526
527       pm->server_event_queue[i] = session_manager_get_vpp_event_queue (i);
528     }
529
530   return 0;
531 }
532
533 static clib_error_t *
534 proxy_server_create_command_fn (vlib_main_t * vm, unformat_input_t * input,
535                                 vlib_cli_command_t * cmd)
536 {
537   proxy_main_t *pm = &proxy_main;
538   char *default_server_uri = "tcp://0.0.0.0/23";
539   char *default_client_uri = "tcp://6.0.2.2/23";
540   int rv;
541   u64 tmp;
542
543   pm->fifo_size = 64 << 10;
544   pm->rcv_buffer_size = 1024;
545   pm->prealloc_fifos = 0;
546   pm->private_segment_count = 0;
547   pm->private_segment_size = 0;
548   pm->server_uri = 0;
549
550   while (unformat_check_input (input) != UNFORMAT_END_OF_INPUT)
551     {
552       if (unformat (input, "fifo-size %d", &pm->fifo_size))
553         pm->fifo_size <<= 10;
554       else if (unformat (input, "rcv-buf-size %d", &pm->rcv_buffer_size))
555         ;
556       else if (unformat (input, "prealloc-fifos %d", &pm->prealloc_fifos))
557         ;
558       else if (unformat (input, "private-segment-count %d",
559                          &pm->private_segment_count))
560         ;
561       else if (unformat (input, "private-segment-size %U",
562                          unformat_memory_size, &tmp))
563         {
564           if (tmp >= 0x100000000ULL)
565             return clib_error_return
566               (0, "private segment size %lld (%llu) too large", tmp, tmp);
567           pm->private_segment_size = tmp;
568         }
569       else if (unformat (input, "server-uri %s", &pm->server_uri))
570         ;
571       else if (unformat (input, "client-uri %s", &pm->client_uri))
572         pm->client_uri = format (0, "%s%c", pm->client_uri, 0);
573       else
574         return clib_error_return (0, "unknown input `%U'",
575                                   format_unformat_error, input);
576     }
577
578   if (!pm->server_uri)
579     {
580       clib_warning ("No server-uri provided, Using default: %s",
581                     default_server_uri);
582       pm->server_uri = format (0, "%s%c", default_server_uri, 0);
583     }
584   if (!pm->client_uri)
585     {
586       clib_warning ("No client-uri provided, Using default: %s",
587                     default_client_uri);
588       pm->client_uri = format (0, "%s%c", default_client_uri, 0);
589     }
590
591   vnet_session_enable_disable (vm, 1 /* turn on session and transport */ );
592
593   rv = proxy_server_create (vm);
594   switch (rv)
595     {
596     case 0:
597       break;
598     default:
599       return clib_error_return (0, "server_create returned %d", rv);
600     }
601
602   return 0;
603 }
604
605 /* *INDENT-OFF* */
606 VLIB_CLI_COMMAND (proxy_create_command, static) =
607 {
608   .path = "test proxy server",
609   .short_help = "test proxy server [server-uri <tcp://ip/port>]"
610       "[client-uri <tcp://ip/port>][fifo-size <nn>][rcv-buf-size <nn>]"
611       "[prealloc-fifos <nn>][private-segment-size <mem>]"
612       "[private-segment-count <nn>]",
613   .function = proxy_server_create_command_fn,
614 };
615 /* *INDENT-ON* */
616
617 clib_error_t *
618 proxy_main_init (vlib_main_t * vm)
619 {
620   proxy_main_t *pm = &proxy_main;
621   pm->server_client_index = ~0;
622   pm->active_open_client_index = ~0;
623   pm->proxy_session_by_active_open_handle = hash_create (0, sizeof (uword));
624   pm->proxy_session_by_server_handle = hash_create (0, sizeof (uword));
625
626   return 0;
627 }
628
629 VLIB_INIT_FUNCTION (proxy_main_init);
630
631 /*
632 * fd.io coding-style-patch-verification: ON
633 *
634 * Local Variables:
635 * eval: (c-set-style "gnu")
636 * End:
637 */