7fdf71d9aeeb680fc97a1ac8b4eab1941c1e7a22
[vpp.git] / src / vnet / session / application_worker.c
1 /*
2  * Copyright (c) 2019 Cisco and/or its affiliates.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at:
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15
16 #include <vnet/session/application.h>
17 #include <vnet/session/application_interface.h>
18 #include <vnet/session/session.h>
19
/**
 * Pool of workers associated to apps.
 *
 * File-private. Elements are handed out by app_worker_alloc () and
 * returned by app_worker_free (); a worker's wrk_index is its position
 * in this pool.
 */
static app_worker_t *app_workers;
24
25 static inline u64
26 application_client_local_connect_key (local_session_t * ls)
27 {
28   return (((u64) ls->app_wrk_index) << 32 | (u64) ls->session_index);
29 }
30
31 static inline void
32 application_client_local_connect_key_parse (u64 key, u32 * app_wrk_index,
33                                             u32 * session_index)
34 {
35   *app_wrk_index = key >> 32;
36   *session_index = key & 0xFFFFFFFF;
37 }
38
39 local_session_t *
40 app_worker_local_session_alloc (app_worker_t * app_wrk)
41 {
42   local_session_t *s;
43   pool_get (app_wrk->local_sessions, s);
44   clib_memset (s, 0, sizeof (*s));
45   s->app_wrk_index = app_wrk->wrk_index;
46   s->session_index = s - app_wrk->local_sessions;
47   s->session_type = session_type_from_proto_and_ip (TRANSPORT_PROTO_NONE, 0);
48   return s;
49 }
50
51 void
52 app_worker_local_session_free (app_worker_t * app_wrk, local_session_t * s)
53 {
54   pool_put (app_wrk->local_sessions, s);
55   if (CLIB_DEBUG)
56     clib_memset (s, 0xfc, sizeof (*s));
57 }
58
59 local_session_t *
60 app_worker_get_local_session (app_worker_t * app_wrk, u32 session_index)
61 {
62   if (pool_is_free_index (app_wrk->local_sessions, session_index))
63     return 0;
64   return pool_elt_at_index (app_wrk->local_sessions, session_index);
65 }
66
67 local_session_t *
68 app_worker_get_local_session_from_handle (session_handle_t handle)
69 {
70   app_worker_t *server_wrk;
71   u32 session_index, server_wrk_index;
72   local_session_parse_handle (handle, &server_wrk_index, &session_index);
73   server_wrk = app_worker_get_if_valid (server_wrk_index);
74   if (!server_wrk)
75     return 0;
76   return app_worker_get_local_session (server_wrk, session_index);
77 }
78
79 void
80 app_worker_local_sessions_free (app_worker_t * app_wrk)
81 {
82   u32 index, server_wrk_index, session_index;
83   u64 handle, *handles = 0;
84   app_worker_t *server_wrk;
85   segment_manager_t *sm;
86   local_session_t *ls;
87   int i;
88
89   /*
90    * Local sessions
91    */
92   if (app_wrk->local_sessions)
93     {
94       /* *INDENT-OFF* */
95       pool_foreach (ls, app_wrk->local_sessions, ({
96         app_worker_local_session_disconnect (app_wrk->wrk_index, ls);
97       }));
98       /* *INDENT-ON* */
99     }
100
101   /*
102    * Local connects
103    */
104   vec_reset_length (handles);
105   /* *INDENT-OFF* */
106   hash_foreach (handle, index, app_wrk->local_connects, ({
107     vec_add1 (handles, handle);
108   }));
109   /* *INDENT-ON* */
110
111   for (i = 0; i < vec_len (handles); i++)
112     {
113       application_client_local_connect_key_parse (handles[i],
114                                                   &server_wrk_index,
115                                                   &session_index);
116       server_wrk = app_worker_get_if_valid (server_wrk_index);
117       if (server_wrk)
118         {
119           ls = app_worker_get_local_session (server_wrk, session_index);
120           app_worker_local_session_disconnect (app_wrk->wrk_index, ls);
121         }
122     }
123
124   sm = segment_manager_get (app_wrk->local_segment_manager);
125   sm->app_wrk_index = SEGMENT_MANAGER_INVALID_APP_INDEX;
126   segment_manager_del (sm);
127 }
128
129 app_worker_t *
130 app_worker_alloc (application_t * app)
131 {
132   app_worker_t *app_wrk;
133   pool_get (app_workers, app_wrk);
134   clib_memset (app_wrk, 0, sizeof (*app_wrk));
135   app_wrk->wrk_index = app_wrk - app_workers;
136   app_wrk->app_index = app->app_index;
137   app_wrk->wrk_map_index = ~0;
138   app_wrk->connects_seg_manager = APP_INVALID_SEGMENT_MANAGER_INDEX;
139   app_wrk->first_segment_manager = APP_INVALID_SEGMENT_MANAGER_INDEX;
140   app_wrk->local_segment_manager = APP_INVALID_SEGMENT_MANAGER_INDEX;
141   APP_DBG ("New app %v worker %u", app_get_name (app), app_wrk->wrk_index);
142   return app_wrk;
143 }
144
145 app_worker_t *
146 app_worker_get (u32 wrk_index)
147 {
148   return pool_elt_at_index (app_workers, wrk_index);
149 }
150
151 app_worker_t *
152 app_worker_get_if_valid (u32 wrk_index)
153 {
154   if (pool_is_free_index (app_workers, wrk_index))
155     return 0;
156   return pool_elt_at_index (app_workers, wrk_index);
157 }
158
159 void
160 app_worker_free (app_worker_t * app_wrk)
161 {
162   application_t *app = application_get (app_wrk->app_index);
163   vnet_unlisten_args_t _a, *a = &_a;
164   u64 handle, *handles = 0;
165   segment_manager_t *sm;
166   u32 sm_index;
167   int i;
168   app_listener_t *al;
169   session_t *ls;
170
171   /*
172    *  Listener cleanup
173    */
174
175   /* *INDENT-OFF* */
176   hash_foreach (handle, sm_index, app_wrk->listeners_table, ({
177     ls = listen_session_get_from_handle (handle);
178     al = app_listener_get (app, ls->al_index);
179     vec_add1 (handles, app_listener_handle (al));
180     sm = segment_manager_get (sm_index);
181     sm->app_wrk_index = SEGMENT_MANAGER_INVALID_APP_INDEX;
182   }));
183   /* *INDENT-ON* */
184
185   for (i = 0; i < vec_len (handles); i++)
186     {
187       a->app_index = app->app_index;
188       a->wrk_map_index = app_wrk->wrk_map_index;
189       a->handle = handles[i];
190       /* seg manager is removed when unbind completes */
191       (void) vnet_unlisten (a);
192     }
193
194   /*
195    * Connects segment manager cleanup
196    */
197
198   if (app_wrk->connects_seg_manager != APP_INVALID_SEGMENT_MANAGER_INDEX)
199     {
200       sm = segment_manager_get (app_wrk->connects_seg_manager);
201       sm->app_wrk_index = SEGMENT_MANAGER_INVALID_APP_INDEX;
202       segment_manager_init_del (sm);
203     }
204
205   /* If first segment manager is used by a listener */
206   if (app_wrk->first_segment_manager != APP_INVALID_SEGMENT_MANAGER_INDEX
207       && app_wrk->first_segment_manager != app_wrk->connects_seg_manager)
208     {
209       sm = segment_manager_get (app_wrk->first_segment_manager);
210       sm->first_is_protected = 0;
211       sm->app_wrk_index = SEGMENT_MANAGER_INVALID_APP_INDEX;
212       /* .. and has no fifos, e.g. it might be used for redirected sessions,
213        * remove it */
214       if (!segment_manager_has_fifos (sm))
215         segment_manager_del (sm);
216     }
217
218   /*
219    * Local sessions
220    */
221   app_worker_local_sessions_free (app_wrk);
222
223   pool_put (app_workers, app_wrk);
224   if (CLIB_DEBUG)
225     clib_memset (app_wrk, 0xfe, sizeof (*app_wrk));
226 }
227
228 application_t *
229 app_worker_get_app (u32 wrk_index)
230 {
231   app_worker_t *app_wrk;
232   app_wrk = app_worker_get_if_valid (wrk_index);
233   if (!app_wrk)
234     return 0;
235   return application_get_if_valid (app_wrk->app_index);
236 }
237
238 static segment_manager_t *
239 app_worker_alloc_segment_manager (app_worker_t * app_wrk)
240 {
241   segment_manager_t *sm = 0;
242
243   /* If the first segment manager is not in use, don't allocate a new one */
244   if (app_wrk->first_segment_manager != APP_INVALID_SEGMENT_MANAGER_INDEX
245       && app_wrk->first_segment_manager_in_use == 0)
246     {
247       sm = segment_manager_get (app_wrk->first_segment_manager);
248       app_wrk->first_segment_manager_in_use = 1;
249       return sm;
250     }
251
252   sm = segment_manager_new ();
253   sm->app_wrk_index = app_wrk->wrk_index;
254
255   return sm;
256 }
257
258 int
259 app_worker_start_listen (app_worker_t * app_wrk,
260                          app_listener_t * app_listener)
261 {
262   segment_manager_t *sm;
263   session_t *ls;
264
265   if (clib_bitmap_get (app_listener->workers, app_wrk->wrk_map_index))
266     return VNET_API_ERROR_ADDRESS_IN_USE;
267
268   app_listener->workers = clib_bitmap_set (app_listener->workers,
269                                            app_wrk->wrk_map_index, 1);
270
271   if (app_listener->session_index == SESSION_INVALID_INDEX)
272     return 0;
273
274   ls = session_get (app_listener->session_index, 0);
275
276   /* Allocate segment manager. All sessions derived out of a listen session
277    * have fifos allocated by the same segment manager. */
278   if (!(sm = app_worker_alloc_segment_manager (app_wrk)))
279     return -1;
280
281   /* Keep track of the segment manager for the listener or this worker */
282   hash_set (app_wrk->listeners_table, listen_session_get_handle (ls),
283             segment_manager_index (sm));
284
285   if (session_transport_service_type (ls) == TRANSPORT_SERVICE_CL)
286     {
287       if (!ls->rx_fifo && session_alloc_fifos (sm, ls))
288         return -1;
289     }
290   return 0;
291 }
292
/**
 * Stop listening on an app listener for the given worker.
 *
 * Returns immediately if the worker is not marked in the listener's worker
 * bitmap. Otherwise releases the segment manager state tied to the listen
 * session (if the listener has one), disconnects the worker's local
 * sessions accepted on the listener's local listen session (if it has
 * one), clears the worker's bit and cleans up the listener once no worker
 * is left in the bitmap.
 *
 * @param app_wrk  worker that stops listening
 * @param al       listener to stop listening on
 * @return 0 on success, -1 if the listen session handle is not in the
 *         worker's listeners table
 */
