session tcp udp: consolidate transport snd apis
[vpp.git] / src / vnet / session / session.c
1 /*
2  * Copyright (c) 2017-2019 Cisco and/or its affiliates.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at:
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 /**
16  * @file
17  * @brief Session and session manager
18  */
19
20 #include <vnet/session/session.h>
21 #include <vnet/session/session_debug.h>
22 #include <vnet/session/application.h>
23 #include <vnet/dpo/load_balance.h>
24 #include <vnet/fib/ip4_fib.h>
25
26 session_main_t session_main;
27
28 static inline int
29 session_send_evt_to_thread (void *data, void *args, u32 thread_index,
30                             session_evt_type_t evt_type)
31 {
32   session_event_t *evt;
33   svm_msg_q_msg_t msg;
34   svm_msg_q_t *mq;
35
36   mq = session_main_get_vpp_event_queue (thread_index);
37   if (PREDICT_FALSE (svm_msg_q_lock (mq)))
38     return -1;
39   if (PREDICT_FALSE (svm_msg_q_is_full (mq)
40                      || svm_msg_q_ring_is_full (mq, SESSION_MQ_IO_EVT_RING)))
41     {
42       svm_msg_q_unlock (mq);
43       return -2;
44     }
45   switch (evt_type)
46     {
47     case SESSION_CTRL_EVT_RPC:
48       msg = svm_msg_q_alloc_msg_w_ring (mq, SESSION_MQ_IO_EVT_RING);
49       evt = (session_event_t *) svm_msg_q_msg_data (mq, &msg);
50       evt->rpc_args.fp = data;
51       evt->rpc_args.arg = args;
52       break;
53     case SESSION_IO_EVT_RX:
54     case SESSION_IO_EVT_TX:
55     case SESSION_IO_EVT_TX_FLUSH:
56     case SESSION_IO_EVT_BUILTIN_RX:
57       msg = svm_msg_q_alloc_msg_w_ring (mq, SESSION_MQ_IO_EVT_RING);
58       evt = (session_event_t *) svm_msg_q_msg_data (mq, &msg);
59       evt->session_index = *(u32 *) data;
60       break;
61     case SESSION_IO_EVT_BUILTIN_TX:
62     case SESSION_CTRL_EVT_CLOSE:
63     case SESSION_CTRL_EVT_RESET:
64       msg = svm_msg_q_alloc_msg_w_ring (mq, SESSION_MQ_IO_EVT_RING);
65       evt = (session_event_t *) svm_msg_q_msg_data (mq, &msg);
66       evt->session_handle = session_handle ((session_t *) data);
67       break;
68     default:
69       clib_warning ("evt unhandled!");
70       svm_msg_q_unlock (mq);
71       return -1;
72     }
73   evt->event_type = evt_type;
74
75   svm_msg_q_add_and_unlock (mq, &msg);
76   return 0;
77 }
78
79 int
80 session_send_io_evt_to_thread (svm_fifo_t * f, session_evt_type_t evt_type)
81 {
82   return session_send_evt_to_thread (&f->master_session_index, 0,
83                                      f->master_thread_index, evt_type);
84 }
85
86 int
87 session_send_io_evt_to_thread_custom (void *data, u32 thread_index,
88                                       session_evt_type_t evt_type)
89 {
90   return session_send_evt_to_thread (data, 0, thread_index, evt_type);
91 }
92
93 int
94 session_send_ctrl_evt_to_thread (session_t * s, session_evt_type_t evt_type)
95 {
96   /* only events supported are disconnect and reset */
97   ASSERT (evt_type == SESSION_CTRL_EVT_CLOSE
98           || evt_type == SESSION_CTRL_EVT_RESET);
99   return session_send_evt_to_thread (s, 0, s->thread_index, evt_type);
100 }
101
102 void
103 session_send_rpc_evt_to_thread_force (u32 thread_index, void *fp,
104                                       void *rpc_args)
105 {
106   session_send_evt_to_thread (fp, rpc_args, thread_index,
107                               SESSION_CTRL_EVT_RPC);
108 }
109
110 void
111 session_send_rpc_evt_to_thread (u32 thread_index, void *fp, void *rpc_args)
112 {
113   if (thread_index != vlib_get_thread_index ())
114     session_send_rpc_evt_to_thread_force (thread_index, fp, rpc_args);
115   else
116     {
117       void (*fnp) (void *) = fp;
118       fnp (rpc_args);
119     }
120 }
121
122 void
123 session_add_self_custom_tx_evt (transport_connection_t * tc, u8 has_prio)
124 {
125   session_t *s;
126
127   s = session_get (tc->s_index, tc->thread_index);
128   ASSERT (s->thread_index == vlib_get_thread_index ());
129   ASSERT (s->session_state != SESSION_STATE_TRANSPORT_DELETED);
130   if (!(s->flags & SESSION_F_CUSTOM_TX))
131     {
132       s->flags |= SESSION_F_CUSTOM_TX;
133       if (svm_fifo_set_event (s->tx_fifo))
134         {
135           session_worker_t *wrk;
136           session_evt_elt_t *elt;
137           wrk = session_main_get_worker (tc->thread_index);
138           if (has_prio)
139             elt = session_evt_alloc_new (wrk);
140           else
141             elt = session_evt_alloc_old (wrk);
142           elt->evt.session_index = tc->s_index;
143           elt->evt.event_type = SESSION_IO_EVT_TX;
144         }
145     }
146 }
147
148 void
149 sesssion_reschedule_tx (transport_connection_t * tc)
150 {
151   session_worker_t *wrk = session_main_get_worker (tc->thread_index);
152   session_evt_elt_t *elt;
153
154   ASSERT (tc->thread_index == vlib_get_thread_index ());
155
156   elt = session_evt_alloc_new (wrk);
157   elt->evt.session_index = tc->s_index;
158   elt->evt.event_type = SESSION_IO_EVT_TX;
159 }
160
161 static void
162 session_program_transport_ctrl_evt (session_t * s, session_evt_type_t evt)
163 {
164   u32 thread_index = vlib_get_thread_index ();
165   session_evt_elt_t *elt;
166   session_worker_t *wrk;
167
168   /* If we are in the handler thread, or being called with the worker barrier
169    * held, just append a new event to pending disconnects vector. */
170   if (vlib_thread_is_main_w_barrier () || thread_index == s->thread_index)
171     {
172       wrk = session_main_get_worker (s->thread_index);
173       elt = session_evt_alloc_ctrl (wrk);
174       clib_memset (&elt->evt, 0, sizeof (session_event_t));
175       elt->evt.session_handle = session_handle (s);
176       elt->evt.event_type = evt;
177     }
178   else
179     session_send_ctrl_evt_to_thread (s, evt);
180 }
181
182 session_t *
183 session_alloc (u32 thread_index)
184 {
185   session_worker_t *wrk = &session_main.wrk[thread_index];
186   session_t *s;
187   u8 will_expand = 0;
188   pool_get_aligned_will_expand (wrk->sessions, will_expand,
189                                 CLIB_CACHE_LINE_BYTES);
190   /* If we have peekers, let them finish */
191   if (PREDICT_FALSE (will_expand && vlib_num_workers ()))
192     {
193       clib_rwlock_writer_lock (&wrk->peekers_rw_locks);
194       pool_get_aligned (wrk->sessions, s, CLIB_CACHE_LINE_BYTES);
195       clib_rwlock_writer_unlock (&wrk->peekers_rw_locks);
196     }
197   else
198     {
199       pool_get_aligned (wrk->sessions, s, CLIB_CACHE_LINE_BYTES);
200     }
201   clib_memset (s, 0, sizeof (*s));
202   s->session_index = s - wrk->sessions;
203   s->thread_index = thread_index;
204   s->app_index = APP_INVALID_INDEX;
205   return s;
206 }
207
208 void
209 session_free (session_t * s)
210 {
211   if (CLIB_DEBUG)
212     {
213       u8 thread_index = s->thread_index;
214       clib_memset (s, 0xFA, sizeof (*s));
215       pool_put (session_main.wrk[thread_index].sessions, s);
216       return;
217     }
218   SESSION_EVT (SESSION_EVT_FREE, s);
219   pool_put (session_main.wrk[s->thread_index].sessions, s);
220 }
221
222 u8
223 session_is_valid (u32 si, u8 thread_index)
224 {
225   session_t *s;
226   transport_connection_t *tc;
227
228   s = pool_elt_at_index (session_main.wrk[thread_index].sessions, si);
229
230   if (!s)
231     return 1;
232
233   if (s->thread_index != thread_index || s->session_index != si)
234     return 0;
235
236   if (s->session_state == SESSION_STATE_TRANSPORT_DELETED
237       || s->session_state <= SESSION_STATE_LISTENING)
238     return 1;
239
240   tc = session_get_transport (s);
241   if (s->connection_index != tc->c_index
242       || s->thread_index != tc->thread_index || tc->s_index != si)
243     return 0;
244
245   return 1;
246 }
247
248 static void
249 session_cleanup_notify (session_t * s, session_cleanup_ntf_t ntf)
250 {
251   app_worker_t *app_wrk;
252
253   app_wrk = app_worker_get_if_valid (s->app_wrk_index);
254   if (!app_wrk)
255     return;
256   app_worker_cleanup_notify (app_wrk, s, ntf);
257 }
258
259 void
260 session_free_w_fifos (session_t * s)
261 {
262   session_cleanup_notify (s, SESSION_CLEANUP_SESSION);
263   segment_manager_dealloc_fifos (s->rx_fifo, s->tx_fifo);
264   session_free (s);
265 }
266
267 /**
268  * Cleans up session and lookup table.
269  *
270  * Transport connection must still be valid.
271  */
272 static void
273 session_delete (session_t * s)
274 {
275   int rv;
276
277   /* Delete from the main lookup table. */
278   if ((rv = session_lookup_del_session (s)))
279     clib_warning ("session %u hash delete rv %d", s->session_index, rv);
280
281   session_free_w_fifos (s);
282 }
283
284 static session_t *
285 session_alloc_for_connection (transport_connection_t * tc)
286 {
287   session_t *s;
288   u32 thread_index = tc->thread_index;
289
290   ASSERT (thread_index == vlib_get_thread_index ()
291           || transport_protocol_is_cl (tc->proto));
292
293   s = session_alloc (thread_index);
294   s->session_type = session_type_from_proto_and_ip (tc->proto, tc->is_ip4);
295   s->session_state = SESSION_STATE_CLOSED;
296
297   /* Attach transport to session and vice versa */
298   s->connection_index = tc->c_index;
299   tc->s_index = s->session_index;
300   return s;
301 }
302
303 /**
304  * Discards bytes from buffer chain
305  *
306  * It discards n_bytes_to_drop starting at first buffer after chain_b
307  */
308 always_inline void
309 session_enqueue_discard_chain_bytes (vlib_main_t * vm, vlib_buffer_t * b,
310                                      vlib_buffer_t ** chain_b,
311                                      u32 n_bytes_to_drop)
312 {
313   vlib_buffer_t *next = *chain_b;
314   u32 to_drop = n_bytes_to_drop;
315   ASSERT (b->flags & VLIB_BUFFER_NEXT_PRESENT);
316   while (to_drop && (next->flags & VLIB_BUFFER_NEXT_PRESENT))
317     {
318       next = vlib_get_buffer (vm, next->next_buffer);
319       if (next->current_length > to_drop)
320         {
321           vlib_buffer_advance (next, to_drop);
322           to_drop = 0;
323         }
324       else
325         {
326           to_drop -= next->current_length;
327           next->current_length = 0;
328         }
329     }
330   *chain_b = next;
331
332   if (to_drop == 0)
333     b->total_length_not_including_first_buffer -= n_bytes_to_drop;
334 }
335
336 /**
337  * Enqueue buffer chain tail
338  */
339 always_inline int
340 session_enqueue_chain_tail (session_t * s, vlib_buffer_t * b,
341                             u32 offset, u8 is_in_order)
342 {
343   vlib_buffer_t *chain_b;
344   u32 chain_bi, len, diff;
345   vlib_main_t *vm = vlib_get_main ();
346   u8 *data;
347   u32 written = 0;
348   int rv = 0;
349
350   if (is_in_order && offset)
351     {
352       diff = offset - b->current_length;
353       if (diff > b->total_length_not_including_first_buffer)
354         return 0;
355       chain_b = b;
356       session_enqueue_discard_chain_bytes (vm, b, &chain_b, diff);
357       chain_bi = vlib_get_buffer_index (vm, chain_b);
358     }
359   else
360     chain_bi = b->next_buffer;
361
362   do
363     {
364       chain_b = vlib_get_buffer (vm, chain_bi);
365       data = vlib_buffer_get_current (chain_b);
366       len = chain_b->current_length;
367       if (!len)
368         continue;
369       if (is_in_order)
370         {
371           rv = svm_fifo_enqueue (s->rx_fifo, len, data);
372           if (rv == len)
373             {
374               written += rv;
375             }
376           else if (rv < len)
377             {
378               return (rv > 0) ? (written + rv) : written;
379             }
380           else if (rv > len)
381             {
382               written += rv;
383
384               /* written more than what was left in chain */
385               if (written > b->total_length_not_including_first_buffer)
386                 return written;
387
388               /* drop the bytes that have already been delivered */
389               session_enqueue_discard_chain_bytes (vm, b, &chain_b, rv - len);
390             }
391         }
392       else
393         {
394           rv = svm_fifo_enqueue_with_offset (s->rx_fifo, offset, len, data);
395           if (rv)
396             {
397               clib_warning ("failed to enqueue multi-buffer seg");
398               return -1;
399             }
400           offset += len;
401         }
402     }
403   while ((chain_bi = (chain_b->flags & VLIB_BUFFER_NEXT_PRESENT)
404           ? chain_b->next_buffer : 0));
405
406   if (is_in_order)
407     return written;
408
409   return 0;
410 }
411
412 void
413 session_fifo_tuning (session_t * s, svm_fifo_t * f,
414                      session_ft_action_t act, u32 len)
415 {
416   if (s->flags & SESSION_F_CUSTOM_FIFO_TUNING)
417     {
418       app_worker_t *app_wrk = app_worker_get (s->app_wrk_index);
419       app_worker_session_fifo_tuning (app_wrk, s, f, act, len);
420       if (CLIB_ASSERT_ENABLE)
421         {
422           segment_manager_t *sm;
423           sm = segment_manager_get (f->segment_manager);
424           ASSERT (f->size >= 4096);
425           ASSERT (f->size <= sm->max_fifo_size);
426         }
427     }
428 }
429
430 /*
431  * Enqueue data for delivery to session peer. Does not notify peer of enqueue
432  * event but on request can queue notification events for later delivery by
433  * calling stream_server_flush_enqueue_events().
434  *
435  * @param tc Transport connection which is to be enqueued data
436  * @param b Buffer to be enqueued
437  * @param offset Offset at which to start enqueueing if out-of-order
438  * @param queue_event Flag to indicate if peer is to be notified or if event
439  *                    is to be queued. The former is useful when more data is
440  *                    enqueued and only one event is to be generated.
441  * @param is_in_order Flag to indicate if data is in order
442  * @return Number of bytes enqueued or a negative value if enqueueing failed.
443  */
444 int
445 session_enqueue_stream_connection (transport_connection_t * tc,
446                                    vlib_buffer_t * b, u32 offset,
447                                    u8 queue_event, u8 is_in_order)
448 {
449   session_t *s;
450   int enqueued = 0, rv, in_order_off;
451
452   s = session_get (tc->s_index, tc->thread_index);
453
454   if (is_in_order)
455     {
456       enqueued = svm_fifo_enqueue (s->rx_fifo,
457                                    b->current_length,
458                                    vlib_buffer_get_current (b));
459       if (PREDICT_FALSE ((b->flags & VLIB_BUFFER_NEXT_PRESENT)
460                          && enqueued >= 0))
461         {
462           in_order_off = enqueued > b->current_length ? enqueued : 0;
463           rv = session_enqueue_chain_tail (s, b, in_order_off, 1);
464           if (rv > 0)
465             enqueued += rv;
466         }
467     }
468   else
469     {
470       rv = svm_fifo_enqueue_with_offset (s->rx_fifo, offset,
471                                          b->current_length,
472                                          vlib_buffer_get_current (b));
473       if (PREDICT_FALSE ((b->flags & VLIB_BUFFER_NEXT_PRESENT) && !rv))
474         session_enqueue_chain_tail (s, b, offset + b->current_length, 0);
475       /* if something was enqueued, report even this as success for ooo
476        * segment handling */
477       return rv;
478     }
479
480   if (queue_event)
481     {
482       /* Queue RX event on this fifo. Eventually these will need to be flushed
483        * by calling stream_server_flush_enqueue_events () */
484       session_worker_t *wrk;
485
486       wrk = session_main_get_worker (s->thread_index);
487       if (!(s->flags & SESSION_F_RX_EVT))
488         {
489           s->flags |= SESSION_F_RX_EVT;
490           vec_add1 (wrk->session_to_enqueue[tc->proto], s->session_index);
491         }
492
493       session_fifo_tuning (s, s->rx_fifo, SESSION_FT_ACTION_ENQUEUED, 0);
494     }
495
496   return enqueued;
497 }
498
499 int
500 session_enqueue_dgram_connection (session_t * s,
501                                   session_dgram_hdr_t * hdr,
502                                   vlib_buffer_t * b, u8 proto, u8 queue_event)
503 {
504   int enqueued = 0, rv, in_order_off;
505
506   ASSERT (svm_fifo_max_enqueue_prod (s->rx_fifo)
507           >= b->current_length + sizeof (*hdr));
508
509   svm_fifo_enqueue (s->rx_fifo, sizeof (session_dgram_hdr_t), (u8 *) hdr);
510   enqueued = svm_fifo_enqueue (s->rx_fifo, b->current_length,
511                                vlib_buffer_get_current (b));
512   if (PREDICT_FALSE ((b->flags & VLIB_BUFFER_NEXT_PRESENT) && enqueued >= 0))
513     {
514       in_order_off = enqueued > b->current_length ? enqueued : 0;
515       rv = session_enqueue_chain_tail (s, b, in_order_off, 1);
516       if (rv > 0)
517         enqueued += rv;
518     }
519   if (queue_event)
520     {
521       /* Queue RX event on this fifo. Eventually these will need to be flushed
522        * by calling stream_server_flush_enqueue_events () */
523       session_worker_t *wrk;
524
525       wrk = session_main_get_worker (s->thread_index);
526       if (!(s->flags & SESSION_F_RX_EVT))
527         {
528           s->flags |= SESSION_F_RX_EVT;
529           vec_add1 (wrk->session_to_enqueue[proto], s->session_index);
530         }
531
532       session_fifo_tuning (s, s->rx_fifo, SESSION_FT_ACTION_ENQUEUED, 0);
533     }
534   return enqueued;
535 }
536
537 int
538 session_tx_fifo_peek_bytes (transport_connection_t * tc, u8 * buffer,
539                             u32 offset, u32 max_bytes)
540 {
541   session_t *s = session_get (tc->s_index, tc->thread_index);
542   return svm_fifo_peek (s->tx_fifo, offset, max_bytes, buffer);
543 }
544
545 u32
546 session_tx_fifo_dequeue_drop (transport_connection_t * tc, u32 max_bytes)
547 {
548   session_t *s = session_get (tc->s_index, tc->thread_index);
549   u32 rv;
550
551   rv = svm_fifo_dequeue_drop (s->tx_fifo, max_bytes);
552   session_fifo_tuning (s, s->tx_fifo, SESSION_FT_ACTION_DEQUEUED, rv);
553
554   if (svm_fifo_needs_deq_ntf (s->tx_fifo, max_bytes))
555     session_dequeue_notify (s);
556
557   return rv;
558 }
559
560 static inline int
561 session_notify_subscribers (u32 app_index, session_t * s,
562                             svm_fifo_t * f, session_evt_type_t evt_type)
563 {
564   app_worker_t *app_wrk;
565   application_t *app;
566   int i;
567
568   app = application_get (app_index);
569   if (!app)
570     return -1;
571
572   for (i = 0; i < f->n_subscribers; i++)
573     {
574       app_wrk = application_get_worker (app, f->subscribers[i]);
575       if (!app_wrk)
576         continue;
577       if (app_worker_lock_and_send_event (app_wrk, s, evt_type))
578         return -1;
579     }
580
581   return 0;
582 }
583
584 /**
585  * Notify session peer that new data has been enqueued.
586  *
587  * @param s     Stream session for which the event is to be generated.
588  * @param lock  Flag to indicate if call should lock message queue.
589  *
590  * @return 0 on success or negative number if failed to send notification.
591  */
592 static inline int
593 session_enqueue_notify_inline (session_t * s)
594 {
595   app_worker_t *app_wrk;
596   u32 session_index;
597   u8 n_subscribers;
598
599   session_index = s->session_index;
600   n_subscribers = svm_fifo_n_subscribers (s->rx_fifo);
601
602   app_wrk = app_worker_get_if_valid (s->app_wrk_index);
603   if (PREDICT_FALSE (!app_wrk))
604     {
605       SESSION_DBG ("invalid s->app_index = %d", s->app_wrk_index);
606       return 0;
607     }
608
609   SESSION_EVT (SESSION_EVT_ENQ, s, svm_fifo_max_dequeue_prod (s->rx_fifo));
610
611   s->flags &= ~SESSION_F_RX_EVT;
612   if (PREDICT_FALSE (app_worker_lock_and_send_event (app_wrk, s,
613                                                      SESSION_IO_EVT_RX)))
614     return -1;
615
616   if (PREDICT_FALSE (n_subscribers))
617     {
618       s = session_get (session_index, vlib_get_thread_index ());
619       return session_notify_subscribers (app_wrk->app_index, s,
620                                          s->rx_fifo, SESSION_IO_EVT_RX);
621     }
622
623   return 0;
624 }
625
626 int
627 session_enqueue_notify (session_t * s)
628 {
629   return session_enqueue_notify_inline (s);
630 }
631
632 static void
633 session_enqueue_notify_rpc (void *arg)
634 {
635   u32 session_index = pointer_to_uword (arg);
636   session_t *s;
637
638   s = session_get_if_valid (session_index, vlib_get_thread_index ());
639   if (!s)
640     return;
641
642   session_enqueue_notify (s);
643 }
644
645 /**
646  * Like session_enqueue_notify, but can be called from a thread that does not
647  * own the session.
648  */
649 void
650 session_enqueue_notify_thread (session_handle_t sh)
651 {
652   u32 thread_index = session_thread_from_handle (sh);
653   u32 session_index = session_index_from_handle (sh);
654
655   /*
656    * Pass session index (u32) as opposed to handle (u64) in case pointers
657    * are not 64-bit.
658    */
659   session_send_rpc_evt_to_thread (thread_index,
660                                   session_enqueue_notify_rpc,
661                                   uword_to_pointer (session_index, void *));
662 }
663
664 int
665 session_dequeue_notify (session_t * s)
666 {
667   app_worker_t *app_wrk;
668
669   svm_fifo_clear_deq_ntf (s->tx_fifo);
670
671   app_wrk = app_worker_get_if_valid (s->app_wrk_index);
672   if (PREDICT_FALSE (!app_wrk))
673     return -1;
674
675   if (PREDICT_FALSE (app_worker_lock_and_send_event (app_wrk, s,
676                                                      SESSION_IO_EVT_TX)))
677     return -1;
678
679   if (PREDICT_FALSE (s->tx_fifo->n_subscribers))
680     return session_notify_subscribers (app_wrk->app_index, s,
681                                        s->tx_fifo, SESSION_IO_EVT_TX);
682
683   return 0;
684 }
685
686 /**
687  * Flushes queue of sessions that are to be notified of new data
688  * enqueued events.
689  *
690  * @param thread_index Thread index for which the flush is to be performed.
691  * @return 0 on success or a positive number indicating the number of
692  *         failures due to API queue being full.
693  */
694 int
695 session_main_flush_enqueue_events (u8 transport_proto, u32 thread_index)
696 {
697   session_worker_t *wrk = session_main_get_worker (thread_index);
698   session_t *s;
699   int i, errors = 0;
700   u32 *indices;
701
702   indices = wrk->session_to_enqueue[transport_proto];
703
704   for (i = 0; i < vec_len (indices); i++)
705     {
706       s = session_get_if_valid (indices[i], thread_index);
707       if (PREDICT_FALSE (!s))
708         {
709           errors++;
710           continue;
711         }
712
713       session_fifo_tuning (s, s->rx_fifo, SESSION_FT_ACTION_ENQUEUED,
714                            0 /* TODO/not needed */ );
715
716       if (PREDICT_FALSE (session_enqueue_notify_inline (s)))
717         errors++;
718     }
719
720   vec_reset_length (indices);
721   wrk->session_to_enqueue[transport_proto] = indices;
722
723   return errors;
724 }
725
726 int
727 session_main_flush_all_enqueue_events (u8 transport_proto)
728 {
729   vlib_thread_main_t *vtm = vlib_get_thread_main ();
730   int i, errors = 0;
731   for (i = 0; i < 1 + vtm->n_threads; i++)
732     errors += session_main_flush_enqueue_events (transport_proto, i);
733   return errors;
734 }
735
736 static inline int
737 session_stream_connect_notify_inline (transport_connection_t * tc, u8 is_fail,
738                                       session_state_t opened_state)
739 {
740   u32 opaque = 0, new_ti, new_si;
741   app_worker_t *app_wrk;
742   session_t *s = 0;
743   u64 ho_handle;
744
745   /*
746    * Find connection handle and cleanup half-open table
747    */
748   ho_handle = session_lookup_half_open_handle (tc);
749   if (ho_handle == HALF_OPEN_LOOKUP_INVALID_VALUE)
750     {
751       SESSION_DBG ("half-open was removed!");
752       return -1;
753     }
754   session_lookup_del_half_open (tc);
755
756   /* Get the app's index from the handle we stored when opening connection
757    * and the opaque (api_context for external apps) from transport session
758    * index */
759   app_wrk = app_worker_get_if_valid (ho_handle >> 32);
760   if (!app_wrk)
761     return -1;
762
763   opaque = tc->s_index;
764
765   if (is_fail)
766     return app_worker_connect_notify (app_wrk, s, opaque);
767
768   s = session_alloc_for_connection (tc);
769   s->session_state = SESSION_STATE_CONNECTING;
770   s->app_wrk_index = app_wrk->wrk_index;
771   new_si = s->session_index;
772   new_ti = s->thread_index;
773
774   if (app_worker_init_connected (app_wrk, s))
775     {
776       session_free (s);
777       app_worker_connect_notify (app_wrk, 0, opaque);
778       return -1;
779     }
780
781   s = session_get (new_si, new_ti);
782   s->session_state = opened_state;
783   session_lookup_add_connection (tc, session_handle (s));
784
785   if (app_worker_connect_notify (app_wrk, s, opaque))
786     {
787       s = session_get (new_si, new_ti);
788       session_free_w_fifos (s);
789       return -1;
790     }
791
792   return 0;
793 }
794
795 int
796 session_stream_connect_notify (transport_connection_t * tc, u8 is_fail)
797 {
798   return session_stream_connect_notify_inline (tc, is_fail,
799                                                SESSION_STATE_READY);
800 }
801
802 int
803 session_ho_stream_connect_notify (transport_connection_t * tc, u8 is_fail)
804 {
805   return session_stream_connect_notify_inline (tc, is_fail,
806                                                SESSION_STATE_OPENED);
807 }
808
809 typedef struct _session_switch_pool_args
810 {
811   u32 session_index;
812   u32 thread_index;
813   u32 new_thread_index;
814   u32 new_session_index;
815 } session_switch_pool_args_t;
816
817 /**
818  * Notify old thread of the session pool switch
819  */
820 static void
821 session_switch_pool (void *cb_args)
822 {
823   session_switch_pool_args_t *args = (session_switch_pool_args_t *) cb_args;
824   app_worker_t *app_wrk;
825   session_t *s;
826
827   ASSERT (args->thread_index == vlib_get_thread_index ());
828   s = session_get (args->session_index, args->thread_index);
829   s->tx_fifo->master_session_index = args->new_session_index;
830   s->tx_fifo->master_thread_index = args->new_thread_index;
831   transport_cleanup (session_get_transport_proto (s), s->connection_index,
832                      s->thread_index);
833
834   app_wrk = app_worker_get_if_valid (s->app_wrk_index);
835   if (app_wrk)
836     {
837       session_handle_t new_sh;
838       new_sh = session_make_handle (args->new_session_index,
839                                     args->new_thread_index);
840       app_worker_migrate_notify (app_wrk, s, new_sh);
841
842       /* Trigger app read on the new thread */
843       session_enqueue_notify_thread (new_sh);
844     }
845
846   session_free (s);
847   clib_mem_free (cb_args);
848 }
849
850 /**
851  * Move dgram session to the right thread
852  */
853 int
854 session_dgram_connect_notify (transport_connection_t * tc,
855                               u32 old_thread_index, session_t ** new_session)
856 {
857   session_t *new_s;
858   session_switch_pool_args_t *rpc_args;
859
860   /*
861    * Clone half-open session to the right thread.
862    */
863   new_s = session_clone_safe (tc->s_index, old_thread_index);
864   new_s->connection_index = tc->c_index;
865   new_s->rx_fifo->master_session_index = new_s->session_index;
866   new_s->rx_fifo->master_thread_index = new_s->thread_index;
867   new_s->session_state = SESSION_STATE_READY;
868   new_s->flags |= SESSION_F_IS_MIGRATING;
869   session_lookup_add_connection (tc, session_handle (new_s));
870
871   /*
872    * Ask thread owning the old session to clean it up and make us the tx
873    * fifo owner
874    */
875   rpc_args = clib_mem_alloc (sizeof (*rpc_args));
876   rpc_args->new_session_index = new_s->session_index;
877   rpc_args->new_thread_index = new_s->thread_index;
878   rpc_args->session_index = tc->s_index;
879   rpc_args->thread_index = old_thread_index;
880   session_send_rpc_evt_to_thread (rpc_args->thread_index, session_switch_pool,
881                                   rpc_args);
882
883   tc->s_index = new_s->session_index;
884   new_s->connection_index = tc->c_index;
885   *new_session = new_s;
886   return 0;
887 }
888
889 /**
890  * Notification from transport that connection is being closed.
891  *
892  * A disconnect is sent to application but state is not removed. Once
893  * disconnect is acknowledged by application, session disconnect is called.
894  * Ultimately this leads to close being called on transport (passive close).
895  */
896 void
897 session_transport_closing_notify (transport_connection_t * tc)
898 {
899   app_worker_t *app_wrk;
900   session_t *s;
901
902   s = session_get (tc->s_index, tc->thread_index);
903   if (s->session_state >= SESSION_STATE_TRANSPORT_CLOSING)
904     return;
905   s->session_state = SESSION_STATE_TRANSPORT_CLOSING;
906   app_wrk = app_worker_get (s->app_wrk_index);
907   app_worker_close_notify (app_wrk, s);
908 }
909
910 /**
911  * Notification from transport that connection is being deleted
912  *
913  * This removes the session if it is still valid. It should be called only on
914  * previously fully established sessions. For instance failed connects should
915  * call stream_session_connect_notify and indicate that the connect has
916  * failed.
917  */
918 void
919 session_transport_delete_notify (transport_connection_t * tc)
920 {
921   session_t *s;
922
923   /* App might've been removed already */
924   if (!(s = session_get_if_valid (tc->s_index, tc->thread_index)))
925     return;
926
927   switch (s->session_state)
928     {
929     case SESSION_STATE_CREATED:
930       /* Session was created but accept notification was not yet sent to the
931        * app. Cleanup everything. */
932       session_lookup_del_session (s);
933       segment_manager_dealloc_fifos (s->rx_fifo, s->tx_fifo);
934       session_free (s);
935       break;
936     case SESSION_STATE_ACCEPTING:
937     case SESSION_STATE_TRANSPORT_CLOSING:
938     case SESSION_STATE_CLOSING:
939     case SESSION_STATE_TRANSPORT_CLOSED:
940       /* If transport finishes or times out before we get a reply
941        * from the app, mark transport as closed and wait for reply
942        * before removing the session. Cleanup session table in advance
943        * because transport will soon be closed and closed sessions
944        * are assumed to have been removed from the lookup table */
945       session_lookup_del_session (s);
946       s->session_state = SESSION_STATE_TRANSPORT_DELETED;
947       session_cleanup_notify (s, SESSION_CLEANUP_TRANSPORT);
948       svm_fifo_dequeue_drop_all (s->tx_fifo);
949       break;
950     case SESSION_STATE_APP_CLOSED:
951       /* Cleanup lookup table as transport needs to still be valid.
952        * Program transport close to ensure that all session events
953        * have been cleaned up. Once transport close is called, the
954        * session is just removed because both transport and app have
955        * confirmed the close*/
956       session_lookup_del_session (s);
957       s->session_state = SESSION_STATE_TRANSPORT_DELETED;
958       session_cleanup_notify (s, SESSION_CLEANUP_TRANSPORT);
959       svm_fifo_dequeue_drop_all (s->tx_fifo);
960       session_program_transport_ctrl_evt (s, SESSION_CTRL_EVT_CLOSE);
961       break;
962     case SESSION_STATE_TRANSPORT_DELETED:
963       break;
964     case SESSION_STATE_CLOSED:
965       session_cleanup_notify (s, SESSION_CLEANUP_TRANSPORT);
966       session_delete (s);
967       break;
968     default:
969       clib_warning ("session state %u", s->session_state);
970       session_cleanup_notify (s, SESSION_CLEANUP_TRANSPORT);
971       session_delete (s);
972       break;
973     }
974 }
975
976 /**
977  * Notification from transport that it is closed
978  *
979  * Should be called by transport, prior to calling delete notify, once it
980  * knows that no more data will be exchanged. This could serve as an
981  * early acknowledgment of an active close especially if transport delete
982  * can be delayed a long time, e.g., tcp time-wait.
983  */
984 void
985 session_transport_closed_notify (transport_connection_t * tc)
986 {
987   app_worker_t *app_wrk;
988   session_t *s;
989
990   if (!(s = session_get_if_valid (tc->s_index, tc->thread_index)))
991     return;
992
993   /* Transport thinks that app requested close but it actually didn't.
994    * Can happen for tcp if fin and rst are received in close succession. */
995   if (s->session_state == SESSION_STATE_READY)
996     {
997       session_transport_closing_notify (tc);
998       svm_fifo_dequeue_drop_all (s->tx_fifo);
999       s->session_state = SESSION_STATE_TRANSPORT_CLOSED;
1000     }
1001   /* If app close has not been received or has not yet resulted in
1002    * a transport close, only mark the session transport as closed */
1003   else if (s->session_state <= SESSION_STATE_CLOSING)
1004     {
1005       s->session_state = SESSION_STATE_TRANSPORT_CLOSED;
1006     }
1007   /* If app also closed, switch to closed */
1008   else if (s->session_state == SESSION_STATE_APP_CLOSED)
1009     s->session_state = SESSION_STATE_CLOSED;
1010
1011   app_wrk = app_worker_get_if_valid (s->app_wrk_index);
1012   if (app_wrk)
1013     app_worker_transport_closed_notify (app_wrk, s);
1014 }
1015
1016 /**
1017  * Notify application that connection has been reset.
1018  */
1019 void
1020 session_transport_reset_notify (transport_connection_t * tc)
1021 {
1022   app_worker_t *app_wrk;
1023   session_t *s;
1024
1025   s = session_get (tc->s_index, tc->thread_index);
1026   svm_fifo_dequeue_drop_all (s->tx_fifo);
1027   if (s->session_state >= SESSION_STATE_TRANSPORT_CLOSING)
1028     return;
1029   s->session_state = SESSION_STATE_TRANSPORT_CLOSING;
1030   app_wrk = app_worker_get (s->app_wrk_index);
1031   app_worker_reset_notify (app_wrk, s);
1032 }
1033
1034 int
1035 session_stream_accept_notify (transport_connection_t * tc)
1036 {
1037   app_worker_t *app_wrk;
1038   session_t *s;
1039
1040   s = session_get (tc->s_index, tc->thread_index);
1041   app_wrk = app_worker_get_if_valid (s->app_wrk_index);
1042   if (!app_wrk)
1043     return -1;
1044   s->session_state = SESSION_STATE_ACCEPTING;
1045   if (app_worker_accept_notify (app_wrk, s))
1046     {
1047       /* On transport delete, no notifications should be sent. Unless, the
1048        * accept is retried and successful. */
1049       s->session_state = SESSION_STATE_CREATED;
1050       return -1;
1051     }
1052   return 0;
1053 }
1054
1055 /**
1056  * Accept a stream session. Optionally ping the server by callback.
1057  */
1058 int
1059 session_stream_accept (transport_connection_t * tc, u32 listener_index,
1060                        u32 thread_index, u8 notify)
1061 {
1062   session_t *s;
1063   int rv;
1064
1065   s = session_alloc_for_connection (tc);
1066   s->listener_handle = ((u64) thread_index << 32) | (u64) listener_index;
1067   s->session_state = SESSION_STATE_CREATED;
1068
1069   if ((rv = app_worker_init_accepted (s)))
1070     return rv;
1071
1072   session_lookup_add_connection (tc, session_handle (s));
1073
1074   /* Shoulder-tap the server */
1075   if (notify)
1076     {
1077       app_worker_t *app_wrk = app_worker_get (s->app_wrk_index);
1078       return app_worker_accept_notify (app_wrk, s);
1079     }
1080
1081   return 0;
1082 }
1083
1084 int
1085 session_open_cl (u32 app_wrk_index, session_endpoint_t * rmt, u32 opaque)
1086 {
1087   transport_connection_t *tc;
1088   transport_endpoint_cfg_t *tep;
1089   app_worker_t *app_wrk;
1090   session_handle_t sh;
1091   session_t *s;
1092   int rv;
1093
1094   tep = session_endpoint_to_transport_cfg (rmt);
1095   rv = transport_connect (rmt->transport_proto, tep);
1096   if (rv < 0)
1097     {
1098       SESSION_DBG ("Transport failed to open connection.");
1099       return VNET_API_ERROR_SESSION_CONNECT;
1100     }
1101
1102   tc = transport_get_half_open (rmt->transport_proto, (u32) rv);
1103
1104   /* For dgram type of service, allocate session and fifos now */
1105   app_wrk = app_worker_get (app_wrk_index);
1106   s = session_alloc_for_connection (tc);
1107   s->app_wrk_index = app_wrk->wrk_index;
1108   s->session_state = SESSION_STATE_OPENED;
1109   if (app_worker_init_connected (app_wrk, s))
1110     {
1111       session_free (s);
1112       return -1;
1113     }
1114
1115   sh = session_handle (s);
1116   session_lookup_add_connection (tc, sh);
1117   return app_worker_connect_notify (app_wrk, s, opaque);
1118 }
1119
1120 int
1121 session_open_vc (u32 app_wrk_index, session_endpoint_t * rmt, u32 opaque)
1122 {
1123   transport_connection_t *tc;
1124   transport_endpoint_cfg_t *tep;
1125   u64 handle;
1126   int rv;
1127
1128   tep = session_endpoint_to_transport_cfg (rmt);
1129   rv = transport_connect (rmt->transport_proto, tep);
1130   if (rv < 0)
1131     {
1132       SESSION_DBG ("Transport failed to open connection.");
1133       return VNET_API_ERROR_SESSION_CONNECT;
1134     }
1135
1136   tc = transport_get_half_open (rmt->transport_proto, (u32) rv);
1137
1138   /* If transport offers a stream service, only allocate session once the
1139    * connection has been established.
1140    * Add connection to half-open table and save app and tc index. The
1141    * latter is needed to help establish the connection while the former
1142    * is needed when the connect notify comes and we have to notify the
1143    * external app
1144    */
1145   handle = (((u64) app_wrk_index) << 32) | (u64) tc->c_index;
1146   session_lookup_add_half_open (tc, handle);
1147
1148   /* Store api_context (opaque) for when the reply comes. Not the nicest
1149    * thing but better than allocating a separate half-open pool.
1150    */
1151   tc->s_index = opaque;
1152   if (transport_half_open_has_fifos (rmt->transport_proto))
1153     return session_ho_stream_connect_notify (tc, 0 /* is_fail */ );
1154   return 0;
1155 }
1156
1157 int
1158 session_open_app (u32 app_wrk_index, session_endpoint_t * rmt, u32 opaque)
1159 {
1160   session_endpoint_cfg_t *sep = (session_endpoint_cfg_t *) rmt;
1161   transport_endpoint_cfg_t *tep_cfg = session_endpoint_to_transport_cfg (sep);
1162
1163   sep->app_wrk_index = app_wrk_index;
1164   sep->opaque = opaque;
1165
1166   return transport_connect (rmt->transport_proto, tep_cfg);
1167 }
1168
1169 typedef int (*session_open_service_fn) (u32, session_endpoint_t *, u32);
1170
1171 /* *INDENT-OFF* */
1172 static session_open_service_fn session_open_srv_fns[TRANSPORT_N_SERVICES] = {
1173   session_open_vc,
1174   session_open_cl,
1175   session_open_app,
1176 };
1177 /* *INDENT-ON* */
1178
1179 /**
1180  * Ask transport to open connection to remote transport endpoint.
1181  *
1182  * Stores handle for matching request with reply since the call can be
1183  * asynchronous. For instance, for TCP the 3-way handshake must complete
1184  * before reply comes. Session is only created once connection is established.
1185  *
1186  * @param app_index Index of the application requesting the connect
1187  * @param st Session type requested.
1188  * @param tep Remote transport endpoint
1189  * @param opaque Opaque data (typically, api_context) the application expects
1190  *               on open completion.
1191  */
1192 int
1193 session_open (u32 app_wrk_index, session_endpoint_t * rmt, u32 opaque)
1194 {
1195   transport_service_type_t tst;
1196   tst = transport_protocol_service_type (rmt->transport_proto);
1197   return session_open_srv_fns[tst] (app_wrk_index, rmt, opaque);
1198 }
1199
1200 /**
1201  * Ask transport to listen on session endpoint.
1202  *
1203  * @param s Session for which listen will be called. Note that unlike
1204  *          established sessions, listen sessions are not associated to a
1205  *          thread.
1206  * @param sep Local endpoint to be listened on.
1207  */
1208 int
1209 session_listen (session_t * ls, session_endpoint_cfg_t * sep)
1210 {
1211   transport_endpoint_t *tep;
1212   u32 tc_index, s_index;
1213
1214   /* Transport bind/listen */
1215   tep = session_endpoint_to_transport (sep);
1216   s_index = ls->session_index;
1217   tc_index = transport_start_listen (session_get_transport_proto (ls),
1218                                      s_index, tep);
1219
1220   if (tc_index == (u32) ~ 0)
1221     return -1;
1222
1223   /* Attach transport to session. Lookup tables are populated by the app
1224    * worker because local tables (for ct sessions) are not backed by a fib */
1225   ls = listen_session_get (s_index);
1226   ls->connection_index = tc_index;
1227
1228   return 0;
1229 }
1230
1231 /**
1232  * Ask transport to stop listening on local transport endpoint.
1233  *
1234  * @param s Session to stop listening on. It must be in state LISTENING.
1235  */
1236 int
1237 session_stop_listen (session_t * s)
1238 {
1239   transport_proto_t tp = session_get_transport_proto (s);
1240   transport_connection_t *tc;
1241
1242   if (s->session_state != SESSION_STATE_LISTENING)
1243     return -1;
1244
1245   tc = transport_get_listener (tp, s->connection_index);
1246   if (!tc)
1247     return VNET_API_ERROR_ADDRESS_NOT_IN_USE;
1248
1249   if (!(tc->flags & TRANSPORT_CONNECTION_F_NO_LOOKUP))
1250     session_lookup_del_connection (tc);
1251   transport_stop_listen (tp, s->connection_index);
1252   return 0;
1253 }
1254
1255 /**
1256  * Initialize session closing procedure.
1257  *
1258  * Request is always sent to session node to ensure that all outstanding
1259  * requests are served before transport is notified.
1260  */
1261 void
1262 session_close (session_t * s)
1263 {
1264   if (!s)
1265     return;
1266
1267   if (s->session_state >= SESSION_STATE_CLOSING)
1268     {
1269       /* Session will only be removed once both app and transport
1270        * acknowledge the close */
1271       if (s->session_state == SESSION_STATE_TRANSPORT_CLOSED
1272           || s->session_state == SESSION_STATE_TRANSPORT_DELETED)
1273         session_program_transport_ctrl_evt (s, SESSION_CTRL_EVT_CLOSE);
1274       return;
1275     }
1276
1277   s->session_state = SESSION_STATE_CLOSING;
1278   session_program_transport_ctrl_evt (s, SESSION_CTRL_EVT_CLOSE);
1279 }
1280
1281 /**
1282  * Force a close without waiting for data to be flushed
1283  */
1284 void
1285 session_reset (session_t * s)
1286 {
1287   if (s->session_state >= SESSION_STATE_CLOSING)
1288     return;
1289   /* Drop all outstanding tx data */
1290   svm_fifo_dequeue_drop_all (s->tx_fifo);
1291   s->session_state = SESSION_STATE_CLOSING;
1292   session_program_transport_ctrl_evt (s, SESSION_CTRL_EVT_RESET);
1293 }
1294
1295 /**
1296  * Notify transport the session can be disconnected. This should eventually
1297  * result in a delete notification that allows us to cleanup session state.
1298  * Called for both active/passive disconnects.
1299  *
1300  * Must be called from the session's thread.
1301  */
1302 void
1303 session_transport_close (session_t * s)
1304 {
1305   if (s->session_state >= SESSION_STATE_APP_CLOSED)
1306     {
1307       if (s->session_state == SESSION_STATE_TRANSPORT_CLOSED)
1308         s->session_state = SESSION_STATE_CLOSED;
1309       /* If transport is already deleted, just free the session */
1310       else if (s->session_state >= SESSION_STATE_TRANSPORT_DELETED)
1311         session_free_w_fifos (s);
1312       return;
1313     }
1314
1315   /* If the tx queue wasn't drained, the transport can continue to try
1316    * sending the outstanding data (in closed state it cannot). It MUST however
1317    * at one point, either after sending everything or after a timeout, call
1318    * delete notify. This will finally lead to the complete cleanup of the
1319    * session.
1320    */
1321   s->session_state = SESSION_STATE_APP_CLOSED;
1322
1323   transport_close (session_get_transport_proto (s), s->connection_index,
1324                    s->thread_index);
1325 }
1326
1327 /**
1328  * Force transport close
1329  */
1330 void
1331 session_transport_reset (session_t * s)
1332 {
1333   if (s->session_state >= SESSION_STATE_APP_CLOSED)
1334     {
1335       if (s->session_state == SESSION_STATE_TRANSPORT_CLOSED)
1336         s->session_state = SESSION_STATE_CLOSED;
1337       else if (s->session_state >= SESSION_STATE_TRANSPORT_DELETED)
1338         session_free_w_fifos (s);
1339       return;
1340     }
1341
1342   s->session_state = SESSION_STATE_APP_CLOSED;
1343   transport_reset (session_get_transport_proto (s), s->connection_index,
1344                    s->thread_index);
1345 }
1346
1347 /**
1348  * Cleanup transport and session state.
1349  *
1350  * Notify transport of the cleanup and free the session. This should
1351  * be called only if transport reported some error and is already
1352  * closed.
1353  */
1354 void
1355 session_transport_cleanup (session_t * s)
1356 {
1357   /* Delete from main lookup table before we axe the the transport */
1358   session_lookup_del_session (s);
1359   if (s->session_state != SESSION_STATE_TRANSPORT_DELETED)
1360     transport_cleanup (session_get_transport_proto (s), s->connection_index,
1361                        s->thread_index);
1362   /* Since we called cleanup, no delete notification will come. So, make
1363    * sure the session is properly freed. */
1364   segment_manager_dealloc_fifos (s->rx_fifo, s->tx_fifo);
1365   session_free (s);
1366 }
1367
1368 /**
1369  * Allocate event queues in the shared-memory segment
1370  *
1371  * That can either be a newly created memfd segment, that will need to be
1372  * mapped by all stack users, or the binary api's svm region. The latter is
1373  * assumed to be already mapped. NOTE that this assumption DOES NOT hold if
1374  * api clients bootstrap shm api over sockets (i.e. use memfd segments) and
1375  * vpp uses api svm region for event queues.
1376  */
1377 void
1378 session_vpp_event_queues_allocate (session_main_t * smm)
1379 {
1380   u32 evt_q_length = 2048, evt_size = sizeof (session_event_t);
1381   ssvm_private_t *eqs = &smm->evt_qs_segment;
1382   uword eqs_size = 64 << 20;
1383   pid_t vpp_pid = getpid ();
1384   void *oldheap;
1385   int i;
1386
1387   if (smm->configured_event_queue_length)
1388     evt_q_length = smm->configured_event_queue_length;
1389
1390   if (smm->evt_qs_use_memfd_seg)
1391     {
1392       if (smm->evt_qs_segment_size)
1393         eqs_size = smm->evt_qs_segment_size;
1394
1395       eqs->ssvm_size = eqs_size;
1396       eqs->i_am_master = 1;
1397       eqs->my_pid = vpp_pid;
1398       eqs->name = format (0, "%s%c", "evt-qs-segment", 0);
1399       eqs->requested_va = smm->session_baseva;
1400
1401       if (ssvm_master_init (eqs, SSVM_SEGMENT_MEMFD))
1402         {
1403           clib_warning ("failed to initialize queue segment");
1404           return;
1405         }
1406     }
1407
1408   if (smm->evt_qs_use_memfd_seg)
1409     oldheap = ssvm_push_heap (eqs->sh);
1410   else
1411     oldheap = vl_msg_push_heap ();
1412
1413   for (i = 0; i < vec_len (smm->wrk); i++)
1414     {
1415       svm_msg_q_cfg_t _cfg, *cfg = &_cfg;
1416       svm_msg_q_ring_cfg_t rc[SESSION_MQ_N_RINGS] = {
1417         {evt_q_length, evt_size, 0}
1418         ,
1419         {evt_q_length >> 1, 256, 0}
1420       };
1421       cfg->consumer_pid = 0;
1422       cfg->n_rings = 2;
1423       cfg->q_nitems = evt_q_length;
1424       cfg->ring_cfgs = rc;
1425       smm->wrk[i].vpp_event_queue = svm_msg_q_alloc (cfg);
1426       if (smm->evt_qs_use_memfd_seg)
1427         {
1428           if (svm_msg_q_alloc_consumer_eventfd (smm->wrk[i].vpp_event_queue))
1429             clib_warning ("eventfd returned");
1430         }
1431     }
1432
1433   if (smm->evt_qs_use_memfd_seg)
1434     ssvm_pop_heap (oldheap);
1435   else
1436     vl_msg_pop_heap (oldheap);
1437 }
1438
1439 ssvm_private_t *
1440 session_main_get_evt_q_segment (void)
1441 {
1442   session_main_t *smm = &session_main;
1443   if (smm->evt_qs_use_memfd_seg)
1444     return &smm->evt_qs_segment;
1445   return 0;
1446 }
1447
1448 u64
1449 session_segment_handle (session_t * s)
1450 {
1451   svm_fifo_t *f;
1452
1453   if (!s->rx_fifo)
1454     return SESSION_INVALID_HANDLE;
1455
1456   f = s->rx_fifo;
1457   return segment_manager_make_segment_handle (f->segment_manager,
1458                                               f->segment_index);
1459 }
1460
1461 /* *INDENT-OFF* */
1462 static session_fifo_rx_fn *session_tx_fns[TRANSPORT_TX_N_FNS] = {
1463     session_tx_fifo_peek_and_snd,
1464     session_tx_fifo_dequeue_and_snd,
1465     session_tx_fifo_dequeue_internal,
1466     session_tx_fifo_dequeue_and_snd
1467 };
1468 /* *INDENT-ON* */
1469
1470 /**
1471  * Initialize session layer for given transport proto and ip version
1472  *
1473  * Allocates per session type (transport proto + ip version) data structures
1474  * and adds arc from session queue node to session type output node.
1475  */
1476 void
1477 session_register_transport (transport_proto_t transport_proto,
1478                             const transport_proto_vft_t * vft, u8 is_ip4,
1479                             u32 output_node)
1480 {
1481   session_main_t *smm = &session_main;
1482   session_type_t session_type;
1483   u32 next_index = ~0;
1484
1485   session_type = session_type_from_proto_and_ip (transport_proto, is_ip4);
1486
1487   vec_validate (smm->session_type_to_next, session_type);
1488   vec_validate (smm->session_tx_fns, session_type);
1489
1490   /* *INDENT-OFF* */
1491   if (output_node != ~0)
1492     {
1493       foreach_vlib_main (({
1494           next_index = vlib_node_add_next (this_vlib_main,
1495                                            session_queue_node.index,
1496                                            output_node);
1497       }));
1498     }
1499   /* *INDENT-ON* */
1500
1501   smm->session_type_to_next[session_type] = next_index;
1502   smm->session_tx_fns[session_type] =
1503     session_tx_fns[vft->transport_options.tx_type];
1504 }
1505
1506 transport_connection_t *
1507 session_get_transport (session_t * s)
1508 {
1509   if (s->session_state != SESSION_STATE_LISTENING)
1510     return transport_get_connection (session_get_transport_proto (s),
1511                                      s->connection_index, s->thread_index);
1512   else
1513     return transport_get_listener (session_get_transport_proto (s),
1514                                    s->connection_index);
1515 }
1516
1517 void
1518 session_get_endpoint (session_t * s, transport_endpoint_t * tep, u8 is_lcl)
1519 {
1520   if (s->session_state != SESSION_STATE_LISTENING)
1521     return transport_get_endpoint (session_get_transport_proto (s),
1522                                    s->connection_index, s->thread_index, tep,
1523                                    is_lcl);
1524   else
1525     return transport_get_listener_endpoint (session_get_transport_proto (s),
1526                                             s->connection_index, tep, is_lcl);
1527 }
1528
1529 transport_connection_t *
1530 listen_session_get_transport (session_t * s)
1531 {
1532   return transport_get_listener (session_get_transport_proto (s),
1533                                  s->connection_index);
1534 }
1535
1536 void
1537 session_flush_frames_main_thread (vlib_main_t * vm)
1538 {
1539   ASSERT (vlib_get_thread_index () == 0);
1540   vlib_process_signal_event_mt (vm, session_queue_process_node.index,
1541                                 SESSION_Q_PROCESS_FLUSH_FRAMES, 0);
1542 }
1543
1544 static clib_error_t *
1545 session_manager_main_enable (vlib_main_t * vm)
1546 {
1547   segment_manager_main_init_args_t _sm_args = { 0 }, *sm_args = &_sm_args;
1548   session_main_t *smm = &session_main;
1549   vlib_thread_main_t *vtm = vlib_get_thread_main ();
1550   u32 num_threads, preallocated_sessions_per_worker;
1551   session_worker_t *wrk;
1552   int i;
1553
1554   num_threads = 1 /* main thread */  + vtm->n_threads;
1555
1556   if (num_threads < 1)
1557     return clib_error_return (0, "n_thread_stacks not set");
1558
1559   /* Allocate cache line aligned worker contexts */
1560   vec_validate_aligned (smm->wrk, num_threads - 1, CLIB_CACHE_LINE_BYTES);
1561
1562   for (i = 0; i < num_threads; i++)
1563     {
1564       wrk = &smm->wrk[i];
1565       wrk->ctrl_head = clib_llist_make_head (wrk->event_elts, evt_list);
1566       wrk->new_head = clib_llist_make_head (wrk->event_elts, evt_list);
1567       wrk->old_head = clib_llist_make_head (wrk->event_elts, evt_list);
1568       wrk->vm = vlib_mains[i];
1569       wrk->last_vlib_time = vlib_time_now (vlib_mains[i]);
1570       wrk->last_vlib_us_time = wrk->last_vlib_time * CLIB_US_TIME_FREQ;
1571
1572       if (num_threads > 1)
1573         clib_rwlock_init (&smm->wrk[i].peekers_rw_locks);
1574     }
1575
1576   /* Allocate vpp event queues segment and queue */
1577   session_vpp_event_queues_allocate (smm);
1578
1579   /* Initialize fifo segment main baseva and timeout */
1580   sm_args->baseva = smm->session_baseva + smm->evt_qs_segment_size;
1581   sm_args->size = smm->session_va_space_size;
1582   segment_manager_main_init (sm_args);
1583
1584   /* Preallocate sessions */
1585   if (smm->preallocated_sessions)
1586     {
1587       if (num_threads == 1)
1588         {
1589           pool_init_fixed (smm->wrk[0].sessions, smm->preallocated_sessions);
1590         }
1591       else
1592         {
1593           int j;
1594           preallocated_sessions_per_worker =
1595             (1.1 * (f64) smm->preallocated_sessions /
1596              (f64) (num_threads - 1));
1597
1598           for (j = 1; j < num_threads; j++)
1599             {
1600               pool_init_fixed (smm->wrk[j].sessions,
1601                                preallocated_sessions_per_worker);
1602             }
1603         }
1604     }
1605
1606   session_lookup_init ();
1607   app_namespaces_init ();
1608   transport_init ();
1609
1610   smm->is_enabled = 1;
1611
1612   /* Enable transports */
1613   transport_enable_disable (vm, 1);
1614   return 0;
1615 }
1616
1617 void
1618 session_node_enable_disable (u8 is_en)
1619 {
1620   u8 state = is_en ? VLIB_NODE_STATE_POLLING : VLIB_NODE_STATE_DISABLED;
1621   vlib_thread_main_t *vtm = vlib_get_thread_main ();
1622   u8 have_workers = vtm->n_threads != 0;
1623
1624   /* *INDENT-OFF* */
1625   foreach_vlib_main (({
1626     if (have_workers && ii == 0)
1627       {
1628         vlib_node_set_state (this_vlib_main, session_queue_process_node.index,
1629                              state);
1630         if (is_en)
1631           {
1632             vlib_node_t *n = vlib_get_node (this_vlib_main,
1633                                             session_queue_process_node.index);
1634             vlib_start_process (this_vlib_main, n->runtime_index);
1635           }
1636         else
1637           {
1638             vlib_process_signal_event_mt (this_vlib_main,
1639                                           session_queue_process_node.index,
1640                                           SESSION_Q_PROCESS_STOP, 0);
1641           }
1642
1643         continue;
1644       }
1645     vlib_node_set_state (this_vlib_main, session_queue_node.index,
1646                          state);
1647   }));
1648   /* *INDENT-ON* */
1649 }
1650
1651 clib_error_t *
1652 vnet_session_enable_disable (vlib_main_t * vm, u8 is_en)
1653 {
1654   clib_error_t *error = 0;
1655   if (is_en)
1656     {
1657       if (session_main.is_enabled)
1658         return 0;
1659
1660       error = session_manager_main_enable (vm);
1661       session_node_enable_disable (is_en);
1662     }
1663   else
1664     {
1665       session_main.is_enabled = 0;
1666       session_node_enable_disable (is_en);
1667     }
1668
1669   return error;
1670 }
1671
1672 clib_error_t *
1673 session_manager_main_init (vlib_main_t * vm)
1674 {
1675   session_main_t *smm = &session_main;
1676   smm->session_baseva = HIGH_SEGMENT_BASEVA;
1677 #if (HIGH_SEGMENT_BASEVA > (4ULL << 30))
1678   smm->session_va_space_size = 128ULL << 30;
1679   smm->evt_qs_segment_size = 64 << 20;
1680 #else
1681   smm->session_va_space_size = 128 << 20;
1682   smm->evt_qs_segment_size = 1 << 20;
1683 #endif
1684   smm->is_enabled = 0;
1685   smm->session_enable_asap = 0;
1686   return 0;
1687 }
1688
1689 static clib_error_t *
1690 session_main_init (vlib_main_t * vm)
1691 {
1692   session_main_t *smm = &session_main;
1693   if (smm->session_enable_asap)
1694     {
1695       vlib_worker_thread_barrier_sync (vm);
1696       vnet_session_enable_disable (vm, 1 /* is_en */ );
1697       vlib_worker_thread_barrier_release (vm);
1698     }
1699   return 0;
1700 }
1701
1702 VLIB_INIT_FUNCTION (session_manager_main_init);
1703 VLIB_MAIN_LOOP_ENTER_FUNCTION (session_main_init);
1704
1705 static clib_error_t *
1706 session_config_fn (vlib_main_t * vm, unformat_input_t * input)
1707 {
1708   session_main_t *smm = &session_main;
1709   u32 nitems;
1710   uword tmp;
1711
1712   while (unformat_check_input (input) != UNFORMAT_END_OF_INPUT)
1713     {
1714       if (unformat (input, "event-queue-length %d", &nitems))
1715         {
1716           if (nitems >= 2048)
1717             smm->configured_event_queue_length = nitems;
1718           else
1719             clib_warning ("event queue length %d too small, ignored", nitems);
1720         }
1721       else if (unformat (input, "preallocated-sessions %d",
1722                          &smm->preallocated_sessions))
1723         ;
1724       else if (unformat (input, "v4-session-table-buckets %d",
1725                          &smm->configured_v4_session_table_buckets))
1726         ;
1727       else if (unformat (input, "v4-halfopen-table-buckets %d",
1728                          &smm->configured_v4_halfopen_table_buckets))
1729         ;
1730       else if (unformat (input, "v6-session-table-buckets %d",
1731                          &smm->configured_v6_session_table_buckets))
1732         ;
1733       else if (unformat (input, "v6-halfopen-table-buckets %d",
1734                          &smm->configured_v6_halfopen_table_buckets))
1735         ;
1736       else if (unformat (input, "v4-session-table-memory %U",
1737                          unformat_memory_size, &tmp))
1738         {
1739           if (tmp >= 0x100000000)
1740             return clib_error_return (0, "memory size %llx (%lld) too large",
1741                                       tmp, tmp);
1742           smm->configured_v4_session_table_memory = tmp;
1743         }
1744       else if (unformat (input, "v4-halfopen-table-memory %U",
1745                          unformat_memory_size, &tmp))
1746         {
1747           if (tmp >= 0x100000000)
1748             return clib_error_return (0, "memory size %llx (%lld) too large",
1749                                       tmp, tmp);
1750           smm->configured_v4_halfopen_table_memory = tmp;
1751         }
1752       else if (unformat (input, "v6-session-table-memory %U",
1753                          unformat_memory_size, &tmp))
1754         {
1755           if (tmp >= 0x100000000)
1756             return clib_error_return (0, "memory size %llx (%lld) too large",
1757                                       tmp, tmp);
1758           smm->configured_v6_session_table_memory = tmp;
1759         }
1760       else if (unformat (input, "v6-halfopen-table-memory %U",
1761                          unformat_memory_size, &tmp))
1762         {
1763           if (tmp >= 0x100000000)
1764             return clib_error_return (0, "memory size %llx (%lld) too large",
1765                                       tmp, tmp);
1766           smm->configured_v6_halfopen_table_memory = tmp;
1767         }
1768       else if (unformat (input, "local-endpoints-table-memory %U",
1769                          unformat_memory_size, &tmp))
1770         {
1771           if (tmp >= 0x100000000)
1772             return clib_error_return (0, "memory size %llx (%lld) too large",
1773                                       tmp, tmp);
1774           smm->local_endpoints_table_memory = tmp;
1775         }
1776       else if (unformat (input, "local-endpoints-table-buckets %d",
1777                          &smm->local_endpoints_table_buckets))
1778         ;
1779       else if (unformat (input, "evt_qs_memfd_seg"))
1780         smm->evt_qs_use_memfd_seg = 1;
1781       else if (unformat (input, "evt_qs_seg_size %U", unformat_memory_size,
1782                          &smm->evt_qs_segment_size))
1783         ;
1784       else if (unformat (input, "enable"))
1785         smm->session_enable_asap = 1;
1786       else
1787         return clib_error_return (0, "unknown input `%U'",
1788                                   format_unformat_error, input);
1789     }
1790   return 0;
1791 }
1792
1793 VLIB_CONFIG_FUNCTION (session_config_fn, "session");
1794
1795 /*
1796  * fd.io coding-style-patch-verification: ON
1797  *
1798  * Local Variables:
1799  * eval: (c-set-style "gnu")
1800  * End:
1801  */