hsa: proxy session cleanup fixes
[vpp.git] / src / plugins / hs_apps / proxy.c
1 /*
2 * Copyright (c) 2017-2019 Cisco and/or its affiliates.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at:
6 *
7 *     http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include <vnet/vnet.h>
17 #include <vlibmemory/api.h>
18 #include <vnet/session/application.h>
19 #include <vnet/session/application_interface.h>
20 #include <hs_apps/proxy.h>
21 #include <vnet/tcp/tcp.h>
22
23 proxy_main_t proxy_main;
24
25 #define TCP_MSS 1460
26
27 typedef struct
28 {
29   char uri[128];
30   u32 app_index;
31   u32 api_context;
32 } proxy_connect_args_t;
33
34 static void
35 proxy_cb_fn (void *data, u32 data_len)
36 {
37   proxy_connect_args_t *pa = (proxy_connect_args_t *) data;
38   vnet_connect_args_t a;
39
40   memset (&a, 0, sizeof (a));
41   a.api_context = pa->api_context;
42   a.app_index = pa->app_index;
43   a.uri = pa->uri;
44   vnet_connect_uri (&a);
45 }
46
47 static void
48 proxy_call_main_thread (vnet_connect_args_t * a)
49 {
50   if (vlib_get_thread_index () == 0)
51     {
52       vnet_connect_uri (a);
53     }
54   else
55     {
56       proxy_connect_args_t args;
57       args.api_context = a->api_context;
58       args.app_index = a->app_index;
59       clib_memcpy (args.uri, a->uri, vec_len (a->uri));
60       vl_api_rpc_call_main_thread (proxy_cb_fn, (u8 *) & args, sizeof (args));
61     }
62 }
63
64 static proxy_session_t *
65 proxy_get_active_open (proxy_main_t * pm, session_handle_t handle)
66 {
67   proxy_session_t *ps = 0;
68   uword *p;
69
70   p = hash_get (pm->proxy_session_by_active_open_handle, handle);
71   if (p)
72     ps = pool_elt_at_index (pm->sessions, p[0]);
73   return ps;
74 }
75
76 static proxy_session_t *
77 proxy_get_passive_open (proxy_main_t * pm, session_handle_t handle)
78 {
79   proxy_session_t *ps = 0;
80   uword *p;
81
82   p = hash_get (pm->proxy_session_by_server_handle, handle);
83   if (p)
84     ps = pool_elt_at_index (pm->sessions, p[0]);
85   return ps;
86 }
87
88 static void
89 proxy_try_close_session (session_t * s, int is_active_open)
90 {
91   proxy_main_t *pm = &proxy_main;
92   proxy_session_t *ps = 0;
93   vnet_disconnect_args_t _a, *a = &_a;
94   session_handle_t handle;
95
96   handle = session_handle (s);
97
98   clib_spinlock_lock_if_init (&pm->sessions_lock);
99
100   if (is_active_open)
101     {
102       ps = proxy_get_active_open (pm, handle);
103       ASSERT (ps != 0);
104
105       a->handle = ps->vpp_active_open_handle;
106       a->app_index = pm->active_open_app_index;
107       vnet_disconnect_session (a);
108       ps->ao_disconnected = 1;
109
110       if (!ps->po_disconnected)
111         {
112           ASSERT (ps->vpp_server_handle != SESSION_INVALID_HANDLE);
113           a->handle = ps->vpp_server_handle;
114           a->app_index = pm->server_app_index;
115           vnet_disconnect_session (a);
116           ps->po_disconnected = 1;
117         }
118     }
119   else
120     {
121       ps = proxy_get_passive_open (pm, handle);
122       ASSERT (ps != 0);
123
124       a->handle = ps->vpp_server_handle;
125       a->app_index = pm->server_app_index;
126       vnet_disconnect_session (a);
127       ps->po_disconnected = 1;
128
129       if (!ps->ao_disconnected && !ps->active_open_establishing)
130         {
131           /* Proxy session closed before active open */
132           if (ps->vpp_active_open_handle != SESSION_INVALID_HANDLE)
133             {
134               a->handle = ps->vpp_active_open_handle;
135               a->app_index = pm->active_open_app_index;
136               vnet_disconnect_session (a);
137             }
138           ps->ao_disconnected = 1;
139         }
140     }
141   clib_spinlock_unlock_if_init (&pm->sessions_lock);
142 }
143
144 static void
145 proxy_session_free (proxy_session_t * ps)
146 {
147   proxy_main_t *pm = &proxy_main;
148   if (CLIB_DEBUG > 0)
149     clib_memset (ps, 0xFE, sizeof (*ps));
150   pool_put (pm->sessions, ps);
151 }
152
153 static void
154 proxy_try_delete_session (session_t * s, u8 is_active_open)
155 {
156   proxy_main_t *pm = &proxy_main;
157   proxy_session_t *ps = 0;
158   session_handle_t handle;
159
160   handle = session_handle (s);
161
162   clib_spinlock_lock_if_init (&pm->sessions_lock);
163
164   if (is_active_open)
165     {
166       ps = proxy_get_active_open (pm, handle);
167       ASSERT (ps != 0);
168
169       ps->vpp_active_open_handle = SESSION_INVALID_HANDLE;
170       hash_unset (pm->proxy_session_by_active_open_handle, handle);
171
172       if (ps->vpp_server_handle == SESSION_INVALID_HANDLE)
173         proxy_session_free (ps);
174     }
175   else
176     {
177       ps = proxy_get_passive_open (pm, handle);
178       ASSERT (ps != 0);
179
180       ps->vpp_server_handle = SESSION_INVALID_HANDLE;
181       hash_unset (pm->proxy_session_by_server_handle, handle);
182
183       if (ps->vpp_active_open_handle == SESSION_INVALID_HANDLE)
184         {
185           if (!ps->active_open_establishing)
186             proxy_session_free (ps);
187         }
188     }
189   clib_spinlock_unlock_if_init (&pm->sessions_lock);
190 }
191
192 static int
193 common_fifo_tuning_callback (session_t * s, svm_fifo_t * f,
194                              session_ft_action_t act, u32 bytes)
195 {
196   proxy_main_t *pm = &proxy_main;
197
198   segment_manager_t *sm = segment_manager_get (f->segment_manager);
199   fifo_segment_t *fs = segment_manager_get_segment (sm, f->segment_index);
200
201   u8 seg_usage = fifo_segment_get_mem_usage (fs);
202   u32 fifo_in_use = svm_fifo_max_dequeue_prod (f);
203   u32 fifo_size = svm_fifo_size (f);
204   u8 fifo_usage = fifo_in_use * 100 / fifo_size;
205   u8 update_size = 0;
206
207   ASSERT (act < SESSION_FT_ACTION_N_ACTIONS);
208
209   if (act == SESSION_FT_ACTION_ENQUEUED)
210     {
211       if (seg_usage < pm->low_watermark && fifo_usage > 50)
212         update_size = fifo_in_use;
213       else if (seg_usage < pm->high_watermark && fifo_usage > 80)
214         update_size = fifo_in_use;
215
216       update_size = clib_min (update_size, sm->max_fifo_size - fifo_size);
217       if (update_size)
218         svm_fifo_set_size (f, fifo_size + update_size);
219     }
220   else                          /* dequeued */
221     {
222       if (seg_usage > pm->high_watermark || fifo_usage < 20)
223         update_size = bytes;
224       else if (seg_usage > pm->low_watermark && fifo_usage < 50)
225         update_size = (bytes / 2);
226
227       ASSERT (fifo_size >= 4096);
228       update_size = clib_min (update_size, fifo_size - 4096);
229       if (update_size)
230         svm_fifo_set_size (f, fifo_size - update_size);
231     }
232
233   return 0;
234 }
235
236 static int
237 proxy_accept_callback (session_t * s)
238 {
239   proxy_main_t *pm = &proxy_main;
240   proxy_session_t *ps;
241
242   clib_spinlock_lock_if_init (&pm->sessions_lock);
243
244   pool_get_zero (pm->sessions, ps);
245   ps->vpp_server_handle = session_handle (s);
246   ps->vpp_active_open_handle = SESSION_INVALID_HANDLE;
247
248   hash_set (pm->proxy_session_by_server_handle, ps->vpp_server_handle,
249             ps - pm->sessions);
250
251   clib_spinlock_unlock_if_init (&pm->sessions_lock);
252
253   s->session_state = SESSION_STATE_READY;
254
255   return 0;
256 }
257
258 static void
259 proxy_disconnect_callback (session_t * s)
260 {
261   proxy_try_close_session (s, 0 /* is_active_open */ );
262 }
263
264 static void
265 proxy_reset_callback (session_t * s)
266 {
267   proxy_try_close_session (s, 0 /* is_active_open */ );
268 }
269
270 static int
271 proxy_connected_callback (u32 app_index, u32 api_context,
272                           session_t * s, session_error_t err)
273 {
274   clib_warning ("called...");
275   return -1;
276 }
277
278 static int
279 proxy_add_segment_callback (u32 client_index, u64 segment_handle)
280 {
281   clib_warning ("called...");
282   return -1;
283 }
284
285 static int
286 proxy_rx_callback (session_t * s)
287 {
288   proxy_main_t *pm = &proxy_main;
289   u32 thread_index = vlib_get_thread_index ();
290   svm_fifo_t *ao_tx_fifo;
291   proxy_session_t *ps;
292
293   ASSERT (s->thread_index == thread_index);
294
295   clib_spinlock_lock_if_init (&pm->sessions_lock);
296
297   ps = proxy_get_passive_open (pm, session_handle (s));
298   ASSERT (ps != 0);
299
300   if (PREDICT_TRUE (ps->vpp_active_open_handle != SESSION_INVALID_HANDLE))
301     {
302       clib_spinlock_unlock_if_init (&pm->sessions_lock);
303
304       ao_tx_fifo = s->rx_fifo;
305
306       /*
307        * Send event for active open tx fifo
308        */
309       if (svm_fifo_set_event (ao_tx_fifo))
310         {
311           u32 ao_thread_index = ao_tx_fifo->master_thread_index;
312           u32 ao_session_index = ao_tx_fifo->master_session_index;
313           if (session_send_io_evt_to_thread_custom (&ao_session_index,
314                                                     ao_thread_index,
315                                                     SESSION_IO_EVT_TX))
316             clib_warning ("failed to enqueue tx evt");
317         }
318
319       if (svm_fifo_max_enqueue (ao_tx_fifo) <= TCP_MSS)
320         svm_fifo_add_want_deq_ntf (ao_tx_fifo, SVM_FIFO_WANT_DEQ_NOTIF);
321     }
322   else
323     {
324       vnet_connect_args_t _a, *a = &_a;
325       svm_fifo_t *tx_fifo, *rx_fifo;
326       u32 max_dequeue, proxy_index;
327       int actual_transfer __attribute__ ((unused));
328
329       rx_fifo = s->rx_fifo;
330       tx_fifo = s->tx_fifo;
331
332       ASSERT (rx_fifo->master_thread_index == thread_index);
333       ASSERT (tx_fifo->master_thread_index == thread_index);
334
335       max_dequeue = svm_fifo_max_dequeue_cons (s->rx_fifo);
336
337       if (PREDICT_FALSE (max_dequeue == 0))
338         return 0;
339
340       max_dequeue = clib_min (pm->rcv_buffer_size, max_dequeue);
341       actual_transfer = svm_fifo_peek (rx_fifo, 0 /* relative_offset */ ,
342                                        max_dequeue, pm->rx_buf[thread_index]);
343
344       /* $$$ your message in this space: parse url, etc. */
345
346       clib_memset (a, 0, sizeof (*a));
347
348       ps->server_rx_fifo = rx_fifo;
349       ps->server_tx_fifo = tx_fifo;
350       ps->active_open_establishing = 1;
351       proxy_index = ps - pm->sessions;
352
353       clib_spinlock_unlock_if_init (&pm->sessions_lock);
354
355       a->uri = (char *) pm->client_uri;
356       a->api_context = proxy_index;
357       a->app_index = pm->active_open_app_index;
358       proxy_call_main_thread (a);
359     }
360
361   return 0;
362 }
363
364 static void
365 proxy_force_ack (void *handlep)
366 {
367   transport_connection_t *tc;
368   session_t *ao_s;
369
370   ao_s = session_get_from_handle (pointer_to_uword (handlep));
371   tc = session_get_transport (ao_s);
372   tcp_send_ack ((tcp_connection_t *) tc);
373 }
374
375 static int
376 proxy_tx_callback (session_t * proxy_s)
377 {
378   proxy_main_t *pm = &proxy_main;
379   proxy_session_t *ps;
380   u32 min_free;
381
382   min_free = clib_min (svm_fifo_size (proxy_s->tx_fifo) >> 3, 128 << 10);
383   if (svm_fifo_max_enqueue (proxy_s->tx_fifo) < min_free)
384     {
385       svm_fifo_add_want_deq_ntf (proxy_s->tx_fifo, SVM_FIFO_WANT_DEQ_NOTIF);
386       return 0;
387     }
388
389   clib_spinlock_lock_if_init (&pm->sessions_lock);
390
391   ps = proxy_get_passive_open (pm, session_handle (proxy_s));
392   ASSERT (ps != 0);
393
394   if (ps->vpp_active_open_handle == SESSION_INVALID_HANDLE)
395     return 0;
396
397   /* Force ack on active open side to update rcv wnd. Make sure it's done on
398    * the right thread */
399   void *arg = uword_to_pointer (ps->vpp_active_open_handle, void *);
400   session_send_rpc_evt_to_thread (ps->server_rx_fifo->master_thread_index,
401                                   proxy_force_ack, arg);
402
403   clib_spinlock_unlock_if_init (&pm->sessions_lock);
404
405   return 0;
406 }
407
408 static void
409 proxy_cleanup_callback (session_t * s, session_cleanup_ntf_t ntf)
410 {
411   if (ntf == SESSION_CLEANUP_TRANSPORT)
412     return;
413
414   proxy_try_delete_session (s, 0 /* is_active_open */ );
415 }
416
417 static session_cb_vft_t proxy_session_cb_vft = {
418   .session_accept_callback = proxy_accept_callback,
419   .session_disconnect_callback = proxy_disconnect_callback,
420   .session_connected_callback = proxy_connected_callback,
421   .add_segment_callback = proxy_add_segment_callback,
422   .builtin_app_rx_callback = proxy_rx_callback,
423   .builtin_app_tx_callback = proxy_tx_callback,
424   .session_reset_callback = proxy_reset_callback,
425   .session_cleanup_callback = proxy_cleanup_callback,
426   .fifo_tuning_callback = common_fifo_tuning_callback
427 };
428
429 static int
430 active_open_connected_callback (u32 app_index, u32 opaque,
431                                 session_t * s, session_error_t err)
432 {
433   proxy_main_t *pm = &proxy_main;
434   proxy_session_t *ps;
435   u8 thread_index = vlib_get_thread_index ();
436
437   if (err)
438     {
439       clib_warning ("connection %d failed!", opaque);
440       ASSERT (0);
441       return 0;
442     }
443
444   /*
445    * Setup proxy session handle.
446    */
447   clib_spinlock_lock_if_init (&pm->sessions_lock);
448
449   ps = pool_elt_at_index (pm->sessions, opaque);
450   ps->vpp_active_open_handle = session_handle (s);
451   ps->active_open_establishing = 0;
452
453   /* Passive open session was already closed! */
454   if (ps->po_disconnected)
455     {
456       /* Setup everything for the cleanup notification */
457       hash_set (pm->proxy_session_by_active_open_handle,
458                 ps->vpp_active_open_handle, opaque);
459       ps->ao_disconnected = 1;
460       clib_spinlock_unlock_if_init (&pm->sessions_lock);
461       return -1;
462     }
463
464   s->tx_fifo = ps->server_rx_fifo;
465   s->rx_fifo = ps->server_tx_fifo;
466
467   /*
468    * Reset the active-open tx-fifo master indices so the active-open session
469    * will receive data, etc.
470    */
471   s->tx_fifo->master_session_index = s->session_index;
472   s->tx_fifo->master_thread_index = s->thread_index;
473
474   /*
475    * Account for the active-open session's use of the fifos
476    * so they won't disappear until the last session which uses
477    * them disappears
478    */
479   s->tx_fifo->refcnt++;
480   s->rx_fifo->refcnt++;
481
482   svm_fifo_init_ooo_lookup (s->tx_fifo, 1 /* deq ooo */ );
483   svm_fifo_init_ooo_lookup (s->rx_fifo, 0 /* enq ooo */ );
484
485   hash_set (pm->proxy_session_by_active_open_handle,
486             ps->vpp_active_open_handle, opaque);
487
488   clib_spinlock_unlock_if_init (&pm->sessions_lock);
489
490   /*
491    * Send event for active open tx fifo
492    */
493   ASSERT (s->thread_index == thread_index);
494   if (svm_fifo_set_event (s->tx_fifo))
495     session_send_io_evt_to_thread (s->tx_fifo, SESSION_IO_EVT_TX);
496
497   return 0;
498 }
499
500 static void
501 active_open_reset_callback (session_t * s)
502 {
503   proxy_try_close_session (s, 1 /* is_active_open */ );
504 }
505
506 static int
507 active_open_create_callback (session_t * s)
508 {
509   return 0;
510 }
511
512 static void
513 active_open_disconnect_callback (session_t * s)
514 {
515   proxy_try_close_session (s, 1 /* is_active_open */ );
516 }
517
518 static int
519 active_open_rx_callback (session_t * s)
520 {
521   svm_fifo_t *proxy_tx_fifo;
522
523   proxy_tx_fifo = s->rx_fifo;
524
525   /*
526    * Send event for server tx fifo
527    */
528   if (svm_fifo_set_event (proxy_tx_fifo))
529     {
530       u8 thread_index = proxy_tx_fifo->master_thread_index;
531       u32 session_index = proxy_tx_fifo->master_session_index;
532       return session_send_io_evt_to_thread_custom (&session_index,
533                                                    thread_index,
534                                                    SESSION_IO_EVT_TX);
535     }
536
537   if (svm_fifo_max_enqueue (proxy_tx_fifo) <= TCP_MSS)
538     svm_fifo_add_want_deq_ntf (proxy_tx_fifo, SVM_FIFO_WANT_DEQ_NOTIF);
539
540   return 0;
541 }
542
543 static int
544 active_open_tx_callback (session_t * ao_s)
545 {
546   proxy_main_t *pm = &proxy_main;
547   transport_connection_t *tc;
548   session_handle_t handle;
549   proxy_session_t *ps;
550   session_t *proxy_s;
551   u32 min_free;
552   uword *p;
553
554   min_free = clib_min (svm_fifo_size (ao_s->tx_fifo) >> 3, 128 << 10);
555   if (svm_fifo_max_enqueue (ao_s->tx_fifo) < min_free)
556     {
557       svm_fifo_add_want_deq_ntf (ao_s->tx_fifo, SVM_FIFO_WANT_DEQ_NOTIF);
558       return 0;
559     }
560
561   clib_spinlock_lock_if_init (&pm->sessions_lock);
562
563   handle = session_handle (ao_s);
564   p = hash_get (pm->proxy_session_by_active_open_handle, handle);
565   if (!p)
566     return 0;
567
568   if (pool_is_free_index (pm->sessions, p[0]))
569     return 0;
570
571   ps = pool_elt_at_index (pm->sessions, p[0]);
572   if (ps->vpp_server_handle == ~0)
573     return 0;
574
575   proxy_s = session_get_from_handle (ps->vpp_server_handle);
576
577   /* Force ack on proxy side to update rcv wnd */
578   tc = session_get_transport (proxy_s);
579   tcp_send_ack ((tcp_connection_t *) tc);
580
581   clib_spinlock_unlock_if_init (&pm->sessions_lock);
582
583   return 0;
584 }
585
586 static void
587 active_open_cleanup_callback (session_t * s, session_cleanup_ntf_t ntf)
588 {
589   if (ntf == SESSION_CLEANUP_TRANSPORT)
590     return;
591
592   proxy_try_delete_session (s, 1 /* is_active_open */ );
593 }
594
595 /* *INDENT-OFF* */
596 static session_cb_vft_t active_open_clients = {
597   .session_reset_callback = active_open_reset_callback,
598   .session_connected_callback = active_open_connected_callback,
599   .session_accept_callback = active_open_create_callback,
600   .session_disconnect_callback = active_open_disconnect_callback,
601   .session_cleanup_callback = active_open_cleanup_callback,
602   .builtin_app_rx_callback = active_open_rx_callback,
603   .builtin_app_tx_callback = active_open_tx_callback,
604   .fifo_tuning_callback = common_fifo_tuning_callback
605 };
606 /* *INDENT-ON* */
607
608 static int
609 proxy_server_attach ()
610 {
611   proxy_main_t *pm = &proxy_main;
612   u64 options[APP_OPTIONS_N_OPTIONS];
613   vnet_app_attach_args_t _a, *a = &_a;
614   u32 segment_size = 512 << 20;
615
616   clib_memset (a, 0, sizeof (*a));
617   clib_memset (options, 0, sizeof (options));
618
619   if (pm->private_segment_size)
620     segment_size = pm->private_segment_size;
621   a->name = format (0, "proxy-server");
622   a->api_client_index = pm->server_client_index;
623   a->session_cb_vft = &proxy_session_cb_vft;
624   a->options = options;
625   a->options[APP_OPTIONS_SEGMENT_SIZE] = segment_size;
626   a->options[APP_OPTIONS_RX_FIFO_SIZE] = pm->fifo_size;
627   a->options[APP_OPTIONS_TX_FIFO_SIZE] = pm->fifo_size;
628   a->options[APP_OPTIONS_MAX_FIFO_SIZE] = pm->max_fifo_size;
629   a->options[APP_OPTIONS_HIGH_WATERMARK] = (u64) pm->high_watermark;
630   a->options[APP_OPTIONS_LOW_WATERMARK] = (u64) pm->low_watermark;
631   a->options[APP_OPTIONS_PRIVATE_SEGMENT_COUNT] = pm->private_segment_count;
632   a->options[APP_OPTIONS_PREALLOC_FIFO_PAIRS] =
633     pm->prealloc_fifos ? pm->prealloc_fifos : 0;
634
635   a->options[APP_OPTIONS_FLAGS] = APP_OPTIONS_FLAGS_IS_BUILTIN;
636
637   if (vnet_application_attach (a))
638     {
639       clib_warning ("failed to attach server");
640       return -1;
641     }
642   pm->server_app_index = a->app_index;
643
644   vec_free (a->name);
645   return 0;
646 }
647
648 static int
649 active_open_attach (void)
650 {
651   proxy_main_t *pm = &proxy_main;
652   vnet_app_attach_args_t _a, *a = &_a;
653   u64 options[APP_OPTIONS_N_OPTIONS];
654
655   clib_memset (a, 0, sizeof (*a));
656   clib_memset (options, 0, sizeof (options));
657
658   a->api_client_index = pm->active_open_client_index;
659   a->session_cb_vft = &active_open_clients;
660   a->name = format (0, "proxy-active-open");
661
662   options[APP_OPTIONS_ACCEPT_COOKIE] = 0x12345678;
663   options[APP_OPTIONS_SEGMENT_SIZE] = 512 << 20;
664   options[APP_OPTIONS_RX_FIFO_SIZE] = pm->fifo_size;
665   options[APP_OPTIONS_TX_FIFO_SIZE] = pm->fifo_size;
666   options[APP_OPTIONS_MAX_FIFO_SIZE] = pm->max_fifo_size;
667   options[APP_OPTIONS_HIGH_WATERMARK] = (u64) pm->high_watermark;
668   options[APP_OPTIONS_LOW_WATERMARK] = (u64) pm->low_watermark;
669   options[APP_OPTIONS_PRIVATE_SEGMENT_COUNT] = pm->private_segment_count;
670   options[APP_OPTIONS_PREALLOC_FIFO_PAIRS] =
671     pm->prealloc_fifos ? pm->prealloc_fifos : 0;
672
673   options[APP_OPTIONS_FLAGS] = APP_OPTIONS_FLAGS_IS_BUILTIN
674     | APP_OPTIONS_FLAGS_IS_PROXY;
675
676   a->options = options;
677
678   if (vnet_application_attach (a))
679     return -1;
680
681   pm->active_open_app_index = a->app_index;
682
683   vec_free (a->name);
684
685   return 0;
686 }
687
688 static int
689 proxy_server_listen ()
690 {
691   proxy_main_t *pm = &proxy_main;
692   vnet_listen_args_t _a, *a = &_a;
693   clib_memset (a, 0, sizeof (*a));
694   a->app_index = pm->server_app_index;
695   a->uri = (char *) pm->server_uri;
696   return vnet_bind_uri (a);
697 }
698
699 static int
700 proxy_server_create (vlib_main_t * vm)
701 {
702   proxy_main_t *pm = &proxy_main;
703   vlib_thread_main_t *vtm = vlib_get_thread_main ();
704   u32 num_threads;
705   int i;
706
707   num_threads = 1 /* main thread */  + vtm->n_threads;
708   vec_validate (proxy_main.server_event_queue, num_threads - 1);
709   vec_validate (proxy_main.active_open_event_queue, num_threads - 1);
710   vec_validate (pm->rx_buf, num_threads - 1);
711
712   for (i = 0; i < num_threads; i++)
713     vec_validate (pm->rx_buf[i], pm->rcv_buffer_size);
714
715   if (proxy_server_attach ())
716     {
717       clib_warning ("failed to attach server app");
718       return -1;
719     }
720   if (proxy_server_listen ())
721     {
722       clib_warning ("failed to start listening");
723       return -1;
724     }
725   if (active_open_attach ())
726     {
727       clib_warning ("failed to attach active open app");
728       return -1;
729     }
730
731   for (i = 0; i < num_threads; i++)
732     {
733       pm->active_open_event_queue[i] = session_main_get_vpp_event_queue (i);
734
735       ASSERT (pm->active_open_event_queue[i]);
736
737       pm->server_event_queue[i] = session_main_get_vpp_event_queue (i);
738     }
739
740   return 0;
741 }
742
743 static clib_error_t *
744 proxy_server_create_command_fn (vlib_main_t * vm, unformat_input_t * input,
745                                 vlib_cli_command_t * cmd)
746 {
747   proxy_main_t *pm = &proxy_main;
748   char *default_server_uri = "tcp://0.0.0.0/23";
749   char *default_client_uri = "tcp://6.0.2.2/23";
750   int rv, tmp32;
751   u64 tmp64;
752
753   pm->fifo_size = 64 << 10;
754   pm->max_fifo_size = 128 << 20;
755   pm->high_watermark = 80;
756   pm->low_watermark = 50;
757   pm->rcv_buffer_size = 1024;
758   pm->prealloc_fifos = 0;
759   pm->private_segment_count = 0;
760   pm->private_segment_size = 0;
761   pm->server_uri = 0;
762   pm->client_uri = 0;
763   if (vlib_num_workers ())
764     clib_spinlock_init (&pm->sessions_lock);
765
766   while (unformat_check_input (input) != UNFORMAT_END_OF_INPUT)
767     {
768       if (unformat (input, "fifo-size %U",
769                     unformat_memory_size, &pm->fifo_size))
770         ;
771       else if (unformat (input, "max-fifo-size %U",
772                          unformat_memory_size, &pm->max_fifo_size))
773         ;
774       else if (unformat (input, "high-watermark %d", &tmp32))
775         pm->high_watermark = (u8) tmp32;
776       else if (unformat (input, "low-watermark %d", &tmp32))
777         pm->low_watermark = (u8) tmp32;
778       else if (unformat (input, "rcv-buf-size %d", &pm->rcv_buffer_size))
779         ;
780       else if (unformat (input, "prealloc-fifos %d", &pm->prealloc_fifos))
781         ;
782       else if (unformat (input, "private-segment-count %d",
783                          &pm->private_segment_count))
784         ;
785       else if (unformat (input, "private-segment-size %U",
786                          unformat_memory_size, &tmp64))
787         {
788           if (tmp64 >= 0x100000000ULL)
789             return clib_error_return
790               (0, "private segment size %lld (%llu) too large", tmp64, tmp64);
791           pm->private_segment_size = tmp64;
792         }
793       else if (unformat (input, "server-uri %s", &pm->server_uri))
794         vec_add1 (pm->server_uri, 0);
795       else if (unformat (input, "client-uri %s", &pm->client_uri))
796         vec_add1 (pm->client_uri, 0);
797       else
798         return clib_error_return (0, "unknown input `%U'",
799                                   format_unformat_error, input);
800     }
801
802   if (!pm->server_uri)
803     {
804       clib_warning ("No server-uri provided, Using default: %s",
805                     default_server_uri);
806       pm->server_uri = format (0, "%s%c", default_server_uri, 0);
807     }
808   if (!pm->client_uri)
809     {
810       clib_warning ("No client-uri provided, Using default: %s",
811                     default_client_uri);
812       pm->client_uri = format (0, "%s%c", default_client_uri, 0);
813     }
814
815   vnet_session_enable_disable (vm, 1 /* turn on session and transport */ );
816
817   rv = proxy_server_create (vm);
818   switch (rv)
819     {
820     case 0:
821       break;
822     default:
823       return clib_error_return (0, "server_create returned %d", rv);
824     }
825
826   return 0;
827 }
828
829 /* *INDENT-OFF* */
830 VLIB_CLI_COMMAND (proxy_create_command, static) =
831 {
832   .path = "test proxy server",
833   .short_help = "test proxy server [server-uri <tcp://ip/port>]"
834       "[client-uri <tcp://ip/port>][fifo-size <nn>[k|m]]"
835       "[max-fifo-size <nn>[k|m]][high-watermark <nn>]"
836       "[low-watermark <nn>][rcv-buf-size <nn>][prealloc-fifos <nn>]"
837       "[private-segment-size <mem>][private-segment-count <nn>]",
838   .function = proxy_server_create_command_fn,
839 };
840 /* *INDENT-ON* */
841
842 clib_error_t *
843 proxy_main_init (vlib_main_t * vm)
844 {
845   proxy_main_t *pm = &proxy_main;
846   pm->server_client_index = ~0;
847   pm->active_open_client_index = ~0;
848   pm->proxy_session_by_active_open_handle = hash_create (0, sizeof (uword));
849   pm->proxy_session_by_server_handle = hash_create (0, sizeof (uword));
850
851   return 0;
852 }
853
854 VLIB_INIT_FUNCTION (proxy_main_init);
855
856 /*
857 * fd.io coding-style-patch-verification: ON
858 *
859 * Local Variables:
860 * eval: (c-set-style "gnu")
861 * End:
862 */