int
app_worker_stop_listen (app_worker_t * app_wrk, app_listener_t * al)
{
  session_handle_t handle;
  segment_manager_t *sm;
  uword *sm_indexp;

  if (!clib_bitmap_get (al->workers, app_wrk->wrk_map_index))
    return 0;

  if (al->session_index != SESSION_INVALID_INDEX)
    {
      session_t *ls;

      ls = listen_session_get (al->session_index);
      handle = listen_session_get_handle (ls);

      sm_indexp = hash_get (app_wrk->listeners_table, handle);
      if (PREDICT_FALSE (!sm_indexp))
        {
          clib_warning ("listener handle was removed %llu!", handle);
          return -1;
        }

      sm = segment_manager_get (*sm_indexp);
      if (app_wrk->first_segment_manager == *sm_indexp)
        {
          /* Delete sessions but don't remove segment manager */
          app_wrk->first_segment_manager_in_use = 0;
          segment_manager_del_sessions (sm);
        }
      else
        {
          segment_manager_init_del (sm);
        }
      hash_unset (app_wrk->listeners_table, handle);
    }

  if (al->local_index != SESSION_INVALID_INDEX)
    {
      local_session_t *ll, *ls;
      application_t *app;

      app = application_get (app_wrk->app_index);
      ll = application_get_local_listen_session (app, al->local_index);

      /* NOTE(review): app_index is passed here, whereas
       * app_worker_local_sessions_free () passes wrk_index to the same
       * function - confirm which index disconnect expects */
      /* *INDENT-OFF* */
      pool_foreach (ls, app_wrk->local_sessions, ({
        if (ls->listener_index == ll->session_index)
          app_worker_local_session_disconnect (app_wrk->app_index, ls);
      }));
      /* *INDENT-ON* */
    }

  clib_bitmap_set_no_check (al->workers, app_wrk->wrk_map_index, 0);
  if (clib_bitmap_is_zero (al->workers))
    app_listener_cleanup (al);

  return 0;
}
353
/**
 * Make a worker the owner of a session.
 *
 * Listening sessions are handed to application_change_listener_owner ().
 * For other sessions the worker index is updated and, if the session has
 * both fifos, new fifos are allocated from the worker's connects segment
 * manager, the contents of non-empty old fifos are copied over and the old
 * fifos are deallocated.
 *
 * @param app_wrk  worker that takes ownership
 * @param s        session to move
 * @return 0 on success (or the listener owner change result), -1 if the
 *         new fifos could not be allocated
 */
