svm: split fifo into private and shared structs
[vpp.git] / src / plugins / hs_apps / proxy.c
1 /*
2 * Copyright (c) 2017-2019 Cisco and/or its affiliates.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at:
6 *
7 *     http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include <vnet/vnet.h>
17 #include <vlibmemory/api.h>
18 #include <vnet/session/application.h>
19 #include <vnet/session/application_interface.h>
20 #include <hs_apps/proxy.h>
21 #include <vnet/tcp/tcp.h>
22
23 proxy_main_t proxy_main;
24
25 #define TCP_MSS 1460
26
27 typedef struct
28 {
29   char uri[128];
30   u32 app_index;
31   u32 api_context;
32 } proxy_connect_args_t;
33
34 static void
35 proxy_cb_fn (void *data, u32 data_len)
36 {
37   proxy_connect_args_t *pa = (proxy_connect_args_t *) data;
38   vnet_connect_args_t a;
39
40   memset (&a, 0, sizeof (a));
41   a.api_context = pa->api_context;
42   a.app_index = pa->app_index;
43   a.uri = pa->uri;
44   vnet_connect_uri (&a);
45 }
46
47 static void
48 proxy_call_main_thread (vnet_connect_args_t * a)
49 {
50   if (vlib_get_thread_index () == 0)
51     {
52       vnet_connect_uri (a);
53     }
54   else
55     {
56       proxy_connect_args_t args;
57       args.api_context = a->api_context;
58       args.app_index = a->app_index;
59       clib_memcpy (args.uri, a->uri, vec_len (a->uri));
60       vl_api_rpc_call_main_thread (proxy_cb_fn, (u8 *) & args, sizeof (args));
61     }
62 }
63
64 static proxy_session_t *
65 proxy_get_active_open (proxy_main_t * pm, session_handle_t handle)
66 {
67   proxy_session_t *ps = 0;
68   uword *p;
69
70   p = hash_get (pm->proxy_session_by_active_open_handle, handle);
71   if (p)
72     ps = pool_elt_at_index (pm->sessions, p[0]);
73   return ps;
74 }
75
76 static proxy_session_t *
77 proxy_get_passive_open (proxy_main_t * pm, session_handle_t handle)
78 {
79   proxy_session_t *ps = 0;
80   uword *p;
81
82   p = hash_get (pm->proxy_session_by_server_handle, handle);
83   if (p)
84     ps = pool_elt_at_index (pm->sessions, p[0]);
85   return ps;
86 }
87
88 static void
89 proxy_try_close_session (session_t * s, int is_active_open)
90 {
91   proxy_main_t *pm = &proxy_main;
92   proxy_session_t *ps = 0;
93   vnet_disconnect_args_t _a, *a = &_a;
94   session_handle_t handle;
95
96   handle = session_handle (s);
97
98   clib_spinlock_lock_if_init (&pm->sessions_lock);
99
100   if (is_active_open)
101     {
102       ps = proxy_get_active_open (pm, handle);
103       ASSERT (ps != 0);
104
105       a->handle = ps->vpp_active_open_handle;
106       a->app_index = pm->active_open_app_index;
107       vnet_disconnect_session (a);
108       ps->ao_disconnected = 1;
109
110       if (!ps->po_disconnected)
111         {
112           ASSERT (ps->vpp_server_handle != SESSION_INVALID_HANDLE);
113           a->handle = ps->vpp_server_handle;
114           a->app_index = pm->server_app_index;
115           vnet_disconnect_session (a);
116           ps->po_disconnected = 1;
117         }
118     }
119   else
120     {
121       ps = proxy_get_passive_open (pm, handle);
122       ASSERT (ps != 0);
123
124       a->handle = ps->vpp_server_handle;
125       a->app_index = pm->server_app_index;
126       vnet_disconnect_session (a);
127       ps->po_disconnected = 1;
128
129       if (!ps->ao_disconnected && !ps->active_open_establishing)
130         {
131           /* Proxy session closed before active open */
132           if (ps->vpp_active_open_handle != SESSION_INVALID_HANDLE)
133             {
134               a->handle = ps->vpp_active_open_handle;
135               a->app_index = pm->active_open_app_index;
136               vnet_disconnect_session (a);
137             }
138           ps->ao_disconnected = 1;
139         }
140     }
141   clib_spinlock_unlock_if_init (&pm->sessions_lock);
142 }
143
144 static void
145 proxy_session_free (proxy_session_t * ps)
146 {
147   proxy_main_t *pm = &proxy_main;
148   if (CLIB_DEBUG > 0)
149     clib_memset (ps, 0xFE, sizeof (*ps));
150   pool_put (pm->sessions, ps);
151 }
152
153 static void
154 proxy_try_delete_session (session_t * s, u8 is_active_open)
155 {
156   proxy_main_t *pm = &proxy_main;
157   proxy_session_t *ps = 0;
158   session_handle_t handle;
159
160   handle = session_handle (s);
161
162   clib_spinlock_lock_if_init (&pm->sessions_lock);
163
164   if (is_active_open)
165     {
166       ps = proxy_get_active_open (pm, handle);
167       ASSERT (ps != 0);
168
169       ps->vpp_active_open_handle = SESSION_INVALID_HANDLE;
170       hash_unset (pm->proxy_session_by_active_open_handle, handle);
171
172       if (ps->vpp_server_handle == SESSION_INVALID_HANDLE)
173         proxy_session_free (ps);
174     }
175   else
176     {
177       ps = proxy_get_passive_open (pm, handle);
178       ASSERT (ps != 0);
179
180       ps->vpp_server_handle = SESSION_INVALID_HANDLE;
181       hash_unset (pm->proxy_session_by_server_handle, handle);
182
183       if (ps->vpp_active_open_handle == SESSION_INVALID_HANDLE)
184         {
185           if (!ps->active_open_establishing)
186             proxy_session_free (ps);
187         }
188     }
189   clib_spinlock_unlock_if_init (&pm->sessions_lock);
190 }
191
192 static int
193 common_fifo_tuning_callback (session_t * s, svm_fifo_t * f,
194                              session_ft_action_t act, u32 bytes)
195 {
196   proxy_main_t *pm = &proxy_main;
197
198   segment_manager_t *sm = segment_manager_get (f->segment_manager);
199   fifo_segment_t *fs = segment_manager_get_segment (sm, f->segment_index);
200
201   u8 seg_usage = fifo_segment_get_mem_usage (fs);
202   u32 fifo_in_use = svm_fifo_max_dequeue_prod (f);
203   u32 fifo_size = svm_fifo_size (f);
204   u8 fifo_usage = fifo_in_use * 100 / fifo_size;
205   u8 update_size = 0;
206
207   ASSERT (act < SESSION_FT_ACTION_N_ACTIONS);
208
209   if (act == SESSION_FT_ACTION_ENQUEUED)
210     {
211       if (seg_usage < pm->low_watermark && fifo_usage > 50)
212         update_size = fifo_in_use;
213       else if (seg_usage < pm->high_watermark && fifo_usage > 80)
214         update_size = fifo_in_use;
215
216       update_size = clib_min (update_size, sm->max_fifo_size - fifo_size);
217       if (update_size)
218         svm_fifo_set_size (f, fifo_size + update_size);
219     }
220   else                          /* dequeued */
221     {
222       if (seg_usage > pm->high_watermark || fifo_usage < 20)
223         update_size = bytes;
224       else if (seg_usage > pm->low_watermark && fifo_usage < 50)
225         update_size = (bytes / 2);
226
227       ASSERT (fifo_size >= 4096);
228       update_size = clib_min (update_size, fifo_size - 4096);
229       if (update_size)
230         svm_fifo_set_size (f, fifo_size - update_size);
231     }
232
233   return 0;
234 }
235
236 static int
237 proxy_accept_callback (session_t * s)
238 {
239   proxy_main_t *pm = &proxy_main;
240   proxy_session_t *ps;
241
242   clib_spinlock_lock_if_init (&pm->sessions_lock);
243
244   pool_get_zero (pm->sessions, ps);
245   ps->vpp_server_handle = session_handle (s);
246   ps->vpp_active_open_handle = SESSION_INVALID_HANDLE;
247
248   hash_set (pm->proxy_session_by_server_handle, ps->vpp_server_handle,
249             ps - pm->sessions);
250
251   clib_spinlock_unlock_if_init (&pm->sessions_lock);
252
253   s->session_state = SESSION_STATE_READY;
254
255   return 0;
256 }
257
258 static void
259 proxy_disconnect_callback (session_t * s)
260 {
261   proxy_try_close_session (s, 0 /* is_active_open */ );
262 }
263
264 static void
265 proxy_reset_callback (session_t * s)
266 {
267   proxy_try_close_session (s, 0 /* is_active_open */ );
268 }
269
270 static int
271 proxy_connected_callback (u32 app_index, u32 api_context,
272                           session_t * s, session_error_t err)
273 {
274   clib_warning ("called...");
275   return -1;
276 }
277
278 static int
279 proxy_add_segment_callback (u32 client_index, u64 segment_handle)
280 {
281   clib_warning ("called...");
282   return -1;
283 }
284
285 static int
286 proxy_rx_callback (session_t * s)
287 {
288   proxy_main_t *pm = &proxy_main;
289   u32 thread_index = vlib_get_thread_index ();
290   svm_fifo_t *ao_tx_fifo;
291   proxy_session_t *ps;
292
293   ASSERT (s->thread_index == thread_index);
294
295   clib_spinlock_lock_if_init (&pm->sessions_lock);
296
297   ps = proxy_get_passive_open (pm, session_handle (s));
298   ASSERT (ps != 0);
299
300   if (PREDICT_TRUE (ps->vpp_active_open_handle != SESSION_INVALID_HANDLE))
301     {
302       clib_spinlock_unlock_if_init (&pm->sessions_lock);
303
304       ao_tx_fifo = s->rx_fifo;
305
306       /*
307        * Send event for active open tx fifo
308        */
309       if (svm_fifo_set_event (ao_tx_fifo))
310         {
311           u32 ao_thread_index = ao_tx_fifo->master_thread_index;
312           u32 ao_session_index = ao_tx_fifo->shr->master_session_index;
313           if (session_send_io_evt_to_thread_custom (&ao_session_index,
314                                                     ao_thread_index,
315                                                     SESSION_IO_EVT_TX))
316             clib_warning ("failed to enqueue tx evt");
317         }
318
319       if (svm_fifo_max_enqueue (ao_tx_fifo) <= TCP_MSS)
320         svm_fifo_add_want_deq_ntf (ao_tx_fifo, SVM_FIFO_WANT_DEQ_NOTIF);
321     }
322   else
323     {
324       vnet_connect_args_t _a, *a = &_a;
325       svm_fifo_t *tx_fifo, *rx_fifo;
326       u32 max_dequeue, proxy_index;
327       int actual_transfer __attribute__ ((unused));
328
329       rx_fifo = s->rx_fifo;
330       tx_fifo = s->tx_fifo;
331
332       ASSERT (rx_fifo->master_thread_index == thread_index);
333       ASSERT (tx_fifo->master_thread_index == thread_index);
334
335       max_dequeue = svm_fifo_max_dequeue_cons (s->rx_fifo);
336
337       if (PREDICT_FALSE (max_dequeue == 0))
338         return 0;
339
340       max_dequeue = clib_min (pm->rcv_buffer_size, max_dequeue);
341       actual_transfer = svm_fifo_peek (rx_fifo, 0 /* relative_offset */ ,
342                                        max_dequeue, pm->rx_buf[thread_index]);
343
344       /* $$$ your message in this space: parse url, etc. */
345
346       clib_memset (a, 0, sizeof (*a));
347
348       ps->server_rx_fifo = rx_fifo;
349       ps->server_tx_fifo = tx_fifo;
350       ps->active_open_establishing = 1;
351       proxy_index = ps - pm->sessions;
352
353       clib_spinlock_unlock_if_init (&pm->sessions_lock);
354
355       a->uri = (char *) pm->client_uri;
356       a->api_context = proxy_index;
357       a->app_index = pm->active_open_app_index;
358       proxy_call_main_thread (a);
359     }
360
361   return 0;
362 }
363
364 static void
365 proxy_force_ack (void *handlep)
366 {
367   transport_connection_t *tc;
368   session_t *ao_s;
369
370   ao_s = session_get_from_handle (pointer_to_uword (handlep));
371   tc = session_get_transport (ao_s);
372   tcp_send_ack ((tcp_connection_t *) tc);
373 }
374
375 static int
376 proxy_tx_callback (session_t * proxy_s)
377 {
378   proxy_main_t *pm = &proxy_main;
379   proxy_session_t *ps;
380   u32 min_free;
381
382   min_free = clib_min (svm_fifo_size (proxy_s->tx_fifo) >> 3, 128 << 10);
383   if (svm_fifo_max_enqueue (proxy_s->tx_fifo) < min_free)
384     {
385       svm_fifo_add_want_deq_ntf (proxy_s->tx_fifo, SVM_FIFO_WANT_DEQ_NOTIF);
386       return 0;
387     }
388
389   clib_spinlock_lock_if_init (&pm->sessions_lock);
390
391   ps = proxy_get_passive_open (pm, session_handle (proxy_s));
392   ASSERT (ps != 0);
393
394   if (ps->vpp_active_open_handle == SESSION_INVALID_HANDLE)
395     return 0;
396
397   /* Force ack on active open side to update rcv wnd. Make sure it's done on
398    * the right thread */
399   void *arg = uword_to_pointer (ps->vpp_active_open_handle, void *);
400   session_send_rpc_evt_to_thread (ps->server_rx_fifo->master_thread_index,
401                                   proxy_force_ack, arg);
402
403   clib_spinlock_unlock_if_init (&pm->sessions_lock);
404
405   return 0;
406 }
407
408 static void
409 proxy_cleanup_callback (session_t * s, session_cleanup_ntf_t ntf)
410 {
411   if (ntf == SESSION_CLEANUP_TRANSPORT)
412     return;
413
414   proxy_try_delete_session (s, 0 /* is_active_open */ );
415 }
416
417 static session_cb_vft_t proxy_session_cb_vft = {
418   .session_accept_callback = proxy_accept_callback,
419   .session_disconnect_callback = proxy_disconnect_callback,
420   .session_connected_callback = proxy_connected_callback,
421   .add_segment_callback = proxy_add_segment_callback,
422   .builtin_app_rx_callback = proxy_rx_callback,
423   .builtin_app_tx_callback = proxy_tx_callback,
424   .session_reset_callback = proxy_reset_callback,
425   .session_cleanup_callback = proxy_cleanup_callback,
426   .fifo_tuning_callback = common_fifo_tuning_callback
427 };
428
429 static int
430 active_open_connected_callback (u32 app_index, u32 opaque,
431                                 session_t * s, session_error_t err)
432 {
433   proxy_main_t *pm = &proxy_main;
434   proxy_session_t *ps;
435   u8 thread_index = vlib_get_thread_index ();
436
437   /*
438    * Setup proxy session handle.
439    */
440   clib_spinlock_lock_if_init (&pm->sessions_lock);
441
442   ps = pool_elt_at_index (pm->sessions, opaque);
443
444   /* Connection failed */
445   if (err)
446     {
447       vnet_disconnect_args_t _a, *a = &_a;
448
449       a->handle = ps->vpp_server_handle;
450       a->app_index = pm->server_app_index;
451       vnet_disconnect_session (a);
452       ps->po_disconnected = 1;
453     }
454   else
455     {
456       ps->vpp_active_open_handle = session_handle (s);
457       ps->active_open_establishing = 0;
458     }
459
460   /* Passive open session was already closed! */
461   if (ps->po_disconnected)
462     {
463       /* Setup everything for the cleanup notification */
464       hash_set (pm->proxy_session_by_active_open_handle,
465                 ps->vpp_active_open_handle, opaque);
466       ps->ao_disconnected = 1;
467       clib_spinlock_unlock_if_init (&pm->sessions_lock);
468       return -1;
469     }
470
471   s->tx_fifo = ps->server_rx_fifo;
472   s->rx_fifo = ps->server_tx_fifo;
473
474   /*
475    * Reset the active-open tx-fifo master indices so the active-open session
476    * will receive data, etc.
477    */
478   s->tx_fifo->shr->master_session_index = s->session_index;
479   s->tx_fifo->master_thread_index = s->thread_index;
480
481   /*
482    * Account for the active-open session's use of the fifos
483    * so they won't disappear until the last session which uses
484    * them disappears
485    */
486   s->tx_fifo->refcnt++;
487   s->rx_fifo->refcnt++;
488
489   hash_set (pm->proxy_session_by_active_open_handle,
490             ps->vpp_active_open_handle, opaque);
491
492   clib_spinlock_unlock_if_init (&pm->sessions_lock);
493
494   /*
495    * Send event for active open tx fifo
496    */
497   ASSERT (s->thread_index == thread_index);
498   if (svm_fifo_set_event (s->tx_fifo))
499     session_send_io_evt_to_thread (s->tx_fifo, SESSION_IO_EVT_TX);
500
501   return 0;
502 }
503
504 static void
505 active_open_reset_callback (session_t * s)
506 {
507   proxy_try_close_session (s, 1 /* is_active_open */ );
508 }
509
510 static int
511 active_open_create_callback (session_t * s)
512 {
513   return 0;
514 }
515
516 static void
517 active_open_disconnect_callback (session_t * s)
518 {
519   proxy_try_close_session (s, 1 /* is_active_open */ );
520 }
521
522 static int
523 active_open_rx_callback (session_t * s)
524 {
525   svm_fifo_t *proxy_tx_fifo;
526
527   proxy_tx_fifo = s->rx_fifo;
528
529   /*
530    * Send event for server tx fifo
531    */
532   if (svm_fifo_set_event (proxy_tx_fifo))
533     {
534       u8 thread_index = proxy_tx_fifo->master_thread_index;
535       u32 session_index = proxy_tx_fifo->shr->master_session_index;
536       return session_send_io_evt_to_thread_custom (&session_index,
537                                                    thread_index,
538                                                    SESSION_IO_EVT_TX);
539     }
540
541   if (svm_fifo_max_enqueue (proxy_tx_fifo) <= TCP_MSS)
542     svm_fifo_add_want_deq_ntf (proxy_tx_fifo, SVM_FIFO_WANT_DEQ_NOTIF);
543
544   return 0;
545 }
546
547 static int
548 active_open_tx_callback (session_t * ao_s)
549 {
550   proxy_main_t *pm = &proxy_main;
551   transport_connection_t *tc;
552   session_handle_t handle;
553   proxy_session_t *ps;
554   session_t *proxy_s;
555   u32 min_free;
556   uword *p;
557
558   min_free = clib_min (svm_fifo_size (ao_s->tx_fifo) >> 3, 128 << 10);
559   if (svm_fifo_max_enqueue (ao_s->tx_fifo) < min_free)
560     {
561       svm_fifo_add_want_deq_ntf (ao_s->tx_fifo, SVM_FIFO_WANT_DEQ_NOTIF);
562       return 0;
563     }
564
565   clib_spinlock_lock_if_init (&pm->sessions_lock);
566
567   handle = session_handle (ao_s);
568   p = hash_get (pm->proxy_session_by_active_open_handle, handle);
569   if (!p)
570     return 0;
571
572   if (pool_is_free_index (pm->sessions, p[0]))
573     return 0;
574
575   ps = pool_elt_at_index (pm->sessions, p[0]);
576   if (ps->vpp_server_handle == ~0)
577     return 0;
578
579   proxy_s = session_get_from_handle (ps->vpp_server_handle);
580
581   /* Force ack on proxy side to update rcv wnd */
582   tc = session_get_transport (proxy_s);
583   tcp_send_ack ((tcp_connection_t *) tc);
584
585   clib_spinlock_unlock_if_init (&pm->sessions_lock);
586
587   return 0;
588 }
589
590 static void
591 active_open_cleanup_callback (session_t * s, session_cleanup_ntf_t ntf)
592 {
593   if (ntf == SESSION_CLEANUP_TRANSPORT)
594     return;
595
596   proxy_try_delete_session (s, 1 /* is_active_open */ );
597 }
598
599 /* *INDENT-OFF* */
600 static session_cb_vft_t active_open_clients = {
601   .session_reset_callback = active_open_reset_callback,
602   .session_connected_callback = active_open_connected_callback,
603   .session_accept_callback = active_open_create_callback,
604   .session_disconnect_callback = active_open_disconnect_callback,
605   .session_cleanup_callback = active_open_cleanup_callback,
606   .builtin_app_rx_callback = active_open_rx_callback,
607   .builtin_app_tx_callback = active_open_tx_callback,
608   .fifo_tuning_callback = common_fifo_tuning_callback
609 };
610 /* *INDENT-ON* */
611
612 static int
613 proxy_server_attach ()
614 {
615   proxy_main_t *pm = &proxy_main;
616   u64 options[APP_OPTIONS_N_OPTIONS];
617   vnet_app_attach_args_t _a, *a = &_a;
618   u32 segment_size = 512 << 20;
619
620   clib_memset (a, 0, sizeof (*a));
621   clib_memset (options, 0, sizeof (options));
622
623   if (pm->private_segment_size)
624     segment_size = pm->private_segment_size;
625   a->name = format (0, "proxy-server");
626   a->api_client_index = pm->server_client_index;
627   a->session_cb_vft = &proxy_session_cb_vft;
628   a->options = options;
629   a->options[APP_OPTIONS_SEGMENT_SIZE] = segment_size;
630   a->options[APP_OPTIONS_RX_FIFO_SIZE] = pm->fifo_size;
631   a->options[APP_OPTIONS_TX_FIFO_SIZE] = pm->fifo_size;
632   a->options[APP_OPTIONS_MAX_FIFO_SIZE] = pm->max_fifo_size;
633   a->options[APP_OPTIONS_HIGH_WATERMARK] = (u64) pm->high_watermark;
634   a->options[APP_OPTIONS_LOW_WATERMARK] = (u64) pm->low_watermark;
635   a->options[APP_OPTIONS_PRIVATE_SEGMENT_COUNT] = pm->private_segment_count;
636   a->options[APP_OPTIONS_PREALLOC_FIFO_PAIRS] =
637     pm->prealloc_fifos ? pm->prealloc_fifos : 0;
638
639   a->options[APP_OPTIONS_FLAGS] = APP_OPTIONS_FLAGS_IS_BUILTIN;
640
641   if (vnet_application_attach (a))
642     {
643       clib_warning ("failed to attach server");
644       return -1;
645     }
646   pm->server_app_index = a->app_index;
647
648   vec_free (a->name);
649   return 0;
650 }
651
652 static int
653 active_open_attach (void)
654 {
655   proxy_main_t *pm = &proxy_main;
656   vnet_app_attach_args_t _a, *a = &_a;
657   u64 options[APP_OPTIONS_N_OPTIONS];
658
659   clib_memset (a, 0, sizeof (*a));
660   clib_memset (options, 0, sizeof (options));
661
662   a->api_client_index = pm->active_open_client_index;
663   a->session_cb_vft = &active_open_clients;
664   a->name = format (0, "proxy-active-open");
665
666   options[APP_OPTIONS_ACCEPT_COOKIE] = 0x12345678;
667   options[APP_OPTIONS_SEGMENT_SIZE] = 512 << 20;
668   options[APP_OPTIONS_RX_FIFO_SIZE] = pm->fifo_size;
669   options[APP_OPTIONS_TX_FIFO_SIZE] = pm->fifo_size;
670   options[APP_OPTIONS_MAX_FIFO_SIZE] = pm->max_fifo_size;
671   options[APP_OPTIONS_HIGH_WATERMARK] = (u64) pm->high_watermark;
672   options[APP_OPTIONS_LOW_WATERMARK] = (u64) pm->low_watermark;
673   options[APP_OPTIONS_PRIVATE_SEGMENT_COUNT] = pm->private_segment_count;
674   options[APP_OPTIONS_PREALLOC_FIFO_PAIRS] =
675     pm->prealloc_fifos ? pm->prealloc_fifos : 0;
676
677   options[APP_OPTIONS_FLAGS] = APP_OPTIONS_FLAGS_IS_BUILTIN
678     | APP_OPTIONS_FLAGS_IS_PROXY;
679
680   a->options = options;
681
682   if (vnet_application_attach (a))
683     return -1;
684
685   pm->active_open_app_index = a->app_index;
686
687   vec_free (a->name);
688
689   return 0;
690 }
691
692 static int
693 proxy_server_listen ()
694 {
695   proxy_main_t *pm = &proxy_main;
696   vnet_listen_args_t _a, *a = &_a;
697   clib_memset (a, 0, sizeof (*a));
698   a->app_index = pm->server_app_index;
699   a->uri = (char *) pm->server_uri;
700   return vnet_bind_uri (a);
701 }
702
703 static int
704 proxy_server_create (vlib_main_t * vm)
705 {
706   proxy_main_t *pm = &proxy_main;
707   vlib_thread_main_t *vtm = vlib_get_thread_main ();
708   u32 num_threads;
709   int i;
710
711   num_threads = 1 /* main thread */  + vtm->n_threads;
712   vec_validate (proxy_main.server_event_queue, num_threads - 1);
713   vec_validate (proxy_main.active_open_event_queue, num_threads - 1);
714   vec_validate (pm->rx_buf, num_threads - 1);
715
716   for (i = 0; i < num_threads; i++)
717     vec_validate (pm->rx_buf[i], pm->rcv_buffer_size);
718
719   if (proxy_server_attach ())
720     {
721       clib_warning ("failed to attach server app");
722       return -1;
723     }
724   if (proxy_server_listen ())
725     {
726       clib_warning ("failed to start listening");
727       return -1;
728     }
729   if (active_open_attach ())
730     {
731       clib_warning ("failed to attach active open app");
732       return -1;
733     }
734
735   for (i = 0; i < num_threads; i++)
736     {
737       pm->active_open_event_queue[i] = session_main_get_vpp_event_queue (i);
738
739       ASSERT (pm->active_open_event_queue[i]);
740
741       pm->server_event_queue[i] = session_main_get_vpp_event_queue (i);
742     }
743
744   return 0;
745 }
746
747 static clib_error_t *
748 proxy_server_create_command_fn (vlib_main_t * vm, unformat_input_t * input,
749                                 vlib_cli_command_t * cmd)
750 {
751   proxy_main_t *pm = &proxy_main;
752   char *default_server_uri = "tcp://0.0.0.0/23";
753   char *default_client_uri = "tcp://6.0.2.2/23";
754   int rv, tmp32;
755   u64 tmp64;
756
757   pm->fifo_size = 64 << 10;
758   pm->max_fifo_size = 128 << 20;
759   pm->high_watermark = 80;
760   pm->low_watermark = 50;
761   pm->rcv_buffer_size = 1024;
762   pm->prealloc_fifos = 0;
763   pm->private_segment_count = 0;
764   pm->private_segment_size = 0;
765   pm->server_uri = 0;
766   pm->client_uri = 0;
767   if (vlib_num_workers ())
768     clib_spinlock_init (&pm->sessions_lock);
769
770   while (unformat_check_input (input) != UNFORMAT_END_OF_INPUT)
771     {
772       if (unformat (input, "fifo-size %U",
773                     unformat_memory_size, &pm->fifo_size))
774         ;
775       else if (unformat (input, "max-fifo-size %U",
776                          unformat_memory_size, &pm->max_fifo_size))
777         ;
778       else if (unformat (input, "high-watermark %d", &tmp32))
779         pm->high_watermark = (u8) tmp32;
780       else if (unformat (input, "low-watermark %d", &tmp32))
781         pm->low_watermark = (u8) tmp32;
782       else if (unformat (input, "rcv-buf-size %d", &pm->rcv_buffer_size))
783         ;
784       else if (unformat (input, "prealloc-fifos %d", &pm->prealloc_fifos))
785         ;
786       else if (unformat (input, "private-segment-count %d",
787                          &pm->private_segment_count))
788         ;
789       else if (unformat (input, "private-segment-size %U",
790                          unformat_memory_size, &tmp64))
791         {
792           if (tmp64 >= 0x100000000ULL)
793             return clib_error_return
794               (0, "private segment size %lld (%llu) too large", tmp64, tmp64);
795           pm->private_segment_size = tmp64;
796         }
797       else if (unformat (input, "server-uri %s", &pm->server_uri))
798         vec_add1 (pm->server_uri, 0);
799       else if (unformat (input, "client-uri %s", &pm->client_uri))
800         vec_add1 (pm->client_uri, 0);
801       else
802         return clib_error_return (0, "unknown input `%U'",
803                                   format_unformat_error, input);
804     }
805
806   if (!pm->server_uri)
807     {
808       clib_warning ("No server-uri provided, Using default: %s",
809                     default_server_uri);
810       pm->server_uri = format (0, "%s%c", default_server_uri, 0);
811     }
812   if (!pm->client_uri)
813     {
814       clib_warning ("No client-uri provided, Using default: %s",
815                     default_client_uri);
816       pm->client_uri = format (0, "%s%c", default_client_uri, 0);
817     }
818
819   vnet_session_enable_disable (vm, 1 /* turn on session and transport */ );
820
821   rv = proxy_server_create (vm);
822   switch (rv)
823     {
824     case 0:
825       break;
826     default:
827       return clib_error_return (0, "server_create returned %d", rv);
828     }
829
830   return 0;
831 }
832
833 /* *INDENT-OFF* */
834 VLIB_CLI_COMMAND (proxy_create_command, static) =
835 {
836   .path = "test proxy server",
837   .short_help = "test proxy server [server-uri <tcp://ip/port>]"
838       "[client-uri <tcp://ip/port>][fifo-size <nn>[k|m]]"
839       "[max-fifo-size <nn>[k|m]][high-watermark <nn>]"
840       "[low-watermark <nn>][rcv-buf-size <nn>][prealloc-fifos <nn>]"
841       "[private-segment-size <mem>][private-segment-count <nn>]",
842   .function = proxy_server_create_command_fn,
843 };
844 /* *INDENT-ON* */
845
846 clib_error_t *
847 proxy_main_init (vlib_main_t * vm)
848 {
849   proxy_main_t *pm = &proxy_main;
850   pm->server_client_index = ~0;
851   pm->active_open_client_index = ~0;
852   pm->proxy_session_by_active_open_handle = hash_create (0, sizeof (uword));
853   pm->proxy_session_by_server_handle = hash_create (0, sizeof (uword));
854
855   return 0;
856 }
857
858 VLIB_INIT_FUNCTION (proxy_main_init);
859
860 /*
861 * fd.io coding-style-patch-verification: ON
862 *
863 * Local Variables:
864 * eval: (c-set-style "gnu")
865 * End:
866 */