hsa: proxy listener support addition of fifo segments
[vpp.git] / src / plugins / hs_apps / proxy.c
1 /*
2 * Copyright (c) 2017-2019 Cisco and/or its affiliates.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at:
6 *
7 *     http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include <vnet/vnet.h>
17 #include <vlibmemory/api.h>
18 #include <vnet/session/application.h>
19 #include <vnet/session/application_interface.h>
20 #include <hs_apps/proxy.h>
21 #include <vnet/tcp/tcp.h>
22
23 proxy_main_t proxy_main;
24
25 #define TCP_MSS 1460
26
27 typedef struct
28 {
29   session_endpoint_cfg_t sep;
30   u32 app_index;
31   u32 api_context;
32 } proxy_connect_args_t;
33
34 static void
35 proxy_cb_fn (void *data, u32 data_len)
36 {
37   proxy_connect_args_t *pa = (proxy_connect_args_t *) data;
38   vnet_connect_args_t a;
39
40   clib_memset (&a, 0, sizeof (a));
41   a.api_context = pa->api_context;
42   a.app_index = pa->app_index;
43   clib_memcpy (&a.sep_ext, &pa->sep, sizeof (pa->sep));
44   vnet_connect (&a);
45   if (a.sep_ext.ext_cfg)
46     clib_mem_free (a.sep_ext.ext_cfg);
47 }
48
49 static void
50 proxy_call_main_thread (vnet_connect_args_t * a)
51 {
52   if (vlib_get_thread_index () == 0)
53     {
54       vnet_connect (a);
55       if (a->sep_ext.ext_cfg)
56         clib_mem_free (a->sep_ext.ext_cfg);
57     }
58   else
59     {
60       proxy_connect_args_t args;
61       args.api_context = a->api_context;
62       args.app_index = a->app_index;
63       clib_memcpy (&args.sep, &a->sep_ext, sizeof (a->sep_ext));
64       vl_api_rpc_call_main_thread (proxy_cb_fn, (u8 *) & args, sizeof (args));
65     }
66 }
67
68 static proxy_session_t *
69 proxy_get_active_open (proxy_main_t * pm, session_handle_t handle)
70 {
71   proxy_session_t *ps = 0;
72   uword *p;
73
74   p = hash_get (pm->proxy_session_by_active_open_handle, handle);
75   if (p)
76     ps = pool_elt_at_index (pm->sessions, p[0]);
77   return ps;
78 }
79
80 static proxy_session_t *
81 proxy_get_passive_open (proxy_main_t * pm, session_handle_t handle)
82 {
83   proxy_session_t *ps = 0;
84   uword *p;
85
86   p = hash_get (pm->proxy_session_by_server_handle, handle);
87   if (p)
88     ps = pool_elt_at_index (pm->sessions, p[0]);
89   return ps;
90 }
91
92 static void
93 proxy_try_close_session (session_t * s, int is_active_open)
94 {
95   proxy_main_t *pm = &proxy_main;
96   proxy_session_t *ps = 0;
97   vnet_disconnect_args_t _a, *a = &_a;
98   session_handle_t handle;
99
100   handle = session_handle (s);
101
102   clib_spinlock_lock_if_init (&pm->sessions_lock);
103
104   if (is_active_open)
105     {
106       ps = proxy_get_active_open (pm, handle);
107       ASSERT (ps != 0);
108
109       a->handle = ps->vpp_active_open_handle;
110       a->app_index = pm->active_open_app_index;
111       vnet_disconnect_session (a);
112       ps->ao_disconnected = 1;
113
114       if (!ps->po_disconnected)
115         {
116           ASSERT (ps->vpp_server_handle != SESSION_INVALID_HANDLE);
117           a->handle = ps->vpp_server_handle;
118           a->app_index = pm->server_app_index;
119           vnet_disconnect_session (a);
120           ps->po_disconnected = 1;
121         }
122     }
123   else
124     {
125       ps = proxy_get_passive_open (pm, handle);
126       ASSERT (ps != 0);
127
128       a->handle = ps->vpp_server_handle;
129       a->app_index = pm->server_app_index;
130       vnet_disconnect_session (a);
131       ps->po_disconnected = 1;
132
133       if (!ps->ao_disconnected && !ps->active_open_establishing)
134         {
135           /* Proxy session closed before active open */
136           if (ps->vpp_active_open_handle != SESSION_INVALID_HANDLE)
137             {
138               a->handle = ps->vpp_active_open_handle;
139               a->app_index = pm->active_open_app_index;
140               vnet_disconnect_session (a);
141             }
142           ps->ao_disconnected = 1;
143         }
144     }
145   clib_spinlock_unlock_if_init (&pm->sessions_lock);
146 }
147
148 static void
149 proxy_session_free (proxy_session_t * ps)
150 {
151   proxy_main_t *pm = &proxy_main;
152   if (CLIB_DEBUG > 0)
153     clib_memset (ps, 0xFE, sizeof (*ps));
154   pool_put (pm->sessions, ps);
155 }
156
157 static void
158 proxy_try_delete_session (session_t * s, u8 is_active_open)
159 {
160   proxy_main_t *pm = &proxy_main;
161   proxy_session_t *ps = 0;
162   session_handle_t handle;
163
164   handle = session_handle (s);
165
166   clib_spinlock_lock_if_init (&pm->sessions_lock);
167
168   if (is_active_open)
169     {
170       ps = proxy_get_active_open (pm, handle);
171       ASSERT (ps != 0);
172
173       ps->vpp_active_open_handle = SESSION_INVALID_HANDLE;
174       hash_unset (pm->proxy_session_by_active_open_handle, handle);
175
176       if (ps->vpp_server_handle == SESSION_INVALID_HANDLE)
177         proxy_session_free (ps);
178     }
179   else
180     {
181       ps = proxy_get_passive_open (pm, handle);
182       ASSERT (ps != 0);
183
184       ps->vpp_server_handle = SESSION_INVALID_HANDLE;
185       hash_unset (pm->proxy_session_by_server_handle, handle);
186
187       if (ps->vpp_active_open_handle == SESSION_INVALID_HANDLE)
188         {
189           if (!ps->active_open_establishing)
190             proxy_session_free (ps);
191         }
192     }
193   clib_spinlock_unlock_if_init (&pm->sessions_lock);
194 }
195
196 static int
197 common_fifo_tuning_callback (session_t * s, svm_fifo_t * f,
198                              session_ft_action_t act, u32 bytes)
199 {
200   proxy_main_t *pm = &proxy_main;
201
202   segment_manager_t *sm = segment_manager_get (f->segment_manager);
203   fifo_segment_t *fs = segment_manager_get_segment (sm, f->segment_index);
204
205   u8 seg_usage = fifo_segment_get_mem_usage (fs);
206   u32 fifo_in_use = svm_fifo_max_dequeue_prod (f);
207   u32 fifo_size = svm_fifo_size (f);
208   u8 fifo_usage = fifo_in_use * 100 / fifo_size;
209   u8 update_size = 0;
210
211   ASSERT (act < SESSION_FT_ACTION_N_ACTIONS);
212
213   if (act == SESSION_FT_ACTION_ENQUEUED)
214     {
215       if (seg_usage < pm->low_watermark && fifo_usage > 50)
216         update_size = fifo_in_use;
217       else if (seg_usage < pm->high_watermark && fifo_usage > 80)
218         update_size = fifo_in_use;
219
220       update_size = clib_min (update_size, sm->max_fifo_size - fifo_size);
221       if (update_size)
222         svm_fifo_set_size (f, fifo_size + update_size);
223     }
224   else                          /* dequeued */
225     {
226       if (seg_usage > pm->high_watermark || fifo_usage < 20)
227         update_size = bytes;
228       else if (seg_usage > pm->low_watermark && fifo_usage < 50)
229         update_size = (bytes / 2);
230
231       ASSERT (fifo_size >= 4096);
232       update_size = clib_min (update_size, fifo_size - 4096);
233       if (update_size)
234         svm_fifo_set_size (f, fifo_size - update_size);
235     }
236
237   return 0;
238 }
239
240 static int
241 proxy_accept_callback (session_t * s)
242 {
243   proxy_main_t *pm = &proxy_main;
244   proxy_session_t *ps;
245
246   clib_spinlock_lock_if_init (&pm->sessions_lock);
247
248   pool_get_zero (pm->sessions, ps);
249   ps->vpp_server_handle = session_handle (s);
250   ps->vpp_active_open_handle = SESSION_INVALID_HANDLE;
251
252   hash_set (pm->proxy_session_by_server_handle, ps->vpp_server_handle,
253             ps - pm->sessions);
254
255   clib_spinlock_unlock_if_init (&pm->sessions_lock);
256
257   s->session_state = SESSION_STATE_READY;
258
259   return 0;
260 }
261
262 static void
263 proxy_disconnect_callback (session_t * s)
264 {
265   proxy_try_close_session (s, 0 /* is_active_open */ );
266 }
267
268 static void
269 proxy_reset_callback (session_t * s)
270 {
271   proxy_try_close_session (s, 0 /* is_active_open */ );
272 }
273
274 static int
275 proxy_connected_callback (u32 app_index, u32 api_context,
276                           session_t * s, session_error_t err)
277 {
278   clib_warning ("called...");
279   return -1;
280 }
281
282 static int
283 proxy_add_segment_callback (u32 client_index, u64 segment_handle)
284 {
285   return 0;
286 }
287
288 static int
289 proxy_transport_needs_crypto (transport_proto_t proto)
290 {
291   return proto == TRANSPORT_PROTO_TLS;
292 }
293
294 static int
295 proxy_rx_callback (session_t * s)
296 {
297   proxy_main_t *pm = &proxy_main;
298   u32 thread_index = vlib_get_thread_index ();
299   svm_fifo_t *ao_tx_fifo;
300   proxy_session_t *ps;
301
302   ASSERT (s->thread_index == thread_index);
303
304   clib_spinlock_lock_if_init (&pm->sessions_lock);
305
306   ps = proxy_get_passive_open (pm, session_handle (s));
307   ASSERT (ps != 0);
308
309   if (PREDICT_TRUE (ps->vpp_active_open_handle != SESSION_INVALID_HANDLE))
310     {
311       clib_spinlock_unlock_if_init (&pm->sessions_lock);
312
313       ao_tx_fifo = s->rx_fifo;
314
315       /*
316        * Send event for active open tx fifo
317        */
318       if (svm_fifo_set_event (ao_tx_fifo))
319         {
320           u32 ao_thread_index = ao_tx_fifo->master_thread_index;
321           u32 ao_session_index = ao_tx_fifo->shr->master_session_index;
322           if (session_send_io_evt_to_thread_custom (&ao_session_index,
323                                                     ao_thread_index,
324                                                     SESSION_IO_EVT_TX))
325             clib_warning ("failed to enqueue tx evt");
326         }
327
328       if (svm_fifo_max_enqueue (ao_tx_fifo) <= TCP_MSS)
329         svm_fifo_add_want_deq_ntf (ao_tx_fifo, SVM_FIFO_WANT_DEQ_NOTIF);
330     }
331   else
332     {
333       vnet_connect_args_t _a, *a = &_a;
334       svm_fifo_t *tx_fifo, *rx_fifo;
335       u32 max_dequeue, proxy_index;
336       int actual_transfer __attribute__ ((unused));
337
338       rx_fifo = s->rx_fifo;
339       tx_fifo = s->tx_fifo;
340
341       ASSERT (rx_fifo->master_thread_index == thread_index);
342       ASSERT (tx_fifo->master_thread_index == thread_index);
343
344       max_dequeue = svm_fifo_max_dequeue_cons (s->rx_fifo);
345
346       if (PREDICT_FALSE (max_dequeue == 0))
347         return 0;
348
349       max_dequeue = clib_min (pm->rcv_buffer_size, max_dequeue);
350       actual_transfer = svm_fifo_peek (rx_fifo, 0 /* relative_offset */ ,
351                                        max_dequeue, pm->rx_buf[thread_index]);
352
353       /* $$$ your message in this space: parse url, etc. */
354
355       clib_memset (a, 0, sizeof (*a));
356
357       ps->server_rx_fifo = rx_fifo;
358       ps->server_tx_fifo = tx_fifo;
359       ps->active_open_establishing = 1;
360       proxy_index = ps - pm->sessions;
361
362       clib_spinlock_unlock_if_init (&pm->sessions_lock);
363
364       clib_memcpy (&a->sep_ext, &pm->client_sep, sizeof (pm->client_sep));
365       a->api_context = proxy_index;
366       a->app_index = pm->active_open_app_index;
367
368       if (proxy_transport_needs_crypto (a->sep.transport_proto))
369         {
370           session_endpoint_alloc_ext_cfg (&a->sep_ext,
371                                           TRANSPORT_ENDPT_EXT_CFG_CRYPTO);
372           a->sep_ext.ext_cfg->crypto.ckpair_index = pm->ckpair_index;
373         }
374
375       proxy_call_main_thread (a);
376     }
377
378   return 0;
379 }
380
381 static void
382 proxy_force_ack (void *handlep)
383 {
384   transport_connection_t *tc;
385   session_t *ao_s;
386
387   ao_s = session_get_from_handle (pointer_to_uword (handlep));
388   if (session_get_transport_proto (ao_s) != TRANSPORT_PROTO_TCP)
389     return;
390   tc = session_get_transport (ao_s);
391   tcp_send_ack ((tcp_connection_t *) tc);
392 }
393
394 static int
395 proxy_tx_callback (session_t * proxy_s)
396 {
397   proxy_main_t *pm = &proxy_main;
398   proxy_session_t *ps;
399   u32 min_free;
400
401   min_free = clib_min (svm_fifo_size (proxy_s->tx_fifo) >> 3, 128 << 10);
402   if (svm_fifo_max_enqueue (proxy_s->tx_fifo) < min_free)
403     {
404       svm_fifo_add_want_deq_ntf (proxy_s->tx_fifo, SVM_FIFO_WANT_DEQ_NOTIF);
405       return 0;
406     }
407
408   clib_spinlock_lock_if_init (&pm->sessions_lock);
409
410   ps = proxy_get_passive_open (pm, session_handle (proxy_s));
411   ASSERT (ps != 0);
412
413   if (ps->vpp_active_open_handle == SESSION_INVALID_HANDLE)
414     return 0;
415
416   /* Force ack on active open side to update rcv wnd. Make sure it's done on
417    * the right thread */
418   void *arg = uword_to_pointer (ps->vpp_active_open_handle, void *);
419   session_send_rpc_evt_to_thread (ps->server_rx_fifo->master_thread_index,
420                                   proxy_force_ack, arg);
421
422   clib_spinlock_unlock_if_init (&pm->sessions_lock);
423
424   return 0;
425 }
426
427 static void
428 proxy_cleanup_callback (session_t * s, session_cleanup_ntf_t ntf)
429 {
430   if (ntf == SESSION_CLEANUP_TRANSPORT)
431     return;
432
433   proxy_try_delete_session (s, 0 /* is_active_open */ );
434 }
435
436 static session_cb_vft_t proxy_session_cb_vft = {
437   .session_accept_callback = proxy_accept_callback,
438   .session_disconnect_callback = proxy_disconnect_callback,
439   .session_connected_callback = proxy_connected_callback,
440   .add_segment_callback = proxy_add_segment_callback,
441   .builtin_app_rx_callback = proxy_rx_callback,
442   .builtin_app_tx_callback = proxy_tx_callback,
443   .session_reset_callback = proxy_reset_callback,
444   .session_cleanup_callback = proxy_cleanup_callback,
445   .fifo_tuning_callback = common_fifo_tuning_callback
446 };
447
448 static int
449 active_open_connected_callback (u32 app_index, u32 opaque,
450                                 session_t * s, session_error_t err)
451 {
452   proxy_main_t *pm = &proxy_main;
453   proxy_session_t *ps;
454   u8 thread_index = vlib_get_thread_index ();
455
456   /*
457    * Setup proxy session handle.
458    */
459   clib_spinlock_lock_if_init (&pm->sessions_lock);
460
461   ps = pool_elt_at_index (pm->sessions, opaque);
462
463   /* Connection failed */
464   if (err)
465     {
466       vnet_disconnect_args_t _a, *a = &_a;
467
468       a->handle = ps->vpp_server_handle;
469       a->app_index = pm->server_app_index;
470       vnet_disconnect_session (a);
471       ps->po_disconnected = 1;
472     }
473   else
474     {
475       ps->vpp_active_open_handle = session_handle (s);
476       ps->active_open_establishing = 0;
477     }
478
479   /* Passive open session was already closed! */
480   if (ps->po_disconnected)
481     {
482       /* Setup everything for the cleanup notification */
483       hash_set (pm->proxy_session_by_active_open_handle,
484                 ps->vpp_active_open_handle, opaque);
485       ps->ao_disconnected = 1;
486       clib_spinlock_unlock_if_init (&pm->sessions_lock);
487       return -1;
488     }
489
490   s->tx_fifo = ps->server_rx_fifo;
491   s->rx_fifo = ps->server_tx_fifo;
492
493   /*
494    * Reset the active-open tx-fifo master indices so the active-open session
495    * will receive data, etc.
496    */
497   s->tx_fifo->shr->master_session_index = s->session_index;
498   s->tx_fifo->master_thread_index = s->thread_index;
499
500   /*
501    * Account for the active-open session's use of the fifos
502    * so they won't disappear until the last session which uses
503    * them disappears
504    */
505   s->tx_fifo->refcnt++;
506   s->rx_fifo->refcnt++;
507
508   hash_set (pm->proxy_session_by_active_open_handle,
509             ps->vpp_active_open_handle, opaque);
510
511   clib_spinlock_unlock_if_init (&pm->sessions_lock);
512
513   /*
514    * Send event for active open tx fifo
515    */
516   ASSERT (s->thread_index == thread_index);
517   if (svm_fifo_set_event (s->tx_fifo))
518     session_send_io_evt_to_thread (s->tx_fifo, SESSION_IO_EVT_TX);
519
520   return 0;
521 }
522
523 static void
524 active_open_reset_callback (session_t * s)
525 {
526   proxy_try_close_session (s, 1 /* is_active_open */ );
527 }
528
529 static int
530 active_open_create_callback (session_t * s)
531 {
532   return 0;
533 }
534
535 static void
536 active_open_disconnect_callback (session_t * s)
537 {
538   proxy_try_close_session (s, 1 /* is_active_open */ );
539 }
540
541 static int
542 active_open_rx_callback (session_t * s)
543 {
544   svm_fifo_t *proxy_tx_fifo;
545
546   proxy_tx_fifo = s->rx_fifo;
547
548   /*
549    * Send event for server tx fifo
550    */
551   if (svm_fifo_set_event (proxy_tx_fifo))
552     {
553       u8 thread_index = proxy_tx_fifo->master_thread_index;
554       u32 session_index = proxy_tx_fifo->shr->master_session_index;
555       return session_send_io_evt_to_thread_custom (&session_index,
556                                                    thread_index,
557                                                    SESSION_IO_EVT_TX);
558     }
559
560   if (svm_fifo_max_enqueue (proxy_tx_fifo) <= TCP_MSS)
561     svm_fifo_add_want_deq_ntf (proxy_tx_fifo, SVM_FIFO_WANT_DEQ_NOTIF);
562
563   return 0;
564 }
565
566 static int
567 active_open_tx_callback (session_t * ao_s)
568 {
569   proxy_main_t *pm = &proxy_main;
570   transport_connection_t *tc;
571   session_handle_t handle;
572   proxy_session_t *ps;
573   session_t *proxy_s;
574   u32 min_free;
575   uword *p;
576
577   min_free = clib_min (svm_fifo_size (ao_s->tx_fifo) >> 3, 128 << 10);
578   if (svm_fifo_max_enqueue (ao_s->tx_fifo) < min_free)
579     {
580       svm_fifo_add_want_deq_ntf (ao_s->tx_fifo, SVM_FIFO_WANT_DEQ_NOTIF);
581       return 0;
582     }
583
584   clib_spinlock_lock_if_init (&pm->sessions_lock);
585
586   handle = session_handle (ao_s);
587   p = hash_get (pm->proxy_session_by_active_open_handle, handle);
588   if (!p)
589     return 0;
590
591   if (pool_is_free_index (pm->sessions, p[0]))
592     return 0;
593
594   ps = pool_elt_at_index (pm->sessions, p[0]);
595   if (ps->vpp_server_handle == ~0)
596     return 0;
597
598   proxy_s = session_get_from_handle (ps->vpp_server_handle);
599
600   /* Force ack on proxy side to update rcv wnd */
601   tc = session_get_transport (proxy_s);
602   tcp_send_ack ((tcp_connection_t *) tc);
603
604   clib_spinlock_unlock_if_init (&pm->sessions_lock);
605
606   return 0;
607 }
608
609 static void
610 active_open_cleanup_callback (session_t * s, session_cleanup_ntf_t ntf)
611 {
612   if (ntf == SESSION_CLEANUP_TRANSPORT)
613     return;
614
615   proxy_try_delete_session (s, 1 /* is_active_open */ );
616 }
617
618 /* *INDENT-OFF* */
619 static session_cb_vft_t active_open_clients = {
620   .session_reset_callback = active_open_reset_callback,
621   .session_connected_callback = active_open_connected_callback,
622   .session_accept_callback = active_open_create_callback,
623   .session_disconnect_callback = active_open_disconnect_callback,
624   .session_cleanup_callback = active_open_cleanup_callback,
625   .builtin_app_rx_callback = active_open_rx_callback,
626   .builtin_app_tx_callback = active_open_tx_callback,
627   .fifo_tuning_callback = common_fifo_tuning_callback
628 };
629 /* *INDENT-ON* */
630
631 static int
632 proxy_server_attach ()
633 {
634   proxy_main_t *pm = &proxy_main;
635   u64 options[APP_OPTIONS_N_OPTIONS];
636   vnet_app_attach_args_t _a, *a = &_a;
637   u32 segment_size = 512 << 20;
638
639   clib_memset (a, 0, sizeof (*a));
640   clib_memset (options, 0, sizeof (options));
641
642   if (pm->private_segment_size)
643     segment_size = pm->private_segment_size;
644   a->name = format (0, "proxy-server");
645   a->api_client_index = pm->server_client_index;
646   a->session_cb_vft = &proxy_session_cb_vft;
647   a->options = options;
648   a->options[APP_OPTIONS_SEGMENT_SIZE] = segment_size;
649   a->options[APP_OPTIONS_ADD_SEGMENT_SIZE] = segment_size;
650   a->options[APP_OPTIONS_RX_FIFO_SIZE] = pm->fifo_size;
651   a->options[APP_OPTIONS_TX_FIFO_SIZE] = pm->fifo_size;
652   a->options[APP_OPTIONS_MAX_FIFO_SIZE] = pm->max_fifo_size;
653   a->options[APP_OPTIONS_HIGH_WATERMARK] = (u64) pm->high_watermark;
654   a->options[APP_OPTIONS_LOW_WATERMARK] = (u64) pm->low_watermark;
655   a->options[APP_OPTIONS_PRIVATE_SEGMENT_COUNT] = pm->private_segment_count;
656   a->options[APP_OPTIONS_PREALLOC_FIFO_PAIRS] =
657     pm->prealloc_fifos ? pm->prealloc_fifos : 0;
658
659   a->options[APP_OPTIONS_FLAGS] = APP_OPTIONS_FLAGS_IS_BUILTIN;
660
661   if (vnet_application_attach (a))
662     {
663       clib_warning ("failed to attach server");
664       return -1;
665     }
666   pm->server_app_index = a->app_index;
667
668   vec_free (a->name);
669   return 0;
670 }
671
672 static int
673 active_open_attach (void)
674 {
675   proxy_main_t *pm = &proxy_main;
676   vnet_app_attach_args_t _a, *a = &_a;
677   u64 options[APP_OPTIONS_N_OPTIONS];
678
679   clib_memset (a, 0, sizeof (*a));
680   clib_memset (options, 0, sizeof (options));
681
682   a->api_client_index = pm->active_open_client_index;
683   a->session_cb_vft = &active_open_clients;
684   a->name = format (0, "proxy-active-open");
685
686   options[APP_OPTIONS_ACCEPT_COOKIE] = 0x12345678;
687   options[APP_OPTIONS_SEGMENT_SIZE] = 512 << 20;
688   options[APP_OPTIONS_RX_FIFO_SIZE] = pm->fifo_size;
689   options[APP_OPTIONS_TX_FIFO_SIZE] = pm->fifo_size;
690   options[APP_OPTIONS_MAX_FIFO_SIZE] = pm->max_fifo_size;
691   options[APP_OPTIONS_HIGH_WATERMARK] = (u64) pm->high_watermark;
692   options[APP_OPTIONS_LOW_WATERMARK] = (u64) pm->low_watermark;
693   options[APP_OPTIONS_PRIVATE_SEGMENT_COUNT] = pm->private_segment_count;
694   options[APP_OPTIONS_PREALLOC_FIFO_PAIRS] =
695     pm->prealloc_fifos ? pm->prealloc_fifos : 0;
696
697   options[APP_OPTIONS_FLAGS] = APP_OPTIONS_FLAGS_IS_BUILTIN
698     | APP_OPTIONS_FLAGS_IS_PROXY;
699
700   a->options = options;
701
702   if (vnet_application_attach (a))
703     return -1;
704
705   pm->active_open_app_index = a->app_index;
706
707   vec_free (a->name);
708
709   return 0;
710 }
711
712 static int
713 proxy_server_listen ()
714 {
715   proxy_main_t *pm = &proxy_main;
716   vnet_listen_args_t _a, *a = &_a;
717   int rv;
718
719   clib_memset (a, 0, sizeof (*a));
720
721   a->app_index = pm->server_app_index;
722   clib_memcpy (&a->sep_ext, &pm->server_sep, sizeof (pm->server_sep));
723   if (proxy_transport_needs_crypto (a->sep.transport_proto))
724     {
725       session_endpoint_alloc_ext_cfg (&a->sep_ext,
726                                       TRANSPORT_ENDPT_EXT_CFG_CRYPTO);
727       a->sep_ext.ext_cfg->crypto.ckpair_index = pm->ckpair_index;
728     }
729
730   rv = vnet_listen (a);
731   if (a->sep_ext.ext_cfg)
732     clib_mem_free (a->sep_ext.ext_cfg);
733
734   return rv;
735 }
736
737 static void
738 proxy_server_add_ckpair (void)
739 {
740   vnet_app_add_cert_key_pair_args_t _ck_pair, *ck_pair = &_ck_pair;
741   proxy_main_t *pm = &proxy_main;
742
743   clib_memset (ck_pair, 0, sizeof (*ck_pair));
744   ck_pair->cert = (u8 *) test_srv_crt_rsa;
745   ck_pair->key = (u8 *) test_srv_key_rsa;
746   ck_pair->cert_len = test_srv_crt_rsa_len;
747   ck_pair->key_len = test_srv_key_rsa_len;
748   vnet_app_add_cert_key_pair (ck_pair);
749
750   pm->ckpair_index = ck_pair->index;
751 }
752
753 static int
754 proxy_server_create (vlib_main_t * vm)
755 {
756   proxy_main_t *pm = &proxy_main;
757   vlib_thread_main_t *vtm = vlib_get_thread_main ();
758   u32 num_threads;
759   int i;
760
761   num_threads = 1 /* main thread */  + vtm->n_threads;
762   vec_validate (proxy_main.server_event_queue, num_threads - 1);
763   vec_validate (proxy_main.active_open_event_queue, num_threads - 1);
764   vec_validate (pm->rx_buf, num_threads - 1);
765
766   for (i = 0; i < num_threads; i++)
767     vec_validate (pm->rx_buf[i], pm->rcv_buffer_size);
768
769   proxy_server_add_ckpair ();
770
771   if (proxy_server_attach ())
772     {
773       clib_warning ("failed to attach server app");
774       return -1;
775     }
776   if (proxy_server_listen ())
777     {
778       clib_warning ("failed to start listening");
779       return -1;
780     }
781   if (active_open_attach ())
782     {
783       clib_warning ("failed to attach active open app");
784       return -1;
785     }
786
787   for (i = 0; i < num_threads; i++)
788     {
789       pm->active_open_event_queue[i] = session_main_get_vpp_event_queue (i);
790
791       ASSERT (pm->active_open_event_queue[i]);
792
793       pm->server_event_queue[i] = session_main_get_vpp_event_queue (i);
794     }
795
796   return 0;
797 }
798
799 static clib_error_t *
800 proxy_server_create_command_fn (vlib_main_t * vm, unformat_input_t * input,
801                                 vlib_cli_command_t * cmd)
802 {
803   unformat_input_t _line_input, *line_input = &_line_input;
804   char *default_server_uri = "tcp://0.0.0.0/23";
805   char *default_client_uri = "tcp://6.0.2.2/23";
806   u8 *server_uri = 0, *client_uri = 0;
807   proxy_main_t *pm = &proxy_main;
808   clib_error_t *error = 0;
809   int rv, tmp32;
810   u64 tmp64;
811
812   pm->fifo_size = 64 << 10;
813   pm->max_fifo_size = 128 << 20;
814   pm->high_watermark = 80;
815   pm->low_watermark = 50;
816   pm->rcv_buffer_size = 1024;
817   pm->prealloc_fifos = 0;
818   pm->private_segment_count = 0;
819   pm->private_segment_size = 0;
820
821   if (vlib_num_workers ())
822     clib_spinlock_init (&pm->sessions_lock);
823
824   if (!unformat_user (input, unformat_line_input, line_input))
825     return 0;
826
827   while (unformat_check_input (line_input) != UNFORMAT_END_OF_INPUT)
828     {
829       if (unformat (line_input, "fifo-size %U", unformat_memory_size,
830                     &pm->fifo_size))
831         ;
832       else if (unformat (line_input, "max-fifo-size %U", unformat_memory_size,
833                          &pm->max_fifo_size))
834         ;
835       else if (unformat (line_input, "high-watermark %d", &tmp32))
836         pm->high_watermark = (u8) tmp32;
837       else if (unformat (line_input, "low-watermark %d", &tmp32))
838         pm->low_watermark = (u8) tmp32;
839       else if (unformat (line_input, "rcv-buf-size %d", &pm->rcv_buffer_size))
840         ;
841       else if (unformat (line_input, "prealloc-fifos %d", &pm->prealloc_fifos))
842         ;
843       else if (unformat (line_input, "private-segment-count %d",
844                          &pm->private_segment_count))
845         ;
846       else if (unformat (line_input, "private-segment-size %U",
847                          unformat_memory_size, &tmp64))
848         {
849           if (tmp64 >= 0x100000000ULL)
850             {
851               error = clib_error_return (
852                 0, "private segment size %lld (%llu) too large", tmp64, tmp64);
853               goto done;
854             }
855           pm->private_segment_size = tmp64;
856         }
857       else if (unformat (line_input, "server-uri %s", &server_uri))
858         vec_add1 (server_uri, 0);
859       else if (unformat (line_input, "client-uri %s", &client_uri))
860         vec_add1 (client_uri, 0);
861       else
862         {
863           error = clib_error_return (0, "unknown input `%U'",
864                                      format_unformat_error, line_input);
865           goto done;
866         }
867     }
868
869   if (!server_uri)
870     {
871       clib_warning ("No server-uri provided, Using default: %s",
872                     default_server_uri);
873       server_uri = format (0, "%s%c", default_server_uri, 0);
874     }
875   if (!client_uri)
876     {
877       clib_warning ("No client-uri provided, Using default: %s",
878                     default_client_uri);
879       client_uri = format (0, "%s%c", default_client_uri, 0);
880     }
881
882   if (parse_uri ((char *) server_uri, &pm->server_sep))
883     {
884       error = clib_error_return (0, "Invalid server uri %v", server_uri);
885       goto done;
886     }
887   if (parse_uri ((char *) client_uri, &pm->client_sep))
888     {
889       error = clib_error_return (0, "Invalid client uri %v", client_uri);
890       goto done;
891     }
892
893   vnet_session_enable_disable (vm, 1 /* turn on session and transport */ );
894
895   rv = proxy_server_create (vm);
896   switch (rv)
897     {
898     case 0:
899       break;
900     default:
901       error = clib_error_return (0, "server_create returned %d", rv);
902     }
903
904 done:
905   unformat_free (line_input);
906   vec_free (client_uri);
907   vec_free (server_uri);
908   return error;
909 }
910
911 /* *INDENT-OFF* */
912 VLIB_CLI_COMMAND (proxy_create_command, static) =
913 {
914   .path = "test proxy server",
915   .short_help = "test proxy server [server-uri <tcp://ip/port>]"
916       "[client-uri <tcp://ip/port>][fifo-size <nn>[k|m]]"
917       "[max-fifo-size <nn>[k|m]][high-watermark <nn>]"
918       "[low-watermark <nn>][rcv-buf-size <nn>][prealloc-fifos <nn>]"
919       "[private-segment-size <mem>][private-segment-count <nn>]",
920   .function = proxy_server_create_command_fn,
921 };
922 /* *INDENT-ON* */
923
924 clib_error_t *
925 proxy_main_init (vlib_main_t * vm)
926 {
927   proxy_main_t *pm = &proxy_main;
928   pm->server_client_index = ~0;
929   pm->active_open_client_index = ~0;
930   pm->proxy_session_by_active_open_handle = hash_create (0, sizeof (uword));
931   pm->proxy_session_by_server_handle = hash_create (0, sizeof (uword));
932
933   return 0;
934 }
935
936 VLIB_INIT_FUNCTION (proxy_main_init);
937
938 /*
939 * fd.io coding-style-patch-verification: ON
940 *
941 * Local Variables:
942 * eval: (c-set-style "gnu")
943 * End:
944 */