hsa: fix handling active connection failures in the proxy
[vpp.git] / src / plugins / hs_apps / proxy.c
1 /*
2 * Copyright (c) 2017-2019 Cisco and/or its affiliates.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at:
6 *
7 *     http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include <vnet/vnet.h>
17 #include <vlibmemory/api.h>
18 #include <vnet/session/application.h>
19 #include <vnet/session/application_interface.h>
20 #include <hs_apps/proxy.h>
21 #include <vnet/tcp/tcp.h>
22
23 proxy_main_t proxy_main;
24
25 #define TCP_MSS 1460
26
27 typedef struct
28 {
29   char uri[128];
30   u32 app_index;
31   u32 api_context;
32 } proxy_connect_args_t;
33
34 static void
35 proxy_cb_fn (void *data, u32 data_len)
36 {
37   proxy_connect_args_t *pa = (proxy_connect_args_t *) data;
38   vnet_connect_args_t a;
39
40   memset (&a, 0, sizeof (a));
41   a.api_context = pa->api_context;
42   a.app_index = pa->app_index;
43   a.uri = pa->uri;
44   vnet_connect_uri (&a);
45 }
46
47 static void
48 proxy_call_main_thread (vnet_connect_args_t * a)
49 {
50   if (vlib_get_thread_index () == 0)
51     {
52       vnet_connect_uri (a);
53     }
54   else
55     {
56       proxy_connect_args_t args;
57       args.api_context = a->api_context;
58       args.app_index = a->app_index;
59       clib_memcpy (args.uri, a->uri, vec_len (a->uri));
60       vl_api_rpc_call_main_thread (proxy_cb_fn, (u8 *) & args, sizeof (args));
61     }
62 }
63
64 static proxy_session_t *
65 proxy_get_active_open (proxy_main_t * pm, session_handle_t handle)
66 {
67   proxy_session_t *ps = 0;
68   uword *p;
69
70   p = hash_get (pm->proxy_session_by_active_open_handle, handle);
71   if (p)
72     ps = pool_elt_at_index (pm->sessions, p[0]);
73   return ps;
74 }
75
76 static proxy_session_t *
77 proxy_get_passive_open (proxy_main_t * pm, session_handle_t handle)
78 {
79   proxy_session_t *ps = 0;
80   uword *p;
81
82   p = hash_get (pm->proxy_session_by_server_handle, handle);
83   if (p)
84     ps = pool_elt_at_index (pm->sessions, p[0]);
85   return ps;
86 }
87
88 static void
89 proxy_try_close_session (session_t * s, int is_active_open)
90 {
91   proxy_main_t *pm = &proxy_main;
92   proxy_session_t *ps = 0;
93   vnet_disconnect_args_t _a, *a = &_a;
94   session_handle_t handle;
95
96   handle = session_handle (s);
97
98   clib_spinlock_lock_if_init (&pm->sessions_lock);
99
100   if (is_active_open)
101     {
102       ps = proxy_get_active_open (pm, handle);
103       ASSERT (ps != 0);
104
105       a->handle = ps->vpp_active_open_handle;
106       a->app_index = pm->active_open_app_index;
107       vnet_disconnect_session (a);
108       ps->ao_disconnected = 1;
109
110       if (!ps->po_disconnected)
111         {
112           ASSERT (ps->vpp_server_handle != SESSION_INVALID_HANDLE);
113           a->handle = ps->vpp_server_handle;
114           a->app_index = pm->server_app_index;
115           vnet_disconnect_session (a);
116           ps->po_disconnected = 1;
117         }
118     }
119   else
120     {
121       ps = proxy_get_passive_open (pm, handle);
122       ASSERT (ps != 0);
123
124       a->handle = ps->vpp_server_handle;
125       a->app_index = pm->server_app_index;
126       vnet_disconnect_session (a);
127       ps->po_disconnected = 1;
128
129       if (!ps->ao_disconnected && !ps->active_open_establishing)
130         {
131           /* Proxy session closed before active open */
132           if (ps->vpp_active_open_handle != SESSION_INVALID_HANDLE)
133             {
134               a->handle = ps->vpp_active_open_handle;
135               a->app_index = pm->active_open_app_index;
136               vnet_disconnect_session (a);
137             }
138           ps->ao_disconnected = 1;
139         }
140     }
141   clib_spinlock_unlock_if_init (&pm->sessions_lock);
142 }
143
144 static void
145 proxy_session_free (proxy_session_t * ps)
146 {
147   proxy_main_t *pm = &proxy_main;
148   if (CLIB_DEBUG > 0)
149     clib_memset (ps, 0xFE, sizeof (*ps));
150   pool_put (pm->sessions, ps);
151 }
152
153 static void
154 proxy_try_delete_session (session_t * s, u8 is_active_open)
155 {
156   proxy_main_t *pm = &proxy_main;
157   proxy_session_t *ps = 0;
158   session_handle_t handle;
159
160   handle = session_handle (s);
161
162   clib_spinlock_lock_if_init (&pm->sessions_lock);
163
164   if (is_active_open)
165     {
166       ps = proxy_get_active_open (pm, handle);
167       ASSERT (ps != 0);
168
169       ps->vpp_active_open_handle = SESSION_INVALID_HANDLE;
170       hash_unset (pm->proxy_session_by_active_open_handle, handle);
171
172       if (ps->vpp_server_handle == SESSION_INVALID_HANDLE)
173         proxy_session_free (ps);
174     }
175   else
176     {
177       ps = proxy_get_passive_open (pm, handle);
178       ASSERT (ps != 0);
179
180       ps->vpp_server_handle = SESSION_INVALID_HANDLE;
181       hash_unset (pm->proxy_session_by_server_handle, handle);
182
183       if (ps->vpp_active_open_handle == SESSION_INVALID_HANDLE)
184         {
185           if (!ps->active_open_establishing)
186             proxy_session_free (ps);
187         }
188     }
189   clib_spinlock_unlock_if_init (&pm->sessions_lock);
190 }
191
192 static int
193 common_fifo_tuning_callback (session_t * s, svm_fifo_t * f,
194                              session_ft_action_t act, u32 bytes)
195 {
196   proxy_main_t *pm = &proxy_main;
197
198   segment_manager_t *sm = segment_manager_get (f->segment_manager);
199   fifo_segment_t *fs = segment_manager_get_segment (sm, f->segment_index);
200
201   u8 seg_usage = fifo_segment_get_mem_usage (fs);
202   u32 fifo_in_use = svm_fifo_max_dequeue_prod (f);
203   u32 fifo_size = svm_fifo_size (f);
204   u8 fifo_usage = fifo_in_use * 100 / fifo_size;
205   u8 update_size = 0;
206
207   ASSERT (act < SESSION_FT_ACTION_N_ACTIONS);
208
209   if (act == SESSION_FT_ACTION_ENQUEUED)
210     {
211       if (seg_usage < pm->low_watermark && fifo_usage > 50)
212         update_size = fifo_in_use;
213       else if (seg_usage < pm->high_watermark && fifo_usage > 80)
214         update_size = fifo_in_use;
215
216       update_size = clib_min (update_size, sm->max_fifo_size - fifo_size);
217       if (update_size)
218         svm_fifo_set_size (f, fifo_size + update_size);
219     }
220   else                          /* dequeued */
221     {
222       if (seg_usage > pm->high_watermark || fifo_usage < 20)
223         update_size = bytes;
224       else if (seg_usage > pm->low_watermark && fifo_usage < 50)
225         update_size = (bytes / 2);
226
227       ASSERT (fifo_size >= 4096);
228       update_size = clib_min (update_size, fifo_size - 4096);
229       if (update_size)
230         svm_fifo_set_size (f, fifo_size - update_size);
231     }
232
233   return 0;
234 }
235
236 static int
237 proxy_accept_callback (session_t * s)
238 {
239   proxy_main_t *pm = &proxy_main;
240   proxy_session_t *ps;
241
242   clib_spinlock_lock_if_init (&pm->sessions_lock);
243
244   pool_get_zero (pm->sessions, ps);
245   ps->vpp_server_handle = session_handle (s);
246   ps->vpp_active_open_handle = SESSION_INVALID_HANDLE;
247
248   hash_set (pm->proxy_session_by_server_handle, ps->vpp_server_handle,
249             ps - pm->sessions);
250
251   clib_spinlock_unlock_if_init (&pm->sessions_lock);
252
253   s->session_state = SESSION_STATE_READY;
254
255   return 0;
256 }
257
258 static void
259 proxy_disconnect_callback (session_t * s)
260 {
261   proxy_try_close_session (s, 0 /* is_active_open */ );
262 }
263
264 static void
265 proxy_reset_callback (session_t * s)
266 {
267   proxy_try_close_session (s, 0 /* is_active_open */ );
268 }
269
270 static int
271 proxy_connected_callback (u32 app_index, u32 api_context,
272                           session_t * s, session_error_t err)
273 {
274   clib_warning ("called...");
275   return -1;
276 }
277
278 static int
279 proxy_add_segment_callback (u32 client_index, u64 segment_handle)
280 {
281   clib_warning ("called...");
282   return -1;
283 }
284
285 static int
286 proxy_rx_callback (session_t * s)
287 {
288   proxy_main_t *pm = &proxy_main;
289   u32 thread_index = vlib_get_thread_index ();
290   svm_fifo_t *ao_tx_fifo;
291   proxy_session_t *ps;
292
293   ASSERT (s->thread_index == thread_index);
294
295   clib_spinlock_lock_if_init (&pm->sessions_lock);
296
297   ps = proxy_get_passive_open (pm, session_handle (s));
298   ASSERT (ps != 0);
299
300   if (PREDICT_TRUE (ps->vpp_active_open_handle != SESSION_INVALID_HANDLE))
301     {
302       clib_spinlock_unlock_if_init (&pm->sessions_lock);
303
304       ao_tx_fifo = s->rx_fifo;
305
306       /*
307        * Send event for active open tx fifo
308        */
309       if (svm_fifo_set_event (ao_tx_fifo))
310         {
311           u32 ao_thread_index = ao_tx_fifo->master_thread_index;
312           u32 ao_session_index = ao_tx_fifo->master_session_index;
313           if (session_send_io_evt_to_thread_custom (&ao_session_index,
314                                                     ao_thread_index,
315                                                     SESSION_IO_EVT_TX))
316             clib_warning ("failed to enqueue tx evt");
317         }
318
319       if (svm_fifo_max_enqueue (ao_tx_fifo) <= TCP_MSS)
320         svm_fifo_add_want_deq_ntf (ao_tx_fifo, SVM_FIFO_WANT_DEQ_NOTIF);
321     }
322   else
323     {
324       vnet_connect_args_t _a, *a = &_a;
325       svm_fifo_t *tx_fifo, *rx_fifo;
326       u32 max_dequeue, proxy_index;
327       int actual_transfer __attribute__ ((unused));
328
329       rx_fifo = s->rx_fifo;
330       tx_fifo = s->tx_fifo;
331
332       ASSERT (rx_fifo->master_thread_index == thread_index);
333       ASSERT (tx_fifo->master_thread_index == thread_index);
334
335       max_dequeue = svm_fifo_max_dequeue_cons (s->rx_fifo);
336
337       if (PREDICT_FALSE (max_dequeue == 0))
338         return 0;
339
340       max_dequeue = clib_min (pm->rcv_buffer_size, max_dequeue);
341       actual_transfer = svm_fifo_peek (rx_fifo, 0 /* relative_offset */ ,
342                                        max_dequeue, pm->rx_buf[thread_index]);
343
344       /* $$$ your message in this space: parse url, etc. */
345
346       clib_memset (a, 0, sizeof (*a));
347
348       ps->server_rx_fifo = rx_fifo;
349       ps->server_tx_fifo = tx_fifo;
350       ps->active_open_establishing = 1;
351       proxy_index = ps - pm->sessions;
352
353       clib_spinlock_unlock_if_init (&pm->sessions_lock);
354
355       a->uri = (char *) pm->client_uri;
356       a->api_context = proxy_index;
357       a->app_index = pm->active_open_app_index;
358       proxy_call_main_thread (a);
359     }
360
361   return 0;
362 }
363
364 static void
365 proxy_force_ack (void *handlep)
366 {
367   transport_connection_t *tc;
368   session_t *ao_s;
369
370   ao_s = session_get_from_handle (pointer_to_uword (handlep));
371   tc = session_get_transport (ao_s);
372   tcp_send_ack ((tcp_connection_t *) tc);
373 }
374
375 static int
376 proxy_tx_callback (session_t * proxy_s)
377 {
378   proxy_main_t *pm = &proxy_main;
379   proxy_session_t *ps;
380   u32 min_free;
381
382   min_free = clib_min (svm_fifo_size (proxy_s->tx_fifo) >> 3, 128 << 10);
383   if (svm_fifo_max_enqueue (proxy_s->tx_fifo) < min_free)
384     {
385       svm_fifo_add_want_deq_ntf (proxy_s->tx_fifo, SVM_FIFO_WANT_DEQ_NOTIF);
386       return 0;
387     }
388
389   clib_spinlock_lock_if_init (&pm->sessions_lock);
390
391   ps = proxy_get_passive_open (pm, session_handle (proxy_s));
392   ASSERT (ps != 0);
393
394   if (ps->vpp_active_open_handle == SESSION_INVALID_HANDLE)
395     return 0;
396
397   /* Force ack on active open side to update rcv wnd. Make sure it's done on
398    * the right thread */
399   void *arg = uword_to_pointer (ps->vpp_active_open_handle, void *);
400   session_send_rpc_evt_to_thread (ps->server_rx_fifo->master_thread_index,
401                                   proxy_force_ack, arg);
402
403   clib_spinlock_unlock_if_init (&pm->sessions_lock);
404
405   return 0;
406 }
407
408 static void
409 proxy_cleanup_callback (session_t * s, session_cleanup_ntf_t ntf)
410 {
411   if (ntf == SESSION_CLEANUP_TRANSPORT)
412     return;
413
414   proxy_try_delete_session (s, 0 /* is_active_open */ );
415 }
416
417 static session_cb_vft_t proxy_session_cb_vft = {
418   .session_accept_callback = proxy_accept_callback,
419   .session_disconnect_callback = proxy_disconnect_callback,
420   .session_connected_callback = proxy_connected_callback,
421   .add_segment_callback = proxy_add_segment_callback,
422   .builtin_app_rx_callback = proxy_rx_callback,
423   .builtin_app_tx_callback = proxy_tx_callback,
424   .session_reset_callback = proxy_reset_callback,
425   .session_cleanup_callback = proxy_cleanup_callback,
426   .fifo_tuning_callback = common_fifo_tuning_callback
427 };
428
429 static int
430 active_open_connected_callback (u32 app_index, u32 opaque,
431                                 session_t * s, session_error_t err)
432 {
433   proxy_main_t *pm = &proxy_main;
434   proxy_session_t *ps;
435   u8 thread_index = vlib_get_thread_index ();
436
437   /*
438    * Setup proxy session handle.
439    */
440   clib_spinlock_lock_if_init (&pm->sessions_lock);
441
442   ps = pool_elt_at_index (pm->sessions, opaque);
443
444   /* Connection failed */
445   if (err)
446     {
447       vnet_disconnect_args_t _a, *a = &_a;
448
449       a->handle = ps->vpp_server_handle;
450       a->app_index = pm->server_app_index;
451       vnet_disconnect_session (a);
452       ps->po_disconnected = 1;
453     }
454   else
455     {
456       ps->vpp_active_open_handle = session_handle (s);
457       ps->active_open_establishing = 0;
458     }
459
460   /* Passive open session was already closed! */
461   if (ps->po_disconnected)
462     {
463       /* Setup everything for the cleanup notification */
464       hash_set (pm->proxy_session_by_active_open_handle,
465                 ps->vpp_active_open_handle, opaque);
466       ps->ao_disconnected = 1;
467       clib_spinlock_unlock_if_init (&pm->sessions_lock);
468       return -1;
469     }
470
471   s->tx_fifo = ps->server_rx_fifo;
472   s->rx_fifo = ps->server_tx_fifo;
473
474   /*
475    * Reset the active-open tx-fifo master indices so the active-open session
476    * will receive data, etc.
477    */
478   s->tx_fifo->master_session_index = s->session_index;
479   s->tx_fifo->master_thread_index = s->thread_index;
480
481   /*
482    * Account for the active-open session's use of the fifos
483    * so they won't disappear until the last session which uses
484    * them disappears
485    */
486   s->tx_fifo->refcnt++;
487   s->rx_fifo->refcnt++;
488
489   svm_fifo_init_ooo_lookup (s->tx_fifo, 1 /* deq ooo */ );
490   svm_fifo_init_ooo_lookup (s->rx_fifo, 0 /* enq ooo */ );
491
492   hash_set (pm->proxy_session_by_active_open_handle,
493             ps->vpp_active_open_handle, opaque);
494
495   clib_spinlock_unlock_if_init (&pm->sessions_lock);
496
497   /*
498    * Send event for active open tx fifo
499    */
500   ASSERT (s->thread_index == thread_index);
501   if (svm_fifo_set_event (s->tx_fifo))
502     session_send_io_evt_to_thread (s->tx_fifo, SESSION_IO_EVT_TX);
503
504   return 0;
505 }
506
507 static void
508 active_open_reset_callback (session_t * s)
509 {
510   proxy_try_close_session (s, 1 /* is_active_open */ );
511 }
512
513 static int
514 active_open_create_callback (session_t * s)
515 {
516   return 0;
517 }
518
519 static void
520 active_open_disconnect_callback (session_t * s)
521 {
522   proxy_try_close_session (s, 1 /* is_active_open */ );
523 }
524
525 static int
526 active_open_rx_callback (session_t * s)
527 {
528   svm_fifo_t *proxy_tx_fifo;
529
530   proxy_tx_fifo = s->rx_fifo;
531
532   /*
533    * Send event for server tx fifo
534    */
535   if (svm_fifo_set_event (proxy_tx_fifo))
536     {
537       u8 thread_index = proxy_tx_fifo->master_thread_index;
538       u32 session_index = proxy_tx_fifo->master_session_index;
539       return session_send_io_evt_to_thread_custom (&session_index,
540                                                    thread_index,
541                                                    SESSION_IO_EVT_TX);
542     }
543
544   if (svm_fifo_max_enqueue (proxy_tx_fifo) <= TCP_MSS)
545     svm_fifo_add_want_deq_ntf (proxy_tx_fifo, SVM_FIFO_WANT_DEQ_NOTIF);
546
547   return 0;
548 }
549
550 static int
551 active_open_tx_callback (session_t * ao_s)
552 {
553   proxy_main_t *pm = &proxy_main;
554   transport_connection_t *tc;
555   session_handle_t handle;
556   proxy_session_t *ps;
557   session_t *proxy_s;
558   u32 min_free;
559   uword *p;
560
561   min_free = clib_min (svm_fifo_size (ao_s->tx_fifo) >> 3, 128 << 10);
562   if (svm_fifo_max_enqueue (ao_s->tx_fifo) < min_free)
563     {
564       svm_fifo_add_want_deq_ntf (ao_s->tx_fifo, SVM_FIFO_WANT_DEQ_NOTIF);
565       return 0;
566     }
567
568   clib_spinlock_lock_if_init (&pm->sessions_lock);
569
570   handle = session_handle (ao_s);
571   p = hash_get (pm->proxy_session_by_active_open_handle, handle);
572   if (!p)
573     return 0;
574
575   if (pool_is_free_index (pm->sessions, p[0]))
576     return 0;
577
578   ps = pool_elt_at_index (pm->sessions, p[0]);
579   if (ps->vpp_server_handle == ~0)
580     return 0;
581
582   proxy_s = session_get_from_handle (ps->vpp_server_handle);
583
584   /* Force ack on proxy side to update rcv wnd */
585   tc = session_get_transport (proxy_s);
586   tcp_send_ack ((tcp_connection_t *) tc);
587
588   clib_spinlock_unlock_if_init (&pm->sessions_lock);
589
590   return 0;
591 }
592
593 static void
594 active_open_cleanup_callback (session_t * s, session_cleanup_ntf_t ntf)
595 {
596   if (ntf == SESSION_CLEANUP_TRANSPORT)
597     return;
598
599   proxy_try_delete_session (s, 1 /* is_active_open */ );
600 }
601
602 /* *INDENT-OFF* */
603 static session_cb_vft_t active_open_clients = {
604   .session_reset_callback = active_open_reset_callback,
605   .session_connected_callback = active_open_connected_callback,
606   .session_accept_callback = active_open_create_callback,
607   .session_disconnect_callback = active_open_disconnect_callback,
608   .session_cleanup_callback = active_open_cleanup_callback,
609   .builtin_app_rx_callback = active_open_rx_callback,
610   .builtin_app_tx_callback = active_open_tx_callback,
611   .fifo_tuning_callback = common_fifo_tuning_callback
612 };
613 /* *INDENT-ON* */
614
615 static int
616 proxy_server_attach ()
617 {
618   proxy_main_t *pm = &proxy_main;
619   u64 options[APP_OPTIONS_N_OPTIONS];
620   vnet_app_attach_args_t _a, *a = &_a;
621   u32 segment_size = 512 << 20;
622
623   clib_memset (a, 0, sizeof (*a));
624   clib_memset (options, 0, sizeof (options));
625
626   if (pm->private_segment_size)
627     segment_size = pm->private_segment_size;
628   a->name = format (0, "proxy-server");
629   a->api_client_index = pm->server_client_index;
630   a->session_cb_vft = &proxy_session_cb_vft;
631   a->options = options;
632   a->options[APP_OPTIONS_SEGMENT_SIZE] = segment_size;
633   a->options[APP_OPTIONS_RX_FIFO_SIZE] = pm->fifo_size;
634   a->options[APP_OPTIONS_TX_FIFO_SIZE] = pm->fifo_size;
635   a->options[APP_OPTIONS_MAX_FIFO_SIZE] = pm->max_fifo_size;
636   a->options[APP_OPTIONS_HIGH_WATERMARK] = (u64) pm->high_watermark;
637   a->options[APP_OPTIONS_LOW_WATERMARK] = (u64) pm->low_watermark;
638   a->options[APP_OPTIONS_PRIVATE_SEGMENT_COUNT] = pm->private_segment_count;
639   a->options[APP_OPTIONS_PREALLOC_FIFO_PAIRS] =
640     pm->prealloc_fifos ? pm->prealloc_fifos : 0;
641
642   a->options[APP_OPTIONS_FLAGS] = APP_OPTIONS_FLAGS_IS_BUILTIN;
643
644   if (vnet_application_attach (a))
645     {
646       clib_warning ("failed to attach server");
647       return -1;
648     }
649   pm->server_app_index = a->app_index;
650
651   vec_free (a->name);
652   return 0;
653 }
654
655 static int
656 active_open_attach (void)
657 {
658   proxy_main_t *pm = &proxy_main;
659   vnet_app_attach_args_t _a, *a = &_a;
660   u64 options[APP_OPTIONS_N_OPTIONS];
661
662   clib_memset (a, 0, sizeof (*a));
663   clib_memset (options, 0, sizeof (options));
664
665   a->api_client_index = pm->active_open_client_index;
666   a->session_cb_vft = &active_open_clients;
667   a->name = format (0, "proxy-active-open");
668
669   options[APP_OPTIONS_ACCEPT_COOKIE] = 0x12345678;
670   options[APP_OPTIONS_SEGMENT_SIZE] = 512 << 20;
671   options[APP_OPTIONS_RX_FIFO_SIZE] = pm->fifo_size;
672   options[APP_OPTIONS_TX_FIFO_SIZE] = pm->fifo_size;
673   options[APP_OPTIONS_MAX_FIFO_SIZE] = pm->max_fifo_size;
674   options[APP_OPTIONS_HIGH_WATERMARK] = (u64) pm->high_watermark;
675   options[APP_OPTIONS_LOW_WATERMARK] = (u64) pm->low_watermark;
676   options[APP_OPTIONS_PRIVATE_SEGMENT_COUNT] = pm->private_segment_count;
677   options[APP_OPTIONS_PREALLOC_FIFO_PAIRS] =
678     pm->prealloc_fifos ? pm->prealloc_fifos : 0;
679
680   options[APP_OPTIONS_FLAGS] = APP_OPTIONS_FLAGS_IS_BUILTIN
681     | APP_OPTIONS_FLAGS_IS_PROXY;
682
683   a->options = options;
684
685   if (vnet_application_attach (a))
686     return -1;
687
688   pm->active_open_app_index = a->app_index;
689
690   vec_free (a->name);
691
692   return 0;
693 }
694
695 static int
696 proxy_server_listen ()
697 {
698   proxy_main_t *pm = &proxy_main;
699   vnet_listen_args_t _a, *a = &_a;
700   clib_memset (a, 0, sizeof (*a));
701   a->app_index = pm->server_app_index;
702   a->uri = (char *) pm->server_uri;
703   return vnet_bind_uri (a);
704 }
705
706 static int
707 proxy_server_create (vlib_main_t * vm)
708 {
709   proxy_main_t *pm = &proxy_main;
710   vlib_thread_main_t *vtm = vlib_get_thread_main ();
711   u32 num_threads;
712   int i;
713
714   num_threads = 1 /* main thread */  + vtm->n_threads;
715   vec_validate (proxy_main.server_event_queue, num_threads - 1);
716   vec_validate (proxy_main.active_open_event_queue, num_threads - 1);
717   vec_validate (pm->rx_buf, num_threads - 1);
718
719   for (i = 0; i < num_threads; i++)
720     vec_validate (pm->rx_buf[i], pm->rcv_buffer_size);
721
722   if (proxy_server_attach ())
723     {
724       clib_warning ("failed to attach server app");
725       return -1;
726     }
727   if (proxy_server_listen ())
728     {
729       clib_warning ("failed to start listening");
730       return -1;
731     }
732   if (active_open_attach ())
733     {
734       clib_warning ("failed to attach active open app");
735       return -1;
736     }
737
738   for (i = 0; i < num_threads; i++)
739     {
740       pm->active_open_event_queue[i] = session_main_get_vpp_event_queue (i);
741
742       ASSERT (pm->active_open_event_queue[i]);
743
744       pm->server_event_queue[i] = session_main_get_vpp_event_queue (i);
745     }
746
747   return 0;
748 }
749
750 static clib_error_t *
751 proxy_server_create_command_fn (vlib_main_t * vm, unformat_input_t * input,
752                                 vlib_cli_command_t * cmd)
753 {
754   proxy_main_t *pm = &proxy_main;
755   char *default_server_uri = "tcp://0.0.0.0/23";
756   char *default_client_uri = "tcp://6.0.2.2/23";
757   int rv, tmp32;
758   u64 tmp64;
759
760   pm->fifo_size = 64 << 10;
761   pm->max_fifo_size = 128 << 20;
762   pm->high_watermark = 80;
763   pm->low_watermark = 50;
764   pm->rcv_buffer_size = 1024;
765   pm->prealloc_fifos = 0;
766   pm->private_segment_count = 0;
767   pm->private_segment_size = 0;
768   pm->server_uri = 0;
769   pm->client_uri = 0;
770   if (vlib_num_workers ())
771     clib_spinlock_init (&pm->sessions_lock);
772
773   while (unformat_check_input (input) != UNFORMAT_END_OF_INPUT)
774     {
775       if (unformat (input, "fifo-size %U",
776                     unformat_memory_size, &pm->fifo_size))
777         ;
778       else if (unformat (input, "max-fifo-size %U",
779                          unformat_memory_size, &pm->max_fifo_size))
780         ;
781       else if (unformat (input, "high-watermark %d", &tmp32))
782         pm->high_watermark = (u8) tmp32;
783       else if (unformat (input, "low-watermark %d", &tmp32))
784         pm->low_watermark = (u8) tmp32;
785       else if (unformat (input, "rcv-buf-size %d", &pm->rcv_buffer_size))
786         ;
787       else if (unformat (input, "prealloc-fifos %d", &pm->prealloc_fifos))
788         ;
789       else if (unformat (input, "private-segment-count %d",
790                          &pm->private_segment_count))
791         ;
792       else if (unformat (input, "private-segment-size %U",
793                          unformat_memory_size, &tmp64))
794         {
795           if (tmp64 >= 0x100000000ULL)
796             return clib_error_return
797               (0, "private segment size %lld (%llu) too large", tmp64, tmp64);
798           pm->private_segment_size = tmp64;
799         }
800       else if (unformat (input, "server-uri %s", &pm->server_uri))
801         vec_add1 (pm->server_uri, 0);
802       else if (unformat (input, "client-uri %s", &pm->client_uri))
803         vec_add1 (pm->client_uri, 0);
804       else
805         return clib_error_return (0, "unknown input `%U'",
806                                   format_unformat_error, input);
807     }
808
809   if (!pm->server_uri)
810     {
811       clib_warning ("No server-uri provided, Using default: %s",
812                     default_server_uri);
813       pm->server_uri = format (0, "%s%c", default_server_uri, 0);
814     }
815   if (!pm->client_uri)
816     {
817       clib_warning ("No client-uri provided, Using default: %s",
818                     default_client_uri);
819       pm->client_uri = format (0, "%s%c", default_client_uri, 0);
820     }
821
822   vnet_session_enable_disable (vm, 1 /* turn on session and transport */ );
823
824   rv = proxy_server_create (vm);
825   switch (rv)
826     {
827     case 0:
828       break;
829     default:
830       return clib_error_return (0, "server_create returned %d", rv);
831     }
832
833   return 0;
834 }
835
836 /* *INDENT-OFF* */
837 VLIB_CLI_COMMAND (proxy_create_command, static) =
838 {
839   .path = "test proxy server",
840   .short_help = "test proxy server [server-uri <tcp://ip/port>]"
841       "[client-uri <tcp://ip/port>][fifo-size <nn>[k|m]]"
842       "[max-fifo-size <nn>[k|m]][high-watermark <nn>]"
843       "[low-watermark <nn>][rcv-buf-size <nn>][prealloc-fifos <nn>]"
844       "[private-segment-size <mem>][private-segment-count <nn>]",
845   .function = proxy_server_create_command_fn,
846 };
847 /* *INDENT-ON* */
848
849 clib_error_t *
850 proxy_main_init (vlib_main_t * vm)
851 {
852   proxy_main_t *pm = &proxy_main;
853   pm->server_client_index = ~0;
854   pm->active_open_client_index = ~0;
855   pm->proxy_session_by_active_open_handle = hash_create (0, sizeof (uword));
856   pm->proxy_session_by_server_handle = hash_create (0, sizeof (uword));
857
858   return 0;
859 }
860
861 VLIB_INIT_FUNCTION (proxy_main_init);
862
863 /*
864 * fd.io coding-style-patch-verification: ON
865 *
866 * Local Variables:
867 * eval: (c-set-style "gnu")
868 * End:
869 */