session: avoid spurious closed notifications
[vpp.git] / src / vnet / session / session.c
1 /*
2  * Copyright (c) 2017-2019 Cisco and/or its affiliates.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at:
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 /**
16  * @file
17  * @brief Session and session manager
18  */
19
20 #include <vnet/plugin/plugin.h>
21 #include <vnet/session/session.h>
22 #include <vnet/session/application.h>
23 #include <vnet/dpo/load_balance.h>
24 #include <vnet/fib/ip4_fib.h>
25 #include <vlib/stats/stats.h>
26 #include <vlib/dma/dma.h>
27
28 session_main_t session_main;
29
30 static inline int
31 session_send_evt_to_thread (void *data, void *args, u32 thread_index,
32                             session_evt_type_t evt_type)
33 {
34   session_worker_t *wrk = session_main_get_worker (thread_index);
35   session_event_t *evt;
36   svm_msg_q_msg_t msg;
37   svm_msg_q_t *mq;
38
39   mq = wrk->vpp_event_queue;
40   if (PREDICT_FALSE (svm_msg_q_lock (mq)))
41     return -1;
42   if (PREDICT_FALSE (svm_msg_q_or_ring_is_full (mq, SESSION_MQ_IO_EVT_RING)))
43     {
44       svm_msg_q_unlock (mq);
45       return -2;
46     }
47   switch (evt_type)
48     {
49     case SESSION_CTRL_EVT_RPC:
50       msg = svm_msg_q_alloc_msg_w_ring (mq, SESSION_MQ_IO_EVT_RING);
51       evt = (session_event_t *) svm_msg_q_msg_data (mq, &msg);
52       evt->rpc_args.fp = data;
53       evt->rpc_args.arg = args;
54       break;
55     case SESSION_IO_EVT_RX:
56     case SESSION_IO_EVT_TX:
57     case SESSION_IO_EVT_TX_FLUSH:
58     case SESSION_IO_EVT_BUILTIN_RX:
59       msg = svm_msg_q_alloc_msg_w_ring (mq, SESSION_MQ_IO_EVT_RING);
60       evt = (session_event_t *) svm_msg_q_msg_data (mq, &msg);
61       evt->session_index = *(u32 *) data;
62       break;
63     case SESSION_IO_EVT_TX_MAIN:
64     case SESSION_CTRL_EVT_CLOSE:
65     case SESSION_CTRL_EVT_RESET:
66       msg = svm_msg_q_alloc_msg_w_ring (mq, SESSION_MQ_IO_EVT_RING);
67       evt = (session_event_t *) svm_msg_q_msg_data (mq, &msg);
68       evt->session_handle = session_handle ((session_t *) data);
69       break;
70     default:
71       clib_warning ("evt unhandled!");
72       svm_msg_q_unlock (mq);
73       return -1;
74     }
75   evt->event_type = evt_type;
76
77   svm_msg_q_add_and_unlock (mq, &msg);
78
79   if (PREDICT_FALSE (wrk->state == SESSION_WRK_INTERRUPT))
80     vlib_node_set_interrupt_pending (wrk->vm, session_queue_node.index);
81
82   return 0;
83 }
84
85 int
86 session_send_io_evt_to_thread (svm_fifo_t * f, session_evt_type_t evt_type)
87 {
88   return session_send_evt_to_thread (&f->shr->master_session_index, 0,
89                                      f->master_thread_index, evt_type);
90 }
91
92 int
93 session_send_io_evt_to_thread_custom (void *data, u32 thread_index,
94                                       session_evt_type_t evt_type)
95 {
96   return session_send_evt_to_thread (data, 0, thread_index, evt_type);
97 }
98
99 int
100 session_send_ctrl_evt_to_thread (session_t * s, session_evt_type_t evt_type)
101 {
102   /* only events supported are disconnect, shutdown and reset */
103   ASSERT (evt_type == SESSION_CTRL_EVT_CLOSE ||
104           evt_type == SESSION_CTRL_EVT_HALF_CLOSE ||
105           evt_type == SESSION_CTRL_EVT_RESET);
106   return session_send_evt_to_thread (s, 0, s->thread_index, evt_type);
107 }
108
109 void
110 session_send_rpc_evt_to_thread_force (u32 thread_index, void *fp,
111                                       void *rpc_args)
112 {
113   session_send_evt_to_thread (fp, rpc_args, thread_index,
114                               SESSION_CTRL_EVT_RPC);
115 }
116
117 void
118 session_send_rpc_evt_to_thread (u32 thread_index, void *fp, void *rpc_args)
119 {
120   if (thread_index != vlib_get_thread_index ())
121     session_send_rpc_evt_to_thread_force (thread_index, fp, rpc_args);
122   else
123     {
124       void (*fnp) (void *) = fp;
125       fnp (rpc_args);
126     }
127 }
128
129 void
130 session_add_self_custom_tx_evt (transport_connection_t * tc, u8 has_prio)
131 {
132   session_t *s = session_get (tc->s_index, tc->thread_index);
133
134   ASSERT (s->thread_index == vlib_get_thread_index ());
135   ASSERT (s->session_state != SESSION_STATE_TRANSPORT_DELETED);
136
137   if (!(s->flags & SESSION_F_CUSTOM_TX))
138     {
139       s->flags |= SESSION_F_CUSTOM_TX;
140       if (svm_fifo_set_event (s->tx_fifo)
141           || transport_connection_is_descheduled (tc))
142         {
143           session_evt_elt_t *elt;
144           session_worker_t *wrk;
145
146           wrk = session_main_get_worker (tc->thread_index);
147           if (has_prio)
148             elt = session_evt_alloc_new (wrk);
149           else
150             elt = session_evt_alloc_old (wrk);
151           elt->evt.session_index = tc->s_index;
152           elt->evt.event_type = SESSION_IO_EVT_TX;
153           tc->flags &= ~TRANSPORT_CONNECTION_F_DESCHED;
154
155           if (PREDICT_FALSE (wrk->state == SESSION_WRK_INTERRUPT))
156             vlib_node_set_interrupt_pending (wrk->vm,
157                                              session_queue_node.index);
158         }
159     }
160 }
161
162 void
163 sesssion_reschedule_tx (transport_connection_t * tc)
164 {
165   session_worker_t *wrk = session_main_get_worker (tc->thread_index);
166   session_evt_elt_t *elt;
167
168   ASSERT (tc->thread_index == vlib_get_thread_index ());
169
170   elt = session_evt_alloc_new (wrk);
171   elt->evt.session_index = tc->s_index;
172   elt->evt.event_type = SESSION_IO_EVT_TX;
173
174   if (PREDICT_FALSE (wrk->state == SESSION_WRK_INTERRUPT))
175     vlib_node_set_interrupt_pending (wrk->vm, session_queue_node.index);
176 }
177
178 static void
179 session_program_transport_ctrl_evt (session_t * s, session_evt_type_t evt)
180 {
181   u32 thread_index = vlib_get_thread_index ();
182   session_evt_elt_t *elt;
183   session_worker_t *wrk;
184
185   /* If we are in the handler thread, or being called with the worker barrier
186    * held, just append a new event to pending disconnects vector. */
187   if (vlib_thread_is_main_w_barrier () || thread_index == s->thread_index)
188     {
189       wrk = session_main_get_worker (s->thread_index);
190       elt = session_evt_alloc_ctrl (wrk);
191       clib_memset (&elt->evt, 0, sizeof (session_event_t));
192       elt->evt.session_handle = session_handle (s);
193       elt->evt.event_type = evt;
194
195       if (PREDICT_FALSE (wrk->state == SESSION_WRK_INTERRUPT))
196         vlib_node_set_interrupt_pending (wrk->vm, session_queue_node.index);
197     }
198   else
199     session_send_ctrl_evt_to_thread (s, evt);
200 }
201
202 session_t *
203 session_alloc (u32 thread_index)
204 {
205   session_worker_t *wrk = &session_main.wrk[thread_index];
206   session_t *s;
207
208   pool_get_aligned_safe (wrk->sessions, s, CLIB_CACHE_LINE_BYTES);
209   clib_memset (s, 0, sizeof (*s));
210   s->session_index = s - wrk->sessions;
211   s->thread_index = thread_index;
212   s->app_index = APP_INVALID_INDEX;
213
214   return s;
215 }
216
217 void
218 session_free (session_t * s)
219 {
220   session_worker_t *wrk = &session_main.wrk[s->thread_index];
221
222   SESSION_EVT (SESSION_EVT_FREE, s);
223   if (CLIB_DEBUG)
224     clib_memset (s, 0xFA, sizeof (*s));
225   pool_put (wrk->sessions, s);
226 }
227
228 u8
229 session_is_valid (u32 si, u8 thread_index)
230 {
231   session_t *s;
232   transport_connection_t *tc;
233
234   s = pool_elt_at_index (session_main.wrk[thread_index].sessions, si);
235
236   if (s->thread_index != thread_index || s->session_index != si)
237     return 0;
238
239   if (s->session_state == SESSION_STATE_TRANSPORT_DELETED
240       || s->session_state <= SESSION_STATE_LISTENING)
241     return 1;
242
243   if ((s->session_state == SESSION_STATE_CONNECTING ||
244        s->session_state == SESSION_STATE_TRANSPORT_CLOSED) &&
245       (s->flags & SESSION_F_HALF_OPEN))
246     return 1;
247
248   tc = session_get_transport (s);
249   if (s->connection_index != tc->c_index ||
250       s->thread_index != tc->thread_index || tc->s_index != si)
251     return 0;
252
253   return 1;
254 }
255
256 void
257 session_cleanup (session_t *s)
258 {
259   segment_manager_dealloc_fifos (s->rx_fifo, s->tx_fifo);
260   session_free (s);
261 }
262
263 static void
264 session_cleanup_notify (session_t * s, session_cleanup_ntf_t ntf)
265 {
266   app_worker_t *app_wrk;
267
268   app_wrk = app_worker_get_if_valid (s->app_wrk_index);
269   if (!app_wrk)
270     {
271       if (ntf == SESSION_CLEANUP_TRANSPORT)
272         return;
273
274       session_cleanup (s);
275       return;
276     }
277   app_worker_cleanup_notify (app_wrk, s, ntf);
278 }
279
280 void
281 session_program_cleanup (session_t *s)
282 {
283   ASSERT (s->session_state == SESSION_STATE_TRANSPORT_DELETED);
284   session_cleanup_notify (s, SESSION_CLEANUP_SESSION);
285 }
286
287 /**
288  * Cleans up session and lookup table.
289  *
290  * Transport connection must still be valid.
291  */
292 static void
293 session_delete (session_t * s)
294 {
295   int rv;
296
297   /* Delete from the main lookup table. */
298   if ((rv = session_lookup_del_session (s)))
299     clib_warning ("session %u hash delete rv %d", s->session_index, rv);
300
301   session_program_cleanup (s);
302 }
303
304 void
305 session_cleanup_half_open (session_handle_t ho_handle)
306 {
307   session_t *ho = session_get_from_handle (ho_handle);
308
309   /* App transports can migrate their half-opens */
310   if (ho->flags & SESSION_F_IS_MIGRATING)
311     {
312       /* Session still migrating, move to closed state to signal that the
313        * session should be removed. */
314       if (ho->connection_index == ~0)
315         {
316           session_set_state (ho, SESSION_STATE_CLOSED);
317           return;
318         }
319       /* Migrated transports are no longer half-opens */
320       transport_cleanup (session_get_transport_proto (ho),
321                          ho->connection_index, ho->app_index /* overloaded */);
322     }
323   else if (ho->session_state != SESSION_STATE_TRANSPORT_DELETED)
324     {
325       /* Cleanup half-open session lookup table if need be */
326       if (ho->session_state != SESSION_STATE_TRANSPORT_CLOSED)
327         {
328           transport_connection_t *tc;
329           tc = transport_get_half_open (session_get_transport_proto (ho),
330                                         ho->connection_index);
331           if (tc && !(tc->flags & TRANSPORT_CONNECTION_F_NO_LOOKUP))
332             session_lookup_del_half_open (tc);
333         }
334       transport_cleanup_half_open (session_get_transport_proto (ho),
335                                    ho->connection_index);
336     }
337   session_free (ho);
338 }
339
340 static void
341 session_half_open_free (session_t *ho)
342 {
343   app_worker_t *app_wrk;
344
345   ASSERT (vlib_get_thread_index () <= transport_cl_thread ());
346   app_wrk = app_worker_get_if_valid (ho->app_wrk_index);
347   if (app_wrk)
348     app_worker_del_half_open (app_wrk, ho);
349   else
350     session_free (ho);
351 }
352
353 static void
354 session_half_open_free_rpc (void *args)
355 {
356   session_t *ho = ho_session_get (pointer_to_uword (args));
357   session_half_open_free (ho);
358 }
359
360 void
361 session_half_open_delete_notify (transport_connection_t *tc)
362 {
363   session_t *ho = ho_session_get (tc->s_index);
364
365   /* Cleanup half-open lookup table if need be */
366   if (ho->session_state != SESSION_STATE_TRANSPORT_CLOSED)
367     {
368       if (!(tc->flags & TRANSPORT_CONNECTION_F_NO_LOOKUP))
369         session_lookup_del_half_open (tc);
370     }
371   session_set_state (ho, SESSION_STATE_TRANSPORT_DELETED);
372
373   /* Notification from ctrl thread accepted without rpc */
374   if (tc->thread_index == transport_cl_thread ())
375     {
376       session_half_open_free (ho);
377     }
378   else
379     {
380       void *args = uword_to_pointer ((uword) tc->s_index, void *);
381       session_send_rpc_evt_to_thread_force (transport_cl_thread (),
382                                             session_half_open_free_rpc, args);
383     }
384 }
385
386 void
387 session_half_open_migrate_notify (transport_connection_t *tc)
388 {
389   session_t *ho;
390
391   /* Support half-open migrations only for transports with no lookup */
392   ASSERT (tc->flags & TRANSPORT_CONNECTION_F_NO_LOOKUP);
393
394   ho = ho_session_get (tc->s_index);
395   ho->flags |= SESSION_F_IS_MIGRATING;
396   ho->connection_index = ~0;
397 }
398
399 int
400 session_half_open_migrated_notify (transport_connection_t *tc)
401 {
402   session_t *ho;
403
404   ho = ho_session_get (tc->s_index);
405
406   /* App probably detached so the half-open must be cleaned up */
407   if (ho->session_state == SESSION_STATE_CLOSED)
408     {
409       session_half_open_delete_notify (tc);
410       return -1;
411     }
412   ho->connection_index = tc->c_index;
413   /* Overload app index for half-open with new thread */
414   ho->app_index = tc->thread_index;
415   return 0;
416 }
417
418 session_t *
419 session_alloc_for_connection (transport_connection_t * tc)
420 {
421   session_t *s;
422   u32 thread_index = tc->thread_index;
423
424   ASSERT (thread_index == vlib_get_thread_index ()
425           || transport_protocol_is_cl (tc->proto));
426
427   s = session_alloc (thread_index);
428   s->session_type = session_type_from_proto_and_ip (tc->proto, tc->is_ip4);
429   session_set_state (s, SESSION_STATE_CLOSED);
430
431   /* Attach transport to session and vice versa */
432   s->connection_index = tc->c_index;
433   tc->s_index = s->session_index;
434   return s;
435 }
436
437 session_t *
438 session_alloc_for_half_open (transport_connection_t *tc)
439 {
440   session_t *s;
441
442   s = ho_session_alloc ();
443   s->session_type = session_type_from_proto_and_ip (tc->proto, tc->is_ip4);
444   s->connection_index = tc->c_index;
445   tc->s_index = s->session_index;
446   return s;
447 }
448
449 /**
450  * Discards bytes from buffer chain
451  *
452  * It discards n_bytes_to_drop starting at first buffer after chain_b
453  */
454 always_inline void
455 session_enqueue_discard_chain_bytes (vlib_main_t * vm, vlib_buffer_t * b,
456                                      vlib_buffer_t ** chain_b,
457                                      u32 n_bytes_to_drop)
458 {
459   vlib_buffer_t *next = *chain_b;
460   u32 to_drop = n_bytes_to_drop;
461   ASSERT (b->flags & VLIB_BUFFER_NEXT_PRESENT);
462   while (to_drop && (next->flags & VLIB_BUFFER_NEXT_PRESENT))
463     {
464       next = vlib_get_buffer (vm, next->next_buffer);
465       if (next->current_length > to_drop)
466         {
467           vlib_buffer_advance (next, to_drop);
468           to_drop = 0;
469         }
470       else
471         {
472           to_drop -= next->current_length;
473           next->current_length = 0;
474         }
475     }
476   *chain_b = next;
477
478   if (to_drop == 0)
479     b->total_length_not_including_first_buffer -= n_bytes_to_drop;
480 }
481
482 /**
483  * Enqueue buffer chain tail
484  */
485 always_inline int
486 session_enqueue_chain_tail (session_t * s, vlib_buffer_t * b,
487                             u32 offset, u8 is_in_order)
488 {
489   vlib_buffer_t *chain_b;
490   u32 chain_bi, len, diff;
491   vlib_main_t *vm = vlib_get_main ();
492   u8 *data;
493   u32 written = 0;
494   int rv = 0;
495
496   if (is_in_order && offset)
497     {
498       diff = offset - b->current_length;
499       if (diff > b->total_length_not_including_first_buffer)
500         return 0;
501       chain_b = b;
502       session_enqueue_discard_chain_bytes (vm, b, &chain_b, diff);
503       chain_bi = vlib_get_buffer_index (vm, chain_b);
504     }
505   else
506     chain_bi = b->next_buffer;
507
508   do
509     {
510       chain_b = vlib_get_buffer (vm, chain_bi);
511       data = vlib_buffer_get_current (chain_b);
512       len = chain_b->current_length;
513       if (!len)
514         continue;
515       if (is_in_order)
516         {
517           rv = svm_fifo_enqueue (s->rx_fifo, len, data);
518           if (rv == len)
519             {
520               written += rv;
521             }
522           else if (rv < len)
523             {
524               return (rv > 0) ? (written + rv) : written;
525             }
526           else if (rv > len)
527             {
528               written += rv;
529
530               /* written more than what was left in chain */
531               if (written > b->total_length_not_including_first_buffer)
532                 return written;
533
534               /* drop the bytes that have already been delivered */
535               session_enqueue_discard_chain_bytes (vm, b, &chain_b, rv - len);
536             }
537         }
538       else
539         {
540           rv = svm_fifo_enqueue_with_offset (s->rx_fifo, offset, len, data);
541           if (rv)
542             {
543               clib_warning ("failed to enqueue multi-buffer seg");
544               return -1;
545             }
546           offset += len;
547         }
548     }
549   while ((chain_bi = (chain_b->flags & VLIB_BUFFER_NEXT_PRESENT)
550           ? chain_b->next_buffer : 0));
551
552   if (is_in_order)
553     return written;
554
555   return 0;
556 }
557
558 void
559 session_fifo_tuning (session_t * s, svm_fifo_t * f,
560                      session_ft_action_t act, u32 len)
561 {
562   if (s->flags & SESSION_F_CUSTOM_FIFO_TUNING)
563     {
564       app_worker_t *app_wrk = app_worker_get (s->app_wrk_index);
565       app_worker_session_fifo_tuning (app_wrk, s, f, act, len);
566       if (CLIB_ASSERT_ENABLE)
567         {
568           segment_manager_t *sm;
569           sm = segment_manager_get (f->segment_manager);
570           ASSERT (f->shr->size >= 4096);
571           ASSERT (f->shr->size <= sm->max_fifo_size);
572         }
573     }
574 }
575
576 void
577 session_wrk_program_app_wrk_evts (session_worker_t *wrk, u32 app_wrk_index)
578 {
579   u8 need_interrupt;
580
581   ASSERT ((wrk - session_main.wrk) == vlib_get_thread_index ());
582   need_interrupt = clib_bitmap_is_zero (wrk->app_wrks_pending_ntf);
583   wrk->app_wrks_pending_ntf =
584     clib_bitmap_set (wrk->app_wrks_pending_ntf, app_wrk_index, 1);
585
586   if (need_interrupt)
587     vlib_node_set_interrupt_pending (wrk->vm, session_input_node.index);
588 }
589
590 always_inline void
591 session_program_io_event (app_worker_t *app_wrk, session_t *s,
592                           session_evt_type_t et, u8 is_cl)
593 {
594   if (is_cl)
595     {
596       /* Special events for connectionless sessions */
597       et += SESSION_IO_EVT_BUILTIN_RX - SESSION_IO_EVT_RX;
598
599       ASSERT (s->thread_index == 0 || et == SESSION_IO_EVT_TX_MAIN);
600       session_event_t evt = {
601         .event_type = et,
602         .session_handle = session_handle (s),
603       };
604
605       app_worker_add_event_custom (app_wrk, vlib_get_thread_index (), &evt);
606     }
607   else
608     {
609       app_worker_add_event (app_wrk, s, et);
610     }
611 }
612
613 static inline int
614 session_notify_subscribers (u32 app_index, session_t *s, svm_fifo_t *f,
615                             session_evt_type_t evt_type)
616 {
617   app_worker_t *app_wrk;
618   application_t *app;
619   u8 is_cl;
620   int i;
621
622   app = application_get (app_index);
623   if (!app)
624     return -1;
625
626   is_cl = s->thread_index != vlib_get_thread_index ();
627   for (i = 0; i < f->shr->n_subscribers; i++)
628     {
629       app_wrk = application_get_worker (app, f->shr->subscribers[i]);
630       if (!app_wrk)
631         continue;
632       session_program_io_event (app_wrk, s, evt_type, is_cl ? 1 : 0);
633     }
634
635   return 0;
636 }
637
638 always_inline int
639 session_enqueue_notify_inline (session_t *s, u8 is_cl)
640 {
641   app_worker_t *app_wrk;
642
643   app_wrk = app_worker_get_if_valid (s->app_wrk_index);
644   if (PREDICT_FALSE (!app_wrk))
645     return -1;
646
647   session_program_io_event (app_wrk, s, SESSION_IO_EVT_RX, is_cl);
648
649   if (PREDICT_FALSE (svm_fifo_n_subscribers (s->rx_fifo)))
650     return session_notify_subscribers (app_wrk->app_index, s, s->rx_fifo,
651                                        SESSION_IO_EVT_RX);
652
653   return 0;
654 }
655
656 int
657 session_enqueue_notify (session_t *s)
658 {
659   return session_enqueue_notify_inline (s, 0 /* is_cl */);
660 }
661
662 int
663 session_enqueue_notify_cl (session_t *s)
664 {
665   return session_enqueue_notify_inline (s, 1 /* is_cl */);
666 }
667
668 int
669 session_dequeue_notify (session_t *s)
670 {
671   app_worker_t *app_wrk;
672   u8 is_cl;
673
674   /* Unset as soon as event is requested */
675   svm_fifo_clear_deq_ntf (s->tx_fifo);
676
677   app_wrk = app_worker_get_if_valid (s->app_wrk_index);
678   if (PREDICT_FALSE (!app_wrk))
679     return -1;
680
681   is_cl = s->session_state == SESSION_STATE_LISTENING ||
682           s->session_state == SESSION_STATE_OPENED;
683   session_program_io_event (app_wrk, s, SESSION_IO_EVT_TX, is_cl ? 1 : 0);
684
685   if (PREDICT_FALSE (svm_fifo_n_subscribers (s->tx_fifo)))
686     return session_notify_subscribers (app_wrk->app_index, s, s->tx_fifo,
687                                        SESSION_IO_EVT_TX);
688
689   return 0;
690 }
691
692 /**
693  * Flushes queue of sessions that are to be notified of new data
694  * enqueued events.
695  *
696  * @param transport_proto transport protocol for which queue to be flushed
697  * @param thread_index Thread index for which the flush is to be performed.
698  * @return 0 on success or a positive number indicating the number of
699  *         failures due to API queue being full.
700  */
701 void
702 session_main_flush_enqueue_events (transport_proto_t transport_proto,
703                                    u32 thread_index)
704 {
705   session_worker_t *wrk = session_main_get_worker (thread_index);
706   session_handle_t *handles;
707   session_t *s;
708   u32 i, is_cl;
709
710   handles = wrk->session_to_enqueue[transport_proto];
711
712   for (i = 0; i < vec_len (handles); i++)
713     {
714       s = session_get_from_handle (handles[i]);
715       session_fifo_tuning (s, s->rx_fifo, SESSION_FT_ACTION_ENQUEUED,
716                            0 /* TODO/not needed */);
717       is_cl =
718         s->thread_index != thread_index || (s->flags & SESSION_F_IS_CLESS);
719       if (!is_cl)
720         session_enqueue_notify_inline (s, 0);
721       else
722         session_enqueue_notify_inline (s, 1);
723     }
724
725   vec_reset_length (handles);
726   wrk->session_to_enqueue[transport_proto] = handles;
727 }
728
729 /*
730  * Enqueue data for delivery to app. If requested, it queues app notification
731  * event for later delivery.
732  *
733  * @param tc Transport connection which is to be enqueued data
734  * @param b Buffer to be enqueued
735  * @param offset Offset at which to start enqueueing if out-of-order
736  * @param queue_event Flag to indicate if peer is to be notified or if event
737  *                    is to be queued. The former is useful when more data is
738  *                    enqueued and only one event is to be generated.
739  * @param is_in_order Flag to indicate if data is in order
740  * @return Number of bytes enqueued or a negative value if enqueueing failed.
741  */
742 int
743 session_enqueue_stream_connection (transport_connection_t * tc,
744                                    vlib_buffer_t * b, u32 offset,
745                                    u8 queue_event, u8 is_in_order)
746 {
747   session_t *s;
748   int enqueued = 0, rv, in_order_off;
749
750   s = session_get (tc->s_index, tc->thread_index);
751
752   if (is_in_order)
753     {
754       enqueued = svm_fifo_enqueue (s->rx_fifo,
755                                    b->current_length,
756                                    vlib_buffer_get_current (b));
757       if (PREDICT_FALSE ((b->flags & VLIB_BUFFER_NEXT_PRESENT)
758                          && enqueued >= 0))
759         {
760           in_order_off = enqueued > b->current_length ? enqueued : 0;
761           rv = session_enqueue_chain_tail (s, b, in_order_off, 1);
762           if (rv > 0)
763             enqueued += rv;
764         }
765     }
766   else
767     {
768       rv = svm_fifo_enqueue_with_offset (s->rx_fifo, offset,
769                                          b->current_length,
770                                          vlib_buffer_get_current (b));
771       if (PREDICT_FALSE ((b->flags & VLIB_BUFFER_NEXT_PRESENT) && !rv))
772         session_enqueue_chain_tail (s, b, offset + b->current_length, 0);
773       /* if something was enqueued, report even this as success for ooo
774        * segment handling */
775       return rv;
776     }
777
778   if (queue_event)
779     {
780       /* Queue RX event on this fifo. Eventually these will need to be
781        * flushed by calling @ref session_main_flush_enqueue_events () */
782       if (!(s->flags & SESSION_F_RX_EVT))
783         {
784           session_worker_t *wrk = session_main_get_worker (s->thread_index);
785           ASSERT (s->thread_index == vlib_get_thread_index ());
786           s->flags |= SESSION_F_RX_EVT;
787           vec_add1 (wrk->session_to_enqueue[tc->proto], session_handle (s));
788         }
789
790       session_fifo_tuning (s, s->rx_fifo, SESSION_FT_ACTION_ENQUEUED, 0);
791     }
792
793   return enqueued;
794 }
795
796 always_inline int
797 session_enqueue_dgram_connection_inline (session_t *s,
798                                          session_dgram_hdr_t *hdr,
799                                          vlib_buffer_t *b, u8 proto,
800                                          u8 queue_event, u32 is_cl)
801 {
802   int rv;
803
804   ASSERT (svm_fifo_max_enqueue_prod (s->rx_fifo)
805           >= b->current_length + sizeof (*hdr));
806
807   if (PREDICT_TRUE (!(b->flags & VLIB_BUFFER_NEXT_PRESENT)))
808     {
809       svm_fifo_seg_t segs[2] = {
810           { (u8 *) hdr, sizeof (*hdr) },
811           { vlib_buffer_get_current (b), b->current_length }
812       };
813
814       rv = svm_fifo_enqueue_segments (s->rx_fifo, segs, 2,
815                                       0 /* allow_partial */ );
816     }
817   else
818     {
819       vlib_main_t *vm = vlib_get_main ();
820       svm_fifo_seg_t *segs = 0, *seg;
821       vlib_buffer_t *it = b;
822       u32 n_segs = 1;
823
824       vec_add2 (segs, seg, 1);
825       seg->data = (u8 *) hdr;
826       seg->len = sizeof (*hdr);
827       while (it)
828         {
829           vec_add2 (segs, seg, 1);
830           seg->data = vlib_buffer_get_current (it);
831           seg->len = it->current_length;
832           n_segs++;
833           if (!(it->flags & VLIB_BUFFER_NEXT_PRESENT))
834             break;
835           it = vlib_get_buffer (vm, it->next_buffer);
836         }
837       rv = svm_fifo_enqueue_segments (s->rx_fifo, segs, n_segs,
838                                       0 /* allow partial */ );
839       vec_free (segs);
840     }
841
842   if (queue_event && rv > 0)
843     {
844       /* Queue RX event on this fifo. Eventually these will need to be
845        * flushed by calling @ref session_main_flush_enqueue_events () */
846       if (!(s->flags & SESSION_F_RX_EVT))
847         {
848           u32 thread_index =
849             is_cl ? vlib_get_thread_index () : s->thread_index;
850           session_worker_t *wrk = session_main_get_worker (thread_index);
851           ASSERT (s->thread_index == vlib_get_thread_index () || is_cl);
852           s->flags |= SESSION_F_RX_EVT;
853           vec_add1 (wrk->session_to_enqueue[proto], session_handle (s));
854         }
855
856       session_fifo_tuning (s, s->rx_fifo, SESSION_FT_ACTION_ENQUEUED, 0);
857     }
858   return rv > 0 ? rv : 0;
859 }
860
861 int
862 session_enqueue_dgram_connection (session_t *s, session_dgram_hdr_t *hdr,
863                                   vlib_buffer_t *b, u8 proto, u8 queue_event)
864 {
865   return session_enqueue_dgram_connection_inline (s, hdr, b, proto,
866                                                   queue_event, 0 /* is_cl */);
867 }
868
869 int
870 session_enqueue_dgram_connection_cl (session_t *s, session_dgram_hdr_t *hdr,
871                                      vlib_buffer_t *b, u8 proto,
872                                      u8 queue_event)
873 {
874   return session_enqueue_dgram_connection_inline (s, hdr, b, proto,
875                                                   queue_event, 1 /* is_cl */);
876 }
877
878 int
879 session_tx_fifo_peek_bytes (transport_connection_t * tc, u8 * buffer,
880                             u32 offset, u32 max_bytes)
881 {
882   session_t *s = session_get (tc->s_index, tc->thread_index);
883   return svm_fifo_peek (s->tx_fifo, offset, max_bytes, buffer);
884 }
885
886 u32
887 session_tx_fifo_dequeue_drop (transport_connection_t * tc, u32 max_bytes)
888 {
889   session_t *s = session_get (tc->s_index, tc->thread_index);
890   u32 rv;
891
892   rv = svm_fifo_dequeue_drop (s->tx_fifo, max_bytes);
893   session_fifo_tuning (s, s->tx_fifo, SESSION_FT_ACTION_DEQUEUED, rv);
894
895   if (svm_fifo_needs_deq_ntf (s->tx_fifo, max_bytes))
896     session_dequeue_notify (s);
897
898   return rv;
899 }
900
901 int
902 session_stream_connect_notify (transport_connection_t * tc,
903                                session_error_t err)
904 {
905   u32 opaque = 0, new_ti, new_si;
906   app_worker_t *app_wrk;
907   session_t *s = 0, *ho;
908
909   /*
910    * Cleanup half-open table
911    */
912   session_lookup_del_half_open (tc);
913
914   ho = ho_session_get (tc->s_index);
915   session_set_state (ho, SESSION_STATE_TRANSPORT_CLOSED);
916   opaque = ho->opaque;
917   app_wrk = app_worker_get_if_valid (ho->app_wrk_index);
918   if (!app_wrk)
919     return -1;
920
921   if (err)
922     return app_worker_connect_notify (app_wrk, s, err, opaque);
923
924   s = session_alloc_for_connection (tc);
925   session_set_state (s, SESSION_STATE_CONNECTING);
926   s->app_wrk_index = app_wrk->wrk_index;
927   s->opaque = opaque;
928   new_si = s->session_index;
929   new_ti = s->thread_index;
930
931   if ((err = app_worker_init_connected (app_wrk, s)))
932     {
933       session_free (s);
934       app_worker_connect_notify (app_wrk, 0, err, opaque);
935       return -1;
936     }
937
938   s = session_get (new_si, new_ti);
939   session_set_state (s, SESSION_STATE_READY);
940   session_lookup_add_connection (tc, session_handle (s));
941
942   if (app_worker_connect_notify (app_wrk, s, SESSION_E_NONE, opaque))
943     {
944       session_lookup_del_connection (tc);
945       /* Avoid notifying app about rejected session cleanup */
946       s = session_get (new_si, new_ti);
947       segment_manager_dealloc_fifos (s->rx_fifo, s->tx_fifo);
948       session_free (s);
949       return -1;
950     }
951
952   return 0;
953 }
954
955 static void
956 session_switch_pool_closed_rpc (void *arg)
957 {
958   session_handle_t sh;
959   session_t *s;
960
961   sh = pointer_to_uword (arg);
962   s = session_get_from_handle_if_valid (sh);
963   if (!s)
964     return;
965
966   transport_cleanup (session_get_transport_proto (s), s->connection_index,
967                      s->thread_index);
968   session_cleanup (s);
969 }
970
971 typedef struct _session_switch_pool_args
972 {
973   u32 session_index;
974   u32 thread_index;
975   u32 new_thread_index;
976   u32 new_session_index;
977 } session_switch_pool_args_t;
978
979 /**
980  * Notify old thread of the session pool switch
981  */
982 static void
983 session_switch_pool (void *cb_args)
984 {
985   session_switch_pool_args_t *args = (session_switch_pool_args_t *) cb_args;
986   session_handle_t sh, new_sh;
987   segment_manager_t *sm;
988   app_worker_t *app_wrk;
989   session_t *s;
990
991   ASSERT (args->thread_index == vlib_get_thread_index ());
992   s = session_get (args->session_index, args->thread_index);
993
994   app_wrk = app_worker_get_if_valid (s->app_wrk_index);
995   if (!app_wrk)
996     goto app_closed;
997
998   /* Cleanup fifo segment slice state for fifos */
999   sm = app_worker_get_connect_segment_manager (app_wrk);
1000   segment_manager_detach_fifo (sm, &s->rx_fifo);
1001   segment_manager_detach_fifo (sm, &s->tx_fifo);
1002
1003   /* Check if session closed during migration */
1004   if (s->session_state >= SESSION_STATE_TRANSPORT_CLOSING)
1005     goto app_closed;
1006
1007   new_sh =
1008     session_make_handle (args->new_session_index, args->new_thread_index);
1009   app_worker_migrate_notify (app_wrk, s, new_sh);
1010
1011   clib_mem_free (cb_args);
1012   return;
1013
1014 app_closed:
1015   /* Session closed during migration. Clean everything up */
1016   sh = session_handle (s);
1017   session_send_rpc_evt_to_thread (args->new_thread_index,
1018                                   session_switch_pool_closed_rpc,
1019                                   uword_to_pointer (sh, void *));
1020   clib_mem_free (cb_args);
1021 }
1022
1023 /**
1024  * Move dgram session to the right thread
1025  */
1026 int
1027 session_dgram_connect_notify (transport_connection_t * tc,
1028                               u32 old_thread_index, session_t ** new_session)
1029 {
1030   session_t *new_s;
1031   session_switch_pool_args_t *rpc_args;
1032   segment_manager_t *sm;
1033   app_worker_t *app_wrk;
1034
1035   /*
1036    * Clone half-open session to the right thread.
1037    */
1038   new_s = session_clone_safe (tc->s_index, old_thread_index);
1039   new_s->connection_index = tc->c_index;
1040   session_set_state (new_s, SESSION_STATE_READY);
1041   new_s->flags |= SESSION_F_IS_MIGRATING;
1042
1043   if (!(tc->flags & TRANSPORT_CONNECTION_F_NO_LOOKUP))
1044     session_lookup_add_connection (tc, session_handle (new_s));
1045
1046   app_wrk = app_worker_get_if_valid (new_s->app_wrk_index);
1047   if (app_wrk)
1048     {
1049       /* New set of fifos attached to the same shared memory */
1050       sm = app_worker_get_connect_segment_manager (app_wrk);
1051       segment_manager_attach_fifo (sm, &new_s->rx_fifo, new_s);
1052       segment_manager_attach_fifo (sm, &new_s->tx_fifo, new_s);
1053     }
1054
1055   /*
1056    * Ask thread owning the old session to clean it up and make us the tx
1057    * fifo owner
1058    */
1059   rpc_args = clib_mem_alloc (sizeof (*rpc_args));
1060   rpc_args->new_session_index = new_s->session_index;
1061   rpc_args->new_thread_index = new_s->thread_index;
1062   rpc_args->session_index = tc->s_index;
1063   rpc_args->thread_index = old_thread_index;
1064   session_send_rpc_evt_to_thread (rpc_args->thread_index, session_switch_pool,
1065                                   rpc_args);
1066
1067   tc->s_index = new_s->session_index;
1068   new_s->connection_index = tc->c_index;
1069   *new_session = new_s;
1070   return 0;
1071 }
1072
1073 /**
1074  * Notification from transport that connection is being closed.
1075  *
1076  * A disconnect is sent to application but state is not removed. Once
1077  * disconnect is acknowledged by application, session disconnect is called.
1078  * Ultimately this leads to close being called on transport (passive close).
1079  */
1080 void
1081 session_transport_closing_notify (transport_connection_t * tc)
1082 {
1083   app_worker_t *app_wrk;
1084   session_t *s;
1085
1086   s = session_get (tc->s_index, tc->thread_index);
1087   if (s->session_state >= SESSION_STATE_TRANSPORT_CLOSING)
1088     return;
1089
1090   /* Wait for reply from app before sending notification as the
1091    * accept might be rejected */
1092   if (s->session_state == SESSION_STATE_ACCEPTING)
1093     {
1094       session_set_state (s, SESSION_STATE_TRANSPORT_CLOSING);
1095       return;
1096     }
1097
1098   session_set_state (s, SESSION_STATE_TRANSPORT_CLOSING);
1099   app_wrk = app_worker_get (s->app_wrk_index);
1100   app_worker_close_notify (app_wrk, s);
1101 }
1102
1103 /**
1104  * Notification from transport that connection is being deleted
1105  *
1106  * This removes the session if it is still valid. It should be called only on
1107  * previously fully established sessions. For instance failed connects should
1108  * call stream_session_connect_notify and indicate that the connect has
1109  * failed.
1110  */
1111 void
1112 session_transport_delete_notify (transport_connection_t * tc)
1113 {
1114   session_t *s;
1115
1116   /* App might've been removed already */
1117   if (!(s = session_get_if_valid (tc->s_index, tc->thread_index)))
1118     return;
1119
1120   switch (s->session_state)
1121     {
1122     case SESSION_STATE_CREATED:
1123       /* Session was created but accept notification was not yet sent to the
1124        * app. Cleanup everything. */
1125       session_lookup_del_session (s);
1126       segment_manager_dealloc_fifos (s->rx_fifo, s->tx_fifo);
1127       session_free (s);
1128       break;
1129     case SESSION_STATE_ACCEPTING:
1130     case SESSION_STATE_TRANSPORT_CLOSING:
1131     case SESSION_STATE_CLOSING:
1132     case SESSION_STATE_TRANSPORT_CLOSED:
1133       /* If transport finishes or times out before we get a reply
1134        * from the app, mark transport as closed and wait for reply
1135        * before removing the session. Cleanup session table in advance
1136        * because transport will soon be closed and closed sessions
1137        * are assumed to have been removed from the lookup table */
1138       session_lookup_del_session (s);
1139       session_set_state (s, SESSION_STATE_TRANSPORT_DELETED);
1140       session_cleanup_notify (s, SESSION_CLEANUP_TRANSPORT);
1141       svm_fifo_dequeue_drop_all (s->tx_fifo);
1142       break;
1143     case SESSION_STATE_APP_CLOSED:
1144       /* Cleanup lookup table as transport needs to still be valid.
1145        * Program transport close to ensure that all session events
1146        * have been cleaned up. Once transport close is called, the
1147        * session is just removed because both transport and app have
1148        * confirmed the close*/
1149       session_lookup_del_session (s);
1150       session_set_state (s, SESSION_STATE_TRANSPORT_DELETED);
1151       session_cleanup_notify (s, SESSION_CLEANUP_TRANSPORT);
1152       svm_fifo_dequeue_drop_all (s->tx_fifo);
1153       session_program_transport_ctrl_evt (s, SESSION_CTRL_EVT_CLOSE);
1154       break;
1155     case SESSION_STATE_TRANSPORT_DELETED:
1156       break;
1157     case SESSION_STATE_CLOSED:
1158       session_cleanup_notify (s, SESSION_CLEANUP_TRANSPORT);
1159       session_set_state (s, SESSION_STATE_TRANSPORT_DELETED);
1160       session_delete (s);
1161       break;
1162     default:
1163       clib_warning ("session state %u", s->session_state);
1164       session_cleanup_notify (s, SESSION_CLEANUP_TRANSPORT);
1165       session_delete (s);
1166       break;
1167     }
1168 }
1169
1170 /**
1171  * Notification from transport that it is closed
1172  *
1173  * Should be called by transport, prior to calling delete notify, once it
1174  * knows that no more data will be exchanged. This could serve as an
1175  * early acknowledgment of an active close especially if transport delete
1176  * can be delayed a long time, e.g., tcp time-wait.
1177  */
1178 void
1179 session_transport_closed_notify (transport_connection_t * tc)
1180 {
1181   app_worker_t *app_wrk;
1182   session_t *s;
1183
1184   if (!(s = session_get_if_valid (tc->s_index, tc->thread_index)))
1185     return;
1186
1187   if (s->session_state >= SESSION_STATE_TRANSPORT_CLOSED)
1188     return;
1189
1190   /* Transport thinks that app requested close but it actually didn't.
1191    * Can happen for tcp:
1192    * 1)if fin and rst are received in close succession.
1193    * 2)if app shutdown the connection.  */
1194   if (s->session_state == SESSION_STATE_READY)
1195     {
1196       session_transport_closing_notify (tc);
1197       svm_fifo_dequeue_drop_all (s->tx_fifo);
1198       session_set_state (s, SESSION_STATE_TRANSPORT_CLOSED);
1199     }
1200   /* If app close has not been received or has not yet resulted in
1201    * a transport close, only mark the session transport as closed */
1202   else if (s->session_state <= SESSION_STATE_CLOSING)
1203     session_set_state (s, SESSION_STATE_TRANSPORT_CLOSED);
1204   /* If app also closed, switch to closed */
1205   else if (s->session_state == SESSION_STATE_APP_CLOSED)
1206     session_set_state (s, SESSION_STATE_CLOSED);
1207
1208   app_wrk = app_worker_get_if_valid (s->app_wrk_index);
1209   if (app_wrk)
1210     app_worker_transport_closed_notify (app_wrk, s);
1211 }
1212
1213 /**
1214  * Notify application that connection has been reset.
1215  */
1216 void
1217 session_transport_reset_notify (transport_connection_t * tc)
1218 {
1219   app_worker_t *app_wrk;
1220   session_t *s;
1221
1222   s = session_get (tc->s_index, tc->thread_index);
1223   svm_fifo_dequeue_drop_all (s->tx_fifo);
1224   if (s->session_state >= SESSION_STATE_TRANSPORT_CLOSING)
1225     return;
1226   if (s->session_state == SESSION_STATE_ACCEPTING)
1227     {
1228       session_set_state (s, SESSION_STATE_TRANSPORT_CLOSING);
1229       return;
1230     }
1231   session_set_state (s, SESSION_STATE_TRANSPORT_CLOSING);
1232   app_wrk = app_worker_get (s->app_wrk_index);
1233   app_worker_reset_notify (app_wrk, s);
1234 }
1235
1236 int
1237 session_stream_accept_notify (transport_connection_t * tc)
1238 {
1239   app_worker_t *app_wrk;
1240   session_t *s;
1241
1242   s = session_get (tc->s_index, tc->thread_index);
1243   app_wrk = app_worker_get_if_valid (s->app_wrk_index);
1244   if (!app_wrk)
1245     return -1;
1246   if (s->session_state != SESSION_STATE_CREATED)
1247     return 0;
1248   session_set_state (s, SESSION_STATE_ACCEPTING);
1249   if (app_worker_accept_notify (app_wrk, s))
1250     {
1251       /* On transport delete, no notifications should be sent. Unless, the
1252        * accept is retried and successful. */
1253       session_set_state (s, SESSION_STATE_CREATED);
1254       return -1;
1255     }
1256   return 0;
1257 }
1258
1259 /**
1260  * Accept a stream session. Optionally ping the server by callback.
1261  */
1262 int
1263 session_stream_accept (transport_connection_t * tc, u32 listener_index,
1264                        u32 thread_index, u8 notify)
1265 {
1266   session_t *s;
1267   int rv;
1268
1269   s = session_alloc_for_connection (tc);
1270   s->listener_handle = ((u64) thread_index << 32) | (u64) listener_index;
1271   session_set_state (s, SESSION_STATE_CREATED);
1272
1273   if ((rv = app_worker_init_accepted (s)))
1274     {
1275       session_free (s);
1276       return rv;
1277     }
1278
1279   session_lookup_add_connection (tc, session_handle (s));
1280
1281   /* Shoulder-tap the server */
1282   if (notify)
1283     {
1284       app_worker_t *app_wrk = app_worker_get (s->app_wrk_index);
1285       if ((rv = app_worker_accept_notify (app_wrk, s)))
1286         {
1287           session_lookup_del_session (s);
1288           segment_manager_dealloc_fifos (s->rx_fifo, s->tx_fifo);
1289           session_free (s);
1290           return rv;
1291         }
1292     }
1293
1294   return 0;
1295 }
1296
1297 int
1298 session_dgram_accept (transport_connection_t * tc, u32 listener_index,
1299                       u32 thread_index)
1300 {
1301   app_worker_t *app_wrk;
1302   session_t *s;
1303   int rv;
1304
1305   s = session_alloc_for_connection (tc);
1306   s->listener_handle = ((u64) thread_index << 32) | (u64) listener_index;
1307
1308   if ((rv = app_worker_init_accepted (s)))
1309     {
1310       session_free (s);
1311       return rv;
1312     }
1313
1314   session_lookup_add_connection (tc, session_handle (s));
1315   session_set_state (s, SESSION_STATE_ACCEPTING);
1316
1317   app_wrk = app_worker_get (s->app_wrk_index);
1318   if ((rv = app_worker_accept_notify (app_wrk, s)))
1319     {
1320       session_lookup_del_session (s);
1321       segment_manager_dealloc_fifos (s->rx_fifo, s->tx_fifo);
1322       session_free (s);
1323       return rv;
1324     }
1325
1326   return 0;
1327 }
1328
1329 int
1330 session_open_cl (session_endpoint_cfg_t *rmt, session_handle_t *rsh)
1331 {
1332   transport_connection_t *tc;
1333   transport_endpoint_cfg_t *tep;
1334   app_worker_t *app_wrk;
1335   session_handle_t sh;
1336   session_t *s;
1337   int rv;
1338
1339   tep = session_endpoint_to_transport_cfg (rmt);
1340   rv = transport_connect (rmt->transport_proto, tep);
1341   if (rv < 0)
1342     {
1343       SESSION_DBG ("Transport failed to open connection.");
1344       return rv;
1345     }
1346
1347   tc = transport_get_half_open (rmt->transport_proto, (u32) rv);
1348
1349   /* For dgram type of service, allocate session and fifos now */
1350   app_wrk = app_worker_get (rmt->app_wrk_index);
1351   s = session_alloc_for_connection (tc);
1352   s->app_wrk_index = app_wrk->wrk_index;
1353   s->opaque = rmt->opaque;
1354   session_set_state (s, SESSION_STATE_OPENED);
1355   if (transport_connection_is_cless (tc))
1356     s->flags |= SESSION_F_IS_CLESS;
1357   if (app_worker_init_connected (app_wrk, s))
1358     {
1359       session_free (s);
1360       return -1;
1361     }
1362
1363   sh = session_handle (s);
1364   *rsh = sh;
1365
1366   session_lookup_add_connection (tc, sh);
1367   return app_worker_connect_notify (app_wrk, s, SESSION_E_NONE, rmt->opaque);
1368 }
1369
1370 int
1371 session_open_vc (session_endpoint_cfg_t *rmt, session_handle_t *rsh)
1372 {
1373   transport_connection_t *tc;
1374   transport_endpoint_cfg_t *tep;
1375   app_worker_t *app_wrk;
1376   session_t *ho;
1377   int rv;
1378
1379   tep = session_endpoint_to_transport_cfg (rmt);
1380   rv = transport_connect (rmt->transport_proto, tep);
1381   if (rv < 0)
1382     {
1383       SESSION_DBG ("Transport failed to open connection.");
1384       return rv;
1385     }
1386
1387   tc = transport_get_half_open (rmt->transport_proto, (u32) rv);
1388
1389   app_wrk = app_worker_get (rmt->app_wrk_index);
1390
1391   /* If transport offers a vc service, only allocate established
1392    * session once the connection has been established.
1393    * In the meantime allocate half-open session for tracking purposes
1394    * associate half-open connection to it and add session to app-worker
1395    * half-open table. These are needed to allocate the established
1396    * session on transport notification, and to cleanup the half-open
1397    * session if the app detaches before connection establishment.
1398    */
1399   ho = session_alloc_for_half_open (tc);
1400   ho->app_wrk_index = app_wrk->wrk_index;
1401   ho->ho_index = app_worker_add_half_open (app_wrk, session_handle (ho));
1402   ho->opaque = rmt->opaque;
1403   *rsh = session_handle (ho);
1404
1405   if (!(tc->flags & TRANSPORT_CONNECTION_F_NO_LOOKUP))
1406     session_lookup_add_half_open (tc, tc->c_index);
1407
1408   return 0;
1409 }
1410
1411 int
1412 session_open_app (session_endpoint_cfg_t *rmt, session_handle_t *rsh)
1413 {
1414   transport_endpoint_cfg_t *tep_cfg = session_endpoint_to_transport_cfg (rmt);
1415
1416   /* Not supported for now */
1417   *rsh = SESSION_INVALID_HANDLE;
1418   return transport_connect (rmt->transport_proto, tep_cfg);
1419 }
1420
1421 typedef int (*session_open_service_fn) (session_endpoint_cfg_t *,
1422                                         session_handle_t *);
1423
1424 /* *INDENT-OFF* */
1425 static session_open_service_fn session_open_srv_fns[TRANSPORT_N_SERVICES] = {
1426   session_open_vc,
1427   session_open_cl,
1428   session_open_app,
1429 };
1430 /* *INDENT-ON* */
1431
1432 /**
1433  * Ask transport to open connection to remote transport endpoint.
1434  *
1435  * Stores handle for matching request with reply since the call can be
1436  * asynchronous. For instance, for TCP the 3-way handshake must complete
1437  * before reply comes. Session is only created once connection is established.
1438  *
1439  * @param app_index Index of the application requesting the connect
1440  * @param st Session type requested.
1441  * @param tep Remote transport endpoint
1442  * @param opaque Opaque data (typically, api_context) the application expects
1443  *               on open completion.
1444  */
1445 int
1446 session_open (session_endpoint_cfg_t *rmt, session_handle_t *rsh)
1447 {
1448   transport_service_type_t tst;
1449   tst = transport_protocol_service_type (rmt->transport_proto);
1450   return session_open_srv_fns[tst](rmt, rsh);
1451 }
1452
1453 /**
1454  * Ask transport to listen on session endpoint.
1455  *
1456  * @param s Session for which listen will be called. Note that unlike
1457  *          established sessions, listen sessions are not associated to a
1458  *          thread.
1459  * @param sep Local endpoint to be listened on.
1460  */
1461 int
1462 session_listen (session_t * ls, session_endpoint_cfg_t * sep)
1463 {
1464   transport_endpoint_cfg_t *tep;
1465   int tc_index;
1466   u32 s_index;
1467
1468   /* Transport bind/listen */
1469   tep = session_endpoint_to_transport_cfg (sep);
1470   s_index = ls->session_index;
1471   tc_index = transport_start_listen (session_get_transport_proto (ls),
1472                                      s_index, tep);
1473
1474   if (tc_index < 0)
1475     return tc_index;
1476
1477   /* Attach transport to session. Lookup tables are populated by the app
1478    * worker because local tables (for ct sessions) are not backed by a fib */
1479   ls = listen_session_get (s_index);
1480   ls->connection_index = tc_index;
1481   ls->opaque = sep->opaque;
1482   if (transport_connection_is_cless (session_get_transport (ls)))
1483     ls->flags |= SESSION_F_IS_CLESS;
1484
1485   return 0;
1486 }
1487
1488 /**
1489  * Ask transport to stop listening on local transport endpoint.
1490  *
1491  * @param s Session to stop listening on. It must be in state LISTENING.
1492  */
1493 int
1494 session_stop_listen (session_t * s)
1495 {
1496   transport_proto_t tp = session_get_transport_proto (s);
1497   transport_connection_t *tc;
1498
1499   if (s->session_state != SESSION_STATE_LISTENING)
1500     return SESSION_E_NOLISTEN;
1501
1502   tc = transport_get_listener (tp, s->connection_index);
1503
1504   /* If no transport, assume everything was cleaned up already */
1505   if (!tc)
1506     return SESSION_E_NONE;
1507
1508   if (!(tc->flags & TRANSPORT_CONNECTION_F_NO_LOOKUP))
1509     session_lookup_del_connection (tc);
1510
1511   transport_stop_listen (tp, s->connection_index);
1512   return 0;
1513 }
1514
1515 /**
1516  * Initialize session half-closing procedure.
1517  *
1518  * Note that half-closing will not change the state of the session.
1519  */
1520 void
1521 session_half_close (session_t *s)
1522 {
1523   if (!s)
1524     return;
1525
1526   session_program_transport_ctrl_evt (s, SESSION_CTRL_EVT_HALF_CLOSE);
1527 }
1528
1529 /**
1530  * Initialize session closing procedure.
1531  *
1532  * Request is always sent to session node to ensure that all outstanding
1533  * requests are served before transport is notified.
1534  */
1535 void
1536 session_close (session_t * s)
1537 {
1538   if (!s || (s->flags & SESSION_F_APP_CLOSED))
1539     return;
1540
1541   /* Transports can close and delete their state independent of app closes
1542    * and transport initiated state transitions can hide app closes. Instead
1543    * of extending the state machine to support separate tracking of app and
1544    * transport initiated closes, use a flag. */
1545   s->flags |= SESSION_F_APP_CLOSED;
1546
1547   if (s->session_state >= SESSION_STATE_CLOSING)
1548     {
1549       /* Session will only be removed once both app and transport
1550        * acknowledge the close */
1551       if (s->session_state == SESSION_STATE_TRANSPORT_CLOSED
1552           || s->session_state == SESSION_STATE_TRANSPORT_DELETED)
1553         session_program_transport_ctrl_evt (s, SESSION_CTRL_EVT_CLOSE);
1554       return;
1555     }
1556
1557   /* App closed so stop propagating dequeue notifications.
1558    * App might disconnect session before connected, in this case,
1559    * tx_fifo may not be setup yet, so clear only it's inited. */
1560   if (s->tx_fifo)
1561     svm_fifo_clear_deq_ntf (s->tx_fifo);
1562   session_set_state (s, SESSION_STATE_CLOSING);
1563   session_program_transport_ctrl_evt (s, SESSION_CTRL_EVT_CLOSE);
1564 }
1565
1566 /**
1567  * Force a close without waiting for data to be flushed
1568  */
1569 void
1570 session_reset (session_t * s)
1571 {
1572   if (s->session_state >= SESSION_STATE_CLOSING)
1573     return;
1574   /* Drop all outstanding tx data
1575    * App might disconnect session before connected, in this case,
1576    * tx_fifo may not be setup yet, so clear only it's inited. */
1577   if (s->tx_fifo)
1578     svm_fifo_dequeue_drop_all (s->tx_fifo);
1579   session_set_state (s, SESSION_STATE_CLOSING);
1580   session_program_transport_ctrl_evt (s, SESSION_CTRL_EVT_RESET);
1581 }
1582
1583 /**
1584  * Notify transport the session can be half-disconnected.
1585  *
1586  * Must be called from the session's thread.
1587  */
1588 void
1589 session_transport_half_close (session_t *s)
1590 {
1591   /* Only READY session can be half-closed */
1592   if (s->session_state != SESSION_STATE_READY)
1593     {
1594       return;
1595     }
1596
1597   transport_half_close (session_get_transport_proto (s), s->connection_index,
1598                         s->thread_index);
1599 }
1600
1601 /**
1602  * Notify transport the session can be disconnected. This should eventually
1603  * result in a delete notification that allows us to cleanup session state.
1604  * Called for both active/passive disconnects.
1605  *
1606  * Must be called from the session's thread.
1607  */
1608 void
1609 session_transport_close (session_t * s)
1610 {
1611   if (s->session_state >= SESSION_STATE_APP_CLOSED)
1612     {
1613       if (s->session_state == SESSION_STATE_TRANSPORT_CLOSED)
1614         session_set_state (s, SESSION_STATE_CLOSED);
1615       /* If transport is already deleted, just free the session */
1616       else if (s->session_state >= SESSION_STATE_TRANSPORT_DELETED)
1617         session_program_cleanup (s);
1618       return;
1619     }
1620
1621   /* If the tx queue wasn't drained, the transport can continue to try
1622    * sending the outstanding data (in closed state it cannot). It MUST however
1623    * at one point, either after sending everything or after a timeout, call
1624    * delete notify. This will finally lead to the complete cleanup of the
1625    * session.
1626    */
1627   session_set_state (s, SESSION_STATE_APP_CLOSED);
1628
1629   transport_close (session_get_transport_proto (s), s->connection_index,
1630                    s->thread_index);
1631 }
1632
1633 /**
1634  * Force transport close
1635  */
1636 void
1637 session_transport_reset (session_t * s)
1638 {
1639   if (s->session_state >= SESSION_STATE_APP_CLOSED)
1640     {
1641       if (s->session_state == SESSION_STATE_TRANSPORT_CLOSED)
1642         session_set_state (s, SESSION_STATE_CLOSED);
1643       else if (s->session_state >= SESSION_STATE_TRANSPORT_DELETED)
1644         session_program_cleanup (s);
1645       return;
1646     }
1647
1648   session_set_state (s, SESSION_STATE_APP_CLOSED);
1649   transport_reset (session_get_transport_proto (s), s->connection_index,
1650                    s->thread_index);
1651 }
1652
1653 /**
1654  * Cleanup transport and session state.
1655  *
1656  * Notify transport of the cleanup and free the session. This should
1657  * be called only if transport reported some error and is already
1658  * closed.
1659  */
1660 void
1661 session_transport_cleanup (session_t * s)
1662 {
1663   /* Delete from main lookup table before we axe the the transport */
1664   session_lookup_del_session (s);
1665   if (s->session_state != SESSION_STATE_TRANSPORT_DELETED)
1666     transport_cleanup (session_get_transport_proto (s), s->connection_index,
1667                        s->thread_index);
1668   /* Since we called cleanup, no delete notification will come. So, make
1669    * sure the session is properly freed. */
1670   segment_manager_dealloc_fifos (s->rx_fifo, s->tx_fifo);
1671   session_free (s);
1672 }
1673
1674 /**
1675  * Allocate worker mqs in share-able segment
1676  *
1677  * That can only be a newly created memfd segment, that must be mapped
1678  * by all apps/stack users unless private rx mqs are enabled.
1679  */
1680 void
1681 session_vpp_wrk_mqs_alloc (session_main_t *smm)
1682 {
1683   u32 mq_q_length = 2048, evt_size = sizeof (session_event_t);
1684   fifo_segment_t *mqs_seg = &smm->wrk_mqs_segment;
1685   svm_msg_q_cfg_t _cfg, *cfg = &_cfg;
1686   uword mqs_seg_size;
1687   int i;
1688
1689   mq_q_length = clib_max (mq_q_length, smm->configured_wrk_mq_length);
1690
1691   svm_msg_q_ring_cfg_t rc[SESSION_MQ_N_RINGS] = {
1692     { mq_q_length, evt_size, 0 }, { mq_q_length >> 1, 256, 0 }
1693   };
1694   cfg->consumer_pid = 0;
1695   cfg->n_rings = 2;
1696   cfg->q_nitems = mq_q_length;
1697   cfg->ring_cfgs = rc;
1698
1699   /*
1700    * Compute mqs segment size based on rings config and leave space
1701    * for passing extended configuration messages, i.e., data allocated
1702    * outside of the rings. If provided with a config value, accept it
1703    * if larger than minimum size.
1704    */
1705   mqs_seg_size = svm_msg_q_size_to_alloc (cfg) * vec_len (smm->wrk);
1706   mqs_seg_size = mqs_seg_size + (1 << 20);
1707   mqs_seg_size = clib_max (mqs_seg_size, smm->wrk_mqs_segment_size);
1708
1709   mqs_seg->ssvm.ssvm_size = mqs_seg_size;
1710   mqs_seg->ssvm.my_pid = getpid ();
1711   mqs_seg->ssvm.name = format (0, "%s%c", "session: wrk-mqs-segment", 0);
1712
1713   if (ssvm_server_init (&mqs_seg->ssvm, SSVM_SEGMENT_MEMFD))
1714     {
1715       clib_warning ("failed to initialize queue segment");
1716       return;
1717     }
1718
1719   fifo_segment_init (mqs_seg);
1720
1721   /* Special fifo segment that's filled only with mqs */
1722   mqs_seg->h->n_mqs = vec_len (smm->wrk);
1723
1724   for (i = 0; i < vec_len (smm->wrk); i++)
1725     smm->wrk[i].vpp_event_queue = fifo_segment_msg_q_alloc (mqs_seg, i, cfg);
1726 }
1727
1728 fifo_segment_t *
1729 session_main_get_wrk_mqs_segment (void)
1730 {
1731   return &session_main.wrk_mqs_segment;
1732 }
1733
1734 u64
1735 session_segment_handle (session_t * s)
1736 {
1737   svm_fifo_t *f;
1738
1739   if (!s->rx_fifo)
1740     return SESSION_INVALID_HANDLE;
1741
1742   f = s->rx_fifo;
1743   return segment_manager_make_segment_handle (f->segment_manager,
1744                                               f->segment_index);
1745 }
1746
1747 void
1748 session_get_original_dst (transport_endpoint_t *i2o_src,
1749                           transport_endpoint_t *i2o_dst,
1750                           transport_proto_t transport_proto, u32 *original_dst,
1751                           u16 *original_dst_port)
1752 {
1753   session_main_t *smm = vnet_get_session_main ();
1754   ip_protocol_t proto =
1755     (transport_proto == TRANSPORT_PROTO_TCP ? IPPROTO_TCP : IPPROTO_UDP);
1756   if (!smm->original_dst_lookup || !i2o_dst->is_ip4)
1757     return;
1758   smm->original_dst_lookup (&i2o_src->ip.ip4, i2o_src->port, &i2o_dst->ip.ip4,
1759                             i2o_dst->port, proto, original_dst,
1760                             original_dst_port);
1761 }
1762
1763 /* *INDENT-OFF* */
1764 static session_fifo_rx_fn *session_tx_fns[TRANSPORT_TX_N_FNS] = {
1765     session_tx_fifo_peek_and_snd,
1766     session_tx_fifo_dequeue_and_snd,
1767     session_tx_fifo_dequeue_internal,
1768     session_tx_fifo_dequeue_and_snd
1769 };
1770 /* *INDENT-ON* */
1771
1772 void
1773 session_register_transport (transport_proto_t transport_proto,
1774                             const transport_proto_vft_t * vft, u8 is_ip4,
1775                             u32 output_node)
1776 {
1777   session_main_t *smm = &session_main;
1778   session_type_t session_type;
1779   u32 next_index = ~0;
1780
1781   session_type = session_type_from_proto_and_ip (transport_proto, is_ip4);
1782
1783   vec_validate (smm->session_type_to_next, session_type);
1784   vec_validate (smm->session_tx_fns, session_type);
1785
1786   if (output_node != ~0)
1787     next_index = vlib_node_add_next (vlib_get_main (),
1788                                      session_queue_node.index, output_node);
1789
1790   smm->session_type_to_next[session_type] = next_index;
1791   smm->session_tx_fns[session_type] =
1792     session_tx_fns[vft->transport_options.tx_type];
1793 }
1794
1795 void
1796 session_register_update_time_fn (session_update_time_fn fn, u8 is_add)
1797 {
1798   session_main_t *smm = &session_main;
1799   session_update_time_fn *fi;
1800   u32 fi_pos = ~0;
1801   u8 found = 0;
1802
1803   vec_foreach (fi, smm->update_time_fns)
1804     {
1805       if (*fi == fn)
1806         {
1807           fi_pos = fi - smm->update_time_fns;
1808           found = 1;
1809           break;
1810         }
1811     }
1812
1813   if (is_add)
1814     {
1815       if (found)
1816         {
1817           clib_warning ("update time fn %p already registered", fn);
1818           return;
1819         }
1820       vec_add1 (smm->update_time_fns, fn);
1821     }
1822   else
1823     {
1824       vec_del1 (smm->update_time_fns, fi_pos);
1825     }
1826 }
1827
1828 transport_proto_t
1829 session_add_transport_proto (void)
1830 {
1831   session_main_t *smm = &session_main;
1832   session_worker_t *wrk;
1833   u32 thread;
1834
1835   smm->last_transport_proto_type += 1;
1836
1837   for (thread = 0; thread < vec_len (smm->wrk); thread++)
1838     {
1839       wrk = session_main_get_worker (thread);
1840       vec_validate (wrk->session_to_enqueue, smm->last_transport_proto_type);
1841     }
1842
1843   return smm->last_transport_proto_type;
1844 }
1845
1846 transport_connection_t *
1847 session_get_transport (session_t * s)
1848 {
1849   if (s->session_state != SESSION_STATE_LISTENING)
1850     return transport_get_connection (session_get_transport_proto (s),
1851                                      s->connection_index, s->thread_index);
1852   else
1853     return transport_get_listener (session_get_transport_proto (s),
1854                                    s->connection_index);
1855 }
1856
1857 void
1858 session_get_endpoint (session_t * s, transport_endpoint_t * tep, u8 is_lcl)
1859 {
1860   if (s->session_state != SESSION_STATE_LISTENING)
1861     return transport_get_endpoint (session_get_transport_proto (s),
1862                                    s->connection_index, s->thread_index, tep,
1863                                    is_lcl);
1864   else
1865     return transport_get_listener_endpoint (session_get_transport_proto (s),
1866                                             s->connection_index, tep, is_lcl);
1867 }
1868
1869 int
1870 session_transport_attribute (session_t *s, u8 is_get,
1871                              transport_endpt_attr_t *attr)
1872 {
1873   if (s->session_state < SESSION_STATE_READY)
1874     return -1;
1875
1876   return transport_connection_attribute (session_get_transport_proto (s),
1877                                          s->connection_index, s->thread_index,
1878                                          is_get, attr);
1879 }
1880
1881 transport_connection_t *
1882 listen_session_get_transport (session_t * s)
1883 {
1884   return transport_get_listener (session_get_transport_proto (s),
1885                                  s->connection_index);
1886 }
1887
1888 void
1889 session_queue_run_on_main_thread (vlib_main_t * vm)
1890 {
1891   ASSERT (vlib_get_thread_index () == 0);
1892   vlib_node_set_interrupt_pending (vm, session_queue_node.index);
1893 }
1894
1895 static void
1896 session_stats_collector_fn (vlib_stats_collector_data_t *d)
1897 {
1898   u32 i, n_workers, n_wrk_sessions, n_sessions = 0;
1899   session_main_t *smm = &session_main;
1900   session_worker_t *wrk;
1901   counter_t **counters;
1902   counter_t *cb;
1903
1904   n_workers = vec_len (smm->wrk);
1905   vlib_stats_validate (d->entry_index, 0, n_workers - 1);
1906   counters = d->entry->data;
1907   cb = counters[0];
1908
1909   for (i = 0; i < vec_len (smm->wrk); i++)
1910     {
1911       wrk = session_main_get_worker (i);
1912       n_wrk_sessions = pool_elts (wrk->sessions);
1913       cb[i] = n_wrk_sessions;
1914       n_sessions += n_wrk_sessions;
1915     }
1916
1917   vlib_stats_set_gauge (d->private_data, n_sessions);
1918 }
1919
1920 static void
1921 session_stats_collector_init (void)
1922 {
1923   vlib_stats_collector_reg_t reg = {};
1924
1925   reg.entry_index =
1926     vlib_stats_add_counter_vector ("/sys/session/sessions_per_worker");
1927   reg.private_data = vlib_stats_add_gauge ("/sys/session/sessions_total");
1928   reg.collect_fn = session_stats_collector_fn;
1929   vlib_stats_register_collector_fn (&reg);
1930   vlib_stats_validate (reg.entry_index, 0, vlib_get_n_threads ());
1931 }
1932
1933 static clib_error_t *
1934 session_manager_main_enable (vlib_main_t * vm)
1935 {
1936   session_main_t *smm = &session_main;
1937   vlib_thread_main_t *vtm = vlib_get_thread_main ();
1938   u32 num_threads, preallocated_sessions_per_worker;
1939   session_worker_t *wrk;
1940   int i;
1941
1942   /* We only initialize once and do not de-initialized on disable */
1943   if (smm->is_initialized)
1944     goto done;
1945
1946   num_threads = 1 /* main thread */  + vtm->n_threads;
1947
1948   if (num_threads < 1)
1949     return clib_error_return (0, "n_thread_stacks not set");
1950
1951   /* Allocate cache line aligned worker contexts */
1952   vec_validate_aligned (smm->wrk, num_threads - 1, CLIB_CACHE_LINE_BYTES);
1953   clib_spinlock_init (&session_main.pool_realloc_lock);
1954
1955   for (i = 0; i < num_threads; i++)
1956     {
1957       wrk = &smm->wrk[i];
1958       wrk->ctrl_head = clib_llist_make_head (wrk->event_elts, evt_list);
1959       wrk->new_head = clib_llist_make_head (wrk->event_elts, evt_list);
1960       wrk->old_head = clib_llist_make_head (wrk->event_elts, evt_list);
1961       wrk->pending_connects = clib_llist_make_head (wrk->event_elts, evt_list);
1962       wrk->evts_pending_main =
1963         clib_llist_make_head (wrk->event_elts, evt_list);
1964       wrk->vm = vlib_get_main_by_index (i);
1965       wrk->last_vlib_time = vlib_time_now (vm);
1966       wrk->last_vlib_us_time = wrk->last_vlib_time * CLIB_US_TIME_FREQ;
1967       wrk->timerfd = -1;
1968       vec_validate (wrk->session_to_enqueue, smm->last_transport_proto_type);
1969
1970       if (!smm->no_adaptive && smm->use_private_rx_mqs)
1971         session_wrk_enable_adaptive_mode (wrk);
1972     }
1973
1974   /* Allocate vpp event queues segment and queue */
1975   session_vpp_wrk_mqs_alloc (smm);
1976
1977   /* Initialize segment manager properties */
1978   segment_manager_main_init ();
1979
1980   /* Preallocate sessions */
1981   if (smm->preallocated_sessions)
1982     {
1983       if (num_threads == 1)
1984         {
1985           pool_init_fixed (smm->wrk[0].sessions, smm->preallocated_sessions);
1986         }
1987       else
1988         {
1989           int j;
1990           preallocated_sessions_per_worker =
1991             (1.1 * (f64) smm->preallocated_sessions /
1992              (f64) (num_threads - 1));
1993
1994           for (j = 1; j < num_threads; j++)
1995             {
1996               pool_init_fixed (smm->wrk[j].sessions,
1997                                preallocated_sessions_per_worker);
1998             }
1999         }
2000     }
2001
2002   session_lookup_init ();
2003   app_namespaces_init ();
2004   transport_init ();
2005   session_stats_collector_init ();
2006   smm->is_initialized = 1;
2007
2008 done:
2009
2010   smm->is_enabled = 1;
2011
2012   /* Enable transports */
2013   transport_enable_disable (vm, 1);
2014   session_debug_init ();
2015
2016   return 0;
2017 }
2018
2019 static void
2020 session_manager_main_disable (vlib_main_t * vm)
2021 {
2022   transport_enable_disable (vm, 0 /* is_en */ );
2023 }
2024
2025 /* in this new callback, cookie hint the index */
2026 void
2027 session_dma_completion_cb (vlib_main_t *vm, struct vlib_dma_batch *batch)
2028 {
2029   session_worker_t *wrk;
2030   wrk = session_main_get_worker (vm->thread_index);
2031   session_dma_transfer *dma_transfer;
2032
2033   dma_transfer = &wrk->dma_trans[wrk->trans_head];
2034   vec_add (wrk->pending_tx_buffers, dma_transfer->pending_tx_buffers,
2035            vec_len (dma_transfer->pending_tx_buffers));
2036   vec_add (wrk->pending_tx_nexts, dma_transfer->pending_tx_nexts,
2037            vec_len (dma_transfer->pending_tx_nexts));
2038   vec_reset_length (dma_transfer->pending_tx_buffers);
2039   vec_reset_length (dma_transfer->pending_tx_nexts);
2040   wrk->trans_head++;
2041   if (wrk->trans_head == wrk->trans_size)
2042     wrk->trans_head = 0;
2043   return;
2044 }
2045
2046 static void
2047 session_prepare_dma_args (vlib_dma_config_t *args)
2048 {
2049   args->max_batches = 16;
2050   args->max_transfers = DMA_TRANS_SIZE;
2051   args->max_transfer_size = 65536;
2052   args->features = 0;
2053   args->sw_fallback = 1;
2054   args->barrier_before_last = 1;
2055   args->callback_fn = session_dma_completion_cb;
2056 }
2057
2058 static void
2059 session_node_enable_dma (u8 is_en, int n_vlibs)
2060 {
2061   vlib_dma_config_t args;
2062   session_prepare_dma_args (&args);
2063   session_worker_t *wrk;
2064   vlib_main_t *vm;
2065
2066   int config_index = -1;
2067
2068   if (is_en)
2069     {
2070       vm = vlib_get_main_by_index (0);
2071       config_index = vlib_dma_config_add (vm, &args);
2072     }
2073   else
2074     {
2075       vm = vlib_get_main_by_index (0);
2076       wrk = session_main_get_worker (0);
2077       if (wrk->config_index >= 0)
2078         vlib_dma_config_del (vm, wrk->config_index);
2079     }
2080   int i;
2081   for (i = 0; i < n_vlibs; i++)
2082     {
2083       vm = vlib_get_main_by_index (i);
2084       wrk = session_main_get_worker (vm->thread_index);
2085       wrk->config_index = config_index;
2086       if (is_en)
2087         {
2088           if (config_index >= 0)
2089             wrk->dma_enabled = true;
2090           wrk->dma_trans = (session_dma_transfer *) clib_mem_alloc (
2091             sizeof (session_dma_transfer) * DMA_TRANS_SIZE);
2092           bzero (wrk->dma_trans,
2093                  sizeof (session_dma_transfer) * DMA_TRANS_SIZE);
2094         }
2095       else
2096         {
2097           if (wrk->dma_trans)
2098             clib_mem_free (wrk->dma_trans);
2099         }
2100       wrk->trans_head = 0;
2101       wrk->trans_tail = 0;
2102       wrk->trans_size = DMA_TRANS_SIZE;
2103     }
2104 }
2105
2106 void
2107 session_node_enable_disable (u8 is_en)
2108 {
2109   u8 mstate = is_en ? VLIB_NODE_STATE_INTERRUPT : VLIB_NODE_STATE_DISABLED;
2110   u8 state = is_en ? VLIB_NODE_STATE_POLLING : VLIB_NODE_STATE_DISABLED;
2111   session_main_t *sm = &session_main;
2112   vlib_main_t *vm;
2113   vlib_node_t *n;
2114   int n_vlibs, i;
2115
2116   n_vlibs = vlib_get_n_threads ();
2117   for (i = 0; i < n_vlibs; i++)
2118     {
2119       vm = vlib_get_main_by_index (i);
2120       /* main thread with workers and not polling */
2121       if (i == 0 && n_vlibs > 1)
2122         {
2123           vlib_node_set_state (vm, session_queue_node.index, mstate);
2124           if (is_en)
2125             {
2126               session_main_get_worker (0)->state = SESSION_WRK_INTERRUPT;
2127               vlib_node_set_state (vm, session_queue_process_node.index,
2128                                    state);
2129               n = vlib_get_node (vm, session_queue_process_node.index);
2130               vlib_start_process (vm, n->runtime_index);
2131             }
2132           else
2133             {
2134               vlib_process_signal_event_mt (vm,
2135                                             session_queue_process_node.index,
2136                                             SESSION_Q_PROCESS_STOP, 0);
2137             }
2138           if (!sm->poll_main)
2139             continue;
2140         }
2141       vlib_node_set_state (vm, session_input_node.index, mstate);
2142       vlib_node_set_state (vm, session_queue_node.index, state);
2143     }
2144
2145   if (sm->use_private_rx_mqs)
2146     application_enable_rx_mqs_nodes (is_en);
2147
2148   if (sm->dma_enabled)
2149     session_node_enable_dma (is_en, n_vlibs);
2150 }
2151
2152 clib_error_t *
2153 vnet_session_enable_disable (vlib_main_t * vm, u8 is_en)
2154 {
2155   clib_error_t *error = 0;
2156   if (is_en)
2157     {
2158       if (session_main.is_enabled)
2159         return 0;
2160
2161       error = session_manager_main_enable (vm);
2162       session_node_enable_disable (is_en);
2163     }
2164   else
2165     {
2166       session_main.is_enabled = 0;
2167       session_manager_main_disable (vm);
2168       session_node_enable_disable (is_en);
2169     }
2170
2171   return error;
2172 }
2173
2174 clib_error_t *
2175 session_main_init (vlib_main_t * vm)
2176 {
2177   session_main_t *smm = &session_main;
2178
2179   smm->is_enabled = 0;
2180   smm->session_enable_asap = 0;
2181   smm->poll_main = 0;
2182   smm->use_private_rx_mqs = 0;
2183   smm->no_adaptive = 0;
2184   smm->last_transport_proto_type = TRANSPORT_PROTO_HTTP;
2185   smm->port_allocator_min_src_port = 1024;
2186   smm->port_allocator_max_src_port = 65535;
2187
2188   return 0;
2189 }
2190
2191 static clib_error_t *
2192 session_main_loop_init (vlib_main_t * vm)
2193 {
2194   session_main_t *smm = &session_main;
2195   if (smm->session_enable_asap)
2196     {
2197       vlib_worker_thread_barrier_sync (vm);
2198       vnet_session_enable_disable (vm, 1 /* is_en */ );
2199       vlib_worker_thread_barrier_release (vm);
2200     }
2201   return 0;
2202 }
2203
2204 VLIB_INIT_FUNCTION (session_main_init);
2205 VLIB_MAIN_LOOP_ENTER_FUNCTION (session_main_loop_init);
2206
2207 static clib_error_t *
2208 session_config_fn (vlib_main_t * vm, unformat_input_t * input)
2209 {
2210   session_main_t *smm = &session_main;
2211   u32 nitems;
2212   uword tmp;
2213
2214   while (unformat_check_input (input) != UNFORMAT_END_OF_INPUT)
2215     {
2216       if (unformat (input, "wrk-mq-length %d", &nitems))
2217         {
2218           if (nitems >= 2048)
2219             smm->configured_wrk_mq_length = nitems;
2220           else
2221             clib_warning ("event queue length %d too small, ignored", nitems);
2222         }
2223       else if (unformat (input, "wrk-mqs-segment-size %U",
2224                          unformat_memory_size, &smm->wrk_mqs_segment_size))
2225         ;
2226       else if (unformat (input, "preallocated-sessions %d",
2227                          &smm->preallocated_sessions))
2228         ;
2229       else if (unformat (input, "v4-session-table-buckets %d",
2230                          &smm->configured_v4_session_table_buckets))
2231         ;
2232       else if (unformat (input, "v4-halfopen-table-buckets %d",
2233                          &smm->configured_v4_halfopen_table_buckets))
2234         ;
2235       else if (unformat (input, "v6-session-table-buckets %d",
2236                          &smm->configured_v6_session_table_buckets))
2237         ;
2238       else if (unformat (input, "v6-halfopen-table-buckets %d",
2239                          &smm->configured_v6_halfopen_table_buckets))
2240         ;
2241       else if (unformat (input, "v4-session-table-memory %U",
2242                          unformat_memory_size, &tmp))
2243         {
2244           if (tmp >= 0x100000000)
2245             return clib_error_return (0, "memory size %llx (%lld) too large",
2246                                       tmp, tmp);
2247           smm->configured_v4_session_table_memory = tmp;
2248         }
2249       else if (unformat (input, "v4-halfopen-table-memory %U",
2250                          unformat_memory_size, &tmp))
2251         {
2252           if (tmp >= 0x100000000)
2253             return clib_error_return (0, "memory size %llx (%lld) too large",
2254                                       tmp, tmp);
2255           smm->configured_v4_halfopen_table_memory = tmp;
2256         }
2257       else if (unformat (input, "v6-session-table-memory %U",
2258                          unformat_memory_size, &tmp))
2259         {
2260           if (tmp >= 0x100000000)
2261             return clib_error_return (0, "memory size %llx (%lld) too large",
2262                                       tmp, tmp);
2263           smm->configured_v6_session_table_memory = tmp;
2264         }
2265       else if (unformat (input, "v6-halfopen-table-memory %U",
2266                          unformat_memory_size, &tmp))
2267         {
2268           if (tmp >= 0x100000000)
2269             return clib_error_return (0, "memory size %llx (%lld) too large",
2270                                       tmp, tmp);
2271           smm->configured_v6_halfopen_table_memory = tmp;
2272         }
2273       else if (unformat (input, "local-endpoints-table-memory %U",
2274                          unformat_memory_size, &tmp))
2275         {
2276           if (tmp >= 0x100000000)
2277             return clib_error_return (0, "memory size %llx (%lld) too large",
2278                                       tmp, tmp);
2279           smm->local_endpoints_table_memory = tmp;
2280         }
2281       else if (unformat (input, "local-endpoints-table-buckets %d",
2282                          &smm->local_endpoints_table_buckets))
2283         ;
2284       else if (unformat (input, "min-src-port %d", &tmp))
2285         smm->port_allocator_min_src_port = tmp;
2286       else if (unformat (input, "max-src-port %d", &tmp))
2287         smm->port_allocator_max_src_port = tmp;
2288       else if (unformat (input, "enable"))
2289         smm->session_enable_asap = 1;
2290       else if (unformat (input, "use-app-socket-api"))
2291         (void) appns_sapi_enable_disable (1 /* is_enable */);
2292       else if (unformat (input, "poll-main"))
2293         smm->poll_main = 1;
2294       else if (unformat (input, "use-private-rx-mqs"))
2295         smm->use_private_rx_mqs = 1;
2296       else if (unformat (input, "no-adaptive"))
2297         smm->no_adaptive = 1;
2298       else if (unformat (input, "use-dma"))
2299         smm->dma_enabled = 1;
2300       else if (unformat (input, "nat44-original-dst-enable"))
2301         {
2302           smm->original_dst_lookup = vlib_get_plugin_symbol (
2303             "nat_plugin.so", "nat44_original_dst_lookup");
2304         }
2305       /*
2306        * Deprecated but maintained for compatibility
2307        */
2308       else if (unformat (input, "evt_qs_memfd_seg"))
2309         ;
2310       else if (unformat (input, "segment-baseva 0x%lx", &tmp))
2311         ;
2312       else if (unformat (input, "evt_qs_seg_size %U", unformat_memory_size,
2313                          &smm->wrk_mqs_segment_size))
2314         ;
2315       else if (unformat (input, "event-queue-length %d", &nitems))
2316         {
2317           if (nitems >= 2048)
2318             smm->configured_wrk_mq_length = nitems;
2319           else
2320             clib_warning ("event queue length %d too small, ignored", nitems);
2321         }
2322       else
2323         return clib_error_return (0, "unknown input `%U'",
2324                                   format_unformat_error, input);
2325     }
2326   return 0;
2327 }
2328
2329 VLIB_CONFIG_FUNCTION (session_config_fn, "session");
2330
2331 /*
2332  * fd.io coding-style-patch-verification: ON
2333  *
2334  * Local Variables:
2335  * eval: (c-set-style "gnu")
2336  * End:
2337  */