svm: more fifo refactor/cleanup
[vpp.git] / src / vnet / session / session.c
1 /*
2  * Copyright (c) 2017-2019 Cisco and/or its affiliates.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at:
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 /**
16  * @file
17  * @brief Session and session manager
18  */
19
20 #include <vnet/session/session.h>
21 #include <vnet/session/session_debug.h>
22 #include <vnet/session/application.h>
23 #include <vnet/dpo/load_balance.h>
24 #include <vnet/fib/ip4_fib.h>
25
26 session_main_t session_main;
27
28 static inline int
29 session_send_evt_to_thread (void *data, void *args, u32 thread_index,
30                             session_evt_type_t evt_type)
31 {
32   session_event_t *evt;
33   svm_msg_q_msg_t msg;
34   svm_msg_q_t *mq;
35   u32 tries = 0, max_tries;
36
37   mq = session_main_get_vpp_event_queue (thread_index);
38   while (svm_msg_q_try_lock (mq))
39     {
40       max_tries = vlib_get_current_process (vlib_get_main ())? 1e6 : 3;
41       if (tries++ == max_tries)
42         {
43           SESSION_DBG ("failed to enqueue evt");
44           return -1;
45         }
46     }
47   if (PREDICT_FALSE (svm_msg_q_ring_is_full (mq, SESSION_MQ_IO_EVT_RING)))
48     {
49       svm_msg_q_unlock (mq);
50       return -2;
51     }
52   msg = svm_msg_q_alloc_msg_w_ring (mq, SESSION_MQ_IO_EVT_RING);
53   if (PREDICT_FALSE (svm_msg_q_msg_is_invalid (&msg)))
54     {
55       svm_msg_q_unlock (mq);
56       return -2;
57     }
58   evt = (session_event_t *) svm_msg_q_msg_data (mq, &msg);
59   evt->event_type = evt_type;
60   switch (evt_type)
61     {
62     case SESSION_CTRL_EVT_RPC:
63       evt->rpc_args.fp = data;
64       evt->rpc_args.arg = args;
65       break;
66     case SESSION_IO_EVT_TX:
67     case SESSION_IO_EVT_TX_FLUSH:
68     case SESSION_IO_EVT_BUILTIN_RX:
69       evt->session_index = *(u32 *) data;
70       break;
71     case SESSION_IO_EVT_BUILTIN_TX:
72     case SESSION_CTRL_EVT_CLOSE:
73       evt->session_handle = session_handle ((session_t *) data);
74       break;
75     default:
76       clib_warning ("evt unhandled!");
77       svm_msg_q_unlock (mq);
78       return -1;
79     }
80
81   svm_msg_q_add_and_unlock (mq, &msg);
82   return 0;
83 }
84
85 int
86 session_send_io_evt_to_thread (svm_fifo_t * f, session_evt_type_t evt_type)
87 {
88   return session_send_evt_to_thread (&f->master_session_index, 0,
89                                      f->master_thread_index, evt_type);
90 }
91
92 int
93 session_send_io_evt_to_thread_custom (void *data, u32 thread_index,
94                                       session_evt_type_t evt_type)
95 {
96   return session_send_evt_to_thread (data, 0, thread_index, evt_type);
97 }
98
99 int
100 session_send_ctrl_evt_to_thread (session_t * s, session_evt_type_t evt_type)
101 {
102   /* only event supported for now is disconnect */
103   ASSERT (evt_type == SESSION_CTRL_EVT_CLOSE);
104   return session_send_evt_to_thread (s, 0, s->thread_index,
105                                      SESSION_CTRL_EVT_CLOSE);
106 }
107
108 void
109 session_send_rpc_evt_to_thread_force (u32 thread_index, void *fp,
110                                       void *rpc_args)
111 {
112   session_send_evt_to_thread (fp, rpc_args, thread_index,
113                               SESSION_CTRL_EVT_RPC);
114 }
115
116 void
117 session_send_rpc_evt_to_thread (u32 thread_index, void *fp, void *rpc_args)
118 {
119   if (thread_index != vlib_get_thread_index ())
120     session_send_rpc_evt_to_thread_force (thread_index, fp, rpc_args);
121   else
122     {
123       void (*fnp) (void *) = fp;
124       fnp (rpc_args);
125     }
126 }
127
128 static void
129 session_program_transport_close (session_t * s)
130 {
131   u32 thread_index = vlib_get_thread_index ();
132   session_worker_t *wrk;
133   session_event_t *evt;
134
135   /* If we are in the handler thread, or being called with the worker barrier
136    * held, just append a new event to pending disconnects vector. */
137   if (vlib_thread_is_main_w_barrier () || thread_index == s->thread_index)
138     {
139       wrk = session_main_get_worker (s->thread_index);
140       vec_add2 (wrk->pending_disconnects, evt, 1);
141       clib_memset (evt, 0, sizeof (*evt));
142       evt->session_handle = session_handle (s);
143       evt->event_type = SESSION_CTRL_EVT_CLOSE;
144     }
145   else
146     session_send_ctrl_evt_to_thread (s, SESSION_CTRL_EVT_CLOSE);
147 }
148
149 session_t *
150 session_alloc (u32 thread_index)
151 {
152   session_worker_t *wrk = &session_main.wrk[thread_index];
153   session_t *s;
154   u8 will_expand = 0;
155   pool_get_aligned_will_expand (wrk->sessions, will_expand,
156                                 CLIB_CACHE_LINE_BYTES);
157   /* If we have peekers, let them finish */
158   if (PREDICT_FALSE (will_expand && vlib_num_workers ()))
159     {
160       clib_rwlock_writer_lock (&wrk->peekers_rw_locks);
161       pool_get_aligned (wrk->sessions, s, CLIB_CACHE_LINE_BYTES);
162       clib_rwlock_writer_unlock (&wrk->peekers_rw_locks);
163     }
164   else
165     {
166       pool_get_aligned (wrk->sessions, s, CLIB_CACHE_LINE_BYTES);
167     }
168   clib_memset (s, 0, sizeof (*s));
169   s->session_index = s - wrk->sessions;
170   s->thread_index = thread_index;
171   return s;
172 }
173
174 void
175 session_free (session_t * s)
176 {
177   if (CLIB_DEBUG)
178     {
179       u8 thread_index = s->thread_index;
180       clib_memset (s, 0xFA, sizeof (*s));
181       pool_put (session_main.wrk[thread_index].sessions, s);
182       return;
183     }
184   SESSION_EVT_DBG (SESSION_EVT_FREE, s);
185   pool_put (session_main.wrk[s->thread_index].sessions, s);
186 }
187
188 void
189 session_free_w_fifos (session_t * s)
190 {
191   segment_manager_dealloc_fifos (s->rx_fifo, s->tx_fifo);
192   session_free (s);
193 }
194
195 /**
196  * Cleans up session and lookup table.
197  *
198  * Transport connection must still be valid.
199  */
200 static void
201 session_delete (session_t * s)
202 {
203   int rv;
204
205   /* Delete from the main lookup table. */
206   if ((rv = session_lookup_del_session (s)))
207     clib_warning ("hash delete error, rv %d", rv);
208
209   session_free_w_fifos (s);
210 }
211
212 static session_t *
213 session_alloc_for_connection (transport_connection_t * tc)
214 {
215   session_t *s;
216   u32 thread_index = tc->thread_index;
217
218   ASSERT (thread_index == vlib_get_thread_index ()
219           || transport_protocol_is_cl (tc->proto));
220
221   s = session_alloc (thread_index);
222   s->session_type = session_type_from_proto_and_ip (tc->proto, tc->is_ip4);
223   s->session_state = SESSION_STATE_CLOSED;
224
225   /* Attach transport to session and vice versa */
226   s->connection_index = tc->c_index;
227   tc->s_index = s->session_index;
228   return s;
229 }
230
231 /**
232  * Discards bytes from buffer chain
233  *
234  * It discards n_bytes_to_drop starting at first buffer after chain_b
235  */
236 always_inline void
237 session_enqueue_discard_chain_bytes (vlib_main_t * vm, vlib_buffer_t * b,
238                                      vlib_buffer_t ** chain_b,
239                                      u32 n_bytes_to_drop)
240 {
241   vlib_buffer_t *next = *chain_b;
242   u32 to_drop = n_bytes_to_drop;
243   ASSERT (b->flags & VLIB_BUFFER_NEXT_PRESENT);
244   while (to_drop && (next->flags & VLIB_BUFFER_NEXT_PRESENT))
245     {
246       next = vlib_get_buffer (vm, next->next_buffer);
247       if (next->current_length > to_drop)
248         {
249           vlib_buffer_advance (next, to_drop);
250           to_drop = 0;
251         }
252       else
253         {
254           to_drop -= next->current_length;
255           next->current_length = 0;
256         }
257     }
258   *chain_b = next;
259
260   if (to_drop == 0)
261     b->total_length_not_including_first_buffer -= n_bytes_to_drop;
262 }
263
264 /**
265  * Enqueue buffer chain tail
266  */
267 always_inline int
268 session_enqueue_chain_tail (session_t * s, vlib_buffer_t * b,
269                             u32 offset, u8 is_in_order)
270 {
271   vlib_buffer_t *chain_b;
272   u32 chain_bi, len, diff;
273   vlib_main_t *vm = vlib_get_main ();
274   u8 *data;
275   u32 written = 0;
276   int rv = 0;
277
278   if (is_in_order && offset)
279     {
280       diff = offset - b->current_length;
281       if (diff > b->total_length_not_including_first_buffer)
282         return 0;
283       chain_b = b;
284       session_enqueue_discard_chain_bytes (vm, b, &chain_b, diff);
285       chain_bi = vlib_get_buffer_index (vm, chain_b);
286     }
287   else
288     chain_bi = b->next_buffer;
289
290   do
291     {
292       chain_b = vlib_get_buffer (vm, chain_bi);
293       data = vlib_buffer_get_current (chain_b);
294       len = chain_b->current_length;
295       if (!len)
296         continue;
297       if (is_in_order)
298         {
299           rv = svm_fifo_enqueue (s->rx_fifo, len, data);
300           if (rv == len)
301             {
302               written += rv;
303             }
304           else if (rv < len)
305             {
306               return (rv > 0) ? (written + rv) : written;
307             }
308           else if (rv > len)
309             {
310               written += rv;
311
312               /* written more than what was left in chain */
313               if (written > b->total_length_not_including_first_buffer)
314                 return written;
315
316               /* drop the bytes that have already been delivered */
317               session_enqueue_discard_chain_bytes (vm, b, &chain_b, rv - len);
318             }
319         }
320       else
321         {
322           rv = svm_fifo_enqueue_with_offset (s->rx_fifo, offset, len, data);
323           if (rv)
324             {
325               clib_warning ("failed to enqueue multi-buffer seg");
326               return -1;
327             }
328           offset += len;
329         }
330     }
331   while ((chain_bi = (chain_b->flags & VLIB_BUFFER_NEXT_PRESENT)
332           ? chain_b->next_buffer : 0));
333
334   if (is_in_order)
335     return written;
336
337   return 0;
338 }
339
340 /*
341  * Enqueue data for delivery to session peer. Does not notify peer of enqueue
342  * event but on request can queue notification events for later delivery by
343  * calling stream_server_flush_enqueue_events().
344  *
345  * @param tc Transport connection which is to be enqueued data
346  * @param b Buffer to be enqueued
347  * @param offset Offset at which to start enqueueing if out-of-order
348  * @param queue_event Flag to indicate if peer is to be notified or if event
349  *                    is to be queued. The former is useful when more data is
350  *                    enqueued and only one event is to be generated.
351  * @param is_in_order Flag to indicate if data is in order
352  * @return Number of bytes enqueued or a negative value if enqueueing failed.
353  */
354 int
355 session_enqueue_stream_connection (transport_connection_t * tc,
356                                    vlib_buffer_t * b, u32 offset,
357                                    u8 queue_event, u8 is_in_order)
358 {
359   session_t *s;
360   int enqueued = 0, rv, in_order_off;
361
362   s = session_get (tc->s_index, tc->thread_index);
363
364   if (is_in_order)
365     {
366       enqueued = svm_fifo_enqueue (s->rx_fifo,
367                                    b->current_length,
368                                    vlib_buffer_get_current (b));
369       if (PREDICT_FALSE ((b->flags & VLIB_BUFFER_NEXT_PRESENT)
370                          && enqueued >= 0))
371         {
372           in_order_off = enqueued > b->current_length ? enqueued : 0;
373           rv = session_enqueue_chain_tail (s, b, in_order_off, 1);
374           if (rv > 0)
375             enqueued += rv;
376         }
377     }
378   else
379     {
380       rv = svm_fifo_enqueue_with_offset (s->rx_fifo, offset,
381                                          b->current_length,
382                                          vlib_buffer_get_current (b));
383       if (PREDICT_FALSE ((b->flags & VLIB_BUFFER_NEXT_PRESENT) && !rv))
384         session_enqueue_chain_tail (s, b, offset + b->current_length, 0);
385       /* if something was enqueued, report even this as success for ooo
386        * segment handling */
387       return rv;
388     }
389
390   if (queue_event)
391     {
392       /* Queue RX event on this fifo. Eventually these will need to be flushed
393        * by calling stream_server_flush_enqueue_events () */
394       session_worker_t *wrk;
395
396       wrk = session_main_get_worker (s->thread_index);
397       if (!(s->flags & SESSION_F_RX_EVT))
398         {
399           s->flags |= SESSION_F_RX_EVT;
400           vec_add1 (wrk->session_to_enqueue[tc->proto], s->session_index);
401         }
402     }
403
404   return enqueued;
405 }
406
407 int
408 session_enqueue_dgram_connection (session_t * s,
409                                   session_dgram_hdr_t * hdr,
410                                   vlib_buffer_t * b, u8 proto, u8 queue_event)
411 {
412   int enqueued = 0, rv, in_order_off;
413
414   ASSERT (svm_fifo_max_enqueue_prod (s->rx_fifo)
415           >= b->current_length + sizeof (*hdr));
416
417   svm_fifo_enqueue (s->rx_fifo, sizeof (session_dgram_hdr_t), (u8 *) hdr);
418   enqueued = svm_fifo_enqueue (s->rx_fifo, b->current_length,
419                                vlib_buffer_get_current (b));
420   if (PREDICT_FALSE ((b->flags & VLIB_BUFFER_NEXT_PRESENT) && enqueued >= 0))
421     {
422       in_order_off = enqueued > b->current_length ? enqueued : 0;
423       rv = session_enqueue_chain_tail (s, b, in_order_off, 1);
424       if (rv > 0)
425         enqueued += rv;
426     }
427   if (queue_event)
428     {
429       /* Queue RX event on this fifo. Eventually these will need to be flushed
430        * by calling stream_server_flush_enqueue_events () */
431       session_worker_t *wrk;
432
433       wrk = session_main_get_worker (s->thread_index);
434       if (!(s->flags & SESSION_F_RX_EVT))
435         {
436           s->flags |= SESSION_F_RX_EVT;
437           vec_add1 (wrk->session_to_enqueue[proto], s->session_index);
438         }
439     }
440   return enqueued;
441 }
442
443 int
444 session_tx_fifo_peek_bytes (transport_connection_t * tc, u8 * buffer,
445                             u32 offset, u32 max_bytes)
446 {
447   session_t *s = session_get (tc->s_index, tc->thread_index);
448   return svm_fifo_peek (s->tx_fifo, offset, max_bytes, buffer);
449 }
450
451 u32
452 session_tx_fifo_dequeue_drop (transport_connection_t * tc, u32 max_bytes)
453 {
454   session_t *s = session_get (tc->s_index, tc->thread_index);
455   return svm_fifo_dequeue_drop (s->tx_fifo, max_bytes);
456 }
457
458 static inline int
459 session_notify_subscribers (u32 app_index, session_t * s,
460                             svm_fifo_t * f, session_evt_type_t evt_type)
461 {
462   app_worker_t *app_wrk;
463   application_t *app;
464   int i;
465
466   app = application_get (app_index);
467   if (!app)
468     return -1;
469
470   for (i = 0; i < f->n_subscribers; i++)
471     {
472       app_wrk = application_get_worker (app, f->subscribers[i]);
473       if (!app_wrk)
474         continue;
475       if (app_worker_lock_and_send_event (app_wrk, s, evt_type))
476         return -1;
477     }
478
479   return 0;
480 }
481
482 /**
483  * Notify session peer that new data has been enqueued.
484  *
485  * @param s     Stream session for which the event is to be generated.
486  * @param lock  Flag to indicate if call should lock message queue.
487  *
488  * @return 0 on success or negative number if failed to send notification.
489  */
490 static inline int
491 session_enqueue_notify_inline (session_t * s)
492 {
493   app_worker_t *app_wrk;
494   u32 session_index;
495   u8 n_subscribers;
496
497   session_index = s->session_index;
498   n_subscribers = svm_fifo_n_subscribers (s->rx_fifo);
499
500   app_wrk = app_worker_get_if_valid (s->app_wrk_index);
501   if (PREDICT_FALSE (!app_wrk))
502     {
503       SESSION_DBG ("invalid s->app_index = %d", s->app_wrk_index);
504       return 0;
505     }
506
507   /* *INDENT-OFF* */
508   SESSION_EVT_DBG(SESSION_EVT_ENQ, s, ({
509       ed->data[0] = SESSION_IO_EVT_RX;
510       ed->data[1] = svm_fifo_max_dequeue_prod (s->rx_fifo);
511   }));
512   /* *INDENT-ON* */
513
514   s->flags &= ~SESSION_F_RX_EVT;
515   if (PREDICT_FALSE (app_worker_lock_and_send_event (app_wrk, s,
516                                                      SESSION_IO_EVT_RX)))
517     return -1;
518
519   if (PREDICT_FALSE (n_subscribers))
520     {
521       s = session_get (session_index, vlib_get_thread_index ());
522       return session_notify_subscribers (app_wrk->app_index, s,
523                                          s->rx_fifo, SESSION_IO_EVT_RX);
524     }
525
526   return 0;
527 }
528
529 int
530 session_enqueue_notify (session_t * s)
531 {
532   return session_enqueue_notify_inline (s);
533 }
534
535 int
536 session_dequeue_notify (session_t * s)
537 {
538   app_worker_t *app_wrk;
539
540   app_wrk = app_worker_get_if_valid (s->app_wrk_index);
541   if (PREDICT_FALSE (!app_wrk))
542     return -1;
543
544   if (PREDICT_FALSE (app_worker_lock_and_send_event (app_wrk, s,
545                                                      SESSION_IO_EVT_TX)))
546     return -1;
547
548   if (PREDICT_FALSE (s->tx_fifo->n_subscribers))
549     return session_notify_subscribers (app_wrk->app_index, s,
550                                        s->tx_fifo, SESSION_IO_EVT_TX);
551
552   svm_fifo_clear_tx_ntf (s->tx_fifo);
553
554   return 0;
555 }
556
557 /**
558  * Flushes queue of sessions that are to be notified of new data
559  * enqueued events.
560  *
561  * @param thread_index Thread index for which the flush is to be performed.
562  * @return 0 on success or a positive number indicating the number of
563  *         failures due to API queue being full.
564  */
565 int
566 session_main_flush_enqueue_events (u8 transport_proto, u32 thread_index)
567 {
568   session_worker_t *wrk = session_main_get_worker (thread_index);
569   session_t *s;
570   int i, errors = 0;
571   u32 *indices;
572
573   indices = wrk->session_to_enqueue[transport_proto];
574
575   for (i = 0; i < vec_len (indices); i++)
576     {
577       s = session_get_if_valid (indices[i], thread_index);
578       if (PREDICT_FALSE (!s))
579         {
580           errors++;
581           continue;
582         }
583
584       if (PREDICT_FALSE (session_enqueue_notify_inline (s)))
585         errors++;
586     }
587
588   vec_reset_length (indices);
589   wrk->session_to_enqueue[transport_proto] = indices;
590
591   return errors;
592 }
593
594 int
595 session_main_flush_all_enqueue_events (u8 transport_proto)
596 {
597   vlib_thread_main_t *vtm = vlib_get_thread_main ();
598   int i, errors = 0;
599   for (i = 0; i < 1 + vtm->n_threads; i++)
600     errors += session_main_flush_enqueue_events (transport_proto, i);
601   return errors;
602 }
603
604 int
605 session_stream_connect_notify (transport_connection_t * tc, u8 is_fail)
606 {
607   u32 opaque = 0, new_ti, new_si;
608   app_worker_t *app_wrk;
609   session_t *s = 0;
610   u64 ho_handle;
611
612   /*
613    * Find connection handle and cleanup half-open table
614    */
615   ho_handle = session_lookup_half_open_handle (tc);
616   if (ho_handle == HALF_OPEN_LOOKUP_INVALID_VALUE)
617     {
618       SESSION_DBG ("half-open was removed!");
619       return -1;
620     }
621   session_lookup_del_half_open (tc);
622
623   /* Get the app's index from the handle we stored when opening connection
624    * and the opaque (api_context for external apps) from transport session
625    * index */
626   app_wrk = app_worker_get_if_valid (ho_handle >> 32);
627   if (!app_wrk)
628     return -1;
629
630   opaque = tc->s_index;
631
632   if (is_fail)
633     return app_worker_connect_notify (app_wrk, s, opaque);
634
635   s = session_alloc_for_connection (tc);
636   s->session_state = SESSION_STATE_CONNECTING;
637   s->app_wrk_index = app_wrk->wrk_index;
638   new_si = s->session_index;
639   new_ti = s->thread_index;
640
641   if (app_worker_init_connected (app_wrk, s))
642     {
643       session_free (s);
644       app_worker_connect_notify (app_wrk, 0, opaque);
645       return -1;
646     }
647
648   if (app_worker_connect_notify (app_wrk, s, opaque))
649     {
650       s = session_get (new_si, new_ti);
651       session_free_w_fifos (s);
652       return -1;
653     }
654
655   s = session_get (new_si, new_ti);
656   s->session_state = SESSION_STATE_READY;
657   session_lookup_add_connection (tc, session_handle (s));
658
659   return 0;
660 }
661
662 typedef struct _session_switch_pool_args
663 {
664   u32 session_index;
665   u32 thread_index;
666   u32 new_thread_index;
667   u32 new_session_index;
668 } session_switch_pool_args_t;
669
670 static void
671 session_switch_pool (void *cb_args)
672 {
673   session_switch_pool_args_t *args = (session_switch_pool_args_t *) cb_args;
674   session_t *s;
675   ASSERT (args->thread_index == vlib_get_thread_index ());
676   s = session_get (args->session_index, args->thread_index);
677   s->tx_fifo->master_session_index = args->new_session_index;
678   s->tx_fifo->master_thread_index = args->new_thread_index;
679   transport_cleanup (session_get_transport_proto (s), s->connection_index,
680                      s->thread_index);
681   session_free (s);
682   clib_mem_free (cb_args);
683 }
684
685 /**
686  * Move dgram session to the right thread
687  */
688 int
689 session_dgram_connect_notify (transport_connection_t * tc,
690                               u32 old_thread_index, session_t ** new_session)
691 {
692   session_t *new_s;
693   session_switch_pool_args_t *rpc_args;
694
695   /*
696    * Clone half-open session to the right thread.
697    */
698   new_s = session_clone_safe (tc->s_index, old_thread_index);
699   new_s->connection_index = tc->c_index;
700   new_s->rx_fifo->master_session_index = new_s->session_index;
701   new_s->rx_fifo->master_thread_index = new_s->thread_index;
702   new_s->session_state = SESSION_STATE_READY;
703   session_lookup_add_connection (tc, session_handle (new_s));
704
705   /*
706    * Ask thread owning the old session to clean it up and make us the tx
707    * fifo owner
708    */
709   rpc_args = clib_mem_alloc (sizeof (*rpc_args));
710   rpc_args->new_session_index = new_s->session_index;
711   rpc_args->new_thread_index = new_s->thread_index;
712   rpc_args->session_index = tc->s_index;
713   rpc_args->thread_index = old_thread_index;
714   session_send_rpc_evt_to_thread (rpc_args->thread_index, session_switch_pool,
715                                   rpc_args);
716
717   tc->s_index = new_s->session_index;
718   new_s->connection_index = tc->c_index;
719   *new_session = new_s;
720   return 0;
721 }
722
723 /**
724  * Notification from transport that connection is being closed.
725  *
726  * A disconnect is sent to application but state is not removed. Once
727  * disconnect is acknowledged by application, session disconnect is called.
728  * Ultimately this leads to close being called on transport (passive close).
729  */
730 void
731 session_transport_closing_notify (transport_connection_t * tc)
732 {
733   app_worker_t *app_wrk;
734   session_t *s;
735
736   s = session_get (tc->s_index, tc->thread_index);
737   if (s->session_state >= SESSION_STATE_TRANSPORT_CLOSING)
738     return;
739   s->session_state = SESSION_STATE_TRANSPORT_CLOSING;
740   app_wrk = app_worker_get (s->app_wrk_index);
741   app_worker_close_notify (app_wrk, s);
742 }
743
744 /**
745  * Notification from transport that connection is being deleted
746  *
747  * This removes the session if it is still valid. It should be called only on
748  * previously fully established sessions. For instance failed connects should
749  * call stream_session_connect_notify and indicate that the connect has
750  * failed.
751  */
752 void
753 session_transport_delete_notify (transport_connection_t * tc)
754 {
755   session_t *s;
756
757   /* App might've been removed already */
758   if (!(s = session_get_if_valid (tc->s_index, tc->thread_index)))
759     return;
760
761   /* Make sure we don't try to send anything more */
762   svm_fifo_dequeue_drop_all (s->tx_fifo);
763
764   switch (s->session_state)
765     {
766     case SESSION_STATE_CREATED:
767       /* Session was created but accept notification was not yet sent to the
768        * app. Cleanup everything. */
769       session_lookup_del_session (s);
770       session_free_w_fifos (s);
771       break;
772     case SESSION_STATE_ACCEPTING:
773     case SESSION_STATE_TRANSPORT_CLOSING:
774       /* If transport finishes or times out before we get a reply
775        * from the app, mark transport as closed and wait for reply
776        * before removing the session. Cleanup session table in advance
777        * because transport will soon be closed and closed sessions
778        * are assumed to have been removed from the lookup table */
779       session_lookup_del_session (s);
780       s->session_state = SESSION_STATE_TRANSPORT_CLOSED;
781       break;
782     case SESSION_STATE_CLOSING:
783     case SESSION_STATE_CLOSED_WAITING:
784       /* Cleanup lookup table as transport needs to still be valid.
785        * Program transport close to ensure that all session events
786        * have been cleaned up. Once transport close is called, the
787        * session is just removed because both transport and app have
788        * confirmed the close*/
789       session_lookup_del_session (s);
790       s->session_state = SESSION_STATE_TRANSPORT_CLOSED;
791       session_program_transport_close (s);
792       break;
793     case SESSION_STATE_TRANSPORT_CLOSED:
794       break;
795     case SESSION_STATE_CLOSED:
796       session_delete (s);
797       break;
798     default:
799       clib_warning ("session state %u", s->session_state);
800       session_delete (s);
801       break;
802     }
803 }
804
805 /**
806  * Notification from transport that session can be closed
807  *
808  * Should be called by transport only if it was closed with non-empty
809  * tx fifo and once it decides to begin the closing procedure prior to
810  * issuing a delete notify. This gives the chance to the session layer
811  * to cleanup any outstanding events.
812  */
813 void
814 session_transport_closed_notify (transport_connection_t * tc)
815 {
816   session_t *s;
817
818   if (!(s = session_get_if_valid (tc->s_index, tc->thread_index)))
819     return;
820
821   /* If app close has not been received or has not yet resulted in
822    * a transport close, only mark the session transport as closed */
823   if (s->session_state <= SESSION_STATE_CLOSING)
824     {
825       session_lookup_del_session (s);
826       s->session_state = SESSION_STATE_TRANSPORT_CLOSED;
827     }
828   else
829     s->session_state = SESSION_STATE_CLOSED;
830 }
831
832 /**
833  * Notify application that connection has been reset.
834  */
835 void
836 session_transport_reset_notify (transport_connection_t * tc)
837 {
838   app_worker_t *app_wrk;
839   session_t *s;
840
841   s = session_get (tc->s_index, tc->thread_index);
842   svm_fifo_dequeue_drop_all (s->tx_fifo);
843   if (s->session_state >= SESSION_STATE_TRANSPORT_CLOSING)
844     return;
845   s->session_state = SESSION_STATE_TRANSPORT_CLOSING;
846   app_wrk = app_worker_get (s->app_wrk_index);
847   app_worker_reset_notify (app_wrk, s);
848 }
849
850 int
851 session_stream_accept_notify (transport_connection_t * tc)
852 {
853   app_worker_t *app_wrk;
854   session_t *s;
855
856   s = session_get (tc->s_index, tc->thread_index);
857   app_wrk = app_worker_get_if_valid (s->app_wrk_index);
858   if (!app_wrk)
859     return -1;
860   s->session_state = SESSION_STATE_ACCEPTING;
861   return app_worker_accept_notify (app_wrk, s);
862 }
863
864 /**
865  * Accept a stream session. Optionally ping the server by callback.
866  */
867 int
868 session_stream_accept (transport_connection_t * tc, u32 listener_index,
869                        u8 notify)
870 {
871   session_t *s;
872   int rv;
873
874   s = session_alloc_for_connection (tc);
875   s->listener_index = listener_index;
876   s->session_state = SESSION_STATE_CREATED;
877
878   if ((rv = app_worker_init_accepted (s)))
879     return rv;
880
881   session_lookup_add_connection (tc, session_handle (s));
882
883   /* Shoulder-tap the server */
884   if (notify)
885     {
886       app_worker_t *app_wrk = app_worker_get (s->app_wrk_index);
887       return app_worker_accept_notify (app_wrk, s);
888     }
889
890   return 0;
891 }
892
893 int
894 session_open_cl (u32 app_wrk_index, session_endpoint_t * rmt, u32 opaque)
895 {
896   transport_connection_t *tc;
897   transport_endpoint_cfg_t *tep;
898   app_worker_t *app_wrk;
899   session_handle_t sh;
900   session_t *s;
901   int rv;
902
903   tep = session_endpoint_to_transport_cfg (rmt);
904   rv = transport_connect (rmt->transport_proto, tep);
905   if (rv < 0)
906     {
907       SESSION_DBG ("Transport failed to open connection.");
908       return VNET_API_ERROR_SESSION_CONNECT;
909     }
910
911   tc = transport_get_half_open (rmt->transport_proto, (u32) rv);
912
913   /* For dgram type of service, allocate session and fifos now */
914   app_wrk = app_worker_get (app_wrk_index);
915   s = session_alloc_for_connection (tc);
916   s->app_wrk_index = app_wrk->wrk_index;
917   s->session_state = SESSION_STATE_OPENED;
918   if (app_worker_init_connected (app_wrk, s))
919     {
920       session_free (s);
921       return -1;
922     }
923
924   sh = session_handle (s);
925   session_lookup_add_connection (tc, sh);
926
927   return app_worker_connect_notify (app_wrk, s, opaque);
928 }
929
930 int
931 session_open_vc (u32 app_wrk_index, session_endpoint_t * rmt, u32 opaque)
932 {
933   transport_connection_t *tc;
934   transport_endpoint_cfg_t *tep;
935   u64 handle;
936   int rv;
937
938   tep = session_endpoint_to_transport_cfg (rmt);
939   rv = transport_connect (rmt->transport_proto, tep);
940   if (rv < 0)
941     {
942       SESSION_DBG ("Transport failed to open connection.");
943       return VNET_API_ERROR_SESSION_CONNECT;
944     }
945
946   tc = transport_get_half_open (rmt->transport_proto, (u32) rv);
947
948   /* If transport offers a stream service, only allocate session once the
949    * connection has been established.
950    * Add connection to half-open table and save app and tc index. The
951    * latter is needed to help establish the connection while the former
952    * is needed when the connect notify comes and we have to notify the
953    * external app
954    */
955   handle = (((u64) app_wrk_index) << 32) | (u64) tc->c_index;
956   session_lookup_add_half_open (tc, handle);
957
958   /* Store api_context (opaque) for when the reply comes. Not the nicest
959    * thing but better than allocating a separate half-open pool.
960    */
961   tc->s_index = opaque;
962   return 0;
963 }
964
965 int
966 session_open_app (u32 app_wrk_index, session_endpoint_t * rmt, u32 opaque)
967 {
968   session_endpoint_cfg_t *sep = (session_endpoint_cfg_t *) rmt;
969   transport_endpoint_cfg_t *tep_cfg = session_endpoint_to_transport_cfg (sep);
970
971   sep->app_wrk_index = app_wrk_index;
972   sep->opaque = opaque;
973
974   return transport_connect (rmt->transport_proto, tep_cfg);
975 }
976
977 typedef int (*session_open_service_fn) (u32, session_endpoint_t *, u32);
978
979 /* *INDENT-OFF* */
980 static session_open_service_fn session_open_srv_fns[TRANSPORT_N_SERVICES] = {
981   session_open_vc,
982   session_open_cl,
983   session_open_app,
984 };
985 /* *INDENT-ON* */
986
987 /**
988  * Ask transport to open connection to remote transport endpoint.
989  *
990  * Stores handle for matching request with reply since the call can be
991  * asynchronous. For instance, for TCP the 3-way handshake must complete
992  * before reply comes. Session is only created once connection is established.
993  *
994  * @param app_index Index of the application requesting the connect
995  * @param st Session type requested.
996  * @param tep Remote transport endpoint
997  * @param opaque Opaque data (typically, api_context) the application expects
998  *               on open completion.
999  */
1000 int
1001 session_open (u32 app_wrk_index, session_endpoint_t * rmt, u32 opaque)
1002 {
1003   transport_service_type_t tst;
1004   tst = transport_protocol_service_type (rmt->transport_proto);
1005   return session_open_srv_fns[tst] (app_wrk_index, rmt, opaque);
1006 }
1007
1008 /**
1009  * Ask transport to listen on session endpoint.
1010  *
1011  * @param s Session for which listen will be called. Note that unlike
1012  *          established sessions, listen sessions are not associated to a
1013  *          thread.
1014  * @param sep Local endpoint to be listened on.
1015  */
1016 int
1017 session_listen (session_t * ls, session_endpoint_cfg_t * sep)
1018 {
1019   transport_endpoint_t *tep;
1020   u32 tc_index, s_index;
1021
1022   /* Transport bind/listen */
1023   tep = session_endpoint_to_transport (sep);
1024   s_index = ls->session_index;
1025   tc_index = transport_start_listen (session_get_transport_proto (ls),
1026                                      s_index, tep);
1027
1028   if (tc_index == (u32) ~ 0)
1029     return -1;
1030
1031   /* Attach transport to session. Lookup tables are populated by the app
1032    * worker because local tables (for ct sessions) are not backed by a fib */
1033   ls = listen_session_get (s_index);
1034   ls->connection_index = tc_index;
1035
1036   return 0;
1037 }
1038
1039 /**
1040  * Ask transport to stop listening on local transport endpoint.
1041  *
1042  * @param s Session to stop listening on. It must be in state LISTENING.
1043  */
1044 int
1045 session_stop_listen (session_t * s)
1046 {
1047   transport_proto_t tp = session_get_transport_proto (s);
1048   transport_connection_t *tc;
1049
1050   if (s->session_state != SESSION_STATE_LISTENING)
1051     return -1;
1052
1053   tc = transport_get_listener (tp, s->connection_index);
1054   if (!tc)
1055     return VNET_API_ERROR_ADDRESS_NOT_IN_USE;
1056
1057   session_lookup_del_connection (tc);
1058   transport_stop_listen (tp, s->connection_index);
1059   return 0;
1060 }
1061
1062 /**
1063  * Initialize session closing procedure.
1064  *
1065  * Request is always sent to session node to ensure that all outstanding
1066  * requests are served before transport is notified.
1067  */
1068 void
1069 session_close (session_t * s)
1070 {
1071   if (!s)
1072     return;
1073
1074   if (s->session_state >= SESSION_STATE_CLOSING)
1075     {
1076       /* Session will only be removed once both app and transport
1077        * acknowledge the close */
1078       if (s->session_state == SESSION_STATE_TRANSPORT_CLOSED)
1079         session_program_transport_close (s);
1080
1081       /* Session already closed. Clear the tx fifo */
1082       if (s->session_state == SESSION_STATE_CLOSED)
1083         svm_fifo_dequeue_drop_all (s->tx_fifo);
1084       return;
1085     }
1086
1087   s->session_state = SESSION_STATE_CLOSING;
1088   session_program_transport_close (s);
1089 }
1090
1091 /**
1092  * Notify transport the session can be disconnected. This should eventually
1093  * result in a delete notification that allows us to cleanup session state.
1094  * Called for both active/passive disconnects.
1095  *
1096  * Must be called from the session's thread.
1097  */
1098 void
1099 session_transport_close (session_t * s)
1100 {
1101   /* If transport is already closed, just free the session */
1102   if (s->session_state >= SESSION_STATE_TRANSPORT_CLOSED)
1103     {
1104       session_free_w_fifos (s);
1105       return;
1106     }
1107
1108   /* If tx queue wasn't drained, change state to closed waiting for transport.
1109    * This way, the transport, if it so wishes, can continue to try sending the
1110    * outstanding data (in closed state it cannot). It MUST however at one
1111    * point, either after sending everything or after a timeout, call delete
1112    * notify. This will finally lead to the complete cleanup of the session.
1113    */
1114   if (svm_fifo_max_dequeue_cons (s->tx_fifo))
1115     s->session_state = SESSION_STATE_CLOSED_WAITING;
1116   else
1117     s->session_state = SESSION_STATE_CLOSED;
1118
1119   transport_close (session_get_transport_proto (s), s->connection_index,
1120                    s->thread_index);
1121 }
1122
1123 /**
1124  * Cleanup transport and session state.
1125  *
1126  * Notify transport of the cleanup and free the session. This should
1127  * be called only if transport reported some error and is already
1128  * closed.
1129  */
1130 void
1131 session_transport_cleanup (session_t * s)
1132 {
1133   s->session_state = SESSION_STATE_CLOSED;
1134
1135   /* Delete from main lookup table before we axe the the transport */
1136   session_lookup_del_session (s);
1137   transport_cleanup (session_get_transport_proto (s), s->connection_index,
1138                      s->thread_index);
1139   /* Since we called cleanup, no delete notification will come. So, make
1140    * sure the session is properly freed. */
1141   session_free_w_fifos (s);
1142 }
1143
1144 /**
1145  * Allocate event queues in the shared-memory segment
1146  *
1147  * That can either be a newly created memfd segment, that will need to be
1148  * mapped by all stack users, or the binary api's svm region. The latter is
1149  * assumed to be already mapped. NOTE that this assumption DOES NOT hold if
1150  * api clients bootstrap shm api over sockets (i.e. use memfd segments) and
1151  * vpp uses api svm region for event queues.
1152  */
1153 void
1154 session_vpp_event_queues_allocate (session_main_t * smm)
1155 {
1156   u32 evt_q_length = 2048, evt_size = sizeof (session_event_t);
1157   ssvm_private_t *eqs = &smm->evt_qs_segment;
1158   api_main_t *am = &api_main;
1159   uword eqs_size = 64 << 20;
1160   pid_t vpp_pid = getpid ();
1161   void *oldheap;
1162   int i;
1163
1164   if (smm->configured_event_queue_length)
1165     evt_q_length = smm->configured_event_queue_length;
1166
1167   if (smm->evt_qs_use_memfd_seg)
1168     {
1169       if (smm->evt_qs_segment_size)
1170         eqs_size = smm->evt_qs_segment_size;
1171
1172       eqs->ssvm_size = eqs_size;
1173       eqs->i_am_master = 1;
1174       eqs->my_pid = vpp_pid;
1175       eqs->name = format (0, "%s%c", "evt-qs-segment", 0);
1176       eqs->requested_va = smm->session_baseva;
1177
1178       if (ssvm_master_init (eqs, SSVM_SEGMENT_MEMFD))
1179         {
1180           clib_warning ("failed to initialize queue segment");
1181           return;
1182         }
1183     }
1184
1185   if (smm->evt_qs_use_memfd_seg)
1186     oldheap = ssvm_push_heap (eqs->sh);
1187   else
1188     oldheap = svm_push_data_heap (am->vlib_rp);
1189
1190   for (i = 0; i < vec_len (smm->wrk); i++)
1191     {
1192       svm_msg_q_cfg_t _cfg, *cfg = &_cfg;
1193       svm_msg_q_ring_cfg_t rc[SESSION_MQ_N_RINGS] = {
1194         {evt_q_length, evt_size, 0}
1195         ,
1196         {evt_q_length >> 1, 256, 0}
1197       };
1198       cfg->consumer_pid = 0;
1199       cfg->n_rings = 2;
1200       cfg->q_nitems = evt_q_length;
1201       cfg->ring_cfgs = rc;
1202       smm->wrk[i].vpp_event_queue = svm_msg_q_alloc (cfg);
1203       if (smm->evt_qs_use_memfd_seg)
1204         {
1205           if (svm_msg_q_alloc_consumer_eventfd (smm->wrk[i].vpp_event_queue))
1206             clib_warning ("eventfd returned");
1207         }
1208     }
1209
1210   if (smm->evt_qs_use_memfd_seg)
1211     ssvm_pop_heap (oldheap);
1212   else
1213     svm_pop_heap (oldheap);
1214 }
1215
1216 ssvm_private_t *
1217 session_main_get_evt_q_segment (void)
1218 {
1219   session_main_t *smm = &session_main;
1220   if (smm->evt_qs_use_memfd_seg)
1221     return &smm->evt_qs_segment;
1222   return 0;
1223 }
1224
1225 u64
1226 session_segment_handle (session_t * s)
1227 {
1228   svm_fifo_t *f;
1229
1230   if (!s->rx_fifo)
1231     return SESSION_INVALID_HANDLE;
1232
1233   f = s->rx_fifo;
1234   return segment_manager_make_segment_handle (f->segment_manager,
1235                                               f->segment_index);
1236 }
1237
1238 /* *INDENT-OFF* */
1239 static session_fifo_rx_fn *session_tx_fns[TRANSPORT_TX_N_FNS] = {
1240     session_tx_fifo_peek_and_snd,
1241     session_tx_fifo_dequeue_and_snd,
1242     session_tx_fifo_dequeue_internal,
1243     session_tx_fifo_dequeue_and_snd
1244 };
1245 /* *INDENT-ON* */
1246
1247 /**
1248  * Initialize session layer for given transport proto and ip version
1249  *
1250  * Allocates per session type (transport proto + ip version) data structures
1251  * and adds arc from session queue node to session type output node.
1252  */
1253 void
1254 session_register_transport (transport_proto_t transport_proto,
1255                             const transport_proto_vft_t * vft, u8 is_ip4,
1256                             u32 output_node)
1257 {
1258   session_main_t *smm = &session_main;
1259   session_type_t session_type;
1260   u32 next_index = ~0;
1261
1262   session_type = session_type_from_proto_and_ip (transport_proto, is_ip4);
1263
1264   vec_validate (smm->session_type_to_next, session_type);
1265   vec_validate (smm->session_tx_fns, session_type);
1266
1267   /* *INDENT-OFF* */
1268   if (output_node != ~0)
1269     {
1270       foreach_vlib_main (({
1271           next_index = vlib_node_add_next (this_vlib_main,
1272                                            session_queue_node.index,
1273                                            output_node);
1274       }));
1275     }
1276   /* *INDENT-ON* */
1277
1278   smm->session_type_to_next[session_type] = next_index;
1279   smm->session_tx_fns[session_type] = session_tx_fns[vft->tx_type];
1280 }
1281
1282 transport_connection_t *
1283 session_get_transport (session_t * s)
1284 {
1285   if (s->session_state != SESSION_STATE_LISTENING)
1286     return transport_get_connection (session_get_transport_proto (s),
1287                                      s->connection_index, s->thread_index);
1288   else
1289     return transport_get_listener (session_get_transport_proto (s),
1290                                    s->connection_index);
1291 }
1292
1293 void
1294 session_get_endpoint (session_t * s, transport_endpoint_t * tep, u8 is_lcl)
1295 {
1296   if (s->session_state != SESSION_STATE_LISTENING)
1297     return transport_get_endpoint (session_get_transport_proto (s),
1298                                    s->connection_index, s->thread_index, tep,
1299                                    is_lcl);
1300   else
1301     return transport_get_listener_endpoint (session_get_transport_proto (s),
1302                                             s->connection_index, tep, is_lcl);
1303 }
1304
1305 transport_connection_t *
1306 listen_session_get_transport (session_t * s)
1307 {
1308   return transport_get_listener (session_get_transport_proto (s),
1309                                  s->connection_index);
1310 }
1311
1312 void
1313 session_flush_frames_main_thread (vlib_main_t * vm)
1314 {
1315   ASSERT (vlib_get_thread_index () == 0);
1316   vlib_process_signal_event_mt (vm, session_queue_process_node.index,
1317                                 SESSION_Q_PROCESS_FLUSH_FRAMES, 0);
1318 }
1319
1320 static clib_error_t *
1321 session_manager_main_enable (vlib_main_t * vm)
1322 {
1323   segment_manager_main_init_args_t _sm_args = { 0 }, *sm_args = &_sm_args;
1324   session_main_t *smm = &session_main;
1325   vlib_thread_main_t *vtm = vlib_get_thread_main ();
1326   u32 num_threads, preallocated_sessions_per_worker;
1327   session_worker_t *wrk;
1328   int i;
1329
1330   num_threads = 1 /* main thread */  + vtm->n_threads;
1331
1332   if (num_threads < 1)
1333     return clib_error_return (0, "n_thread_stacks not set");
1334
1335   /* Allocate cache line aligned worker contexts */
1336   vec_validate_aligned (smm->wrk, num_threads - 1, CLIB_CACHE_LINE_BYTES);
1337
1338   for (i = 0; i < num_threads; i++)
1339     {
1340       wrk = &smm->wrk[i];
1341       vec_validate (wrk->free_event_vector, 128);
1342       _vec_len (wrk->free_event_vector) = 0;
1343       vec_validate (wrk->pending_event_vector, 128);
1344       _vec_len (wrk->pending_event_vector) = 0;
1345       vec_validate (wrk->pending_disconnects, 128);
1346       _vec_len (wrk->pending_disconnects) = 0;
1347       vec_validate (wrk->postponed_event_vector, 128);
1348       _vec_len (wrk->postponed_event_vector) = 0;
1349
1350       wrk->last_vlib_time = vlib_time_now (vlib_mains[i]);
1351       wrk->dispatch_period = 500e-6;
1352
1353       if (num_threads > 1)
1354         clib_rwlock_init (&smm->wrk[i].peekers_rw_locks);
1355     }
1356
1357 #if SESSION_DEBUG
1358   vec_validate (smm->last_event_poll_by_thread, num_threads - 1);
1359 #endif
1360
1361   /* Allocate vpp event queues segment and queue */
1362   session_vpp_event_queues_allocate (smm);
1363
1364   /* Initialize fifo segment main baseva and timeout */
1365   sm_args->baseva = smm->session_baseva + smm->evt_qs_segment_size;
1366   sm_args->size = smm->session_va_space_size;
1367   segment_manager_main_init (sm_args);
1368
1369   /* Preallocate sessions */
1370   if (smm->preallocated_sessions)
1371     {
1372       if (num_threads == 1)
1373         {
1374           pool_init_fixed (smm->wrk[0].sessions, smm->preallocated_sessions);
1375         }
1376       else
1377         {
1378           int j;
1379           preallocated_sessions_per_worker =
1380             (1.1 * (f64) smm->preallocated_sessions /
1381              (f64) (num_threads - 1));
1382
1383           for (j = 1; j < num_threads; j++)
1384             {
1385               pool_init_fixed (smm->wrk[j].sessions,
1386                                preallocated_sessions_per_worker);
1387             }
1388         }
1389     }
1390
1391   session_lookup_init ();
1392   app_namespaces_init ();
1393   transport_init ();
1394
1395   smm->is_enabled = 1;
1396
1397   /* Enable transports */
1398   transport_enable_disable (vm, 1);
1399   transport_init_tx_pacers_period ();
1400   return 0;
1401 }
1402
1403 void
1404 session_node_enable_disable (u8 is_en)
1405 {
1406   u8 state = is_en ? VLIB_NODE_STATE_POLLING : VLIB_NODE_STATE_DISABLED;
1407   vlib_thread_main_t *vtm = vlib_get_thread_main ();
1408   u8 have_workers = vtm->n_threads != 0;
1409
1410   /* *INDENT-OFF* */
1411   foreach_vlib_main (({
1412     if (have_workers && ii == 0)
1413       {
1414         vlib_node_set_state (this_vlib_main, session_queue_process_node.index,
1415                              state);
1416         if (is_en)
1417           {
1418             vlib_node_t *n = vlib_get_node (this_vlib_main,
1419                                             session_queue_process_node.index);
1420             vlib_start_process (this_vlib_main, n->runtime_index);
1421           }
1422         else
1423           {
1424             vlib_process_signal_event_mt (this_vlib_main,
1425                                           session_queue_process_node.index,
1426                                           SESSION_Q_PROCESS_STOP, 0);
1427           }
1428
1429         continue;
1430       }
1431     vlib_node_set_state (this_vlib_main, session_queue_node.index,
1432                          state);
1433   }));
1434   /* *INDENT-ON* */
1435 }
1436
1437 clib_error_t *
1438 vnet_session_enable_disable (vlib_main_t * vm, u8 is_en)
1439 {
1440   clib_error_t *error = 0;
1441   if (is_en)
1442     {
1443       if (session_main.is_enabled)
1444         return 0;
1445
1446       session_node_enable_disable (is_en);
1447       error = session_manager_main_enable (vm);
1448     }
1449   else
1450     {
1451       session_main.is_enabled = 0;
1452       session_node_enable_disable (is_en);
1453     }
1454
1455   return error;
1456 }
1457
1458 clib_error_t *
1459 session_manager_main_init (vlib_main_t * vm)
1460 {
1461   session_main_t *smm = &session_main;
1462   smm->session_baseva = HIGH_SEGMENT_BASEVA;
1463 #if (HIGH_SEGMENT_BASEVA > (4ULL << 30))
1464   smm->session_va_space_size = 128ULL << 30;
1465   smm->evt_qs_segment_size = 64 << 20;
1466 #else
1467   smm->session_va_space_size = 128 << 20;
1468   smm->evt_qs_segment_size = 1 << 20;
1469 #endif
1470   smm->is_enabled = 0;
1471   return 0;
1472 }
1473
1474 VLIB_INIT_FUNCTION (session_manager_main_init);
1475
1476 static clib_error_t *
1477 session_config_fn (vlib_main_t * vm, unformat_input_t * input)
1478 {
1479   session_main_t *smm = &session_main;
1480   u32 nitems;
1481   uword tmp;
1482
1483   while (unformat_check_input (input) != UNFORMAT_END_OF_INPUT)
1484     {
1485       if (unformat (input, "event-queue-length %d", &nitems))
1486         {
1487           if (nitems >= 2048)
1488             smm->configured_event_queue_length = nitems;
1489           else
1490             clib_warning ("event queue length %d too small, ignored", nitems);
1491         }
1492       else if (unformat (input, "preallocated-sessions %d",
1493                          &smm->preallocated_sessions))
1494         ;
1495       else if (unformat (input, "v4-session-table-buckets %d",
1496                          &smm->configured_v4_session_table_buckets))
1497         ;
1498       else if (unformat (input, "v4-halfopen-table-buckets %d",
1499                          &smm->configured_v4_halfopen_table_buckets))
1500         ;
1501       else if (unformat (input, "v6-session-table-buckets %d",
1502                          &smm->configured_v6_session_table_buckets))
1503         ;
1504       else if (unformat (input, "v6-halfopen-table-buckets %d",
1505                          &smm->configured_v6_halfopen_table_buckets))
1506         ;
1507       else if (unformat (input, "v4-session-table-memory %U",
1508                          unformat_memory_size, &tmp))
1509         {
1510           if (tmp >= 0x100000000)
1511             return clib_error_return (0, "memory size %llx (%lld) too large",
1512                                       tmp, tmp);
1513           smm->configured_v4_session_table_memory = tmp;
1514         }
1515       else if (unformat (input, "v4-halfopen-table-memory %U",
1516                          unformat_memory_size, &tmp))
1517         {
1518           if (tmp >= 0x100000000)
1519             return clib_error_return (0, "memory size %llx (%lld) too large",
1520                                       tmp, tmp);
1521           smm->configured_v4_halfopen_table_memory = tmp;
1522         }
1523       else if (unformat (input, "v6-session-table-memory %U",
1524                          unformat_memory_size, &tmp))
1525         {
1526           if (tmp >= 0x100000000)
1527             return clib_error_return (0, "memory size %llx (%lld) too large",
1528                                       tmp, tmp);
1529           smm->configured_v6_session_table_memory = tmp;
1530         }
1531       else if (unformat (input, "v6-halfopen-table-memory %U",
1532                          unformat_memory_size, &tmp))
1533         {
1534           if (tmp >= 0x100000000)
1535             return clib_error_return (0, "memory size %llx (%lld) too large",
1536                                       tmp, tmp);
1537           smm->configured_v6_halfopen_table_memory = tmp;
1538         }
1539       else if (unformat (input, "local-endpoints-table-memory %U",
1540                          unformat_memory_size, &tmp))
1541         {
1542           if (tmp >= 0x100000000)
1543             return clib_error_return (0, "memory size %llx (%lld) too large",
1544                                       tmp, tmp);
1545           smm->local_endpoints_table_memory = tmp;
1546         }
1547       else if (unformat (input, "local-endpoints-table-buckets %d",
1548                          &smm->local_endpoints_table_buckets))
1549         ;
1550       else if (unformat (input, "evt_qs_memfd_seg"))
1551         smm->evt_qs_use_memfd_seg = 1;
1552       else if (unformat (input, "evt_qs_seg_size %U", unformat_memory_size,
1553                          &smm->evt_qs_segment_size))
1554         ;
1555       else
1556         return clib_error_return (0, "unknown input `%U'",
1557                                   format_unformat_error, input);
1558     }
1559   return 0;
1560 }
1561
1562 VLIB_CONFIG_FUNCTION (session_config_fn, "session");
1563
1564 /*
1565  * fd.io coding-style-patch-verification: ON
1566  *
1567  * Local Variables:
1568  * eval: (c-set-style "gnu")
1569  * End:
1570  */