int
app_worker_own_session (app_worker_t * app_wrk, session_t * s)
{
  segment_manager_t *sm;
  svm_fifo_t *rxf, *txf;

  if (s->session_state == SESSION_STATE_LISTENING)
    return application_change_listener_owner (s, app_wrk);

  s->app_wrk_index = app_wrk->wrk_index;

  /* Keep the old fifos around until their contents have been copied */
  rxf = s->rx_fifo;
  txf = s->tx_fifo;

  if (!rxf || !txf)
    return 0;

  s->rx_fifo = 0;
  s->tx_fifo = 0;

  /* NOTE(review): on allocation failure the session is left with the new
   * worker index and cleared fifo pointers, and the old fifos are neither
   * restored nor deallocated here - confirm callers handle this */
  sm = app_worker_get_or_alloc_connect_segment_manager (app_wrk);
  if (session_alloc_fifos (sm, s))
    return -1;

  /* NOTE(review): the copies below move rxf->nitems / txf->nitems bytes,
   * which presumably assumes the new fifos are at least as large as the
   * old ones - TODO confirm */
  if (!svm_fifo_is_empty (rxf))
    {
      clib_memcpy_fast (s->rx_fifo->data, rxf->data, rxf->nitems);
      s->rx_fifo->head = rxf->head;
      s->rx_fifo->tail = rxf->tail;
      s->rx_fifo->cursize = rxf->cursize;
    }

  if (!svm_fifo_is_empty (txf))
    {
      clib_memcpy_fast (s->tx_fifo->data, txf->data, txf->nitems);
      s->tx_fifo->head = txf->head;
      s->tx_fifo->tail = txf->tail;
      s->tx_fifo->cursize = txf->cursize;
    }

  segment_manager_dealloc_fifos (rxf->segment_index, rxf, txf);

  return 0;
}
398
399 int
400 app_worker_connect_session (app_worker_t * app, session_endpoint_t * sep,
401                             u32 api_context)
402 {
403   int rv;
404
405   /* Make sure we have a segment manager for connects */
406   app_worker_alloc_connects_segment_manager (app);
407
408   if ((rv = session_open (app->wrk_index, sep, api_context)))
409     return rv;
410
411   return 0;
412 }
413
414 int
415 app_worker_alloc_connects_segment_manager (app_worker_t * app_wrk)
416 {
417   segment_manager_t *sm;
418
419   if (app_wrk->connects_seg_manager == APP_INVALID_SEGMENT_MANAGER_INDEX)
420     {
421       sm = app_worker_alloc_segment_manager (app_wrk);
422       if (sm == 0)
423         return -1;
424       app_wrk->connects_seg_manager = segment_manager_index (sm);
425     }
426   return 0;
427 }
428
429 segment_manager_t *
430 app_worker_get_connect_segment_manager (app_worker_t * app)
431 {
432   ASSERT (app->connects_seg_manager != (u32) ~ 0);
433   return segment_manager_get (app->connects_seg_manager);
434 }
435
436 segment_manager_t *
437 app_worker_get_or_alloc_connect_segment_manager (app_worker_t * app_wrk)
438 {
439   if (app_wrk->connects_seg_manager == (u32) ~ 0)
440     app_worker_alloc_connects_segment_manager (app_wrk);
441   return segment_manager_get (app_wrk->connects_seg_manager);
442 }
443
444 segment_manager_t *
445 app_worker_get_listen_segment_manager (app_worker_t * app,
446                                        session_t * listener)
447 {
448   uword *smp;
449   smp = hash_get (app->listeners_table, listen_session_get_handle (listener));
450   ASSERT (smp != 0);
451   return segment_manager_get (*smp);
452 }
453
454 session_t *
455 app_worker_first_listener (app_worker_t * app_wrk, u8 fib_proto,
456                            u8 transport_proto)
457 {
458   session_t *listener;
459   u64 handle;
460   u32 sm_index;
461   u8 sst;
462
463   sst = session_type_from_proto_and_ip (transport_proto,
464                                         fib_proto == FIB_PROTOCOL_IP4);
465
466   /* *INDENT-OFF* */
467    hash_foreach (handle, sm_index, app_wrk->listeners_table, ({
468      listener = listen_session_get_from_handle (handle);
469      if (listener->session_type == sst
470          && listener->enqueue_epoch != SESSION_PROXY_LISTENER_INDEX)
471        return listener;
472    }));
473   /* *INDENT-ON* */
474
475   return 0;
476 }
477
478 session_t *
479 app_worker_proxy_listener (app_worker_t * app_wrk, u8 fib_proto,
480                            u8 transport_proto)
481 {
482   session_t *listener;
483   u64 handle;
484   u32 sm_index;
485   u8 sst;
486
487   sst = session_type_from_proto_and_ip (transport_proto,
488                                         fib_proto == FIB_PROTOCOL_IP4);
489
490   /* *INDENT-OFF* */
491    hash_foreach (handle, sm_index, app_wrk->listeners_table, ({
492      listener = listen_session_get_from_handle (handle);
493      if (listener->session_type == sst
494          && listener->enqueue_epoch == SESSION_PROXY_LISTENER_INDEX)
495        return listener;
496    }));
497   /* *INDENT-ON* */
498
499   return 0;
500 }
501
502 /**
503  * Send an API message to the external app, to map new segment
504  */
505 int
506 app_worker_add_segment_notify (u32 app_wrk_index, u64 segment_handle)
507 {
508   app_worker_t *app_wrk = app_worker_get (app_wrk_index);
509   application_t *app = application_get (app_wrk->app_index);
510   return app->cb_fns.add_segment_callback (app_wrk->api_client_index,
511                                            segment_handle);
512 }
513
514 u8
515 app_worker_application_is_builtin (app_worker_t * app_wrk)
516 {
517   return app_wrk->app_is_builtin;
518 }
519
520 static inline int
521 app_enqueue_evt (svm_msg_q_t * mq, svm_msg_q_msg_t * msg, u8 lock)
522 {
523   if (PREDICT_FALSE (svm_msg_q_is_full (mq)))
524     {
525       clib_warning ("evt q full");
526       svm_msg_q_free_msg (mq, msg);
527       if (lock)
528         svm_msg_q_unlock (mq);
529       return -1;
530     }
531
532   if (lock)
533     {
534       svm_msg_q_add_and_unlock (mq, msg);
535       return 0;
536     }
537
538   /* Even when not locking the ring, we must wait for queue mutex */
539   if (svm_msg_q_add (mq, msg, SVM_Q_WAIT))
540     {
541       clib_warning ("msg q add returned");
542       return -1;
543     }
544   return 0;
545 }
546
547 static inline int
548 app_send_io_evt_rx (app_worker_t * app_wrk, session_t * s, u8 lock)
549 {
550   session_event_t *evt;
551   svm_msg_q_msg_t msg;
552   svm_msg_q_t *mq;
553
554   if (PREDICT_FALSE (s->session_state != SESSION_STATE_READY
555                      && s->session_state != SESSION_STATE_LISTENING))
556     {
557       /* Session is closed so app will never clean up. Flush rx fifo */
558       if (s->session_state == SESSION_STATE_CLOSED)
559         svm_fifo_dequeue_drop_all (s->rx_fifo);
560       return 0;
561     }
562
563   if (app_worker_application_is_builtin (app_wrk))
564     {
565       application_t *app = application_get (app_wrk->app_index);
566       return app->cb_fns.builtin_app_rx_callback (s);
567     }
568
569   if (svm_fifo_has_event (s->rx_fifo) || svm_fifo_is_empty (s->rx_fifo))
570     return 0;
571
572   mq = app_wrk->event_queue;
573   if (lock)
574     svm_msg_q_lock (mq);
575
576   if (PREDICT_FALSE (svm_msg_q_ring_is_full (mq, SESSION_MQ_IO_EVT_RING)))
577     {
578       clib_warning ("evt q rings full");
579       if (lock)
580         svm_msg_q_unlock (mq);
581       return -1;
582     }
583
584   msg = svm_msg_q_alloc_msg_w_ring (mq, SESSION_MQ_IO_EVT_RING);
585   ASSERT (!svm_msg_q_msg_is_invalid (&msg));
586
587   evt = (session_event_t *) svm_msg_q_msg_data (mq, &msg);
588   evt->fifo = s->rx_fifo;
589   evt->event_type = FIFO_EVENT_APP_RX;
590
591   (void) svm_fifo_set_event (s->rx_fifo);
592
593   if (app_enqueue_evt (mq, &msg, lock))
594     return -1;
595   return 0;
596 }
597
598 static inline int
599 app_send_io_evt_tx (app_worker_t * app_wrk, session_t * s, u8 lock)
600 {
601   svm_msg_q_t *mq;
602   session_event_t *evt;
603   svm_msg_q_msg_t msg;
604
605   if (app_worker_application_is_builtin (app_wrk))
606     return 0;
607
608   mq = app_wrk->event_queue;
609   if (lock)
610     svm_msg_q_lock (mq);
611
612   if (PREDICT_FALSE (svm_msg_q_ring_is_full (mq, SESSION_MQ_IO_EVT_RING)))
613     {
614       clib_warning ("evt q rings full");
615       if (lock)
616         svm_msg_q_unlock (mq);
617       return -1;
618     }
619
620   msg = svm_msg_q_alloc_msg_w_ring (mq, SESSION_MQ_IO_EVT_RING);
621   ASSERT (!svm_msg_q_msg_is_invalid (&msg));
622
623   evt = (session_event_t *) svm_msg_q_msg_data (mq, &msg);
624   evt->event_type = FIFO_EVENT_APP_TX;
625   evt->fifo = s->tx_fifo;
626
627   return app_enqueue_evt (mq, &msg, lock);
628 }
629
/* *INDENT-OFF* */
/** Signature of the per-event-type handlers stored in the table below */
typedef int (app_send_evt_handler_fn) (app_worker_t *app,
                                       session_t *s,
                                       u8 lock);
