session: call session_dequeue_notify after svm_fifo_dequeue_drop
[vpp.git] / src / vnet / session / session.c
1 /*
2  * Copyright (c) 2017-2019 Cisco and/or its affiliates.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at:
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 /**
16  * @file
17  * @brief Session and session manager
18  */
19
20 #include <vnet/session/session.h>
21 #include <vnet/session/session_debug.h>
22 #include <vnet/session/application.h>
23 #include <vnet/dpo/load_balance.h>
24 #include <vnet/fib/ip4_fib.h>
25
26 session_main_t session_main;
27
28 static inline int
29 session_send_evt_to_thread (void *data, void *args, u32 thread_index,
30                             session_evt_type_t evt_type)
31 {
32   session_event_t *evt;
33   svm_msg_q_msg_t msg;
34   svm_msg_q_t *mq;
35   u32 tries = 0, max_tries;
36
37   mq = session_main_get_vpp_event_queue (thread_index);
38   while (svm_msg_q_try_lock (mq))
39     {
40       max_tries = vlib_get_current_process (vlib_get_main ())? 1e6 : 3;
41       if (tries++ == max_tries)
42         {
43           SESSION_DBG ("failed to enqueue evt");
44           return -1;
45         }
46     }
47   if (PREDICT_FALSE (svm_msg_q_ring_is_full (mq, SESSION_MQ_IO_EVT_RING)))
48     {
49       svm_msg_q_unlock (mq);
50       return -2;
51     }
52   msg = svm_msg_q_alloc_msg_w_ring (mq, SESSION_MQ_IO_EVT_RING);
53   if (PREDICT_FALSE (svm_msg_q_msg_is_invalid (&msg)))
54     {
55       svm_msg_q_unlock (mq);
56       return -2;
57     }
58   evt = (session_event_t *) svm_msg_q_msg_data (mq, &msg);
59   evt->event_type = evt_type;
60   switch (evt_type)
61     {
62     case SESSION_CTRL_EVT_RPC:
63       evt->rpc_args.fp = data;
64       evt->rpc_args.arg = args;
65       break;
66     case SESSION_IO_EVT_TX:
67     case SESSION_IO_EVT_TX_FLUSH:
68     case SESSION_IO_EVT_BUILTIN_RX:
69       evt->session_index = *(u32 *) data;
70       break;
71     case SESSION_IO_EVT_BUILTIN_TX:
72     case SESSION_CTRL_EVT_CLOSE:
73       evt->session_handle = session_handle ((session_t *) data);
74       break;
75     default:
76       clib_warning ("evt unhandled!");
77       svm_msg_q_unlock (mq);
78       return -1;
79     }
80
81   svm_msg_q_add_and_unlock (mq, &msg);
82   return 0;
83 }
84
85 int
86 session_send_io_evt_to_thread (svm_fifo_t * f, session_evt_type_t evt_type)
87 {
88   return session_send_evt_to_thread (&f->master_session_index, 0,
89                                      f->master_thread_index, evt_type);
90 }
91
92 int
93 session_send_io_evt_to_thread_custom (void *data, u32 thread_index,
94                                       session_evt_type_t evt_type)
95 {
96   return session_send_evt_to_thread (data, 0, thread_index, evt_type);
97 }
98
99 int
100 session_send_ctrl_evt_to_thread (session_t * s, session_evt_type_t evt_type)
101 {
102   /* only event supported for now is disconnect */
103   ASSERT (evt_type == SESSION_CTRL_EVT_CLOSE);
104   return session_send_evt_to_thread (s, 0, s->thread_index,
105                                      SESSION_CTRL_EVT_CLOSE);
106 }
107
108 void
109 session_send_rpc_evt_to_thread_force (u32 thread_index, void *fp,
110                                       void *rpc_args)
111 {
112   session_send_evt_to_thread (fp, rpc_args, thread_index,
113                               SESSION_CTRL_EVT_RPC);
114 }
115
116 void
117 session_send_rpc_evt_to_thread (u32 thread_index, void *fp, void *rpc_args)
118 {
119   if (thread_index != vlib_get_thread_index ())
120     session_send_rpc_evt_to_thread_force (thread_index, fp, rpc_args);
121   else
122     {
123       void (*fnp) (void *) = fp;
124       fnp (rpc_args);
125     }
126 }
127
128 static void
129 session_program_transport_close (session_t * s)
130 {
131   u32 thread_index = vlib_get_thread_index ();
132   session_worker_t *wrk;
133   session_event_t *evt;
134
135   /* If we are in the handler thread, or being called with the worker barrier
136    * held, just append a new event to pending disconnects vector. */
137   if (vlib_thread_is_main_w_barrier () || thread_index == s->thread_index)
138     {
139       wrk = session_main_get_worker (s->thread_index);
140       vec_add2 (wrk->pending_disconnects, evt, 1);
141       clib_memset (evt, 0, sizeof (*evt));
142       evt->session_handle = session_handle (s);
143       evt->event_type = SESSION_CTRL_EVT_CLOSE;
144     }
145   else
146     session_send_ctrl_evt_to_thread (s, SESSION_CTRL_EVT_CLOSE);
147 }
148
149 session_t *
150 session_alloc (u32 thread_index)
151 {
152   session_worker_t *wrk = &session_main.wrk[thread_index];
153   session_t *s;
154   u8 will_expand = 0;
155   pool_get_aligned_will_expand (wrk->sessions, will_expand,
156                                 CLIB_CACHE_LINE_BYTES);
157   /* If we have peekers, let them finish */
158   if (PREDICT_FALSE (will_expand && vlib_num_workers ()))
159     {
160       clib_rwlock_writer_lock (&wrk->peekers_rw_locks);
161       pool_get_aligned (wrk->sessions, s, CLIB_CACHE_LINE_BYTES);
162       clib_rwlock_writer_unlock (&wrk->peekers_rw_locks);
163     }
164   else
165     {
166       pool_get_aligned (wrk->sessions, s, CLIB_CACHE_LINE_BYTES);
167     }
168   clib_memset (s, 0, sizeof (*s));
169   s->session_index = s - wrk->sessions;
170   s->thread_index = thread_index;
171   s->app_index = APP_INVALID_INDEX;
172   return s;
173 }
174
175 void
176 session_free (session_t * s)
177 {
178   if (CLIB_DEBUG)
179     {
180       u8 thread_index = s->thread_index;
181       clib_memset (s, 0xFA, sizeof (*s));
182       pool_put (session_main.wrk[thread_index].sessions, s);
183       return;
184     }
185   SESSION_EVT_DBG (SESSION_EVT_FREE, s);
186   pool_put (session_main.wrk[s->thread_index].sessions, s);
187 }
188
189 void
190 session_free_w_fifos (session_t * s)
191 {
192   segment_manager_dealloc_fifos (s->rx_fifo, s->tx_fifo);
193   session_free (s);
194 }
195
196 /**
197  * Cleans up session and lookup table.
198  *
199  * Transport connection must still be valid.
200  */
201 static void
202 session_delete (session_t * s)
203 {
204   int rv;
205
206   /* Delete from the main lookup table. */
207   if ((rv = session_lookup_del_session (s)))
208     clib_warning ("hash delete error, rv %d", rv);
209
210   session_free_w_fifos (s);
211 }
212
213 static session_t *
214 session_alloc_for_connection (transport_connection_t * tc)
215 {
216   session_t *s;
217   u32 thread_index = tc->thread_index;
218
219   ASSERT (thread_index == vlib_get_thread_index ()
220           || transport_protocol_is_cl (tc->proto));
221
222   s = session_alloc (thread_index);
223   s->session_type = session_type_from_proto_and_ip (tc->proto, tc->is_ip4);
224   s->session_state = SESSION_STATE_CLOSED;
225
226   /* Attach transport to session and vice versa */
227   s->connection_index = tc->c_index;
228   tc->s_index = s->session_index;
229   return s;
230 }
231
232 /**
233  * Discards bytes from buffer chain
234  *
235  * It discards n_bytes_to_drop starting at first buffer after chain_b
236  */
237 always_inline void
238 session_enqueue_discard_chain_bytes (vlib_main_t * vm, vlib_buffer_t * b,
239                                      vlib_buffer_t ** chain_b,
240                                      u32 n_bytes_to_drop)
241 {
242   vlib_buffer_t *next = *chain_b;
243   u32 to_drop = n_bytes_to_drop;
244   ASSERT (b->flags & VLIB_BUFFER_NEXT_PRESENT);
245   while (to_drop && (next->flags & VLIB_BUFFER_NEXT_PRESENT))
246     {
247       next = vlib_get_buffer (vm, next->next_buffer);
248       if (next->current_length > to_drop)
249         {
250           vlib_buffer_advance (next, to_drop);
251           to_drop = 0;
252         }
253       else
254         {
255           to_drop -= next->current_length;
256           next->current_length = 0;
257         }
258     }
259   *chain_b = next;
260
261   if (to_drop == 0)
262     b->total_length_not_including_first_buffer -= n_bytes_to_drop;
263 }
264
265 /**
266  * Enqueue buffer chain tail
267  */
268 always_inline int
269 session_enqueue_chain_tail (session_t * s, vlib_buffer_t * b,
270                             u32 offset, u8 is_in_order)
271 {
272   vlib_buffer_t *chain_b;
273   u32 chain_bi, len, diff;
274   vlib_main_t *vm = vlib_get_main ();
275   u8 *data;
276   u32 written = 0;
277   int rv = 0;
278
279   if (is_in_order && offset)
280     {
281       diff = offset - b->current_length;
282       if (diff > b->total_length_not_including_first_buffer)
283         return 0;
284       chain_b = b;
285       session_enqueue_discard_chain_bytes (vm, b, &chain_b, diff);
286       chain_bi = vlib_get_buffer_index (vm, chain_b);
287     }
288   else
289     chain_bi = b->next_buffer;
290
291   do
292     {
293       chain_b = vlib_get_buffer (vm, chain_bi);
294       data = vlib_buffer_get_current (chain_b);
295       len = chain_b->current_length;
296       if (!len)
297         continue;
298       if (is_in_order)
299         {
300           rv = svm_fifo_enqueue (s->rx_fifo, len, data);
301           if (rv == len)
302             {
303               written += rv;
304             }
305           else if (rv < len)
306             {
307               return (rv > 0) ? (written + rv) : written;
308             }
309           else if (rv > len)
310             {
311               written += rv;
312
313               /* written more than what was left in chain */
314               if (written > b->total_length_not_including_first_buffer)
315                 return written;
316
317               /* drop the bytes that have already been delivered */
318               session_enqueue_discard_chain_bytes (vm, b, &chain_b, rv - len);
319             }
320         }
321       else
322         {
323           rv = svm_fifo_enqueue_with_offset (s->rx_fifo, offset, len, data);
324           if (rv)
325             {
326               clib_warning ("failed to enqueue multi-buffer seg");
327               return -1;
328             }
329           offset += len;
330         }
331     }
332   while ((chain_bi = (chain_b->flags & VLIB_BUFFER_NEXT_PRESENT)
333           ? chain_b->next_buffer : 0));
334
335   if (is_in_order)
336     return written;
337
338   return 0;
339 }
340
341 /*
342  * Enqueue data for delivery to session peer. Does not notify peer of enqueue
343  * event but on request can queue notification events for later delivery by
344  * calling stream_server_flush_enqueue_events().
345  *
346  * @param tc Transport connection which is to be enqueued data
347  * @param b Buffer to be enqueued
348  * @param offset Offset at which to start enqueueing if out-of-order
349  * @param queue_event Flag to indicate if peer is to be notified or if event
350  *                    is to be queued. The former is useful when more data is
351  *                    enqueued and only one event is to be generated.
352  * @param is_in_order Flag to indicate if data is in order
353  * @return Number of bytes enqueued or a negative value if enqueueing failed.
354  */
355 int
356 session_enqueue_stream_connection (transport_connection_t * tc,
357                                    vlib_buffer_t * b, u32 offset,
358                                    u8 queue_event, u8 is_in_order)
359 {
360   session_t *s;
361   int enqueued = 0, rv, in_order_off;
362
363   s = session_get (tc->s_index, tc->thread_index);
364
365   if (is_in_order)
366     {
367       enqueued = svm_fifo_enqueue (s->rx_fifo,
368                                    b->current_length,
369                                    vlib_buffer_get_current (b));
370       if (PREDICT_FALSE ((b->flags & VLIB_BUFFER_NEXT_PRESENT)
371                          && enqueued >= 0))
372         {
373           in_order_off = enqueued > b->current_length ? enqueued : 0;
374           rv = session_enqueue_chain_tail (s, b, in_order_off, 1);
375           if (rv > 0)
376             enqueued += rv;
377         }
378     }
379   else
380     {
381       rv = svm_fifo_enqueue_with_offset (s->rx_fifo, offset,
382                                          b->current_length,
383                                          vlib_buffer_get_current (b));
384       if (PREDICT_FALSE ((b->flags & VLIB_BUFFER_NEXT_PRESENT) && !rv))
385         session_enqueue_chain_tail (s, b, offset + b->current_length, 0);
386       /* if something was enqueued, report even this as success for ooo
387        * segment handling */
388       return rv;
389     }
390
391   if (queue_event)
392     {
393       /* Queue RX event on this fifo. Eventually these will need to be flushed
394        * by calling stream_server_flush_enqueue_events () */
395       session_worker_t *wrk;
396
397       wrk = session_main_get_worker (s->thread_index);
398       if (!(s->flags & SESSION_F_RX_EVT))
399         {
400           s->flags |= SESSION_F_RX_EVT;
401           vec_add1 (wrk->session_to_enqueue[tc->proto], s->session_index);
402         }
403     }
404
405   return enqueued;
406 }
407
408 int
409 session_enqueue_dgram_connection (session_t * s,
410                                   session_dgram_hdr_t * hdr,
411                                   vlib_buffer_t * b, u8 proto, u8 queue_event)
412 {
413   int enqueued = 0, rv, in_order_off;
414
415   ASSERT (svm_fifo_max_enqueue_prod (s->rx_fifo)
416           >= b->current_length + sizeof (*hdr));
417
418   svm_fifo_enqueue (s->rx_fifo, sizeof (session_dgram_hdr_t), (u8 *) hdr);
419   enqueued = svm_fifo_enqueue (s->rx_fifo, b->current_length,
420                                vlib_buffer_get_current (b));
421   if (PREDICT_FALSE ((b->flags & VLIB_BUFFER_NEXT_PRESENT) && enqueued >= 0))
422     {
423       in_order_off = enqueued > b->current_length ? enqueued : 0;
424       rv = session_enqueue_chain_tail (s, b, in_order_off, 1);
425       if (rv > 0)
426         enqueued += rv;
427     }
428   if (queue_event)
429     {
430       /* Queue RX event on this fifo. Eventually these will need to be flushed
431        * by calling stream_server_flush_enqueue_events () */
432       session_worker_t *wrk;
433
434       wrk = session_main_get_worker (s->thread_index);
435       if (!(s->flags & SESSION_F_RX_EVT))
436         {
437           s->flags |= SESSION_F_RX_EVT;
438           vec_add1 (wrk->session_to_enqueue[proto], s->session_index);
439         }
440     }
441   return enqueued;
442 }
443
444 int
445 session_tx_fifo_peek_bytes (transport_connection_t * tc, u8 * buffer,
446                             u32 offset, u32 max_bytes)
447 {
448   session_t *s = session_get (tc->s_index, tc->thread_index);
449   return svm_fifo_peek (s->tx_fifo, offset, max_bytes, buffer);
450 }
451
452 u32
453 session_tx_fifo_dequeue_drop (transport_connection_t * tc, u32 max_bytes)
454 {
455   session_t *s = session_get (tc->s_index, tc->thread_index);
456   u32 rv;
457
458   rv = svm_fifo_dequeue_drop (s->tx_fifo, max_bytes);
459
460   if (svm_fifo_needs_tx_ntf (s->tx_fifo, max_bytes))
461     session_dequeue_notify (s);
462
463   return rv;
464 }
465
466 static inline int
467 session_notify_subscribers (u32 app_index, session_t * s,
468                             svm_fifo_t * f, session_evt_type_t evt_type)
469 {
470   app_worker_t *app_wrk;
471   application_t *app;
472   int i;
473
474   app = application_get (app_index);
475   if (!app)
476     return -1;
477
478   for (i = 0; i < f->n_subscribers; i++)
479     {
480       app_wrk = application_get_worker (app, f->subscribers[i]);
481       if (!app_wrk)
482         continue;
483       if (app_worker_lock_and_send_event (app_wrk, s, evt_type))
484         return -1;
485     }
486
487   return 0;
488 }
489
490 /**
491  * Notify session peer that new data has been enqueued.
492  *
493  * @param s     Stream session for which the event is to be generated.
494  * @param lock  Flag to indicate if call should lock message queue.
495  *
496  * @return 0 on success or negative number if failed to send notification.
497  */
498 static inline int
499 session_enqueue_notify_inline (session_t * s)
500 {
501   app_worker_t *app_wrk;
502   u32 session_index;
503   u8 n_subscribers;
504
505   session_index = s->session_index;
506   n_subscribers = svm_fifo_n_subscribers (s->rx_fifo);
507
508   app_wrk = app_worker_get_if_valid (s->app_wrk_index);
509   if (PREDICT_FALSE (!app_wrk))
510     {
511       SESSION_DBG ("invalid s->app_index = %d", s->app_wrk_index);
512       return 0;
513     }
514
515   /* *INDENT-OFF* */
516   SESSION_EVT_DBG(SESSION_EVT_ENQ, s, ({
517       ed->data[0] = SESSION_IO_EVT_RX;
518       ed->data[1] = svm_fifo_max_dequeue_prod (s->rx_fifo);
519   }));
520   /* *INDENT-ON* */
521
522   s->flags &= ~SESSION_F_RX_EVT;
523   if (PREDICT_FALSE (app_worker_lock_and_send_event (app_wrk, s,
524                                                      SESSION_IO_EVT_RX)))
525     return -1;
526
527   if (PREDICT_FALSE (n_subscribers))
528     {
529       s = session_get (session_index, vlib_get_thread_index ());
530       return session_notify_subscribers (app_wrk->app_index, s,
531                                          s->rx_fifo, SESSION_IO_EVT_RX);
532     }
533
534   return 0;
535 }
536
537 int
538 session_enqueue_notify (session_t * s)
539 {
540   return session_enqueue_notify_inline (s);
541 }
542
543 int
544 session_dequeue_notify (session_t * s)
545 {
546   app_worker_t *app_wrk;
547
548   app_wrk = app_worker_get_if_valid (s->app_wrk_index);
549   if (PREDICT_FALSE (!app_wrk))
550     return -1;
551
552   if (PREDICT_FALSE (app_worker_lock_and_send_event (app_wrk, s,
553                                                      SESSION_IO_EVT_TX)))
554     return -1;
555
556   if (PREDICT_FALSE (s->tx_fifo->n_subscribers))
557     return session_notify_subscribers (app_wrk->app_index, s,
558                                        s->tx_fifo, SESSION_IO_EVT_TX);
559
560   svm_fifo_clear_tx_ntf (s->tx_fifo);
561
562   return 0;
563 }
564
565 /**
566  * Flushes queue of sessions that are to be notified of new data
567  * enqueued events.
568  *
569  * @param thread_index Thread index for which the flush is to be performed.
570  * @return 0 on success or a positive number indicating the number of
571  *         failures due to API queue being full.
572  */
573 int
574 session_main_flush_enqueue_events (u8 transport_proto, u32 thread_index)
575 {
576   session_worker_t *wrk = session_main_get_worker (thread_index);
577   session_t *s;
578   int i, errors = 0;
579   u32 *indices;
580
581   indices = wrk->session_to_enqueue[transport_proto];
582
583   for (i = 0; i < vec_len (indices); i++)
584     {
585       s = session_get_if_valid (indices[i], thread_index);
586       if (PREDICT_FALSE (!s))
587         {
588           errors++;
589           continue;
590         }
591
592       if (PREDICT_FALSE (session_enqueue_notify_inline (s)))
593         errors++;
594     }
595
596   vec_reset_length (indices);
597   wrk->session_to_enqueue[transport_proto] = indices;
598
599   return errors;
600 }
601
602 int
603 session_main_flush_all_enqueue_events (u8 transport_proto)
604 {
605   vlib_thread_main_t *vtm = vlib_get_thread_main ();
606   int i, errors = 0;
607   for (i = 0; i < 1 + vtm->n_threads; i++)
608     errors += session_main_flush_enqueue_events (transport_proto, i);
609   return errors;
610 }
611
612 int
613 session_stream_connect_notify (transport_connection_t * tc, u8 is_fail)
614 {
615   u32 opaque = 0, new_ti, new_si;
616   app_worker_t *app_wrk;
617   session_t *s = 0;
618   u64 ho_handle;
619
620   /*
621    * Find connection handle and cleanup half-open table
622    */
623   ho_handle = session_lookup_half_open_handle (tc);
624   if (ho_handle == HALF_OPEN_LOOKUP_INVALID_VALUE)
625     {
626       SESSION_DBG ("half-open was removed!");
627       return -1;
628     }
629   session_lookup_del_half_open (tc);
630
631   /* Get the app's index from the handle we stored when opening connection
632    * and the opaque (api_context for external apps) from transport session
633    * index */
634   app_wrk = app_worker_get_if_valid (ho_handle >> 32);
635   if (!app_wrk)
636     return -1;
637
638   opaque = tc->s_index;
639
640   if (is_fail)
641     return app_worker_connect_notify (app_wrk, s, opaque);
642
643   s = session_alloc_for_connection (tc);
644   s->session_state = SESSION_STATE_CONNECTING;
645   s->app_wrk_index = app_wrk->wrk_index;
646   new_si = s->session_index;
647   new_ti = s->thread_index;
648
649   if (app_worker_init_connected (app_wrk, s))
650     {
651       session_free (s);
652       app_worker_connect_notify (app_wrk, 0, opaque);
653       return -1;
654     }
655
656   if (app_worker_connect_notify (app_wrk, s, opaque))
657     {
658       s = session_get (new_si, new_ti);
659       session_free_w_fifos (s);
660       return -1;
661     }
662
663   s = session_get (new_si, new_ti);
664   s->session_state = SESSION_STATE_READY;
665   session_lookup_add_connection (tc, session_handle (s));
666
667   return 0;
668 }
669
670 typedef struct _session_switch_pool_args
671 {
672   u32 session_index;
673   u32 thread_index;
674   u32 new_thread_index;
675   u32 new_session_index;
676 } session_switch_pool_args_t;
677
678 static void
679 session_switch_pool (void *cb_args)
680 {
681   session_switch_pool_args_t *args = (session_switch_pool_args_t *) cb_args;
682   session_t *s;
683   ASSERT (args->thread_index == vlib_get_thread_index ());
684   s = session_get (args->session_index, args->thread_index);
685   s->tx_fifo->master_session_index = args->new_session_index;
686   s->tx_fifo->master_thread_index = args->new_thread_index;
687   transport_cleanup (session_get_transport_proto (s), s->connection_index,
688                      s->thread_index);
689   session_free (s);
690   clib_mem_free (cb_args);
691 }
692
693 /**
694  * Move dgram session to the right thread
695  */
696 int
697 session_dgram_connect_notify (transport_connection_t * tc,
698                               u32 old_thread_index, session_t ** new_session)
699 {
700   session_t *new_s;
701   session_switch_pool_args_t *rpc_args;
702
703   /*
704    * Clone half-open session to the right thread.
705    */
706   new_s = session_clone_safe (tc->s_index, old_thread_index);
707   new_s->connection_index = tc->c_index;
708   new_s->rx_fifo->master_session_index = new_s->session_index;
709   new_s->rx_fifo->master_thread_index = new_s->thread_index;
710   new_s->session_state = SESSION_STATE_READY;
711   session_lookup_add_connection (tc, session_handle (new_s));
712
713   /*
714    * Ask thread owning the old session to clean it up and make us the tx
715    * fifo owner
716    */
717   rpc_args = clib_mem_alloc (sizeof (*rpc_args));
718   rpc_args->new_session_index = new_s->session_index;
719   rpc_args->new_thread_index = new_s->thread_index;
720   rpc_args->session_index = tc->s_index;
721   rpc_args->thread_index = old_thread_index;
722   session_send_rpc_evt_to_thread (rpc_args->thread_index, session_switch_pool,
723                                   rpc_args);
724
725   tc->s_index = new_s->session_index;
726   new_s->connection_index = tc->c_index;
727   *new_session = new_s;
728   return 0;
729 }
730
731 /**
732  * Notification from transport that connection is being closed.
733  *
734  * A disconnect is sent to application but state is not removed. Once
735  * disconnect is acknowledged by application, session disconnect is called.
736  * Ultimately this leads to close being called on transport (passive close).
737  */
738 void
739 session_transport_closing_notify (transport_connection_t * tc)
740 {
741   app_worker_t *app_wrk;
742   session_t *s;
743
744   s = session_get (tc->s_index, tc->thread_index);
745   if (s->session_state >= SESSION_STATE_TRANSPORT_CLOSING)
746     return;
747   s->session_state = SESSION_STATE_TRANSPORT_CLOSING;
748   app_wrk = app_worker_get (s->app_wrk_index);
749   app_worker_close_notify (app_wrk, s);
750 }
751
752 /**
753  * Notification from transport that connection is being deleted
754  *
755  * This removes the session if it is still valid. It should be called only on
756  * previously fully established sessions. For instance failed connects should
757  * call stream_session_connect_notify and indicate that the connect has
758  * failed.
759  */
760 void
761 session_transport_delete_notify (transport_connection_t * tc)
762 {
763   session_t *s;
764
765   /* App might've been removed already */
766   if (!(s = session_get_if_valid (tc->s_index, tc->thread_index)))
767     return;
768
769   /* Make sure we don't try to send anything more */
770   svm_fifo_dequeue_drop_all (s->tx_fifo);
771
772   switch (s->session_state)
773     {
774     case SESSION_STATE_CREATED:
775       /* Session was created but accept notification was not yet sent to the
776        * app. Cleanup everything. */
777       session_lookup_del_session (s);
778       session_free_w_fifos (s);
779       break;
780     case SESSION_STATE_ACCEPTING:
781     case SESSION_STATE_TRANSPORT_CLOSING:
782       /* If transport finishes or times out before we get a reply
783        * from the app, mark transport as closed and wait for reply
784        * before removing the session. Cleanup session table in advance
785        * because transport will soon be closed and closed sessions
786        * are assumed to have been removed from the lookup table */
787       session_lookup_del_session (s);
788       s->session_state = SESSION_STATE_TRANSPORT_CLOSED;
789       break;
790     case SESSION_STATE_CLOSING:
791     case SESSION_STATE_CLOSED_WAITING:
792       /* Cleanup lookup table as transport needs to still be valid.
793        * Program transport close to ensure that all session events
794        * have been cleaned up. Once transport close is called, the
795        * session is just removed because both transport and app have
796        * confirmed the close*/
797       session_lookup_del_session (s);
798       s->session_state = SESSION_STATE_TRANSPORT_CLOSED;
799       session_program_transport_close (s);
800       break;
801     case SESSION_STATE_TRANSPORT_CLOSED:
802       break;
803     case SESSION_STATE_CLOSED:
804       session_delete (s);
805       break;
806     default:
807       clib_warning ("session state %u", s->session_state);
808       session_delete (s);
809       break;
810     }
811 }
812
813 /**
814  * Notification from transport that session can be closed
815  *
816  * Should be called by transport only if it was closed with non-empty
817  * tx fifo and once it decides to begin the closing procedure prior to
818  * issuing a delete notify. This gives the chance to the session layer
819  * to cleanup any outstanding events.
820  */
821 void
822 session_transport_closed_notify (transport_connection_t * tc)
823 {
824   session_t *s;
825
826   if (!(s = session_get_if_valid (tc->s_index, tc->thread_index)))
827     return;
828
829   /* Transport thinks that app requested close but it actually didn't.
830    * Can happen for tcp if fin and rst are received in close succession. */
831   if (s->session_state == SESSION_STATE_READY)
832     {
833       session_transport_closing_notify (tc);
834       svm_fifo_dequeue_drop_all (s->tx_fifo);
835     }
836   /* If app close has not been received or has not yet resulted in
837    * a transport close, only mark the session transport as closed */
838   else if (s->session_state <= SESSION_STATE_CLOSING)
839     {
840       session_lookup_del_session (s);
841       s->session_state = SESSION_STATE_TRANSPORT_CLOSED;
842     }
843   else
844     s->session_state = SESSION_STATE_CLOSED;
845 }
846
847 /**
848  * Notify application that connection has been reset.
849  */
850 void
851 session_transport_reset_notify (transport_connection_t * tc)
852 {
853   app_worker_t *app_wrk;
854   session_t *s;
855
856   s = session_get (tc->s_index, tc->thread_index);
857   svm_fifo_dequeue_drop_all (s->tx_fifo);
858   if (s->session_state >= SESSION_STATE_TRANSPORT_CLOSING)
859     return;
860   s->session_state = SESSION_STATE_TRANSPORT_CLOSING;
861   app_wrk = app_worker_get (s->app_wrk_index);
862   app_worker_reset_notify (app_wrk, s);
863 }
864
865 int
866 session_stream_accept_notify (transport_connection_t * tc)
867 {
868   app_worker_t *app_wrk;
869   session_t *s;
870
871   s = session_get (tc->s_index, tc->thread_index);
872   app_wrk = app_worker_get_if_valid (s->app_wrk_index);
873   if (!app_wrk)
874     return -1;
875   s->session_state = SESSION_STATE_ACCEPTING;
876   return app_worker_accept_notify (app_wrk, s);
877 }
878
879 /**
880  * Accept a stream session. Optionally ping the server by callback.
881  */
882 int
883 session_stream_accept (transport_connection_t * tc, u32 listener_index,
884                        u32 thread_index, u8 notify)
885 {
886   session_t *s;
887   int rv;
888
889   s = session_alloc_for_connection (tc);
890   s->listener_handle = ((u64) thread_index << 32) | (u64) listener_index;
891   s->session_state = SESSION_STATE_CREATED;
892
893   if ((rv = app_worker_init_accepted (s)))
894     return rv;
895
896   session_lookup_add_connection (tc, session_handle (s));
897
898   /* Shoulder-tap the server */
899   if (notify)
900     {
901       app_worker_t *app_wrk = app_worker_get (s->app_wrk_index);
902       return app_worker_accept_notify (app_wrk, s);
903     }
904
905   return 0;
906 }
907
908 int
909 session_open_cl (u32 app_wrk_index, session_endpoint_t * rmt, u32 opaque)
910 {
911   transport_connection_t *tc;
912   transport_endpoint_cfg_t *tep;
913   app_worker_t *app_wrk;
914   session_handle_t sh;
915   session_t *s;
916   int rv;
917
918   tep = session_endpoint_to_transport_cfg (rmt);
919   rv = transport_connect (rmt->transport_proto, tep);
920   if (rv < 0)
921     {
922       SESSION_DBG ("Transport failed to open connection.");
923       return VNET_API_ERROR_SESSION_CONNECT;
924     }
925
926   tc = transport_get_half_open (rmt->transport_proto, (u32) rv);
927
928   /* For dgram type of service, allocate session and fifos now */
929   app_wrk = app_worker_get (app_wrk_index);
930   s = session_alloc_for_connection (tc);
931   s->app_wrk_index = app_wrk->wrk_index;
932   s->session_state = SESSION_STATE_OPENED;
933   if (app_worker_init_connected (app_wrk, s))
934     {
935       session_free (s);
936       return -1;
937     }
938
939   sh = session_handle (s);
940   session_lookup_add_connection (tc, sh);
941
942   return app_worker_connect_notify (app_wrk, s, opaque);
943 }
944
945 int
946 session_open_vc (u32 app_wrk_index, session_endpoint_t * rmt, u32 opaque)
947 {
948   transport_connection_t *tc;
949   transport_endpoint_cfg_t *tep;
950   u64 handle;
951   int rv;
952
953   tep = session_endpoint_to_transport_cfg (rmt);
954   rv = transport_connect (rmt->transport_proto, tep);
955   if (rv < 0)
956     {
957       SESSION_DBG ("Transport failed to open connection.");
958       return VNET_API_ERROR_SESSION_CONNECT;
959     }
960
961   tc = transport_get_half_open (rmt->transport_proto, (u32) rv);
962
963   /* If transport offers a stream service, only allocate session once the
964    * connection has been established.
965    * Add connection to half-open table and save app and tc index. The
966    * latter is needed to help establish the connection while the former
967    * is needed when the connect notify comes and we have to notify the
968    * external app
969    */
970   handle = (((u64) app_wrk_index) << 32) | (u64) tc->c_index;
971   session_lookup_add_half_open (tc, handle);
972
973   /* Store api_context (opaque) for when the reply comes. Not the nicest
974    * thing but better than allocating a separate half-open pool.
975    */
976   tc->s_index = opaque;
977   return 0;
978 }
979
980 int
981 session_open_app (u32 app_wrk_index, session_endpoint_t * rmt, u32 opaque)
982 {
983   session_endpoint_cfg_t *sep = (session_endpoint_cfg_t *) rmt;
984   transport_endpoint_cfg_t *tep_cfg = session_endpoint_to_transport_cfg (sep);
985
986   sep->app_wrk_index = app_wrk_index;
987   sep->opaque = opaque;
988
989   return transport_connect (rmt->transport_proto, tep_cfg);
990 }
991
992 typedef int (*session_open_service_fn) (u32, session_endpoint_t *, u32);
993
994 /* *INDENT-OFF* */
995 static session_open_service_fn session_open_srv_fns[TRANSPORT_N_SERVICES] = {
996   session_open_vc,
997   session_open_cl,
998   session_open_app,
999 };
1000 /* *INDENT-ON* */
1001
1002 /**
1003  * Ask transport to open connection to remote transport endpoint.
1004  *
1005  * Stores handle for matching request with reply since the call can be
1006  * asynchronous. For instance, for TCP the 3-way handshake must complete
1007  * before reply comes. Session is only created once connection is established.
1008  *
1009  * @param app_index Index of the application requesting the connect
1010  * @param st Session type requested.
1011  * @param tep Remote transport endpoint
1012  * @param opaque Opaque data (typically, api_context) the application expects
1013  *               on open completion.
1014  */
1015 int
1016 session_open (u32 app_wrk_index, session_endpoint_t * rmt, u32 opaque)
1017 {
1018   transport_service_type_t tst;
1019   tst = transport_protocol_service_type (rmt->transport_proto);
1020   return session_open_srv_fns[tst] (app_wrk_index, rmt, opaque);
1021 }
1022
1023 /**
1024  * Ask transport to listen on session endpoint.
1025  *
1026  * @param s Session for which listen will be called. Note that unlike
1027  *          established sessions, listen sessions are not associated to a
1028  *          thread.
1029  * @param sep Local endpoint to be listened on.
1030  */
1031 int
1032 session_listen (session_t * ls, session_endpoint_cfg_t * sep)
1033 {
1034   transport_endpoint_t *tep;
1035   u32 tc_index, s_index;
1036
1037   /* Transport bind/listen */
1038   tep = session_endpoint_to_transport (sep);
1039   s_index = ls->session_index;
1040   tc_index = transport_start_listen (session_get_transport_proto (ls),
1041                                      s_index, tep);
1042
1043   if (tc_index == (u32) ~ 0)
1044     return -1;
1045
1046   /* Attach transport to session. Lookup tables are populated by the app
1047    * worker because local tables (for ct sessions) are not backed by a fib */
1048   ls = listen_session_get (s_index);
1049   ls->connection_index = tc_index;
1050
1051   return 0;
1052 }
1053
1054 /**
1055  * Ask transport to stop listening on local transport endpoint.
1056  *
1057  * @param s Session to stop listening on. It must be in state LISTENING.
1058  */
1059 int
1060 session_stop_listen (session_t * s)
1061 {
1062   transport_proto_t tp = session_get_transport_proto (s);
1063   transport_connection_t *tc;
1064
1065   if (s->session_state != SESSION_STATE_LISTENING)
1066     return -1;
1067
1068   tc = transport_get_listener (tp, s->connection_index);
1069   if (!tc)
1070     return VNET_API_ERROR_ADDRESS_NOT_IN_USE;
1071
1072   session_lookup_del_connection (tc);
1073   transport_stop_listen (tp, s->connection_index);
1074   return 0;
1075 }
1076
1077 /**
1078  * Initialize session closing procedure.
1079  *
1080  * Request is always sent to session node to ensure that all outstanding
1081  * requests are served before transport is notified.
1082  */
1083 void
1084 session_close (session_t * s)
1085 {
1086   if (!s)
1087     return;
1088
1089   if (s->session_state >= SESSION_STATE_CLOSING)
1090     {
1091       /* Session will only be removed once both app and transport
1092        * acknowledge the close */
1093       if (s->session_state == SESSION_STATE_TRANSPORT_CLOSED)
1094         session_program_transport_close (s);
1095
1096       /* Session already closed. Clear the tx fifo */
1097       if (s->session_state == SESSION_STATE_CLOSED)
1098         svm_fifo_dequeue_drop_all (s->tx_fifo);
1099       return;
1100     }
1101
1102   s->session_state = SESSION_STATE_CLOSING;
1103   session_program_transport_close (s);
1104 }
1105
1106 /**
1107  * Notify transport the session can be disconnected. This should eventually
1108  * result in a delete notification that allows us to cleanup session state.
1109  * Called for both active/passive disconnects.
1110  *
1111  * Must be called from the session's thread.
1112  */
1113 void
1114 session_transport_close (session_t * s)
1115 {
1116   /* If transport is already closed, just free the session */
1117   if (s->session_state >= SESSION_STATE_TRANSPORT_CLOSED)
1118     {
1119       session_free_w_fifos (s);
1120       return;
1121     }
1122
1123   /* If tx queue wasn't drained, change state to closed waiting for transport.
1124    * This way, the transport, if it so wishes, can continue to try sending the
1125    * outstanding data (in closed state it cannot). It MUST however at one
1126    * point, either after sending everything or after a timeout, call delete
1127    * notify. This will finally lead to the complete cleanup of the session.
1128    */
1129   if (svm_fifo_max_dequeue_cons (s->tx_fifo))
1130     s->session_state = SESSION_STATE_CLOSED_WAITING;
1131   else
1132     s->session_state = SESSION_STATE_CLOSED;
1133
1134   transport_close (session_get_transport_proto (s), s->connection_index,
1135                    s->thread_index);
1136 }
1137
1138 /**
1139  * Cleanup transport and session state.
1140  *
1141  * Notify transport of the cleanup and free the session. This should
1142  * be called only if transport reported some error and is already
1143  * closed.
1144  */
1145 void
1146 session_transport_cleanup (session_t * s)
1147 {
1148   s->session_state = SESSION_STATE_CLOSED;
1149
1150   /* Delete from main lookup table before we axe the the transport */
1151   session_lookup_del_session (s);
1152   transport_cleanup (session_get_transport_proto (s), s->connection_index,
1153                      s->thread_index);
1154   /* Since we called cleanup, no delete notification will come. So, make
1155    * sure the session is properly freed. */
1156   session_free_w_fifos (s);
1157 }
1158
1159 /**
1160  * Allocate event queues in the shared-memory segment
1161  *
1162  * That can either be a newly created memfd segment, that will need to be
1163  * mapped by all stack users, or the binary api's svm region. The latter is
1164  * assumed to be already mapped. NOTE that this assumption DOES NOT hold if
1165  * api clients bootstrap shm api over sockets (i.e. use memfd segments) and
1166  * vpp uses api svm region for event queues.
1167  */
1168 void
1169 session_vpp_event_queues_allocate (session_main_t * smm)
1170 {
1171   u32 evt_q_length = 2048, evt_size = sizeof (session_event_t);
1172   ssvm_private_t *eqs = &smm->evt_qs_segment;
1173   api_main_t *am = &api_main;
1174   uword eqs_size = 64 << 20;
1175   pid_t vpp_pid = getpid ();
1176   void *oldheap;
1177   int i;
1178
1179   if (smm->configured_event_queue_length)
1180     evt_q_length = smm->configured_event_queue_length;
1181
1182   if (smm->evt_qs_use_memfd_seg)
1183     {
1184       if (smm->evt_qs_segment_size)
1185         eqs_size = smm->evt_qs_segment_size;
1186
1187       eqs->ssvm_size = eqs_size;
1188       eqs->i_am_master = 1;
1189       eqs->my_pid = vpp_pid;
1190       eqs->name = format (0, "%s%c", "evt-qs-segment", 0);
1191       eqs->requested_va = smm->session_baseva;
1192
1193       if (ssvm_master_init (eqs, SSVM_SEGMENT_MEMFD))
1194         {
1195           clib_warning ("failed to initialize queue segment");
1196           return;
1197         }
1198     }
1199
1200   if (smm->evt_qs_use_memfd_seg)
1201     oldheap = ssvm_push_heap (eqs->sh);
1202   else
1203     oldheap = svm_push_data_heap (am->vlib_rp);
1204
1205   for (i = 0; i < vec_len (smm->wrk); i++)
1206     {
1207       svm_msg_q_cfg_t _cfg, *cfg = &_cfg;
1208       svm_msg_q_ring_cfg_t rc[SESSION_MQ_N_RINGS] = {
1209         {evt_q_length, evt_size, 0}
1210         ,
1211         {evt_q_length >> 1, 256, 0}
1212       };
1213       cfg->consumer_pid = 0;
1214       cfg->n_rings = 2;
1215       cfg->q_nitems = evt_q_length;
1216       cfg->ring_cfgs = rc;
1217       smm->wrk[i].vpp_event_queue = svm_msg_q_alloc (cfg);
1218       if (smm->evt_qs_use_memfd_seg)
1219         {
1220           if (svm_msg_q_alloc_consumer_eventfd (smm->wrk[i].vpp_event_queue))
1221             clib_warning ("eventfd returned");
1222         }
1223     }
1224
1225   if (smm->evt_qs_use_memfd_seg)
1226     ssvm_pop_heap (oldheap);
1227   else
1228     svm_pop_heap (oldheap);
1229 }
1230
1231 ssvm_private_t *
1232 session_main_get_evt_q_segment (void)
1233 {
1234   session_main_t *smm = &session_main;
1235   if (smm->evt_qs_use_memfd_seg)
1236     return &smm->evt_qs_segment;
1237   return 0;
1238 }
1239
1240 u64
1241 session_segment_handle (session_t * s)
1242 {
1243   svm_fifo_t *f;
1244
1245   if (!s->rx_fifo)
1246     return SESSION_INVALID_HANDLE;
1247
1248   f = s->rx_fifo;
1249   return segment_manager_make_segment_handle (f->segment_manager,
1250                                               f->segment_index);
1251 }
1252
1253 /* *INDENT-OFF* */
1254 static session_fifo_rx_fn *session_tx_fns[TRANSPORT_TX_N_FNS] = {
1255     session_tx_fifo_peek_and_snd,
1256     session_tx_fifo_dequeue_and_snd,
1257     session_tx_fifo_dequeue_internal,
1258     session_tx_fifo_dequeue_and_snd
1259 };
1260 /* *INDENT-ON* */
1261
1262 /**
1263  * Initialize session layer for given transport proto and ip version
1264  *
1265  * Allocates per session type (transport proto + ip version) data structures
1266  * and adds arc from session queue node to session type output node.
1267  */
1268 void
1269 session_register_transport (transport_proto_t transport_proto,
1270                             const transport_proto_vft_t * vft, u8 is_ip4,
1271                             u32 output_node)
1272 {
1273   session_main_t *smm = &session_main;
1274   session_type_t session_type;
1275   u32 next_index = ~0;
1276
1277   session_type = session_type_from_proto_and_ip (transport_proto, is_ip4);
1278
1279   vec_validate (smm->session_type_to_next, session_type);
1280   vec_validate (smm->session_tx_fns, session_type);
1281
1282   /* *INDENT-OFF* */
1283   if (output_node != ~0)
1284     {
1285       foreach_vlib_main (({
1286           next_index = vlib_node_add_next (this_vlib_main,
1287                                            session_queue_node.index,
1288                                            output_node);
1289       }));
1290     }
1291   /* *INDENT-ON* */
1292
1293   smm->session_type_to_next[session_type] = next_index;
1294   smm->session_tx_fns[session_type] = session_tx_fns[vft->tx_type];
1295 }
1296
1297 transport_connection_t *
1298 session_get_transport (session_t * s)
1299 {
1300   if (s->session_state != SESSION_STATE_LISTENING)
1301     return transport_get_connection (session_get_transport_proto (s),
1302                                      s->connection_index, s->thread_index);
1303   else
1304     return transport_get_listener (session_get_transport_proto (s),
1305                                    s->connection_index);
1306 }
1307
1308 void
1309 session_get_endpoint (session_t * s, transport_endpoint_t * tep, u8 is_lcl)
1310 {
1311   if (s->session_state != SESSION_STATE_LISTENING)
1312     return transport_get_endpoint (session_get_transport_proto (s),
1313                                    s->connection_index, s->thread_index, tep,
1314                                    is_lcl);
1315   else
1316     return transport_get_listener_endpoint (session_get_transport_proto (s),
1317                                             s->connection_index, tep, is_lcl);
1318 }
1319
1320 transport_connection_t *
1321 listen_session_get_transport (session_t * s)
1322 {
1323   return transport_get_listener (session_get_transport_proto (s),
1324                                  s->connection_index);
1325 }
1326
1327 void
1328 session_flush_frames_main_thread (vlib_main_t * vm)
1329 {
1330   ASSERT (vlib_get_thread_index () == 0);
1331   vlib_process_signal_event_mt (vm, session_queue_process_node.index,
1332                                 SESSION_Q_PROCESS_FLUSH_FRAMES, 0);
1333 }
1334
1335 static clib_error_t *
1336 session_manager_main_enable (vlib_main_t * vm)
1337 {
1338   segment_manager_main_init_args_t _sm_args = { 0 }, *sm_args = &_sm_args;
1339   session_main_t *smm = &session_main;
1340   vlib_thread_main_t *vtm = vlib_get_thread_main ();
1341   u32 num_threads, preallocated_sessions_per_worker;
1342   session_worker_t *wrk;
1343   int i;
1344
1345   num_threads = 1 /* main thread */  + vtm->n_threads;
1346
1347   if (num_threads < 1)
1348     return clib_error_return (0, "n_thread_stacks not set");
1349
1350   /* Allocate cache line aligned worker contexts */
1351   vec_validate_aligned (smm->wrk, num_threads - 1, CLIB_CACHE_LINE_BYTES);
1352
1353   for (i = 0; i < num_threads; i++)
1354     {
1355       wrk = &smm->wrk[i];
1356       vec_validate (wrk->free_event_vector, 128);
1357       _vec_len (wrk->free_event_vector) = 0;
1358       vec_validate (wrk->pending_event_vector, 128);
1359       _vec_len (wrk->pending_event_vector) = 0;
1360       vec_validate (wrk->pending_disconnects, 128);
1361       _vec_len (wrk->pending_disconnects) = 0;
1362       vec_validate (wrk->postponed_event_vector, 128);
1363       _vec_len (wrk->postponed_event_vector) = 0;
1364
1365       wrk->last_vlib_time = vlib_time_now (vlib_mains[i]);
1366       wrk->dispatch_period = 500e-6;
1367
1368       if (num_threads > 1)
1369         clib_rwlock_init (&smm->wrk[i].peekers_rw_locks);
1370     }
1371
1372 #if SESSION_DEBUG
1373   vec_validate (smm->last_event_poll_by_thread, num_threads - 1);
1374 #endif
1375
1376   /* Allocate vpp event queues segment and queue */
1377   session_vpp_event_queues_allocate (smm);
1378
1379   /* Initialize fifo segment main baseva and timeout */
1380   sm_args->baseva = smm->session_baseva + smm->evt_qs_segment_size;
1381   sm_args->size = smm->session_va_space_size;
1382   segment_manager_main_init (sm_args);
1383
1384   /* Preallocate sessions */
1385   if (smm->preallocated_sessions)
1386     {
1387       if (num_threads == 1)
1388         {
1389           pool_init_fixed (smm->wrk[0].sessions, smm->preallocated_sessions);
1390         }
1391       else
1392         {
1393           int j;
1394           preallocated_sessions_per_worker =
1395             (1.1 * (f64) smm->preallocated_sessions /
1396              (f64) (num_threads - 1));
1397
1398           for (j = 1; j < num_threads; j++)
1399             {
1400               pool_init_fixed (smm->wrk[j].sessions,
1401                                preallocated_sessions_per_worker);
1402             }
1403         }
1404     }
1405
1406   session_lookup_init ();
1407   app_namespaces_init ();
1408   transport_init ();
1409
1410   smm->is_enabled = 1;
1411
1412   /* Enable transports */
1413   transport_enable_disable (vm, 1);
1414   transport_init_tx_pacers_period ();
1415   return 0;
1416 }
1417
1418 void
1419 session_node_enable_disable (u8 is_en)
1420 {
1421   u8 state = is_en ? VLIB_NODE_STATE_POLLING : VLIB_NODE_STATE_DISABLED;
1422   vlib_thread_main_t *vtm = vlib_get_thread_main ();
1423   u8 have_workers = vtm->n_threads != 0;
1424
1425   /* *INDENT-OFF* */
1426   foreach_vlib_main (({
1427     if (have_workers && ii == 0)
1428       {
1429         vlib_node_set_state (this_vlib_main, session_queue_process_node.index,
1430                              state);
1431         if (is_en)
1432           {
1433             vlib_node_t *n = vlib_get_node (this_vlib_main,
1434                                             session_queue_process_node.index);
1435             vlib_start_process (this_vlib_main, n->runtime_index);
1436           }
1437         else
1438           {
1439             vlib_process_signal_event_mt (this_vlib_main,
1440                                           session_queue_process_node.index,
1441                                           SESSION_Q_PROCESS_STOP, 0);
1442           }
1443
1444         continue;
1445       }
1446     vlib_node_set_state (this_vlib_main, session_queue_node.index,
1447                          state);
1448   }));
1449   /* *INDENT-ON* */
1450 }
1451
1452 clib_error_t *
1453 vnet_session_enable_disable (vlib_main_t * vm, u8 is_en)
1454 {
1455   clib_error_t *error = 0;
1456   if (is_en)
1457     {
1458       if (session_main.is_enabled)
1459         return 0;
1460
1461       session_node_enable_disable (is_en);
1462       error = session_manager_main_enable (vm);
1463     }
1464   else
1465     {
1466       session_main.is_enabled = 0;
1467       session_node_enable_disable (is_en);
1468     }
1469
1470   return error;
1471 }
1472
1473 clib_error_t *
1474 session_manager_main_init (vlib_main_t * vm)
1475 {
1476   session_main_t *smm = &session_main;
1477   smm->session_baseva = HIGH_SEGMENT_BASEVA;
1478 #if (HIGH_SEGMENT_BASEVA > (4ULL << 30))
1479   smm->session_va_space_size = 128ULL << 30;
1480   smm->evt_qs_segment_size = 64 << 20;
1481 #else
1482   smm->session_va_space_size = 128 << 20;
1483   smm->evt_qs_segment_size = 1 << 20;
1484 #endif
1485   smm->is_enabled = 0;
1486   return 0;
1487 }
1488
1489 VLIB_INIT_FUNCTION (session_manager_main_init);
1490
1491 static clib_error_t *
1492 session_config_fn (vlib_main_t * vm, unformat_input_t * input)
1493 {
1494   session_main_t *smm = &session_main;
1495   u32 nitems;
1496   uword tmp;
1497
1498   while (unformat_check_input (input) != UNFORMAT_END_OF_INPUT)
1499     {
1500       if (unformat (input, "event-queue-length %d", &nitems))
1501         {
1502           if (nitems >= 2048)
1503             smm->configured_event_queue_length = nitems;
1504           else
1505             clib_warning ("event queue length %d too small, ignored", nitems);
1506         }
1507       else if (unformat (input, "preallocated-sessions %d",
1508                          &smm->preallocated_sessions))
1509         ;
1510       else if (unformat (input, "v4-session-table-buckets %d",
1511                          &smm->configured_v4_session_table_buckets))
1512         ;
1513       else if (unformat (input, "v4-halfopen-table-buckets %d",
1514                          &smm->configured_v4_halfopen_table_buckets))
1515         ;
1516       else if (unformat (input, "v6-session-table-buckets %d",
1517                          &smm->configured_v6_session_table_buckets))
1518         ;
1519       else if (unformat (input, "v6-halfopen-table-buckets %d",
1520                          &smm->configured_v6_halfopen_table_buckets))
1521         ;
1522       else if (unformat (input, "v4-session-table-memory %U",
1523                          unformat_memory_size, &tmp))
1524         {
1525           if (tmp >= 0x100000000)
1526             return clib_error_return (0, "memory size %llx (%lld) too large",
1527                                       tmp, tmp);
1528           smm->configured_v4_session_table_memory = tmp;
1529         }
1530       else if (unformat (input, "v4-halfopen-table-memory %U",
1531                          unformat_memory_size, &tmp))
1532         {
1533           if (tmp >= 0x100000000)
1534             return clib_error_return (0, "memory size %llx (%lld) too large",
1535                                       tmp, tmp);
1536           smm->configured_v4_halfopen_table_memory = tmp;
1537         }
1538       else if (unformat (input, "v6-session-table-memory %U",
1539                          unformat_memory_size, &tmp))
1540         {
1541           if (tmp >= 0x100000000)
1542             return clib_error_return (0, "memory size %llx (%lld) too large",
1543                                       tmp, tmp);
1544           smm->configured_v6_session_table_memory = tmp;
1545         }
1546       else if (unformat (input, "v6-halfopen-table-memory %U",
1547                          unformat_memory_size, &tmp))
1548         {
1549           if (tmp >= 0x100000000)
1550             return clib_error_return (0, "memory size %llx (%lld) too large",
1551                                       tmp, tmp);
1552           smm->configured_v6_halfopen_table_memory = tmp;
1553         }
1554       else if (unformat (input, "local-endpoints-table-memory %U",
1555                          unformat_memory_size, &tmp))
1556         {
1557           if (tmp >= 0x100000000)
1558             return clib_error_return (0, "memory size %llx (%lld) too large",
1559                                       tmp, tmp);
1560           smm->local_endpoints_table_memory = tmp;
1561         }
1562       else if (unformat (input, "local-endpoints-table-buckets %d",
1563                          &smm->local_endpoints_table_buckets))
1564         ;
1565       else if (unformat (input, "evt_qs_memfd_seg"))
1566         smm->evt_qs_use_memfd_seg = 1;
1567       else if (unformat (input, "evt_qs_seg_size %U", unformat_memory_size,
1568                          &smm->evt_qs_segment_size))
1569         ;
1570       else
1571         return clib_error_return (0, "unknown input `%U'",
1572                                   format_unformat_error, input);
1573     }
1574   return 0;
1575 }
1576
1577 VLIB_CONFIG_FUNCTION (session_config_fn, "session");
1578
1579 /*
1580  * fd.io coding-style-patch-verification: ON
1581  *
1582  * Local Variables:
1583  * eval: (c-set-style "gnu")
1584  * End:
1585  */