session: send tx events when data is dequeued
[vpp.git] / src / vnet / session / session.c
1 /*
2  * Copyright (c) 2017-2019 Cisco and/or its affiliates.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at:
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 /**
16  * @file
17  * @brief Session and session manager
18  */
19
20 #include <vnet/session/session.h>
21 #include <vnet/session/session_debug.h>
22 #include <vnet/session/application.h>
23 #include <vnet/dpo/load_balance.h>
24 #include <vnet/fib/ip4_fib.h>
25
26 session_main_t session_main;
27
28 static inline int
29 session_send_evt_to_thread (void *data, void *args, u32 thread_index,
30                             session_evt_type_t evt_type)
31 {
32   session_event_t *evt;
33   svm_msg_q_msg_t msg;
34   svm_msg_q_t *mq;
35   u32 tries = 0, max_tries;
36
37   mq = session_main_get_vpp_event_queue (thread_index);
38   while (svm_msg_q_try_lock (mq))
39     {
40       max_tries = vlib_get_current_process (vlib_get_main ())? 1e6 : 3;
41       if (tries++ == max_tries)
42         {
43           SESSION_DBG ("failed to enqueue evt");
44           return -1;
45         }
46     }
47   if (PREDICT_FALSE (svm_msg_q_ring_is_full (mq, SESSION_MQ_IO_EVT_RING)))
48     {
49       svm_msg_q_unlock (mq);
50       return -2;
51     }
52   msg = svm_msg_q_alloc_msg_w_ring (mq, SESSION_MQ_IO_EVT_RING);
53   if (PREDICT_FALSE (svm_msg_q_msg_is_invalid (&msg)))
54     {
55       svm_msg_q_unlock (mq);
56       return -2;
57     }
58   evt = (session_event_t *) svm_msg_q_msg_data (mq, &msg);
59   evt->event_type = evt_type;
60   switch (evt_type)
61     {
62     case SESSION_CTRL_EVT_RPC:
63       evt->rpc_args.fp = data;
64       evt->rpc_args.arg = args;
65       break;
66     case SESSION_IO_EVT_TX:
67     case SESSION_IO_EVT_TX_FLUSH:
68     case SESSION_IO_EVT_BUILTIN_RX:
69       evt->session_index = *(u32 *) data;
70       break;
71     case SESSION_IO_EVT_BUILTIN_TX:
72     case SESSION_CTRL_EVT_CLOSE:
73       evt->session_handle = session_handle ((session_t *) data);
74       break;
75     default:
76       clib_warning ("evt unhandled!");
77       svm_msg_q_unlock (mq);
78       return -1;
79     }
80
81   svm_msg_q_add_and_unlock (mq, &msg);
82   return 0;
83 }
84
85 int
86 session_send_io_evt_to_thread (svm_fifo_t * f, session_evt_type_t evt_type)
87 {
88   return session_send_evt_to_thread (&f->master_session_index, 0,
89                                      f->master_thread_index, evt_type);
90 }
91
92 int
93 session_send_io_evt_to_thread_custom (void *data, u32 thread_index,
94                                       session_evt_type_t evt_type)
95 {
96   return session_send_evt_to_thread (data, 0, thread_index, evt_type);
97 }
98
99 int
100 session_send_ctrl_evt_to_thread (session_t * s, session_evt_type_t evt_type)
101 {
102   /* only event supported for now is disconnect */
103   ASSERT (evt_type == SESSION_CTRL_EVT_CLOSE);
104   return session_send_evt_to_thread (s, 0, s->thread_index,
105                                      SESSION_CTRL_EVT_CLOSE);
106 }
107
108 void
109 session_send_rpc_evt_to_thread_force (u32 thread_index, void *fp,
110                                       void *rpc_args)
111 {
112   session_send_evt_to_thread (fp, rpc_args, thread_index,
113                               SESSION_CTRL_EVT_RPC);
114 }
115
116 void
117 session_send_rpc_evt_to_thread (u32 thread_index, void *fp, void *rpc_args)
118 {
119   if (thread_index != vlib_get_thread_index ())
120     session_send_rpc_evt_to_thread_force (thread_index, fp, rpc_args);
121   else
122     {
123       void (*fnp) (void *) = fp;
124       fnp (rpc_args);
125     }
126 }
127
128 static void
129 session_program_transport_close (session_t * s)
130 {
131   u32 thread_index = vlib_get_thread_index ();
132   session_worker_t *wrk;
133   session_event_t *evt;
134
135   /* If we are in the handler thread, or being called with the worker barrier
136    * held, just append a new event to pending disconnects vector. */
137   if (vlib_thread_is_main_w_barrier () || thread_index == s->thread_index)
138     {
139       wrk = session_main_get_worker (s->thread_index);
140       vec_add2 (wrk->pending_disconnects, evt, 1);
141       clib_memset (evt, 0, sizeof (*evt));
142       evt->session_handle = session_handle (s);
143       evt->event_type = SESSION_CTRL_EVT_CLOSE;
144     }
145   else
146     session_send_ctrl_evt_to_thread (s, SESSION_CTRL_EVT_CLOSE);
147 }
148
149 session_t *
150 session_alloc (u32 thread_index)
151 {
152   session_worker_t *wrk = &session_main.wrk[thread_index];
153   session_t *s;
154   u8 will_expand = 0;
155   pool_get_aligned_will_expand (wrk->sessions, will_expand,
156                                 CLIB_CACHE_LINE_BYTES);
157   /* If we have peekers, let them finish */
158   if (PREDICT_FALSE (will_expand && vlib_num_workers ()))
159     {
160       clib_rwlock_writer_lock (&wrk->peekers_rw_locks);
161       pool_get_aligned (wrk->sessions, s, CLIB_CACHE_LINE_BYTES);
162       clib_rwlock_writer_unlock (&wrk->peekers_rw_locks);
163     }
164   else
165     {
166       pool_get_aligned (wrk->sessions, s, CLIB_CACHE_LINE_BYTES);
167     }
168   clib_memset (s, 0, sizeof (*s));
169   s->session_index = s - wrk->sessions;
170   s->thread_index = thread_index;
171   return s;
172 }
173
174 void
175 session_free (session_t * s)
176 {
177   if (CLIB_DEBUG)
178     {
179       u8 thread_index = s->thread_index;
180       clib_memset (s, 0xFA, sizeof (*s));
181       pool_put (session_main.wrk[thread_index].sessions, s);
182       return;
183     }
184   SESSION_EVT_DBG (SESSION_EVT_FREE, s);
185   pool_put (session_main.wrk[s->thread_index].sessions, s);
186 }
187
188 void
189 session_free_w_fifos (session_t * s)
190 {
191   segment_manager_dealloc_fifos (s->rx_fifo, s->tx_fifo);
192   session_free (s);
193 }
194
195 /**
196  * Cleans up session and lookup table.
197  *
198  * Transport connection must still be valid.
199  */
200 static void
201 session_delete (session_t * s)
202 {
203   int rv;
204
205   /* Delete from the main lookup table. */
206   if ((rv = session_lookup_del_session (s)))
207     clib_warning ("hash delete error, rv %d", rv);
208
209   session_free_w_fifos (s);
210 }
211
212 static session_t *
213 session_alloc_for_connection (transport_connection_t * tc)
214 {
215   session_t *s;
216   u32 thread_index = tc->thread_index;
217
218   ASSERT (thread_index == vlib_get_thread_index ()
219           || transport_protocol_is_cl (tc->proto));
220
221   s = session_alloc (thread_index);
222   s->session_type = session_type_from_proto_and_ip (tc->proto, tc->is_ip4);
223   s->session_state = SESSION_STATE_CLOSED;
224
225   /* Attach transport to session and vice versa */
226   s->connection_index = tc->c_index;
227   tc->s_index = s->session_index;
228   return s;
229 }
230
231 /**
232  * Discards bytes from buffer chain
233  *
234  * It discards n_bytes_to_drop starting at first buffer after chain_b
235  */
236 always_inline void
237 session_enqueue_discard_chain_bytes (vlib_main_t * vm, vlib_buffer_t * b,
238                                      vlib_buffer_t ** chain_b,
239                                      u32 n_bytes_to_drop)
240 {
241   vlib_buffer_t *next = *chain_b;
242   u32 to_drop = n_bytes_to_drop;
243   ASSERT (b->flags & VLIB_BUFFER_NEXT_PRESENT);
244   while (to_drop && (next->flags & VLIB_BUFFER_NEXT_PRESENT))
245     {
246       next = vlib_get_buffer (vm, next->next_buffer);
247       if (next->current_length > to_drop)
248         {
249           vlib_buffer_advance (next, to_drop);
250           to_drop = 0;
251         }
252       else
253         {
254           to_drop -= next->current_length;
255           next->current_length = 0;
256         }
257     }
258   *chain_b = next;
259
260   if (to_drop == 0)
261     b->total_length_not_including_first_buffer -= n_bytes_to_drop;
262 }
263
264 /**
265  * Enqueue buffer chain tail
266  */
267 always_inline int
268 session_enqueue_chain_tail (session_t * s, vlib_buffer_t * b,
269                             u32 offset, u8 is_in_order)
270 {
271   vlib_buffer_t *chain_b;
272   u32 chain_bi, len, diff;
273   vlib_main_t *vm = vlib_get_main ();
274   u8 *data;
275   u32 written = 0;
276   int rv = 0;
277
278   if (is_in_order && offset)
279     {
280       diff = offset - b->current_length;
281       if (diff > b->total_length_not_including_first_buffer)
282         return 0;
283       chain_b = b;
284       session_enqueue_discard_chain_bytes (vm, b, &chain_b, diff);
285       chain_bi = vlib_get_buffer_index (vm, chain_b);
286     }
287   else
288     chain_bi = b->next_buffer;
289
290   do
291     {
292       chain_b = vlib_get_buffer (vm, chain_bi);
293       data = vlib_buffer_get_current (chain_b);
294       len = chain_b->current_length;
295       if (!len)
296         continue;
297       if (is_in_order)
298         {
299           rv = svm_fifo_enqueue (s->rx_fifo, len, data);
300           if (rv == len)
301             {
302               written += rv;
303             }
304           else if (rv < len)
305             {
306               return (rv > 0) ? (written + rv) : written;
307             }
308           else if (rv > len)
309             {
310               written += rv;
311
312               /* written more than what was left in chain */
313               if (written > b->total_length_not_including_first_buffer)
314                 return written;
315
316               /* drop the bytes that have already been delivered */
317               session_enqueue_discard_chain_bytes (vm, b, &chain_b, rv - len);
318             }
319         }
320       else
321         {
322           rv = svm_fifo_enqueue_with_offset (s->rx_fifo, offset, len, data);
323           if (rv)
324             {
325               clib_warning ("failed to enqueue multi-buffer seg");
326               return -1;
327             }
328           offset += len;
329         }
330     }
331   while ((chain_bi = (chain_b->flags & VLIB_BUFFER_NEXT_PRESENT)
332           ? chain_b->next_buffer : 0));
333
334   if (is_in_order)
335     return written;
336
337   return 0;
338 }
339
340 /*
341  * Enqueue data for delivery to session peer. Does not notify peer of enqueue
342  * event but on request can queue notification events for later delivery by
343  * calling stream_server_flush_enqueue_events().
344  *
345  * @param tc Transport connection which is to be enqueued data
346  * @param b Buffer to be enqueued
347  * @param offset Offset at which to start enqueueing if out-of-order
348  * @param queue_event Flag to indicate if peer is to be notified or if event
349  *                    is to be queued. The former is useful when more data is
350  *                    enqueued and only one event is to be generated.
351  * @param is_in_order Flag to indicate if data is in order
352  * @return Number of bytes enqueued or a negative value if enqueueing failed.
353  */
354 int
355 session_enqueue_stream_connection (transport_connection_t * tc,
356                                    vlib_buffer_t * b, u32 offset,
357                                    u8 queue_event, u8 is_in_order)
358 {
359   session_t *s;
360   int enqueued = 0, rv, in_order_off;
361
362   s = session_get (tc->s_index, tc->thread_index);
363
364   if (is_in_order)
365     {
366       enqueued = svm_fifo_enqueue (s->rx_fifo,
367                                    b->current_length,
368                                    vlib_buffer_get_current (b));
369       if (PREDICT_FALSE ((b->flags & VLIB_BUFFER_NEXT_PRESENT)
370                          && enqueued >= 0))
371         {
372           in_order_off = enqueued > b->current_length ? enqueued : 0;
373           rv = session_enqueue_chain_tail (s, b, in_order_off, 1);
374           if (rv > 0)
375             enqueued += rv;
376         }
377     }
378   else
379     {
380       rv = svm_fifo_enqueue_with_offset (s->rx_fifo, offset,
381                                          b->current_length,
382                                          vlib_buffer_get_current (b));
383       if (PREDICT_FALSE ((b->flags & VLIB_BUFFER_NEXT_PRESENT) && !rv))
384         session_enqueue_chain_tail (s, b, offset + b->current_length, 0);
385       /* if something was enqueued, report even this as success for ooo
386        * segment handling */
387       return rv;
388     }
389
390   if (queue_event)
391     {
392       /* Queue RX event on this fifo. Eventually these will need to be flushed
393        * by calling stream_server_flush_enqueue_events () */
394       session_worker_t *wrk;
395
396       wrk = session_main_get_worker (s->thread_index);
397       if (!(s->flags & SESSION_F_RX_EVT))
398         {
399           s->flags |= SESSION_F_RX_EVT;
400           vec_add1 (wrk->session_to_enqueue[tc->proto], s->session_index);
401         }
402     }
403
404   return enqueued;
405 }
406
407 int
408 session_enqueue_dgram_connection (session_t * s,
409                                   session_dgram_hdr_t * hdr,
410                                   vlib_buffer_t * b, u8 proto, u8 queue_event)
411 {
412   int enqueued = 0, rv, in_order_off;
413
414   ASSERT (svm_fifo_max_enqueue_prod (s->rx_fifo)
415           >= b->current_length + sizeof (*hdr));
416
417   svm_fifo_enqueue (s->rx_fifo, sizeof (session_dgram_hdr_t), (u8 *) hdr);
418   enqueued = svm_fifo_enqueue (s->rx_fifo, b->current_length,
419                                vlib_buffer_get_current (b));
420   if (PREDICT_FALSE ((b->flags & VLIB_BUFFER_NEXT_PRESENT) && enqueued >= 0))
421     {
422       in_order_off = enqueued > b->current_length ? enqueued : 0;
423       rv = session_enqueue_chain_tail (s, b, in_order_off, 1);
424       if (rv > 0)
425         enqueued += rv;
426     }
427   if (queue_event)
428     {
429       /* Queue RX event on this fifo. Eventually these will need to be flushed
430        * by calling stream_server_flush_enqueue_events () */
431       session_worker_t *wrk;
432
433       wrk = session_main_get_worker (s->thread_index);
434       if (!(s->flags & SESSION_F_RX_EVT))
435         {
436           s->flags |= SESSION_F_RX_EVT;
437           vec_add1 (wrk->session_to_enqueue[proto], s->session_index);
438         }
439     }
440   return enqueued;
441 }
442
443 int
444 session_tx_fifo_peek_bytes (transport_connection_t * tc, u8 * buffer,
445                             u32 offset, u32 max_bytes)
446 {
447   session_t *s = session_get (tc->s_index, tc->thread_index);
448   return svm_fifo_peek (s->tx_fifo, offset, max_bytes, buffer);
449 }
450
451 u32
452 session_tx_fifo_dequeue_drop (transport_connection_t * tc, u32 max_bytes)
453 {
454   session_t *s = session_get (tc->s_index, tc->thread_index);
455
456   if (svm_fifo_needs_tx_ntf (s->tx_fifo, max_bytes))
457     session_dequeue_notify (s);
458
459   return svm_fifo_dequeue_drop (s->tx_fifo, max_bytes);
460 }
461
462 static inline int
463 session_notify_subscribers (u32 app_index, session_t * s,
464                             svm_fifo_t * f, session_evt_type_t evt_type)
465 {
466   app_worker_t *app_wrk;
467   application_t *app;
468   int i;
469
470   app = application_get (app_index);
471   if (!app)
472     return -1;
473
474   for (i = 0; i < f->n_subscribers; i++)
475     {
476       app_wrk = application_get_worker (app, f->subscribers[i]);
477       if (!app_wrk)
478         continue;
479       if (app_worker_lock_and_send_event (app_wrk, s, evt_type))
480         return -1;
481     }
482
483   return 0;
484 }
485
486 /**
487  * Notify session peer that new data has been enqueued.
488  *
489  * @param s     Stream session for which the event is to be generated.
490  * @param lock  Flag to indicate if call should lock message queue.
491  *
492  * @return 0 on success or negative number if failed to send notification.
493  */
494 static inline int
495 session_enqueue_notify_inline (session_t * s)
496 {
497   app_worker_t *app_wrk;
498   u32 session_index;
499   u8 n_subscribers;
500
501   session_index = s->session_index;
502   n_subscribers = svm_fifo_n_subscribers (s->rx_fifo);
503
504   app_wrk = app_worker_get_if_valid (s->app_wrk_index);
505   if (PREDICT_FALSE (!app_wrk))
506     {
507       SESSION_DBG ("invalid s->app_index = %d", s->app_wrk_index);
508       return 0;
509     }
510
511   /* *INDENT-OFF* */
512   SESSION_EVT_DBG(SESSION_EVT_ENQ, s, ({
513       ed->data[0] = SESSION_IO_EVT_RX;
514       ed->data[1] = svm_fifo_max_dequeue_prod (s->rx_fifo);
515   }));
516   /* *INDENT-ON* */
517
518   s->flags &= ~SESSION_F_RX_EVT;
519   if (PREDICT_FALSE (app_worker_lock_and_send_event (app_wrk, s,
520                                                      SESSION_IO_EVT_RX)))
521     return -1;
522
523   if (PREDICT_FALSE (n_subscribers))
524     {
525       s = session_get (session_index, vlib_get_thread_index ());
526       return session_notify_subscribers (app_wrk->app_index, s,
527                                          s->rx_fifo, SESSION_IO_EVT_RX);
528     }
529
530   return 0;
531 }
532
533 int
534 session_enqueue_notify (session_t * s)
535 {
536   return session_enqueue_notify_inline (s);
537 }
538
539 int
540 session_dequeue_notify (session_t * s)
541 {
542   app_worker_t *app_wrk;
543
544   app_wrk = app_worker_get_if_valid (s->app_wrk_index);
545   if (PREDICT_FALSE (!app_wrk))
546     return -1;
547
548   if (PREDICT_FALSE (app_worker_lock_and_send_event (app_wrk, s,
549                                                      SESSION_IO_EVT_TX)))
550     return -1;
551
552   if (PREDICT_FALSE (s->tx_fifo->n_subscribers))
553     return session_notify_subscribers (app_wrk->app_index, s,
554                                        s->tx_fifo, SESSION_IO_EVT_TX);
555
556   svm_fifo_clear_tx_ntf (s->tx_fifo);
557
558   return 0;
559 }
560
561 /**
562  * Flushes queue of sessions that are to be notified of new data
563  * enqueued events.
564  *
565  * @param thread_index Thread index for which the flush is to be performed.
566  * @return 0 on success or a positive number indicating the number of
567  *         failures due to API queue being full.
568  */
569 int
570 session_main_flush_enqueue_events (u8 transport_proto, u32 thread_index)
571 {
572   session_worker_t *wrk = session_main_get_worker (thread_index);
573   session_t *s;
574   int i, errors = 0;
575   u32 *indices;
576
577   indices = wrk->session_to_enqueue[transport_proto];
578
579   for (i = 0; i < vec_len (indices); i++)
580     {
581       s = session_get_if_valid (indices[i], thread_index);
582       if (PREDICT_FALSE (!s))
583         {
584           errors++;
585           continue;
586         }
587
588       if (PREDICT_FALSE (session_enqueue_notify_inline (s)))
589         errors++;
590     }
591
592   vec_reset_length (indices);
593   wrk->session_to_enqueue[transport_proto] = indices;
594
595   return errors;
596 }
597
598 int
599 session_main_flush_all_enqueue_events (u8 transport_proto)
600 {
601   vlib_thread_main_t *vtm = vlib_get_thread_main ();
602   int i, errors = 0;
603   for (i = 0; i < 1 + vtm->n_threads; i++)
604     errors += session_main_flush_enqueue_events (transport_proto, i);
605   return errors;
606 }
607
608 int
609 session_stream_connect_notify (transport_connection_t * tc, u8 is_fail)
610 {
611   u32 opaque = 0, new_ti, new_si;
612   app_worker_t *app_wrk;
613   session_t *s = 0;
614   u64 ho_handle;
615
616   /*
617    * Find connection handle and cleanup half-open table
618    */
619   ho_handle = session_lookup_half_open_handle (tc);
620   if (ho_handle == HALF_OPEN_LOOKUP_INVALID_VALUE)
621     {
622       SESSION_DBG ("half-open was removed!");
623       return -1;
624     }
625   session_lookup_del_half_open (tc);
626
627   /* Get the app's index from the handle we stored when opening connection
628    * and the opaque (api_context for external apps) from transport session
629    * index */
630   app_wrk = app_worker_get_if_valid (ho_handle >> 32);
631   if (!app_wrk)
632     return -1;
633
634   opaque = tc->s_index;
635
636   if (is_fail)
637     return app_worker_connect_notify (app_wrk, s, opaque);
638
639   s = session_alloc_for_connection (tc);
640   s->session_state = SESSION_STATE_CONNECTING;
641   s->app_wrk_index = app_wrk->wrk_index;
642   new_si = s->session_index;
643   new_ti = s->thread_index;
644
645   if (app_worker_init_connected (app_wrk, s))
646     {
647       session_free (s);
648       app_worker_connect_notify (app_wrk, 0, opaque);
649       return -1;
650     }
651
652   if (app_worker_connect_notify (app_wrk, s, opaque))
653     {
654       s = session_get (new_si, new_ti);
655       session_free_w_fifos (s);
656       return -1;
657     }
658
659   s = session_get (new_si, new_ti);
660   s->session_state = SESSION_STATE_READY;
661   session_lookup_add_connection (tc, session_handle (s));
662
663   return 0;
664 }
665
666 typedef struct _session_switch_pool_args
667 {
668   u32 session_index;
669   u32 thread_index;
670   u32 new_thread_index;
671   u32 new_session_index;
672 } session_switch_pool_args_t;
673
674 static void
675 session_switch_pool (void *cb_args)
676 {
677   session_switch_pool_args_t *args = (session_switch_pool_args_t *) cb_args;
678   session_t *s;
679   ASSERT (args->thread_index == vlib_get_thread_index ());
680   s = session_get (args->session_index, args->thread_index);
681   s->tx_fifo->master_session_index = args->new_session_index;
682   s->tx_fifo->master_thread_index = args->new_thread_index;
683   transport_cleanup (session_get_transport_proto (s), s->connection_index,
684                      s->thread_index);
685   session_free (s);
686   clib_mem_free (cb_args);
687 }
688
689 /**
690  * Move dgram session to the right thread
691  */
692 int
693 session_dgram_connect_notify (transport_connection_t * tc,
694                               u32 old_thread_index, session_t ** new_session)
695 {
696   session_t *new_s;
697   session_switch_pool_args_t *rpc_args;
698
699   /*
700    * Clone half-open session to the right thread.
701    */
702   new_s = session_clone_safe (tc->s_index, old_thread_index);
703   new_s->connection_index = tc->c_index;
704   new_s->rx_fifo->master_session_index = new_s->session_index;
705   new_s->rx_fifo->master_thread_index = new_s->thread_index;
706   new_s->session_state = SESSION_STATE_READY;
707   session_lookup_add_connection (tc, session_handle (new_s));
708
709   /*
710    * Ask thread owning the old session to clean it up and make us the tx
711    * fifo owner
712    */
713   rpc_args = clib_mem_alloc (sizeof (*rpc_args));
714   rpc_args->new_session_index = new_s->session_index;
715   rpc_args->new_thread_index = new_s->thread_index;
716   rpc_args->session_index = tc->s_index;
717   rpc_args->thread_index = old_thread_index;
718   session_send_rpc_evt_to_thread (rpc_args->thread_index, session_switch_pool,
719                                   rpc_args);
720
721   tc->s_index = new_s->session_index;
722   new_s->connection_index = tc->c_index;
723   *new_session = new_s;
724   return 0;
725 }
726
727 /**
728  * Notification from transport that connection is being closed.
729  *
730  * A disconnect is sent to application but state is not removed. Once
731  * disconnect is acknowledged by application, session disconnect is called.
732  * Ultimately this leads to close being called on transport (passive close).
733  */
734 void
735 session_transport_closing_notify (transport_connection_t * tc)
736 {
737   app_worker_t *app_wrk;
738   session_t *s;
739
740   s = session_get (tc->s_index, tc->thread_index);
741   if (s->session_state >= SESSION_STATE_TRANSPORT_CLOSING)
742     return;
743   s->session_state = SESSION_STATE_TRANSPORT_CLOSING;
744   app_wrk = app_worker_get (s->app_wrk_index);
745   app_worker_close_notify (app_wrk, s);
746 }
747
748 /**
749  * Notification from transport that connection is being deleted
750  *
751  * This removes the session if it is still valid. It should be called only on
752  * previously fully established sessions. For instance failed connects should
753  * call stream_session_connect_notify and indicate that the connect has
754  * failed.
755  */
756 void
757 session_transport_delete_notify (transport_connection_t * tc)
758 {
759   session_t *s;
760
761   /* App might've been removed already */
762   if (!(s = session_get_if_valid (tc->s_index, tc->thread_index)))
763     return;
764
765   /* Make sure we don't try to send anything more */
766   svm_fifo_dequeue_drop_all (s->tx_fifo);
767
768   switch (s->session_state)
769     {
770     case SESSION_STATE_CREATED:
771       /* Session was created but accept notification was not yet sent to the
772        * app. Cleanup everything. */
773       session_lookup_del_session (s);
774       session_free_w_fifos (s);
775       break;
776     case SESSION_STATE_ACCEPTING:
777     case SESSION_STATE_TRANSPORT_CLOSING:
778       /* If transport finishes or times out before we get a reply
779        * from the app, mark transport as closed and wait for reply
780        * before removing the session. Cleanup session table in advance
781        * because transport will soon be closed and closed sessions
782        * are assumed to have been removed from the lookup table */
783       session_lookup_del_session (s);
784       s->session_state = SESSION_STATE_TRANSPORT_CLOSED;
785       break;
786     case SESSION_STATE_CLOSING:
787     case SESSION_STATE_CLOSED_WAITING:
788       /* Cleanup lookup table as transport needs to still be valid.
789        * Program transport close to ensure that all session events
790        * have been cleaned up. Once transport close is called, the
791        * session is just removed because both transport and app have
792        * confirmed the close*/
793       session_lookup_del_session (s);
794       s->session_state = SESSION_STATE_TRANSPORT_CLOSED;
795       session_program_transport_close (s);
796       break;
797     case SESSION_STATE_TRANSPORT_CLOSED:
798       break;
799     case SESSION_STATE_CLOSED:
800       session_delete (s);
801       break;
802     default:
803       clib_warning ("session state %u", s->session_state);
804       session_delete (s);
805       break;
806     }
807 }
808
809 /**
810  * Notification from transport that session can be closed
811  *
812  * Should be called by transport only if it was closed with non-empty
813  * tx fifo and once it decides to begin the closing procedure prior to
814  * issuing a delete notify. This gives the chance to the session layer
815  * to cleanup any outstanding events.
816  */
817 void
818 session_transport_closed_notify (transport_connection_t * tc)
819 {
820   session_t *s;
821
822   if (!(s = session_get_if_valid (tc->s_index, tc->thread_index)))
823     return;
824
825   /* If app close has not been received or has not yet resulted in
826    * a transport close, only mark the session transport as closed */
827   if (s->session_state <= SESSION_STATE_CLOSING)
828     {
829       session_lookup_del_session (s);
830       s->session_state = SESSION_STATE_TRANSPORT_CLOSED;
831     }
832   else
833     s->session_state = SESSION_STATE_CLOSED;
834 }
835
836 /**
837  * Notify application that connection has been reset.
838  */
839 void
840 session_transport_reset_notify (transport_connection_t * tc)
841 {
842   app_worker_t *app_wrk;
843   session_t *s;
844
845   s = session_get (tc->s_index, tc->thread_index);
846   svm_fifo_dequeue_drop_all (s->tx_fifo);
847   if (s->session_state >= SESSION_STATE_TRANSPORT_CLOSING)
848     return;
849   s->session_state = SESSION_STATE_TRANSPORT_CLOSING;
850   app_wrk = app_worker_get (s->app_wrk_index);
851   app_worker_reset_notify (app_wrk, s);
852 }
853
854 int
855 session_stream_accept_notify (transport_connection_t * tc)
856 {
857   app_worker_t *app_wrk;
858   session_t *s;
859
860   s = session_get (tc->s_index, tc->thread_index);
861   app_wrk = app_worker_get_if_valid (s->app_wrk_index);
862   if (!app_wrk)
863     return -1;
864   s->session_state = SESSION_STATE_ACCEPTING;
865   return app_worker_accept_notify (app_wrk, s);
866 }
867
868 /**
869  * Accept a stream session. Optionally ping the server by callback.
870  */
871 int
872 session_stream_accept (transport_connection_t * tc, u32 listener_index,
873                        u8 notify)
874 {
875   session_t *s;
876   int rv;
877
878   s = session_alloc_for_connection (tc);
879   s->listener_index = listener_index;
880   s->session_state = SESSION_STATE_CREATED;
881
882   if ((rv = app_worker_init_accepted (s)))
883     return rv;
884
885   session_lookup_add_connection (tc, session_handle (s));
886
887   /* Shoulder-tap the server */
888   if (notify)
889     {
890       app_worker_t *app_wrk = app_worker_get (s->app_wrk_index);
891       return app_worker_accept_notify (app_wrk, s);
892     }
893
894   return 0;
895 }
896
897 int
898 session_open_cl (u32 app_wrk_index, session_endpoint_t * rmt, u32 opaque)
899 {
900   transport_connection_t *tc;
901   transport_endpoint_cfg_t *tep;
902   app_worker_t *app_wrk;
903   session_handle_t sh;
904   session_t *s;
905   int rv;
906
907   tep = session_endpoint_to_transport_cfg (rmt);
908   rv = transport_connect (rmt->transport_proto, tep);
909   if (rv < 0)
910     {
911       SESSION_DBG ("Transport failed to open connection.");
912       return VNET_API_ERROR_SESSION_CONNECT;
913     }
914
915   tc = transport_get_half_open (rmt->transport_proto, (u32) rv);
916
917   /* For dgram type of service, allocate session and fifos now */
918   app_wrk = app_worker_get (app_wrk_index);
919   s = session_alloc_for_connection (tc);
920   s->app_wrk_index = app_wrk->wrk_index;
921   s->session_state = SESSION_STATE_OPENED;
922   if (app_worker_init_connected (app_wrk, s))
923     {
924       session_free (s);
925       return -1;
926     }
927
928   sh = session_handle (s);
929   session_lookup_add_connection (tc, sh);
930
931   return app_worker_connect_notify (app_wrk, s, opaque);
932 }
933
934 int
935 session_open_vc (u32 app_wrk_index, session_endpoint_t * rmt, u32 opaque)
936 {
937   transport_connection_t *tc;
938   transport_endpoint_cfg_t *tep;
939   u64 handle;
940   int rv;
941
942   tep = session_endpoint_to_transport_cfg (rmt);
943   rv = transport_connect (rmt->transport_proto, tep);
944   if (rv < 0)
945     {
946       SESSION_DBG ("Transport failed to open connection.");
947       return VNET_API_ERROR_SESSION_CONNECT;
948     }
949
950   tc = transport_get_half_open (rmt->transport_proto, (u32) rv);
951
952   /* If transport offers a stream service, only allocate session once the
953    * connection has been established.
954    * Add connection to half-open table and save app and tc index. The
955    * latter is needed to help establish the connection while the former
956    * is needed when the connect notify comes and we have to notify the
957    * external app
958    */
959   handle = (((u64) app_wrk_index) << 32) | (u64) tc->c_index;
960   session_lookup_add_half_open (tc, handle);
961
962   /* Store api_context (opaque) for when the reply comes. Not the nicest
963    * thing but better than allocating a separate half-open pool.
964    */
965   tc->s_index = opaque;
966   return 0;
967 }
968
969 int
970 session_open_app (u32 app_wrk_index, session_endpoint_t * rmt, u32 opaque)
971 {
972   session_endpoint_cfg_t *sep = (session_endpoint_cfg_t *) rmt;
973   transport_endpoint_cfg_t *tep_cfg = session_endpoint_to_transport_cfg (sep);
974
975   sep->app_wrk_index = app_wrk_index;
976   sep->opaque = opaque;
977
978   return transport_connect (rmt->transport_proto, tep_cfg);
979 }
980
981 typedef int (*session_open_service_fn) (u32, session_endpoint_t *, u32);
982
983 /* *INDENT-OFF* */
984 static session_open_service_fn session_open_srv_fns[TRANSPORT_N_SERVICES] = {
985   session_open_vc,
986   session_open_cl,
987   session_open_app,
988 };
989 /* *INDENT-ON* */
990
991 /**
992  * Ask transport to open connection to remote transport endpoint.
993  *
994  * Stores handle for matching request with reply since the call can be
995  * asynchronous. For instance, for TCP the 3-way handshake must complete
996  * before reply comes. Session is only created once connection is established.
997  *
998  * @param app_index Index of the application requesting the connect
999  * @param st Session type requested.
1000  * @param tep Remote transport endpoint
1001  * @param opaque Opaque data (typically, api_context) the application expects
1002  *               on open completion.
1003  */
1004 int
1005 session_open (u32 app_wrk_index, session_endpoint_t * rmt, u32 opaque)
1006 {
1007   transport_service_type_t tst;
1008   tst = transport_protocol_service_type (rmt->transport_proto);
1009   return session_open_srv_fns[tst] (app_wrk_index, rmt, opaque);
1010 }
1011
1012 /**
1013  * Ask transport to listen on session endpoint.
1014  *
1015  * @param s Session for which listen will be called. Note that unlike
1016  *          established sessions, listen sessions are not associated to a
1017  *          thread.
1018  * @param sep Local endpoint to be listened on.
1019  */
1020 int
1021 session_listen (session_t * ls, session_endpoint_cfg_t * sep)
1022 {
1023   transport_endpoint_t *tep;
1024   u32 tc_index, s_index;
1025
1026   /* Transport bind/listen */
1027   tep = session_endpoint_to_transport (sep);
1028   s_index = ls->session_index;
1029   tc_index = transport_start_listen (session_get_transport_proto (ls),
1030                                      s_index, tep);
1031
1032   if (tc_index == (u32) ~ 0)
1033     return -1;
1034
1035   /* Attach transport to session. Lookup tables are populated by the app
1036    * worker because local tables (for ct sessions) are not backed by a fib */
1037   ls = listen_session_get (s_index);
1038   ls->connection_index = tc_index;
1039
1040   return 0;
1041 }
1042
1043 /**
1044  * Ask transport to stop listening on local transport endpoint.
1045  *
1046  * @param s Session to stop listening on. It must be in state LISTENING.
1047  */
1048 int
1049 session_stop_listen (session_t * s)
1050 {
1051   transport_proto_t tp = session_get_transport_proto (s);
1052   transport_connection_t *tc;
1053
1054   if (s->session_state != SESSION_STATE_LISTENING)
1055     return -1;
1056
1057   tc = transport_get_listener (tp, s->connection_index);
1058   if (!tc)
1059     return VNET_API_ERROR_ADDRESS_NOT_IN_USE;
1060
1061   session_lookup_del_connection (tc);
1062   transport_stop_listen (tp, s->connection_index);
1063   return 0;
1064 }
1065
1066 /**
1067  * Initialize session closing procedure.
1068  *
1069  * Request is always sent to session node to ensure that all outstanding
1070  * requests are served before transport is notified.
1071  */
1072 void
1073 session_close (session_t * s)
1074 {
1075   if (!s)
1076     return;
1077
1078   if (s->session_state >= SESSION_STATE_CLOSING)
1079     {
1080       /* Session will only be removed once both app and transport
1081        * acknowledge the close */
1082       if (s->session_state == SESSION_STATE_TRANSPORT_CLOSED)
1083         session_program_transport_close (s);
1084
1085       /* Session already closed. Clear the tx fifo */
1086       if (s->session_state == SESSION_STATE_CLOSED)
1087         svm_fifo_dequeue_drop_all (s->tx_fifo);
1088       return;
1089     }
1090
1091   s->session_state = SESSION_STATE_CLOSING;
1092   session_program_transport_close (s);
1093 }
1094
1095 /**
1096  * Notify transport the session can be disconnected. This should eventually
1097  * result in a delete notification that allows us to cleanup session state.
1098  * Called for both active/passive disconnects.
1099  *
1100  * Must be called from the session's thread.
1101  */
1102 void
1103 session_transport_close (session_t * s)
1104 {
1105   /* If transport is already closed, just free the session */
1106   if (s->session_state >= SESSION_STATE_TRANSPORT_CLOSED)
1107     {
1108       session_free_w_fifos (s);
1109       return;
1110     }
1111
1112   /* If tx queue wasn't drained, change state to closed waiting for transport.
1113    * This way, the transport, if it so wishes, can continue to try sending the
1114    * outstanding data (in closed state it cannot). It MUST however at one
1115    * point, either after sending everything or after a timeout, call delete
1116    * notify. This will finally lead to the complete cleanup of the session.
1117    */
1118   if (svm_fifo_max_dequeue_cons (s->tx_fifo))
1119     s->session_state = SESSION_STATE_CLOSED_WAITING;
1120   else
1121     s->session_state = SESSION_STATE_CLOSED;
1122
1123   transport_close (session_get_transport_proto (s), s->connection_index,
1124                    s->thread_index);
1125 }
1126
1127 /**
1128  * Cleanup transport and session state.
1129  *
1130  * Notify transport of the cleanup and free the session. This should
1131  * be called only if transport reported some error and is already
1132  * closed.
1133  */
1134 void
1135 session_transport_cleanup (session_t * s)
1136 {
1137   s->session_state = SESSION_STATE_CLOSED;
1138
1139   /* Delete from main lookup table before we axe the the transport */
1140   session_lookup_del_session (s);
1141   transport_cleanup (session_get_transport_proto (s), s->connection_index,
1142                      s->thread_index);
1143   /* Since we called cleanup, no delete notification will come. So, make
1144    * sure the session is properly freed. */
1145   session_free_w_fifos (s);
1146 }
1147
1148 /**
1149  * Allocate event queues in the shared-memory segment
1150  *
1151  * That can either be a newly created memfd segment, that will need to be
1152  * mapped by all stack users, or the binary api's svm region. The latter is
1153  * assumed to be already mapped. NOTE that this assumption DOES NOT hold if
1154  * api clients bootstrap shm api over sockets (i.e. use memfd segments) and
1155  * vpp uses api svm region for event queues.
1156  */
1157 void
1158 session_vpp_event_queues_allocate (session_main_t * smm)
1159 {
1160   u32 evt_q_length = 2048, evt_size = sizeof (session_event_t);
1161   ssvm_private_t *eqs = &smm->evt_qs_segment;
1162   api_main_t *am = &api_main;
1163   uword eqs_size = 64 << 20;
1164   pid_t vpp_pid = getpid ();
1165   void *oldheap;
1166   int i;
1167
1168   if (smm->configured_event_queue_length)
1169     evt_q_length = smm->configured_event_queue_length;
1170
1171   if (smm->evt_qs_use_memfd_seg)
1172     {
1173       if (smm->evt_qs_segment_size)
1174         eqs_size = smm->evt_qs_segment_size;
1175
1176       eqs->ssvm_size = eqs_size;
1177       eqs->i_am_master = 1;
1178       eqs->my_pid = vpp_pid;
1179       eqs->name = format (0, "%s%c", "evt-qs-segment", 0);
1180       eqs->requested_va = smm->session_baseva;
1181
1182       if (ssvm_master_init (eqs, SSVM_SEGMENT_MEMFD))
1183         {
1184           clib_warning ("failed to initialize queue segment");
1185           return;
1186         }
1187     }
1188
1189   if (smm->evt_qs_use_memfd_seg)
1190     oldheap = ssvm_push_heap (eqs->sh);
1191   else
1192     oldheap = svm_push_data_heap (am->vlib_rp);
1193
1194   for (i = 0; i < vec_len (smm->wrk); i++)
1195     {
1196       svm_msg_q_cfg_t _cfg, *cfg = &_cfg;
1197       svm_msg_q_ring_cfg_t rc[SESSION_MQ_N_RINGS] = {
1198         {evt_q_length, evt_size, 0}
1199         ,
1200         {evt_q_length >> 1, 256, 0}
1201       };
1202       cfg->consumer_pid = 0;
1203       cfg->n_rings = 2;
1204       cfg->q_nitems = evt_q_length;
1205       cfg->ring_cfgs = rc;
1206       smm->wrk[i].vpp_event_queue = svm_msg_q_alloc (cfg);
1207       if (smm->evt_qs_use_memfd_seg)
1208         {
1209           if (svm_msg_q_alloc_consumer_eventfd (smm->wrk[i].vpp_event_queue))
1210             clib_warning ("eventfd returned");
1211         }
1212     }
1213
1214   if (smm->evt_qs_use_memfd_seg)
1215     ssvm_pop_heap (oldheap);
1216   else
1217     svm_pop_heap (oldheap);
1218 }
1219
1220 ssvm_private_t *
1221 session_main_get_evt_q_segment (void)
1222 {
1223   session_main_t *smm = &session_main;
1224   if (smm->evt_qs_use_memfd_seg)
1225     return &smm->evt_qs_segment;
1226   return 0;
1227 }
1228
1229 u64
1230 session_segment_handle (session_t * s)
1231 {
1232   svm_fifo_t *f;
1233
1234   if (!s->rx_fifo)
1235     return SESSION_INVALID_HANDLE;
1236
1237   f = s->rx_fifo;
1238   return segment_manager_make_segment_handle (f->segment_manager,
1239                                               f->segment_index);
1240 }
1241
1242 /* *INDENT-OFF* */
1243 static session_fifo_rx_fn *session_tx_fns[TRANSPORT_TX_N_FNS] = {
1244     session_tx_fifo_peek_and_snd,
1245     session_tx_fifo_dequeue_and_snd,
1246     session_tx_fifo_dequeue_internal,
1247     session_tx_fifo_dequeue_and_snd
1248 };
1249 /* *INDENT-ON* */
1250
1251 /**
1252  * Initialize session layer for given transport proto and ip version
1253  *
1254  * Allocates per session type (transport proto + ip version) data structures
1255  * and adds arc from session queue node to session type output node.
1256  */
1257 void
1258 session_register_transport (transport_proto_t transport_proto,
1259                             const transport_proto_vft_t * vft, u8 is_ip4,
1260                             u32 output_node)
1261 {
1262   session_main_t *smm = &session_main;
1263   session_type_t session_type;
1264   u32 next_index = ~0;
1265
1266   session_type = session_type_from_proto_and_ip (transport_proto, is_ip4);
1267
1268   vec_validate (smm->session_type_to_next, session_type);
1269   vec_validate (smm->session_tx_fns, session_type);
1270
1271   /* *INDENT-OFF* */
1272   if (output_node != ~0)
1273     {
1274       foreach_vlib_main (({
1275           next_index = vlib_node_add_next (this_vlib_main,
1276                                            session_queue_node.index,
1277                                            output_node);
1278       }));
1279     }
1280   /* *INDENT-ON* */
1281
1282   smm->session_type_to_next[session_type] = next_index;
1283   smm->session_tx_fns[session_type] = session_tx_fns[vft->tx_type];
1284 }
1285
1286 transport_connection_t *
1287 session_get_transport (session_t * s)
1288 {
1289   if (s->session_state != SESSION_STATE_LISTENING)
1290     return transport_get_connection (session_get_transport_proto (s),
1291                                      s->connection_index, s->thread_index);
1292   else
1293     return transport_get_listener (session_get_transport_proto (s),
1294                                    s->connection_index);
1295 }
1296
1297 void
1298 session_get_endpoint (session_t * s, transport_endpoint_t * tep, u8 is_lcl)
1299 {
1300   if (s->session_state != SESSION_STATE_LISTENING)
1301     return transport_get_endpoint (session_get_transport_proto (s),
1302                                    s->connection_index, s->thread_index, tep,
1303                                    is_lcl);
1304   else
1305     return transport_get_listener_endpoint (session_get_transport_proto (s),
1306                                             s->connection_index, tep, is_lcl);
1307 }
1308
1309 transport_connection_t *
1310 listen_session_get_transport (session_t * s)
1311 {
1312   return transport_get_listener (session_get_transport_proto (s),
1313                                  s->connection_index);
1314 }
1315
1316 void
1317 session_flush_frames_main_thread (vlib_main_t * vm)
1318 {
1319   ASSERT (vlib_get_thread_index () == 0);
1320   vlib_process_signal_event_mt (vm, session_queue_process_node.index,
1321                                 SESSION_Q_PROCESS_FLUSH_FRAMES, 0);
1322 }
1323
1324 static clib_error_t *
1325 session_manager_main_enable (vlib_main_t * vm)
1326 {
1327   segment_manager_main_init_args_t _sm_args = { 0 }, *sm_args = &_sm_args;
1328   session_main_t *smm = &session_main;
1329   vlib_thread_main_t *vtm = vlib_get_thread_main ();
1330   u32 num_threads, preallocated_sessions_per_worker;
1331   session_worker_t *wrk;
1332   int i;
1333
1334   num_threads = 1 /* main thread */  + vtm->n_threads;
1335
1336   if (num_threads < 1)
1337     return clib_error_return (0, "n_thread_stacks not set");
1338
1339   /* Allocate cache line aligned worker contexts */
1340   vec_validate_aligned (smm->wrk, num_threads - 1, CLIB_CACHE_LINE_BYTES);
1341
1342   for (i = 0; i < num_threads; i++)
1343     {
1344       wrk = &smm->wrk[i];
1345       vec_validate (wrk->free_event_vector, 128);
1346       _vec_len (wrk->free_event_vector) = 0;
1347       vec_validate (wrk->pending_event_vector, 128);
1348       _vec_len (wrk->pending_event_vector) = 0;
1349       vec_validate (wrk->pending_disconnects, 128);
1350       _vec_len (wrk->pending_disconnects) = 0;
1351       vec_validate (wrk->postponed_event_vector, 128);
1352       _vec_len (wrk->postponed_event_vector) = 0;
1353
1354       wrk->last_vlib_time = vlib_time_now (vlib_mains[i]);
1355       wrk->dispatch_period = 500e-6;
1356
1357       if (num_threads > 1)
1358         clib_rwlock_init (&smm->wrk[i].peekers_rw_locks);
1359     }
1360
1361 #if SESSION_DEBUG
1362   vec_validate (smm->last_event_poll_by_thread, num_threads - 1);
1363 #endif
1364
1365   /* Allocate vpp event queues segment and queue */
1366   session_vpp_event_queues_allocate (smm);
1367
1368   /* Initialize fifo segment main baseva and timeout */
1369   sm_args->baseva = smm->session_baseva + smm->evt_qs_segment_size;
1370   sm_args->size = smm->session_va_space_size;
1371   segment_manager_main_init (sm_args);
1372
1373   /* Preallocate sessions */
1374   if (smm->preallocated_sessions)
1375     {
1376       if (num_threads == 1)
1377         {
1378           pool_init_fixed (smm->wrk[0].sessions, smm->preallocated_sessions);
1379         }
1380       else
1381         {
1382           int j;
1383           preallocated_sessions_per_worker =
1384             (1.1 * (f64) smm->preallocated_sessions /
1385              (f64) (num_threads - 1));
1386
1387           for (j = 1; j < num_threads; j++)
1388             {
1389               pool_init_fixed (smm->wrk[j].sessions,
1390                                preallocated_sessions_per_worker);
1391             }
1392         }
1393     }
1394
1395   session_lookup_init ();
1396   app_namespaces_init ();
1397   transport_init ();
1398
1399   smm->is_enabled = 1;
1400
1401   /* Enable transports */
1402   transport_enable_disable (vm, 1);
1403   transport_init_tx_pacers_period ();
1404   return 0;
1405 }
1406
1407 void
1408 session_node_enable_disable (u8 is_en)
1409 {
1410   u8 state = is_en ? VLIB_NODE_STATE_POLLING : VLIB_NODE_STATE_DISABLED;
1411   vlib_thread_main_t *vtm = vlib_get_thread_main ();
1412   u8 have_workers = vtm->n_threads != 0;
1413
1414   /* *INDENT-OFF* */
1415   foreach_vlib_main (({
1416     if (have_workers && ii == 0)
1417       {
1418         vlib_node_set_state (this_vlib_main, session_queue_process_node.index,
1419                              state);
1420         if (is_en)
1421           {
1422             vlib_node_t *n = vlib_get_node (this_vlib_main,
1423                                             session_queue_process_node.index);
1424             vlib_start_process (this_vlib_main, n->runtime_index);
1425           }
1426         else
1427           {
1428             vlib_process_signal_event_mt (this_vlib_main,
1429                                           session_queue_process_node.index,
1430                                           SESSION_Q_PROCESS_STOP, 0);
1431           }
1432
1433         continue;
1434       }
1435     vlib_node_set_state (this_vlib_main, session_queue_node.index,
1436                          state);
1437   }));
1438   /* *INDENT-ON* */
1439 }
1440
1441 clib_error_t *
1442 vnet_session_enable_disable (vlib_main_t * vm, u8 is_en)
1443 {
1444   clib_error_t *error = 0;
1445   if (is_en)
1446     {
1447       if (session_main.is_enabled)
1448         return 0;
1449
1450       session_node_enable_disable (is_en);
1451       error = session_manager_main_enable (vm);
1452     }
1453   else
1454     {
1455       session_main.is_enabled = 0;
1456       session_node_enable_disable (is_en);
1457     }
1458
1459   return error;
1460 }
1461
1462 clib_error_t *
1463 session_manager_main_init (vlib_main_t * vm)
1464 {
1465   session_main_t *smm = &session_main;
1466   smm->session_baseva = HIGH_SEGMENT_BASEVA;
1467 #if (HIGH_SEGMENT_BASEVA > (4ULL << 30))
1468   smm->session_va_space_size = 128ULL << 30;
1469   smm->evt_qs_segment_size = 64 << 20;
1470 #else
1471   smm->session_va_space_size = 128 << 20;
1472   smm->evt_qs_segment_size = 1 << 20;
1473 #endif
1474   smm->is_enabled = 0;
1475   return 0;
1476 }
1477
1478 VLIB_INIT_FUNCTION (session_manager_main_init);
1479
1480 static clib_error_t *
1481 session_config_fn (vlib_main_t * vm, unformat_input_t * input)
1482 {
1483   session_main_t *smm = &session_main;
1484   u32 nitems;
1485   uword tmp;
1486
1487   while (unformat_check_input (input) != UNFORMAT_END_OF_INPUT)
1488     {
1489       if (unformat (input, "event-queue-length %d", &nitems))
1490         {
1491           if (nitems >= 2048)
1492             smm->configured_event_queue_length = nitems;
1493           else
1494             clib_warning ("event queue length %d too small, ignored", nitems);
1495         }
1496       else if (unformat (input, "preallocated-sessions %d",
1497                          &smm->preallocated_sessions))
1498         ;
1499       else if (unformat (input, "v4-session-table-buckets %d",
1500                          &smm->configured_v4_session_table_buckets))
1501         ;
1502       else if (unformat (input, "v4-halfopen-table-buckets %d",
1503                          &smm->configured_v4_halfopen_table_buckets))
1504         ;
1505       else if (unformat (input, "v6-session-table-buckets %d",
1506                          &smm->configured_v6_session_table_buckets))
1507         ;
1508       else if (unformat (input, "v6-halfopen-table-buckets %d",
1509                          &smm->configured_v6_halfopen_table_buckets))
1510         ;
1511       else if (unformat (input, "v4-session-table-memory %U",
1512                          unformat_memory_size, &tmp))
1513         {
1514           if (tmp >= 0x100000000)
1515             return clib_error_return (0, "memory size %llx (%lld) too large",
1516                                       tmp, tmp);
1517           smm->configured_v4_session_table_memory = tmp;
1518         }
1519       else if (unformat (input, "v4-halfopen-table-memory %U",
1520                          unformat_memory_size, &tmp))
1521         {
1522           if (tmp >= 0x100000000)
1523             return clib_error_return (0, "memory size %llx (%lld) too large",
1524                                       tmp, tmp);
1525           smm->configured_v4_halfopen_table_memory = tmp;
1526         }
1527       else if (unformat (input, "v6-session-table-memory %U",
1528                          unformat_memory_size, &tmp))
1529         {
1530           if (tmp >= 0x100000000)
1531             return clib_error_return (0, "memory size %llx (%lld) too large",
1532                                       tmp, tmp);
1533           smm->configured_v6_session_table_memory = tmp;
1534         }
1535       else if (unformat (input, "v6-halfopen-table-memory %U",
1536                          unformat_memory_size, &tmp))
1537         {
1538           if (tmp >= 0x100000000)
1539             return clib_error_return (0, "memory size %llx (%lld) too large",
1540                                       tmp, tmp);
1541           smm->configured_v6_halfopen_table_memory = tmp;
1542         }
1543       else if (unformat (input, "local-endpoints-table-memory %U",
1544                          unformat_memory_size, &tmp))
1545         {
1546           if (tmp >= 0x100000000)
1547             return clib_error_return (0, "memory size %llx (%lld) too large",
1548                                       tmp, tmp);
1549           smm->local_endpoints_table_memory = tmp;
1550         }
1551       else if (unformat (input, "local-endpoints-table-buckets %d",
1552                          &smm->local_endpoints_table_buckets))
1553         ;
1554       else if (unformat (input, "evt_qs_memfd_seg"))
1555         smm->evt_qs_use_memfd_seg = 1;
1556       else if (unformat (input, "evt_qs_seg_size %U", unformat_memory_size,
1557                          &smm->evt_qs_segment_size))
1558         ;
1559       else
1560         return clib_error_return (0, "unknown input `%U'",
1561                                   format_unformat_error, input);
1562     }
1563   return 0;
1564 }
1565
1566 VLIB_CONFIG_FUNCTION (session_config_fn, "session");
1567
1568 /*
1569  * fd.io coding-style-patch-verification: ON
1570  *
1571  * Local Variables:
1572  * eval: (c-set-style "gnu")
1573  * End:
1574  */