api: refactor vlibmemory
[vpp.git] / src / vnet / tcp / builtin_proxy.c
1 /*
2 * Copyright (c) 2015-2017 Cisco and/or its affiliates.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at:
6 *
7 *     http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include <vnet/vnet.h>
17 #include <vlibmemory/api.h>
18 #include <vnet/session/application.h>
19 #include <vnet/session/application_interface.h>
20 #include <vnet/tcp/builtin_proxy.h>
21
22 builtin_proxy_main_t builtin_proxy_main;
23
24 static void
25 delete_proxy_session (stream_session_t * s, int is_active_open)
26 {
27   builtin_proxy_main_t *bpm = &builtin_proxy_main;
28   proxy_session_t *ps = 0;
29   vnet_disconnect_args_t _a, *a = &_a;
30   stream_session_t *active_open_session = 0;
31   stream_session_t *server_session = 0;
32   uword *p;
33   u64 handle;
34
35   handle = session_handle (s);
36
37   clib_spinlock_lock_if_init (&bpm->sessions_lock);
38   if (is_active_open)
39     {
40       active_open_session = s;
41
42       p = hash_get (bpm->proxy_session_by_active_open_handle, handle);
43       if (p == 0)
44         {
45           clib_warning ("proxy session for %s handle %lld (%llx) AWOL",
46                         is_active_open ? "active open" : "server",
47                         handle, handle);
48         }
49       else
50         {
51           ps = pool_elt_at_index (bpm->sessions, p[0]);
52           if (ps->vpp_server_handle != ~0)
53             server_session = session_get_from_handle (ps->vpp_server_handle);
54           else
55             server_session = 0;
56         }
57     }
58   else
59     {
60       server_session = s;
61
62       p = hash_get (bpm->proxy_session_by_server_handle, handle);
63       if (p == 0)
64         {
65           clib_warning ("proxy session for %s handle %lld (%llx) AWOL",
66                         is_active_open ? "active open" : "server",
67                         handle, handle);
68         }
69       else
70         {
71           ps = pool_elt_at_index (bpm->sessions, p[0]);
72           if (ps->vpp_server_handle != ~0)
73             active_open_session = session_get_from_handle
74               (ps->vpp_server_handle);
75           else
76             active_open_session = 0;
77         }
78     }
79
80   if (ps)
81     {
82       if (CLIB_DEBUG > 0)
83         memset (ps, 0xFE, sizeof (*ps));
84       pool_put (bpm->sessions, ps);
85     }
86
87   clib_spinlock_unlock_if_init (&bpm->sessions_lock);
88
89   if (active_open_session)
90     {
91       a->handle = session_handle (active_open_session);
92       a->app_index = bpm->active_open_app_index;
93       hash_unset (bpm->proxy_session_by_active_open_handle,
94                   session_handle (active_open_session));
95       vnet_disconnect_session (a);
96     }
97
98   if (server_session)
99     {
100       a->handle = session_handle (server_session);
101       a->app_index = bpm->server_app_index;
102       hash_unset (bpm->proxy_session_by_server_handle,
103                   session_handle (server_session));
104       vnet_disconnect_session (a);
105     }
106 }
107
108 static int
109 server_accept_callback (stream_session_t * s)
110 {
111   builtin_proxy_main_t *bpm = &builtin_proxy_main;
112
113   s->session_state = SESSION_STATE_READY;
114
115   clib_spinlock_lock_if_init (&bpm->sessions_lock);
116
117   return 0;
118 }
119
120 static void
121 server_disconnect_callback (stream_session_t * s)
122 {
123   delete_proxy_session (s, 0 /* is_active_open */ );
124 }
125
126 static void
127 server_reset_callback (stream_session_t * s)
128 {
129   clib_warning ("Reset session %U", format_stream_session, s, 2);
130   delete_proxy_session (s, 0 /* is_active_open */ );
131 }
132
133 static int
134 server_connected_callback (u32 app_index, u32 api_context,
135                            stream_session_t * s, u8 is_fail)
136 {
137   clib_warning ("called...");
138   return -1;
139 }
140
141 static int
142 server_add_segment_callback (u32 client_index,
143                              const u8 * seg_name, u32 seg_size)
144 {
145   clib_warning ("called...");
146   return -1;
147 }
148
149 static int
150 server_redirect_connect_callback (u32 client_index, void *mp)
151 {
152   clib_warning ("called...");
153   return -1;
154 }
155
156 static int
157 server_rx_callback (stream_session_t * s)
158 {
159   u32 max_dequeue;
160   int actual_transfer __attribute__ ((unused));
161   svm_fifo_t *tx_fifo, *rx_fifo;
162   builtin_proxy_main_t *bpm = &builtin_proxy_main;
163   u32 thread_index = vlib_get_thread_index ();
164   vnet_connect_args_t _a, *a = &_a;
165   proxy_session_t *ps;
166   int proxy_index;
167   uword *p;
168   svm_fifo_t *active_open_tx_fifo;
169   session_fifo_event_t evt;
170
171   ASSERT (s->thread_index == thread_index);
172
173   clib_spinlock_lock_if_init (&bpm->sessions_lock);
174   p = hash_get (bpm->proxy_session_by_server_handle, session_handle (s));
175
176   if (PREDICT_TRUE (p != 0))
177     {
178       clib_spinlock_unlock_if_init (&bpm->sessions_lock);
179       active_open_tx_fifo = s->server_rx_fifo;
180
181       /*
182        * Send event for active open tx fifo
183        */
184       if (svm_fifo_set_event (active_open_tx_fifo))
185         {
186           evt.fifo = active_open_tx_fifo;
187           evt.event_type = FIFO_EVENT_APP_TX;
188           if (svm_queue_add
189               (bpm->active_open_event_queue[thread_index], (u8 *) & evt,
190                0 /* do wait for mutex */ ))
191             clib_warning ("failed to enqueue tx evt");
192         }
193     }
194   else
195     {
196       rx_fifo = s->server_rx_fifo;
197       tx_fifo = s->server_tx_fifo;
198
199       ASSERT (rx_fifo->master_thread_index == thread_index);
200       ASSERT (tx_fifo->master_thread_index == thread_index);
201
202       max_dequeue = svm_fifo_max_dequeue (s->server_rx_fifo);
203
204       if (PREDICT_FALSE (max_dequeue == 0))
205         return 0;
206
207       actual_transfer = svm_fifo_peek (rx_fifo, 0 /* relative_offset */ ,
208                                        max_dequeue,
209                                        bpm->rx_buf[thread_index]);
210
211       /* $$$ your message in this space: parse url, etc. */
212
213       memset (a, 0, sizeof (*a));
214
215       clib_spinlock_lock_if_init (&bpm->sessions_lock);
216       pool_get (bpm->sessions, ps);
217       memset (ps, 0, sizeof (*ps));
218       ps->server_rx_fifo = rx_fifo;
219       ps->server_tx_fifo = tx_fifo;
220       ps->vpp_server_handle = session_handle (s);
221
222       proxy_index = ps - bpm->sessions;
223
224       hash_set (bpm->proxy_session_by_server_handle, ps->vpp_server_handle,
225                 proxy_index);
226
227       clib_spinlock_unlock_if_init (&bpm->sessions_lock);
228
229       a->uri = "tcp://6.0.2.2/23";
230       a->api_context = proxy_index;
231       a->app_index = bpm->active_open_app_index;
232       a->mp = 0;
233       vnet_connect_uri (a);
234     }
235
236   return 0;
237 }
238
239 static session_cb_vft_t builtin_session_cb_vft = {
240   .session_accept_callback = server_accept_callback,
241   .session_disconnect_callback = server_disconnect_callback,
242   .session_connected_callback = server_connected_callback,
243   .add_segment_callback = server_add_segment_callback,
244   .redirect_connect_callback = server_redirect_connect_callback,
245   .builtin_server_rx_callback = server_rx_callback,
246   .session_reset_callback = server_reset_callback
247 };
248
249 static int
250 active_open_connected_callback (u32 app_index, u32 opaque,
251                                 stream_session_t * s, u8 is_fail)
252 {
253   builtin_proxy_main_t *bpm = &builtin_proxy_main;
254   proxy_session_t *ps;
255   u8 thread_index = vlib_get_thread_index ();
256   session_fifo_event_t evt;
257
258   if (is_fail)
259     {
260       clib_warning ("connection %d failed!", opaque);
261       return 0;
262     }
263
264   /*
265    * Setup proxy session handle.
266    */
267   clib_spinlock_lock_if_init (&bpm->sessions_lock);
268
269   ps = pool_elt_at_index (bpm->sessions, opaque);
270   ps->vpp_active_open_handle = session_handle (s);
271
272   s->server_tx_fifo = ps->server_rx_fifo;
273   s->server_rx_fifo = ps->server_tx_fifo;
274
275   /*
276    * Reset the active-open tx-fifo master indices so the active-open session
277    * will receive data, etc.
278    */
279   s->server_tx_fifo->master_session_index = s->session_index;
280   s->server_tx_fifo->master_thread_index = s->thread_index;
281
282   /*
283    * Account for the active-open session's use of the fifos
284    * so they won't disappear until the last session which uses
285    * them disappears
286    */
287   s->server_tx_fifo->refcnt++;
288   s->server_rx_fifo->refcnt++;
289
290   hash_set (bpm->proxy_session_by_active_open_handle,
291             ps->vpp_active_open_handle, opaque);
292
293   clib_spinlock_unlock_if_init (&bpm->sessions_lock);
294
295   /*
296    * Send event for active open tx fifo
297    */
298   if (svm_fifo_set_event (s->server_tx_fifo))
299     {
300       evt.fifo = s->server_tx_fifo;
301       evt.event_type = FIFO_EVENT_APP_TX;
302       if (svm_queue_add
303           (bpm->active_open_event_queue[thread_index], (u8 *) & evt,
304            0 /* do wait for mutex */ ))
305         clib_warning ("failed to enqueue tx evt");
306     }
307
308   return 0;
309 }
310
311 static void
312 active_open_reset_callback (stream_session_t * s)
313 {
314   delete_proxy_session (s, 1 /* is_active_open */ );
315 }
316
317 static int
318 active_open_create_callback (stream_session_t * s)
319 {
320   return 0;
321 }
322
323 static void
324 active_open_disconnect_callback (stream_session_t * s)
325 {
326   delete_proxy_session (s, 1 /* is_active_open */ );
327 }
328
329 static int
330 active_open_rx_callback (stream_session_t * s)
331 {
332   builtin_proxy_main_t *bpm = &builtin_proxy_main;
333   session_fifo_event_t evt;
334   svm_fifo_t *server_rx_fifo;
335   u32 thread_index = vlib_get_thread_index ();
336
337   server_rx_fifo = s->server_rx_fifo;
338
339   /*
340    * Send event for server tx fifo
341    */
342   if (svm_fifo_set_event (server_rx_fifo))
343     {
344       evt.fifo = server_rx_fifo;
345       evt.event_type = FIFO_EVENT_APP_TX;
346       if (svm_queue_add
347           (bpm->server_event_queue[thread_index], (u8 *) & evt,
348            0 /* do wait for mutex */ ))
349         clib_warning ("failed to enqueue server rx evt");
350     }
351
352   return 0;
353 }
354
355 /* *INDENT-OFF* */
356 static session_cb_vft_t builtin_clients = {
357   .session_reset_callback = active_open_reset_callback,
358   .session_connected_callback = active_open_connected_callback,
359   .session_accept_callback = active_open_create_callback,
360   .session_disconnect_callback = active_open_disconnect_callback,
361   .builtin_server_rx_callback = active_open_rx_callback
362 };
363 /* *INDENT-ON* */
364
365
366 static void
367 create_api_loopbacks (vlib_main_t * vm)
368 {
369   builtin_proxy_main_t *bpm = &builtin_proxy_main;
370   api_main_t *am = &api_main;
371   vl_shmem_hdr_t *shmem_hdr;
372
373   shmem_hdr = am->shmem_hdr;
374   bpm->vl_input_queue = shmem_hdr->vl_input_queue;
375   bpm->server_client_index =
376     vl_api_memclnt_create_internal ("proxy_server", bpm->vl_input_queue);
377   bpm->active_open_client_index =
378     vl_api_memclnt_create_internal ("proxy_active_open", bpm->vl_input_queue);
379 }
380
381 static int
382 server_attach ()
383 {
384   builtin_proxy_main_t *bpm = &builtin_proxy_main;
385   u8 segment_name[128];
386   u64 options[APP_OPTIONS_N_OPTIONS];
387   vnet_app_attach_args_t _a, *a = &_a;
388   u32 segment_size = 512 << 20;
389
390   memset (a, 0, sizeof (*a));
391   memset (options, 0, sizeof (options));
392
393   if (bpm->private_segment_size)
394     segment_size = bpm->private_segment_size;
395   a->api_client_index = bpm->server_client_index;
396   a->session_cb_vft = &builtin_session_cb_vft;
397   a->options = options;
398   a->options[APP_OPTIONS_SEGMENT_SIZE] = segment_size;
399   a->options[APP_OPTIONS_RX_FIFO_SIZE] = bpm->fifo_size;
400   a->options[APP_OPTIONS_TX_FIFO_SIZE] = bpm->fifo_size;
401   a->options[APP_OPTIONS_PRIVATE_SEGMENT_COUNT] = bpm->private_segment_count;
402   a->options[APP_OPTIONS_PREALLOC_FIFO_PAIRS] =
403     bpm->prealloc_fifos ? bpm->prealloc_fifos : 1;
404
405   a->options[APP_OPTIONS_FLAGS] = APP_OPTIONS_FLAGS_IS_BUILTIN;
406
407   a->segment_name = segment_name;
408   a->segment_name_length = ARRAY_LEN (segment_name);
409
410   if (vnet_application_attach (a))
411     {
412       clib_warning ("failed to attach server");
413       return -1;
414     }
415   bpm->server_app_index = a->app_index;
416
417   return 0;
418 }
419
420 static int
421 active_open_attach (void)
422 {
423   builtin_proxy_main_t *bpm = &builtin_proxy_main;
424   vnet_app_attach_args_t _a, *a = &_a;
425   u8 segment_name[128];
426   u32 segment_name_length;
427   u64 options[16];
428
429   segment_name_length = ARRAY_LEN (segment_name);
430
431   memset (a, 0, sizeof (*a));
432   memset (options, 0, sizeof (options));
433
434   a->api_client_index = bpm->active_open_client_index;
435   a->segment_name = segment_name;
436   a->segment_name_length = segment_name_length;
437   a->session_cb_vft = &builtin_clients;
438
439   options[APP_OPTIONS_ACCEPT_COOKIE] = 0x12345678;
440   options[APP_OPTIONS_SEGMENT_SIZE] = 512 << 20;
441   options[APP_OPTIONS_RX_FIFO_SIZE] = bpm->fifo_size;
442   options[APP_OPTIONS_TX_FIFO_SIZE] = bpm->fifo_size;
443   options[APP_OPTIONS_PRIVATE_SEGMENT_COUNT] = bpm->private_segment_count;
444   options[APP_OPTIONS_PREALLOC_FIFO_PAIRS] =
445     bpm->prealloc_fifos ? bpm->prealloc_fifos : 1;
446
447   options[APP_OPTIONS_FLAGS] = APP_OPTIONS_FLAGS_IS_BUILTIN
448     | APP_OPTIONS_FLAGS_IS_PROXY;
449
450   a->options = options;
451
452   if (vnet_application_attach (a))
453     return -1;
454
455   bpm->active_open_app_index = a->app_index;
456
457   return 0;
458 }
459
460 static int
461 server_listen ()
462 {
463   builtin_proxy_main_t *bpm = &builtin_proxy_main;
464   vnet_bind_args_t _a, *a = &_a;
465   memset (a, 0, sizeof (*a));
466   a->app_index = bpm->server_app_index;
467   a->uri = "tcp://0.0.0.0/23";
468   return vnet_bind_uri (a);
469 }
470
471 static int
472 server_create (vlib_main_t * vm)
473 {
474   builtin_proxy_main_t *bpm = &builtin_proxy_main;
475   vlib_thread_main_t *vtm = vlib_get_thread_main ();
476   u32 num_threads;
477   int i;
478
479   if (bpm->server_client_index == (u32) ~ 0)
480     create_api_loopbacks (vm);
481
482   num_threads = 1 /* main thread */  + vtm->n_threads;
483   vec_validate (builtin_proxy_main.server_event_queue, num_threads - 1);
484   vec_validate (builtin_proxy_main.active_open_event_queue, num_threads - 1);
485   vec_validate (bpm->rx_buf, num_threads - 1);
486
487   for (i = 0; i < num_threads; i++)
488     vec_validate (bpm->rx_buf[i], bpm->rcv_buffer_size);
489
490   if (server_attach ())
491     {
492       clib_warning ("failed to attach server app");
493       return -1;
494     }
495   if (server_listen ())
496     {
497       clib_warning ("failed to start listening");
498       return -1;
499     }
500   if (active_open_attach ())
501     {
502       clib_warning ("failed to attach active open app");
503       return -1;
504     }
505
506   for (i = 0; i < num_threads; i++)
507     {
508       bpm->active_open_event_queue[i] =
509         session_manager_get_vpp_event_queue (i);
510
511       ASSERT (bpm->active_open_event_queue[i]);
512
513       bpm->server_event_queue[i] = session_manager_get_vpp_event_queue (i);
514     }
515
516   return 0;
517 }
518
519 static clib_error_t *
520 proxy_server_create_command_fn (vlib_main_t * vm, unformat_input_t * input,
521                                 vlib_cli_command_t * cmd)
522 {
523   builtin_proxy_main_t *bpm = &builtin_proxy_main;
524   int rv;
525   u64 tmp;
526
527   bpm->fifo_size = 64 << 10;
528   bpm->rcv_buffer_size = 1024;
529   bpm->prealloc_fifos = 0;
530   bpm->private_segment_count = 0;
531   bpm->private_segment_size = 0;
532
533   while (unformat_check_input (input) != UNFORMAT_END_OF_INPUT)
534     {
535       if (unformat (input, "fifo-size %d", &bpm->fifo_size))
536         bpm->fifo_size <<= 10;
537       else if (unformat (input, "rcv-buf-size %d", &bpm->rcv_buffer_size))
538         ;
539       else if (unformat (input, "prealloc-fifos %d", &bpm->prealloc_fifos))
540         ;
541       else if (unformat (input, "private-segment-count %d",
542                          &bpm->private_segment_count))
543         ;
544       else if (unformat (input, "private-segment-size %U",
545                          unformat_memory_size, &tmp))
546         {
547           if (tmp >= 0x100000000ULL)
548             return clib_error_return
549               (0, "private segment size %lld (%llu) too large", tmp, tmp);
550           bpm->private_segment_size = tmp;
551         }
552       else
553         return clib_error_return (0, "unknown input `%U'",
554                                   format_unformat_error, input);
555     }
556
557   vnet_session_enable_disable (vm, 1 /* turn on TCP, etc. */ );
558
559   rv = server_create (vm);
560   switch (rv)
561     {
562     case 0:
563       break;
564     default:
565       return clib_error_return (0, "server_create returned %d", rv);
566     }
567
568   return 0;
569 }
570
571 /* *INDENT-OFF* */
572 VLIB_CLI_COMMAND (server_create_command, static) =
573 {
574   .path = "test proxy server",
575   .short_help = "test proxy server",
576   .function = proxy_server_create_command_fn,
577 };
578 /* *INDENT-ON* */
579
580 clib_error_t *
581 builtin_tcp_proxy_main_init (vlib_main_t * vm)
582 {
583   builtin_proxy_main_t *bpm = &builtin_proxy_main;
584   bpm->server_client_index = ~0;
585   bpm->active_open_client_index = ~0;
586   bpm->proxy_session_by_active_open_handle = hash_create (0, sizeof (uword));
587   bpm->proxy_session_by_server_handle = hash_create (0, sizeof (uword));
588
589   return 0;
590 }
591
592 VLIB_INIT_FUNCTION (builtin_tcp_proxy_main_init);
593
594 /*
595 * fd.io coding-style-patch-verification: ON
596 *
597 * Local Variables:
598 * eval: (c-set-style "gnu")
599 * End:
600 */