44e0d3994333a1a5165cf57e5afeccfbaa37eda3
[vpp.git] / src / vnet / session / application_worker.c
1 /*
2  * Copyright (c) 2019 Cisco and/or its affiliates.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at:
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15
16 #include <vnet/session/application.h>
17 #include <vnet/session/application_interface.h>
18
/**
 * Pool of workers associated to apps
 *
 * File-local. Elements are taken from the pool by app_worker_alloc () and
 * returned by app_worker_free (); a worker's wrk_index is its position in
 * this pool.
 */
static app_worker_t *app_workers;
23
24 static inline u64
25 application_client_local_connect_key (local_session_t * ls)
26 {
27   return (((u64) ls->app_wrk_index) << 32 | (u64) ls->session_index);
28 }
29
30 static inline void
31 application_client_local_connect_key_parse (u64 key, u32 * app_wrk_index,
32                                             u32 * session_index)
33 {
34   *app_wrk_index = key >> 32;
35   *session_index = key & 0xFFFFFFFF;
36 }
37
38 local_session_t *
39 app_worker_local_session_alloc (app_worker_t * app_wrk)
40 {
41   local_session_t *s;
42   pool_get (app_wrk->local_sessions, s);
43   clib_memset (s, 0, sizeof (*s));
44   s->app_wrk_index = app_wrk->wrk_index;
45   s->session_index = s - app_wrk->local_sessions;
46   s->session_type = session_type_from_proto_and_ip (TRANSPORT_PROTO_NONE, 0);
47   return s;
48 }
49
50 void
51 app_worker_local_session_free (app_worker_t * app_wrk, local_session_t * s)
52 {
53   pool_put (app_wrk->local_sessions, s);
54   if (CLIB_DEBUG)
55     clib_memset (s, 0xfc, sizeof (*s));
56 }
57
58 local_session_t *
59 app_worker_get_local_session (app_worker_t * app_wrk, u32 session_index)
60 {
61   if (pool_is_free_index (app_wrk->local_sessions, session_index))
62     return 0;
63   return pool_elt_at_index (app_wrk->local_sessions, session_index);
64 }
65
66 local_session_t *
67 app_worker_get_local_session_from_handle (session_handle_t handle)
68 {
69   app_worker_t *server_wrk;
70   u32 session_index, server_wrk_index;
71   local_session_parse_handle (handle, &server_wrk_index, &session_index);
72   server_wrk = app_worker_get_if_valid (server_wrk_index);
73   if (!server_wrk)
74     return 0;
75   return app_worker_get_local_session (server_wrk, session_index);
76 }
77
78 void
79 app_worker_local_sessions_free (app_worker_t * app_wrk)
80 {
81   u32 index, server_wrk_index, session_index;
82   u64 handle, *handles = 0;
83   app_worker_t *server_wrk;
84   segment_manager_t *sm;
85   local_session_t *ls;
86   int i;
87
88   /*
89    * Local sessions
90    */
91   if (app_wrk->local_sessions)
92     {
93       /* *INDENT-OFF* */
94       pool_foreach (ls, app_wrk->local_sessions, ({
95         app_worker_local_session_disconnect (app_wrk->wrk_index, ls);
96       }));
97       /* *INDENT-ON* */
98     }
99
100   /*
101    * Local connects
102    */
103   vec_reset_length (handles);
104   /* *INDENT-OFF* */
105   hash_foreach (handle, index, app_wrk->local_connects, ({
106     vec_add1 (handles, handle);
107   }));
108   /* *INDENT-ON* */
109
110   for (i = 0; i < vec_len (handles); i++)
111     {
112       application_client_local_connect_key_parse (handles[i],
113                                                   &server_wrk_index,
114                                                   &session_index);
115       server_wrk = app_worker_get_if_valid (server_wrk_index);
116       if (server_wrk)
117         {
118           ls = app_worker_get_local_session (server_wrk, session_index);
119           app_worker_local_session_disconnect (app_wrk->wrk_index, ls);
120         }
121     }
122
123   sm = segment_manager_get (app_wrk->local_segment_manager);
124   sm->app_wrk_index = SEGMENT_MANAGER_INVALID_APP_INDEX;
125   segment_manager_del (sm);
126 }
127
128 app_worker_t *
129 app_worker_alloc (application_t * app)
130 {
131   app_worker_t *app_wrk;
132   pool_get (app_workers, app_wrk);
133   clib_memset (app_wrk, 0, sizeof (*app_wrk));
134   app_wrk->wrk_index = app_wrk - app_workers;
135   app_wrk->app_index = app->app_index;
136   app_wrk->wrk_map_index = ~0;
137   app_wrk->connects_seg_manager = APP_INVALID_SEGMENT_MANAGER_INDEX;
138   app_wrk->first_segment_manager = APP_INVALID_SEGMENT_MANAGER_INDEX;
139   app_wrk->local_segment_manager = APP_INVALID_SEGMENT_MANAGER_INDEX;
140   APP_DBG ("New app %v worker %u", app_get_name (app), app_wrk->wrk_index);
141   return app_wrk;
142 }
143
144 app_worker_t *
145 app_worker_get (u32 wrk_index)
146 {
147   return pool_elt_at_index (app_workers, wrk_index);
148 }
149
150 app_worker_t *
151 app_worker_get_if_valid (u32 wrk_index)
152 {
153   if (pool_is_free_index (app_workers, wrk_index))
154     return 0;
155   return pool_elt_at_index (app_workers, wrk_index);
156 }
157
158 void
159 app_worker_free (app_worker_t * app_wrk)
160 {
161   application_t *app = application_get (app_wrk->app_index);
162   vnet_unbind_args_t _a, *a = &_a;
163   u64 handle, *handles = 0;
164   segment_manager_t *sm;
165   u32 sm_index;
166   int i;
167   app_listener_t *al;
168   session_t *ls;
169
170   /*
171    *  Listener cleanup
172    */
173
174   /* *INDENT-OFF* */
175   hash_foreach (handle, sm_index, app_wrk->listeners_table, ({
176     ls = listen_session_get_from_handle (handle);
177     al = app_listener_get (app, ls->al_index);
178     vec_add1 (handles, app_listener_handle (al));
179     sm = segment_manager_get (sm_index);
180     sm->app_wrk_index = SEGMENT_MANAGER_INVALID_APP_INDEX;
181   }));
182   /* *INDENT-ON* */
183
184   for (i = 0; i < vec_len (handles); i++)
185     {
186       a->app_index = app->app_index;
187       a->wrk_map_index = app_wrk->wrk_map_index;
188       a->handle = handles[i];
189       /* seg manager is removed when unbind completes */
190       vnet_unlisten (a);
191     }
192
193   /*
194    * Connects segment manager cleanup
195    */
196
197   if (app_wrk->connects_seg_manager != APP_INVALID_SEGMENT_MANAGER_INDEX)
198     {
199       sm = segment_manager_get (app_wrk->connects_seg_manager);
200       sm->app_wrk_index = SEGMENT_MANAGER_INVALID_APP_INDEX;
201       segment_manager_init_del (sm);
202     }
203
204   /* If first segment manager is used by a listener */
205   if (app_wrk->first_segment_manager != APP_INVALID_SEGMENT_MANAGER_INDEX
206       && app_wrk->first_segment_manager != app_wrk->connects_seg_manager)
207     {
208       sm = segment_manager_get (app_wrk->first_segment_manager);
209       sm->first_is_protected = 0;
210       sm->app_wrk_index = SEGMENT_MANAGER_INVALID_APP_INDEX;
211       /* .. and has no fifos, e.g. it might be used for redirected sessions,
212        * remove it */
213       if (!segment_manager_has_fifos (sm))
214         segment_manager_del (sm);
215     }
216
217   /*
218    * Local sessions
219    */
220   app_worker_local_sessions_free (app_wrk);
221
222   pool_put (app_workers, app_wrk);
223   if (CLIB_DEBUG)
224     clib_memset (app_wrk, 0xfe, sizeof (*app_wrk));
225 }
226
227 application_t *
228 app_worker_get_app (u32 wrk_index)
229 {
230   app_worker_t *app_wrk;
231   app_wrk = app_worker_get_if_valid (wrk_index);
232   if (!app_wrk)
233     return 0;
234   return application_get_if_valid (app_wrk->app_index);
235 }
236
237 static segment_manager_t *
238 app_worker_alloc_segment_manager (app_worker_t * app_wrk)
239 {
240   segment_manager_t *sm = 0;
241
242   /* If the first segment manager is not in use, don't allocate a new one */
243   if (app_wrk->first_segment_manager != APP_INVALID_SEGMENT_MANAGER_INDEX
244       && app_wrk->first_segment_manager_in_use == 0)
245     {
246       sm = segment_manager_get (app_wrk->first_segment_manager);
247       app_wrk->first_segment_manager_in_use = 1;
248       return sm;
249     }
250
251   sm = segment_manager_new ();
252   sm->app_wrk_index = app_wrk->wrk_index;
253
254   return sm;
255 }
256
257 int
258 app_worker_start_listen (app_worker_t * app_wrk,
259                          app_listener_t * app_listener)
260 {
261   segment_manager_t *sm;
262   session_t *ls;
263
264   if (clib_bitmap_get (app_listener->workers, app_wrk->wrk_map_index))
265     return VNET_API_ERROR_ADDRESS_IN_USE;
266
267   app_listener->workers = clib_bitmap_set (app_listener->workers,
268                                            app_wrk->wrk_map_index, 1);
269
270   if (app_listener->session_index == SESSION_INVALID_INDEX)
271     return 0;
272
273   ls = session_get (app_listener->session_index, 0);
274
275   /* Allocate segment manager. All sessions derived out of a listen session
276    * have fifos allocated by the same segment manager. */
277   if (!(sm = app_worker_alloc_segment_manager (app_wrk)))
278     return -1;
279
280   /* Keep track of the segment manager for the listener or this worker */
281   hash_set (app_wrk->listeners_table, listen_session_get_handle (ls),
282             segment_manager_index (sm));
283
284   if (session_transport_service_type (ls) == TRANSPORT_SERVICE_CL)
285     {
286       if (!ls->rx_fifo && session_alloc_fifos (sm, ls))
287         return -1;
288     }
289   return 0;
290 }
291
292 int
293 app_worker_stop_listen (app_worker_t * app_wrk, app_listener_t * al)
294 {
295   session_handle_t handle;
296   segment_manager_t *sm;
297   uword *sm_indexp;
298
299   if (!clib_bitmap_get (al->workers, app_wrk->wrk_map_index))
300     return 0;
301
302   if (al->session_index != SESSION_INVALID_INDEX)
303     {
304       session_t *ls;
305
306       ls = listen_session_get (al->session_index);
307       handle = listen_session_get_handle (ls);
308
309       sm_indexp = hash_get (app_wrk->listeners_table, handle);
310       if (PREDICT_FALSE (!sm_indexp))
311         {
312           clib_warning ("listener handle was removed %llu!", handle);
313           return -1;
314         }
315
316       sm = segment_manager_get (*sm_indexp);
317       if (app_wrk->first_segment_manager == *sm_indexp)
318         {
319           /* Delete sessions but don't remove segment manager */
320           app_wrk->first_segment_manager_in_use = 0;
321           segment_manager_del_sessions (sm);
322         }
323       else
324         {
325           segment_manager_init_del (sm);
326         }
327       hash_unset (app_wrk->listeners_table, handle);
328     }
329
330   if (al->local_index != SESSION_INVALID_INDEX)
331     {
332       local_session_t *ll, *ls;
333       application_t *app;
334
335       app = application_get (app_wrk->app_index);
336       ll = application_get_local_listen_session (app, al->local_index);
337
338       /* *INDENT-OFF* */
339       pool_foreach (ls, app_wrk->local_sessions, ({
340         if (ls->listener_index == ll->session_index)
341           app_worker_local_session_disconnect (app_wrk->app_index, ls);
342       }));
343       /* *INDENT-ON* */
344     }
345
346   clib_bitmap_set_no_check (al->workers, app_wrk->wrk_map_index, 0);
347   if (clib_bitmap_is_zero (al->workers))
348     app_listener_cleanup (al);
349
350   return 0;
351 }
352
353 int
354 app_worker_own_session (app_worker_t * app_wrk, session_t * s)
355 {
356   segment_manager_t *sm;
357   svm_fifo_t *rxf, *txf;
358
359   if (s->session_state == SESSION_STATE_LISTENING)
360     return application_change_listener_owner (s, app_wrk);
361
362   s->app_wrk_index = app_wrk->wrk_index;
363
364   rxf = s->rx_fifo;
365   txf = s->tx_fifo;
366
367   if (!rxf || !txf)
368     return 0;
369
370   s->rx_fifo = 0;
371   s->tx_fifo = 0;
372
373   sm = app_worker_get_or_alloc_connect_segment_manager (app_wrk);
374   if (session_alloc_fifos (sm, s))
375     return -1;
376
377   if (!svm_fifo_is_empty (rxf))
378     {
379       clib_memcpy_fast (s->rx_fifo->data, rxf->data, rxf->nitems);
380       s->rx_fifo->head = rxf->head;
381       s->rx_fifo->tail = rxf->tail;
382       s->rx_fifo->cursize = rxf->cursize;
383     }
384
385   if (!svm_fifo_is_empty (txf))
386     {
387       clib_memcpy_fast (s->tx_fifo->data, txf->data, txf->nitems);
388       s->tx_fifo->head = txf->head;
389       s->tx_fifo->tail = txf->tail;
390       s->tx_fifo->cursize = txf->cursize;
391     }
392
393   segment_manager_dealloc_fifos (rxf->segment_index, rxf, txf);
394
395   return 0;
396 }
397
398 int
399 app_worker_connect_session (app_worker_t * app, session_endpoint_t * sep,
400                             u32 api_context)
401 {
402   int rv;
403
404   /* Make sure we have a segment manager for connects */
405   app_worker_alloc_connects_segment_manager (app);
406
407   if ((rv = session_open (app->wrk_index, sep, api_context)))
408     return rv;
409
410   return 0;
411 }
412
413 int
414 app_worker_alloc_connects_segment_manager (app_worker_t * app_wrk)
415 {
416   segment_manager_t *sm;
417
418   if (app_wrk->connects_seg_manager == APP_INVALID_SEGMENT_MANAGER_INDEX)
419     {
420       sm = app_worker_alloc_segment_manager (app_wrk);
421       if (sm == 0)
422         return -1;
423       app_wrk->connects_seg_manager = segment_manager_index (sm);
424     }
425   return 0;
426 }
427
428 segment_manager_t *
429 app_worker_get_connect_segment_manager (app_worker_t * app)
430 {
431   ASSERT (app->connects_seg_manager != (u32) ~ 0);
432   return segment_manager_get (app->connects_seg_manager);
433 }
434
435 segment_manager_t *
436 app_worker_get_or_alloc_connect_segment_manager (app_worker_t * app_wrk)
437 {
438   if (app_wrk->connects_seg_manager == (u32) ~ 0)
439     app_worker_alloc_connects_segment_manager (app_wrk);
440   return segment_manager_get (app_wrk->connects_seg_manager);
441 }
442
443 segment_manager_t *
444 app_worker_get_listen_segment_manager (app_worker_t * app,
445                                        session_t * listener)
446 {
447   uword *smp;
448   smp = hash_get (app->listeners_table, listen_session_get_handle (listener));
449   ASSERT (smp != 0);
450   return segment_manager_get (*smp);
451 }
452
453 session_t *
454 app_worker_first_listener (app_worker_t * app_wrk, u8 fib_proto,
455                            u8 transport_proto)
456 {
457   session_t *listener;
458   u64 handle;
459   u32 sm_index;
460   u8 sst;
461
462   sst = session_type_from_proto_and_ip (transport_proto,
463                                         fib_proto == FIB_PROTOCOL_IP4);
464
465   /* *INDENT-OFF* */
466    hash_foreach (handle, sm_index, app_wrk->listeners_table, ({
467      listener = listen_session_get_from_handle (handle);
468      if (listener->session_type == sst
469          && listener->enqueue_epoch != SESSION_PROXY_LISTENER_INDEX)
470        return listener;
471    }));
472   /* *INDENT-ON* */
473
474   return 0;
475 }
476
477 session_t *
478 app_worker_proxy_listener (app_worker_t * app_wrk, u8 fib_proto,
479                            u8 transport_proto)
480 {
481   session_t *listener;
482   u64 handle;
483   u32 sm_index;
484   u8 sst;
485
486   sst = session_type_from_proto_and_ip (transport_proto,
487                                         fib_proto == FIB_PROTOCOL_IP4);
488
489   /* *INDENT-OFF* */
490    hash_foreach (handle, sm_index, app_wrk->listeners_table, ({
491      listener = listen_session_get_from_handle (handle);
492      if (listener->session_type == sst
493          && listener->enqueue_epoch == SESSION_PROXY_LISTENER_INDEX)
494        return listener;
495    }));
496   /* *INDENT-ON* */
497
498   return 0;
499 }
500
501 /**
502  * Send an API message to the external app, to map new segment
503  */
504 int
505 app_worker_add_segment_notify (u32 app_wrk_index, u64 segment_handle)
506 {
507   app_worker_t *app_wrk = app_worker_get (app_wrk_index);
508   application_t *app = application_get (app_wrk->app_index);
509   return app->cb_fns.add_segment_callback (app_wrk->api_client_index,
510                                            segment_handle);
511 }
512
513 u8
514 app_worker_application_is_builtin (app_worker_t * app_wrk)
515 {
516   return app_wrk->app_is_builtin;
517 }
518
519 static inline int
520 app_enqueue_evt (svm_msg_q_t * mq, svm_msg_q_msg_t * msg, u8 lock)
521 {
522   if (PREDICT_FALSE (svm_msg_q_is_full (mq)))
523     {
524       clib_warning ("evt q full");
525       svm_msg_q_free_msg (mq, msg);
526       if (lock)
527         svm_msg_q_unlock (mq);
528       return -1;
529     }
530
531   if (lock)
532     {
533       svm_msg_q_add_and_unlock (mq, msg);
534       return 0;
535     }
536
537   /* Even when not locking the ring, we must wait for queue mutex */
538   if (svm_msg_q_add (mq, msg, SVM_Q_WAIT))
539     {
540       clib_warning ("msg q add returned");
541       return -1;
542     }
543   return 0;
544 }
545
546 static inline int
547 app_send_io_evt_rx (app_worker_t * app_wrk, session_t * s, u8 lock)
548 {
549   session_event_t *evt;
550   svm_msg_q_msg_t msg;
551   svm_msg_q_t *mq;
552
553   if (PREDICT_FALSE (s->session_state != SESSION_STATE_READY
554                      && s->session_state != SESSION_STATE_LISTENING))
555     {
556       /* Session is closed so app will never clean up. Flush rx fifo */
557       if (s->session_state == SESSION_STATE_CLOSED)
558         svm_fifo_dequeue_drop_all (s->rx_fifo);
559       return 0;
560     }
561
562   if (app_worker_application_is_builtin (app_wrk))
563     {
564       application_t *app = application_get (app_wrk->app_index);
565       return app->cb_fns.builtin_app_rx_callback (s);
566     }
567
568   if (svm_fifo_has_event (s->rx_fifo) || svm_fifo_is_empty (s->rx_fifo))
569     return 0;
570
571   mq = app_wrk->event_queue;
572   if (lock)
573     svm_msg_q_lock (mq);
574
575   if (PREDICT_FALSE (svm_msg_q_ring_is_full (mq, SESSION_MQ_IO_EVT_RING)))
576     {
577       clib_warning ("evt q rings full");
578       if (lock)
579         svm_msg_q_unlock (mq);
580       return -1;
581     }
582
583   msg = svm_msg_q_alloc_msg_w_ring (mq, SESSION_MQ_IO_EVT_RING);
584   ASSERT (!svm_msg_q_msg_is_invalid (&msg));
585
586   evt = (session_event_t *) svm_msg_q_msg_data (mq, &msg);
587   evt->fifo = s->rx_fifo;
588   evt->event_type = FIFO_EVENT_APP_RX;
589
590   (void) svm_fifo_set_event (s->rx_fifo);
591
592   if (app_enqueue_evt (mq, &msg, lock))
593     return -1;
594   return 0;
595 }
596
597 static inline int
598 app_send_io_evt_tx (app_worker_t * app_wrk, session_t * s, u8 lock)
599 {
600   svm_msg_q_t *mq;
601   session_event_t *evt;
602   svm_msg_q_msg_t msg;
603
604   if (app_worker_application_is_builtin (app_wrk))
605     return 0;
606
607   mq = app_wrk->event_queue;
608   if (lock)
609     svm_msg_q_lock (mq);
610
611   if (PREDICT_FALSE (svm_msg_q_ring_is_full (mq, SESSION_MQ_IO_EVT_RING)))
612     {
613       clib_warning ("evt q rings full");
614       if (lock)
615         svm_msg_q_unlock (mq);
616       return -1;
617     }
618
619   msg = svm_msg_q_alloc_msg_w_ring (mq, SESSION_MQ_IO_EVT_RING);
620   ASSERT (!svm_msg_q_msg_is_invalid (&msg));
621
622   evt = (session_event_t *) svm_msg_q_msg_data (mq, &msg);
623   evt->event_type = FIFO_EVENT_APP_TX;
624   evt->fifo = s->tx_fifo;
625
626   return app_enqueue_evt (mq, &msg, lock);
627 }
628
629 /* *INDENT-OFF* */
630 typedef int (app_send_evt_handler_fn) (app_worker_t *app,
631                                        session_t *s,
632                                        u8 lock);
633 static app_send_evt_handler_fn * const app_send_evt_handler_fns[3] = {
634     app_send_io_evt_rx,
635     0,
636     app_send_io_evt_tx,
637 };
638 /* *INDENT-ON* */
639
640 /**
641  * Send event to application
642  *
643  * Logic from queue perspective is non-blocking. If there's
644  * not enough space to enqueue a message, we return.
645  */
646 int
647 app_worker_send_event (app_worker_t * app, session_t * s, u8 evt_type)
648 {
649   ASSERT (app && evt_type <= FIFO_EVENT_APP_TX);
650   return app_send_evt_handler_fns[evt_type] (app, s, 0 /* lock */ );
651 }
652
653 /**
654  * Send event to application
655  *
656  * Logic from queue perspective is blocking. However, if queue is full,
657  * we return.
658  */
659 int
660 app_worker_lock_and_send_event (app_worker_t * app, session_t * s,
661                                 u8 evt_type)
662 {
663   return app_send_evt_handler_fns[evt_type] (app, s, 1 /* lock */ );
664 }
665
666 segment_manager_t *
667 app_worker_get_local_segment_manager (app_worker_t * app_worker)
668 {
669   return segment_manager_get (app_worker->local_segment_manager);
670 }
671
672 segment_manager_t *
673 app_worker_get_local_segment_manager_w_session (app_worker_t * app_wrk,
674                                                 local_session_t * ls)
675 {
676   session_t *listener;
677   if (application_local_session_listener_has_transport (ls))
678     {
679       listener = listen_session_get (ls->listener_index);
680       return app_worker_get_listen_segment_manager (app_wrk, listener);
681     }
682   return segment_manager_get (app_wrk->local_segment_manager);
683 }
684
685 int
686 app_worker_local_session_cleanup (app_worker_t * client_wrk,
687                                   app_worker_t * server_wrk,
688                                   local_session_t * ls)
689 {
690   svm_fifo_segment_private_t *seg;
691   session_t *listener;
692   segment_manager_t *sm;
693   u64 client_key;
694   u8 has_transport;
695
696   /* Retrieve listener transport type as it is the one that decides where
697    * the fifos are allocated */
698   has_transport = application_local_session_listener_has_transport (ls);
699   if (!has_transport)
700     sm = app_worker_get_local_segment_manager_w_session (server_wrk, ls);
701   else
702     {
703       listener = listen_session_get (ls->listener_index);
704       sm = app_worker_get_listen_segment_manager (server_wrk, listener);
705     }
706
707   seg = segment_manager_get_segment (sm, ls->svm_segment_index);
708   if (client_wrk)
709     {
710       client_key = application_client_local_connect_key (ls);
711       hash_unset (client_wrk->local_connects, client_key);
712     }
713
714   if (!has_transport)
715     {
716       application_t *server = application_get (server_wrk->app_index);
717       u64 segment_handle = segment_manager_segment_handle (sm, seg);
718       server->cb_fns.del_segment_callback (server_wrk->api_client_index,
719                                            segment_handle);
720       if (client_wrk)
721         {
722           application_t *client = application_get (client_wrk->app_index);
723           client->cb_fns.del_segment_callback (client_wrk->api_client_index,
724                                                segment_handle);
725         }
726       segment_manager_del_segment (sm, seg);
727     }
728
729   app_worker_local_session_free (server_wrk, ls);
730
731   return 0;
732 }
733
734 static void
735 application_local_session_fix_eventds (svm_msg_q_t * sq, svm_msg_q_t * cq)
736 {
737   int fd;
738
739   /*
740    * segment manager initializes only the producer eventds, since vpp is
741    * typically the producer. But for local sessions, we also pass to the
742    * apps the mqs they listen on for events from peer apps, so they are also
743    * consumer fds.
744    */
745   fd = svm_msg_q_get_producer_eventfd (sq);
746   svm_msg_q_set_consumer_eventfd (sq, fd);
747   fd = svm_msg_q_get_producer_eventfd (cq);
748   svm_msg_q_set_consumer_eventfd (cq, fd);
749 }
750
751 int
752 app_worker_local_session_connect (app_worker_t * client_wrk,
753                                   app_worker_t * server_wrk,
754                                   local_session_t * ll, u32 opaque)
755 {
756   u32 seg_size, evt_q_sz, evt_q_elts, margin = 16 << 10;
757   u32 round_rx_fifo_sz, round_tx_fifo_sz, sm_index;
758   segment_manager_properties_t *props, *cprops;
759   int rv, has_transport, seg_index;
760   svm_fifo_segment_private_t *seg;
761   application_t *server, *client;
762   segment_manager_t *sm;
763   local_session_t *ls;
764   svm_msg_q_t *sq, *cq;
765   u64 segment_handle;
766
767   ls = app_worker_local_session_alloc (server_wrk);
768   server = application_get (server_wrk->app_index);
769   client = application_get (client_wrk->app_index);
770
771   props = application_segment_manager_properties (server);
772   cprops = application_segment_manager_properties (client);
773   evt_q_elts = props->evt_q_size + cprops->evt_q_size;
774   evt_q_sz = segment_manager_evt_q_expected_size (evt_q_elts);
775   round_rx_fifo_sz = 1 << max_log2 (props->rx_fifo_size);
776   round_tx_fifo_sz = 1 << max_log2 (props->tx_fifo_size);
777   seg_size = round_rx_fifo_sz + round_tx_fifo_sz + evt_q_sz + margin;
778
779   has_transport = session_has_transport ((session_t *) ll);
780   if (!has_transport)
781     {
782       /* Local sessions don't have backing transport */
783       ls->port = ll->port;
784       sm = app_worker_get_local_segment_manager (server_wrk);
785     }
786   else
787     {
788       session_t *sl = (session_t *) ll;
789       transport_connection_t *tc;
790       tc = listen_session_get_transport (sl);
791       ls->port = tc->lcl_port;
792       sm = app_worker_get_listen_segment_manager (server_wrk, sl);
793     }
794
795   seg_index = segment_manager_add_segment (sm, seg_size);
796   if (seg_index < 0)
797     {
798       clib_warning ("failed to add new cut-through segment");
799       return seg_index;
800     }
801   seg = segment_manager_get_segment_w_lock (sm, seg_index);
802   sq = segment_manager_alloc_queue (seg, props);
803   cq = segment_manager_alloc_queue (seg, cprops);
804
805   if (props->use_mq_eventfd)
806     application_local_session_fix_eventds (sq, cq);
807
808   ls->server_evt_q = pointer_to_uword (sq);
809   ls->client_evt_q = pointer_to_uword (cq);
810   rv = segment_manager_try_alloc_fifos (seg, props->rx_fifo_size,
811                                         props->tx_fifo_size,
812                                         &ls->rx_fifo, &ls->tx_fifo);
813   if (rv)
814     {
815       clib_warning ("failed to add fifos in cut-through segment");
816       segment_manager_segment_reader_unlock (sm);
817       goto failed;
818     }
819   sm_index = segment_manager_index (sm);
820   ls->rx_fifo->ct_session_index = ls->session_index;
821   ls->tx_fifo->ct_session_index = ls->session_index;
822   ls->rx_fifo->segment_manager = sm_index;
823   ls->tx_fifo->segment_manager = sm_index;
824   ls->rx_fifo->segment_index = seg_index;
825   ls->tx_fifo->segment_index = seg_index;
826   ls->svm_segment_index = seg_index;
827   ls->listener_index = ll->session_index;
828   ls->client_wrk_index = client_wrk->wrk_index;
829   ls->client_opaque = opaque;
830   ls->listener_session_type = ll->session_type;
831   ls->session_state = SESSION_STATE_READY;
832
833   segment_handle = segment_manager_segment_handle (sm, seg);
834   if ((rv = server->cb_fns.add_segment_callback (server_wrk->api_client_index,
835                                                  segment_handle)))
836     {
837       clib_warning ("failed to notify server of new segment");
838       segment_manager_segment_reader_unlock (sm);
839       goto failed;
840     }
841   segment_manager_segment_reader_unlock (sm);
842   if ((rv = server->cb_fns.session_accept_callback ((session_t *) ls)))
843     {
844       clib_warning ("failed to send accept cut-through notify to server");
845       goto failed;
846     }
847   if (server->flags & APP_OPTIONS_FLAGS_IS_BUILTIN)
848     app_worker_local_session_connect_notify (ls);
849
850   return 0;
851
852 failed:
853   if (!has_transport)
854     segment_manager_del_segment (sm, seg);
855   return rv;
856 }
857
858 int
859 app_worker_local_session_connect_notify (local_session_t * ls)
860 {
861   svm_fifo_segment_private_t *seg;
862   app_worker_t *client_wrk, *server_wrk;
863   segment_manager_t *sm;
864   application_t *client;
865   int rv, is_fail = 0;
866   u64 segment_handle;
867   u64 client_key;
868
869   client_wrk = app_worker_get (ls->client_wrk_index);
870   server_wrk = app_worker_get (ls->app_wrk_index);
871   client = application_get (client_wrk->app_index);
872
873   sm = app_worker_get_local_segment_manager_w_session (server_wrk, ls);
874   seg = segment_manager_get_segment_w_lock (sm, ls->svm_segment_index);
875   segment_handle = segment_manager_segment_handle (sm, seg);
876   if ((rv = client->cb_fns.add_segment_callback (client_wrk->api_client_index,
877                                                  segment_handle)))
878     {
879       clib_warning ("failed to notify client %u of new segment",
880                     ls->client_wrk_index);
881       segment_manager_segment_reader_unlock (sm);
882       app_worker_local_session_disconnect (ls->client_wrk_index, ls);
883       is_fail = 1;
884     }
885   else
886     {
887       segment_manager_segment_reader_unlock (sm);
888     }
889
890   client->cb_fns.session_connected_callback (client_wrk->wrk_index,
891                                              ls->client_opaque,
892                                              (session_t *) ls, is_fail);
893
894   client_key = application_client_local_connect_key (ls);
895   hash_set (client_wrk->local_connects, client_key, client_key);
896   return 0;
897 }
898
899 int
900 app_worker_local_session_disconnect (u32 app_index, local_session_t * ls)
901 {
902   app_worker_t *client_wrk, *server_wrk;
903   u8 is_server = 0, is_client = 0;
904   application_t *app;
905
906   app = application_get_if_valid (app_index);
907   if (!app)
908     return 0;
909
910   client_wrk = app_worker_get_if_valid (ls->client_wrk_index);
911   server_wrk = app_worker_get (ls->app_wrk_index);
912
913   if (server_wrk->app_index == app_index)
914     is_server = 1;
915   else if (client_wrk && client_wrk->app_index == app_index)
916     is_client = 1;
917
918   if (!is_server && !is_client)
919     {
920       clib_warning ("app %u is neither client nor server for session 0x%lx",
921                     app_index, application_local_session_handle (ls));
922       return VNET_API_ERROR_INVALID_VALUE;
923     }
924
925   if (ls->session_state == SESSION_STATE_CLOSED)
926     return app_worker_local_session_cleanup (client_wrk, server_wrk, ls);
927
928   if (app_index == ls->client_wrk_index)
929     {
930       mq_send_local_session_disconnected_cb (ls->app_wrk_index, ls);
931     }
932   else
933     {
934       if (!client_wrk)
935         {
936           return app_worker_local_session_cleanup (client_wrk, server_wrk,
937                                                    ls);
938         }
939       else if (ls->session_state < SESSION_STATE_READY)
940         {
941           application_t *client = application_get (client_wrk->app_index);
942           client->cb_fns.session_connected_callback (client_wrk->wrk_index,
943                                                      ls->client_opaque,
944                                                      (session_t *) ls,
945                                                      1 /* is_fail */ );
946           ls->session_state = SESSION_STATE_CLOSED;
947           return app_worker_local_session_cleanup (client_wrk, server_wrk,
948                                                    ls);
949         }
950       else
951         {
952           mq_send_local_session_disconnected_cb (client_wrk->wrk_index, ls);
953         }
954     }
955
956   ls->session_state = SESSION_STATE_CLOSED;
957
958   return 0;
959 }
960
961 int
962 app_worker_local_session_disconnect_w_index (u32 app_wrk_index, u32 ls_index)
963 {
964   app_worker_t *app_wrk;
965   local_session_t *ls;
966   app_wrk = app_worker_get (app_wrk_index);
967   ls = app_worker_get_local_session (app_wrk, ls_index);
968   return app_worker_local_session_disconnect (app_wrk_index, ls);
969 }
970
971 u8 *
972 format_app_worker_listener (u8 * s, va_list * args)
973 {
974   app_worker_t *app_wrk = va_arg (*args, app_worker_t *);
975   u64 handle = va_arg (*args, u64);
976   u32 sm_index = va_arg (*args, u32);
977   int verbose = va_arg (*args, int);
978   session_t *listener;
979   const u8 *app_name;
980   u8 *str;
981
982   if (!app_wrk)
983     {
984       if (verbose)
985         s = format (s, "%-40s%-25s%=10s%-15s%-15s%-10s", "Connection", "App",
986                     "Wrk", "API Client", "ListenerID", "SegManager");
987       else
988         s = format (s, "%-40s%-25s%=10s", "Connection", "App", "Wrk");
989
990       return s;
991     }
992
993   app_name = application_name_from_index (app_wrk->app_index);
994   listener = listen_session_get_from_handle (handle);
995   str = format (0, "%U", format_stream_session, listener, verbose);
996
997   if (verbose)
998     {
999       char buf[32];
1000       sprintf (buf, "%u(%u)", app_wrk->wrk_map_index, app_wrk->wrk_index);
1001       s = format (s, "%-40s%-25s%=10s%-15u%-15u%-10u", str, app_name,
1002                   buf, app_wrk->api_client_index, handle, sm_index);
1003     }
1004   else
1005     s = format (s, "%-40s%-25s%=10u", str, app_name, app_wrk->wrk_map_index);
1006
1007   return s;
1008 }
1009
1010 u8 *
1011 format_app_worker (u8 * s, va_list * args)
1012 {
1013   app_worker_t *app_wrk = va_arg (*args, app_worker_t *);
1014   u32 indent = 1;
1015
1016   s = format (s, "%U wrk-index %u app-index %u map-index %u "
1017               "api-client-index %d\n", format_white_space, indent,
1018               app_wrk->wrk_index, app_wrk->app_index, app_wrk->wrk_map_index,
1019               app_wrk->api_client_index);
1020   return s;
1021 }
1022
1023 void
1024 app_worker_format_connects (app_worker_t * app_wrk, int verbose)
1025 {
1026   svm_fifo_segment_private_t *fifo_segment;
1027   vlib_main_t *vm = vlib_get_main ();
1028   segment_manager_t *sm;
1029   const u8 *app_name;
1030   u8 *s = 0;
1031
1032   /* Header */
1033   if (!app_wrk)
1034     {
1035       if (verbose)
1036         vlib_cli_output (vm, "%-40s%-20s%-15s%-10s", "Connection", "App",
1037                          "API Client", "SegManager");
1038       else
1039         vlib_cli_output (vm, "%-40s%-20s", "Connection", "App");
1040       return;
1041     }
1042
1043   if (app_wrk->connects_seg_manager == (u32) ~ 0)
1044     return;
1045
1046   app_name = application_name_from_index (app_wrk->app_index);
1047
1048   /* Across all fifo segments */
1049   sm = segment_manager_get (app_wrk->connects_seg_manager);
1050
1051   /* *INDENT-OFF* */
1052   segment_manager_foreach_segment_w_lock (fifo_segment, sm, ({
1053     svm_fifo_t *fifo;
1054     u8 *str;
1055
1056     fifo = svm_fifo_segment_get_fifo_list (fifo_segment);
1057     while (fifo)
1058       {
1059         u32 session_index, thread_index;
1060         session_t *session;
1061
1062         session_index = fifo->master_session_index;
1063         thread_index = fifo->master_thread_index;
1064
1065         session = session_get (session_index, thread_index);
1066         str = format (0, "%U", format_stream_session, session, verbose);
1067
1068         if (verbose)
1069           s = format (s, "%-40s%-20s%-15u%-10u", str, app_name,
1070                       app_wrk->api_client_index, app_wrk->connects_seg_manager);
1071         else
1072           s = format (s, "%-40s%-20s", str, app_name);
1073
1074         vlib_cli_output (vm, "%v", s);
1075         vec_reset_length (s);
1076         vec_free (str);
1077
1078         fifo = fifo->next;
1079       }
1080     vec_free (s);
1081   }));
1082   /* *INDENT-ON* */
1083 }
1084
1085 void
1086 app_worker_format_local_sessions (app_worker_t * app_wrk, int verbose)
1087 {
1088   vlib_main_t *vm = vlib_get_main ();
1089   app_worker_t *client_wrk;
1090   local_session_t *ls;
1091   transport_proto_t tp;
1092   u8 *conn = 0;
1093
1094   /* Header */
1095   if (app_wrk == 0)
1096     {
1097       vlib_cli_output (vm, "%-40s%-15s%-20s", "Connection", "ServerApp",
1098                        "ClientApp");
1099       return;
1100     }
1101
1102   if (!pool_elts (app_wrk->local_sessions)
1103       && !pool_elts (app_wrk->local_connects))
1104     return;
1105
1106   /* *INDENT-OFF* */
1107   pool_foreach (ls, app_wrk->local_sessions, ({
1108     tp = session_type_transport_proto(ls->listener_session_type);
1109     conn = format (0, "[L][%U] *:%u", format_transport_proto_short, tp,
1110                    ls->port);
1111     client_wrk = app_worker_get (ls->client_wrk_index);
1112     vlib_cli_output (vm, "%-40v%-15u%-20u", conn, ls->app_index,
1113                      client_wrk->app_index);
1114     vec_reset_length (conn);
1115   }));
1116   /* *INDENT-ON* */
1117
1118   vec_free (conn);
1119 }
1120
1121 void
1122 app_worker_format_local_connects (app_worker_t * app, int verbose)
1123 {
1124   vlib_main_t *vm = vlib_get_main ();
1125   u32 app_wrk_index, session_index;
1126   app_worker_t *server_wrk;
1127   local_session_t *ls;
1128   u64 client_key;
1129   u64 value;
1130
1131   /* Header */
1132   if (app == 0)
1133     {
1134       if (verbose)
1135         vlib_cli_output (vm, "%-40s%-15s%-20s%-10s", "Connection", "App",
1136                          "Peer App", "SegManager");
1137       else
1138         vlib_cli_output (vm, "%-40s%-15s%-20s", "Connection", "App",
1139                          "Peer App");
1140       return;
1141     }
1142
1143   if (!app->local_connects)
1144     return;
1145
1146   /* *INDENT-OFF* */
1147   hash_foreach (client_key, value, app->local_connects, ({
1148     application_client_local_connect_key_parse (client_key, &app_wrk_index,
1149                                                 &session_index);
1150     server_wrk = app_worker_get (app_wrk_index);
1151     ls = app_worker_get_local_session (server_wrk, session_index);
1152     vlib_cli_output (vm, "%-40s%-15s%-20s", "TODO", ls->app_wrk_index,
1153                      ls->client_wrk_index);
1154   }));
1155   /* *INDENT-ON* */
1156 }
1157
1158 /*
1159  * fd.io coding-style-patch-verification: ON
1160  *
1161  * Local Variables:
1162  * eval: (c-set-style "gnu")
1163  * End:
1164  */