session: use listener_handle instead of listener_index
[vpp.git] / src / vnet / session / session.c
1 /*
2  * Copyright (c) 2017-2019 Cisco and/or its affiliates.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at:
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 /**
16  * @file
17  * @brief Session and session manager
18  */
19
20 #include <vnet/session/session.h>
21 #include <vnet/session/session_debug.h>
22 #include <vnet/session/application.h>
23 #include <vnet/dpo/load_balance.h>
24 #include <vnet/fib/ip4_fib.h>
25
26 session_main_t session_main;
27
28 static inline int
29 session_send_evt_to_thread (void *data, void *args, u32 thread_index,
30                             session_evt_type_t evt_type)
31 {
32   session_event_t *evt;
33   svm_msg_q_msg_t msg;
34   svm_msg_q_t *mq;
35   u32 tries = 0, max_tries;
36
37   mq = session_main_get_vpp_event_queue (thread_index);
38   while (svm_msg_q_try_lock (mq))
39     {
40       max_tries = vlib_get_current_process (vlib_get_main ())? 1e6 : 3;
41       if (tries++ == max_tries)
42         {
43           SESSION_DBG ("failed to enqueue evt");
44           return -1;
45         }
46     }
47   if (PREDICT_FALSE (svm_msg_q_ring_is_full (mq, SESSION_MQ_IO_EVT_RING)))
48     {
49       svm_msg_q_unlock (mq);
50       return -2;
51     }
52   msg = svm_msg_q_alloc_msg_w_ring (mq, SESSION_MQ_IO_EVT_RING);
53   if (PREDICT_FALSE (svm_msg_q_msg_is_invalid (&msg)))
54     {
55       svm_msg_q_unlock (mq);
56       return -2;
57     }
58   evt = (session_event_t *) svm_msg_q_msg_data (mq, &msg);
59   evt->event_type = evt_type;
60   switch (evt_type)
61     {
62     case SESSION_CTRL_EVT_RPC:
63       evt->rpc_args.fp = data;
64       evt->rpc_args.arg = args;
65       break;
66     case SESSION_IO_EVT_TX:
67     case SESSION_IO_EVT_TX_FLUSH:
68     case SESSION_IO_EVT_BUILTIN_RX:
69       evt->session_index = *(u32 *) data;
70       break;
71     case SESSION_IO_EVT_BUILTIN_TX:
72     case SESSION_CTRL_EVT_CLOSE:
73       evt->session_handle = session_handle ((session_t *) data);
74       break;
75     default:
76       clib_warning ("evt unhandled!");
77       svm_msg_q_unlock (mq);
78       return -1;
79     }
80
81   svm_msg_q_add_and_unlock (mq, &msg);
82   return 0;
83 }
84
85 int
86 session_send_io_evt_to_thread (svm_fifo_t * f, session_evt_type_t evt_type)
87 {
88   return session_send_evt_to_thread (&f->master_session_index, 0,
89                                      f->master_thread_index, evt_type);
90 }
91
92 int
93 session_send_io_evt_to_thread_custom (void *data, u32 thread_index,
94                                       session_evt_type_t evt_type)
95 {
96   return session_send_evt_to_thread (data, 0, thread_index, evt_type);
97 }
98
99 int
100 session_send_ctrl_evt_to_thread (session_t * s, session_evt_type_t evt_type)
101 {
102   /* only event supported for now is disconnect */
103   ASSERT (evt_type == SESSION_CTRL_EVT_CLOSE);
104   return session_send_evt_to_thread (s, 0, s->thread_index,
105                                      SESSION_CTRL_EVT_CLOSE);
106 }
107
108 void
109 session_send_rpc_evt_to_thread_force (u32 thread_index, void *fp,
110                                       void *rpc_args)
111 {
112   session_send_evt_to_thread (fp, rpc_args, thread_index,
113                               SESSION_CTRL_EVT_RPC);
114 }
115
116 void
117 session_send_rpc_evt_to_thread (u32 thread_index, void *fp, void *rpc_args)
118 {
119   if (thread_index != vlib_get_thread_index ())
120     session_send_rpc_evt_to_thread_force (thread_index, fp, rpc_args);
121   else
122     {
123       void (*fnp) (void *) = fp;
124       fnp (rpc_args);
125     }
126 }
127
128 static void
129 session_program_transport_close (session_t * s)
130 {
131   u32 thread_index = vlib_get_thread_index ();
132   session_worker_t *wrk;
133   session_event_t *evt;
134
135   /* If we are in the handler thread, or being called with the worker barrier
136    * held, just append a new event to pending disconnects vector. */
137   if (vlib_thread_is_main_w_barrier () || thread_index == s->thread_index)
138     {
139       wrk = session_main_get_worker (s->thread_index);
140       vec_add2 (wrk->pending_disconnects, evt, 1);
141       clib_memset (evt, 0, sizeof (*evt));
142       evt->session_handle = session_handle (s);
143       evt->event_type = SESSION_CTRL_EVT_CLOSE;
144     }
145   else
146     session_send_ctrl_evt_to_thread (s, SESSION_CTRL_EVT_CLOSE);
147 }
148
149 session_t *
150 session_alloc (u32 thread_index)
151 {
152   session_worker_t *wrk = &session_main.wrk[thread_index];
153   session_t *s;
154   u8 will_expand = 0;
155   pool_get_aligned_will_expand (wrk->sessions, will_expand,
156                                 CLIB_CACHE_LINE_BYTES);
157   /* If we have peekers, let them finish */
158   if (PREDICT_FALSE (will_expand && vlib_num_workers ()))
159     {
160       clib_rwlock_writer_lock (&wrk->peekers_rw_locks);
161       pool_get_aligned (wrk->sessions, s, CLIB_CACHE_LINE_BYTES);
162       clib_rwlock_writer_unlock (&wrk->peekers_rw_locks);
163     }
164   else
165     {
166       pool_get_aligned (wrk->sessions, s, CLIB_CACHE_LINE_BYTES);
167     }
168   clib_memset (s, 0, sizeof (*s));
169   s->session_index = s - wrk->sessions;
170   s->thread_index = thread_index;
171   s->app_index = APP_INVALID_INDEX;
172   return s;
173 }
174
175 void
176 session_free (session_t * s)
177 {
178   if (CLIB_DEBUG)
179     {
180       u8 thread_index = s->thread_index;
181       clib_memset (s, 0xFA, sizeof (*s));
182       pool_put (session_main.wrk[thread_index].sessions, s);
183       return;
184     }
185   SESSION_EVT_DBG (SESSION_EVT_FREE, s);
186   pool_put (session_main.wrk[s->thread_index].sessions, s);
187 }
188
189 void
190 session_free_w_fifos (session_t * s)
191 {
192   segment_manager_dealloc_fifos (s->rx_fifo, s->tx_fifo);
193   session_free (s);
194 }
195
196 /**
197  * Cleans up session and lookup table.
198  *
199  * Transport connection must still be valid.
200  */
201 static void
202 session_delete (session_t * s)
203 {
204   int rv;
205
206   /* Delete from the main lookup table. */
207   if ((rv = session_lookup_del_session (s)))
208     clib_warning ("hash delete error, rv %d", rv);
209
210   session_free_w_fifos (s);
211 }
212
213 static session_t *
214 session_alloc_for_connection (transport_connection_t * tc)
215 {
216   session_t *s;
217   u32 thread_index = tc->thread_index;
218
219   ASSERT (thread_index == vlib_get_thread_index ()
220           || transport_protocol_is_cl (tc->proto));
221
222   s = session_alloc (thread_index);
223   s->session_type = session_type_from_proto_and_ip (tc->proto, tc->is_ip4);
224   s->session_state = SESSION_STATE_CLOSED;
225
226   /* Attach transport to session and vice versa */
227   s->connection_index = tc->c_index;
228   tc->s_index = s->session_index;
229   return s;
230 }
231
232 /**
233  * Discards bytes from buffer chain
234  *
235  * It discards n_bytes_to_drop starting at first buffer after chain_b
236  */
237 always_inline void
238 session_enqueue_discard_chain_bytes (vlib_main_t * vm, vlib_buffer_t * b,
239                                      vlib_buffer_t ** chain_b,
240                                      u32 n_bytes_to_drop)
241 {
242   vlib_buffer_t *next = *chain_b;
243   u32 to_drop = n_bytes_to_drop;
244   ASSERT (b->flags & VLIB_BUFFER_NEXT_PRESENT);
245   while (to_drop && (next->flags & VLIB_BUFFER_NEXT_PRESENT))
246     {
247       next = vlib_get_buffer (vm, next->next_buffer);
248       if (next->current_length > to_drop)
249         {
250           vlib_buffer_advance (next, to_drop);
251           to_drop = 0;
252         }
253       else
254         {
255           to_drop -= next->current_length;
256           next->current_length = 0;
257         }
258     }
259   *chain_b = next;
260
261   if (to_drop == 0)
262     b->total_length_not_including_first_buffer -= n_bytes_to_drop;
263 }
264
265 /**
266  * Enqueue buffer chain tail
267  */
268 always_inline int
269 session_enqueue_chain_tail (session_t * s, vlib_buffer_t * b,
270                             u32 offset, u8 is_in_order)
271 {
272   vlib_buffer_t *chain_b;
273   u32 chain_bi, len, diff;
274   vlib_main_t *vm = vlib_get_main ();
275   u8 *data;
276   u32 written = 0;
277   int rv = 0;
278
279   if (is_in_order && offset)
280     {
281       diff = offset - b->current_length;
282       if (diff > b->total_length_not_including_first_buffer)
283         return 0;
284       chain_b = b;
285       session_enqueue_discard_chain_bytes (vm, b, &chain_b, diff);
286       chain_bi = vlib_get_buffer_index (vm, chain_b);
287     }
288   else
289     chain_bi = b->next_buffer;
290
291   do
292     {
293       chain_b = vlib_get_buffer (vm, chain_bi);
294       data = vlib_buffer_get_current (chain_b);
295       len = chain_b->current_length;
296       if (!len)
297         continue;
298       if (is_in_order)
299         {
300           rv = svm_fifo_enqueue (s->rx_fifo, len, data);
301           if (rv == len)
302             {
303               written += rv;
304             }
305           else if (rv < len)
306             {
307               return (rv > 0) ? (written + rv) : written;
308             }
309           else if (rv > len)
310             {
311               written += rv;
312
313               /* written more than what was left in chain */
314               if (written > b->total_length_not_including_first_buffer)
315                 return written;
316
317               /* drop the bytes that have already been delivered */
318               session_enqueue_discard_chain_bytes (vm, b, &chain_b, rv - len);
319             }
320         }
321       else
322         {
323           rv = svm_fifo_enqueue_with_offset (s->rx_fifo, offset, len, data);
324           if (rv)
325             {
326               clib_warning ("failed to enqueue multi-buffer seg");
327               return -1;
328             }
329           offset += len;
330         }
331     }
332   while ((chain_bi = (chain_b->flags & VLIB_BUFFER_NEXT_PRESENT)
333           ? chain_b->next_buffer : 0));
334
335   if (is_in_order)
336     return written;
337
338   return 0;
339 }
340
341 /*
342  * Enqueue data for delivery to session peer. Does not notify peer of enqueue
343  * event but on request can queue notification events for later delivery by
344  * calling stream_server_flush_enqueue_events().
345  *
346  * @param tc Transport connection which is to be enqueued data
347  * @param b Buffer to be enqueued
348  * @param offset Offset at which to start enqueueing if out-of-order
349  * @param queue_event Flag to indicate if peer is to be notified or if event
350  *                    is to be queued. The former is useful when more data is
351  *                    enqueued and only one event is to be generated.
352  * @param is_in_order Flag to indicate if data is in order
353  * @return Number of bytes enqueued or a negative value if enqueueing failed.
354  */
355 int
356 session_enqueue_stream_connection (transport_connection_t * tc,
357                                    vlib_buffer_t * b, u32 offset,
358                                    u8 queue_event, u8 is_in_order)
359 {
360   session_t *s;
361   int enqueued = 0, rv, in_order_off;
362
363   s = session_get (tc->s_index, tc->thread_index);
364
365   if (is_in_order)
366     {
367       enqueued = svm_fifo_enqueue (s->rx_fifo,
368                                    b->current_length,
369                                    vlib_buffer_get_current (b));
370       if (PREDICT_FALSE ((b->flags & VLIB_BUFFER_NEXT_PRESENT)
371                          && enqueued >= 0))
372         {
373           in_order_off = enqueued > b->current_length ? enqueued : 0;
374           rv = session_enqueue_chain_tail (s, b, in_order_off, 1);
375           if (rv > 0)
376             enqueued += rv;
377         }
378     }
379   else
380     {
381       rv = svm_fifo_enqueue_with_offset (s->rx_fifo, offset,
382                                          b->current_length,
383                                          vlib_buffer_get_current (b));
384       if (PREDICT_FALSE ((b->flags & VLIB_BUFFER_NEXT_PRESENT) && !rv))
385         session_enqueue_chain_tail (s, b, offset + b->current_length, 0);
386       /* if something was enqueued, report even this as success for ooo
387        * segment handling */
388       return rv;
389     }
390
391   if (queue_event)
392     {
393       /* Queue RX event on this fifo. Eventually these will need to be flushed
394        * by calling stream_server_flush_enqueue_events () */
395       session_worker_t *wrk;
396
397       wrk = session_main_get_worker (s->thread_index);
398       if (!(s->flags & SESSION_F_RX_EVT))
399         {
400           s->flags |= SESSION_F_RX_EVT;
401           vec_add1 (wrk->session_to_enqueue[tc->proto], s->session_index);
402         }
403     }
404
405   return enqueued;
406 }
407
408 int
409 session_enqueue_dgram_connection (session_t * s,
410                                   session_dgram_hdr_t * hdr,
411                                   vlib_buffer_t * b, u8 proto, u8 queue_event)
412 {
413   int enqueued = 0, rv, in_order_off;
414
415   ASSERT (svm_fifo_max_enqueue_prod (s->rx_fifo)
416           >= b->current_length + sizeof (*hdr));
417
418   svm_fifo_enqueue (s->rx_fifo, sizeof (session_dgram_hdr_t), (u8 *) hdr);
419   enqueued = svm_fifo_enqueue (s->rx_fifo, b->current_length,
420                                vlib_buffer_get_current (b));
421   if (PREDICT_FALSE ((b->flags & VLIB_BUFFER_NEXT_PRESENT) && enqueued >= 0))
422     {
423       in_order_off = enqueued > b->current_length ? enqueued : 0;
424       rv = session_enqueue_chain_tail (s, b, in_order_off, 1);
425       if (rv > 0)
426         enqueued += rv;
427     }
428   if (queue_event)
429     {
430       /* Queue RX event on this fifo. Eventually these will need to be flushed
431        * by calling stream_server_flush_enqueue_events () */
432       session_worker_t *wrk;
433
434       wrk = session_main_get_worker (s->thread_index);
435       if (!(s->flags & SESSION_F_RX_EVT))
436         {
437           s->flags |= SESSION_F_RX_EVT;
438           vec_add1 (wrk->session_to_enqueue[proto], s->session_index);
439         }
440     }
441   return enqueued;
442 }
443
444 int
445 session_tx_fifo_peek_bytes (transport_connection_t * tc, u8 * buffer,
446                             u32 offset, u32 max_bytes)
447 {
448   session_t *s = session_get (tc->s_index, tc->thread_index);
449   return svm_fifo_peek (s->tx_fifo, offset, max_bytes, buffer);
450 }
451
452 u32
453 session_tx_fifo_dequeue_drop (transport_connection_t * tc, u32 max_bytes)
454 {
455   session_t *s = session_get (tc->s_index, tc->thread_index);
456
457   if (svm_fifo_needs_tx_ntf (s->tx_fifo, max_bytes))
458     session_dequeue_notify (s);
459
460   return svm_fifo_dequeue_drop (s->tx_fifo, max_bytes);
461 }
462
463 static inline int
464 session_notify_subscribers (u32 app_index, session_t * s,
465                             svm_fifo_t * f, session_evt_type_t evt_type)
466 {
467   app_worker_t *app_wrk;
468   application_t *app;
469   int i;
470
471   app = application_get (app_index);
472   if (!app)
473     return -1;
474
475   for (i = 0; i < f->n_subscribers; i++)
476     {
477       app_wrk = application_get_worker (app, f->subscribers[i]);
478       if (!app_wrk)
479         continue;
480       if (app_worker_lock_and_send_event (app_wrk, s, evt_type))
481         return -1;
482     }
483
484   return 0;
485 }
486
487 /**
488  * Notify session peer that new data has been enqueued.
489  *
490  * @param s     Stream session for which the event is to be generated.
491  * @param lock  Flag to indicate if call should lock message queue.
492  *
493  * @return 0 on success or negative number if failed to send notification.
494  */
495 static inline int
496 session_enqueue_notify_inline (session_t * s)
497 {
498   app_worker_t *app_wrk;
499   u32 session_index;
500   u8 n_subscribers;
501
502   session_index = s->session_index;
503   n_subscribers = svm_fifo_n_subscribers (s->rx_fifo);
504
505   app_wrk = app_worker_get_if_valid (s->app_wrk_index);
506   if (PREDICT_FALSE (!app_wrk))
507     {
508       SESSION_DBG ("invalid s->app_index = %d", s->app_wrk_index);
509       return 0;
510     }
511
512   /* *INDENT-OFF* */
513   SESSION_EVT_DBG(SESSION_EVT_ENQ, s, ({
514       ed->data[0] = SESSION_IO_EVT_RX;
515       ed->data[1] = svm_fifo_max_dequeue_prod (s->rx_fifo);
516   }));
517   /* *INDENT-ON* */
518
519   s->flags &= ~SESSION_F_RX_EVT;
520   if (PREDICT_FALSE (app_worker_lock_and_send_event (app_wrk, s,
521                                                      SESSION_IO_EVT_RX)))
522     return -1;
523
524   if (PREDICT_FALSE (n_subscribers))
525     {
526       s = session_get (session_index, vlib_get_thread_index ());
527       return session_notify_subscribers (app_wrk->app_index, s,
528                                          s->rx_fifo, SESSION_IO_EVT_RX);
529     }
530
531   return 0;
532 }
533
534 int
535 session_enqueue_notify (session_t * s)
536 {
537   return session_enqueue_notify_inline (s);
538 }
539
540 int
541 session_dequeue_notify (session_t * s)
542 {
543   app_worker_t *app_wrk;
544
545   app_wrk = app_worker_get_if_valid (s->app_wrk_index);
546   if (PREDICT_FALSE (!app_wrk))
547     return -1;
548
549   if (PREDICT_FALSE (app_worker_lock_and_send_event (app_wrk, s,
550                                                      SESSION_IO_EVT_TX)))
551     return -1;
552
553   if (PREDICT_FALSE (s->tx_fifo->n_subscribers))
554     return session_notify_subscribers (app_wrk->app_index, s,
555                                        s->tx_fifo, SESSION_IO_EVT_TX);
556
557   svm_fifo_clear_tx_ntf (s->tx_fifo);
558
559   return 0;
560 }
561
562 /**
563  * Flushes queue of sessions that are to be notified of new data
564  * enqueued events.
565  *
566  * @param thread_index Thread index for which the flush is to be performed.
567  * @return 0 on success or a positive number indicating the number of
568  *         failures due to API queue being full.
569  */
570 int
571 session_main_flush_enqueue_events (u8 transport_proto, u32 thread_index)
572 {
573   session_worker_t *wrk = session_main_get_worker (thread_index);
574   session_t *s;
575   int i, errors = 0;
576   u32 *indices;
577
578   indices = wrk->session_to_enqueue[transport_proto];
579
580   for (i = 0; i < vec_len (indices); i++)
581     {
582       s = session_get_if_valid (indices[i], thread_index);
583       if (PREDICT_FALSE (!s))
584         {
585           errors++;
586           continue;
587         }
588
589       if (PREDICT_FALSE (session_enqueue_notify_inline (s)))
590         errors++;
591     }
592
593   vec_reset_length (indices);
594   wrk->session_to_enqueue[transport_proto] = indices;
595
596   return errors;
597 }
598
599 int
600 session_main_flush_all_enqueue_events (u8 transport_proto)
601 {
602   vlib_thread_main_t *vtm = vlib_get_thread_main ();
603   int i, errors = 0;
604   for (i = 0; i < 1 + vtm->n_threads; i++)
605     errors += session_main_flush_enqueue_events (transport_proto, i);
606   return errors;
607 }
608
609 int
610 session_stream_connect_notify (transport_connection_t * tc, u8 is_fail)
611 {
612   u32 opaque = 0, new_ti, new_si;
613   app_worker_t *app_wrk;
614   session_t *s = 0;
615   u64 ho_handle;
616
617   /*
618    * Find connection handle and cleanup half-open table
619    */
620   ho_handle = session_lookup_half_open_handle (tc);
621   if (ho_handle == HALF_OPEN_LOOKUP_INVALID_VALUE)
622     {
623       SESSION_DBG ("half-open was removed!");
624       return -1;
625     }
626   session_lookup_del_half_open (tc);
627
628   /* Get the app's index from the handle we stored when opening connection
629    * and the opaque (api_context for external apps) from transport session
630    * index */
631   app_wrk = app_worker_get_if_valid (ho_handle >> 32);
632   if (!app_wrk)
633     return -1;
634
635   opaque = tc->s_index;
636
637   if (is_fail)
638     return app_worker_connect_notify (app_wrk, s, opaque);
639
640   s = session_alloc_for_connection (tc);
641   s->session_state = SESSION_STATE_CONNECTING;
642   s->app_wrk_index = app_wrk->wrk_index;
643   new_si = s->session_index;
644   new_ti = s->thread_index;
645
646   if (app_worker_init_connected (app_wrk, s))
647     {
648       session_free (s);
649       app_worker_connect_notify (app_wrk, 0, opaque);
650       return -1;
651     }
652
653   if (app_worker_connect_notify (app_wrk, s, opaque))
654     {
655       s = session_get (new_si, new_ti);
656       session_free_w_fifos (s);
657       return -1;
658     }
659
660   s = session_get (new_si, new_ti);
661   s->session_state = SESSION_STATE_READY;
662   session_lookup_add_connection (tc, session_handle (s));
663
664   return 0;
665 }
666
667 typedef struct _session_switch_pool_args
668 {
669   u32 session_index;
670   u32 thread_index;
671   u32 new_thread_index;
672   u32 new_session_index;
673 } session_switch_pool_args_t;
674
675 static void
676 session_switch_pool (void *cb_args)
677 {
678   session_switch_pool_args_t *args = (session_switch_pool_args_t *) cb_args;
679   session_t *s;
680   ASSERT (args->thread_index == vlib_get_thread_index ());
681   s = session_get (args->session_index, args->thread_index);
682   s->tx_fifo->master_session_index = args->new_session_index;
683   s->tx_fifo->master_thread_index = args->new_thread_index;
684   transport_cleanup (session_get_transport_proto (s), s->connection_index,
685                      s->thread_index);
686   session_free (s);
687   clib_mem_free (cb_args);
688 }
689
690 /**
691  * Move dgram session to the right thread
692  */
693 int
694 session_dgram_connect_notify (transport_connection_t * tc,
695                               u32 old_thread_index, session_t ** new_session)
696 {
697   session_t *new_s;
698   session_switch_pool_args_t *rpc_args;
699
700   /*
701    * Clone half-open session to the right thread.
702    */
703   new_s = session_clone_safe (tc->s_index, old_thread_index);
704   new_s->connection_index = tc->c_index;
705   new_s->rx_fifo->master_session_index = new_s->session_index;
706   new_s->rx_fifo->master_thread_index = new_s->thread_index;
707   new_s->session_state = SESSION_STATE_READY;
708   session_lookup_add_connection (tc, session_handle (new_s));
709
710   /*
711    * Ask thread owning the old session to clean it up and make us the tx
712    * fifo owner
713    */
714   rpc_args = clib_mem_alloc (sizeof (*rpc_args));
715   rpc_args->new_session_index = new_s->session_index;
716   rpc_args->new_thread_index = new_s->thread_index;
717   rpc_args->session_index = tc->s_index;
718   rpc_args->thread_index = old_thread_index;
719   session_send_rpc_evt_to_thread (rpc_args->thread_index, session_switch_pool,
720                                   rpc_args);
721
722   tc->s_index = new_s->session_index;
723   new_s->connection_index = tc->c_index;
724   *new_session = new_s;
725   return 0;
726 }
727
728 /**
729  * Notification from transport that connection is being closed.
730  *
731  * A disconnect is sent to application but state is not removed. Once
732  * disconnect is acknowledged by application, session disconnect is called.
733  * Ultimately this leads to close being called on transport (passive close).
734  */
735 void
736 session_transport_closing_notify (transport_connection_t * tc)
737 {
738   app_worker_t *app_wrk;
739   session_t *s;
740
741   s = session_get (tc->s_index, tc->thread_index);
742   if (s->session_state >= SESSION_STATE_TRANSPORT_CLOSING)
743     return;
744   s->session_state = SESSION_STATE_TRANSPORT_CLOSING;
745   app_wrk = app_worker_get (s->app_wrk_index);
746   app_worker_close_notify (app_wrk, s);
747 }
748
749 /**
750  * Notification from transport that connection is being deleted
751  *
752  * This removes the session if it is still valid. It should be called only on
753  * previously fully established sessions. For instance failed connects should
754  * call stream_session_connect_notify and indicate that the connect has
755  * failed.
756  */
757 void
758 session_transport_delete_notify (transport_connection_t * tc)
759 {
760   session_t *s;
761
762   /* App might've been removed already */
763   if (!(s = session_get_if_valid (tc->s_index, tc->thread_index)))
764     return;
765
766   /* Make sure we don't try to send anything more */
767   svm_fifo_dequeue_drop_all (s->tx_fifo);
768
769   switch (s->session_state)
770     {
771     case SESSION_STATE_CREATED:
772       /* Session was created but accept notification was not yet sent to the
773        * app. Cleanup everything. */
774       session_lookup_del_session (s);
775       session_free_w_fifos (s);
776       break;
777     case SESSION_STATE_ACCEPTING:
778     case SESSION_STATE_TRANSPORT_CLOSING:
779       /* If transport finishes or times out before we get a reply
780        * from the app, mark transport as closed and wait for reply
781        * before removing the session. Cleanup session table in advance
782        * because transport will soon be closed and closed sessions
783        * are assumed to have been removed from the lookup table */
784       session_lookup_del_session (s);
785       s->session_state = SESSION_STATE_TRANSPORT_CLOSED;
786       break;
787     case SESSION_STATE_CLOSING:
788     case SESSION_STATE_CLOSED_WAITING:
789       /* Cleanup lookup table as transport needs to still be valid.
790        * Program transport close to ensure that all session events
791        * have been cleaned up. Once transport close is called, the
792        * session is just removed because both transport and app have
793        * confirmed the close*/
794       session_lookup_del_session (s);
795       s->session_state = SESSION_STATE_TRANSPORT_CLOSED;
796       session_program_transport_close (s);
797       break;
798     case SESSION_STATE_TRANSPORT_CLOSED:
799       break;
800     case SESSION_STATE_CLOSED:
801       session_delete (s);
802       break;
803     default:
804       clib_warning ("session state %u", s->session_state);
805       session_delete (s);
806       break;
807     }
808 }
809
810 /**
811  * Notification from transport that session can be closed
812  *
813  * Should be called by transport only if it was closed with non-empty
814  * tx fifo and once it decides to begin the closing procedure prior to
815  * issuing a delete notify. This gives the chance to the session layer
816  * to cleanup any outstanding events.
817  */
818 void
819 session_transport_closed_notify (transport_connection_t * tc)
820 {
821   session_t *s;
822
823   if (!(s = session_get_if_valid (tc->s_index, tc->thread_index)))
824     return;
825
826   /* Transport thinks that app requested close but it actually didn't.
827    * Can happen for tcp if fin and rst are received in close succession. */
828   if (s->session_state == SESSION_STATE_READY)
829     {
830       session_transport_closing_notify (tc);
831       svm_fifo_dequeue_drop_all (s->tx_fifo);
832     }
833   /* If app close has not been received or has not yet resulted in
834    * a transport close, only mark the session transport as closed */
835   else if (s->session_state <= SESSION_STATE_CLOSING)
836     {
837       session_lookup_del_session (s);
838       s->session_state = SESSION_STATE_TRANSPORT_CLOSED;
839     }
840   else
841     s->session_state = SESSION_STATE_CLOSED;
842 }
843
844 /**
845  * Notify application that connection has been reset.
846  */
847 void
848 session_transport_reset_notify (transport_connection_t * tc)
849 {
850   app_worker_t *app_wrk;
851   session_t *s;
852
853   s = session_get (tc->s_index, tc->thread_index);
854   svm_fifo_dequeue_drop_all (s->tx_fifo);
855   if (s->session_state >= SESSION_STATE_TRANSPORT_CLOSING)
856     return;
857   s->session_state = SESSION_STATE_TRANSPORT_CLOSING;
858   app_wrk = app_worker_get (s->app_wrk_index);
859   app_worker_reset_notify (app_wrk, s);
860 }
861
862 int
863 session_stream_accept_notify (transport_connection_t * tc)
864 {
865   app_worker_t *app_wrk;
866   session_t *s;
867
868   s = session_get (tc->s_index, tc->thread_index);
869   app_wrk = app_worker_get_if_valid (s->app_wrk_index);
870   if (!app_wrk)
871     return -1;
872   s->session_state = SESSION_STATE_ACCEPTING;
873   return app_worker_accept_notify (app_wrk, s);
874 }
875
876 /**
877  * Accept a stream session. Optionally ping the server by callback.
878  */
879 int
880 session_stream_accept (transport_connection_t * tc, u32 listener_index,
881                        u32 thread_index, u8 notify)
882 {
883   session_t *s;
884   int rv;
885
886   s = session_alloc_for_connection (tc);
887   s->listener_handle = ((u64) thread_index << 32) | (u64) listener_index;
888   s->session_state = SESSION_STATE_CREATED;
889
890   if ((rv = app_worker_init_accepted (s)))
891     return rv;
892
893   session_lookup_add_connection (tc, session_handle (s));
894
895   /* Shoulder-tap the server */
896   if (notify)
897     {
898       app_worker_t *app_wrk = app_worker_get (s->app_wrk_index);
899       return app_worker_accept_notify (app_wrk, s);
900     }
901
902   return 0;
903 }
904
905 int
906 session_open_cl (u32 app_wrk_index, session_endpoint_t * rmt, u32 opaque)
907 {
908   transport_connection_t *tc;
909   transport_endpoint_cfg_t *tep;
910   app_worker_t *app_wrk;
911   session_handle_t sh;
912   session_t *s;
913   int rv;
914
915   tep = session_endpoint_to_transport_cfg (rmt);
916   rv = transport_connect (rmt->transport_proto, tep);
917   if (rv < 0)
918     {
919       SESSION_DBG ("Transport failed to open connection.");
920       return VNET_API_ERROR_SESSION_CONNECT;
921     }
922
923   tc = transport_get_half_open (rmt->transport_proto, (u32) rv);
924
925   /* For dgram type of service, allocate session and fifos now */
926   app_wrk = app_worker_get (app_wrk_index);
927   s = session_alloc_for_connection (tc);
928   s->app_wrk_index = app_wrk->wrk_index;
929   s->session_state = SESSION_STATE_OPENED;
930   if (app_worker_init_connected (app_wrk, s))
931     {
932       session_free (s);
933       return -1;
934     }
935
936   sh = session_handle (s);
937   session_lookup_add_connection (tc, sh);
938
939   return app_worker_connect_notify (app_wrk, s, opaque);
940 }
941
942 int
943 session_open_vc (u32 app_wrk_index, session_endpoint_t * rmt, u32 opaque)
944 {
945   transport_connection_t *tc;
946   transport_endpoint_cfg_t *tep;
947   u64 handle;
948   int rv;
949
950   tep = session_endpoint_to_transport_cfg (rmt);
951   rv = transport_connect (rmt->transport_proto, tep);
952   if (rv < 0)
953     {
954       SESSION_DBG ("Transport failed to open connection.");
955       return VNET_API_ERROR_SESSION_CONNECT;
956     }
957
958   tc = transport_get_half_open (rmt->transport_proto, (u32) rv);
959
960   /* If transport offers a stream service, only allocate session once the
961    * connection has been established.
962    * Add connection to half-open table and save app and tc index. The
963    * latter is needed to help establish the connection while the former
964    * is needed when the connect notify comes and we have to notify the
965    * external app
966    */
967   handle = (((u64) app_wrk_index) << 32) | (u64) tc->c_index;
968   session_lookup_add_half_open (tc, handle);
969
970   /* Store api_context (opaque) for when the reply comes. Not the nicest
971    * thing but better than allocating a separate half-open pool.
972    */
973   tc->s_index = opaque;
974   return 0;
975 }
976
977 int
978 session_open_app (u32 app_wrk_index, session_endpoint_t * rmt, u32 opaque)
979 {
980   session_endpoint_cfg_t *sep = (session_endpoint_cfg_t *) rmt;
981   transport_endpoint_cfg_t *tep_cfg = session_endpoint_to_transport_cfg (sep);
982
983   sep->app_wrk_index = app_wrk_index;
984   sep->opaque = opaque;
985
986   return transport_connect (rmt->transport_proto, tep_cfg);
987 }
988
989 typedef int (*session_open_service_fn) (u32, session_endpoint_t *, u32);
990
991 /* *INDENT-OFF* */
992 static session_open_service_fn session_open_srv_fns[TRANSPORT_N_SERVICES] = {
993   session_open_vc,
994   session_open_cl,
995   session_open_app,
996 };
997 /* *INDENT-ON* */
998
999 /**
1000  * Ask transport to open connection to remote transport endpoint.
1001  *
1002  * Stores handle for matching request with reply since the call can be
1003  * asynchronous. For instance, for TCP the 3-way handshake must complete
1004  * before reply comes. Session is only created once connection is established.
1005  *
1006  * @param app_index Index of the application requesting the connect
1007  * @param st Session type requested.
1008  * @param tep Remote transport endpoint
1009  * @param opaque Opaque data (typically, api_context) the application expects
1010  *               on open completion.
1011  */
1012 int
1013 session_open (u32 app_wrk_index, session_endpoint_t * rmt, u32 opaque)
1014 {
1015   transport_service_type_t tst;
1016   tst = transport_protocol_service_type (rmt->transport_proto);
1017   return session_open_srv_fns[tst] (app_wrk_index, rmt, opaque);
1018 }
1019
1020 /**
1021  * Ask transport to listen on session endpoint.
1022  *
1023  * @param s Session for which listen will be called. Note that unlike
1024  *          established sessions, listen sessions are not associated to a
1025  *          thread.
1026  * @param sep Local endpoint to be listened on.
1027  */
1028 int
1029 session_listen (session_t * ls, session_endpoint_cfg_t * sep)
1030 {
1031   transport_endpoint_t *tep;
1032   u32 tc_index, s_index;
1033
1034   /* Transport bind/listen */
1035   tep = session_endpoint_to_transport (sep);
1036   s_index = ls->session_index;
1037   tc_index = transport_start_listen (session_get_transport_proto (ls),
1038                                      s_index, tep);
1039
1040   if (tc_index == (u32) ~ 0)
1041     return -1;
1042
1043   /* Attach transport to session. Lookup tables are populated by the app
1044    * worker because local tables (for ct sessions) are not backed by a fib */
1045   ls = listen_session_get (s_index);
1046   ls->connection_index = tc_index;
1047
1048   return 0;
1049 }
1050
1051 /**
1052  * Ask transport to stop listening on local transport endpoint.
1053  *
1054  * @param s Session to stop listening on. It must be in state LISTENING.
1055  */
1056 int
1057 session_stop_listen (session_t * s)
1058 {
1059   transport_proto_t tp = session_get_transport_proto (s);
1060   transport_connection_t *tc;
1061
1062   if (s->session_state != SESSION_STATE_LISTENING)
1063     return -1;
1064
1065   tc = transport_get_listener (tp, s->connection_index);
1066   if (!tc)
1067     return VNET_API_ERROR_ADDRESS_NOT_IN_USE;
1068
1069   session_lookup_del_connection (tc);
1070   transport_stop_listen (tp, s->connection_index);
1071   return 0;
1072 }
1073
1074 /**
1075  * Initialize session closing procedure.
1076  *
1077  * Request is always sent to session node to ensure that all outstanding
1078  * requests are served before transport is notified.
1079  */
1080 void
1081 session_close (session_t * s)
1082 {
1083   if (!s)
1084     return;
1085
1086   if (s->session_state >= SESSION_STATE_CLOSING)
1087     {
1088       /* Session will only be removed once both app and transport
1089        * acknowledge the close */
1090       if (s->session_state == SESSION_STATE_TRANSPORT_CLOSED)
1091         session_program_transport_close (s);
1092
1093       /* Session already closed. Clear the tx fifo */
1094       if (s->session_state == SESSION_STATE_CLOSED)
1095         svm_fifo_dequeue_drop_all (s->tx_fifo);
1096       return;
1097     }
1098
1099   s->session_state = SESSION_STATE_CLOSING;
1100   session_program_transport_close (s);
1101 }
1102
1103 /**
1104  * Notify transport the session can be disconnected. This should eventually
1105  * result in a delete notification that allows us to cleanup session state.
1106  * Called for both active/passive disconnects.
1107  *
1108  * Must be called from the session's thread.
1109  */
1110 void
1111 session_transport_close (session_t * s)
1112 {
1113   /* If transport is already closed, just free the session */
1114   if (s->session_state >= SESSION_STATE_TRANSPORT_CLOSED)
1115     {
1116       session_free_w_fifos (s);
1117       return;
1118     }
1119
1120   /* If tx queue wasn't drained, change state to closed waiting for transport.
1121    * This way, the transport, if it so wishes, can continue to try sending the
1122    * outstanding data (in closed state it cannot). It MUST however at one
1123    * point, either after sending everything or after a timeout, call delete
1124    * notify. This will finally lead to the complete cleanup of the session.
1125    */
1126   if (svm_fifo_max_dequeue_cons (s->tx_fifo))
1127     s->session_state = SESSION_STATE_CLOSED_WAITING;
1128   else
1129     s->session_state = SESSION_STATE_CLOSED;
1130
1131   transport_close (session_get_transport_proto (s), s->connection_index,
1132                    s->thread_index);
1133 }
1134
1135 /**
1136  * Cleanup transport and session state.
1137  *
1138  * Notify transport of the cleanup and free the session. This should
1139  * be called only if transport reported some error and is already
1140  * closed.
1141  */
1142 void
1143 session_transport_cleanup (session_t * s)
1144 {
1145   s->session_state = SESSION_STATE_CLOSED;
1146
1147   /* Delete from main lookup table before we axe the the transport */
1148   session_lookup_del_session (s);
1149   transport_cleanup (session_get_transport_proto (s), s->connection_index,
1150                      s->thread_index);
1151   /* Since we called cleanup, no delete notification will come. So, make
1152    * sure the session is properly freed. */
1153   session_free_w_fifos (s);
1154 }
1155
1156 /**
1157  * Allocate event queues in the shared-memory segment
1158  *
1159  * That can either be a newly created memfd segment, that will need to be
1160  * mapped by all stack users, or the binary api's svm region. The latter is
1161  * assumed to be already mapped. NOTE that this assumption DOES NOT hold if
1162  * api clients bootstrap shm api over sockets (i.e. use memfd segments) and
1163  * vpp uses api svm region for event queues.
1164  */
1165 void
1166 session_vpp_event_queues_allocate (session_main_t * smm)
1167 {
1168   u32 evt_q_length = 2048, evt_size = sizeof (session_event_t);
1169   ssvm_private_t *eqs = &smm->evt_qs_segment;
1170   api_main_t *am = &api_main;
1171   uword eqs_size = 64 << 20;
1172   pid_t vpp_pid = getpid ();
1173   void *oldheap;
1174   int i;
1175
1176   if (smm->configured_event_queue_length)
1177     evt_q_length = smm->configured_event_queue_length;
1178
1179   if (smm->evt_qs_use_memfd_seg)
1180     {
1181       if (smm->evt_qs_segment_size)
1182         eqs_size = smm->evt_qs_segment_size;
1183
1184       eqs->ssvm_size = eqs_size;
1185       eqs->i_am_master = 1;
1186       eqs->my_pid = vpp_pid;
1187       eqs->name = format (0, "%s%c", "evt-qs-segment", 0);
1188       eqs->requested_va = smm->session_baseva;
1189
1190       if (ssvm_master_init (eqs, SSVM_SEGMENT_MEMFD))
1191         {
1192           clib_warning ("failed to initialize queue segment");
1193           return;
1194         }
1195     }
1196
1197   if (smm->evt_qs_use_memfd_seg)
1198     oldheap = ssvm_push_heap (eqs->sh);
1199   else
1200     oldheap = svm_push_data_heap (am->vlib_rp);
1201
1202   for (i = 0; i < vec_len (smm->wrk); i++)
1203     {
1204       svm_msg_q_cfg_t _cfg, *cfg = &_cfg;
1205       svm_msg_q_ring_cfg_t rc[SESSION_MQ_N_RINGS] = {
1206         {evt_q_length, evt_size, 0}
1207         ,
1208         {evt_q_length >> 1, 256, 0}
1209       };
1210       cfg->consumer_pid = 0;
1211       cfg->n_rings = 2;
1212       cfg->q_nitems = evt_q_length;
1213       cfg->ring_cfgs = rc;
1214       smm->wrk[i].vpp_event_queue = svm_msg_q_alloc (cfg);
1215       if (smm->evt_qs_use_memfd_seg)
1216         {
1217           if (svm_msg_q_alloc_consumer_eventfd (smm->wrk[i].vpp_event_queue))
1218             clib_warning ("eventfd returned");
1219         }
1220     }
1221
1222   if (smm->evt_qs_use_memfd_seg)
1223     ssvm_pop_heap (oldheap);
1224   else
1225     svm_pop_heap (oldheap);
1226 }
1227
1228 ssvm_private_t *
1229 session_main_get_evt_q_segment (void)
1230 {
1231   session_main_t *smm = &session_main;
1232   if (smm->evt_qs_use_memfd_seg)
1233     return &smm->evt_qs_segment;
1234   return 0;
1235 }
1236
1237 u64
1238 session_segment_handle (session_t * s)
1239 {
1240   svm_fifo_t *f;
1241
1242   if (!s->rx_fifo)
1243     return SESSION_INVALID_HANDLE;
1244
1245   f = s->rx_fifo;
1246   return segment_manager_make_segment_handle (f->segment_manager,
1247                                               f->segment_index);
1248 }
1249
1250 /* *INDENT-OFF* */
1251 static session_fifo_rx_fn *session_tx_fns[TRANSPORT_TX_N_FNS] = {
1252     session_tx_fifo_peek_and_snd,
1253     session_tx_fifo_dequeue_and_snd,
1254     session_tx_fifo_dequeue_internal,
1255     session_tx_fifo_dequeue_and_snd
1256 };
1257 /* *INDENT-ON* */
1258
1259 /**
1260  * Initialize session layer for given transport proto and ip version
1261  *
1262  * Allocates per session type (transport proto + ip version) data structures
1263  * and adds arc from session queue node to session type output node.
1264  */
1265 void
1266 session_register_transport (transport_proto_t transport_proto,
1267                             const transport_proto_vft_t * vft, u8 is_ip4,
1268                             u32 output_node)
1269 {
1270   session_main_t *smm = &session_main;
1271   session_type_t session_type;
1272   u32 next_index = ~0;
1273
1274   session_type = session_type_from_proto_and_ip (transport_proto, is_ip4);
1275
1276   vec_validate (smm->session_type_to_next, session_type);
1277   vec_validate (smm->session_tx_fns, session_type);
1278
1279   /* *INDENT-OFF* */
1280   if (output_node != ~0)
1281     {
1282       foreach_vlib_main (({
1283           next_index = vlib_node_add_next (this_vlib_main,
1284                                            session_queue_node.index,
1285                                            output_node);
1286       }));
1287     }
1288   /* *INDENT-ON* */
1289
1290   smm->session_type_to_next[session_type] = next_index;
1291   smm->session_tx_fns[session_type] = session_tx_fns[vft->tx_type];
1292 }
1293
1294 transport_connection_t *
1295 session_get_transport (session_t * s)
1296 {
1297   if (s->session_state != SESSION_STATE_LISTENING)
1298     return transport_get_connection (session_get_transport_proto (s),
1299                                      s->connection_index, s->thread_index);
1300   else
1301     return transport_get_listener (session_get_transport_proto (s),
1302                                    s->connection_index);
1303 }
1304
1305 void
1306 session_get_endpoint (session_t * s, transport_endpoint_t * tep, u8 is_lcl)
1307 {
1308   if (s->session_state != SESSION_STATE_LISTENING)
1309     return transport_get_endpoint (session_get_transport_proto (s),
1310                                    s->connection_index, s->thread_index, tep,
1311                                    is_lcl);
1312   else
1313     return transport_get_listener_endpoint (session_get_transport_proto (s),
1314                                             s->connection_index, tep, is_lcl);
1315 }
1316
1317 transport_connection_t *
1318 listen_session_get_transport (session_t * s)
1319 {
1320   return transport_get_listener (session_get_transport_proto (s),
1321                                  s->connection_index);
1322 }
1323
1324 void
1325 session_flush_frames_main_thread (vlib_main_t * vm)
1326 {
1327   ASSERT (vlib_get_thread_index () == 0);
1328   vlib_process_signal_event_mt (vm, session_queue_process_node.index,
1329                                 SESSION_Q_PROCESS_FLUSH_FRAMES, 0);
1330 }
1331
1332 static clib_error_t *
1333 session_manager_main_enable (vlib_main_t * vm)
1334 {
1335   segment_manager_main_init_args_t _sm_args = { 0 }, *sm_args = &_sm_args;
1336   session_main_t *smm = &session_main;
1337   vlib_thread_main_t *vtm = vlib_get_thread_main ();
1338   u32 num_threads, preallocated_sessions_per_worker;
1339   session_worker_t *wrk;
1340   int i;
1341
1342   num_threads = 1 /* main thread */  + vtm->n_threads;
1343
1344   if (num_threads < 1)
1345     return clib_error_return (0, "n_thread_stacks not set");
1346
1347   /* Allocate cache line aligned worker contexts */
1348   vec_validate_aligned (smm->wrk, num_threads - 1, CLIB_CACHE_LINE_BYTES);
1349
1350   for (i = 0; i < num_threads; i++)
1351     {
1352       wrk = &smm->wrk[i];
1353       vec_validate (wrk->free_event_vector, 128);
1354       _vec_len (wrk->free_event_vector) = 0;
1355       vec_validate (wrk->pending_event_vector, 128);
1356       _vec_len (wrk->pending_event_vector) = 0;
1357       vec_validate (wrk->pending_disconnects, 128);
1358       _vec_len (wrk->pending_disconnects) = 0;
1359       vec_validate (wrk->postponed_event_vector, 128);
1360       _vec_len (wrk->postponed_event_vector) = 0;
1361
1362       wrk->last_vlib_time = vlib_time_now (vlib_mains[i]);
1363       wrk->dispatch_period = 500e-6;
1364
1365       if (num_threads > 1)
1366         clib_rwlock_init (&smm->wrk[i].peekers_rw_locks);
1367     }
1368
1369 #if SESSION_DEBUG
1370   vec_validate (smm->last_event_poll_by_thread, num_threads - 1);
1371 #endif
1372
1373   /* Allocate vpp event queues segment and queue */
1374   session_vpp_event_queues_allocate (smm);
1375
1376   /* Initialize fifo segment main baseva and timeout */
1377   sm_args->baseva = smm->session_baseva + smm->evt_qs_segment_size;
1378   sm_args->size = smm->session_va_space_size;
1379   segment_manager_main_init (sm_args);
1380
1381   /* Preallocate sessions */
1382   if (smm->preallocated_sessions)
1383     {
1384       if (num_threads == 1)
1385         {
1386           pool_init_fixed (smm->wrk[0].sessions, smm->preallocated_sessions);
1387         }
1388       else
1389         {
1390           int j;
1391           preallocated_sessions_per_worker =
1392             (1.1 * (f64) smm->preallocated_sessions /
1393              (f64) (num_threads - 1));
1394
1395           for (j = 1; j < num_threads; j++)
1396             {
1397               pool_init_fixed (smm->wrk[j].sessions,
1398                                preallocated_sessions_per_worker);
1399             }
1400         }
1401     }
1402
1403   session_lookup_init ();
1404   app_namespaces_init ();
1405   transport_init ();
1406
1407   smm->is_enabled = 1;
1408
1409   /* Enable transports */
1410   transport_enable_disable (vm, 1);
1411   transport_init_tx_pacers_period ();
1412   return 0;
1413 }
1414
1415 void
1416 session_node_enable_disable (u8 is_en)
1417 {
1418   u8 state = is_en ? VLIB_NODE_STATE_POLLING : VLIB_NODE_STATE_DISABLED;
1419   vlib_thread_main_t *vtm = vlib_get_thread_main ();
1420   u8 have_workers = vtm->n_threads != 0;
1421
1422   /* *INDENT-OFF* */
1423   foreach_vlib_main (({
1424     if (have_workers && ii == 0)
1425       {
1426         vlib_node_set_state (this_vlib_main, session_queue_process_node.index,
1427                              state);
1428         if (is_en)
1429           {
1430             vlib_node_t *n = vlib_get_node (this_vlib_main,
1431                                             session_queue_process_node.index);
1432             vlib_start_process (this_vlib_main, n->runtime_index);
1433           }
1434         else
1435           {
1436             vlib_process_signal_event_mt (this_vlib_main,
1437                                           session_queue_process_node.index,
1438                                           SESSION_Q_PROCESS_STOP, 0);
1439           }
1440
1441         continue;
1442       }
1443     vlib_node_set_state (this_vlib_main, session_queue_node.index,
1444                          state);
1445   }));
1446   /* *INDENT-ON* */
1447 }
1448
1449 clib_error_t *
1450 vnet_session_enable_disable (vlib_main_t * vm, u8 is_en)
1451 {
1452   clib_error_t *error = 0;
1453   if (is_en)
1454     {
1455       if (session_main.is_enabled)
1456         return 0;
1457
1458       session_node_enable_disable (is_en);
1459       error = session_manager_main_enable (vm);
1460     }
1461   else
1462     {
1463       session_main.is_enabled = 0;
1464       session_node_enable_disable (is_en);
1465     }
1466
1467   return error;
1468 }
1469
1470 clib_error_t *
1471 session_manager_main_init (vlib_main_t * vm)
1472 {
1473   session_main_t *smm = &session_main;
1474   smm->session_baseva = HIGH_SEGMENT_BASEVA;
1475 #if (HIGH_SEGMENT_BASEVA > (4ULL << 30))
1476   smm->session_va_space_size = 128ULL << 30;
1477   smm->evt_qs_segment_size = 64 << 20;
1478 #else
1479   smm->session_va_space_size = 128 << 20;
1480   smm->evt_qs_segment_size = 1 << 20;
1481 #endif
1482   smm->is_enabled = 0;
1483   return 0;
1484 }
1485
1486 VLIB_INIT_FUNCTION (session_manager_main_init);
1487
1488 static clib_error_t *
1489 session_config_fn (vlib_main_t * vm, unformat_input_t * input)
1490 {
1491   session_main_t *smm = &session_main;
1492   u32 nitems;
1493   uword tmp;
1494
1495   while (unformat_check_input (input) != UNFORMAT_END_OF_INPUT)
1496     {
1497       if (unformat (input, "event-queue-length %d", &nitems))
1498         {
1499           if (nitems >= 2048)
1500             smm->configured_event_queue_length = nitems;
1501           else
1502             clib_warning ("event queue length %d too small, ignored", nitems);
1503         }
1504       else if (unformat (input, "preallocated-sessions %d",
1505                          &smm->preallocated_sessions))
1506         ;
1507       else if (unformat (input, "v4-session-table-buckets %d",
1508                          &smm->configured_v4_session_table_buckets))
1509         ;
1510       else if (unformat (input, "v4-halfopen-table-buckets %d",
1511                          &smm->configured_v4_halfopen_table_buckets))
1512         ;
1513       else if (unformat (input, "v6-session-table-buckets %d",
1514                          &smm->configured_v6_session_table_buckets))
1515         ;
1516       else if (unformat (input, "v6-halfopen-table-buckets %d",
1517                          &smm->configured_v6_halfopen_table_buckets))
1518         ;
1519       else if (unformat (input, "v4-session-table-memory %U",
1520                          unformat_memory_size, &tmp))
1521         {
1522           if (tmp >= 0x100000000)
1523             return clib_error_return (0, "memory size %llx (%lld) too large",
1524                                       tmp, tmp);
1525           smm->configured_v4_session_table_memory = tmp;
1526         }
1527       else if (unformat (input, "v4-halfopen-table-memory %U",
1528                          unformat_memory_size, &tmp))
1529         {
1530           if (tmp >= 0x100000000)
1531             return clib_error_return (0, "memory size %llx (%lld) too large",
1532                                       tmp, tmp);
1533           smm->configured_v4_halfopen_table_memory = tmp;
1534         }
1535       else if (unformat (input, "v6-session-table-memory %U",
1536                          unformat_memory_size, &tmp))
1537         {
1538           if (tmp >= 0x100000000)
1539             return clib_error_return (0, "memory size %llx (%lld) too large",
1540                                       tmp, tmp);
1541           smm->configured_v6_session_table_memory = tmp;
1542         }
1543       else if (unformat (input, "v6-halfopen-table-memory %U",
1544                          unformat_memory_size, &tmp))
1545         {
1546           if (tmp >= 0x100000000)
1547             return clib_error_return (0, "memory size %llx (%lld) too large",
1548                                       tmp, tmp);
1549           smm->configured_v6_halfopen_table_memory = tmp;
1550         }
1551       else if (unformat (input, "local-endpoints-table-memory %U",
1552                          unformat_memory_size, &tmp))
1553         {
1554           if (tmp >= 0x100000000)
1555             return clib_error_return (0, "memory size %llx (%lld) too large",
1556                                       tmp, tmp);
1557           smm->local_endpoints_table_memory = tmp;
1558         }
1559       else if (unformat (input, "local-endpoints-table-buckets %d",
1560                          &smm->local_endpoints_table_buckets))
1561         ;
1562       else if (unformat (input, "evt_qs_memfd_seg"))
1563         smm->evt_qs_use_memfd_seg = 1;
1564       else if (unformat (input, "evt_qs_seg_size %U", unformat_memory_size,
1565                          &smm->evt_qs_segment_size))
1566         ;
1567       else
1568         return clib_error_return (0, "unknown input `%U'",
1569                                   format_unformat_error, input);
1570     }
1571   return 0;
1572 }
1573
1574 VLIB_CONFIG_FUNCTION (session_config_fn, "session");
1575
1576 /*
1577  * fd.io coding-style-patch-verification: ON
1578  *
1579  * Local Variables:
1580  * eval: (c-set-style "gnu")
1581  * End:
1582  */