/**
 * Event handlers, indexed by event type. Slot 0 holds the rx handler and
 * slot 2 the tx handler. Slot 1 is null, so that event type must never be
 * dispatched through this table.
 */
static app_send_evt_handler_fn * const app_send_evt_handler_fns[3] = {
    app_send_io_evt_rx,
    0,
    app_send_io_evt_tx,
};
/* *INDENT-ON* */
640
641 /**
642  * Send event to application
643  *
644  * Logic from queue perspective is non-blocking. If there's
645  * not enough space to enqueue a message, we return.
646  */
647 int
648 app_worker_send_event (app_worker_t * app, session_t * s, u8 evt_type)
649 {
650   ASSERT (app && evt_type <= FIFO_EVENT_APP_TX);
651   return app_send_evt_handler_fns[evt_type] (app, s, 0 /* lock */ );
652 }
653
654 /**
655  * Send event to application
656  *
657  * Logic from queue perspective is blocking. However, if queue is full,
658  * we return.
659  */
660 int
661 app_worker_lock_and_send_event (app_worker_t * app, session_t * s,
662                                 u8 evt_type)
663 {
664   return app_send_evt_handler_fns[evt_type] (app, s, 1 /* lock */ );
665 }
666
667 segment_manager_t *
668 app_worker_get_local_segment_manager (app_worker_t * app_worker)
669 {
670   return segment_manager_get (app_worker->local_segment_manager);
671 }
672
673 segment_manager_t *
674 app_worker_get_local_segment_manager_w_session (app_worker_t * app_wrk,
675                                                 local_session_t * ls)
676 {
677   session_t *listener;
678   if (application_local_session_listener_has_transport (ls))
679     {
680       listener = listen_session_get (ls->listener_index);
681       return app_worker_get_listen_segment_manager (app_wrk, listener);
682     }
683   return segment_manager_get (app_wrk->local_segment_manager);
684 }
685
686 int
687 app_worker_local_session_cleanup (app_worker_t * client_wrk,
688                                   app_worker_t * server_wrk,
689                                   local_session_t * ls)
690 {
691   svm_fifo_segment_private_t *seg;
692   session_t *listener;
693   segment_manager_t *sm;
694   u64 client_key;
695   u8 has_transport;
696
697   /* Retrieve listener transport type as it is the one that decides where
698    * the fifos are allocated */
699   has_transport = application_local_session_listener_has_transport (ls);
700   if (!has_transport)
701     sm = app_worker_get_local_segment_manager_w_session (server_wrk, ls);
702   else
703     {
704       listener = listen_session_get (ls->listener_index);
705       sm = app_worker_get_listen_segment_manager (server_wrk, listener);
706     }
707
708   seg = segment_manager_get_segment (sm, ls->svm_segment_index);
709   if (client_wrk)
710     {
711       client_key = application_client_local_connect_key (ls);
712       hash_unset (client_wrk->local_connects, client_key);
713     }
714
715   if (!has_transport)
716     {
717       application_t *server = application_get (server_wrk->app_index);
718       u64 segment_handle = segment_manager_segment_handle (sm, seg);
719       server->cb_fns.del_segment_callback (server_wrk->api_client_index,
720                                            segment_handle);
721       if (client_wrk)
722         {
723           application_t *client = application_get (client_wrk->app_index);
724           client->cb_fns.del_segment_callback (client_wrk->api_client_index,
725                                                segment_handle);
726         }
727       segment_manager_del_segment (sm, seg);
728     }
729
730   app_worker_local_session_free (server_wrk, ls);
731
732   return 0;
733 }
734
735 static void
736 application_local_session_fix_eventds (svm_msg_q_t * sq, svm_msg_q_t * cq)
737 {
738   int fd;
739
740   /*
741    * segment manager initializes only the producer eventds, since vpp is
742    * typically the producer. But for local sessions, we also pass to the
743    * apps the mqs they listen on for events from peer apps, so they are also
744    * consumer fds.
745    */
746   fd = svm_msg_q_get_producer_eventfd (sq);
747   svm_msg_q_set_consumer_eventfd (sq, fd);
748   fd = svm_msg_q_get_producer_eventfd (cq);
749   svm_msg_q_set_consumer_eventfd (cq, fd);
750 }
751
752 int
753 app_worker_local_session_connect (app_worker_t * client_wrk,
754                                   app_worker_t * server_wrk,
755                                   local_session_t * ll, u32 opaque)
756 {
757   u32 seg_size, evt_q_sz, evt_q_elts, margin = 16 << 10;
758   u32 round_rx_fifo_sz, round_tx_fifo_sz, sm_index;
759   segment_manager_properties_t *props, *cprops;
760   int rv, has_transport, seg_index;
761   svm_fifo_segment_private_t *seg;
762   application_t *server, *client;
763   segment_manager_t *sm;
764   local_session_t *ls;
765   svm_msg_q_t *sq, *cq;
766   u64 segment_handle;
767
768   ls = app_worker_local_session_alloc (server_wrk);
769   server = application_get (server_wrk->app_index);
770   client = application_get (client_wrk->app_index);
771
772   props = application_segment_manager_properties (server);
773   cprops = application_segment_manager_properties (client);
774   evt_q_elts = props->evt_q_size + cprops->evt_q_size;
775   evt_q_sz = segment_manager_evt_q_expected_size (evt_q_elts);
776   round_rx_fifo_sz = 1 << max_log2 (props->rx_fifo_size);
777   round_tx_fifo_sz = 1 << max_log2 (props->tx_fifo_size);
778   seg_size = round_rx_fifo_sz + round_tx_fifo_sz + evt_q_sz + margin;
779
780   has_transport = session_has_transport ((session_t *) ll);
781   if (!has_transport)
782     {
783       /* Local sessions don't have backing transport */
784       ls->port = ll->port;
785       sm = app_worker_get_local_segment_manager (server_wrk);
786     }
787   else
788     {
789       session_t *sl = (session_t *) ll;
790       transport_connection_t *tc;
791       tc = listen_session_get_transport (sl);
792       ls->port = tc->lcl_port;
793       sm = app_worker_get_listen_segment_manager (server_wrk, sl);
794     }
795
796   seg_index = segment_manager_add_segment (sm, seg_size);
797   if (seg_index < 0)
798     {
799       clib_warning ("failed to add new cut-through segment");
800       return seg_index;
801     }
802   seg = segment_manager_get_segment_w_lock (sm, seg_index);
803   sq = segment_manager_alloc_queue (seg, props);
804   cq = segment_manager_alloc_queue (seg, cprops);
805
806   if (props->use_mq_eventfd)
807     application_local_session_fix_eventds (sq, cq);
808
809   ls->server_evt_q = pointer_to_uword (sq);
810   ls->client_evt_q = pointer_to_uword (cq);
811   rv = segment_manager_try_alloc_fifos (seg, props->rx_fifo_size,
812                                         props->tx_fifo_size,
813                                         &ls->rx_fifo, &ls->tx_fifo);
814   if (rv)
815     {
816       clib_warning ("failed to add fifos in cut-through segment");
817       segment_manager_segment_reader_unlock (sm);
818       goto failed;
819     }
820   sm_index = segment_manager_index (sm);
821   ls->rx_fifo->ct_session_index = ls->session_index;
822   ls->tx_fifo->ct_session_index = ls->session_index;
823   ls->rx_fifo->segment_manager = sm_index;
824   ls->tx_fifo->segment_manager = sm_index;
825   ls->rx_fifo->segment_index = seg_index;
826   ls->tx_fifo->segment_index = seg_index;
827   ls->svm_segment_index = seg_index;
828   ls->listener_index = ll->session_index;
829   ls->client_wrk_index = client_wrk->wrk_index;
830   ls->client_opaque = opaque;
831   ls->listener_session_type = ll->session_type;
832   ls->session_state = SESSION_STATE_READY;
833
834   segment_handle = segment_manager_segment_handle (sm, seg);
835   if ((rv = server->cb_fns.add_segment_callback (server_wrk->api_client_index,
836                                                  segment_handle)))
837     {
838       clib_warning ("failed to notify server of new segment");
839       segment_manager_segment_reader_unlock (sm);
840       goto failed;
841     }
842   segment_manager_segment_reader_unlock (sm);
843   if ((rv = server->cb_fns.session_accept_callback ((session_t *) ls)))
844     {
845       clib_warning ("failed to send accept cut-through notify to server");
846       goto failed;
847     }
848   if (server->flags & APP_OPTIONS_FLAGS_IS_BUILTIN)
849     app_worker_local_session_connect_notify (ls);
850
851   return 0;
852
853 failed:
854   if (!has_transport)
855     segment_manager_del_segment (sm, seg);
856   return rv;
857 }
858
859 int
860 app_worker_local_session_connect_notify (local_session_t * ls)
861 {
862   svm_fifo_segment_private_t *seg;
863   app_worker_t *client_wrk, *server_wrk;
864   segment_manager_t *sm;
865   application_t *client;
866   int rv, is_fail = 0;
867   u64 segment_handle;
868   u64 client_key;
869
870   client_wrk = app_worker_get (ls->client_wrk_index);
871   server_wrk = app_worker_get (ls->app_wrk_index);
872   client = application_get (client_wrk->app_index);
873
874   sm = app_worker_get_local_segment_manager_w_session (server_wrk, ls);
875   seg = segment_manager_get_segment_w_lock (sm, ls->svm_segment_index);
876   segment_handle = segment_manager_segment_handle (sm, seg);
877   if ((rv = client->cb_fns.add_segment_callback (client_wrk->api_client_index,
878                                                  segment_handle)))
879     {
880       clib_warning ("failed to notify client %u of new segment",
881                     ls->client_wrk_index);
882       segment_manager_segment_reader_unlock (sm);
883       app_worker_local_session_disconnect (ls->client_wrk_index, ls);
884       is_fail = 1;
885     }
886   else
887     {
888       segment_manager_segment_reader_unlock (sm);
889     }
890
891   client->cb_fns.session_connected_callback (client_wrk->wrk_index,
892                                              ls->client_opaque,
893                                              (session_t *) ls, is_fail);
894
895   client_key = application_client_local_connect_key (ls);
896   hash_set (client_wrk->local_connects, client_key, client_key);
897   return 0;
898 }
899
/**
 * Disconnect a local session on behalf of one of its two ends.
 *
 * A session already in closed state is cleaned up. Otherwise the peer is
 * notified of the disconnect and the session is marked closed; if the
 * client worker no longer exists, or the session never reached ready
 * state, the session is cleaned up directly.
 *
 * NOTE(review): app_index is used below both as an application index
 * (application_get_if_valid, comparisons with the workers' app_index) and
 * as a worker index (comparison with ls->client_wrk_index), and callers in
 * this file pass either kind - confirm the intended meaning.
 *
 * @param app_index  index identifying the end that requests the disconnect
 * @param ls         local session to disconnect
 * @return 0 if the index does not resolve to an application or once the
 *         peer was notified, VNET_API_ERROR_INVALID_VALUE if the index
 *         matches neither end of the session, the cleanup result otherwise
 */
