tcp: handle fin+rst in same frame
[vpp.git] / src / vnet / session / session.c
1 /*
2  * Copyright (c) 2017-2019 Cisco and/or its affiliates.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at:
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 /**
16  * @file
17  * @brief Session and session manager
18  */
19
20 #include <vnet/session/session.h>
21 #include <vnet/session/session_debug.h>
22 #include <vnet/session/application.h>
23 #include <vnet/dpo/load_balance.h>
24 #include <vnet/fib/ip4_fib.h>
25
26 session_main_t session_main;
27
28 static inline int
29 session_send_evt_to_thread (void *data, void *args, u32 thread_index,
30                             session_evt_type_t evt_type)
31 {
32   session_event_t *evt;
33   svm_msg_q_msg_t msg;
34   svm_msg_q_t *mq;
35   u32 tries = 0, max_tries;
36
37   mq = session_main_get_vpp_event_queue (thread_index);
38   while (svm_msg_q_try_lock (mq))
39     {
40       max_tries = vlib_get_current_process (vlib_get_main ())? 1e6 : 3;
41       if (tries++ == max_tries)
42         {
43           SESSION_DBG ("failed to enqueue evt");
44           return -1;
45         }
46     }
47   if (PREDICT_FALSE (svm_msg_q_ring_is_full (mq, SESSION_MQ_IO_EVT_RING)))
48     {
49       svm_msg_q_unlock (mq);
50       return -2;
51     }
52   msg = svm_msg_q_alloc_msg_w_ring (mq, SESSION_MQ_IO_EVT_RING);
53   if (PREDICT_FALSE (svm_msg_q_msg_is_invalid (&msg)))
54     {
55       svm_msg_q_unlock (mq);
56       return -2;
57     }
58   evt = (session_event_t *) svm_msg_q_msg_data (mq, &msg);
59   evt->event_type = evt_type;
60   switch (evt_type)
61     {
62     case SESSION_CTRL_EVT_RPC:
63       evt->rpc_args.fp = data;
64       evt->rpc_args.arg = args;
65       break;
66     case SESSION_IO_EVT_TX:
67     case SESSION_IO_EVT_TX_FLUSH:
68     case SESSION_IO_EVT_BUILTIN_RX:
69       evt->session_index = *(u32 *) data;
70       break;
71     case SESSION_IO_EVT_BUILTIN_TX:
72     case SESSION_CTRL_EVT_CLOSE:
73       evt->session_handle = session_handle ((session_t *) data);
74       break;
75     default:
76       clib_warning ("evt unhandled!");
77       svm_msg_q_unlock (mq);
78       return -1;
79     }
80
81   svm_msg_q_add_and_unlock (mq, &msg);
82   return 0;
83 }
84
85 int
86 session_send_io_evt_to_thread (svm_fifo_t * f, session_evt_type_t evt_type)
87 {
88   return session_send_evt_to_thread (&f->master_session_index, 0,
89                                      f->master_thread_index, evt_type);
90 }
91
92 int
93 session_send_io_evt_to_thread_custom (void *data, u32 thread_index,
94                                       session_evt_type_t evt_type)
95 {
96   return session_send_evt_to_thread (data, 0, thread_index, evt_type);
97 }
98
99 int
100 session_send_ctrl_evt_to_thread (session_t * s, session_evt_type_t evt_type)
101 {
102   /* only event supported for now is disconnect */
103   ASSERT (evt_type == SESSION_CTRL_EVT_CLOSE);
104   return session_send_evt_to_thread (s, 0, s->thread_index,
105                                      SESSION_CTRL_EVT_CLOSE);
106 }
107
108 void
109 session_send_rpc_evt_to_thread_force (u32 thread_index, void *fp,
110                                       void *rpc_args)
111 {
112   session_send_evt_to_thread (fp, rpc_args, thread_index,
113                               SESSION_CTRL_EVT_RPC);
114 }
115
116 void
117 session_send_rpc_evt_to_thread (u32 thread_index, void *fp, void *rpc_args)
118 {
119   if (thread_index != vlib_get_thread_index ())
120     session_send_rpc_evt_to_thread_force (thread_index, fp, rpc_args);
121   else
122     {
123       void (*fnp) (void *) = fp;
124       fnp (rpc_args);
125     }
126 }
127
128 static void
129 session_program_transport_close (session_t * s)
130 {
131   u32 thread_index = vlib_get_thread_index ();
132   session_worker_t *wrk;
133   session_event_t *evt;
134
135   /* If we are in the handler thread, or being called with the worker barrier
136    * held, just append a new event to pending disconnects vector. */
137   if (vlib_thread_is_main_w_barrier () || thread_index == s->thread_index)
138     {
139       wrk = session_main_get_worker (s->thread_index);
140       vec_add2 (wrk->pending_disconnects, evt, 1);
141       clib_memset (evt, 0, sizeof (*evt));
142       evt->session_handle = session_handle (s);
143       evt->event_type = SESSION_CTRL_EVT_CLOSE;
144     }
145   else
146     session_send_ctrl_evt_to_thread (s, SESSION_CTRL_EVT_CLOSE);
147 }
148
149 session_t *
150 session_alloc (u32 thread_index)
151 {
152   session_worker_t *wrk = &session_main.wrk[thread_index];
153   session_t *s;
154   u8 will_expand = 0;
155   pool_get_aligned_will_expand (wrk->sessions, will_expand,
156                                 CLIB_CACHE_LINE_BYTES);
157   /* If we have peekers, let them finish */
158   if (PREDICT_FALSE (will_expand && vlib_num_workers ()))
159     {
160       clib_rwlock_writer_lock (&wrk->peekers_rw_locks);
161       pool_get_aligned (wrk->sessions, s, CLIB_CACHE_LINE_BYTES);
162       clib_rwlock_writer_unlock (&wrk->peekers_rw_locks);
163     }
164   else
165     {
166       pool_get_aligned (wrk->sessions, s, CLIB_CACHE_LINE_BYTES);
167     }
168   clib_memset (s, 0, sizeof (*s));
169   s->session_index = s - wrk->sessions;
170   s->thread_index = thread_index;
171   return s;
172 }
173
174 void
175 session_free (session_t * s)
176 {
177   if (CLIB_DEBUG)
178     {
179       u8 thread_index = s->thread_index;
180       clib_memset (s, 0xFA, sizeof (*s));
181       pool_put (session_main.wrk[thread_index].sessions, s);
182       return;
183     }
184   SESSION_EVT_DBG (SESSION_EVT_FREE, s);
185   pool_put (session_main.wrk[s->thread_index].sessions, s);
186 }
187
188 void
189 session_free_w_fifos (session_t * s)
190 {
191   segment_manager_dealloc_fifos (s->rx_fifo, s->tx_fifo);
192   session_free (s);
193 }
194
195 /**
196  * Cleans up session and lookup table.
197  *
198  * Transport connection must still be valid.
199  */
200 static void
201 session_delete (session_t * s)
202 {
203   int rv;
204
205   /* Delete from the main lookup table. */
206   if ((rv = session_lookup_del_session (s)))
207     clib_warning ("hash delete error, rv %d", rv);
208
209   session_free_w_fifos (s);
210 }
211
212 static session_t *
213 session_alloc_for_connection (transport_connection_t * tc)
214 {
215   session_t *s;
216   u32 thread_index = tc->thread_index;
217
218   ASSERT (thread_index == vlib_get_thread_index ()
219           || transport_protocol_is_cl (tc->proto));
220
221   s = session_alloc (thread_index);
222   s->session_type = session_type_from_proto_and_ip (tc->proto, tc->is_ip4);
223   s->session_state = SESSION_STATE_CLOSED;
224
225   /* Attach transport to session and vice versa */
226   s->connection_index = tc->c_index;
227   tc->s_index = s->session_index;
228   return s;
229 }
230
231 /**
232  * Discards bytes from buffer chain
233  *
234  * It discards n_bytes_to_drop starting at first buffer after chain_b
235  */
236 always_inline void
237 session_enqueue_discard_chain_bytes (vlib_main_t * vm, vlib_buffer_t * b,
238                                      vlib_buffer_t ** chain_b,
239                                      u32 n_bytes_to_drop)
240 {
241   vlib_buffer_t *next = *chain_b;
242   u32 to_drop = n_bytes_to_drop;
243   ASSERT (b->flags & VLIB_BUFFER_NEXT_PRESENT);
244   while (to_drop && (next->flags & VLIB_BUFFER_NEXT_PRESENT))
245     {
246       next = vlib_get_buffer (vm, next->next_buffer);
247       if (next->current_length > to_drop)
248         {
249           vlib_buffer_advance (next, to_drop);
250           to_drop = 0;
251         }
252       else
253         {
254           to_drop -= next->current_length;
255           next->current_length = 0;
256         }
257     }
258   *chain_b = next;
259
260   if (to_drop == 0)
261     b->total_length_not_including_first_buffer -= n_bytes_to_drop;
262 }
263
264 /**
265  * Enqueue buffer chain tail
266  */
267 always_inline int
268 session_enqueue_chain_tail (session_t * s, vlib_buffer_t * b,
269                             u32 offset, u8 is_in_order)
270 {
271   vlib_buffer_t *chain_b;
272   u32 chain_bi, len, diff;
273   vlib_main_t *vm = vlib_get_main ();
274   u8 *data;
275   u32 written = 0;
276   int rv = 0;
277
278   if (is_in_order && offset)
279     {
280       diff = offset - b->current_length;
281       if (diff > b->total_length_not_including_first_buffer)
282         return 0;
283       chain_b = b;
284       session_enqueue_discard_chain_bytes (vm, b, &chain_b, diff);
285       chain_bi = vlib_get_buffer_index (vm, chain_b);
286     }
287   else
288     chain_bi = b->next_buffer;
289
290   do
291     {
292       chain_b = vlib_get_buffer (vm, chain_bi);
293       data = vlib_buffer_get_current (chain_b);
294       len = chain_b->current_length;
295       if (!len)
296         continue;
297       if (is_in_order)
298         {
299           rv = svm_fifo_enqueue (s->rx_fifo, len, data);
300           if (rv == len)
301             {
302               written += rv;
303             }
304           else if (rv < len)
305             {
306               return (rv > 0) ? (written + rv) : written;
307             }
308           else if (rv > len)
309             {
310               written += rv;
311
312               /* written more than what was left in chain */
313               if (written > b->total_length_not_including_first_buffer)
314                 return written;
315
316               /* drop the bytes that have already been delivered */
317               session_enqueue_discard_chain_bytes (vm, b, &chain_b, rv - len);
318             }
319         }
320       else
321         {
322           rv = svm_fifo_enqueue_with_offset (s->rx_fifo, offset, len, data);
323           if (rv)
324             {
325               clib_warning ("failed to enqueue multi-buffer seg");
326               return -1;
327             }
328           offset += len;
329         }
330     }
331   while ((chain_bi = (chain_b->flags & VLIB_BUFFER_NEXT_PRESENT)
332           ? chain_b->next_buffer : 0));
333
334   if (is_in_order)
335     return written;
336
337   return 0;
338 }
339
340 /*
341  * Enqueue data for delivery to session peer. Does not notify peer of enqueue
342  * event but on request can queue notification events for later delivery by
343  * calling stream_server_flush_enqueue_events().
344  *
345  * @param tc Transport connection which is to be enqueued data
346  * @param b Buffer to be enqueued
347  * @param offset Offset at which to start enqueueing if out-of-order
348  * @param queue_event Flag to indicate if peer is to be notified or if event
349  *                    is to be queued. The former is useful when more data is
350  *                    enqueued and only one event is to be generated.
351  * @param is_in_order Flag to indicate if data is in order
352  * @return Number of bytes enqueued or a negative value if enqueueing failed.
353  */
354 int
355 session_enqueue_stream_connection (transport_connection_t * tc,
356                                    vlib_buffer_t * b, u32 offset,
357                                    u8 queue_event, u8 is_in_order)
358 {
359   session_t *s;
360   int enqueued = 0, rv, in_order_off;
361
362   s = session_get (tc->s_index, tc->thread_index);
363
364   if (is_in_order)
365     {
366       enqueued = svm_fifo_enqueue (s->rx_fifo,
367                                    b->current_length,
368                                    vlib_buffer_get_current (b));
369       if (PREDICT_FALSE ((b->flags & VLIB_BUFFER_NEXT_PRESENT)
370                          && enqueued >= 0))
371         {
372           in_order_off = enqueued > b->current_length ? enqueued : 0;
373           rv = session_enqueue_chain_tail (s, b, in_order_off, 1);
374           if (rv > 0)
375             enqueued += rv;
376         }
377     }
378   else
379     {
380       rv = svm_fifo_enqueue_with_offset (s->rx_fifo, offset,
381                                          b->current_length,
382                                          vlib_buffer_get_current (b));
383       if (PREDICT_FALSE ((b->flags & VLIB_BUFFER_NEXT_PRESENT) && !rv))
384         session_enqueue_chain_tail (s, b, offset + b->current_length, 0);
385       /* if something was enqueued, report even this as success for ooo
386        * segment handling */
387       return rv;
388     }
389
390   if (queue_event)
391     {
392       /* Queue RX event on this fifo. Eventually these will need to be flushed
393        * by calling stream_server_flush_enqueue_events () */
394       session_worker_t *wrk;
395
396       wrk = session_main_get_worker (s->thread_index);
397       if (!(s->flags & SESSION_F_RX_EVT))
398         {
399           s->flags |= SESSION_F_RX_EVT;
400           vec_add1 (wrk->session_to_enqueue[tc->proto], s->session_index);
401         }
402     }
403
404   return enqueued;
405 }
406
407 int
408 session_enqueue_dgram_connection (session_t * s,
409                                   session_dgram_hdr_t * hdr,
410                                   vlib_buffer_t * b, u8 proto, u8 queue_event)
411 {
412   int enqueued = 0, rv, in_order_off;
413
414   ASSERT (svm_fifo_max_enqueue_prod (s->rx_fifo)
415           >= b->current_length + sizeof (*hdr));
416
417   svm_fifo_enqueue (s->rx_fifo, sizeof (session_dgram_hdr_t), (u8 *) hdr);
418   enqueued = svm_fifo_enqueue (s->rx_fifo, b->current_length,
419                                vlib_buffer_get_current (b));
420   if (PREDICT_FALSE ((b->flags & VLIB_BUFFER_NEXT_PRESENT) && enqueued >= 0))
421     {
422       in_order_off = enqueued > b->current_length ? enqueued : 0;
423       rv = session_enqueue_chain_tail (s, b, in_order_off, 1);
424       if (rv > 0)
425         enqueued += rv;
426     }
427   if (queue_event)
428     {
429       /* Queue RX event on this fifo. Eventually these will need to be flushed
430        * by calling stream_server_flush_enqueue_events () */
431       session_worker_t *wrk;
432
433       wrk = session_main_get_worker (s->thread_index);
434       if (!(s->flags & SESSION_F_RX_EVT))
435         {
436           s->flags |= SESSION_F_RX_EVT;
437           vec_add1 (wrk->session_to_enqueue[proto], s->session_index);
438         }
439     }
440   return enqueued;
441 }
442
443 int
444 session_tx_fifo_peek_bytes (transport_connection_t * tc, u8 * buffer,
445                             u32 offset, u32 max_bytes)
446 {
447   session_t *s = session_get (tc->s_index, tc->thread_index);
448   return svm_fifo_peek (s->tx_fifo, offset, max_bytes, buffer);
449 }
450
451 u32
452 session_tx_fifo_dequeue_drop (transport_connection_t * tc, u32 max_bytes)
453 {
454   session_t *s = session_get (tc->s_index, tc->thread_index);
455
456   if (svm_fifo_needs_tx_ntf (s->tx_fifo, max_bytes))
457     session_dequeue_notify (s);
458
459   return svm_fifo_dequeue_drop (s->tx_fifo, max_bytes);
460 }
461
462 static inline int
463 session_notify_subscribers (u32 app_index, session_t * s,
464                             svm_fifo_t * f, session_evt_type_t evt_type)
465 {
466   app_worker_t *app_wrk;
467   application_t *app;
468   int i;
469
470   app = application_get (app_index);
471   if (!app)
472     return -1;
473
474   for (i = 0; i < f->n_subscribers; i++)
475     {
476       app_wrk = application_get_worker (app, f->subscribers[i]);
477       if (!app_wrk)
478         continue;
479       if (app_worker_lock_and_send_event (app_wrk, s, evt_type))
480         return -1;
481     }
482
483   return 0;
484 }
485
486 /**
487  * Notify session peer that new data has been enqueued.
488  *
489  * @param s     Stream session for which the event is to be generated.
490  * @param lock  Flag to indicate if call should lock message queue.
491  *
492  * @return 0 on success or negative number if failed to send notification.
493  */
494 static inline int
495 session_enqueue_notify_inline (session_t * s)
496 {
497   app_worker_t *app_wrk;
498   u32 session_index;
499   u8 n_subscribers;
500
501   session_index = s->session_index;
502   n_subscribers = svm_fifo_n_subscribers (s->rx_fifo);
503
504   app_wrk = app_worker_get_if_valid (s->app_wrk_index);
505   if (PREDICT_FALSE (!app_wrk))
506     {
507       SESSION_DBG ("invalid s->app_index = %d", s->app_wrk_index);
508       return 0;
509     }
510
511   /* *INDENT-OFF* */
512   SESSION_EVT_DBG(SESSION_EVT_ENQ, s, ({
513       ed->data[0] = SESSION_IO_EVT_RX;
514       ed->data[1] = svm_fifo_max_dequeue_prod (s->rx_fifo);
515   }));
516   /* *INDENT-ON* */
517
518   s->flags &= ~SESSION_F_RX_EVT;
519   if (PREDICT_FALSE (app_worker_lock_and_send_event (app_wrk, s,
520                                                      SESSION_IO_EVT_RX)))
521     return -1;
522
523   if (PREDICT_FALSE (n_subscribers))
524     {
525       s = session_get (session_index, vlib_get_thread_index ());
526       return session_notify_subscribers (app_wrk->app_index, s,
527                                          s->rx_fifo, SESSION_IO_EVT_RX);
528     }
529
530   return 0;
531 }
532
533 int
534 session_enqueue_notify (session_t * s)
535 {
536   return session_enqueue_notify_inline (s);
537 }
538
539 int
540 session_dequeue_notify (session_t * s)
541 {
542   app_worker_t *app_wrk;
543
544   app_wrk = app_worker_get_if_valid (s->app_wrk_index);
545   if (PREDICT_FALSE (!app_wrk))
546     return -1;
547
548   if (PREDICT_FALSE (app_worker_lock_and_send_event (app_wrk, s,
549                                                      SESSION_IO_EVT_TX)))
550     return -1;
551
552   if (PREDICT_FALSE (s->tx_fifo->n_subscribers))
553     return session_notify_subscribers (app_wrk->app_index, s,
554                                        s->tx_fifo, SESSION_IO_EVT_TX);
555
556   svm_fifo_clear_tx_ntf (s->tx_fifo);
557
558   return 0;
559 }
560
561 /**
562  * Flushes queue of sessions that are to be notified of new data
563  * enqueued events.
564  *
565  * @param thread_index Thread index for which the flush is to be performed.
566  * @return 0 on success or a positive number indicating the number of
567  *         failures due to API queue being full.
568  */
569 int
570 session_main_flush_enqueue_events (u8 transport_proto, u32 thread_index)
571 {
572   session_worker_t *wrk = session_main_get_worker (thread_index);
573   session_t *s;
574   int i, errors = 0;
575   u32 *indices;
576
577   indices = wrk->session_to_enqueue[transport_proto];
578
579   for (i = 0; i < vec_len (indices); i++)
580     {
581       s = session_get_if_valid (indices[i], thread_index);
582       if (PREDICT_FALSE (!s))
583         {
584           errors++;
585           continue;
586         }
587
588       if (PREDICT_FALSE (session_enqueue_notify_inline (s)))
589         errors++;
590     }
591
592   vec_reset_length (indices);
593   wrk->session_to_enqueue[transport_proto] = indices;
594
595   return errors;
596 }
597
598 int
599 session_main_flush_all_enqueue_events (u8 transport_proto)
600 {
601   vlib_thread_main_t *vtm = vlib_get_thread_main ();
602   int i, errors = 0;
603   for (i = 0; i < 1 + vtm->n_threads; i++)
604     errors += session_main_flush_enqueue_events (transport_proto, i);
605   return errors;
606 }
607
608 int
609 session_stream_connect_notify (transport_connection_t * tc, u8 is_fail)
610 {
611   u32 opaque = 0, new_ti, new_si;
612   app_worker_t *app_wrk;
613   session_t *s = 0;
614   u64 ho_handle;
615
616   /*
617    * Find connection handle and cleanup half-open table
618    */
619   ho_handle = session_lookup_half_open_handle (tc);
620   if (ho_handle == HALF_OPEN_LOOKUP_INVALID_VALUE)
621     {
622       SESSION_DBG ("half-open was removed!");
623       return -1;
624     }
625   session_lookup_del_half_open (tc);
626
627   /* Get the app's index from the handle we stored when opening connection
628    * and the opaque (api_context for external apps) from transport session
629    * index */
630   app_wrk = app_worker_get_if_valid (ho_handle >> 32);
631   if (!app_wrk)
632     return -1;
633
634   opaque = tc->s_index;
635
636   if (is_fail)
637     return app_worker_connect_notify (app_wrk, s, opaque);
638
639   s = session_alloc_for_connection (tc);
640   s->session_state = SESSION_STATE_CONNECTING;
641   s->app_wrk_index = app_wrk->wrk_index;
642   new_si = s->session_index;
643   new_ti = s->thread_index;
644
645   if (app_worker_init_connected (app_wrk, s))
646     {
647       session_free (s);
648       app_worker_connect_notify (app_wrk, 0, opaque);
649       return -1;
650     }
651
652   if (app_worker_connect_notify (app_wrk, s, opaque))
653     {
654       s = session_get (new_si, new_ti);
655       session_free_w_fifos (s);
656       return -1;
657     }
658
659   s = session_get (new_si, new_ti);
660   s->session_state = SESSION_STATE_READY;
661   session_lookup_add_connection (tc, session_handle (s));
662
663   return 0;
664 }
665
666 typedef struct _session_switch_pool_args
667 {
668   u32 session_index;
669   u32 thread_index;
670   u32 new_thread_index;
671   u32 new_session_index;
672 } session_switch_pool_args_t;
673
674 static void
675 session_switch_pool (void *cb_args)
676 {
677   session_switch_pool_args_t *args = (session_switch_pool_args_t *) cb_args;
678   session_t *s;
679   ASSERT (args->thread_index == vlib_get_thread_index ());
680   s = session_get (args->session_index, args->thread_index);
681   s->tx_fifo->master_session_index = args->new_session_index;
682   s->tx_fifo->master_thread_index = args->new_thread_index;
683   transport_cleanup (session_get_transport_proto (s), s->connection_index,
684                      s->thread_index);
685   session_free (s);
686   clib_mem_free (cb_args);
687 }
688
689 /**
690  * Move dgram session to the right thread
691  */
692 int
693 session_dgram_connect_notify (transport_connection_t * tc,
694                               u32 old_thread_index, session_t ** new_session)
695 {
696   session_t *new_s;
697   session_switch_pool_args_t *rpc_args;
698
699   /*
700    * Clone half-open session to the right thread.
701    */
702   new_s = session_clone_safe (tc->s_index, old_thread_index);
703   new_s->connection_index = tc->c_index;
704   new_s->rx_fifo->master_session_index = new_s->session_index;
705   new_s->rx_fifo->master_thread_index = new_s->thread_index;
706   new_s->session_state = SESSION_STATE_READY;
707   session_lookup_add_connection (tc, session_handle (new_s));
708
709   /*
710    * Ask thread owning the old session to clean it up and make us the tx
711    * fifo owner
712    */
713   rpc_args = clib_mem_alloc (sizeof (*rpc_args));
714   rpc_args->new_session_index = new_s->session_index;
715   rpc_args->new_thread_index = new_s->thread_index;
716   rpc_args->session_index = tc->s_index;
717   rpc_args->thread_index = old_thread_index;
718   session_send_rpc_evt_to_thread (rpc_args->thread_index, session_switch_pool,
719                                   rpc_args);
720
721   tc->s_index = new_s->session_index;
722   new_s->connection_index = tc->c_index;
723   *new_session = new_s;
724   return 0;
725 }
726
727 /**
728  * Notification from transport that connection is being closed.
729  *
730  * A disconnect is sent to application but state is not removed. Once
731  * disconnect is acknowledged by application, session disconnect is called.
732  * Ultimately this leads to close being called on transport (passive close).
733  */
734 void
735 session_transport_closing_notify (transport_connection_t * tc)
736 {
737   app_worker_t *app_wrk;
738   session_t *s;
739
740   s = session_get (tc->s_index, tc->thread_index);
741   if (s->session_state >= SESSION_STATE_TRANSPORT_CLOSING)
742     return;
743   s->session_state = SESSION_STATE_TRANSPORT_CLOSING;
744   app_wrk = app_worker_get (s->app_wrk_index);
745   app_worker_close_notify (app_wrk, s);
746 }
747
748 /**
749  * Notification from transport that connection is being deleted
750  *
751  * This removes the session if it is still valid. It should be called only on
752  * previously fully established sessions. For instance failed connects should
753  * call stream_session_connect_notify and indicate that the connect has
754  * failed.
755  */
756 void
757 session_transport_delete_notify (transport_connection_t * tc)
758 {
759   session_t *s;
760
761   /* App might've been removed already */
762   if (!(s = session_get_if_valid (tc->s_index, tc->thread_index)))
763     return;
764
765   /* Make sure we don't try to send anything more */
766   svm_fifo_dequeue_drop_all (s->tx_fifo);
767
768   switch (s->session_state)
769     {
770     case SESSION_STATE_CREATED:
771       /* Session was created but accept notification was not yet sent to the
772        * app. Cleanup everything. */
773       session_lookup_del_session (s);
774       session_free_w_fifos (s);
775       break;
776     case SESSION_STATE_ACCEPTING:
777     case SESSION_STATE_TRANSPORT_CLOSING:
778       /* If transport finishes or times out before we get a reply
779        * from the app, mark transport as closed and wait for reply
780        * before removing the session. Cleanup session table in advance
781        * because transport will soon be closed and closed sessions
782        * are assumed to have been removed from the lookup table */
783       session_lookup_del_session (s);
784       s->session_state = SESSION_STATE_TRANSPORT_CLOSED;
785       break;
786     case SESSION_STATE_CLOSING:
787     case SESSION_STATE_CLOSED_WAITING:
788       /* Cleanup lookup table as transport needs to still be valid.
789        * Program transport close to ensure that all session events
790        * have been cleaned up. Once transport close is called, the
791        * session is just removed because both transport and app have
792        * confirmed the close*/
793       session_lookup_del_session (s);
794       s->session_state = SESSION_STATE_TRANSPORT_CLOSED;
795       session_program_transport_close (s);
796       break;
797     case SESSION_STATE_TRANSPORT_CLOSED:
798       break;
799     case SESSION_STATE_CLOSED:
800       session_delete (s);
801       break;
802     default:
803       clib_warning ("session state %u", s->session_state);
804       session_delete (s);
805       break;
806     }
807 }
808
809 /**
810  * Notification from transport that session can be closed
811  *
812  * Should be called by transport only if it was closed with non-empty
813  * tx fifo and once it decides to begin the closing procedure prior to
814  * issuing a delete notify. This gives the chance to the session layer
815  * to cleanup any outstanding events.
816  */
817 void
818 session_transport_closed_notify (transport_connection_t * tc)
819 {
820   session_t *s;
821
822   if (!(s = session_get_if_valid (tc->s_index, tc->thread_index)))
823     return;
824
825   /* Transport thinks that app requested close but it actually didn't.
826    * Can happen for tcp if fin and rst are received in close succession. */
827   if (s->session_state == SESSION_STATE_READY)
828     {
829       session_transport_closing_notify (tc);
830       svm_fifo_dequeue_drop_all (s->tx_fifo);
831     }
832   /* If app close has not been received or has not yet resulted in
833    * a transport close, only mark the session transport as closed */
834   else if (s->session_state <= SESSION_STATE_CLOSING)
835     {
836       session_lookup_del_session (s);
837       s->session_state = SESSION_STATE_TRANSPORT_CLOSED;
838     }
839   else
840     s->session_state = SESSION_STATE_CLOSED;
841 }
842
843 /**
844  * Notify application that connection has been reset.
845  */
846 void
847 session_transport_reset_notify (transport_connection_t * tc)
848 {
849   app_worker_t *app_wrk;
850   session_t *s;
851
852   s = session_get (tc->s_index, tc->thread_index);
853   svm_fifo_dequeue_drop_all (s->tx_fifo);
854   if (s->session_state >= SESSION_STATE_TRANSPORT_CLOSING)
855     return;
856   s->session_state = SESSION_STATE_TRANSPORT_CLOSING;
857   app_wrk = app_worker_get (s->app_wrk_index);
858   app_worker_reset_notify (app_wrk, s);
859 }
860
861 int
862 session_stream_accept_notify (transport_connection_t * tc)
863 {
864   app_worker_t *app_wrk;
865   session_t *s;
866
867   s = session_get (tc->s_index, tc->thread_index);
868   app_wrk = app_worker_get_if_valid (s->app_wrk_index);
869   if (!app_wrk)
870     return -1;
871   s->session_state = SESSION_STATE_ACCEPTING;
872   return app_worker_accept_notify (app_wrk, s);
873 }
874
875 /**
876  * Accept a stream session. Optionally ping the server by callback.
877  */
878 int
879 session_stream_accept (transport_connection_t * tc, u32 listener_index,
880                        u8 notify)
881 {
882   session_t *s;
883   int rv;
884
885   s = session_alloc_for_connection (tc);
886   s->listener_index = listener_index;
887   s->session_state = SESSION_STATE_CREATED;
888
889   if ((rv = app_worker_init_accepted (s)))
890     return rv;
891
892   session_lookup_add_connection (tc, session_handle (s));
893
894   /* Shoulder-tap the server */
895   if (notify)
896     {
897       app_worker_t *app_wrk = app_worker_get (s->app_wrk_index);
898       return app_worker_accept_notify (app_wrk, s);
899     }
900
901   return 0;
902 }
903
904 int
905 session_open_cl (u32 app_wrk_index, session_endpoint_t * rmt, u32 opaque)
906 {
907   transport_connection_t *tc;
908   transport_endpoint_cfg_t *tep;
909   app_worker_t *app_wrk;
910   session_handle_t sh;
911   session_t *s;
912   int rv;
913
914   tep = session_endpoint_to_transport_cfg (rmt);
915   rv = transport_connect (rmt->transport_proto, tep);
916   if (rv < 0)
917     {
918       SESSION_DBG ("Transport failed to open connection.");
919       return VNET_API_ERROR_SESSION_CONNECT;
920     }
921
922   tc = transport_get_half_open (rmt->transport_proto, (u32) rv);
923
924   /* For dgram type of service, allocate session and fifos now */
925   app_wrk = app_worker_get (app_wrk_index);
926   s = session_alloc_for_connection (tc);
927   s->app_wrk_index = app_wrk->wrk_index;
928   s->session_state = SESSION_STATE_OPENED;
929   if (app_worker_init_connected (app_wrk, s))
930     {
931       session_free (s);
932       return -1;
933     }
934
935   sh = session_handle (s);
936   session_lookup_add_connection (tc, sh);
937
938   return app_worker_connect_notify (app_wrk, s, opaque);
939 }
940
941 int
942 session_open_vc (u32 app_wrk_index, session_endpoint_t * rmt, u32 opaque)
943 {
944   transport_connection_t *tc;
945   transport_endpoint_cfg_t *tep;
946   u64 handle;
947   int rv;
948
949   tep = session_endpoint_to_transport_cfg (rmt);
950   rv = transport_connect (rmt->transport_proto, tep);
951   if (rv < 0)
952     {
953       SESSION_DBG ("Transport failed to open connection.");
954       return VNET_API_ERROR_SESSION_CONNECT;
955     }
956
957   tc = transport_get_half_open (rmt->transport_proto, (u32) rv);
958
959   /* If transport offers a stream service, only allocate session once the
960    * connection has been established.
961    * Add connection to half-open table and save app and tc index. The
962    * latter is needed to help establish the connection while the former
963    * is needed when the connect notify comes and we have to notify the
964    * external app
965    */
966   handle = (((u64) app_wrk_index) << 32) | (u64) tc->c_index;
967   session_lookup_add_half_open (tc, handle);
968
969   /* Store api_context (opaque) for when the reply comes. Not the nicest
970    * thing but better than allocating a separate half-open pool.
971    */
972   tc->s_index = opaque;
973   return 0;
974 }
975
976 int
977 session_open_app (u32 app_wrk_index, session_endpoint_t * rmt, u32 opaque)
978 {
979   session_endpoint_cfg_t *sep = (session_endpoint_cfg_t *) rmt;
980   transport_endpoint_cfg_t *tep_cfg = session_endpoint_to_transport_cfg (sep);
981
982   sep->app_wrk_index = app_wrk_index;
983   sep->opaque = opaque;
984
985   return transport_connect (rmt->transport_proto, tep_cfg);
986 }
987
988 typedef int (*session_open_service_fn) (u32, session_endpoint_t *, u32);
989
990 /* *INDENT-OFF* */
991 static session_open_service_fn session_open_srv_fns[TRANSPORT_N_SERVICES] = {
992   session_open_vc,
993   session_open_cl,
994   session_open_app,
995 };
996 /* *INDENT-ON* */
997
998 /**
999  * Ask transport to open connection to remote transport endpoint.
1000  *
1001  * Stores handle for matching request with reply since the call can be
1002  * asynchronous. For instance, for TCP the 3-way handshake must complete
1003  * before reply comes. Session is only created once connection is established.
1004  *
1005  * @param app_index Index of the application requesting the connect
1006  * @param st Session type requested.
1007  * @param tep Remote transport endpoint
1008  * @param opaque Opaque data (typically, api_context) the application expects
1009  *               on open completion.
1010  */
1011 int
1012 session_open (u32 app_wrk_index, session_endpoint_t * rmt, u32 opaque)
1013 {
1014   transport_service_type_t tst;
1015   tst = transport_protocol_service_type (rmt->transport_proto);
1016   return session_open_srv_fns[tst] (app_wrk_index, rmt, opaque);
1017 }
1018
1019 /**
1020  * Ask transport to listen on session endpoint.
1021  *
1022  * @param s Session for which listen will be called. Note that unlike
1023  *          established sessions, listen sessions are not associated to a
1024  *          thread.
1025  * @param sep Local endpoint to be listened on.
1026  */
1027 int
1028 session_listen (session_t * ls, session_endpoint_cfg_t * sep)
1029 {
1030   transport_endpoint_t *tep;
1031   u32 tc_index, s_index;
1032
1033   /* Transport bind/listen */
1034   tep = session_endpoint_to_transport (sep);
1035   s_index = ls->session_index;
1036   tc_index = transport_start_listen (session_get_transport_proto (ls),
1037                                      s_index, tep);
1038
1039   if (tc_index == (u32) ~ 0)
1040     return -1;
1041
1042   /* Attach transport to session. Lookup tables are populated by the app
1043    * worker because local tables (for ct sessions) are not backed by a fib */
1044   ls = listen_session_get (s_index);
1045   ls->connection_index = tc_index;
1046
1047   return 0;
1048 }
1049
1050 /**
1051  * Ask transport to stop listening on local transport endpoint.
1052  *
1053  * @param s Session to stop listening on. It must be in state LISTENING.
1054  */
1055 int
1056 session_stop_listen (session_t * s)
1057 {
1058   transport_proto_t tp = session_get_transport_proto (s);
1059   transport_connection_t *tc;
1060
1061   if (s->session_state != SESSION_STATE_LISTENING)
1062     return -1;
1063
1064   tc = transport_get_listener (tp, s->connection_index);
1065   if (!tc)
1066     return VNET_API_ERROR_ADDRESS_NOT_IN_USE;
1067
1068   session_lookup_del_connection (tc);
1069   transport_stop_listen (tp, s->connection_index);
1070   return 0;
1071 }
1072
1073 /**
1074  * Initialize session closing procedure.
1075  *
1076  * Request is always sent to session node to ensure that all outstanding
1077  * requests are served before transport is notified.
1078  */
1079 void
1080 session_close (session_t * s)
1081 {
1082   if (!s)
1083     return;
1084
1085   if (s->session_state >= SESSION_STATE_CLOSING)
1086     {
1087       /* Session will only be removed once both app and transport
1088        * acknowledge the close */
1089       if (s->session_state == SESSION_STATE_TRANSPORT_CLOSED)
1090         session_program_transport_close (s);
1091
1092       /* Session already closed. Clear the tx fifo */
1093       if (s->session_state == SESSION_STATE_CLOSED)
1094         svm_fifo_dequeue_drop_all (s->tx_fifo);
1095       return;
1096     }
1097
1098   s->session_state = SESSION_STATE_CLOSING;
1099   session_program_transport_close (s);
1100 }
1101
1102 /**
1103  * Notify transport the session can be disconnected. This should eventually
1104  * result in a delete notification that allows us to cleanup session state.
1105  * Called for both active/passive disconnects.
1106  *
1107  * Must be called from the session's thread.
1108  */
1109 void
1110 session_transport_close (session_t * s)
1111 {
1112   /* If transport is already closed, just free the session */
1113   if (s->session_state >= SESSION_STATE_TRANSPORT_CLOSED)
1114     {
1115       session_free_w_fifos (s);
1116       return;
1117     }
1118
1119   /* If tx queue wasn't drained, change state to closed waiting for transport.
1120    * This way, the transport, if it so wishes, can continue to try sending the
1121    * outstanding data (in closed state it cannot). It MUST however at one
1122    * point, either after sending everything or after a timeout, call delete
1123    * notify. This will finally lead to the complete cleanup of the session.
1124    */
1125   if (svm_fifo_max_dequeue_cons (s->tx_fifo))
1126     s->session_state = SESSION_STATE_CLOSED_WAITING;
1127   else
1128     s->session_state = SESSION_STATE_CLOSED;
1129
1130   transport_close (session_get_transport_proto (s), s->connection_index,
1131                    s->thread_index);
1132 }
1133
1134 /**
1135  * Cleanup transport and session state.
1136  *
1137  * Notify transport of the cleanup and free the session. This should
1138  * be called only if transport reported some error and is already
1139  * closed.
1140  */
1141 void
1142 session_transport_cleanup (session_t * s)
1143 {
1144   s->session_state = SESSION_STATE_CLOSED;
1145
1146   /* Delete from main lookup table before we axe the the transport */
1147   session_lookup_del_session (s);
1148   transport_cleanup (session_get_transport_proto (s), s->connection_index,
1149                      s->thread_index);
1150   /* Since we called cleanup, no delete notification will come. So, make
1151    * sure the session is properly freed. */
1152   session_free_w_fifos (s);
1153 }
1154
1155 /**
1156  * Allocate event queues in the shared-memory segment
1157  *
1158  * That can either be a newly created memfd segment, that will need to be
1159  * mapped by all stack users, or the binary api's svm region. The latter is
1160  * assumed to be already mapped. NOTE that this assumption DOES NOT hold if
1161  * api clients bootstrap shm api over sockets (i.e. use memfd segments) and
1162  * vpp uses api svm region for event queues.
1163  */
1164 void
1165 session_vpp_event_queues_allocate (session_main_t * smm)
1166 {
1167   u32 evt_q_length = 2048, evt_size = sizeof (session_event_t);
1168   ssvm_private_t *eqs = &smm->evt_qs_segment;
1169   api_main_t *am = &api_main;
1170   uword eqs_size = 64 << 20;
1171   pid_t vpp_pid = getpid ();
1172   void *oldheap;
1173   int i;
1174
1175   if (smm->configured_event_queue_length)
1176     evt_q_length = smm->configured_event_queue_length;
1177
1178   if (smm->evt_qs_use_memfd_seg)
1179     {
1180       if (smm->evt_qs_segment_size)
1181         eqs_size = smm->evt_qs_segment_size;
1182
1183       eqs->ssvm_size = eqs_size;
1184       eqs->i_am_master = 1;
1185       eqs->my_pid = vpp_pid;
1186       eqs->name = format (0, "%s%c", "evt-qs-segment", 0);
1187       eqs->requested_va = smm->session_baseva;
1188
1189       if (ssvm_master_init (eqs, SSVM_SEGMENT_MEMFD))
1190         {
1191           clib_warning ("failed to initialize queue segment");
1192           return;
1193         }
1194     }
1195
1196   if (smm->evt_qs_use_memfd_seg)
1197     oldheap = ssvm_push_heap (eqs->sh);
1198   else
1199     oldheap = svm_push_data_heap (am->vlib_rp);
1200
1201   for (i = 0; i < vec_len (smm->wrk); i++)
1202     {
1203       svm_msg_q_cfg_t _cfg, *cfg = &_cfg;
1204       svm_msg_q_ring_cfg_t rc[SESSION_MQ_N_RINGS] = {
1205         {evt_q_length, evt_size, 0}
1206         ,
1207         {evt_q_length >> 1, 256, 0}
1208       };
1209       cfg->consumer_pid = 0;
1210       cfg->n_rings = 2;
1211       cfg->q_nitems = evt_q_length;
1212       cfg->ring_cfgs = rc;
1213       smm->wrk[i].vpp_event_queue = svm_msg_q_alloc (cfg);
1214       if (smm->evt_qs_use_memfd_seg)
1215         {
1216           if (svm_msg_q_alloc_consumer_eventfd (smm->wrk[i].vpp_event_queue))
1217             clib_warning ("eventfd returned");
1218         }
1219     }
1220
1221   if (smm->evt_qs_use_memfd_seg)
1222     ssvm_pop_heap (oldheap);
1223   else
1224     svm_pop_heap (oldheap);
1225 }
1226
1227 ssvm_private_t *
1228 session_main_get_evt_q_segment (void)
1229 {
1230   session_main_t *smm = &session_main;
1231   if (smm->evt_qs_use_memfd_seg)
1232     return &smm->evt_qs_segment;
1233   return 0;
1234 }
1235
1236 u64
1237 session_segment_handle (session_t * s)
1238 {
1239   svm_fifo_t *f;
1240
1241   if (!s->rx_fifo)
1242     return SESSION_INVALID_HANDLE;
1243
1244   f = s->rx_fifo;
1245   return segment_manager_make_segment_handle (f->segment_manager,
1246                                               f->segment_index);
1247 }
1248
1249 /* *INDENT-OFF* */
1250 static session_fifo_rx_fn *session_tx_fns[TRANSPORT_TX_N_FNS] = {
1251     session_tx_fifo_peek_and_snd,
1252     session_tx_fifo_dequeue_and_snd,
1253     session_tx_fifo_dequeue_internal,
1254     session_tx_fifo_dequeue_and_snd
1255 };
1256 /* *INDENT-ON* */
1257
1258 /**
1259  * Initialize session layer for given transport proto and ip version
1260  *
1261  * Allocates per session type (transport proto + ip version) data structures
1262  * and adds arc from session queue node to session type output node.
1263  */
1264 void
1265 session_register_transport (transport_proto_t transport_proto,
1266                             const transport_proto_vft_t * vft, u8 is_ip4,
1267                             u32 output_node)
1268 {
1269   session_main_t *smm = &session_main;
1270   session_type_t session_type;
1271   u32 next_index = ~0;
1272
1273   session_type = session_type_from_proto_and_ip (transport_proto, is_ip4);
1274
1275   vec_validate (smm->session_type_to_next, session_type);
1276   vec_validate (smm->session_tx_fns, session_type);
1277
1278   /* *INDENT-OFF* */
1279   if (output_node != ~0)
1280     {
1281       foreach_vlib_main (({
1282           next_index = vlib_node_add_next (this_vlib_main,
1283                                            session_queue_node.index,
1284                                            output_node);
1285       }));
1286     }
1287   /* *INDENT-ON* */
1288
1289   smm->session_type_to_next[session_type] = next_index;
1290   smm->session_tx_fns[session_type] = session_tx_fns[vft->tx_type];
1291 }
1292
1293 transport_connection_t *
1294 session_get_transport (session_t * s)
1295 {
1296   if (s->session_state != SESSION_STATE_LISTENING)
1297     return transport_get_connection (session_get_transport_proto (s),
1298                                      s->connection_index, s->thread_index);
1299   else
1300     return transport_get_listener (session_get_transport_proto (s),
1301                                    s->connection_index);
1302 }
1303
1304 void
1305 session_get_endpoint (session_t * s, transport_endpoint_t * tep, u8 is_lcl)
1306 {
1307   if (s->session_state != SESSION_STATE_LISTENING)
1308     return transport_get_endpoint (session_get_transport_proto (s),
1309                                    s->connection_index, s->thread_index, tep,
1310                                    is_lcl);
1311   else
1312     return transport_get_listener_endpoint (session_get_transport_proto (s),
1313                                             s->connection_index, tep, is_lcl);
1314 }
1315
1316 transport_connection_t *
1317 listen_session_get_transport (session_t * s)
1318 {
1319   return transport_get_listener (session_get_transport_proto (s),
1320                                  s->connection_index);
1321 }
1322
1323 void
1324 session_flush_frames_main_thread (vlib_main_t * vm)
1325 {
1326   ASSERT (vlib_get_thread_index () == 0);
1327   vlib_process_signal_event_mt (vm, session_queue_process_node.index,
1328                                 SESSION_Q_PROCESS_FLUSH_FRAMES, 0);
1329 }
1330
1331 static clib_error_t *
1332 session_manager_main_enable (vlib_main_t * vm)
1333 {
1334   segment_manager_main_init_args_t _sm_args = { 0 }, *sm_args = &_sm_args;
1335   session_main_t *smm = &session_main;
1336   vlib_thread_main_t *vtm = vlib_get_thread_main ();
1337   u32 num_threads, preallocated_sessions_per_worker;
1338   session_worker_t *wrk;
1339   int i;
1340
1341   num_threads = 1 /* main thread */  + vtm->n_threads;
1342
1343   if (num_threads < 1)
1344     return clib_error_return (0, "n_thread_stacks not set");
1345
1346   /* Allocate cache line aligned worker contexts */
1347   vec_validate_aligned (smm->wrk, num_threads - 1, CLIB_CACHE_LINE_BYTES);
1348
1349   for (i = 0; i < num_threads; i++)
1350     {
1351       wrk = &smm->wrk[i];
1352       vec_validate (wrk->free_event_vector, 128);
1353       _vec_len (wrk->free_event_vector) = 0;
1354       vec_validate (wrk->pending_event_vector, 128);
1355       _vec_len (wrk->pending_event_vector) = 0;
1356       vec_validate (wrk->pending_disconnects, 128);
1357       _vec_len (wrk->pending_disconnects) = 0;
1358       vec_validate (wrk->postponed_event_vector, 128);
1359       _vec_len (wrk->postponed_event_vector) = 0;
1360
1361       wrk->last_vlib_time = vlib_time_now (vlib_mains[i]);
1362       wrk->dispatch_period = 500e-6;
1363
1364       if (num_threads > 1)
1365         clib_rwlock_init (&smm->wrk[i].peekers_rw_locks);
1366     }
1367
1368 #if SESSION_DEBUG
1369   vec_validate (smm->last_event_poll_by_thread, num_threads - 1);
1370 #endif
1371
1372   /* Allocate vpp event queues segment and queue */
1373   session_vpp_event_queues_allocate (smm);
1374
1375   /* Initialize fifo segment main baseva and timeout */
1376   sm_args->baseva = smm->session_baseva + smm->evt_qs_segment_size;
1377   sm_args->size = smm->session_va_space_size;
1378   segment_manager_main_init (sm_args);
1379
1380   /* Preallocate sessions */
1381   if (smm->preallocated_sessions)
1382     {
1383       if (num_threads == 1)
1384         {
1385           pool_init_fixed (smm->wrk[0].sessions, smm->preallocated_sessions);
1386         }
1387       else
1388         {
1389           int j;
1390           preallocated_sessions_per_worker =
1391             (1.1 * (f64) smm->preallocated_sessions /
1392              (f64) (num_threads - 1));
1393
1394           for (j = 1; j < num_threads; j++)
1395             {
1396               pool_init_fixed (smm->wrk[j].sessions,
1397                                preallocated_sessions_per_worker);
1398             }
1399         }
1400     }
1401
1402   session_lookup_init ();
1403   app_namespaces_init ();
1404   transport_init ();
1405
1406   smm->is_enabled = 1;
1407
1408   /* Enable transports */
1409   transport_enable_disable (vm, 1);
1410   transport_init_tx_pacers_period ();
1411   return 0;
1412 }
1413
1414 void
1415 session_node_enable_disable (u8 is_en)
1416 {
1417   u8 state = is_en ? VLIB_NODE_STATE_POLLING : VLIB_NODE_STATE_DISABLED;
1418   vlib_thread_main_t *vtm = vlib_get_thread_main ();
1419   u8 have_workers = vtm->n_threads != 0;
1420
1421   /* *INDENT-OFF* */
1422   foreach_vlib_main (({
1423     if (have_workers && ii == 0)
1424       {
1425         vlib_node_set_state (this_vlib_main, session_queue_process_node.index,
1426                              state);
1427         if (is_en)
1428           {
1429             vlib_node_t *n = vlib_get_node (this_vlib_main,
1430                                             session_queue_process_node.index);
1431             vlib_start_process (this_vlib_main, n->runtime_index);
1432           }
1433         else
1434           {
1435             vlib_process_signal_event_mt (this_vlib_main,
1436                                           session_queue_process_node.index,
1437                                           SESSION_Q_PROCESS_STOP, 0);
1438           }
1439
1440         continue;
1441       }
1442     vlib_node_set_state (this_vlib_main, session_queue_node.index,
1443                          state);
1444   }));
1445   /* *INDENT-ON* */
1446 }
1447
1448 clib_error_t *
1449 vnet_session_enable_disable (vlib_main_t * vm, u8 is_en)
1450 {
1451   clib_error_t *error = 0;
1452   if (is_en)
1453     {
1454       if (session_main.is_enabled)
1455         return 0;
1456
1457       session_node_enable_disable (is_en);
1458       error = session_manager_main_enable (vm);
1459     }
1460   else
1461     {
1462       session_main.is_enabled = 0;
1463       session_node_enable_disable (is_en);
1464     }
1465
1466   return error;
1467 }
1468
1469 clib_error_t *
1470 session_manager_main_init (vlib_main_t * vm)
1471 {
1472   session_main_t *smm = &session_main;
1473   smm->session_baseva = HIGH_SEGMENT_BASEVA;
1474 #if (HIGH_SEGMENT_BASEVA > (4ULL << 30))
1475   smm->session_va_space_size = 128ULL << 30;
1476   smm->evt_qs_segment_size = 64 << 20;
1477 #else
1478   smm->session_va_space_size = 128 << 20;
1479   smm->evt_qs_segment_size = 1 << 20;
1480 #endif
1481   smm->is_enabled = 0;
1482   return 0;
1483 }
1484
1485 VLIB_INIT_FUNCTION (session_manager_main_init);
1486
1487 static clib_error_t *
1488 session_config_fn (vlib_main_t * vm, unformat_input_t * input)
1489 {
1490   session_main_t *smm = &session_main;
1491   u32 nitems;
1492   uword tmp;
1493
1494   while (unformat_check_input (input) != UNFORMAT_END_OF_INPUT)
1495     {
1496       if (unformat (input, "event-queue-length %d", &nitems))
1497         {
1498           if (nitems >= 2048)
1499             smm->configured_event_queue_length = nitems;
1500           else
1501             clib_warning ("event queue length %d too small, ignored", nitems);
1502         }
1503       else if (unformat (input, "preallocated-sessions %d",
1504                          &smm->preallocated_sessions))
1505         ;
1506       else if (unformat (input, "v4-session-table-buckets %d",
1507                          &smm->configured_v4_session_table_buckets))
1508         ;
1509       else if (unformat (input, "v4-halfopen-table-buckets %d",
1510                          &smm->configured_v4_halfopen_table_buckets))
1511         ;
1512       else if (unformat (input, "v6-session-table-buckets %d",
1513                          &smm->configured_v6_session_table_buckets))
1514         ;
1515       else if (unformat (input, "v6-halfopen-table-buckets %d",
1516                          &smm->configured_v6_halfopen_table_buckets))
1517         ;
1518       else if (unformat (input, "v4-session-table-memory %U",
1519                          unformat_memory_size, &tmp))
1520         {
1521           if (tmp >= 0x100000000)
1522             return clib_error_return (0, "memory size %llx (%lld) too large",
1523                                       tmp, tmp);
1524           smm->configured_v4_session_table_memory = tmp;
1525         }
1526       else if (unformat (input, "v4-halfopen-table-memory %U",
1527                          unformat_memory_size, &tmp))
1528         {
1529           if (tmp >= 0x100000000)
1530             return clib_error_return (0, "memory size %llx (%lld) too large",
1531                                       tmp, tmp);
1532           smm->configured_v4_halfopen_table_memory = tmp;
1533         }
1534       else if (unformat (input, "v6-session-table-memory %U",
1535                          unformat_memory_size, &tmp))
1536         {
1537           if (tmp >= 0x100000000)
1538             return clib_error_return (0, "memory size %llx (%lld) too large",
1539                                       tmp, tmp);
1540           smm->configured_v6_session_table_memory = tmp;
1541         }
1542       else if (unformat (input, "v6-halfopen-table-memory %U",
1543                          unformat_memory_size, &tmp))
1544         {
1545           if (tmp >= 0x100000000)
1546             return clib_error_return (0, "memory size %llx (%lld) too large",
1547                                       tmp, tmp);
1548           smm->configured_v6_halfopen_table_memory = tmp;
1549         }
1550       else if (unformat (input, "local-endpoints-table-memory %U",
1551                          unformat_memory_size, &tmp))
1552         {
1553           if (tmp >= 0x100000000)
1554             return clib_error_return (0, "memory size %llx (%lld) too large",
1555                                       tmp, tmp);
1556           smm->local_endpoints_table_memory = tmp;
1557         }
1558       else if (unformat (input, "local-endpoints-table-buckets %d",
1559                          &smm->local_endpoints_table_buckets))
1560         ;
1561       else if (unformat (input, "evt_qs_memfd_seg"))
1562         smm->evt_qs_use_memfd_seg = 1;
1563       else if (unformat (input, "evt_qs_seg_size %U", unformat_memory_size,
1564                          &smm->evt_qs_segment_size))
1565         ;
1566       else
1567         return clib_error_return (0, "unknown input `%U'",
1568                                   format_unformat_error, input);
1569     }
1570   return 0;
1571 }
1572
1573 VLIB_CONFIG_FUNCTION (session_config_fn, "session");
1574
1575 /*
1576  * fd.io coding-style-patch-verification: ON
1577  *
1578  * Local Variables:
1579  * eval: (c-set-style "gnu")
1580  * End:
1581  */