session: add support for memfd segments
[vpp.git] / src / vnet / tcp / builtin_proxy.c
1 /*
2 * Copyright (c) 2015-2017 Cisco and/or its affiliates.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at:
6 *
7 *     http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include <vnet/vnet.h>
17 #include <vlibmemory/api.h>
18 #include <vnet/session/application.h>
19 #include <vnet/session/application_interface.h>
20 #include <vnet/tcp/builtin_proxy.h>
21
/*
 * Global proxy state. Every function in this file reaches it through a
 * local "bpm" pointer. It has external linkage; presumably it is declared
 * in builtin_proxy.h for other users -- TODO confirm.
 */
builtin_proxy_main_t builtin_proxy_main;
23
24 static void
25 delete_proxy_session (stream_session_t * s, int is_active_open)
26 {
27   builtin_proxy_main_t *bpm = &builtin_proxy_main;
28   proxy_session_t *ps = 0;
29   vnet_disconnect_args_t _a, *a = &_a;
30   stream_session_t *active_open_session = 0;
31   stream_session_t *server_session = 0;
32   uword *p;
33   u64 handle;
34
35   handle = session_handle (s);
36
37   clib_spinlock_lock_if_init (&bpm->sessions_lock);
38   if (is_active_open)
39     {
40       active_open_session = s;
41
42       p = hash_get (bpm->proxy_session_by_active_open_handle, handle);
43       if (p == 0)
44         {
45           clib_warning ("proxy session for %s handle %lld (%llx) AWOL",
46                         is_active_open ? "active open" : "server",
47                         handle, handle);
48         }
49       else
50         {
51           ps = pool_elt_at_index (bpm->sessions, p[0]);
52           if (ps->vpp_server_handle != ~0)
53             server_session = session_get_from_handle (ps->vpp_server_handle);
54           else
55             server_session = 0;
56         }
57     }
58   else
59     {
60       server_session = s;
61
62       p = hash_get (bpm->proxy_session_by_server_handle, handle);
63       if (p == 0)
64         {
65           clib_warning ("proxy session for %s handle %lld (%llx) AWOL",
66                         is_active_open ? "active open" : "server",
67                         handle, handle);
68         }
69       else
70         {
71           ps = pool_elt_at_index (bpm->sessions, p[0]);
72           if (ps->vpp_server_handle != ~0)
73             active_open_session = session_get_from_handle
74               (ps->vpp_server_handle);
75           else
76             active_open_session = 0;
77         }
78     }
79
80   if (ps)
81     {
82       if (CLIB_DEBUG > 0)
83         memset (ps, 0xFE, sizeof (*ps));
84       pool_put (bpm->sessions, ps);
85     }
86
87   clib_spinlock_unlock_if_init (&bpm->sessions_lock);
88
89   if (active_open_session)
90     {
91       a->handle = session_handle (active_open_session);
92       a->app_index = bpm->active_open_app_index;
93       hash_unset (bpm->proxy_session_by_active_open_handle,
94                   session_handle (active_open_session));
95       vnet_disconnect_session (a);
96     }
97
98   if (server_session)
99     {
100       a->handle = session_handle (server_session);
101       a->app_index = bpm->server_app_index;
102       hash_unset (bpm->proxy_session_by_server_handle,
103                   session_handle (server_session));
104       vnet_disconnect_session (a);
105     }
106 }
107
108 static int
109 server_accept_callback (stream_session_t * s)
110 {
111   builtin_proxy_main_t *bpm = &builtin_proxy_main;
112
113   s->session_state = SESSION_STATE_READY;
114
115   clib_spinlock_lock_if_init (&bpm->sessions_lock);
116
117   return 0;
118 }
119
120 static void
121 server_disconnect_callback (stream_session_t * s)
122 {
123   delete_proxy_session (s, 0 /* is_active_open */ );
124 }
125
126 static void
127 server_reset_callback (stream_session_t * s)
128 {
129   clib_warning ("Reset session %U", format_stream_session, s, 2);
130   delete_proxy_session (s, 0 /* is_active_open */ );
131 }
132
133 static int
134 server_connected_callback (u32 app_index, u32 api_context,
135                            stream_session_t * s, u8 is_fail)
136 {
137   clib_warning ("called...");
138   return -1;
139 }
140
141 static int
142 server_add_segment_callback (u32 client_index, const ssvm_private_t * sp)
143 {
144   clib_warning ("called...");
145   return -1;
146 }
147
148 static int
149 server_redirect_connect_callback (u32 client_index, void *mp)
150 {
151   clib_warning ("called...");
152   return -1;
153 }
154
155 static int
156 server_rx_callback (stream_session_t * s)
157 {
158   u32 max_dequeue;
159   int actual_transfer __attribute__ ((unused));
160   svm_fifo_t *tx_fifo, *rx_fifo;
161   builtin_proxy_main_t *bpm = &builtin_proxy_main;
162   u32 thread_index = vlib_get_thread_index ();
163   vnet_connect_args_t _a, *a = &_a;
164   proxy_session_t *ps;
165   int proxy_index;
166   uword *p;
167   svm_fifo_t *active_open_tx_fifo;
168   session_fifo_event_t evt;
169
170   ASSERT (s->thread_index == thread_index);
171
172   clib_spinlock_lock_if_init (&bpm->sessions_lock);
173   p = hash_get (bpm->proxy_session_by_server_handle, session_handle (s));
174
175   if (PREDICT_TRUE (p != 0))
176     {
177       clib_spinlock_unlock_if_init (&bpm->sessions_lock);
178       active_open_tx_fifo = s->server_rx_fifo;
179
180       /*
181        * Send event for active open tx fifo
182        */
183       if (svm_fifo_set_event (active_open_tx_fifo))
184         {
185           evt.fifo = active_open_tx_fifo;
186           evt.event_type = FIFO_EVENT_APP_TX;
187           if (svm_queue_add
188               (bpm->active_open_event_queue[thread_index], (u8 *) & evt,
189                0 /* do wait for mutex */ ))
190             clib_warning ("failed to enqueue tx evt");
191         }
192     }
193   else
194     {
195       rx_fifo = s->server_rx_fifo;
196       tx_fifo = s->server_tx_fifo;
197
198       ASSERT (rx_fifo->master_thread_index == thread_index);
199       ASSERT (tx_fifo->master_thread_index == thread_index);
200
201       max_dequeue = svm_fifo_max_dequeue (s->server_rx_fifo);
202
203       if (PREDICT_FALSE (max_dequeue == 0))
204         return 0;
205
206       actual_transfer = svm_fifo_peek (rx_fifo, 0 /* relative_offset */ ,
207                                        max_dequeue,
208                                        bpm->rx_buf[thread_index]);
209
210       /* $$$ your message in this space: parse url, etc. */
211
212       memset (a, 0, sizeof (*a));
213
214       clib_spinlock_lock_if_init (&bpm->sessions_lock);
215       pool_get (bpm->sessions, ps);
216       memset (ps, 0, sizeof (*ps));
217       ps->server_rx_fifo = rx_fifo;
218       ps->server_tx_fifo = tx_fifo;
219       ps->vpp_server_handle = session_handle (s);
220
221       proxy_index = ps - bpm->sessions;
222
223       hash_set (bpm->proxy_session_by_server_handle, ps->vpp_server_handle,
224                 proxy_index);
225
226       clib_spinlock_unlock_if_init (&bpm->sessions_lock);
227
228       a->uri = (char *) bpm->client_uri;
229       a->api_context = proxy_index;
230       a->app_index = bpm->active_open_app_index;
231       a->mp = 0;
232       vnet_connect_uri (a);
233     }
234
235   return 0;
236 }
237
238 static session_cb_vft_t builtin_session_cb_vft = {
239   .session_accept_callback = server_accept_callback,
240   .session_disconnect_callback = server_disconnect_callback,
241   .session_connected_callback = server_connected_callback,
242   .add_segment_callback = server_add_segment_callback,
243   .redirect_connect_callback = server_redirect_connect_callback,
244   .builtin_server_rx_callback = server_rx_callback,
245   .session_reset_callback = server_reset_callback
246 };
247
248 static int
249 active_open_connected_callback (u32 app_index, u32 opaque,
250                                 stream_session_t * s, u8 is_fail)
251 {
252   builtin_proxy_main_t *bpm = &builtin_proxy_main;
253   proxy_session_t *ps;
254   u8 thread_index = vlib_get_thread_index ();
255   session_fifo_event_t evt;
256
257   if (is_fail)
258     {
259       clib_warning ("connection %d failed!", opaque);
260       return 0;
261     }
262
263   /*
264    * Setup proxy session handle.
265    */
266   clib_spinlock_lock_if_init (&bpm->sessions_lock);
267
268   ps = pool_elt_at_index (bpm->sessions, opaque);
269   ps->vpp_active_open_handle = session_handle (s);
270
271   s->server_tx_fifo = ps->server_rx_fifo;
272   s->server_rx_fifo = ps->server_tx_fifo;
273
274   /*
275    * Reset the active-open tx-fifo master indices so the active-open session
276    * will receive data, etc.
277    */
278   s->server_tx_fifo->master_session_index = s->session_index;
279   s->server_tx_fifo->master_thread_index = s->thread_index;
280
281   /*
282    * Account for the active-open session's use of the fifos
283    * so they won't disappear until the last session which uses
284    * them disappears
285    */
286   s->server_tx_fifo->refcnt++;
287   s->server_rx_fifo->refcnt++;
288
289   hash_set (bpm->proxy_session_by_active_open_handle,
290             ps->vpp_active_open_handle, opaque);
291
292   clib_spinlock_unlock_if_init (&bpm->sessions_lock);
293
294   /*
295    * Send event for active open tx fifo
296    */
297   if (svm_fifo_set_event (s->server_tx_fifo))
298     {
299       evt.fifo = s->server_tx_fifo;
300       evt.event_type = FIFO_EVENT_APP_TX;
301       if (svm_queue_add
302           (bpm->active_open_event_queue[thread_index], (u8 *) & evt,
303            0 /* do wait for mutex */ ))
304         clib_warning ("failed to enqueue tx evt");
305     }
306
307   return 0;
308 }
309
310 static void
311 active_open_reset_callback (stream_session_t * s)
312 {
313   delete_proxy_session (s, 1 /* is_active_open */ );
314 }
315
316 static int
317 active_open_create_callback (stream_session_t * s)
318 {
319   return 0;
320 }
321
322 static void
323 active_open_disconnect_callback (stream_session_t * s)
324 {
325   delete_proxy_session (s, 1 /* is_active_open */ );
326 }
327
328 static int
329 active_open_rx_callback (stream_session_t * s)
330 {
331   builtin_proxy_main_t *bpm = &builtin_proxy_main;
332   session_fifo_event_t evt;
333   svm_fifo_t *server_rx_fifo;
334   u32 thread_index = vlib_get_thread_index ();
335
336   server_rx_fifo = s->server_rx_fifo;
337
338   /*
339    * Send event for server tx fifo
340    */
341   if (svm_fifo_set_event (server_rx_fifo))
342     {
343       evt.fifo = server_rx_fifo;
344       evt.event_type = FIFO_EVENT_APP_TX;
345       if (svm_queue_add
346           (bpm->server_event_queue[thread_index], (u8 *) & evt,
347            0 /* do wait for mutex */ ))
348         clib_warning ("failed to enqueue server rx evt");
349     }
350
351   return 0;
352 }
353
354 /* *INDENT-OFF* */
355 static session_cb_vft_t builtin_clients = {
356   .session_reset_callback = active_open_reset_callback,
357   .session_connected_callback = active_open_connected_callback,
358   .session_accept_callback = active_open_create_callback,
359   .session_disconnect_callback = active_open_disconnect_callback,
360   .builtin_server_rx_callback = active_open_rx_callback
361 };
362 /* *INDENT-ON* */
363
364
365 static void
366 create_api_loopbacks (vlib_main_t * vm)
367 {
368   builtin_proxy_main_t *bpm = &builtin_proxy_main;
369   api_main_t *am = &api_main;
370   vl_shmem_hdr_t *shmem_hdr;
371
372   shmem_hdr = am->shmem_hdr;
373   bpm->vl_input_queue = shmem_hdr->vl_input_queue;
374   bpm->server_client_index =
375     vl_api_memclnt_create_internal ("proxy_server", bpm->vl_input_queue);
376   bpm->active_open_client_index =
377     vl_api_memclnt_create_internal ("proxy_active_open", bpm->vl_input_queue);
378 }
379
380 static int
381 server_attach ()
382 {
383   builtin_proxy_main_t *bpm = &builtin_proxy_main;
384   u64 options[APP_OPTIONS_N_OPTIONS];
385   vnet_app_attach_args_t _a, *a = &_a;
386   u32 segment_size = 512 << 20;
387
388   memset (a, 0, sizeof (*a));
389   memset (options, 0, sizeof (options));
390
391   if (bpm->private_segment_size)
392     segment_size = bpm->private_segment_size;
393   a->api_client_index = bpm->server_client_index;
394   a->session_cb_vft = &builtin_session_cb_vft;
395   a->options = options;
396   a->options[APP_OPTIONS_SEGMENT_SIZE] = segment_size;
397   a->options[APP_OPTIONS_RX_FIFO_SIZE] = bpm->fifo_size;
398   a->options[APP_OPTIONS_TX_FIFO_SIZE] = bpm->fifo_size;
399   a->options[APP_OPTIONS_PRIVATE_SEGMENT_COUNT] = bpm->private_segment_count;
400   a->options[APP_OPTIONS_PREALLOC_FIFO_PAIRS] =
401     bpm->prealloc_fifos ? bpm->prealloc_fifos : 1;
402
403   a->options[APP_OPTIONS_FLAGS] = APP_OPTIONS_FLAGS_IS_BUILTIN;
404
405   if (vnet_application_attach (a))
406     {
407       clib_warning ("failed to attach server");
408       return -1;
409     }
410   bpm->server_app_index = a->app_index;
411
412   return 0;
413 }
414
415 static int
416 active_open_attach (void)
417 {
418   builtin_proxy_main_t *bpm = &builtin_proxy_main;
419   vnet_app_attach_args_t _a, *a = &_a;
420   u64 options[16];
421
422   memset (a, 0, sizeof (*a));
423   memset (options, 0, sizeof (options));
424
425   a->api_client_index = bpm->active_open_client_index;
426   a->session_cb_vft = &builtin_clients;
427
428   options[APP_OPTIONS_ACCEPT_COOKIE] = 0x12345678;
429   options[APP_OPTIONS_SEGMENT_SIZE] = 512 << 20;
430   options[APP_OPTIONS_RX_FIFO_SIZE] = bpm->fifo_size;
431   options[APP_OPTIONS_TX_FIFO_SIZE] = bpm->fifo_size;
432   options[APP_OPTIONS_PRIVATE_SEGMENT_COUNT] = bpm->private_segment_count;
433   options[APP_OPTIONS_PREALLOC_FIFO_PAIRS] =
434     bpm->prealloc_fifos ? bpm->prealloc_fifos : 1;
435
436   options[APP_OPTIONS_FLAGS] = APP_OPTIONS_FLAGS_IS_BUILTIN
437     | APP_OPTIONS_FLAGS_IS_PROXY;
438
439   a->options = options;
440
441   if (vnet_application_attach (a))
442     return -1;
443
444   bpm->active_open_app_index = a->app_index;
445
446   return 0;
447 }
448
449 static int
450 server_listen ()
451 {
452   builtin_proxy_main_t *bpm = &builtin_proxy_main;
453   vnet_bind_args_t _a, *a = &_a;
454   memset (a, 0, sizeof (*a));
455   a->app_index = bpm->server_app_index;
456   a->uri = (char *) bpm->server_uri;
457   return vnet_bind_uri (a);
458 }
459
460 static int
461 server_create (vlib_main_t * vm)
462 {
463   builtin_proxy_main_t *bpm = &builtin_proxy_main;
464   vlib_thread_main_t *vtm = vlib_get_thread_main ();
465   u32 num_threads;
466   int i;
467
468   if (bpm->server_client_index == (u32) ~ 0)
469     create_api_loopbacks (vm);
470
471   num_threads = 1 /* main thread */  + vtm->n_threads;
472   vec_validate (builtin_proxy_main.server_event_queue, num_threads - 1);
473   vec_validate (builtin_proxy_main.active_open_event_queue, num_threads - 1);
474   vec_validate (bpm->rx_buf, num_threads - 1);
475
476   for (i = 0; i < num_threads; i++)
477     vec_validate (bpm->rx_buf[i], bpm->rcv_buffer_size);
478
479   if (server_attach ())
480     {
481       clib_warning ("failed to attach server app");
482       return -1;
483     }
484   if (server_listen ())
485     {
486       clib_warning ("failed to start listening");
487       return -1;
488     }
489   if (active_open_attach ())
490     {
491       clib_warning ("failed to attach active open app");
492       return -1;
493     }
494
495   for (i = 0; i < num_threads; i++)
496     {
497       bpm->active_open_event_queue[i] =
498         session_manager_get_vpp_event_queue (i);
499
500       ASSERT (bpm->active_open_event_queue[i]);
501
502       bpm->server_event_queue[i] = session_manager_get_vpp_event_queue (i);
503     }
504
505   return 0;
506 }
507
508 static clib_error_t *
509 proxy_server_create_command_fn (vlib_main_t * vm, unformat_input_t * input,
510                                 vlib_cli_command_t * cmd)
511 {
512   builtin_proxy_main_t *bpm = &builtin_proxy_main;
513   int rv;
514   u64 tmp;
515
516   bpm->fifo_size = 64 << 10;
517   bpm->rcv_buffer_size = 1024;
518   bpm->prealloc_fifos = 0;
519   bpm->private_segment_count = 0;
520   bpm->private_segment_size = 0;
521   bpm->server_uri = 0;
522
523   while (unformat_check_input (input) != UNFORMAT_END_OF_INPUT)
524     {
525       if (unformat (input, "fifo-size %d", &bpm->fifo_size))
526         bpm->fifo_size <<= 10;
527       else if (unformat (input, "rcv-buf-size %d", &bpm->rcv_buffer_size))
528         ;
529       else if (unformat (input, "prealloc-fifos %d", &bpm->prealloc_fifos))
530         ;
531       else if (unformat (input, "private-segment-count %d",
532                          &bpm->private_segment_count))
533         ;
534       else if (unformat (input, "private-segment-size %U",
535                          unformat_memory_size, &tmp))
536         {
537           if (tmp >= 0x100000000ULL)
538             return clib_error_return
539               (0, "private segment size %lld (%llu) too large", tmp, tmp);
540           bpm->private_segment_size = tmp;
541         }
542       else if (unformat (input, "server-uri %s", &bpm->server_uri))
543         ;
544       else if (unformat (input, "client-uri %s", &bpm->client_uri))
545         ;
546       else
547         return clib_error_return (0, "unknown input `%U'",
548                                   format_unformat_error, input);
549     }
550
551   if (!bpm->server_uri)
552     bpm->server_uri = format (0, "%s%c", "tcp://0.0.0.0/23", 0);
553   if (!bpm->client_uri)
554     bpm->client_uri = format (0, "%s%c", "tcp://6.0.2.2/23", 0);
555
556   vnet_session_enable_disable (vm, 1 /* turn on TCP, etc. */ );
557
558   rv = server_create (vm);
559   switch (rv)
560     {
561     case 0:
562       break;
563     default:
564       return clib_error_return (0, "server_create returned %d", rv);
565     }
566
567   return 0;
568 }
569
570 /* *INDENT-OFF* */
571 VLIB_CLI_COMMAND (server_create_command, static) =
572 {
573   .path = "test proxy server",
574   .short_help = "test proxy server [server-uri <tcp://ip/port>]"
575       "[client-uri <tcp://ip/port>][fifo-size <nn>][rcv-buf-size <nn>]"
576       "[prealloc-fifos <nn>][private-segment-size <mem>]"
577       "[private-segment-count <nn>]",
578   .function = proxy_server_create_command_fn,
579 };
580 /* *INDENT-ON* */
581
582 clib_error_t *
583 builtin_tcp_proxy_main_init (vlib_main_t * vm)
584 {
585   builtin_proxy_main_t *bpm = &builtin_proxy_main;
586   bpm->server_client_index = ~0;
587   bpm->active_open_client_index = ~0;
588   bpm->proxy_session_by_active_open_handle = hash_create (0, sizeof (uword));
589   bpm->proxy_session_by_server_handle = hash_create (0, sizeof (uword));
590
591   return 0;
592 }
593
594 VLIB_INIT_FUNCTION (builtin_tcp_proxy_main_init);
595
596 /*
597 * fd.io coding-style-patch-verification: ON
598 *
599 * Local Variables:
600 * eval: (c-set-style "gnu")
601 * End:
602 */