520d329f742fcd79fa6ed3cca5a2274eafd5c5dd
[vpp.git] / src / vnet / session / session.c
1 /*
2  * Copyright (c) 2017-2019 Cisco and/or its affiliates.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at:
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 /**
16  * @file
17  * @brief Session and session manager
18  */
19
20 #include <vnet/session/session.h>
21 #include <vnet/session/session_debug.h>
22 #include <vnet/session/application.h>
23 #include <vlibmemory/api.h>
24 #include <vnet/dpo/load_balance.h>
25 #include <vnet/fib/ip4_fib.h>
26
27 session_manager_main_t session_manager_main;
28 extern transport_proto_vft_t *tp_vfts;
29
30 static inline int
31 session_send_evt_to_thread (void *data, void *args, u32 thread_index,
32                             session_evt_type_t evt_type)
33 {
34   session_event_t *evt;
35   svm_msg_q_msg_t msg;
36   svm_msg_q_t *mq;
37   u32 tries = 0, max_tries;
38
39   mq = session_manager_get_vpp_event_queue (thread_index);
40   while (svm_msg_q_try_lock (mq))
41     {
42       max_tries = vlib_get_current_process (vlib_get_main ())? 1e6 : 3;
43       if (tries++ == max_tries)
44         {
45           SESSION_DBG ("failed to enqueue evt");
46           return -1;
47         }
48     }
49   if (PREDICT_FALSE (svm_msg_q_ring_is_full (mq, SESSION_MQ_IO_EVT_RING)))
50     {
51       svm_msg_q_unlock (mq);
52       return -2;
53     }
54   msg = svm_msg_q_alloc_msg_w_ring (mq, SESSION_MQ_IO_EVT_RING);
55   if (PREDICT_FALSE (svm_msg_q_msg_is_invalid (&msg)))
56     {
57       svm_msg_q_unlock (mq);
58       return -2;
59     }
60   evt = (session_event_t *) svm_msg_q_msg_data (mq, &msg);
61   evt->event_type = evt_type;
62   switch (evt_type)
63     {
64     case FIFO_EVENT_RPC:
65       evt->rpc_args.fp = data;
66       evt->rpc_args.arg = args;
67       break;
68     case FIFO_EVENT_APP_TX:
69     case SESSION_IO_EVT_TX_FLUSH:
70     case FIFO_EVENT_BUILTIN_RX:
71       evt->fifo = data;
72       break;
73     case FIFO_EVENT_BUILTIN_TX:
74     case FIFO_EVENT_DISCONNECT:
75       evt->session_handle = session_handle ((session_t *) data);
76       break;
77     default:
78       clib_warning ("evt unhandled!");
79       svm_msg_q_unlock (mq);
80       return -1;
81     }
82
83   svm_msg_q_add_and_unlock (mq, &msg);
84   return 0;
85 }
86
87 int
88 session_send_io_evt_to_thread (svm_fifo_t * f, session_evt_type_t evt_type)
89 {
90   return session_send_evt_to_thread (f, 0, f->master_thread_index, evt_type);
91 }
92
93 int
94 session_send_io_evt_to_thread_custom (void *data, u32 thread_index,
95                                       session_evt_type_t evt_type)
96 {
97   return session_send_evt_to_thread (data, 0, thread_index, evt_type);
98 }
99
100 int
101 session_send_ctrl_evt_to_thread (session_t * s, session_evt_type_t evt_type)
102 {
103   /* only event supported for now is disconnect */
104   ASSERT (evt_type == FIFO_EVENT_DISCONNECT);
105   return session_send_evt_to_thread (s, 0, s->thread_index,
106                                      FIFO_EVENT_DISCONNECT);
107 }
108
109 void
110 session_send_rpc_evt_to_thread (u32 thread_index, void *fp, void *rpc_args)
111 {
112   if (thread_index != vlib_get_thread_index ())
113     session_send_evt_to_thread (fp, rpc_args, thread_index, FIFO_EVENT_RPC);
114   else
115     {
116       void (*fnp) (void *) = fp;
117       fnp (rpc_args);
118     }
119 }
120
121 static void
122 session_program_transport_close (session_t * s)
123 {
124   u32 thread_index = vlib_get_thread_index ();
125   session_manager_worker_t *wrk;
126   session_event_t *evt;
127
128   /* If we are in the handler thread, or being called with the worker barrier
129    * held, just append a new event to pending disconnects vector. */
130   if (vlib_thread_is_main_w_barrier () || thread_index == s->thread_index)
131     {
132       wrk = session_manager_get_worker (s->thread_index);
133       vec_add2 (wrk->pending_disconnects, evt, 1);
134       clib_memset (evt, 0, sizeof (*evt));
135       evt->session_handle = session_handle (s);
136       evt->event_type = FIFO_EVENT_DISCONNECT;
137     }
138   else
139     session_send_ctrl_evt_to_thread (s, FIFO_EVENT_DISCONNECT);
140 }
141
142 session_t *
143 session_alloc (u32 thread_index)
144 {
145   session_manager_worker_t *wrk = &session_manager_main.wrk[thread_index];
146   session_t *s;
147   u8 will_expand = 0;
148   pool_get_aligned_will_expand (wrk->sessions, will_expand,
149                                 CLIB_CACHE_LINE_BYTES);
150   /* If we have peekers, let them finish */
151   if (PREDICT_FALSE (will_expand && vlib_num_workers ()))
152     {
153       clib_rwlock_writer_lock (&wrk->peekers_rw_locks);
154       pool_get_aligned (wrk->sessions, s, CLIB_CACHE_LINE_BYTES);
155       clib_rwlock_writer_unlock (&wrk->peekers_rw_locks);
156     }
157   else
158     {
159       pool_get_aligned (wrk->sessions, s, CLIB_CACHE_LINE_BYTES);
160     }
161   clib_memset (s, 0, sizeof (*s));
162   s->session_index = s - wrk->sessions;
163   s->thread_index = thread_index;
164   return s;
165 }
166
167 void
168 session_free (session_t * s)
169 {
170   pool_put (session_manager_main.wrk[s->thread_index].sessions, s);
171   if (CLIB_DEBUG)
172     clib_memset (s, 0xFA, sizeof (*s));
173 }
174
175 void
176 session_free_w_fifos (session_t * s)
177 {
178   segment_manager_dealloc_fifos (s->svm_segment_index, s->rx_fifo,
179                                  s->tx_fifo);
180   session_free (s);
181 }
182
183 /**
184  * Cleans up session and lookup table.
185  *
186  * Transport connection must still be valid.
187  */
188 static void
189 session_delete (session_t * s)
190 {
191   int rv;
192
193   /* Delete from the main lookup table. */
194   if ((rv = session_lookup_del_session (s)))
195     clib_warning ("hash delete error, rv %d", rv);
196
197   session_free_w_fifos (s);
198 }
199
200 int
201 session_alloc_fifos (segment_manager_t * sm, session_t * s)
202 {
203   svm_fifo_t *server_rx_fifo = 0, *server_tx_fifo = 0;
204   u32 fifo_segment_index;
205   int rv;
206
207   if ((rv = segment_manager_alloc_session_fifos (sm, &server_rx_fifo,
208                                                  &server_tx_fifo,
209                                                  &fifo_segment_index)))
210     return rv;
211   /* Initialize backpointers */
212   server_rx_fifo->master_session_index = s->session_index;
213   server_rx_fifo->master_thread_index = s->thread_index;
214
215   server_tx_fifo->master_session_index = s->session_index;
216   server_tx_fifo->master_thread_index = s->thread_index;
217
218   s->rx_fifo = server_rx_fifo;
219   s->tx_fifo = server_tx_fifo;
220   s->svm_segment_index = fifo_segment_index;
221   return 0;
222 }
223
224 static session_t *
225 session_alloc_for_connection (transport_connection_t * tc)
226 {
227   session_t *s;
228   u32 thread_index = tc->thread_index;
229
230   ASSERT (thread_index == vlib_get_thread_index ()
231           || transport_protocol_is_cl (tc->proto));
232
233   s = session_alloc (thread_index);
234   s->session_type = session_type_from_proto_and_ip (tc->proto, tc->is_ip4);
235   s->enqueue_epoch = (u64) ~ 0;
236   s->session_state = SESSION_STATE_CLOSED;
237
238   /* Attach transport to session and vice versa */
239   s->connection_index = tc->c_index;
240   tc->s_index = s->session_index;
241   return s;
242 }
243
244 static int
245 session_alloc_and_init (segment_manager_t * sm, transport_connection_t * tc,
246                         u8 alloc_fifos, session_t ** ret_s)
247 {
248   session_t *s;
249   int rv;
250
251   s = session_alloc_for_connection (tc);
252   if (alloc_fifos && (rv = session_alloc_fifos (sm, s)))
253     {
254       session_free (s);
255       *ret_s = 0;
256       return rv;
257     }
258
259   /* Add to the main lookup table */
260   session_lookup_add_connection (tc, session_handle (s));
261
262   *ret_s = s;
263   return 0;
264 }
265
266 /**
267  * Discards bytes from buffer chain
268  *
269  * It discards n_bytes_to_drop starting at first buffer after chain_b
270  */
271 always_inline void
272 session_enqueue_discard_chain_bytes (vlib_main_t * vm, vlib_buffer_t * b,
273                                      vlib_buffer_t ** chain_b,
274                                      u32 n_bytes_to_drop)
275 {
276   vlib_buffer_t *next = *chain_b;
277   u32 to_drop = n_bytes_to_drop;
278   ASSERT (b->flags & VLIB_BUFFER_NEXT_PRESENT);
279   while (to_drop && (next->flags & VLIB_BUFFER_NEXT_PRESENT))
280     {
281       next = vlib_get_buffer (vm, next->next_buffer);
282       if (next->current_length > to_drop)
283         {
284           vlib_buffer_advance (next, to_drop);
285           to_drop = 0;
286         }
287       else
288         {
289           to_drop -= next->current_length;
290           next->current_length = 0;
291         }
292     }
293   *chain_b = next;
294
295   if (to_drop == 0)
296     b->total_length_not_including_first_buffer -= n_bytes_to_drop;
297 }
298
299 /**
300  * Enqueue buffer chain tail
301  */
302 always_inline int
303 session_enqueue_chain_tail (session_t * s, vlib_buffer_t * b,
304                             u32 offset, u8 is_in_order)
305 {
306   vlib_buffer_t *chain_b;
307   u32 chain_bi, len, diff;
308   vlib_main_t *vm = vlib_get_main ();
309   u8 *data;
310   u32 written = 0;
311   int rv = 0;
312
313   if (is_in_order && offset)
314     {
315       diff = offset - b->current_length;
316       if (diff > b->total_length_not_including_first_buffer)
317         return 0;
318       chain_b = b;
319       session_enqueue_discard_chain_bytes (vm, b, &chain_b, diff);
320       chain_bi = vlib_get_buffer_index (vm, chain_b);
321     }
322   else
323     chain_bi = b->next_buffer;
324
325   do
326     {
327       chain_b = vlib_get_buffer (vm, chain_bi);
328       data = vlib_buffer_get_current (chain_b);
329       len = chain_b->current_length;
330       if (!len)
331         continue;
332       if (is_in_order)
333         {
334           rv = svm_fifo_enqueue_nowait (s->rx_fifo, len, data);
335           if (rv == len)
336             {
337               written += rv;
338             }
339           else if (rv < len)
340             {
341               return (rv > 0) ? (written + rv) : written;
342             }
343           else if (rv > len)
344             {
345               written += rv;
346
347               /* written more than what was left in chain */
348               if (written > b->total_length_not_including_first_buffer)
349                 return written;
350
351               /* drop the bytes that have already been delivered */
352               session_enqueue_discard_chain_bytes (vm, b, &chain_b, rv - len);
353             }
354         }
355       else
356         {
357           rv = svm_fifo_enqueue_with_offset (s->rx_fifo, offset, len, data);
358           if (rv)
359             {
360               clib_warning ("failed to enqueue multi-buffer seg");
361               return -1;
362             }
363           offset += len;
364         }
365     }
366   while ((chain_bi = (chain_b->flags & VLIB_BUFFER_NEXT_PRESENT)
367           ? chain_b->next_buffer : 0));
368
369   if (is_in_order)
370     return written;
371
372   return 0;
373 }
374
375 /*
376  * Enqueue data for delivery to session peer. Does not notify peer of enqueue
377  * event but on request can queue notification events for later delivery by
378  * calling stream_server_flush_enqueue_events().
379  *
380  * @param tc Transport connection which is to be enqueued data
381  * @param b Buffer to be enqueued
382  * @param offset Offset at which to start enqueueing if out-of-order
383  * @param queue_event Flag to indicate if peer is to be notified or if event
384  *                    is to be queued. The former is useful when more data is
385  *                    enqueued and only one event is to be generated.
386  * @param is_in_order Flag to indicate if data is in order
387  * @return Number of bytes enqueued or a negative value if enqueueing failed.
388  */
389 int
390 session_enqueue_stream_connection (transport_connection_t * tc,
391                                    vlib_buffer_t * b, u32 offset,
392                                    u8 queue_event, u8 is_in_order)
393 {
394   session_t *s;
395   int enqueued = 0, rv, in_order_off;
396
397   s = session_get (tc->s_index, tc->thread_index);
398
399   if (is_in_order)
400     {
401       enqueued = svm_fifo_enqueue_nowait (s->rx_fifo,
402                                           b->current_length,
403                                           vlib_buffer_get_current (b));
404       if (PREDICT_FALSE ((b->flags & VLIB_BUFFER_NEXT_PRESENT)
405                          && enqueued >= 0))
406         {
407           in_order_off = enqueued > b->current_length ? enqueued : 0;
408           rv = session_enqueue_chain_tail (s, b, in_order_off, 1);
409           if (rv > 0)
410             enqueued += rv;
411         }
412     }
413   else
414     {
415       rv = svm_fifo_enqueue_with_offset (s->rx_fifo, offset,
416                                          b->current_length,
417                                          vlib_buffer_get_current (b));
418       if (PREDICT_FALSE ((b->flags & VLIB_BUFFER_NEXT_PRESENT) && !rv))
419         session_enqueue_chain_tail (s, b, offset + b->current_length, 0);
420       /* if something was enqueued, report even this as success for ooo
421        * segment handling */
422       return rv;
423     }
424
425   if (queue_event)
426     {
427       /* Queue RX event on this fifo. Eventually these will need to be flushed
428        * by calling stream_server_flush_enqueue_events () */
429       session_manager_worker_t *wrk;
430
431       wrk = session_manager_get_worker (s->thread_index);
432       if (s->enqueue_epoch != wrk->current_enqueue_epoch[tc->proto])
433         {
434           s->enqueue_epoch = wrk->current_enqueue_epoch[tc->proto];
435           vec_add1 (wrk->session_to_enqueue[tc->proto], s->session_index);
436         }
437     }
438
439   return enqueued;
440 }
441
442 int
443 session_enqueue_dgram_connection (session_t * s,
444                                   session_dgram_hdr_t * hdr,
445                                   vlib_buffer_t * b, u8 proto, u8 queue_event)
446 {
447   int enqueued = 0, rv, in_order_off;
448
449   ASSERT (svm_fifo_max_enqueue (s->rx_fifo)
450           >= b->current_length + sizeof (*hdr));
451
452   svm_fifo_enqueue_nowait (s->rx_fifo, sizeof (session_dgram_hdr_t),
453                            (u8 *) hdr);
454   enqueued = svm_fifo_enqueue_nowait (s->rx_fifo, b->current_length,
455                                       vlib_buffer_get_current (b));
456   if (PREDICT_FALSE ((b->flags & VLIB_BUFFER_NEXT_PRESENT) && enqueued >= 0))
457     {
458       in_order_off = enqueued > b->current_length ? enqueued : 0;
459       rv = session_enqueue_chain_tail (s, b, in_order_off, 1);
460       if (rv > 0)
461         enqueued += rv;
462     }
463   if (queue_event)
464     {
465       /* Queue RX event on this fifo. Eventually these will need to be flushed
466        * by calling stream_server_flush_enqueue_events () */
467       session_manager_worker_t *wrk;
468
469       wrk = session_manager_get_worker (s->thread_index);
470       if (s->enqueue_epoch != wrk->current_enqueue_epoch[proto])
471         {
472           s->enqueue_epoch = wrk->current_enqueue_epoch[proto];
473           vec_add1 (wrk->session_to_enqueue[proto], s->session_index);
474         }
475     }
476   return enqueued;
477 }
478
479 /** Check if we have space in rx fifo to push more bytes */
480 u8
481 stream_session_no_space (transport_connection_t * tc, u32 thread_index,
482                          u16 data_len)
483 {
484   session_t *s = session_get (tc->s_index, thread_index);
485
486   if (PREDICT_FALSE (s->session_state != SESSION_STATE_READY))
487     return 1;
488
489   if (data_len > svm_fifo_max_enqueue (s->rx_fifo))
490     return 1;
491
492   return 0;
493 }
494
495 u32
496 session_tx_fifo_max_dequeue (transport_connection_t * tc)
497 {
498   session_t *s = session_get (tc->s_index, tc->thread_index);
499   if (!s->tx_fifo)
500     return 0;
501   return svm_fifo_max_dequeue (s->tx_fifo);
502 }
503
504 int
505 stream_session_peek_bytes (transport_connection_t * tc, u8 * buffer,
506                            u32 offset, u32 max_bytes)
507 {
508   session_t *s = session_get (tc->s_index, tc->thread_index);
509   return svm_fifo_peek (s->tx_fifo, offset, max_bytes, buffer);
510 }
511
512 u32
513 stream_session_dequeue_drop (transport_connection_t * tc, u32 max_bytes)
514 {
515   session_t *s = session_get (tc->s_index, tc->thread_index);
516   return svm_fifo_dequeue_drop (s->tx_fifo, max_bytes);
517 }
518
519 static inline int
520 session_notify_subscribers (u32 app_index, session_t * s,
521                             svm_fifo_t * f, session_evt_type_t evt_type)
522 {
523   app_worker_t *app_wrk;
524   application_t *app;
525   int i;
526
527   app = application_get (app_index);
528   if (!app)
529     return -1;
530
531   for (i = 0; i < f->n_subscribers; i++)
532     {
533       app_wrk = application_get_worker (app, f->subscribers[i]);
534       if (!app_wrk)
535         continue;
536       if (app_worker_lock_and_send_event (app_wrk, s, evt_type))
537         return -1;
538     }
539
540   return 0;
541 }
542
543 /**
544  * Notify session peer that new data has been enqueued.
545  *
546  * @param s     Stream session for which the event is to be generated.
547  * @param lock  Flag to indicate if call should lock message queue.
548  *
549  * @return 0 on success or negative number if failed to send notification.
550  */
551 static inline int
552 session_enqueue_notify (session_t * s)
553 {
554   app_worker_t *app_wrk;
555
556   app_wrk = app_worker_get_if_valid (s->app_wrk_index);
557   if (PREDICT_FALSE (!app_wrk))
558     {
559       SESSION_DBG ("invalid s->app_index = %d", s->app_wrk_index);
560       return 0;
561     }
562
563   /* *INDENT-OFF* */
564   SESSION_EVT_DBG(SESSION_EVT_ENQ, s, ({
565       ed->data[0] = FIFO_EVENT_APP_RX;
566       ed->data[1] = svm_fifo_max_dequeue (s->rx_fifo);
567   }));
568   /* *INDENT-ON* */
569
570   if (PREDICT_FALSE (app_worker_lock_and_send_event (app_wrk, s,
571                                                      FIFO_EVENT_APP_RX)))
572     return -1;
573
574   if (PREDICT_FALSE (svm_fifo_n_subscribers (s->rx_fifo)))
575     return session_notify_subscribers (app_wrk->app_index, s,
576                                        s->rx_fifo, FIFO_EVENT_APP_RX);
577
578   return 0;
579 }
580
581 int
582 session_dequeue_notify (session_t * s)
583 {
584   app_worker_t *app_wrk;
585
586   app_wrk = app_worker_get_if_valid (s->app_wrk_index);
587   if (PREDICT_FALSE (!app_wrk))
588     return -1;
589
590   if (PREDICT_FALSE (app_worker_lock_and_send_event (app_wrk, s,
591                                                      FIFO_EVENT_APP_TX)))
592     return -1;
593
594   if (PREDICT_FALSE (s->tx_fifo->n_subscribers))
595     return session_notify_subscribers (app_wrk->app_index, s,
596                                        s->tx_fifo, FIFO_EVENT_APP_TX);
597
598   svm_fifo_clear_tx_ntf (s->tx_fifo);
599
600   return 0;
601 }
602
603 /**
604  * Flushes queue of sessions that are to be notified of new data
605  * enqueued events.
606  *
607  * @param thread_index Thread index for which the flush is to be performed.
608  * @return 0 on success or a positive number indicating the number of
609  *         failures due to API queue being full.
610  */
611 int
612 session_manager_flush_enqueue_events (u8 transport_proto, u32 thread_index)
613 {
614   session_manager_worker_t *wrk = session_manager_get_worker (thread_index);
615   session_t *s;
616   int i, errors = 0;
617   u32 *indices;
618
619   indices = wrk->session_to_enqueue[transport_proto];
620
621   for (i = 0; i < vec_len (indices); i++)
622     {
623       s = session_get_if_valid (indices[i], thread_index);
624       if (PREDICT_FALSE (!s))
625         {
626           errors++;
627           continue;
628         }
629       if (PREDICT_FALSE (session_enqueue_notify (s)))
630         errors++;
631     }
632
633   vec_reset_length (indices);
634   wrk->session_to_enqueue[transport_proto] = indices;
635   wrk->current_enqueue_epoch[transport_proto]++;
636
637   return errors;
638 }
639
640 int
641 session_manager_flush_all_enqueue_events (u8 transport_proto)
642 {
643   vlib_thread_main_t *vtm = vlib_get_thread_main ();
644   int i, errors = 0;
645   for (i = 0; i < 1 + vtm->n_threads; i++)
646     errors += session_manager_flush_enqueue_events (transport_proto, i);
647   return errors;
648 }
649
650 /**
651  * Init fifo tail and head pointers
652  *
653  * Useful if transport uses absolute offsets for tracking ooo segments.
654  */
655 void
656 stream_session_init_fifos_pointers (transport_connection_t * tc,
657                                     u32 rx_pointer, u32 tx_pointer)
658 {
659   session_t *s;
660   s = session_get (tc->s_index, tc->thread_index);
661   svm_fifo_init_pointers (s->rx_fifo, rx_pointer);
662   svm_fifo_init_pointers (s->tx_fifo, tx_pointer);
663 }
664
665 int
666 session_stream_connect_notify (transport_connection_t * tc, u8 is_fail)
667 {
668   u32 opaque = 0, new_ti, new_si;
669   session_t *new_s = 0;
670   segment_manager_t *sm;
671   app_worker_t *app_wrk;
672   application_t *app;
673   u8 alloc_fifos;
674   int error = 0;
675   u64 handle;
676
677   /*
678    * Find connection handle and cleanup half-open table
679    */
680   handle = session_lookup_half_open_handle (tc);
681   if (handle == HALF_OPEN_LOOKUP_INVALID_VALUE)
682     {
683       SESSION_DBG ("half-open was removed!");
684       return -1;
685     }
686   session_lookup_del_half_open (tc);
687
688   /* Get the app's index from the handle we stored when opening connection
689    * and the opaque (api_context for external apps) from transport session
690    * index */
691   app_wrk = app_worker_get_if_valid (handle >> 32);
692   if (!app_wrk)
693     return -1;
694   opaque = tc->s_index;
695   app = application_get (app_wrk->app_index);
696
697   /*
698    * Allocate new session with fifos (svm segments are allocated if needed)
699    */
700   if (!is_fail)
701     {
702       sm = app_worker_get_connect_segment_manager (app_wrk);
703       alloc_fifos = !application_is_builtin_proxy (app);
704       if (session_alloc_and_init (sm, tc, alloc_fifos, &new_s))
705         {
706           is_fail = 1;
707           error = -1;
708         }
709       else
710         {
711           new_s->session_state = SESSION_STATE_CONNECTING;
712           new_s->app_wrk_index = app_wrk->wrk_index;
713           new_si = new_s->session_index;
714           new_ti = new_s->thread_index;
715         }
716     }
717
718   /*
719    * Notify client application
720    */
721   if (app->cb_fns.session_connected_callback (app_wrk->wrk_index, opaque,
722                                               new_s, is_fail))
723     {
724       SESSION_DBG ("failed to notify app");
725       if (!is_fail)
726         {
727           new_s = session_get (new_si, new_ti);
728           session_transport_close (new_s);
729         }
730     }
731   else
732     {
733       if (!is_fail)
734         {
735           new_s = session_get (new_si, new_ti);
736           new_s->session_state = SESSION_STATE_READY;
737         }
738     }
739
740   return error;
741 }
742
743 typedef struct _session_switch_pool_args
744 {
745   u32 session_index;
746   u32 thread_index;
747   u32 new_thread_index;
748   u32 new_session_index;
749 } session_switch_pool_args_t;
750
751 static void
752 session_switch_pool (void *cb_args)
753 {
754   session_switch_pool_args_t *args = (session_switch_pool_args_t *) cb_args;
755   transport_proto_t tp;
756   session_t *s;
757   ASSERT (args->thread_index == vlib_get_thread_index ());
758   s = session_get (args->session_index, args->thread_index);
759   s->tx_fifo->master_session_index = args->new_session_index;
760   s->tx_fifo->master_thread_index = args->new_thread_index;
761   tp = session_get_transport_proto (s);
762   tp_vfts[tp].cleanup (s->connection_index, s->thread_index);
763   session_free (s);
764   clib_mem_free (cb_args);
765 }
766
767 /**
768  * Move dgram session to the right thread
769  */
770 int
771 session_dgram_connect_notify (transport_connection_t * tc,
772                               u32 old_thread_index, session_t ** new_session)
773 {
774   session_t *new_s;
775   session_switch_pool_args_t *rpc_args;
776
777   /*
778    * Clone half-open session to the right thread.
779    */
780   new_s = session_clone_safe (tc->s_index, old_thread_index);
781   new_s->connection_index = tc->c_index;
782   new_s->rx_fifo->master_session_index = new_s->session_index;
783   new_s->rx_fifo->master_thread_index = new_s->thread_index;
784   new_s->session_state = SESSION_STATE_READY;
785   session_lookup_add_connection (tc, session_handle (new_s));
786
787   /*
788    * Ask thread owning the old session to clean it up and make us the tx
789    * fifo owner
790    */
791   rpc_args = clib_mem_alloc (sizeof (*rpc_args));
792   rpc_args->new_session_index = new_s->session_index;
793   rpc_args->new_thread_index = new_s->thread_index;
794   rpc_args->session_index = tc->s_index;
795   rpc_args->thread_index = old_thread_index;
796   session_send_rpc_evt_to_thread (rpc_args->thread_index, session_switch_pool,
797                                   rpc_args);
798
799   tc->s_index = new_s->session_index;
800   new_s->connection_index = tc->c_index;
801   *new_session = new_s;
802   return 0;
803 }
804
805 int
806 stream_session_accept_notify (transport_connection_t * tc)
807 {
808   app_worker_t *app_wrk;
809   application_t *app;
810   session_t *s;
811
812   s = session_get (tc->s_index, tc->thread_index);
813   app_wrk = app_worker_get_if_valid (s->app_wrk_index);
814   if (!app_wrk)
815     return -1;
816   s->session_state = SESSION_STATE_ACCEPTING;
817   app = application_get (app_wrk->app_index);
818   return app->cb_fns.session_accept_callback (s);
819 }
820
821 /**
822  * Notification from transport that connection is being closed.
823  *
824  * A disconnect is sent to application but state is not removed. Once
825  * disconnect is acknowledged by application, session disconnect is called.
826  * Ultimately this leads to close being called on transport (passive close).
827  */
828 void
829 session_transport_closing_notify (transport_connection_t * tc)
830 {
831   app_worker_t *app_wrk;
832   application_t *app;
833   session_t *s;
834
835   s = session_get (tc->s_index, tc->thread_index);
836   if (s->session_state >= SESSION_STATE_TRANSPORT_CLOSING)
837     return;
838   s->session_state = SESSION_STATE_TRANSPORT_CLOSING;
839   app_wrk = app_worker_get_if_valid (s->app_wrk_index);
840   if (!app_wrk)
841     return;
842   app = application_get (app_wrk->app_index);
843   app->cb_fns.session_disconnect_callback (s);
844 }
845
846 /**
847  * Notification from transport that connection is being deleted
848  *
849  * This removes the session if it is still valid. It should be called only on
850  * previously fully established sessions. For instance failed connects should
851  * call stream_session_connect_notify and indicate that the connect has
852  * failed.
853  */
854 void
855 session_transport_delete_notify (transport_connection_t * tc)
856 {
857   session_t *s;
858
859   /* App might've been removed already */
860   if (!(s = session_get_if_valid (tc->s_index, tc->thread_index)))
861     return;
862
863   /* Make sure we don't try to send anything more */
864   svm_fifo_dequeue_drop_all (s->tx_fifo);
865
866   switch (s->session_state)
867     {
868     case SESSION_STATE_ACCEPTING:
869     case SESSION_STATE_TRANSPORT_CLOSING:
870       /* If transport finishes or times out before we get a reply
871        * from the app, mark transport as closed and wait for reply
872        * before removing the session. Cleanup session table in advance
873        * because transport will soon be closed and closed sessions
874        * are assumed to have been removed from the lookup table */
875       session_lookup_del_session (s);
876       s->session_state = SESSION_STATE_TRANSPORT_CLOSED;
877       break;
878     case SESSION_STATE_CLOSING:
879     case SESSION_STATE_CLOSED_WAITING:
880       /* Cleanup lookup table as transport needs to still be valid.
881        * Program transport close to ensure that all session events
882        * have been cleaned up. Once transport close is called, the
883        * session is just removed because both transport and app have
884        * confirmed the close*/
885       session_lookup_del_session (s);
886       s->session_state = SESSION_STATE_TRANSPORT_CLOSED;
887       session_program_transport_close (s);
888       break;
889     case SESSION_STATE_TRANSPORT_CLOSED:
890       break;
891     case SESSION_STATE_CLOSED:
892       session_delete (s);
893       break;
894     default:
895       clib_warning ("session state %u", s->session_state);
896       session_delete (s);
897       break;
898     }
899 }
900
901 /**
902  * Notification from transport that session can be closed
903  *
904  * Should be called by transport only if it was closed with non-empty
905  * tx fifo and once it decides to begin the closing procedure prior to
906  * issuing a delete notify. This gives the chance to the session layer
907  * to cleanup any outstanding events.
908  */
909 void
910 session_transport_closed_notify (transport_connection_t * tc)
911 {
912   session_t *s;
913
914   if (!(s = session_get_if_valid (tc->s_index, tc->thread_index)))
915     return;
916
917   /* If app close has not been received or has not yet resulted in
918    * a transport close, only mark the session transport as closed */
919   if (s->session_state <= SESSION_STATE_CLOSING)
920     {
921       session_lookup_del_session (s);
922       s->session_state = SESSION_STATE_TRANSPORT_CLOSED;
923     }
924   else
925     s->session_state = SESSION_STATE_CLOSED;
926 }
927
928 /**
929  * Notify application that connection has been reset.
930  */
931 void
932 session_transport_reset_notify (transport_connection_t * tc)
933 {
934   session_t *s;
935   app_worker_t *app_wrk;
936   application_t *app;
937   s = session_get (tc->s_index, tc->thread_index);
938   svm_fifo_dequeue_drop_all (s->tx_fifo);
939   if (s->session_state >= SESSION_STATE_TRANSPORT_CLOSING)
940     return;
941   s->session_state = SESSION_STATE_TRANSPORT_CLOSING;
942   app_wrk = app_worker_get (s->app_wrk_index);
943   app = application_get (app_wrk->app_index);
944   app->cb_fns.session_reset_callback (s);
945 }
946
947 /**
948  * Accept a stream session. Optionally ping the server by callback.
949  */
950 int
951 stream_session_accept (transport_connection_t * tc, u32 listener_index,
952                        u8 notify)
953 {
954   session_t *s, *listener;
955   app_worker_t *app_wrk;
956   segment_manager_t *sm;
957   int rv;
958
959   /* Find the server */
960   listener = listen_session_get (listener_index);
961   app_wrk = application_listener_select_worker (listener, 0);
962
963   sm = app_worker_get_listen_segment_manager (app_wrk, listener);
964   if ((rv = session_alloc_and_init (sm, tc, 1, &s)))
965     return rv;
966
967   s->app_wrk_index = app_wrk->wrk_index;
968   s->listener_index = listener_index;
969
970   /* Shoulder-tap the server */
971   if (notify)
972     {
973       application_t *app = application_get (app_wrk->app_index);
974       return app->cb_fns.session_accept_callback (s);
975     }
976
977   return 0;
978 }
979
980 int
981 session_open_cl (u32 app_wrk_index, session_endpoint_t * rmt, u32 opaque)
982 {
983   transport_connection_t *tc;
984   transport_endpoint_cfg_t *tep;
985   segment_manager_t *sm;
986   app_worker_t *app_wrk;
987   session_t *s;
988   application_t *app;
989   int rv;
990
991   tep = session_endpoint_to_transport_cfg (rmt);
992   rv = tp_vfts[rmt->transport_proto].open (tep);
993   if (rv < 0)
994     {
995       SESSION_DBG ("Transport failed to open connection.");
996       return VNET_API_ERROR_SESSION_CONNECT;
997     }
998
999   tc = tp_vfts[rmt->transport_proto].get_half_open ((u32) rv);
1000
1001   /* For dgram type of service, allocate session and fifos now.
1002    */
1003   app_wrk = app_worker_get (app_wrk_index);
1004   sm = app_worker_get_connect_segment_manager (app_wrk);
1005
1006   if (session_alloc_and_init (sm, tc, 1, &s))
1007     return -1;
1008   s->app_wrk_index = app_wrk->wrk_index;
1009   s->session_state = SESSION_STATE_OPENED;
1010
1011   /* Tell the app about the new event fifo for this session */
1012   app = application_get (app_wrk->app_index);
1013   app->cb_fns.session_connected_callback (app_wrk->wrk_index, opaque, s, 0);
1014
1015   return 0;
1016 }
1017
1018 int
1019 session_open_vc (u32 app_wrk_index, session_endpoint_t * rmt, u32 opaque)
1020 {
1021   transport_connection_t *tc;
1022   transport_endpoint_cfg_t *tep;
1023   u64 handle;
1024   int rv;
1025
1026   tep = session_endpoint_to_transport_cfg (rmt);
1027   rv = tp_vfts[rmt->transport_proto].open (tep);
1028   if (rv < 0)
1029     {
1030       SESSION_DBG ("Transport failed to open connection.");
1031       return VNET_API_ERROR_SESSION_CONNECT;
1032     }
1033
1034   tc = tp_vfts[rmt->transport_proto].get_half_open ((u32) rv);
1035
1036   /* If transport offers a stream service, only allocate session once the
1037    * connection has been established.
1038    * Add connection to half-open table and save app and tc index. The
1039    * latter is needed to help establish the connection while the former
1040    * is needed when the connect notify comes and we have to notify the
1041    * external app
1042    */
1043   handle = (((u64) app_wrk_index) << 32) | (u64) tc->c_index;
1044   session_lookup_add_half_open (tc, handle);
1045
1046   /* Store api_context (opaque) for when the reply comes. Not the nicest
1047    * thing but better than allocating a separate half-open pool.
1048    */
1049   tc->s_index = opaque;
1050   return 0;
1051 }
1052
1053 int
1054 session_open_app (u32 app_wrk_index, session_endpoint_t * rmt, u32 opaque)
1055 {
1056   session_endpoint_cfg_t *sep = (session_endpoint_cfg_t *) rmt;
1057   transport_endpoint_cfg_t *tep_cfg = session_endpoint_to_transport_cfg (sep);
1058
1059   sep->app_wrk_index = app_wrk_index;
1060   sep->opaque = opaque;
1061
1062   return tp_vfts[rmt->transport_proto].open (tep_cfg);
1063 }
1064
1065 typedef int (*session_open_service_fn) (u32, session_endpoint_t *, u32);
1066
1067 /* *INDENT-OFF* */
1068 static session_open_service_fn session_open_srv_fns[TRANSPORT_N_SERVICES] = {
1069   session_open_vc,
1070   session_open_cl,
1071   session_open_app,
1072 };
1073 /* *INDENT-ON* */
1074
1075 /**
1076  * Ask transport to open connection to remote transport endpoint.
1077  *
1078  * Stores handle for matching request with reply since the call can be
1079  * asynchronous. For instance, for TCP the 3-way handshake must complete
1080  * before reply comes. Session is only created once connection is established.
1081  *
1082  * @param app_index Index of the application requesting the connect
1083  * @param st Session type requested.
1084  * @param tep Remote transport endpoint
1085  * @param opaque Opaque data (typically, api_context) the application expects
1086  *               on open completion.
1087  */
1088 int
1089 session_open (u32 app_wrk_index, session_endpoint_t * rmt, u32 opaque)
1090 {
1091   transport_service_type_t tst = tp_vfts[rmt->transport_proto].service_type;
1092   return session_open_srv_fns[tst] (app_wrk_index, rmt, opaque);
1093 }
1094
1095 /**
1096  * Ask transport to listen on session endpoint.
1097  *
1098  * @param s Session for which listen will be called. Note that unlike
1099  *          established sessions, listen sessions are not associated to a
1100  *          thread.
1101  * @param sep Local endpoint to be listened on.
1102  */
1103 int
1104 session_listen (session_t * ls, session_endpoint_cfg_t * sep)
1105 {
1106   transport_connection_t *tc;
1107   transport_endpoint_t *tep;
1108   u32 tc_index, s_index;
1109
1110   /* Transport bind/listen */
1111   tep = session_endpoint_to_transport (sep);
1112   s_index = ls->session_index;
1113   tc_index = tp_vfts[sep->transport_proto].bind (s_index, tep);
1114
1115   if (tc_index == (u32) ~ 0)
1116     return -1;
1117
1118   /* Attach transport to session */
1119   ls = listen_session_get (s_index);
1120   ls->connection_index = tc_index;
1121
1122   /* Add to the main lookup table after transport was initialized */
1123   tc = tp_vfts[sep->transport_proto].get_listener (tc_index);
1124   session_lookup_add_connection (tc, s_index);
1125   return 0;
1126 }
1127
1128 /**
1129  * Ask transport to stop listening on local transport endpoint.
1130  *
1131  * @param s Session to stop listening on. It must be in state LISTENING.
1132  */
1133 int
1134 session_stop_listen (session_t * s)
1135 {
1136   transport_proto_t tp = session_get_transport_proto (s);
1137   transport_connection_t *tc;
1138   if (s->session_state != SESSION_STATE_LISTENING)
1139     {
1140       clib_warning ("not a listening session");
1141       return -1;
1142     }
1143
1144   tc = tp_vfts[tp].get_listener (s->connection_index);
1145   if (!tc)
1146     {
1147       clib_warning ("no transport");
1148       return VNET_API_ERROR_ADDRESS_NOT_IN_USE;
1149     }
1150
1151   session_lookup_del_connection (tc);
1152   tp_vfts[tp].unbind (s->connection_index);
1153   return 0;
1154 }
1155
1156 /**
1157  * Initialize session closing procedure.
1158  *
1159  * Request is always sent to session node to ensure that all outstanding
1160  * requests are served before transport is notified.
1161  */
1162 void
1163 session_close (session_t * s)
1164 {
1165   if (!s)
1166     return;
1167
1168   if (s->session_state >= SESSION_STATE_CLOSING)
1169     {
1170       /* Session will only be removed once both app and transport
1171        * acknowledge the close */
1172       if (s->session_state == SESSION_STATE_TRANSPORT_CLOSED)
1173         session_program_transport_close (s);
1174
1175       /* Session already closed. Clear the tx fifo */
1176       if (s->session_state == SESSION_STATE_CLOSED)
1177         svm_fifo_dequeue_drop_all (s->tx_fifo);
1178       return;
1179     }
1180
1181   s->session_state = SESSION_STATE_CLOSING;
1182   session_program_transport_close (s);
1183 }
1184
1185 /**
1186  * Notify transport the session can be disconnected. This should eventually
1187  * result in a delete notification that allows us to cleanup session state.
1188  * Called for both active/passive disconnects.
1189  *
1190  * Must be called from the session's thread.
1191  */
1192 void
1193 session_transport_close (session_t * s)
1194 {
1195   /* If transport is already closed, just free the session */
1196   if (s->session_state >= SESSION_STATE_TRANSPORT_CLOSED)
1197     {
1198       session_free_w_fifos (s);
1199       return;
1200     }
1201
1202   /* If tx queue wasn't drained, change state to closed waiting for transport.
1203    * This way, the transport, if it so wishes, can continue to try sending the
1204    * outstanding data (in closed state it cannot). It MUST however at one
1205    * point, either after sending everything or after a timeout, call delete
1206    * notify. This will finally lead to the complete cleanup of the session.
1207    */
1208   if (svm_fifo_max_dequeue (s->tx_fifo))
1209     s->session_state = SESSION_STATE_CLOSED_WAITING;
1210   else
1211     s->session_state = SESSION_STATE_CLOSED;
1212
1213   tp_vfts[session_get_transport_proto (s)].close (s->connection_index,
1214                                                   s->thread_index);
1215 }
1216
1217 /**
1218  * Cleanup transport and session state.
1219  *
1220  * Notify transport of the cleanup and free the session. This should
1221  * be called only if transport reported some error and is already
1222  * closed.
1223  */
1224 void
1225 session_transport_cleanup (session_t * s)
1226 {
1227   s->session_state = SESSION_STATE_CLOSED;
1228
1229   /* Delete from main lookup table before we axe the the transport */
1230   session_lookup_del_session (s);
1231   tp_vfts[session_get_transport_proto (s)].cleanup (s->connection_index,
1232                                                     s->thread_index);
1233   /* Since we called cleanup, no delete notification will come. So, make
1234    * sure the session is properly freed. */
1235   session_free_w_fifos (s);
1236 }
1237
1238 transport_service_type_t
1239 session_transport_service_type (session_t * s)
1240 {
1241   transport_proto_t tp;
1242   tp = session_get_transport_proto (s);
1243   return transport_protocol_service_type (tp);
1244 }
1245
1246 transport_tx_fn_type_t
1247 session_transport_tx_fn_type (session_t * s)
1248 {
1249   transport_proto_t tp;
1250   tp = session_get_transport_proto (s);
1251   return transport_protocol_tx_fn_type (tp);
1252 }
1253
1254 u8
1255 session_tx_is_dgram (session_t * s)
1256 {
1257   return (session_transport_tx_fn_type (s) == TRANSPORT_TX_DGRAM);
1258 }
1259
1260 /**
1261  * Allocate event queues in the shared-memory segment
1262  *
1263  * That can either be a newly created memfd segment, that will need to be
1264  * mapped by all stack users, or the binary api's svm region. The latter is
1265  * assumed to be already mapped. NOTE that this assumption DOES NOT hold if
1266  * api clients bootstrap shm api over sockets (i.e. use memfd segments) and
1267  * vpp uses api svm region for event queues.
1268  */
1269 void
1270 session_vpp_event_queues_allocate (session_manager_main_t * smm)
1271 {
1272   u32 evt_q_length = 2048, evt_size = sizeof (session_event_t);
1273   ssvm_private_t *eqs = &smm->evt_qs_segment;
1274   api_main_t *am = &api_main;
1275   u64 eqs_size = 64 << 20;
1276   pid_t vpp_pid = getpid ();
1277   void *oldheap;
1278   int i;
1279
1280   if (smm->configured_event_queue_length)
1281     evt_q_length = smm->configured_event_queue_length;
1282
1283   if (smm->evt_qs_use_memfd_seg)
1284     {
1285       if (smm->evt_qs_segment_size)
1286         eqs_size = smm->evt_qs_segment_size;
1287
1288       eqs->ssvm_size = eqs_size;
1289       eqs->i_am_master = 1;
1290       eqs->my_pid = vpp_pid;
1291       eqs->name = format (0, "%s%c", "evt-qs-segment", 0);
1292       eqs->requested_va = smm->session_baseva;
1293
1294       if (ssvm_master_init (eqs, SSVM_SEGMENT_MEMFD))
1295         {
1296           clib_warning ("failed to initialize queue segment");
1297           return;
1298         }
1299     }
1300
1301   if (smm->evt_qs_use_memfd_seg)
1302     oldheap = ssvm_push_heap (eqs->sh);
1303   else
1304     oldheap = svm_push_data_heap (am->vlib_rp);
1305
1306   for (i = 0; i < vec_len (smm->wrk); i++)
1307     {
1308       svm_msg_q_cfg_t _cfg, *cfg = &_cfg;
1309       svm_msg_q_ring_cfg_t rc[SESSION_MQ_N_RINGS] = {
1310         {evt_q_length, evt_size, 0}
1311         ,
1312         {evt_q_length << 1, 256, 0}
1313       };
1314       cfg->consumer_pid = 0;
1315       cfg->n_rings = 2;
1316       cfg->q_nitems = evt_q_length;
1317       cfg->ring_cfgs = rc;
1318       smm->wrk[i].vpp_event_queue = svm_msg_q_alloc (cfg);
1319       if (smm->evt_qs_use_memfd_seg)
1320         {
1321           if (svm_msg_q_alloc_consumer_eventfd (smm->wrk[i].vpp_event_queue))
1322             clib_warning ("eventfd returned");
1323         }
1324     }
1325
1326   if (smm->evt_qs_use_memfd_seg)
1327     ssvm_pop_heap (oldheap);
1328   else
1329     svm_pop_heap (oldheap);
1330 }
1331
1332 ssvm_private_t *
1333 session_manager_get_evt_q_segment (void)
1334 {
1335   session_manager_main_t *smm = &session_manager_main;
1336   if (smm->evt_qs_use_memfd_seg)
1337     return &smm->evt_qs_segment;
1338   return 0;
1339 }
1340
1341 /* *INDENT-OFF* */
1342 static session_fifo_rx_fn *session_tx_fns[TRANSPORT_TX_N_FNS] = {
1343     session_tx_fifo_peek_and_snd,
1344     session_tx_fifo_dequeue_and_snd,
1345     session_tx_fifo_dequeue_internal,
1346     session_tx_fifo_dequeue_and_snd
1347 };
1348 /* *INDENT-ON* */
1349
1350 /**
1351  * Initialize session layer for given transport proto and ip version
1352  *
1353  * Allocates per session type (transport proto + ip version) data structures
1354  * and adds arc from session queue node to session type output node.
1355  */
1356 void
1357 session_register_transport (transport_proto_t transport_proto,
1358                             const transport_proto_vft_t * vft, u8 is_ip4,
1359                             u32 output_node)
1360 {
1361   session_manager_main_t *smm = &session_manager_main;
1362   session_type_t session_type;
1363   u32 next_index = ~0;
1364
1365   session_type = session_type_from_proto_and_ip (transport_proto, is_ip4);
1366
1367   vec_validate (smm->session_type_to_next, session_type);
1368   vec_validate (smm->session_tx_fns, session_type);
1369
1370   /* *INDENT-OFF* */
1371   if (output_node != ~0)
1372     {
1373       foreach_vlib_main (({
1374           next_index = vlib_node_add_next (this_vlib_main,
1375                                            session_queue_node.index,
1376                                            output_node);
1377       }));
1378     }
1379   /* *INDENT-ON* */
1380
1381   smm->session_type_to_next[session_type] = next_index;
1382   smm->session_tx_fns[session_type] = session_tx_fns[vft->tx_type];
1383 }
1384
1385 transport_connection_t *
1386 session_get_transport (session_t * s)
1387 {
1388   transport_proto_t tp;
1389   if (s->session_state != SESSION_STATE_LISTENING)
1390     {
1391       tp = session_get_transport_proto (s);
1392       return tp_vfts[tp].get_connection (s->connection_index,
1393                                          s->thread_index);
1394     }
1395   return 0;
1396 }
1397
1398 transport_connection_t *
1399 listen_session_get_transport (session_t * s)
1400 {
1401   transport_proto_t tp = session_get_transport_proto (s);
1402   return tp_vfts[tp].get_listener (s->connection_index);
1403 }
1404
1405 int
1406 listen_session_get_local_session_endpoint (session_t * listener,
1407                                            session_endpoint_t * sep)
1408 {
1409   transport_proto_t tp = session_get_transport_proto (listener);
1410   transport_connection_t *tc;
1411   tc = tp_vfts[tp].get_listener (listener->connection_index);
1412   if (!tc)
1413     {
1414       clib_warning ("no transport");
1415       return -1;
1416     }
1417
1418   /* N.B. The ip should not be copied because this is the local endpoint */
1419   sep->port = tc->lcl_port;
1420   sep->transport_proto = tc->proto;
1421   sep->is_ip4 = tc->is_ip4;
1422   return 0;
1423 }
1424
1425 void
1426 session_flush_frames_main_thread (vlib_main_t * vm)
1427 {
1428   ASSERT (vlib_get_thread_index () == 0);
1429   vlib_process_signal_event_mt (vm, session_queue_process_node.index,
1430                                 SESSION_Q_PROCESS_FLUSH_FRAMES, 0);
1431 }
1432
1433 static clib_error_t *
1434 session_manager_main_enable (vlib_main_t * vm)
1435 {
1436   segment_manager_main_init_args_t _sm_args = { 0 }, *sm_args = &_sm_args;
1437   session_manager_main_t *smm = &session_manager_main;
1438   vlib_thread_main_t *vtm = vlib_get_thread_main ();
1439   u32 num_threads, preallocated_sessions_per_worker;
1440   session_manager_worker_t *wrk;
1441   int i, j;
1442
1443   num_threads = 1 /* main thread */  + vtm->n_threads;
1444
1445   if (num_threads < 1)
1446     return clib_error_return (0, "n_thread_stacks not set");
1447
1448   /* Allocate cache line aligned worker contexts */
1449   vec_validate_aligned (smm->wrk, num_threads - 1, CLIB_CACHE_LINE_BYTES);
1450
1451   for (i = 0; i < TRANSPORT_N_PROTO; i++)
1452     {
1453       for (j = 0; j < num_threads; j++)
1454         smm->wrk[j].current_enqueue_epoch[i] = 1;
1455     }
1456
1457   for (i = 0; i < num_threads; i++)
1458     {
1459       wrk = &smm->wrk[i];
1460       vec_validate (wrk->free_event_vector, 128);
1461       _vec_len (wrk->free_event_vector) = 0;
1462       vec_validate (wrk->pending_event_vector, 128);
1463       _vec_len (wrk->pending_event_vector) = 0;
1464       vec_validate (wrk->pending_disconnects, 128);
1465       _vec_len (wrk->pending_disconnects) = 0;
1466       vec_validate (wrk->postponed_event_vector, 128);
1467       _vec_len (wrk->postponed_event_vector) = 0;
1468
1469       wrk->last_vlib_time = vlib_time_now (vlib_mains[i]);
1470       wrk->dispatch_period = 500e-6;
1471
1472       if (num_threads > 1)
1473         clib_rwlock_init (&smm->wrk[i].peekers_rw_locks);
1474     }
1475
1476 #if SESSION_DEBUG
1477   vec_validate (smm->last_event_poll_by_thread, num_threads - 1);
1478 #endif
1479
1480   /* Allocate vpp event queues segment and queue */
1481   session_vpp_event_queues_allocate (smm);
1482
1483   /* Initialize fifo segment main baseva and timeout */
1484   sm_args->baseva = smm->session_baseva + smm->evt_qs_segment_size;
1485   sm_args->size = smm->session_va_space_size;
1486   segment_manager_main_init (sm_args);
1487
1488   /* Preallocate sessions */
1489   if (smm->preallocated_sessions)
1490     {
1491       if (num_threads == 1)
1492         {
1493           pool_init_fixed (smm->wrk[0].sessions, smm->preallocated_sessions);
1494         }
1495       else
1496         {
1497           int j;
1498           preallocated_sessions_per_worker =
1499             (1.1 * (f64) smm->preallocated_sessions /
1500              (f64) (num_threads - 1));
1501
1502           for (j = 1; j < num_threads; j++)
1503             {
1504               pool_init_fixed (smm->wrk[j].sessions,
1505                                preallocated_sessions_per_worker);
1506             }
1507         }
1508     }
1509
1510   session_lookup_init ();
1511   app_namespaces_init ();
1512   transport_init ();
1513
1514   smm->is_enabled = 1;
1515
1516   /* Enable transports */
1517   transport_enable_disable (vm, 1);
1518   transport_init_tx_pacers_period ();
1519   return 0;
1520 }
1521
1522 void
1523 session_node_enable_disable (u8 is_en)
1524 {
1525   u8 state = is_en ? VLIB_NODE_STATE_POLLING : VLIB_NODE_STATE_DISABLED;
1526   vlib_thread_main_t *vtm = vlib_get_thread_main ();
1527   u8 have_workers = vtm->n_threads != 0;
1528
1529   /* *INDENT-OFF* */
1530   foreach_vlib_main (({
1531     if (have_workers && ii == 0)
1532       {
1533         vlib_node_set_state (this_vlib_main, session_queue_process_node.index,
1534                              state);
1535         if (is_en)
1536           {
1537             vlib_node_t *n = vlib_get_node (this_vlib_main,
1538                                             session_queue_process_node.index);
1539             vlib_start_process (this_vlib_main, n->runtime_index);
1540           }
1541         else
1542           {
1543             vlib_process_signal_event_mt (this_vlib_main,
1544                                           session_queue_process_node.index,
1545                                           SESSION_Q_PROCESS_STOP, 0);
1546           }
1547
1548         continue;
1549       }
1550     vlib_node_set_state (this_vlib_main, session_queue_node.index,
1551                          state);
1552   }));
1553   /* *INDENT-ON* */
1554 }
1555
1556 clib_error_t *
1557 vnet_session_enable_disable (vlib_main_t * vm, u8 is_en)
1558 {
1559   clib_error_t *error = 0;
1560   if (is_en)
1561     {
1562       if (session_manager_main.is_enabled)
1563         return 0;
1564
1565       session_node_enable_disable (is_en);
1566       error = session_manager_main_enable (vm);
1567     }
1568   else
1569     {
1570       session_manager_main.is_enabled = 0;
1571       session_node_enable_disable (is_en);
1572     }
1573
1574   return error;
1575 }
1576
1577 clib_error_t *
1578 session_manager_main_init (vlib_main_t * vm)
1579 {
1580   session_manager_main_t *smm = &session_manager_main;
1581   smm->session_baseva = HIGH_SEGMENT_BASEVA;
1582 #if (HIGH_SEGMENT_BASEVA > (4ULL << 30))
1583   smm->session_va_space_size = 128ULL << 30;
1584   smm->evt_qs_segment_size = 64 << 20;
1585 #else
1586   smm->session_va_space_size = 128 << 20;
1587   smm->evt_qs_segment_size = 1 << 20;
1588 #endif
1589   smm->is_enabled = 0;
1590   return 0;
1591 }
1592
1593 VLIB_INIT_FUNCTION (session_manager_main_init);
1594
1595 static clib_error_t *
1596 session_config_fn (vlib_main_t * vm, unformat_input_t * input)
1597 {
1598   session_manager_main_t *smm = &session_manager_main;
1599   u32 nitems;
1600   uword tmp;
1601
1602   while (unformat_check_input (input) != UNFORMAT_END_OF_INPUT)
1603     {
1604       if (unformat (input, "event-queue-length %d", &nitems))
1605         {
1606           if (nitems >= 2048)
1607             smm->configured_event_queue_length = nitems;
1608           else
1609             clib_warning ("event queue length %d too small, ignored", nitems);
1610         }
1611       else if (unformat (input, "preallocated-sessions %d",
1612                          &smm->preallocated_sessions))
1613         ;
1614       else if (unformat (input, "v4-session-table-buckets %d",
1615                          &smm->configured_v4_session_table_buckets))
1616         ;
1617       else if (unformat (input, "v4-halfopen-table-buckets %d",
1618                          &smm->configured_v4_halfopen_table_buckets))
1619         ;
1620       else if (unformat (input, "v6-session-table-buckets %d",
1621                          &smm->configured_v6_session_table_buckets))
1622         ;
1623       else if (unformat (input, "v6-halfopen-table-buckets %d",
1624                          &smm->configured_v6_halfopen_table_buckets))
1625         ;
1626       else if (unformat (input, "v4-session-table-memory %U",
1627                          unformat_memory_size, &tmp))
1628         {
1629           if (tmp >= 0x100000000)
1630             return clib_error_return (0, "memory size %llx (%lld) too large",
1631                                       tmp, tmp);
1632           smm->configured_v4_session_table_memory = tmp;
1633         }
1634       else if (unformat (input, "v4-halfopen-table-memory %U",
1635                          unformat_memory_size, &tmp))
1636         {
1637           if (tmp >= 0x100000000)
1638             return clib_error_return (0, "memory size %llx (%lld) too large",
1639                                       tmp, tmp);
1640           smm->configured_v4_halfopen_table_memory = tmp;
1641         }
1642       else if (unformat (input, "v6-session-table-memory %U",
1643                          unformat_memory_size, &tmp))
1644         {
1645           if (tmp >= 0x100000000)
1646             return clib_error_return (0, "memory size %llx (%lld) too large",
1647                                       tmp, tmp);
1648           smm->configured_v6_session_table_memory = tmp;
1649         }
1650       else if (unformat (input, "v6-halfopen-table-memory %U",
1651                          unformat_memory_size, &tmp))
1652         {
1653           if (tmp >= 0x100000000)
1654             return clib_error_return (0, "memory size %llx (%lld) too large",
1655                                       tmp, tmp);
1656           smm->configured_v6_halfopen_table_memory = tmp;
1657         }
1658       else if (unformat (input, "local-endpoints-table-memory %U",
1659                          unformat_memory_size, &tmp))
1660         {
1661           if (tmp >= 0x100000000)
1662             return clib_error_return (0, "memory size %llx (%lld) too large",
1663                                       tmp, tmp);
1664           smm->local_endpoints_table_memory = tmp;
1665         }
1666       else if (unformat (input, "local-endpoints-table-buckets %d",
1667                          &smm->local_endpoints_table_buckets))
1668         ;
1669       else if (unformat (input, "evt_qs_memfd_seg"))
1670         smm->evt_qs_use_memfd_seg = 1;
1671       else
1672         return clib_error_return (0, "unknown input `%U'",
1673                                   format_unformat_error, input);
1674     }
1675   return 0;
1676 }
1677
1678 VLIB_CONFIG_FUNCTION (session_config_fn, "session");
1679
1680 /*
1681  * fd.io coding-style-patch-verification: ON
1682  *
1683  * Local Variables:
1684  * eval: (c-set-style "gnu")
1685  * End:
1686  */