int
app_worker_local_session_disconnect (u32 app_index, local_session_t * ls)
{
  app_worker_t *client_wrk, *server_wrk;
  u8 is_server = 0, is_client = 0;
  application_t *app;

  app = application_get_if_valid (app_index);
  if (!app)
    return 0;

  client_wrk = app_worker_get_if_valid (ls->client_wrk_index);
  server_wrk = app_worker_get (ls->app_wrk_index);

  if (server_wrk->app_index == app_index)
    is_server = 1;
  else if (client_wrk && client_wrk->app_index == app_index)
    is_client = 1;

  if (!is_server && !is_client)
    {
      clib_warning ("app %u is neither client nor server for session 0x%lx",
                    app_index, application_local_session_handle (ls));
      return VNET_API_ERROR_INVALID_VALUE;
    }

  /* Peer already disconnected, nobody is left to notify */
  if (ls->session_state == SESSION_STATE_CLOSED)
    return app_worker_local_session_cleanup (client_wrk, server_wrk, ls);

  /* NOTE(review): compares against a worker index, not is_client */
  if (app_index == ls->client_wrk_index)
    {
      mq_send_local_session_disconnected_cb (ls->app_wrk_index, ls);
    }
  else
    {
      if (!client_wrk)
        {
          return app_worker_local_session_cleanup (client_wrk, server_wrk,
                                                   ls);
        }
      else if (ls->session_state < SESSION_STATE_READY)
        {
          /* Report the connect as failed to the client, then clean up */
          application_t *client = application_get (client_wrk->app_index);
          client->cb_fns.session_connected_callback (client_wrk->wrk_index,
                                                     ls->client_opaque,
                                                     (session_t *) ls,
                                                     1 /* is_fail */ );
          ls->session_state = SESSION_STATE_CLOSED;
          return app_worker_local_session_cleanup (client_wrk, server_wrk,
                                                   ls);
        }
      else
        {
          mq_send_local_session_disconnected_cb (client_wrk->wrk_index, ls);
        }
    }

  ls->session_state = SESSION_STATE_CLOSED;

  return 0;
}
961
962 int
963 app_worker_local_session_disconnect_w_index (u32 app_wrk_index, u32 ls_index)
964 {
965   app_worker_t *app_wrk;
966   local_session_t *ls;
967   app_wrk = app_worker_get (app_wrk_index);
968   ls = app_worker_get_local_session (app_wrk, ls_index);
969   return app_worker_local_session_disconnect (app_wrk_index, ls);
970 }
971
972 u8 *
973 format_app_worker_listener (u8 * s, va_list * args)
974 {
975   app_worker_t *app_wrk = va_arg (*args, app_worker_t *);
976   u64 handle = va_arg (*args, u64);
977   u32 sm_index = va_arg (*args, u32);
978   int verbose = va_arg (*args, int);
979   session_t *listener;
980   const u8 *app_name;
981   u8 *str;
982
983   if (!app_wrk)
984     {
985       if (verbose)
986         s = format (s, "%-40s%-25s%=10s%-15s%-15s%-10s", "Connection", "App",
987                     "Wrk", "API Client", "ListenerID", "SegManager");
988       else
989         s = format (s, "%-40s%-25s%=10s", "Connection", "App", "Wrk");
990
991       return s;
992     }
993
994   app_name = application_name_from_index (app_wrk->app_index);
995   listener = listen_session_get_from_handle (handle);
996   str = format (0, "%U", format_stream_session, listener, verbose);
997
998   if (verbose)
999     {
1000       char buf[32];
1001       sprintf (buf, "%u(%u)", app_wrk->wrk_map_index, app_wrk->wrk_index);
1002       s = format (s, "%-40s%-25s%=10s%-15u%-15u%-10u", str, app_name,
1003                   buf, app_wrk->api_client_index, handle, sm_index);
1004     }
1005   else
1006     s = format (s, "%-40s%-25s%=10u", str, app_name, app_wrk->wrk_map_index);
1007
1008   return s;
1009 }
1010
1011 u8 *
1012 format_app_worker (u8 * s, va_list * args)
1013 {
1014   app_worker_t *app_wrk = va_arg (*args, app_worker_t *);
1015   u32 indent = 1;
1016
1017   s = format (s, "%U wrk-index %u app-index %u map-index %u "
1018               "api-client-index %d\n", format_white_space, indent,
1019               app_wrk->wrk_index, app_wrk->app_index, app_wrk->wrk_map_index,
1020               app_wrk->api_client_index);
1021   return s;
1022 }
1023
1024 void
1025 app_worker_format_connects (app_worker_t * app_wrk, int verbose)
1026 {
1027   svm_fifo_segment_private_t *fifo_segment;
1028   vlib_main_t *vm = vlib_get_main ();
1029   segment_manager_t *sm;
1030   const u8 *app_name;
1031   u8 *s = 0;
1032
1033   /* Header */
1034   if (!app_wrk)
1035     {
1036       if (verbose)
1037         vlib_cli_output (vm, "%-40s%-20s%-15s%-10s", "Connection", "App",
1038                          "API Client", "SegManager");
1039       else
1040         vlib_cli_output (vm, "%-40s%-20s", "Connection", "App");
1041       return;
1042     }
1043
1044   if (app_wrk->connects_seg_manager == (u32) ~ 0)
1045     return;
1046
1047   app_name = application_name_from_index (app_wrk->app_index);
1048
1049   /* Across all fifo segments */
1050   sm = segment_manager_get (app_wrk->connects_seg_manager);
1051
1052   /* *INDENT-OFF* */
1053   segment_manager_foreach_segment_w_lock (fifo_segment, sm, ({
1054     svm_fifo_t *fifo;
1055     u8 *str;
1056
1057     fifo = svm_fifo_segment_get_fifo_list (fifo_segment);
1058     while (fifo)
1059       {
1060         u32 session_index, thread_index;
1061         session_t *session;
1062
1063         session_index = fifo->master_session_index;
1064         thread_index = fifo->master_thread_index;
1065
1066         session = session_get (session_index, thread_index);
1067         str = format (0, "%U", format_stream_session, session, verbose);
1068
1069         if (verbose)
1070           s = format (s, "%-40s%-20s%-15u%-10u", str, app_name,
1071                       app_wrk->api_client_index, app_wrk->connects_seg_manager);
1072         else
1073           s = format (s, "%-40s%-20s", str, app_name);
1074
1075         vlib_cli_output (vm, "%v", s);
1076         vec_reset_length (s);
1077         vec_free (str);
1078
1079         fifo = fifo->next;
1080       }
1081     vec_free (s);
1082   }));
1083   /* *INDENT-ON* */
1084 }
1085
1086 void
1087 app_worker_format_local_sessions (app_worker_t * app_wrk, int verbose)
1088 {
1089   vlib_main_t *vm = vlib_get_main ();
1090   app_worker_t *client_wrk;
1091   local_session_t *ls;
1092   transport_proto_t tp;
1093   u8 *conn = 0;
1094
1095   /* Header */
1096   if (app_wrk == 0)
1097     {
1098       vlib_cli_output (vm, "%-40s%-15s%-20s", "Connection", "ServerApp",
1099                        "ClientApp");
1100       return;
1101     }
1102
1103   if (!pool_elts (app_wrk->local_sessions)
1104       && !pool_elts (app_wrk->local_connects))
1105     return;
1106
1107   /* *INDENT-OFF* */
1108   pool_foreach (ls, app_wrk->local_sessions, ({
1109     tp = session_type_transport_proto(ls->listener_session_type);
1110     conn = format (0, "[L][%U] *:%u", format_transport_proto_short, tp,
1111                    ls->port);
1112     client_wrk = app_worker_get (ls->client_wrk_index);
1113     vlib_cli_output (vm, "%-40v%-15u%-20u", conn, ls->app_index,
1114                      client_wrk->app_index);
1115     vec_reset_length (conn);
1116   }));
1117   /* *INDENT-ON* */
1118
1119   vec_free (conn);
1120 }
1121
1122 void
1123 app_worker_format_local_connects (app_worker_t * app, int verbose)
1124 {
1125   vlib_main_t *vm = vlib_get_main ();
1126   u32 app_wrk_index, session_index;
1127   app_worker_t *server_wrk;
1128   local_session_t *ls;
1129   u64 client_key;
1130   u64 value;
1131
1132   /* Header */
1133   if (app == 0)
1134     {
1135       if (verbose)
1136         vlib_cli_output (vm, "%-40s%-15s%-20s%-10s", "Connection", "App",
1137                          "Peer App", "SegManager");
1138       else
1139         vlib_cli_output (vm, "%-40s%-15s%-20s", "Connection", "App",
1140                          "Peer App");
1141       return;
1142     }
1143
1144   if (!app->local_connects)
1145     return;
1146
1147   /* *INDENT-OFF* */
1148   hash_foreach (client_key, value, app->local_connects, ({
1149     application_client_local_connect_key_parse (client_key, &app_wrk_index,
1150                                                 &session_index);
1151     server_wrk = app_worker_get (app_wrk_index);
1152     ls = app_worker_get_local_session (server_wrk, session_index);
1153     vlib_cli_output (vm, "%-40s%-15s%-20s", "TODO", ls->app_wrk_index,
1154                      ls->client_wrk_index);
1155   }));
1156   /* *INDENT-ON* */
1157 }
1158
1159 /*
1160  * fd.io coding-style-patch-verification: ON
1161  *
1162  * Local Variables:
1163  * eval: (c-set-style "gnu")
1164  * End:
1165  */