vcl: allow more rx events on peek
[vpp.git] / src / vnet / session / session.c
1 /*
2  * Copyright (c) 2017-2019 Cisco and/or its affiliates.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at:
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 /**
16  * @file
17  * @brief Session and session manager
18  */
19
20 #include <vnet/plugin/plugin.h>
21 #include <vnet/session/session.h>
22 #include <vnet/session/application.h>
23 #include <vnet/dpo/load_balance.h>
24 #include <vnet/fib/ip4_fib.h>
25 #include <vlib/stats/stats.h>
26 #include <vlib/dma/dma.h>
27
28 session_main_t session_main;
29
30 static inline int
31 session_send_evt_to_thread (void *data, void *args, u32 thread_index,
32                             session_evt_type_t evt_type)
33 {
34   session_worker_t *wrk = session_main_get_worker (thread_index);
35   session_event_t *evt;
36   svm_msg_q_msg_t msg;
37   svm_msg_q_t *mq;
38
39   mq = wrk->vpp_event_queue;
40   if (PREDICT_FALSE (svm_msg_q_lock (mq)))
41     return -1;
42   if (PREDICT_FALSE (svm_msg_q_or_ring_is_full (mq, SESSION_MQ_IO_EVT_RING)))
43     {
44       svm_msg_q_unlock (mq);
45       return -2;
46     }
47   switch (evt_type)
48     {
49     case SESSION_CTRL_EVT_RPC:
50       msg = svm_msg_q_alloc_msg_w_ring (mq, SESSION_MQ_IO_EVT_RING);
51       evt = (session_event_t *) svm_msg_q_msg_data (mq, &msg);
52       evt->rpc_args.fp = data;
53       evt->rpc_args.arg = args;
54       break;
55     case SESSION_IO_EVT_RX:
56     case SESSION_IO_EVT_TX:
57     case SESSION_IO_EVT_TX_FLUSH:
58     case SESSION_IO_EVT_BUILTIN_RX:
59       msg = svm_msg_q_alloc_msg_w_ring (mq, SESSION_MQ_IO_EVT_RING);
60       evt = (session_event_t *) svm_msg_q_msg_data (mq, &msg);
61       evt->session_index = *(u32 *) data;
62       break;
63     case SESSION_IO_EVT_TX_MAIN:
64     case SESSION_CTRL_EVT_CLOSE:
65     case SESSION_CTRL_EVT_RESET:
66       msg = svm_msg_q_alloc_msg_w_ring (mq, SESSION_MQ_IO_EVT_RING);
67       evt = (session_event_t *) svm_msg_q_msg_data (mq, &msg);
68       evt->session_handle = session_handle ((session_t *) data);
69       break;
70     default:
71       clib_warning ("evt unhandled!");
72       svm_msg_q_unlock (mq);
73       return -1;
74     }
75   evt->event_type = evt_type;
76
77   svm_msg_q_add_and_unlock (mq, &msg);
78
79   if (PREDICT_FALSE (wrk->state == SESSION_WRK_INTERRUPT))
80     vlib_node_set_interrupt_pending (wrk->vm, session_queue_node.index);
81
82   return 0;
83 }
84
85 int
86 session_send_io_evt_to_thread (svm_fifo_t * f, session_evt_type_t evt_type)
87 {
88   return session_send_evt_to_thread (&f->shr->master_session_index, 0,
89                                      f->master_thread_index, evt_type);
90 }
91
92 int
93 session_send_io_evt_to_thread_custom (void *data, u32 thread_index,
94                                       session_evt_type_t evt_type)
95 {
96   return session_send_evt_to_thread (data, 0, thread_index, evt_type);
97 }
98
99 int
100 session_program_tx_io_evt (session_handle_tu_t sh, session_evt_type_t evt_type)
101 {
102   return session_send_evt_to_thread ((void *) &sh.session_index, 0,
103                                      (u32) sh.thread_index, evt_type);
104 }
105
106 int
107 session_send_ctrl_evt_to_thread (session_t * s, session_evt_type_t evt_type)
108 {
109   /* only events supported are disconnect, shutdown and reset */
110   ASSERT (evt_type == SESSION_CTRL_EVT_CLOSE ||
111           evt_type == SESSION_CTRL_EVT_HALF_CLOSE ||
112           evt_type == SESSION_CTRL_EVT_RESET);
113   return session_send_evt_to_thread (s, 0, s->thread_index, evt_type);
114 }
115
116 void
117 session_send_rpc_evt_to_thread_force (u32 thread_index, void *fp,
118                                       void *rpc_args)
119 {
120   session_send_evt_to_thread (fp, rpc_args, thread_index,
121                               SESSION_CTRL_EVT_RPC);
122 }
123
124 void
125 session_send_rpc_evt_to_thread (u32 thread_index, void *fp, void *rpc_args)
126 {
127   if (thread_index != vlib_get_thread_index ())
128     session_send_rpc_evt_to_thread_force (thread_index, fp, rpc_args);
129   else
130     {
131       void (*fnp) (void *) = fp;
132       fnp (rpc_args);
133     }
134 }
135
136 void
137 session_add_self_custom_tx_evt (transport_connection_t * tc, u8 has_prio)
138 {
139   session_t *s = session_get (tc->s_index, tc->thread_index);
140
141   ASSERT (s->thread_index == vlib_get_thread_index ());
142   ASSERT (s->session_state != SESSION_STATE_TRANSPORT_DELETED);
143
144   if (!(s->flags & SESSION_F_CUSTOM_TX))
145     {
146       s->flags |= SESSION_F_CUSTOM_TX;
147       if (svm_fifo_set_event (s->tx_fifo)
148           || transport_connection_is_descheduled (tc))
149         {
150           session_evt_elt_t *elt;
151           session_worker_t *wrk;
152
153           wrk = session_main_get_worker (tc->thread_index);
154           if (has_prio)
155             elt = session_evt_alloc_new (wrk);
156           else
157             elt = session_evt_alloc_old (wrk);
158           elt->evt.session_index = tc->s_index;
159           elt->evt.event_type = SESSION_IO_EVT_TX;
160           tc->flags &= ~TRANSPORT_CONNECTION_F_DESCHED;
161
162           if (PREDICT_FALSE (wrk->state == SESSION_WRK_INTERRUPT))
163             vlib_node_set_interrupt_pending (wrk->vm,
164                                              session_queue_node.index);
165         }
166     }
167 }
168
169 void
170 sesssion_reschedule_tx (transport_connection_t * tc)
171 {
172   session_worker_t *wrk = session_main_get_worker (tc->thread_index);
173   session_evt_elt_t *elt;
174
175   ASSERT (tc->thread_index == vlib_get_thread_index ());
176
177   elt = session_evt_alloc_new (wrk);
178   elt->evt.session_index = tc->s_index;
179   elt->evt.event_type = SESSION_IO_EVT_TX;
180
181   if (PREDICT_FALSE (wrk->state == SESSION_WRK_INTERRUPT))
182     vlib_node_set_interrupt_pending (wrk->vm, session_queue_node.index);
183 }
184
185 static void
186 session_program_transport_ctrl_evt (session_t * s, session_evt_type_t evt)
187 {
188   u32 thread_index = vlib_get_thread_index ();
189   session_evt_elt_t *elt;
190   session_worker_t *wrk;
191
192   /* If we are in the handler thread, or being called with the worker barrier
193    * held, just append a new event to pending disconnects vector. */
194   if (vlib_thread_is_main_w_barrier () || thread_index == s->thread_index)
195     {
196       wrk = session_main_get_worker (s->thread_index);
197       elt = session_evt_alloc_ctrl (wrk);
198       clib_memset (&elt->evt, 0, sizeof (session_event_t));
199       elt->evt.session_handle = session_handle (s);
200       elt->evt.event_type = evt;
201
202       if (PREDICT_FALSE (wrk->state == SESSION_WRK_INTERRUPT))
203         vlib_node_set_interrupt_pending (wrk->vm, session_queue_node.index);
204     }
205   else
206     session_send_ctrl_evt_to_thread (s, evt);
207 }
208
209 session_t *
210 session_alloc (u32 thread_index)
211 {
212   session_worker_t *wrk = &session_main.wrk[thread_index];
213   session_t *s;
214
215   pool_get_aligned_safe (wrk->sessions, s, CLIB_CACHE_LINE_BYTES);
216   clib_memset (s, 0, sizeof (*s));
217   s->session_index = s - wrk->sessions;
218   s->thread_index = thread_index;
219   s->al_index = APP_INVALID_INDEX;
220
221   return s;
222 }
223
224 void
225 session_free (session_t * s)
226 {
227   session_worker_t *wrk = &session_main.wrk[s->thread_index];
228
229   SESSION_EVT (SESSION_EVT_FREE, s);
230   if (CLIB_DEBUG)
231     clib_memset (s, 0xFA, sizeof (*s));
232   pool_put (wrk->sessions, s);
233 }
234
235 u8
236 session_is_valid (u32 si, u8 thread_index)
237 {
238   session_t *s;
239   transport_connection_t *tc;
240
241   s = pool_elt_at_index (session_main.wrk[thread_index].sessions, si);
242
243   if (s->thread_index != thread_index || s->session_index != si)
244     return 0;
245
246   if (s->session_state == SESSION_STATE_TRANSPORT_DELETED
247       || s->session_state <= SESSION_STATE_LISTENING)
248     return 1;
249
250   if ((s->session_state == SESSION_STATE_CONNECTING ||
251        s->session_state == SESSION_STATE_TRANSPORT_CLOSED) &&
252       (s->flags & SESSION_F_HALF_OPEN))
253     return 1;
254
255   tc = session_get_transport (s);
256   if (s->connection_index != tc->c_index ||
257       s->thread_index != tc->thread_index || tc->s_index != si)
258     return 0;
259
260   return 1;
261 }
262
263 void
264 session_cleanup (session_t *s)
265 {
266   segment_manager_dealloc_fifos (s->rx_fifo, s->tx_fifo);
267   session_free (s);
268 }
269
270 static void
271 session_cleanup_notify (session_t * s, session_cleanup_ntf_t ntf)
272 {
273   app_worker_t *app_wrk;
274
275   app_wrk = app_worker_get_if_valid (s->app_wrk_index);
276   if (PREDICT_FALSE (!app_wrk))
277     {
278       if (ntf == SESSION_CLEANUP_TRANSPORT)
279         return;
280
281       session_cleanup (s);
282       return;
283     }
284   app_worker_cleanup_notify (app_wrk, s, ntf);
285 }
286
287 void
288 session_program_cleanup (session_t *s)
289 {
290   ASSERT (s->session_state == SESSION_STATE_TRANSPORT_DELETED);
291   session_cleanup_notify (s, SESSION_CLEANUP_SESSION);
292 }
293
294 /**
295  * Cleans up session and lookup table.
296  *
297  * Transport connection must still be valid.
298  */
299 static void
300 session_delete (session_t * s)
301 {
302   int rv;
303
304   /* Delete from the main lookup table. */
305   if ((rv = session_lookup_del_session (s)))
306     clib_warning ("session %u hash delete rv %d", s->session_index, rv);
307
308   session_program_cleanup (s);
309 }
310
311 void
312 session_cleanup_half_open (session_handle_t ho_handle)
313 {
314   session_t *ho = session_get_from_handle (ho_handle);
315
316   /* App transports can migrate their half-opens */
317   if (ho->flags & SESSION_F_IS_MIGRATING)
318     {
319       /* Session still migrating, move to closed state to signal that the
320        * session should be removed. */
321       if (ho->connection_index == ~0)
322         {
323           session_set_state (ho, SESSION_STATE_CLOSED);
324           return;
325         }
326       /* Migrated transports are no longer half-opens */
327       transport_cleanup (session_get_transport_proto (ho),
328                          ho->connection_index, ho->al_index /* overloaded */);
329     }
330   else if (ho->session_state != SESSION_STATE_TRANSPORT_DELETED)
331     {
332       /* Cleanup half-open session lookup table if need be */
333       if (ho->session_state != SESSION_STATE_TRANSPORT_CLOSED)
334         {
335           transport_connection_t *tc;
336           tc = transport_get_half_open (session_get_transport_proto (ho),
337                                         ho->connection_index);
338           if (tc && !(tc->flags & TRANSPORT_CONNECTION_F_NO_LOOKUP))
339             session_lookup_del_half_open (tc);
340         }
341       transport_cleanup_half_open (session_get_transport_proto (ho),
342                                    ho->connection_index);
343     }
344   session_free (ho);
345 }
346
347 static void
348 session_half_open_free (session_t *ho)
349 {
350   app_worker_t *app_wrk;
351
352   ASSERT (vlib_get_thread_index () <= transport_cl_thread ());
353   app_wrk = app_worker_get_if_valid (ho->app_wrk_index);
354   if (app_wrk)
355     app_worker_del_half_open (app_wrk, ho);
356   else
357     session_free (ho);
358 }
359
360 static void
361 session_half_open_free_rpc (void *args)
362 {
363   session_t *ho = ho_session_get (pointer_to_uword (args));
364   session_half_open_free (ho);
365 }
366
367 void
368 session_half_open_delete_notify (transport_connection_t *tc)
369 {
370   session_t *ho = ho_session_get (tc->s_index);
371
372   /* Cleanup half-open lookup table if need be */
373   if (ho->session_state != SESSION_STATE_TRANSPORT_CLOSED)
374     {
375       if (!(tc->flags & TRANSPORT_CONNECTION_F_NO_LOOKUP))
376         session_lookup_del_half_open (tc);
377     }
378   session_set_state (ho, SESSION_STATE_TRANSPORT_DELETED);
379
380   /* Notification from ctrl thread accepted without rpc */
381   if (tc->thread_index == transport_cl_thread ())
382     {
383       session_half_open_free (ho);
384     }
385   else
386     {
387       void *args = uword_to_pointer ((uword) tc->s_index, void *);
388       session_send_rpc_evt_to_thread_force (transport_cl_thread (),
389                                             session_half_open_free_rpc, args);
390     }
391 }
392
393 void
394 session_half_open_migrate_notify (transport_connection_t *tc)
395 {
396   session_t *ho;
397
398   /* Support half-open migrations only for transports with no lookup */
399   ASSERT (tc->flags & TRANSPORT_CONNECTION_F_NO_LOOKUP);
400
401   ho = ho_session_get (tc->s_index);
402   ho->flags |= SESSION_F_IS_MIGRATING;
403   ho->connection_index = ~0;
404 }
405
406 int
407 session_half_open_migrated_notify (transport_connection_t *tc)
408 {
409   session_t *ho;
410
411   ho = ho_session_get (tc->s_index);
412
413   /* App probably detached so the half-open must be cleaned up */
414   if (ho->session_state == SESSION_STATE_CLOSED)
415     {
416       session_half_open_delete_notify (tc);
417       return -1;
418     }
419   ho->connection_index = tc->c_index;
420   /* Overload al_index for half-open with new thread */
421   ho->al_index = tc->thread_index;
422   return 0;
423 }
424
425 session_t *
426 session_alloc_for_connection (transport_connection_t * tc)
427 {
428   session_t *s;
429   u32 thread_index = tc->thread_index;
430
431   ASSERT (thread_index == vlib_get_thread_index ()
432           || transport_protocol_is_cl (tc->proto));
433
434   s = session_alloc (thread_index);
435   s->session_type = session_type_from_proto_and_ip (tc->proto, tc->is_ip4);
436   session_set_state (s, SESSION_STATE_CLOSED);
437
438   /* Attach transport to session and vice versa */
439   s->connection_index = tc->c_index;
440   tc->s_index = s->session_index;
441   return s;
442 }
443
444 session_t *
445 session_alloc_for_half_open (transport_connection_t *tc)
446 {
447   session_t *s;
448
449   s = ho_session_alloc ();
450   s->session_type = session_type_from_proto_and_ip (tc->proto, tc->is_ip4);
451   s->connection_index = tc->c_index;
452   tc->s_index = s->session_index;
453   return s;
454 }
455
456 /**
457  * Discards bytes from buffer chain
458  *
459  * It discards n_bytes_to_drop starting at first buffer after chain_b
460  */
461 always_inline void
462 session_enqueue_discard_chain_bytes (vlib_main_t * vm, vlib_buffer_t * b,
463                                      vlib_buffer_t ** chain_b,
464                                      u32 n_bytes_to_drop)
465 {
466   vlib_buffer_t *next = *chain_b;
467   u32 to_drop = n_bytes_to_drop;
468   ASSERT (b->flags & VLIB_BUFFER_NEXT_PRESENT);
469   while (to_drop && (next->flags & VLIB_BUFFER_NEXT_PRESENT))
470     {
471       next = vlib_get_buffer (vm, next->next_buffer);
472       if (next->current_length > to_drop)
473         {
474           vlib_buffer_advance (next, to_drop);
475           to_drop = 0;
476         }
477       else
478         {
479           to_drop -= next->current_length;
480           next->current_length = 0;
481         }
482     }
483   *chain_b = next;
484
485   if (to_drop == 0)
486     b->total_length_not_including_first_buffer -= n_bytes_to_drop;
487 }
488
489 /**
490  * Enqueue buffer chain tail
491  */
492 always_inline int
493 session_enqueue_chain_tail (session_t * s, vlib_buffer_t * b,
494                             u32 offset, u8 is_in_order)
495 {
496   vlib_buffer_t *chain_b;
497   u32 chain_bi, len, diff;
498   vlib_main_t *vm = vlib_get_main ();
499   u8 *data;
500   u32 written = 0;
501   int rv = 0;
502
503   if (is_in_order && offset)
504     {
505       diff = offset - b->current_length;
506       if (diff > b->total_length_not_including_first_buffer)
507         return 0;
508       chain_b = b;
509       session_enqueue_discard_chain_bytes (vm, b, &chain_b, diff);
510       chain_bi = vlib_get_buffer_index (vm, chain_b);
511     }
512   else
513     chain_bi = b->next_buffer;
514
515   do
516     {
517       chain_b = vlib_get_buffer (vm, chain_bi);
518       data = vlib_buffer_get_current (chain_b);
519       len = chain_b->current_length;
520       if (!len)
521         continue;
522       if (is_in_order)
523         {
524           rv = svm_fifo_enqueue (s->rx_fifo, len, data);
525           if (rv == len)
526             {
527               written += rv;
528             }
529           else if (rv < len)
530             {
531               return (rv > 0) ? (written + rv) : written;
532             }
533           else if (rv > len)
534             {
535               written += rv;
536
537               /* written more than what was left in chain */
538               if (written > b->total_length_not_including_first_buffer)
539                 return written;
540
541               /* drop the bytes that have already been delivered */
542               session_enqueue_discard_chain_bytes (vm, b, &chain_b, rv - len);
543             }
544         }
545       else
546         {
547           rv = svm_fifo_enqueue_with_offset (s->rx_fifo, offset, len, data);
548           if (rv)
549             {
550               clib_warning ("failed to enqueue multi-buffer seg");
551               return -1;
552             }
553           offset += len;
554         }
555     }
556   while ((chain_bi = (chain_b->flags & VLIB_BUFFER_NEXT_PRESENT)
557           ? chain_b->next_buffer : 0));
558
559   if (is_in_order)
560     return written;
561
562   return 0;
563 }
564
565 void
566 session_fifo_tuning (session_t * s, svm_fifo_t * f,
567                      session_ft_action_t act, u32 len)
568 {
569   if (s->flags & SESSION_F_CUSTOM_FIFO_TUNING)
570     {
571       app_worker_t *app_wrk = app_worker_get (s->app_wrk_index);
572       app_worker_session_fifo_tuning (app_wrk, s, f, act, len);
573       if (CLIB_ASSERT_ENABLE)
574         {
575           segment_manager_t *sm;
576           sm = segment_manager_get (f->segment_manager);
577           ASSERT (f->shr->size >= 4096);
578           ASSERT (f->shr->size <= sm->max_fifo_size);
579         }
580     }
581 }
582
583 void
584 session_wrk_program_app_wrk_evts (session_worker_t *wrk, u32 app_wrk_index)
585 {
586   u8 need_interrupt;
587
588   ASSERT ((wrk - session_main.wrk) == vlib_get_thread_index ());
589   need_interrupt = clib_bitmap_is_zero (wrk->app_wrks_pending_ntf);
590   wrk->app_wrks_pending_ntf =
591     clib_bitmap_set (wrk->app_wrks_pending_ntf, app_wrk_index, 1);
592
593   if (need_interrupt)
594     vlib_node_set_interrupt_pending (wrk->vm, session_input_node.index);
595 }
596
597 always_inline void
598 session_program_io_event (app_worker_t *app_wrk, session_t *s,
599                           session_evt_type_t et, u8 is_cl)
600 {
601   if (is_cl)
602     {
603       /* Special events for connectionless sessions */
604       et += SESSION_IO_EVT_BUILTIN_RX - SESSION_IO_EVT_RX;
605
606       ASSERT (s->thread_index == 0 || et == SESSION_IO_EVT_TX_MAIN);
607       session_event_t evt = {
608         .event_type = et,
609         .session_handle = session_handle (s),
610       };
611
612       app_worker_add_event_custom (app_wrk, vlib_get_thread_index (), &evt);
613     }
614   else
615     {
616       app_worker_add_event (app_wrk, s, et);
617     }
618 }
619
620 static inline int
621 session_notify_subscribers (u32 app_index, session_t *s, svm_fifo_t *f,
622                             session_evt_type_t evt_type)
623 {
624   app_worker_t *app_wrk;
625   application_t *app;
626   u8 is_cl;
627   int i;
628
629   app = application_get (app_index);
630   if (!app)
631     return -1;
632
633   is_cl = s->thread_index != vlib_get_thread_index ();
634   for (i = 0; i < f->shr->n_subscribers; i++)
635     {
636       app_wrk = application_get_worker (app, f->shr->subscribers[i]);
637       if (!app_wrk)
638         continue;
639       session_program_io_event (app_wrk, s, evt_type, is_cl ? 1 : 0);
640     }
641
642   return 0;
643 }
644
645 always_inline int
646 session_enqueue_notify_inline (session_t *s, u8 is_cl)
647 {
648   app_worker_t *app_wrk;
649
650   app_wrk = app_worker_get_if_valid (s->app_wrk_index);
651   if (PREDICT_FALSE (!app_wrk))
652     return -1;
653
654   session_program_io_event (app_wrk, s, SESSION_IO_EVT_RX, is_cl);
655
656   if (PREDICT_FALSE (svm_fifo_n_subscribers (s->rx_fifo)))
657     return session_notify_subscribers (app_wrk->app_index, s, s->rx_fifo,
658                                        SESSION_IO_EVT_RX);
659
660   return 0;
661 }
662
663 int
664 session_enqueue_notify (session_t *s)
665 {
666   return session_enqueue_notify_inline (s, 0 /* is_cl */);
667 }
668
669 int
670 session_enqueue_notify_cl (session_t *s)
671 {
672   return session_enqueue_notify_inline (s, 1 /* is_cl */);
673 }
674
675 int
676 session_dequeue_notify (session_t *s)
677 {
678   app_worker_t *app_wrk;
679   u8 is_cl;
680
681   /* Unset as soon as event is requested */
682   svm_fifo_clear_deq_ntf (s->tx_fifo);
683
684   app_wrk = app_worker_get_if_valid (s->app_wrk_index);
685   if (PREDICT_FALSE (!app_wrk))
686     return -1;
687
688   is_cl = s->session_state == SESSION_STATE_LISTENING ||
689           s->session_state == SESSION_STATE_OPENED;
690   session_program_io_event (app_wrk, s, SESSION_IO_EVT_TX, is_cl ? 1 : 0);
691
692   if (PREDICT_FALSE (svm_fifo_n_subscribers (s->tx_fifo)))
693     return session_notify_subscribers (app_wrk->app_index, s, s->tx_fifo,
694                                        SESSION_IO_EVT_TX);
695
696   return 0;
697 }
698
699 /**
700  * Flushes queue of sessions that are to be notified of new data
701  * enqueued events.
702  *
703  * @param transport_proto transport protocol for which queue to be flushed
704  * @param thread_index Thread index for which the flush is to be performed.
705  * @return 0 on success or a positive number indicating the number of
706  *         failures due to API queue being full.
707  */
708 void
709 session_main_flush_enqueue_events (transport_proto_t transport_proto,
710                                    u32 thread_index)
711 {
712   session_worker_t *wrk = session_main_get_worker (thread_index);
713   session_handle_t *handles;
714   session_t *s;
715   u32 i, is_cl;
716
717   handles = wrk->session_to_enqueue[transport_proto];
718
719   for (i = 0; i < vec_len (handles); i++)
720     {
721       s = session_get_from_handle (handles[i]);
722       session_fifo_tuning (s, s->rx_fifo, SESSION_FT_ACTION_ENQUEUED,
723                            0 /* TODO/not needed */);
724       is_cl =
725         s->thread_index != thread_index || (s->flags & SESSION_F_IS_CLESS);
726       if (!is_cl)
727         session_enqueue_notify_inline (s, 0);
728       else
729         session_enqueue_notify_inline (s, 1);
730     }
731
732   vec_reset_length (handles);
733   wrk->session_to_enqueue[transport_proto] = handles;
734 }
735
736 /*
737  * Enqueue data for delivery to app. If requested, it queues app notification
738  * event for later delivery.
739  *
740  * @param tc Transport connection which is to be enqueued data
741  * @param b Buffer to be enqueued
742  * @param offset Offset at which to start enqueueing if out-of-order
743  * @param queue_event Flag to indicate if peer is to be notified or if event
744  *                    is to be queued. The former is useful when more data is
745  *                    enqueued and only one event is to be generated.
746  * @param is_in_order Flag to indicate if data is in order
747  * @return Number of bytes enqueued or a negative value if enqueueing failed.
748  */
749 int
750 session_enqueue_stream_connection (transport_connection_t * tc,
751                                    vlib_buffer_t * b, u32 offset,
752                                    u8 queue_event, u8 is_in_order)
753 {
754   session_t *s;
755   int enqueued = 0, rv, in_order_off;
756
757   s = session_get (tc->s_index, tc->thread_index);
758
759   if (is_in_order)
760     {
761       enqueued = svm_fifo_enqueue (s->rx_fifo,
762                                    b->current_length,
763                                    vlib_buffer_get_current (b));
764       if (PREDICT_FALSE ((b->flags & VLIB_BUFFER_NEXT_PRESENT)
765                          && enqueued >= 0))
766         {
767           in_order_off = enqueued > b->current_length ? enqueued : 0;
768           rv = session_enqueue_chain_tail (s, b, in_order_off, 1);
769           if (rv > 0)
770             enqueued += rv;
771         }
772     }
773   else
774     {
775       rv = svm_fifo_enqueue_with_offset (s->rx_fifo, offset,
776                                          b->current_length,
777                                          vlib_buffer_get_current (b));
778       if (PREDICT_FALSE ((b->flags & VLIB_BUFFER_NEXT_PRESENT) && !rv))
779         session_enqueue_chain_tail (s, b, offset + b->current_length, 0);
780       /* if something was enqueued, report even this as success for ooo
781        * segment handling */
782       return rv;
783     }
784
785   if (queue_event)
786     {
787       /* Queue RX event on this fifo. Eventually these will need to be
788        * flushed by calling @ref session_main_flush_enqueue_events () */
789       if (!(s->flags & SESSION_F_RX_EVT))
790         {
791           session_worker_t *wrk = session_main_get_worker (s->thread_index);
792           ASSERT (s->thread_index == vlib_get_thread_index ());
793           s->flags |= SESSION_F_RX_EVT;
794           vec_add1 (wrk->session_to_enqueue[tc->proto], session_handle (s));
795         }
796
797       session_fifo_tuning (s, s->rx_fifo, SESSION_FT_ACTION_ENQUEUED, 0);
798     }
799
800   return enqueued;
801 }
802
803 always_inline int
804 session_enqueue_dgram_connection_inline (session_t *s,
805                                          session_dgram_hdr_t *hdr,
806                                          vlib_buffer_t *b, u8 proto,
807                                          u8 queue_event, u32 is_cl)
808 {
809   int rv;
810
811   ASSERT (svm_fifo_max_enqueue_prod (s->rx_fifo)
812           >= b->current_length + sizeof (*hdr));
813
814   if (PREDICT_TRUE (!(b->flags & VLIB_BUFFER_NEXT_PRESENT)))
815     {
816       svm_fifo_seg_t segs[2] = {
817           { (u8 *) hdr, sizeof (*hdr) },
818           { vlib_buffer_get_current (b), b->current_length }
819       };
820
821       rv = svm_fifo_enqueue_segments (s->rx_fifo, segs, 2,
822                                       0 /* allow_partial */ );
823     }
824   else
825     {
826       vlib_main_t *vm = vlib_get_main ();
827       svm_fifo_seg_t *segs = 0, *seg;
828       vlib_buffer_t *it = b;
829       u32 n_segs = 1;
830
831       vec_add2 (segs, seg, 1);
832       seg->data = (u8 *) hdr;
833       seg->len = sizeof (*hdr);
834       while (it)
835         {
836           vec_add2 (segs, seg, 1);
837           seg->data = vlib_buffer_get_current (it);
838           seg->len = it->current_length;
839           n_segs++;
840           if (!(it->flags & VLIB_BUFFER_NEXT_PRESENT))
841             break;
842           it = vlib_get_buffer (vm, it->next_buffer);
843         }
844       rv = svm_fifo_enqueue_segments (s->rx_fifo, segs, n_segs,
845                                       0 /* allow partial */ );
846       vec_free (segs);
847     }
848
849   if (queue_event && rv > 0)
850     {
851       /* Queue RX event on this fifo. Eventually these will need to be
852        * flushed by calling @ref session_main_flush_enqueue_events () */
853       if (!(s->flags & SESSION_F_RX_EVT))
854         {
855           u32 thread_index =
856             is_cl ? vlib_get_thread_index () : s->thread_index;
857           session_worker_t *wrk = session_main_get_worker (thread_index);
858           ASSERT (s->thread_index == vlib_get_thread_index () || is_cl);
859           s->flags |= SESSION_F_RX_EVT;
860           vec_add1 (wrk->session_to_enqueue[proto], session_handle (s));
861         }
862
863       session_fifo_tuning (s, s->rx_fifo, SESSION_FT_ACTION_ENQUEUED, 0);
864     }
865   return rv > 0 ? rv : 0;
866 }
867
868 int
869 session_enqueue_dgram_connection (session_t *s, session_dgram_hdr_t *hdr,
870                                   vlib_buffer_t *b, u8 proto, u8 queue_event)
871 {
872   return session_enqueue_dgram_connection_inline (s, hdr, b, proto,
873                                                   queue_event, 0 /* is_cl */);
874 }
875
876 int
877 session_enqueue_dgram_connection2 (session_t *s, session_dgram_hdr_t *hdr,
878                                    vlib_buffer_t *b, u8 proto, u8 queue_event)
879 {
880   return session_enqueue_dgram_connection_inline (s, hdr, b, proto,
881                                                   queue_event, 1 /* is_cl */);
882 }
883
884 int
885 session_enqueue_dgram_connection_cl (session_t *s, session_dgram_hdr_t *hdr,
886                                      vlib_buffer_t *b, u8 proto,
887                                      u8 queue_event)
888 {
889   session_t *awls;
890
891   awls = app_listener_select_wrk_cl_session (s, hdr);
892   return session_enqueue_dgram_connection_inline (awls, hdr, b, proto,
893                                                   queue_event, 1 /* is_cl */);
894 }
895
896 int
897 session_tx_fifo_peek_bytes (transport_connection_t * tc, u8 * buffer,
898                             u32 offset, u32 max_bytes)
899 {
900   session_t *s = session_get (tc->s_index, tc->thread_index);
901   return svm_fifo_peek (s->tx_fifo, offset, max_bytes, buffer);
902 }
903
904 u32
905 session_tx_fifo_dequeue_drop (transport_connection_t * tc, u32 max_bytes)
906 {
907   session_t *s = session_get (tc->s_index, tc->thread_index);
908   u32 rv;
909
910   rv = svm_fifo_dequeue_drop (s->tx_fifo, max_bytes);
911   session_fifo_tuning (s, s->tx_fifo, SESSION_FT_ACTION_DEQUEUED, rv);
912
913   if (svm_fifo_needs_deq_ntf (s->tx_fifo, max_bytes))
914     session_dequeue_notify (s);
915
916   return rv;
917 }
918
919 int
920 session_stream_connect_notify (transport_connection_t * tc,
921                                session_error_t err)
922 {
923   u32 opaque = 0, new_ti, new_si;
924   app_worker_t *app_wrk;
925   session_t *s = 0, *ho;
926
927   /*
928    * Cleanup half-open table
929    */
930   session_lookup_del_half_open (tc);
931
932   ho = ho_session_get (tc->s_index);
933   session_set_state (ho, SESSION_STATE_TRANSPORT_CLOSED);
934   opaque = ho->opaque;
935   app_wrk = app_worker_get_if_valid (ho->app_wrk_index);
936   if (!app_wrk)
937     return -1;
938
939   if (err)
940     return app_worker_connect_notify (app_wrk, s, err, opaque);
941
942   s = session_alloc_for_connection (tc);
943   session_set_state (s, SESSION_STATE_CONNECTING);
944   s->app_wrk_index = app_wrk->wrk_index;
945   s->opaque = opaque;
946   new_si = s->session_index;
947   new_ti = s->thread_index;
948
949   if ((err = app_worker_init_connected (app_wrk, s)))
950     {
951       session_free (s);
952       app_worker_connect_notify (app_wrk, 0, err, opaque);
953       return -1;
954     }
955
956   s = session_get (new_si, new_ti);
957   session_set_state (s, SESSION_STATE_READY);
958   session_lookup_add_connection (tc, session_handle (s));
959
960   if (app_worker_connect_notify (app_wrk, s, SESSION_E_NONE, opaque))
961     {
962       session_lookup_del_connection (tc);
963       /* Avoid notifying app about rejected session cleanup */
964       s = session_get (new_si, new_ti);
965       segment_manager_dealloc_fifos (s->rx_fifo, s->tx_fifo);
966       session_free (s);
967       return -1;
968     }
969
970   return 0;
971 }
972
973 static void
974 session_switch_pool_closed_rpc (void *arg)
975 {
976   session_handle_t sh;
977   session_t *s;
978
979   sh = pointer_to_uword (arg);
980   s = session_get_from_handle_if_valid (sh);
981   if (!s)
982     return;
983
984   transport_cleanup (session_get_transport_proto (s), s->connection_index,
985                      s->thread_index);
986   session_cleanup (s);
987 }
988
989 typedef struct _session_switch_pool_args
990 {
991   u32 session_index;
992   u32 thread_index;
993   u32 new_thread_index;
994   u32 new_session_index;
995 } session_switch_pool_args_t;
996
997 /**
998  * Notify old thread of the session pool switch
999  */
1000 static void
1001 session_switch_pool (void *cb_args)
1002 {
1003   session_switch_pool_args_t *args = (session_switch_pool_args_t *) cb_args;
1004   session_handle_t sh, new_sh;
1005   segment_manager_t *sm;
1006   app_worker_t *app_wrk;
1007   session_t *s;
1008
1009   ASSERT (args->thread_index == vlib_get_thread_index ());
1010   s = session_get (args->session_index, args->thread_index);
1011
1012   app_wrk = app_worker_get_if_valid (s->app_wrk_index);
1013   if (!app_wrk)
1014     goto app_closed;
1015
1016   /* Cleanup fifo segment slice state for fifos */
1017   sm = app_worker_get_connect_segment_manager (app_wrk);
1018   segment_manager_detach_fifo (sm, &s->rx_fifo);
1019   segment_manager_detach_fifo (sm, &s->tx_fifo);
1020
1021   /* Check if session closed during migration */
1022   if (s->session_state >= SESSION_STATE_TRANSPORT_CLOSING)
1023     goto app_closed;
1024
1025   new_sh =
1026     session_make_handle (args->new_session_index, args->new_thread_index);
1027   app_worker_migrate_notify (app_wrk, s, new_sh);
1028
1029   clib_mem_free (cb_args);
1030   return;
1031
1032 app_closed:
1033   /* Session closed during migration. Clean everything up */
1034   sh = session_handle (s);
1035   session_send_rpc_evt_to_thread (args->new_thread_index,
1036                                   session_switch_pool_closed_rpc,
1037                                   uword_to_pointer (sh, void *));
1038   clib_mem_free (cb_args);
1039 }
1040
1041 /**
1042  * Move dgram session to the right thread
1043  */
1044 int
1045 session_dgram_connect_notify (transport_connection_t * tc,
1046                               u32 old_thread_index, session_t ** new_session)
1047 {
1048   session_t *new_s;
1049   session_switch_pool_args_t *rpc_args;
1050   segment_manager_t *sm;
1051   app_worker_t *app_wrk;
1052
1053   /*
1054    * Clone half-open session to the right thread.
1055    */
1056   new_s = session_clone_safe (tc->s_index, old_thread_index);
1057   new_s->connection_index = tc->c_index;
1058   session_set_state (new_s, SESSION_STATE_READY);
1059   new_s->flags |= SESSION_F_IS_MIGRATING;
1060
1061   if (!(tc->flags & TRANSPORT_CONNECTION_F_NO_LOOKUP))
1062     session_lookup_add_connection (tc, session_handle (new_s));
1063
1064   app_wrk = app_worker_get_if_valid (new_s->app_wrk_index);
1065   if (app_wrk)
1066     {
1067       /* New set of fifos attached to the same shared memory */
1068       sm = app_worker_get_connect_segment_manager (app_wrk);
1069       segment_manager_attach_fifo (sm, &new_s->rx_fifo, new_s);
1070       segment_manager_attach_fifo (sm, &new_s->tx_fifo, new_s);
1071     }
1072
1073   /*
1074    * Ask thread owning the old session to clean it up and make us the tx
1075    * fifo owner
1076    */
1077   rpc_args = clib_mem_alloc (sizeof (*rpc_args));
1078   rpc_args->new_session_index = new_s->session_index;
1079   rpc_args->new_thread_index = new_s->thread_index;
1080   rpc_args->session_index = tc->s_index;
1081   rpc_args->thread_index = old_thread_index;
1082   session_send_rpc_evt_to_thread (rpc_args->thread_index, session_switch_pool,
1083                                   rpc_args);
1084
1085   tc->s_index = new_s->session_index;
1086   new_s->connection_index = tc->c_index;
1087   *new_session = new_s;
1088   return 0;
1089 }
1090
1091 /**
1092  * Notification from transport that connection is being closed.
1093  *
1094  * A disconnect is sent to application but state is not removed. Once
1095  * disconnect is acknowledged by application, session disconnect is called.
1096  * Ultimately this leads to close being called on transport (passive close).
1097  */
1098 void
1099 session_transport_closing_notify (transport_connection_t * tc)
1100 {
1101   app_worker_t *app_wrk;
1102   session_t *s;
1103
1104   s = session_get (tc->s_index, tc->thread_index);
1105   if (s->session_state >= SESSION_STATE_TRANSPORT_CLOSING)
1106     return;
1107
1108   /* Wait for reply from app before sending notification as the
1109    * accept might be rejected */
1110   if (s->session_state == SESSION_STATE_ACCEPTING)
1111     {
1112       session_set_state (s, SESSION_STATE_TRANSPORT_CLOSING);
1113       return;
1114     }
1115
1116   session_set_state (s, SESSION_STATE_TRANSPORT_CLOSING);
1117   app_wrk = app_worker_get (s->app_wrk_index);
1118   app_worker_close_notify (app_wrk, s);
1119 }
1120
1121 /**
1122  * Notification from transport that connection is being deleted
1123  *
1124  * This removes the session if it is still valid. It should be called only on
1125  * previously fully established sessions. For instance failed connects should
1126  * call stream_session_connect_notify and indicate that the connect has
1127  * failed.
1128  */
1129 void
1130 session_transport_delete_notify (transport_connection_t * tc)
1131 {
1132   session_t *s;
1133
1134   /* App might've been removed already */
1135   if (!(s = session_get_if_valid (tc->s_index, tc->thread_index)))
1136     return;
1137
1138   switch (s->session_state)
1139     {
1140     case SESSION_STATE_CREATED:
1141       /* Session was created but accept notification was not yet sent to the
1142        * app. Cleanup everything. */
1143       session_lookup_del_session (s);
1144       segment_manager_dealloc_fifos (s->rx_fifo, s->tx_fifo);
1145       session_free (s);
1146       break;
1147     case SESSION_STATE_ACCEPTING:
1148     case SESSION_STATE_TRANSPORT_CLOSING:
1149     case SESSION_STATE_CLOSING:
1150     case SESSION_STATE_TRANSPORT_CLOSED:
1151       /* If transport finishes or times out before we get a reply
1152        * from the app, mark transport as closed and wait for reply
1153        * before removing the session. Cleanup session table in advance
1154        * because transport will soon be closed and closed sessions
1155        * are assumed to have been removed from the lookup table */
1156       session_lookup_del_session (s);
1157       session_set_state (s, SESSION_STATE_TRANSPORT_DELETED);
1158       session_cleanup_notify (s, SESSION_CLEANUP_TRANSPORT);
1159       svm_fifo_dequeue_drop_all (s->tx_fifo);
1160       break;
1161     case SESSION_STATE_APP_CLOSED:
1162       /* Cleanup lookup table as transport needs to still be valid.
1163        * Program transport close to ensure that all session events
1164        * have been cleaned up. Once transport close is called, the
1165        * session is just removed because both transport and app have
1166        * confirmed the close*/
1167       session_lookup_del_session (s);
1168       session_set_state (s, SESSION_STATE_TRANSPORT_DELETED);
1169       session_cleanup_notify (s, SESSION_CLEANUP_TRANSPORT);
1170       svm_fifo_dequeue_drop_all (s->tx_fifo);
1171       session_program_transport_ctrl_evt (s, SESSION_CTRL_EVT_CLOSE);
1172       break;
1173     case SESSION_STATE_TRANSPORT_DELETED:
1174       break;
1175     case SESSION_STATE_CLOSED:
1176       session_cleanup_notify (s, SESSION_CLEANUP_TRANSPORT);
1177       session_set_state (s, SESSION_STATE_TRANSPORT_DELETED);
1178       session_delete (s);
1179       break;
1180     default:
1181       clib_warning ("session state %u", s->session_state);
1182       session_cleanup_notify (s, SESSION_CLEANUP_TRANSPORT);
1183       session_delete (s);
1184       break;
1185     }
1186 }
1187
1188 /**
1189  * Notification from transport that it is closed
1190  *
1191  * Should be called by transport, prior to calling delete notify, once it
1192  * knows that no more data will be exchanged. This could serve as an
1193  * early acknowledgment of an active close especially if transport delete
1194  * can be delayed a long time, e.g., tcp time-wait.
1195  */
1196 void
1197 session_transport_closed_notify (transport_connection_t * tc)
1198 {
1199   app_worker_t *app_wrk;
1200   session_t *s;
1201
1202   if (!(s = session_get_if_valid (tc->s_index, tc->thread_index)))
1203     return;
1204
1205   if (s->session_state >= SESSION_STATE_TRANSPORT_CLOSED)
1206     return;
1207
1208   /* Transport thinks that app requested close but it actually didn't.
1209    * Can happen for tcp:
1210    * 1)if fin and rst are received in close succession.
1211    * 2)if app shutdown the connection.  */
1212   if (s->session_state == SESSION_STATE_READY)
1213     {
1214       session_transport_closing_notify (tc);
1215       svm_fifo_dequeue_drop_all (s->tx_fifo);
1216       session_set_state (s, SESSION_STATE_TRANSPORT_CLOSED);
1217     }
1218   /* If app close has not been received or has not yet resulted in
1219    * a transport close, only mark the session transport as closed */
1220   else if (s->session_state <= SESSION_STATE_CLOSING)
1221     session_set_state (s, SESSION_STATE_TRANSPORT_CLOSED);
1222   /* If app also closed, switch to closed */
1223   else if (s->session_state == SESSION_STATE_APP_CLOSED)
1224     session_set_state (s, SESSION_STATE_CLOSED);
1225
1226   app_wrk = app_worker_get_if_valid (s->app_wrk_index);
1227   if (app_wrk)
1228     app_worker_transport_closed_notify (app_wrk, s);
1229 }
1230
1231 /**
1232  * Notify application that connection has been reset.
1233  */
1234 void
1235 session_transport_reset_notify (transport_connection_t * tc)
1236 {
1237   app_worker_t *app_wrk;
1238   session_t *s;
1239
1240   s = session_get (tc->s_index, tc->thread_index);
1241   svm_fifo_dequeue_drop_all (s->tx_fifo);
1242   if (s->session_state >= SESSION_STATE_TRANSPORT_CLOSING)
1243     return;
1244   if (s->session_state == SESSION_STATE_ACCEPTING)
1245     {
1246       session_set_state (s, SESSION_STATE_TRANSPORT_CLOSING);
1247       return;
1248     }
1249   session_set_state (s, SESSION_STATE_TRANSPORT_CLOSING);
1250   app_wrk = app_worker_get (s->app_wrk_index);
1251   app_worker_reset_notify (app_wrk, s);
1252 }
1253
1254 int
1255 session_stream_accept_notify (transport_connection_t * tc)
1256 {
1257   app_worker_t *app_wrk;
1258   session_t *s;
1259
1260   s = session_get (tc->s_index, tc->thread_index);
1261   app_wrk = app_worker_get_if_valid (s->app_wrk_index);
1262   if (!app_wrk)
1263     return -1;
1264   if (s->session_state != SESSION_STATE_CREATED)
1265     return 0;
1266   session_set_state (s, SESSION_STATE_ACCEPTING);
1267   if (app_worker_accept_notify (app_wrk, s))
1268     {
1269       /* On transport delete, no notifications should be sent. Unless, the
1270        * accept is retried and successful. */
1271       session_set_state (s, SESSION_STATE_CREATED);
1272       return -1;
1273     }
1274   return 0;
1275 }
1276
1277 /**
1278  * Accept a stream session. Optionally ping the server by callback.
1279  */
1280 int
1281 session_stream_accept (transport_connection_t * tc, u32 listener_index,
1282                        u32 thread_index, u8 notify)
1283 {
1284   session_t *s;
1285   int rv;
1286
1287   s = session_alloc_for_connection (tc);
1288   s->listener_handle = ((u64) thread_index << 32) | (u64) listener_index;
1289   session_set_state (s, SESSION_STATE_CREATED);
1290
1291   if ((rv = app_worker_init_accepted (s)))
1292     {
1293       session_free (s);
1294       return rv;
1295     }
1296
1297   session_lookup_add_connection (tc, session_handle (s));
1298
1299   /* Shoulder-tap the server */
1300   if (notify)
1301     {
1302       app_worker_t *app_wrk = app_worker_get (s->app_wrk_index);
1303       if ((rv = app_worker_accept_notify (app_wrk, s)))
1304         {
1305           session_lookup_del_session (s);
1306           segment_manager_dealloc_fifos (s->rx_fifo, s->tx_fifo);
1307           session_free (s);
1308           return rv;
1309         }
1310     }
1311
1312   return 0;
1313 }
1314
1315 int
1316 session_dgram_accept (transport_connection_t * tc, u32 listener_index,
1317                       u32 thread_index)
1318 {
1319   app_worker_t *app_wrk;
1320   session_t *s;
1321   int rv;
1322
1323   s = session_alloc_for_connection (tc);
1324   s->listener_handle = ((u64) thread_index << 32) | (u64) listener_index;
1325
1326   if ((rv = app_worker_init_accepted (s)))
1327     {
1328       session_free (s);
1329       return rv;
1330     }
1331
1332   session_lookup_add_connection (tc, session_handle (s));
1333   session_set_state (s, SESSION_STATE_ACCEPTING);
1334
1335   app_wrk = app_worker_get (s->app_wrk_index);
1336   if ((rv = app_worker_accept_notify (app_wrk, s)))
1337     {
1338       session_lookup_del_session (s);
1339       segment_manager_dealloc_fifos (s->rx_fifo, s->tx_fifo);
1340       session_free (s);
1341       return rv;
1342     }
1343
1344   return 0;
1345 }
1346
1347 int
1348 session_open_cl (session_endpoint_cfg_t *rmt, session_handle_t *rsh)
1349 {
1350   transport_connection_t *tc;
1351   transport_endpoint_cfg_t *tep;
1352   app_worker_t *app_wrk;
1353   session_handle_t sh;
1354   session_t *s;
1355   int rv;
1356
1357   tep = session_endpoint_to_transport_cfg (rmt);
1358   rv = transport_connect (rmt->transport_proto, tep);
1359   if (rv < 0)
1360     {
1361       SESSION_DBG ("Transport failed to open connection.");
1362       return rv;
1363     }
1364
1365   tc = transport_get_half_open (rmt->transport_proto, (u32) rv);
1366
1367   /* For dgram type of service, allocate session and fifos now */
1368   app_wrk = app_worker_get (rmt->app_wrk_index);
1369   s = session_alloc_for_connection (tc);
1370   s->app_wrk_index = app_wrk->wrk_index;
1371   s->opaque = rmt->opaque;
1372   session_set_state (s, SESSION_STATE_OPENED);
1373   if (transport_connection_is_cless (tc))
1374     s->flags |= SESSION_F_IS_CLESS;
1375   if (app_worker_init_connected (app_wrk, s))
1376     {
1377       session_free (s);
1378       return -1;
1379     }
1380
1381   sh = session_handle (s);
1382   *rsh = sh;
1383
1384   session_lookup_add_connection (tc, sh);
1385   return app_worker_connect_notify (app_wrk, s, SESSION_E_NONE, rmt->opaque);
1386 }
1387
1388 int
1389 session_open_vc (session_endpoint_cfg_t *rmt, session_handle_t *rsh)
1390 {
1391   transport_connection_t *tc;
1392   transport_endpoint_cfg_t *tep;
1393   app_worker_t *app_wrk;
1394   session_t *ho;
1395   int rv;
1396
1397   tep = session_endpoint_to_transport_cfg (rmt);
1398   rv = transport_connect (rmt->transport_proto, tep);
1399   if (rv < 0)
1400     {
1401       SESSION_DBG ("Transport failed to open connection.");
1402       return rv;
1403     }
1404
1405   tc = transport_get_half_open (rmt->transport_proto, (u32) rv);
1406
1407   app_wrk = app_worker_get (rmt->app_wrk_index);
1408
1409   /* If transport offers a vc service, only allocate established
1410    * session once the connection has been established.
1411    * In the meantime allocate half-open session for tracking purposes
1412    * associate half-open connection to it and add session to app-worker
1413    * half-open table. These are needed to allocate the established
1414    * session on transport notification, and to cleanup the half-open
1415    * session if the app detaches before connection establishment.
1416    */
1417   ho = session_alloc_for_half_open (tc);
1418   ho->app_wrk_index = app_wrk->wrk_index;
1419   ho->ho_index = app_worker_add_half_open (app_wrk, session_handle (ho));
1420   ho->opaque = rmt->opaque;
1421   *rsh = session_handle (ho);
1422
1423   if (!(tc->flags & TRANSPORT_CONNECTION_F_NO_LOOKUP))
1424     session_lookup_add_half_open (tc, tc->c_index);
1425
1426   return 0;
1427 }
1428
1429 int
1430 session_open_app (session_endpoint_cfg_t *rmt, session_handle_t *rsh)
1431 {
1432   transport_endpoint_cfg_t *tep_cfg = session_endpoint_to_transport_cfg (rmt);
1433
1434   /* Not supported for now */
1435   *rsh = SESSION_INVALID_HANDLE;
1436   return transport_connect (rmt->transport_proto, tep_cfg);
1437 }
1438
1439 typedef int (*session_open_service_fn) (session_endpoint_cfg_t *,
1440                                         session_handle_t *);
1441
1442 static session_open_service_fn session_open_srv_fns[TRANSPORT_N_SERVICES] = {
1443   session_open_vc,
1444   session_open_cl,
1445   session_open_app,
1446 };
1447
1448 /**
1449  * Ask transport to open connection to remote transport endpoint.
1450  *
1451  * Stores handle for matching request with reply since the call can be
1452  * asynchronous. For instance, for TCP the 3-way handshake must complete
1453  * before reply comes. Session is only created once connection is established.
1454  *
1455  * @param app_index Index of the application requesting the connect
1456  * @param st Session type requested.
1457  * @param tep Remote transport endpoint
1458  * @param opaque Opaque data (typically, api_context) the application expects
1459  *               on open completion.
1460  */
1461 int
1462 session_open (session_endpoint_cfg_t *rmt, session_handle_t *rsh)
1463 {
1464   transport_service_type_t tst;
1465   tst = transport_protocol_service_type (rmt->transport_proto);
1466   return session_open_srv_fns[tst](rmt, rsh);
1467 }
1468
1469 /**
1470  * Ask transport to listen on session endpoint.
1471  *
1472  * @param s Session for which listen will be called. Note that unlike
1473  *          established sessions, listen sessions are not associated to a
1474  *          thread.
1475  * @param sep Local endpoint to be listened on.
1476  */
1477 int
1478 session_listen (session_t * ls, session_endpoint_cfg_t * sep)
1479 {
1480   transport_endpoint_cfg_t *tep;
1481   int tc_index;
1482   u32 s_index;
1483
1484   /* Transport bind/listen */
1485   tep = session_endpoint_to_transport_cfg (sep);
1486   s_index = ls->session_index;
1487   tc_index = transport_start_listen (session_get_transport_proto (ls),
1488                                      s_index, tep);
1489
1490   if (tc_index < 0)
1491     return tc_index;
1492
1493   /* Attach transport to session. Lookup tables are populated by the app
1494    * worker because local tables (for ct sessions) are not backed by a fib */
1495   ls = listen_session_get (s_index);
1496   ls->connection_index = tc_index;
1497   ls->opaque = sep->opaque;
1498   if (transport_connection_is_cless (session_get_transport (ls)))
1499     ls->flags |= SESSION_F_IS_CLESS;
1500
1501   return 0;
1502 }
1503
1504 /**
1505  * Ask transport to stop listening on local transport endpoint.
1506  *
1507  * @param s Session to stop listening on. It must be in state LISTENING.
1508  */
1509 int
1510 session_stop_listen (session_t * s)
1511 {
1512   transport_proto_t tp = session_get_transport_proto (s);
1513   transport_connection_t *tc;
1514
1515   if (s->session_state != SESSION_STATE_LISTENING)
1516     return SESSION_E_NOLISTEN;
1517
1518   tc = transport_get_listener (tp, s->connection_index);
1519
1520   /* If no transport, assume everything was cleaned up already */
1521   if (!tc)
1522     return SESSION_E_NONE;
1523
1524   if (!(tc->flags & TRANSPORT_CONNECTION_F_NO_LOOKUP))
1525     session_lookup_del_connection (tc);
1526
1527   transport_stop_listen (tp, s->connection_index);
1528   return 0;
1529 }
1530
1531 /**
1532  * Initialize session half-closing procedure.
1533  *
1534  * Note that half-closing will not change the state of the session.
1535  */
1536 void
1537 session_half_close (session_t *s)
1538 {
1539   if (!s)
1540     return;
1541
1542   session_program_transport_ctrl_evt (s, SESSION_CTRL_EVT_HALF_CLOSE);
1543 }
1544
1545 /**
1546  * Initialize session closing procedure.
1547  *
1548  * Request is always sent to session node to ensure that all outstanding
1549  * requests are served before transport is notified.
1550  */
1551 void
1552 session_close (session_t * s)
1553 {
1554   if (!s || (s->flags & SESSION_F_APP_CLOSED))
1555     return;
1556
1557   /* Transports can close and delete their state independent of app closes
1558    * and transport initiated state transitions can hide app closes. Instead
1559    * of extending the state machine to support separate tracking of app and
1560    * transport initiated closes, use a flag. */
1561   s->flags |= SESSION_F_APP_CLOSED;
1562
1563   if (s->session_state >= SESSION_STATE_CLOSING)
1564     {
1565       /* Session will only be removed once both app and transport
1566        * acknowledge the close */
1567       if (s->session_state == SESSION_STATE_TRANSPORT_CLOSED
1568           || s->session_state == SESSION_STATE_TRANSPORT_DELETED)
1569         session_program_transport_ctrl_evt (s, SESSION_CTRL_EVT_CLOSE);
1570       return;
1571     }
1572
1573   /* App closed so stop propagating dequeue notifications.
1574    * App might disconnect session before connected, in this case,
1575    * tx_fifo may not be setup yet, so clear only it's inited. */
1576   if (s->tx_fifo)
1577     svm_fifo_clear_deq_ntf (s->tx_fifo);
1578   session_set_state (s, SESSION_STATE_CLOSING);
1579   session_program_transport_ctrl_evt (s, SESSION_CTRL_EVT_CLOSE);
1580 }
1581
1582 /**
1583  * Force a close without waiting for data to be flushed
1584  */
1585 void
1586 session_reset (session_t * s)
1587 {
1588   if (s->session_state >= SESSION_STATE_CLOSING)
1589     return;
1590   /* Drop all outstanding tx data
1591    * App might disconnect session before connected, in this case,
1592    * tx_fifo may not be setup yet, so clear only it's inited. */
1593   if (s->tx_fifo)
1594     svm_fifo_dequeue_drop_all (s->tx_fifo);
1595   session_set_state (s, SESSION_STATE_CLOSING);
1596   session_program_transport_ctrl_evt (s, SESSION_CTRL_EVT_RESET);
1597 }
1598
1599 void
1600 session_detach_app (session_t *s)
1601 {
1602   if (s->session_state < SESSION_STATE_TRANSPORT_CLOSING)
1603     {
1604       session_close (s);
1605     }
1606   else if (s->session_state < SESSION_STATE_TRANSPORT_DELETED)
1607     {
1608       transport_connection_t *tc;
1609
1610       /* Transport is closing but it's not yet deleted. Confirm close and
1611        * subsequently detach transport from session and enqueue a session
1612        * cleanup notification. Transport closed and cleanup notifications are
1613        * going to be dropped by session layer apis */
1614       transport_close (session_get_transport_proto (s), s->connection_index,
1615                        s->thread_index);
1616       tc = session_get_transport (s);
1617       tc->s_index = SESSION_INVALID_INDEX;
1618       session_set_state (s, SESSION_STATE_TRANSPORT_DELETED);
1619       session_cleanup_notify (s, SESSION_CLEANUP_SESSION);
1620     }
1621   else
1622     {
1623       session_cleanup_notify (s, SESSION_CLEANUP_SESSION);
1624     }
1625
1626   s->flags |= SESSION_F_APP_CLOSED;
1627   s->app_wrk_index = APP_INVALID_INDEX;
1628 }
1629
1630 /**
1631  * Notify transport the session can be half-disconnected.
1632  *
1633  * Must be called from the session's thread.
1634  */
1635 void
1636 session_transport_half_close (session_t *s)
1637 {
1638   /* Only READY session can be half-closed */
1639   if (s->session_state != SESSION_STATE_READY)
1640     {
1641       return;
1642     }
1643
1644   transport_half_close (session_get_transport_proto (s), s->connection_index,
1645                         s->thread_index);
1646 }
1647
1648 /**
1649  * Notify transport the session can be disconnected. This should eventually
1650  * result in a delete notification that allows us to cleanup session state.
1651  * Called for both active/passive disconnects.
1652  *
1653  * Must be called from the session's thread.
1654  */
1655 void
1656 session_transport_close (session_t * s)
1657 {
1658   if (s->session_state >= SESSION_STATE_APP_CLOSED)
1659     {
1660       if (s->session_state == SESSION_STATE_TRANSPORT_CLOSED)
1661         session_set_state (s, SESSION_STATE_CLOSED);
1662       /* If transport is already deleted, just free the session */
1663       else if (s->session_state >= SESSION_STATE_TRANSPORT_DELETED)
1664         session_program_cleanup (s);
1665       return;
1666     }
1667
1668   /* If the tx queue wasn't drained, the transport can continue to try
1669    * sending the outstanding data (in closed state it cannot). It MUST however
1670    * at one point, either after sending everything or after a timeout, call
1671    * delete notify. This will finally lead to the complete cleanup of the
1672    * session.
1673    */
1674   session_set_state (s, SESSION_STATE_APP_CLOSED);
1675
1676   transport_close (session_get_transport_proto (s), s->connection_index,
1677                    s->thread_index);
1678 }
1679
1680 /**
1681  * Force transport close
1682  */
1683 void
1684 session_transport_reset (session_t * s)
1685 {
1686   if (s->session_state >= SESSION_STATE_APP_CLOSED)
1687     {
1688       if (s->session_state == SESSION_STATE_TRANSPORT_CLOSED)
1689         session_set_state (s, SESSION_STATE_CLOSED);
1690       else if (s->session_state >= SESSION_STATE_TRANSPORT_DELETED)
1691         session_program_cleanup (s);
1692       return;
1693     }
1694
1695   session_set_state (s, SESSION_STATE_APP_CLOSED);
1696   transport_reset (session_get_transport_proto (s), s->connection_index,
1697                    s->thread_index);
1698 }
1699
1700 /**
1701  * Cleanup transport and session state.
1702  *
1703  * Notify transport of the cleanup and free the session. This should
1704  * be called only if transport reported some error and is already
1705  * closed.
1706  */
1707 void
1708 session_transport_cleanup (session_t * s)
1709 {
1710   /* Delete from main lookup table before we axe the the transport */
1711   session_lookup_del_session (s);
1712   if (s->session_state != SESSION_STATE_TRANSPORT_DELETED)
1713     transport_cleanup (session_get_transport_proto (s), s->connection_index,
1714                        s->thread_index);
1715   /* Since we called cleanup, no delete notification will come. So, make
1716    * sure the session is properly freed. */
1717   segment_manager_dealloc_fifos (s->rx_fifo, s->tx_fifo);
1718   session_free (s);
1719 }
1720
1721 /**
1722  * Allocate worker mqs in share-able segment
1723  *
1724  * That can only be a newly created memfd segment, that must be mapped
1725  * by all apps/stack users unless private rx mqs are enabled.
1726  */
1727 void
1728 session_vpp_wrk_mqs_alloc (session_main_t *smm)
1729 {
1730   u32 mq_q_length = 2048, evt_size = sizeof (session_event_t);
1731   fifo_segment_t *mqs_seg = &smm->wrk_mqs_segment;
1732   svm_msg_q_cfg_t _cfg, *cfg = &_cfg;
1733   uword mqs_seg_size;
1734   int i;
1735
1736   mq_q_length = clib_max (mq_q_length, smm->configured_wrk_mq_length);
1737
1738   svm_msg_q_ring_cfg_t rc[SESSION_MQ_N_RINGS] = {
1739     { mq_q_length, evt_size, 0 }, { mq_q_length >> 1, 256, 0 }
1740   };
1741   cfg->consumer_pid = 0;
1742   cfg->n_rings = 2;
1743   cfg->q_nitems = mq_q_length;
1744   cfg->ring_cfgs = rc;
1745
1746   /*
1747    * Compute mqs segment size based on rings config and leave space
1748    * for passing extended configuration messages, i.e., data allocated
1749    * outside of the rings. If provided with a config value, accept it
1750    * if larger than minimum size.
1751    */
1752   mqs_seg_size = svm_msg_q_size_to_alloc (cfg) * vec_len (smm->wrk);
1753   mqs_seg_size = mqs_seg_size + (1 << 20);
1754   mqs_seg_size = clib_max (mqs_seg_size, smm->wrk_mqs_segment_size);
1755
1756   mqs_seg->ssvm.ssvm_size = mqs_seg_size;
1757   mqs_seg->ssvm.my_pid = getpid ();
1758   mqs_seg->ssvm.name = format (0, "%s%c", "session: wrk-mqs-segment", 0);
1759
1760   if (ssvm_server_init (&mqs_seg->ssvm, SSVM_SEGMENT_MEMFD))
1761     {
1762       clib_warning ("failed to initialize queue segment");
1763       return;
1764     }
1765
1766   fifo_segment_init (mqs_seg);
1767
1768   /* Special fifo segment that's filled only with mqs */
1769   mqs_seg->h->n_mqs = vec_len (smm->wrk);
1770
1771   for (i = 0; i < vec_len (smm->wrk); i++)
1772     smm->wrk[i].vpp_event_queue = fifo_segment_msg_q_alloc (mqs_seg, i, cfg);
1773 }
1774
1775 fifo_segment_t *
1776 session_main_get_wrk_mqs_segment (void)
1777 {
1778   return &session_main.wrk_mqs_segment;
1779 }
1780
1781 u64
1782 session_segment_handle (session_t * s)
1783 {
1784   svm_fifo_t *f;
1785
1786   if (!s->rx_fifo)
1787     return SESSION_INVALID_HANDLE;
1788
1789   f = s->rx_fifo;
1790   return segment_manager_make_segment_handle (f->segment_manager,
1791                                               f->segment_index);
1792 }
1793
1794 void
1795 session_get_original_dst (transport_endpoint_t *i2o_src,
1796                           transport_endpoint_t *i2o_dst,
1797                           transport_proto_t transport_proto, u32 *original_dst,
1798                           u16 *original_dst_port)
1799 {
1800   session_main_t *smm = vnet_get_session_main ();
1801   ip_protocol_t proto =
1802     (transport_proto == TRANSPORT_PROTO_TCP ? IPPROTO_TCP : IPPROTO_UDP);
1803   if (!smm->original_dst_lookup || !i2o_dst->is_ip4)
1804     return;
1805   smm->original_dst_lookup (&i2o_src->ip.ip4, i2o_src->port, &i2o_dst->ip.ip4,
1806                             i2o_dst->port, proto, original_dst,
1807                             original_dst_port);
1808 }
1809
1810 static session_fifo_rx_fn *session_tx_fns[TRANSPORT_TX_N_FNS] = {
1811     session_tx_fifo_peek_and_snd,
1812     session_tx_fifo_dequeue_and_snd,
1813     session_tx_fifo_dequeue_internal,
1814     session_tx_fifo_dequeue_and_snd
1815 };
1816
1817 void
1818 session_register_transport (transport_proto_t transport_proto,
1819                             const transport_proto_vft_t * vft, u8 is_ip4,
1820                             u32 output_node)
1821 {
1822   session_main_t *smm = &session_main;
1823   session_type_t session_type;
1824   u32 next_index = ~0;
1825
1826   session_type = session_type_from_proto_and_ip (transport_proto, is_ip4);
1827
1828   vec_validate (smm->session_type_to_next, session_type);
1829   vec_validate (smm->session_tx_fns, session_type);
1830
1831   if (output_node != ~0)
1832     next_index = vlib_node_add_next (vlib_get_main (),
1833                                      session_queue_node.index, output_node);
1834
1835   smm->session_type_to_next[session_type] = next_index;
1836   smm->session_tx_fns[session_type] =
1837     session_tx_fns[vft->transport_options.tx_type];
1838 }
1839
1840 void
1841 session_register_update_time_fn (session_update_time_fn fn, u8 is_add)
1842 {
1843   session_main_t *smm = &session_main;
1844   session_update_time_fn *fi;
1845   u32 fi_pos = ~0;
1846   u8 found = 0;
1847
1848   vec_foreach (fi, smm->update_time_fns)
1849     {
1850       if (*fi == fn)
1851         {
1852           fi_pos = fi - smm->update_time_fns;
1853           found = 1;
1854           break;
1855         }
1856     }
1857
1858   if (is_add)
1859     {
1860       if (found)
1861         {
1862           clib_warning ("update time fn %p already registered", fn);
1863           return;
1864         }
1865       vec_add1 (smm->update_time_fns, fn);
1866     }
1867   else
1868     {
1869       vec_del1 (smm->update_time_fns, fi_pos);
1870     }
1871 }
1872
1873 transport_proto_t
1874 session_add_transport_proto (void)
1875 {
1876   session_main_t *smm = &session_main;
1877   session_worker_t *wrk;
1878   u32 thread;
1879
1880   smm->last_transport_proto_type += 1;
1881
1882   for (thread = 0; thread < vec_len (smm->wrk); thread++)
1883     {
1884       wrk = session_main_get_worker (thread);
1885       vec_validate (wrk->session_to_enqueue, smm->last_transport_proto_type);
1886     }
1887
1888   return smm->last_transport_proto_type;
1889 }
1890
1891 transport_connection_t *
1892 session_get_transport (session_t * s)
1893 {
1894   if (s->session_state != SESSION_STATE_LISTENING)
1895     return transport_get_connection (session_get_transport_proto (s),
1896                                      s->connection_index, s->thread_index);
1897   else
1898     return transport_get_listener (session_get_transport_proto (s),
1899                                    s->connection_index);
1900 }
1901
1902 void
1903 session_get_endpoint (session_t * s, transport_endpoint_t * tep, u8 is_lcl)
1904 {
1905   if (s->session_state != SESSION_STATE_LISTENING)
1906     return transport_get_endpoint (session_get_transport_proto (s),
1907                                    s->connection_index, s->thread_index, tep,
1908                                    is_lcl);
1909   else
1910     return transport_get_listener_endpoint (session_get_transport_proto (s),
1911                                             s->connection_index, tep, is_lcl);
1912 }
1913
1914 int
1915 session_transport_attribute (session_t *s, u8 is_get,
1916                              transport_endpt_attr_t *attr)
1917 {
1918   if (s->session_state < SESSION_STATE_READY)
1919     return -1;
1920
1921   return transport_connection_attribute (session_get_transport_proto (s),
1922                                          s->connection_index, s->thread_index,
1923                                          is_get, attr);
1924 }
1925
1926 transport_connection_t *
1927 listen_session_get_transport (session_t * s)
1928 {
1929   return transport_get_listener (session_get_transport_proto (s),
1930                                  s->connection_index);
1931 }
1932
1933 void
1934 session_queue_run_on_main_thread (vlib_main_t * vm)
1935 {
1936   ASSERT (vlib_get_thread_index () == 0);
1937   vlib_node_set_interrupt_pending (vm, session_queue_node.index);
1938 }
1939
1940 static void
1941 session_stats_collector_fn (vlib_stats_collector_data_t *d)
1942 {
1943   u32 i, n_workers, n_wrk_sessions, n_sessions = 0;
1944   session_main_t *smm = &session_main;
1945   session_worker_t *wrk;
1946   counter_t **counters;
1947   counter_t *cb;
1948
1949   n_workers = vec_len (smm->wrk);
1950   vlib_stats_validate (d->entry_index, 0, n_workers - 1);
1951   counters = d->entry->data;
1952   cb = counters[0];
1953
1954   for (i = 0; i < vec_len (smm->wrk); i++)
1955     {
1956       wrk = session_main_get_worker (i);
1957       n_wrk_sessions = pool_elts (wrk->sessions);
1958       cb[i] = n_wrk_sessions;
1959       n_sessions += n_wrk_sessions;
1960     }
1961
1962   vlib_stats_set_gauge (d->private_data, n_sessions);
1963 }
1964
1965 static void
1966 session_stats_collector_init (void)
1967 {
1968   vlib_stats_collector_reg_t reg = {};
1969
1970   reg.entry_index =
1971     vlib_stats_add_counter_vector ("/sys/session/sessions_per_worker");
1972   reg.private_data = vlib_stats_add_gauge ("/sys/session/sessions_total");
1973   reg.collect_fn = session_stats_collector_fn;
1974   vlib_stats_register_collector_fn (&reg);
1975   vlib_stats_validate (reg.entry_index, 0, vlib_get_n_threads ());
1976 }
1977
1978 static clib_error_t *
1979 session_manager_main_enable (vlib_main_t * vm)
1980 {
1981   session_main_t *smm = &session_main;
1982   vlib_thread_main_t *vtm = vlib_get_thread_main ();
1983   u32 num_threads, preallocated_sessions_per_worker;
1984   session_worker_t *wrk;
1985   int i;
1986
1987   /* We only initialize once and do not de-initialized on disable */
1988   if (smm->is_initialized)
1989     goto done;
1990
1991   num_threads = 1 /* main thread */  + vtm->n_threads;
1992
1993   if (num_threads < 1)
1994     return clib_error_return (0, "n_thread_stacks not set");
1995
1996   /* Allocate cache line aligned worker contexts */
1997   vec_validate_aligned (smm->wrk, num_threads - 1, CLIB_CACHE_LINE_BYTES);
1998   clib_spinlock_init (&session_main.pool_realloc_lock);
1999
2000   for (i = 0; i < num_threads; i++)
2001     {
2002       wrk = &smm->wrk[i];
2003       wrk->ctrl_head = clib_llist_make_head (wrk->event_elts, evt_list);
2004       wrk->new_head = clib_llist_make_head (wrk->event_elts, evt_list);
2005       wrk->old_head = clib_llist_make_head (wrk->event_elts, evt_list);
2006       wrk->pending_connects = clib_llist_make_head (wrk->event_elts, evt_list);
2007       wrk->evts_pending_main =
2008         clib_llist_make_head (wrk->event_elts, evt_list);
2009       wrk->vm = vlib_get_main_by_index (i);
2010       wrk->last_vlib_time = vlib_time_now (vm);
2011       wrk->last_vlib_us_time = wrk->last_vlib_time * CLIB_US_TIME_FREQ;
2012       wrk->timerfd = -1;
2013       vec_validate (wrk->session_to_enqueue, smm->last_transport_proto_type);
2014
2015       if (!smm->no_adaptive && smm->use_private_rx_mqs)
2016         session_wrk_enable_adaptive_mode (wrk);
2017     }
2018
2019   /* Allocate vpp event queues segment and queue */
2020   session_vpp_wrk_mqs_alloc (smm);
2021
2022   /* Initialize segment manager properties */
2023   segment_manager_main_init ();
2024
2025   /* Preallocate sessions */
2026   if (smm->preallocated_sessions)
2027     {
2028       if (num_threads == 1)
2029         {
2030           pool_init_fixed (smm->wrk[0].sessions, smm->preallocated_sessions);
2031         }
2032       else
2033         {
2034           int j;
2035           preallocated_sessions_per_worker =
2036             (1.1 * (f64) smm->preallocated_sessions /
2037              (f64) (num_threads - 1));
2038
2039           for (j = 1; j < num_threads; j++)
2040             {
2041               pool_init_fixed (smm->wrk[j].sessions,
2042                                preallocated_sessions_per_worker);
2043             }
2044         }
2045     }
2046
2047   session_lookup_init ();
2048   app_namespaces_init ();
2049   transport_init ();
2050   session_stats_collector_init ();
2051   smm->is_initialized = 1;
2052
2053 done:
2054
2055   smm->is_enabled = 1;
2056
2057   /* Enable transports */
2058   transport_enable_disable (vm, 1);
2059   session_debug_init ();
2060
2061   return 0;
2062 }
2063
2064 static void
2065 session_manager_main_disable (vlib_main_t * vm)
2066 {
2067   transport_enable_disable (vm, 0 /* is_en */ );
2068 }
2069
2070 /* in this new callback, cookie hint the index */
2071 void
2072 session_dma_completion_cb (vlib_main_t *vm, struct vlib_dma_batch *batch)
2073 {
2074   session_worker_t *wrk;
2075   wrk = session_main_get_worker (vm->thread_index);
2076   session_dma_transfer *dma_transfer;
2077
2078   dma_transfer = &wrk->dma_trans[wrk->trans_head];
2079   vec_add (wrk->pending_tx_buffers, dma_transfer->pending_tx_buffers,
2080            vec_len (dma_transfer->pending_tx_buffers));
2081   vec_add (wrk->pending_tx_nexts, dma_transfer->pending_tx_nexts,
2082            vec_len (dma_transfer->pending_tx_nexts));
2083   vec_reset_length (dma_transfer->pending_tx_buffers);
2084   vec_reset_length (dma_transfer->pending_tx_nexts);
2085   wrk->trans_head++;
2086   if (wrk->trans_head == wrk->trans_size)
2087     wrk->trans_head = 0;
2088   return;
2089 }
2090
2091 static void
2092 session_prepare_dma_args (vlib_dma_config_t *args)
2093 {
2094   args->max_batches = 16;
2095   args->max_transfers = DMA_TRANS_SIZE;
2096   args->max_transfer_size = 65536;
2097   args->features = 0;
2098   args->sw_fallback = 1;
2099   args->barrier_before_last = 1;
2100   args->callback_fn = session_dma_completion_cb;
2101 }
2102
2103 static void
2104 session_node_enable_dma (u8 is_en, int n_vlibs)
2105 {
2106   vlib_dma_config_t args;
2107   session_prepare_dma_args (&args);
2108   session_worker_t *wrk;
2109   vlib_main_t *vm;
2110
2111   int config_index = -1;
2112
2113   if (is_en)
2114     {
2115       vm = vlib_get_main_by_index (0);
2116       config_index = vlib_dma_config_add (vm, &args);
2117     }
2118   else
2119     {
2120       vm = vlib_get_main_by_index (0);
2121       wrk = session_main_get_worker (0);
2122       if (wrk->config_index >= 0)
2123         vlib_dma_config_del (vm, wrk->config_index);
2124     }
2125   int i;
2126   for (i = 0; i < n_vlibs; i++)
2127     {
2128       vm = vlib_get_main_by_index (i);
2129       wrk = session_main_get_worker (vm->thread_index);
2130       wrk->config_index = config_index;
2131       if (is_en)
2132         {
2133           if (config_index >= 0)
2134             wrk->dma_enabled = true;
2135           wrk->dma_trans = (session_dma_transfer *) clib_mem_alloc (
2136             sizeof (session_dma_transfer) * DMA_TRANS_SIZE);
2137           bzero (wrk->dma_trans,
2138                  sizeof (session_dma_transfer) * DMA_TRANS_SIZE);
2139         }
2140       else
2141         {
2142           if (wrk->dma_trans)
2143             clib_mem_free (wrk->dma_trans);
2144         }
2145       wrk->trans_head = 0;
2146       wrk->trans_tail = 0;
2147       wrk->trans_size = DMA_TRANS_SIZE;
2148     }
2149 }
2150
2151 void
2152 session_node_enable_disable (u8 is_en)
2153 {
2154   u8 mstate = is_en ? VLIB_NODE_STATE_INTERRUPT : VLIB_NODE_STATE_DISABLED;
2155   u8 state = is_en ? VLIB_NODE_STATE_POLLING : VLIB_NODE_STATE_DISABLED;
2156   session_main_t *sm = &session_main;
2157   vlib_main_t *vm;
2158   vlib_node_t *n;
2159   int n_vlibs, i;
2160
2161   n_vlibs = vlib_get_n_threads ();
2162   for (i = 0; i < n_vlibs; i++)
2163     {
2164       vm = vlib_get_main_by_index (i);
2165       /* main thread with workers and not polling */
2166       if (i == 0 && n_vlibs > 1)
2167         {
2168           vlib_node_set_state (vm, session_queue_node.index, mstate);
2169           if (is_en)
2170             {
2171               session_main_get_worker (0)->state = SESSION_WRK_INTERRUPT;
2172               vlib_node_set_state (vm, session_queue_process_node.index,
2173                                    state);
2174               n = vlib_get_node (vm, session_queue_process_node.index);
2175               vlib_start_process (vm, n->runtime_index);
2176             }
2177           else
2178             {
2179               vlib_process_signal_event_mt (vm,
2180                                             session_queue_process_node.index,
2181                                             SESSION_Q_PROCESS_STOP, 0);
2182             }
2183           if (!sm->poll_main)
2184             continue;
2185         }
2186       vlib_node_set_state (vm, session_input_node.index, mstate);
2187       vlib_node_set_state (vm, session_queue_node.index, state);
2188     }
2189
2190   if (sm->use_private_rx_mqs)
2191     application_enable_rx_mqs_nodes (is_en);
2192
2193   if (sm->dma_enabled)
2194     session_node_enable_dma (is_en, n_vlibs);
2195 }
2196
2197 clib_error_t *
2198 vnet_session_enable_disable (vlib_main_t * vm, u8 is_en)
2199 {
2200   clib_error_t *error = 0;
2201   if (is_en)
2202     {
2203       if (session_main.is_enabled)
2204         return 0;
2205
2206       error = session_manager_main_enable (vm);
2207       session_node_enable_disable (is_en);
2208     }
2209   else
2210     {
2211       session_main.is_enabled = 0;
2212       session_manager_main_disable (vm);
2213       session_node_enable_disable (is_en);
2214     }
2215
2216   return error;
2217 }
2218
2219 clib_error_t *
2220 session_main_init (vlib_main_t * vm)
2221 {
2222   session_main_t *smm = &session_main;
2223
2224   smm->is_enabled = 0;
2225   smm->session_enable_asap = 0;
2226   smm->poll_main = 0;
2227   smm->use_private_rx_mqs = 0;
2228   smm->no_adaptive = 0;
2229   smm->last_transport_proto_type = TRANSPORT_PROTO_HTTP;
2230   smm->port_allocator_min_src_port = 1024;
2231   smm->port_allocator_max_src_port = 65535;
2232
2233   return 0;
2234 }
2235
2236 static clib_error_t *
2237 session_main_loop_init (vlib_main_t * vm)
2238 {
2239   session_main_t *smm = &session_main;
2240   if (smm->session_enable_asap)
2241     {
2242       vlib_worker_thread_barrier_sync (vm);
2243       vnet_session_enable_disable (vm, 1 /* is_en */ );
2244       vlib_worker_thread_barrier_release (vm);
2245     }
2246   return 0;
2247 }
2248
2249 VLIB_INIT_FUNCTION (session_main_init);
2250 VLIB_MAIN_LOOP_ENTER_FUNCTION (session_main_loop_init);
2251
2252 static clib_error_t *
2253 session_config_fn (vlib_main_t * vm, unformat_input_t * input)
2254 {
2255   session_main_t *smm = &session_main;
2256   u32 nitems;
2257   uword tmp;
2258
2259   while (unformat_check_input (input) != UNFORMAT_END_OF_INPUT)
2260     {
2261       if (unformat (input, "wrk-mq-length %d", &nitems))
2262         {
2263           if (nitems >= 2048)
2264             smm->configured_wrk_mq_length = nitems;
2265           else
2266             clib_warning ("event queue length %d too small, ignored", nitems);
2267         }
2268       else if (unformat (input, "wrk-mqs-segment-size %U",
2269                          unformat_memory_size, &smm->wrk_mqs_segment_size))
2270         ;
2271       else if (unformat (input, "preallocated-sessions %d",
2272                          &smm->preallocated_sessions))
2273         ;
2274       else if (unformat (input, "v4-session-table-buckets %d",
2275                          &smm->configured_v4_session_table_buckets))
2276         ;
2277       else if (unformat (input, "v4-halfopen-table-buckets %d",
2278                          &smm->configured_v4_halfopen_table_buckets))
2279         ;
2280       else if (unformat (input, "v6-session-table-buckets %d",
2281                          &smm->configured_v6_session_table_buckets))
2282         ;
2283       else if (unformat (input, "v6-halfopen-table-buckets %d",
2284                          &smm->configured_v6_halfopen_table_buckets))
2285         ;
2286       else if (unformat (input, "v4-session-table-memory %U",
2287                          unformat_memory_size, &tmp))
2288         {
2289           if (tmp >= 0x100000000)
2290             return clib_error_return (0, "memory size %llx (%lld) too large",
2291                                       tmp, tmp);
2292           smm->configured_v4_session_table_memory = tmp;
2293         }
2294       else if (unformat (input, "v4-halfopen-table-memory %U",
2295                          unformat_memory_size, &tmp))
2296         {
2297           if (tmp >= 0x100000000)
2298             return clib_error_return (0, "memory size %llx (%lld) too large",
2299                                       tmp, tmp);
2300           smm->configured_v4_halfopen_table_memory = tmp;
2301         }
2302       else if (unformat (input, "v6-session-table-memory %U",
2303                          unformat_memory_size, &tmp))
2304         {
2305           if (tmp >= 0x100000000)
2306             return clib_error_return (0, "memory size %llx (%lld) too large",
2307                                       tmp, tmp);
2308           smm->configured_v6_session_table_memory = tmp;
2309         }
2310       else if (unformat (input, "v6-halfopen-table-memory %U",
2311                          unformat_memory_size, &tmp))
2312         {
2313           if (tmp >= 0x100000000)
2314             return clib_error_return (0, "memory size %llx (%lld) too large",
2315                                       tmp, tmp);
2316           smm->configured_v6_halfopen_table_memory = tmp;
2317         }
2318       else if (unformat (input, "local-endpoints-table-memory %U",
2319                          unformat_memory_size, &tmp))
2320         {
2321           if (tmp >= 0x100000000)
2322             return clib_error_return (0, "memory size %llx (%lld) too large",
2323                                       tmp, tmp);
2324           smm->local_endpoints_table_memory = tmp;
2325         }
2326       else if (unformat (input, "local-endpoints-table-buckets %d",
2327                          &smm->local_endpoints_table_buckets))
2328         ;
2329       else if (unformat (input, "min-src-port %d", &tmp))
2330         smm->port_allocator_min_src_port = tmp;
2331       else if (unformat (input, "max-src-port %d", &tmp))
2332         smm->port_allocator_max_src_port = tmp;
2333       else if (unformat (input, "enable"))
2334         smm->session_enable_asap = 1;
2335       else if (unformat (input, "use-app-socket-api"))
2336         (void) appns_sapi_enable_disable (1 /* is_enable */);
2337       else if (unformat (input, "poll-main"))
2338         smm->poll_main = 1;
2339       else if (unformat (input, "use-private-rx-mqs"))
2340         smm->use_private_rx_mqs = 1;
2341       else if (unformat (input, "no-adaptive"))
2342         smm->no_adaptive = 1;
2343       else if (unformat (input, "use-dma"))
2344         smm->dma_enabled = 1;
2345       else if (unformat (input, "nat44-original-dst-enable"))
2346         {
2347           smm->original_dst_lookup = vlib_get_plugin_symbol (
2348             "nat_plugin.so", "nat44_original_dst_lookup");
2349         }
2350       /*
2351        * Deprecated but maintained for compatibility
2352        */
2353       else if (unformat (input, "evt_qs_memfd_seg"))
2354         ;
2355       else if (unformat (input, "segment-baseva 0x%lx", &tmp))
2356         ;
2357       else if (unformat (input, "evt_qs_seg_size %U", unformat_memory_size,
2358                          &smm->wrk_mqs_segment_size))
2359         ;
2360       else if (unformat (input, "event-queue-length %d", &nitems))
2361         {
2362           if (nitems >= 2048)
2363             smm->configured_wrk_mq_length = nitems;
2364           else
2365             clib_warning ("event queue length %d too small, ignored", nitems);
2366         }
2367       else
2368         return clib_error_return (0, "unknown input `%U'",
2369                                   format_unformat_error, input);
2370     }
2371   return 0;
2372 }
2373
2374 VLIB_CONFIG_FUNCTION (session_config_fn, "session");
2375
2376 /*
2377  * fd.io coding-style-patch-verification: ON
2378  *
2379  * Local Variables:
2380  * eval: (c-set-style "gnu")
2381  * End:
2382  */