54d9b22c838ad59acb1ae371026355998b11c558
[vpp.git] / src / vnet / session / session.c
1 /*
2  * Copyright (c) 2017-2019 Cisco and/or its affiliates.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at:
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 /**
16  * @file
17  * @brief Session and session manager
18  */
19
20 #include <vnet/session/session.h>
21 #include <vnet/session/application.h>
22 #include <vnet/dpo/load_balance.h>
23 #include <vnet/fib/ip4_fib.h>
24 #include <vlib/stats/stats.h>
25 #include <vlib/dma/dma.h>
26
27 session_main_t session_main;
28
29 static inline int
30 session_send_evt_to_thread (void *data, void *args, u32 thread_index,
31                             session_evt_type_t evt_type)
32 {
33   session_worker_t *wrk = session_main_get_worker (thread_index);
34   session_event_t *evt;
35   svm_msg_q_msg_t msg;
36   svm_msg_q_t *mq;
37
38   mq = wrk->vpp_event_queue;
39   if (PREDICT_FALSE (svm_msg_q_lock (mq)))
40     return -1;
41   if (PREDICT_FALSE (svm_msg_q_or_ring_is_full (mq, SESSION_MQ_IO_EVT_RING)))
42     {
43       svm_msg_q_unlock (mq);
44       return -2;
45     }
46   switch (evt_type)
47     {
48     case SESSION_CTRL_EVT_RPC:
49       msg = svm_msg_q_alloc_msg_w_ring (mq, SESSION_MQ_IO_EVT_RING);
50       evt = (session_event_t *) svm_msg_q_msg_data (mq, &msg);
51       evt->rpc_args.fp = data;
52       evt->rpc_args.arg = args;
53       break;
54     case SESSION_IO_EVT_RX:
55     case SESSION_IO_EVT_TX:
56     case SESSION_IO_EVT_TX_FLUSH:
57     case SESSION_IO_EVT_BUILTIN_RX:
58       msg = svm_msg_q_alloc_msg_w_ring (mq, SESSION_MQ_IO_EVT_RING);
59       evt = (session_event_t *) svm_msg_q_msg_data (mq, &msg);
60       evt->session_index = *(u32 *) data;
61       break;
62     case SESSION_IO_EVT_BUILTIN_TX:
63     case SESSION_CTRL_EVT_CLOSE:
64     case SESSION_CTRL_EVT_RESET:
65       msg = svm_msg_q_alloc_msg_w_ring (mq, SESSION_MQ_IO_EVT_RING);
66       evt = (session_event_t *) svm_msg_q_msg_data (mq, &msg);
67       evt->session_handle = session_handle ((session_t *) data);
68       break;
69     default:
70       clib_warning ("evt unhandled!");
71       svm_msg_q_unlock (mq);
72       return -1;
73     }
74   evt->event_type = evt_type;
75
76   svm_msg_q_add_and_unlock (mq, &msg);
77
78   if (PREDICT_FALSE (wrk->state == SESSION_WRK_INTERRUPT))
79     vlib_node_set_interrupt_pending (wrk->vm, session_queue_node.index);
80
81   return 0;
82 }
83
84 int
85 session_send_io_evt_to_thread (svm_fifo_t * f, session_evt_type_t evt_type)
86 {
87   return session_send_evt_to_thread (&f->shr->master_session_index, 0,
88                                      f->master_thread_index, evt_type);
89 }
90
91 int
92 session_send_io_evt_to_thread_custom (void *data, u32 thread_index,
93                                       session_evt_type_t evt_type)
94 {
95   return session_send_evt_to_thread (data, 0, thread_index, evt_type);
96 }
97
98 int
99 session_send_ctrl_evt_to_thread (session_t * s, session_evt_type_t evt_type)
100 {
101   /* only events supported are disconnect, shutdown and reset */
102   ASSERT (evt_type == SESSION_CTRL_EVT_CLOSE ||
103           evt_type == SESSION_CTRL_EVT_HALF_CLOSE ||
104           evt_type == SESSION_CTRL_EVT_RESET);
105   return session_send_evt_to_thread (s, 0, s->thread_index, evt_type);
106 }
107
108 void
109 session_send_rpc_evt_to_thread_force (u32 thread_index, void *fp,
110                                       void *rpc_args)
111 {
112   session_send_evt_to_thread (fp, rpc_args, thread_index,
113                               SESSION_CTRL_EVT_RPC);
114 }
115
116 void
117 session_send_rpc_evt_to_thread (u32 thread_index, void *fp, void *rpc_args)
118 {
119   if (thread_index != vlib_get_thread_index ())
120     session_send_rpc_evt_to_thread_force (thread_index, fp, rpc_args);
121   else
122     {
123       void (*fnp) (void *) = fp;
124       fnp (rpc_args);
125     }
126 }
127
128 void
129 session_add_self_custom_tx_evt (transport_connection_t * tc, u8 has_prio)
130 {
131   session_t *s = session_get (tc->s_index, tc->thread_index);
132
133   ASSERT (s->thread_index == vlib_get_thread_index ());
134   ASSERT (s->session_state != SESSION_STATE_TRANSPORT_DELETED);
135
136   if (!(s->flags & SESSION_F_CUSTOM_TX))
137     {
138       s->flags |= SESSION_F_CUSTOM_TX;
139       if (svm_fifo_set_event (s->tx_fifo)
140           || transport_connection_is_descheduled (tc))
141         {
142           session_evt_elt_t *elt;
143           session_worker_t *wrk;
144
145           wrk = session_main_get_worker (tc->thread_index);
146           if (has_prio)
147             elt = session_evt_alloc_new (wrk);
148           else
149             elt = session_evt_alloc_old (wrk);
150           elt->evt.session_index = tc->s_index;
151           elt->evt.event_type = SESSION_IO_EVT_TX;
152           tc->flags &= ~TRANSPORT_CONNECTION_F_DESCHED;
153
154           if (PREDICT_FALSE (wrk->state == SESSION_WRK_INTERRUPT))
155             vlib_node_set_interrupt_pending (wrk->vm,
156                                              session_queue_node.index);
157         }
158     }
159 }
160
161 void
162 sesssion_reschedule_tx (transport_connection_t * tc)
163 {
164   session_worker_t *wrk = session_main_get_worker (tc->thread_index);
165   session_evt_elt_t *elt;
166
167   ASSERT (tc->thread_index == vlib_get_thread_index ());
168
169   elt = session_evt_alloc_new (wrk);
170   elt->evt.session_index = tc->s_index;
171   elt->evt.event_type = SESSION_IO_EVT_TX;
172
173   if (PREDICT_FALSE (wrk->state == SESSION_WRK_INTERRUPT))
174     vlib_node_set_interrupt_pending (wrk->vm, session_queue_node.index);
175 }
176
177 static void
178 session_program_transport_ctrl_evt (session_t * s, session_evt_type_t evt)
179 {
180   u32 thread_index = vlib_get_thread_index ();
181   session_evt_elt_t *elt;
182   session_worker_t *wrk;
183
184   /* If we are in the handler thread, or being called with the worker barrier
185    * held, just append a new event to pending disconnects vector. */
186   if (vlib_thread_is_main_w_barrier () || thread_index == s->thread_index)
187     {
188       wrk = session_main_get_worker (s->thread_index);
189       elt = session_evt_alloc_ctrl (wrk);
190       clib_memset (&elt->evt, 0, sizeof (session_event_t));
191       elt->evt.session_handle = session_handle (s);
192       elt->evt.event_type = evt;
193
194       if (PREDICT_FALSE (wrk->state == SESSION_WRK_INTERRUPT))
195         vlib_node_set_interrupt_pending (wrk->vm, session_queue_node.index);
196     }
197   else
198     session_send_ctrl_evt_to_thread (s, evt);
199 }
200
201 session_t *
202 session_alloc (u32 thread_index)
203 {
204   session_worker_t *wrk = &session_main.wrk[thread_index];
205   session_t *s;
206
207   pool_get_aligned_safe (wrk->sessions, s, CLIB_CACHE_LINE_BYTES);
208   clib_memset (s, 0, sizeof (*s));
209   s->session_index = s - wrk->sessions;
210   s->thread_index = thread_index;
211   s->app_index = APP_INVALID_INDEX;
212
213   return s;
214 }
215
216 void
217 session_free (session_t * s)
218 {
219   if (CLIB_DEBUG)
220     {
221       u8 thread_index = s->thread_index;
222       clib_memset (s, 0xFA, sizeof (*s));
223       pool_put (session_main.wrk[thread_index].sessions, s);
224       return;
225     }
226   SESSION_EVT (SESSION_EVT_FREE, s);
227   pool_put (session_main.wrk[s->thread_index].sessions, s);
228 }
229
230 u8
231 session_is_valid (u32 si, u8 thread_index)
232 {
233   session_t *s;
234   transport_connection_t *tc;
235
236   s = pool_elt_at_index (session_main.wrk[thread_index].sessions, si);
237
238   if (s->thread_index != thread_index || s->session_index != si)
239     return 0;
240
241   if (s->session_state == SESSION_STATE_TRANSPORT_DELETED
242       || s->session_state <= SESSION_STATE_LISTENING)
243     return 1;
244
245   if (s->session_state == SESSION_STATE_CONNECTING &&
246       (s->flags & SESSION_F_HALF_OPEN))
247     return 1;
248
249   tc = session_get_transport (s);
250   if (s->connection_index != tc->c_index
251       || s->thread_index != tc->thread_index || tc->s_index != si)
252     return 0;
253
254   return 1;
255 }
256
257 static void
258 session_cleanup_notify (session_t * s, session_cleanup_ntf_t ntf)
259 {
260   app_worker_t *app_wrk;
261
262   app_wrk = app_worker_get_if_valid (s->app_wrk_index);
263   if (!app_wrk)
264     return;
265   app_worker_cleanup_notify (app_wrk, s, ntf);
266 }
267
268 void
269 session_free_w_fifos (session_t * s)
270 {
271   session_cleanup_notify (s, SESSION_CLEANUP_SESSION);
272   segment_manager_dealloc_fifos (s->rx_fifo, s->tx_fifo);
273   session_free (s);
274 }
275
276 /**
277  * Cleans up session and lookup table.
278  *
279  * Transport connection must still be valid.
280  */
281 static void
282 session_delete (session_t * s)
283 {
284   int rv;
285
286   /* Delete from the main lookup table. */
287   if ((rv = session_lookup_del_session (s)))
288     clib_warning ("session %u hash delete rv %d", s->session_index, rv);
289
290   session_free_w_fifos (s);
291 }
292
293 void
294 session_cleanup_half_open (session_handle_t ho_handle)
295 {
296   session_t *ho = session_get_from_handle (ho_handle);
297
298   /* App transports can migrate their half-opens */
299   if (ho->flags & SESSION_F_IS_MIGRATING)
300     {
301       /* Session still migrating, move to closed state to signal that the
302        * session should be removed. */
303       if (ho->connection_index == ~0)
304         {
305           ho->session_state = SESSION_STATE_CLOSED;
306           return;
307         }
308       /* Migrated transports are no longer half-opens */
309       transport_cleanup (session_get_transport_proto (ho),
310                          ho->connection_index, ho->app_index /* overloaded */);
311     }
312   else
313     transport_cleanup_half_open (session_get_transport_proto (ho),
314                                  ho->connection_index);
315   session_free (ho);
316 }
317
318 static void
319 session_half_open_free (session_t *ho)
320 {
321   app_worker_t *app_wrk;
322
323   ASSERT (vlib_get_thread_index () <= 1);
324   app_wrk = app_worker_get (ho->app_wrk_index);
325   app_worker_del_half_open (app_wrk, ho);
326   session_free (ho);
327 }
328
329 static void
330 session_half_open_free_rpc (void *args)
331 {
332   session_t *ho = ho_session_get (pointer_to_uword (args));
333   session_half_open_free (ho);
334 }
335
336 void
337 session_half_open_delete_notify (transport_connection_t *tc)
338 {
339   /* Notification from ctrl thread accepted without rpc */
340   if (!tc->thread_index)
341     {
342       session_half_open_free (ho_session_get (tc->s_index));
343     }
344   else
345     {
346       void *args = uword_to_pointer ((uword) tc->s_index, void *);
347       session_send_rpc_evt_to_thread_force (0, session_half_open_free_rpc,
348                                             args);
349     }
350 }
351
352 void
353 session_half_open_migrate_notify (transport_connection_t *tc)
354 {
355   session_t *ho;
356
357   ho = ho_session_get (tc->s_index);
358   ho->flags |= SESSION_F_IS_MIGRATING;
359   ho->connection_index = ~0;
360 }
361
362 int
363 session_half_open_migrated_notify (transport_connection_t *tc)
364 {
365   session_t *ho;
366
367   ho = ho_session_get (tc->s_index);
368
369   /* App probably detached so the half-open must be cleaned up */
370   if (ho->session_state == SESSION_STATE_CLOSED)
371     {
372       session_half_open_delete_notify (tc);
373       return -1;
374     }
375   ho->connection_index = tc->c_index;
376   /* Overload app index for half-open with new thread */
377   ho->app_index = tc->thread_index;
378   return 0;
379 }
380
381 session_t *
382 session_alloc_for_connection (transport_connection_t * tc)
383 {
384   session_t *s;
385   u32 thread_index = tc->thread_index;
386
387   ASSERT (thread_index == vlib_get_thread_index ()
388           || transport_protocol_is_cl (tc->proto));
389
390   s = session_alloc (thread_index);
391   s->session_type = session_type_from_proto_and_ip (tc->proto, tc->is_ip4);
392   s->session_state = SESSION_STATE_CLOSED;
393
394   /* Attach transport to session and vice versa */
395   s->connection_index = tc->c_index;
396   tc->s_index = s->session_index;
397   return s;
398 }
399
400 session_t *
401 session_alloc_for_half_open (transport_connection_t *tc)
402 {
403   session_t *s;
404
405   s = ho_session_alloc ();
406   s->session_type = session_type_from_proto_and_ip (tc->proto, tc->is_ip4);
407   s->connection_index = tc->c_index;
408   tc->s_index = s->session_index;
409   return s;
410 }
411
412 /**
413  * Discards bytes from buffer chain
414  *
415  * It discards n_bytes_to_drop starting at first buffer after chain_b
416  */
417 always_inline void
418 session_enqueue_discard_chain_bytes (vlib_main_t * vm, vlib_buffer_t * b,
419                                      vlib_buffer_t ** chain_b,
420                                      u32 n_bytes_to_drop)
421 {
422   vlib_buffer_t *next = *chain_b;
423   u32 to_drop = n_bytes_to_drop;
424   ASSERT (b->flags & VLIB_BUFFER_NEXT_PRESENT);
425   while (to_drop && (next->flags & VLIB_BUFFER_NEXT_PRESENT))
426     {
427       next = vlib_get_buffer (vm, next->next_buffer);
428       if (next->current_length > to_drop)
429         {
430           vlib_buffer_advance (next, to_drop);
431           to_drop = 0;
432         }
433       else
434         {
435           to_drop -= next->current_length;
436           next->current_length = 0;
437         }
438     }
439   *chain_b = next;
440
441   if (to_drop == 0)
442     b->total_length_not_including_first_buffer -= n_bytes_to_drop;
443 }
444
445 /**
446  * Enqueue buffer chain tail
447  */
448 always_inline int
449 session_enqueue_chain_tail (session_t * s, vlib_buffer_t * b,
450                             u32 offset, u8 is_in_order)
451 {
452   vlib_buffer_t *chain_b;
453   u32 chain_bi, len, diff;
454   vlib_main_t *vm = vlib_get_main ();
455   u8 *data;
456   u32 written = 0;
457   int rv = 0;
458
459   if (is_in_order && offset)
460     {
461       diff = offset - b->current_length;
462       if (diff > b->total_length_not_including_first_buffer)
463         return 0;
464       chain_b = b;
465       session_enqueue_discard_chain_bytes (vm, b, &chain_b, diff);
466       chain_bi = vlib_get_buffer_index (vm, chain_b);
467     }
468   else
469     chain_bi = b->next_buffer;
470
471   do
472     {
473       chain_b = vlib_get_buffer (vm, chain_bi);
474       data = vlib_buffer_get_current (chain_b);
475       len = chain_b->current_length;
476       if (!len)
477         continue;
478       if (is_in_order)
479         {
480           rv = svm_fifo_enqueue (s->rx_fifo, len, data);
481           if (rv == len)
482             {
483               written += rv;
484             }
485           else if (rv < len)
486             {
487               return (rv > 0) ? (written + rv) : written;
488             }
489           else if (rv > len)
490             {
491               written += rv;
492
493               /* written more than what was left in chain */
494               if (written > b->total_length_not_including_first_buffer)
495                 return written;
496
497               /* drop the bytes that have already been delivered */
498               session_enqueue_discard_chain_bytes (vm, b, &chain_b, rv - len);
499             }
500         }
501       else
502         {
503           rv = svm_fifo_enqueue_with_offset (s->rx_fifo, offset, len, data);
504           if (rv)
505             {
506               clib_warning ("failed to enqueue multi-buffer seg");
507               return -1;
508             }
509           offset += len;
510         }
511     }
512   while ((chain_bi = (chain_b->flags & VLIB_BUFFER_NEXT_PRESENT)
513           ? chain_b->next_buffer : 0));
514
515   if (is_in_order)
516     return written;
517
518   return 0;
519 }
520
521 void
522 session_fifo_tuning (session_t * s, svm_fifo_t * f,
523                      session_ft_action_t act, u32 len)
524 {
525   if (s->flags & SESSION_F_CUSTOM_FIFO_TUNING)
526     {
527       app_worker_t *app_wrk = app_worker_get (s->app_wrk_index);
528       app_worker_session_fifo_tuning (app_wrk, s, f, act, len);
529       if (CLIB_ASSERT_ENABLE)
530         {
531           segment_manager_t *sm;
532           sm = segment_manager_get (f->segment_manager);
533           ASSERT (f->shr->size >= 4096);
534           ASSERT (f->shr->size <= sm->max_fifo_size);
535         }
536     }
537 }
538
539 /*
540  * Enqueue data for delivery to session peer. Does not notify peer of enqueue
541  * event but on request can queue notification events for later delivery by
542  * calling stream_server_flush_enqueue_events().
543  *
544  * @param tc Transport connection which is to be enqueued data
545  * @param b Buffer to be enqueued
546  * @param offset Offset at which to start enqueueing if out-of-order
547  * @param queue_event Flag to indicate if peer is to be notified or if event
548  *                    is to be queued. The former is useful when more data is
549  *                    enqueued and only one event is to be generated.
550  * @param is_in_order Flag to indicate if data is in order
551  * @return Number of bytes enqueued or a negative value if enqueueing failed.
552  */
553 int
554 session_enqueue_stream_connection (transport_connection_t * tc,
555                                    vlib_buffer_t * b, u32 offset,
556                                    u8 queue_event, u8 is_in_order)
557 {
558   session_t *s;
559   int enqueued = 0, rv, in_order_off;
560
561   s = session_get (tc->s_index, tc->thread_index);
562
563   if (is_in_order)
564     {
565       enqueued = svm_fifo_enqueue (s->rx_fifo,
566                                    b->current_length,
567                                    vlib_buffer_get_current (b));
568       if (PREDICT_FALSE ((b->flags & VLIB_BUFFER_NEXT_PRESENT)
569                          && enqueued >= 0))
570         {
571           in_order_off = enqueued > b->current_length ? enqueued : 0;
572           rv = session_enqueue_chain_tail (s, b, in_order_off, 1);
573           if (rv > 0)
574             enqueued += rv;
575         }
576     }
577   else
578     {
579       rv = svm_fifo_enqueue_with_offset (s->rx_fifo, offset,
580                                          b->current_length,
581                                          vlib_buffer_get_current (b));
582       if (PREDICT_FALSE ((b->flags & VLIB_BUFFER_NEXT_PRESENT) && !rv))
583         session_enqueue_chain_tail (s, b, offset + b->current_length, 0);
584       /* if something was enqueued, report even this as success for ooo
585        * segment handling */
586       return rv;
587     }
588
589   if (queue_event)
590     {
591       /* Queue RX event on this fifo. Eventually these will need to be flushed
592        * by calling stream_server_flush_enqueue_events () */
593       session_worker_t *wrk;
594
595       wrk = session_main_get_worker (s->thread_index);
596       if (!(s->flags & SESSION_F_RX_EVT))
597         {
598           s->flags |= SESSION_F_RX_EVT;
599           vec_add1 (wrk->session_to_enqueue[tc->proto], s->session_index);
600         }
601
602       session_fifo_tuning (s, s->rx_fifo, SESSION_FT_ACTION_ENQUEUED, 0);
603     }
604
605   return enqueued;
606 }
607
608 int
609 session_enqueue_dgram_connection (session_t * s,
610                                   session_dgram_hdr_t * hdr,
611                                   vlib_buffer_t * b, u8 proto, u8 queue_event)
612 {
613   int rv;
614
615   ASSERT (svm_fifo_max_enqueue_prod (s->rx_fifo)
616           >= b->current_length + sizeof (*hdr));
617
618   if (PREDICT_TRUE (!(b->flags & VLIB_BUFFER_NEXT_PRESENT)))
619     {
620       /* *INDENT-OFF* */
621       svm_fifo_seg_t segs[2] = {
622           { (u8 *) hdr, sizeof (*hdr) },
623           { vlib_buffer_get_current (b), b->current_length }
624       };
625       /* *INDENT-ON* */
626
627       rv = svm_fifo_enqueue_segments (s->rx_fifo, segs, 2,
628                                       0 /* allow_partial */ );
629     }
630   else
631     {
632       vlib_main_t *vm = vlib_get_main ();
633       svm_fifo_seg_t *segs = 0, *seg;
634       vlib_buffer_t *it = b;
635       u32 n_segs = 1;
636
637       vec_add2 (segs, seg, 1);
638       seg->data = (u8 *) hdr;
639       seg->len = sizeof (*hdr);
640       while (it)
641         {
642           vec_add2 (segs, seg, 1);
643           seg->data = vlib_buffer_get_current (it);
644           seg->len = it->current_length;
645           n_segs++;
646           if (!(it->flags & VLIB_BUFFER_NEXT_PRESENT))
647             break;
648           it = vlib_get_buffer (vm, it->next_buffer);
649         }
650       rv = svm_fifo_enqueue_segments (s->rx_fifo, segs, n_segs,
651                                       0 /* allow partial */ );
652       vec_free (segs);
653     }
654
655   if (queue_event && rv > 0)
656     {
657       /* Queue RX event on this fifo. Eventually these will need to be flushed
658        * by calling stream_server_flush_enqueue_events () */
659       session_worker_t *wrk;
660
661       wrk = session_main_get_worker (s->thread_index);
662       if (!(s->flags & SESSION_F_RX_EVT))
663         {
664           s->flags |= SESSION_F_RX_EVT;
665           vec_add1 (wrk->session_to_enqueue[proto], s->session_index);
666         }
667
668       session_fifo_tuning (s, s->rx_fifo, SESSION_FT_ACTION_ENQUEUED, 0);
669     }
670   return rv > 0 ? rv : 0;
671 }
672
673 int
674 session_tx_fifo_peek_bytes (transport_connection_t * tc, u8 * buffer,
675                             u32 offset, u32 max_bytes)
676 {
677   session_t *s = session_get (tc->s_index, tc->thread_index);
678   return svm_fifo_peek (s->tx_fifo, offset, max_bytes, buffer);
679 }
680
681 u32
682 session_tx_fifo_dequeue_drop (transport_connection_t * tc, u32 max_bytes)
683 {
684   session_t *s = session_get (tc->s_index, tc->thread_index);
685   u32 rv;
686
687   rv = svm_fifo_dequeue_drop (s->tx_fifo, max_bytes);
688   session_fifo_tuning (s, s->tx_fifo, SESSION_FT_ACTION_DEQUEUED, rv);
689
690   if (svm_fifo_needs_deq_ntf (s->tx_fifo, max_bytes))
691     session_dequeue_notify (s);
692
693   return rv;
694 }
695
696 static inline int
697 session_notify_subscribers (u32 app_index, session_t * s,
698                             svm_fifo_t * f, session_evt_type_t evt_type)
699 {
700   app_worker_t *app_wrk;
701   application_t *app;
702   int i;
703
704   app = application_get (app_index);
705   if (!app)
706     return -1;
707
708   for (i = 0; i < f->shr->n_subscribers; i++)
709     {
710       app_wrk = application_get_worker (app, f->shr->subscribers[i]);
711       if (!app_wrk)
712         continue;
713       if (app_worker_lock_and_send_event (app_wrk, s, evt_type))
714         return -1;
715     }
716
717   return 0;
718 }
719
720 /**
721  * Notify session peer that new data has been enqueued.
722  *
723  * @param s     Stream session for which the event is to be generated.
724  * @param lock  Flag to indicate if call should lock message queue.
725  *
726  * @return 0 on success or negative number if failed to send notification.
727  */
728 static inline int
729 session_enqueue_notify_inline (session_t * s)
730 {
731   app_worker_t *app_wrk;
732   u32 session_index;
733   u8 n_subscribers;
734
735   session_index = s->session_index;
736   n_subscribers = svm_fifo_n_subscribers (s->rx_fifo);
737
738   app_wrk = app_worker_get_if_valid (s->app_wrk_index);
739   if (PREDICT_FALSE (!app_wrk))
740     {
741       SESSION_DBG ("invalid s->app_index = %d", s->app_wrk_index);
742       return 0;
743     }
744
745   SESSION_EVT (SESSION_EVT_ENQ, s, svm_fifo_max_dequeue_prod (s->rx_fifo));
746
747   s->flags &= ~SESSION_F_RX_EVT;
748
749   /* Application didn't confirm accept yet */
750   if (PREDICT_FALSE (s->session_state == SESSION_STATE_ACCEPTING))
751     return 0;
752
753   if (PREDICT_FALSE (app_worker_lock_and_send_event (app_wrk, s,
754                                                      SESSION_IO_EVT_RX)))
755     return -1;
756
757   if (PREDICT_FALSE (n_subscribers))
758     {
759       s = session_get (session_index, vlib_get_thread_index ());
760       return session_notify_subscribers (app_wrk->app_index, s,
761                                          s->rx_fifo, SESSION_IO_EVT_RX);
762     }
763
764   return 0;
765 }
766
767 int
768 session_enqueue_notify (session_t * s)
769 {
770   return session_enqueue_notify_inline (s);
771 }
772
773 static void
774 session_enqueue_notify_rpc (void *arg)
775 {
776   u32 session_index = pointer_to_uword (arg);
777   session_t *s;
778
779   s = session_get_if_valid (session_index, vlib_get_thread_index ());
780   if (!s)
781     return;
782
783   session_enqueue_notify (s);
784 }
785
786 /**
787  * Like session_enqueue_notify, but can be called from a thread that does not
788  * own the session.
789  */
790 void
791 session_enqueue_notify_thread (session_handle_t sh)
792 {
793   u32 thread_index = session_thread_from_handle (sh);
794   u32 session_index = session_index_from_handle (sh);
795
796   /*
797    * Pass session index (u32) as opposed to handle (u64) in case pointers
798    * are not 64-bit.
799    */
800   session_send_rpc_evt_to_thread (thread_index,
801                                   session_enqueue_notify_rpc,
802                                   uword_to_pointer (session_index, void *));
803 }
804
805 int
806 session_dequeue_notify (session_t * s)
807 {
808   app_worker_t *app_wrk;
809
810   svm_fifo_clear_deq_ntf (s->tx_fifo);
811
812   app_wrk = app_worker_get_if_valid (s->app_wrk_index);
813   if (PREDICT_FALSE (!app_wrk))
814     return -1;
815
816   if (PREDICT_FALSE (app_worker_lock_and_send_event (app_wrk, s,
817                                                      SESSION_IO_EVT_TX)))
818     return -1;
819
820   if (PREDICT_FALSE (s->tx_fifo->shr->n_subscribers))
821     return session_notify_subscribers (app_wrk->app_index, s,
822                                        s->tx_fifo, SESSION_IO_EVT_TX);
823
824   return 0;
825 }
826
827 /**
828  * Flushes queue of sessions that are to be notified of new data
829  * enqueued events.
830  *
831  * @param thread_index Thread index for which the flush is to be performed.
832  * @return 0 on success or a positive number indicating the number of
833  *         failures due to API queue being full.
834  */
835 int
836 session_main_flush_enqueue_events (u8 transport_proto, u32 thread_index)
837 {
838   session_worker_t *wrk = session_main_get_worker (thread_index);
839   session_t *s;
840   int i, errors = 0;
841   u32 *indices;
842
843   indices = wrk->session_to_enqueue[transport_proto];
844
845   for (i = 0; i < vec_len (indices); i++)
846     {
847       s = session_get_if_valid (indices[i], thread_index);
848       if (PREDICT_FALSE (!s))
849         {
850           errors++;
851           continue;
852         }
853
854       session_fifo_tuning (s, s->rx_fifo, SESSION_FT_ACTION_ENQUEUED,
855                            0 /* TODO/not needed */ );
856
857       if (PREDICT_FALSE (session_enqueue_notify_inline (s)))
858         errors++;
859     }
860
861   vec_reset_length (indices);
862   wrk->session_to_enqueue[transport_proto] = indices;
863
864   return errors;
865 }
866
867 int
868 session_main_flush_all_enqueue_events (u8 transport_proto)
869 {
870   vlib_thread_main_t *vtm = vlib_get_thread_main ();
871   int i, errors = 0;
872   for (i = 0; i < 1 + vtm->n_threads; i++)
873     errors += session_main_flush_enqueue_events (transport_proto, i);
874   return errors;
875 }
876
877 int
878 session_stream_connect_notify (transport_connection_t * tc,
879                                session_error_t err)
880 {
881   u32 opaque = 0, new_ti, new_si;
882   app_worker_t *app_wrk;
883   session_t *s = 0, *ho;
884
885   /*
886    * Cleanup half-open table
887    */
888   session_lookup_del_half_open (tc);
889
890   ho = ho_session_get (tc->s_index);
891   opaque = ho->opaque;
892   app_wrk = app_worker_get_if_valid (ho->app_wrk_index);
893   if (!app_wrk)
894     return -1;
895
896   if (err)
897     return app_worker_connect_notify (app_wrk, s, err, opaque);
898
899   s = session_alloc_for_connection (tc);
900   s->session_state = SESSION_STATE_CONNECTING;
901   s->app_wrk_index = app_wrk->wrk_index;
902   new_si = s->session_index;
903   new_ti = s->thread_index;
904
905   if ((err = app_worker_init_connected (app_wrk, s)))
906     {
907       session_free (s);
908       app_worker_connect_notify (app_wrk, 0, err, opaque);
909       return -1;
910     }
911
912   s = session_get (new_si, new_ti);
913   s->session_state = SESSION_STATE_READY;
914   session_lookup_add_connection (tc, session_handle (s));
915
916   if (app_worker_connect_notify (app_wrk, s, SESSION_E_NONE, opaque))
917     {
918       session_lookup_del_connection (tc);
919       /* Avoid notifying app about rejected session cleanup */
920       s = session_get (new_si, new_ti);
921       segment_manager_dealloc_fifos (s->rx_fifo, s->tx_fifo);
922       session_free (s);
923       return -1;
924     }
925
926   return 0;
927 }
928
929 typedef union session_switch_pool_reply_args_
930 {
931   struct
932   {
933     u32 session_index;
934     u16 thread_index;
935     u8 is_closed;
936   };
937   u64 as_u64;
938 } session_switch_pool_reply_args_t;
939
940 STATIC_ASSERT (sizeof (session_switch_pool_reply_args_t) <= sizeof (uword),
941                "switch pool reply args size");
942
943 static void
944 session_switch_pool_reply (void *arg)
945 {
946   session_switch_pool_reply_args_t rargs;
947   session_t *s;
948
949   rargs.as_u64 = pointer_to_uword (arg);
950   s = session_get_if_valid (rargs.session_index, rargs.thread_index);
951   if (!s)
952     return;
953
954   /* Session closed during migration. Clean everything up */
955   if (rargs.is_closed)
956     {
957       transport_cleanup (session_get_transport_proto (s), s->connection_index,
958                          s->thread_index);
959       segment_manager_dealloc_fifos (s->rx_fifo, s->tx_fifo);
960       session_free (s);
961       return;
962     }
963
964   /* Notify app that it has data on the new session */
965   session_enqueue_notify (s);
966 }
967
968 typedef struct _session_switch_pool_args
969 {
970   u32 session_index;
971   u32 thread_index;
972   u32 new_thread_index;
973   u32 new_session_index;
974 } session_switch_pool_args_t;
975
976 /**
977  * Notify old thread of the session pool switch
978  */
979 static void
980 session_switch_pool (void *cb_args)
981 {
982   session_switch_pool_args_t *args = (session_switch_pool_args_t *) cb_args;
983   session_switch_pool_reply_args_t rargs;
984   session_handle_t new_sh;
985   segment_manager_t *sm;
986   app_worker_t *app_wrk;
987   session_t *s;
988
989   ASSERT (args->thread_index == vlib_get_thread_index ());
990   s = session_get (args->session_index, args->thread_index);
991
992   /* Check if session closed during migration */
993   rargs.is_closed = s->session_state >= SESSION_STATE_TRANSPORT_CLOSING;
994
995   transport_cleanup (session_get_transport_proto (s), s->connection_index,
996                      s->thread_index);
997
998   app_wrk = app_worker_get_if_valid (s->app_wrk_index);
999   if (app_wrk)
1000     {
1001       /* Cleanup fifo segment slice state for fifos */
1002       sm = app_worker_get_connect_segment_manager (app_wrk);
1003       segment_manager_detach_fifo (sm, &s->rx_fifo);
1004       segment_manager_detach_fifo (sm, &s->tx_fifo);
1005
1006       /* Notify app, using old session, about the migration event */
1007       if (!rargs.is_closed)
1008         {
1009           new_sh = session_make_handle (args->new_session_index,
1010                                         args->new_thread_index);
1011           app_worker_migrate_notify (app_wrk, s, new_sh);
1012         }
1013     }
1014
1015   /* Trigger app read and fifo updates on the new thread */
1016   rargs.session_index = args->new_session_index;
1017   rargs.thread_index = args->new_thread_index;
1018   session_send_rpc_evt_to_thread (args->new_thread_index,
1019                                   session_switch_pool_reply,
1020                                   uword_to_pointer (rargs.as_u64, void *));
1021
1022   session_free (s);
1023   clib_mem_free (cb_args);
1024 }
1025
1026 /**
1027  * Move dgram session to the right thread
1028  */
1029 int
1030 session_dgram_connect_notify (transport_connection_t * tc,
1031                               u32 old_thread_index, session_t ** new_session)
1032 {
1033   session_t *new_s;
1034   session_switch_pool_args_t *rpc_args;
1035   segment_manager_t *sm;
1036   app_worker_t *app_wrk;
1037
1038   /*
1039    * Clone half-open session to the right thread.
1040    */
1041   new_s = session_clone_safe (tc->s_index, old_thread_index);
1042   new_s->connection_index = tc->c_index;
1043   new_s->session_state = SESSION_STATE_READY;
1044   new_s->flags |= SESSION_F_IS_MIGRATING;
1045
1046   if (!(tc->flags & TRANSPORT_CONNECTION_F_NO_LOOKUP))
1047     session_lookup_add_connection (tc, session_handle (new_s));
1048
1049   app_wrk = app_worker_get_if_valid (new_s->app_wrk_index);
1050   if (app_wrk)
1051     {
1052       /* New set of fifos attached to the same shared memory */
1053       sm = app_worker_get_connect_segment_manager (app_wrk);
1054       segment_manager_attach_fifo (sm, &new_s->rx_fifo, new_s);
1055       segment_manager_attach_fifo (sm, &new_s->tx_fifo, new_s);
1056     }
1057
1058   /*
1059    * Ask thread owning the old session to clean it up and make us the tx
1060    * fifo owner
1061    */
1062   rpc_args = clib_mem_alloc (sizeof (*rpc_args));
1063   rpc_args->new_session_index = new_s->session_index;
1064   rpc_args->new_thread_index = new_s->thread_index;
1065   rpc_args->session_index = tc->s_index;
1066   rpc_args->thread_index = old_thread_index;
1067   session_send_rpc_evt_to_thread (rpc_args->thread_index, session_switch_pool,
1068                                   rpc_args);
1069
1070   tc->s_index = new_s->session_index;
1071   new_s->connection_index = tc->c_index;
1072   *new_session = new_s;
1073   return 0;
1074 }
1075
1076 /**
1077  * Notification from transport that connection is being closed.
1078  *
1079  * A disconnect is sent to application but state is not removed. Once
1080  * disconnect is acknowledged by application, session disconnect is called.
1081  * Ultimately this leads to close being called on transport (passive close).
1082  */
1083 void
1084 session_transport_closing_notify (transport_connection_t * tc)
1085 {
1086   app_worker_t *app_wrk;
1087   session_t *s;
1088
1089   s = session_get (tc->s_index, tc->thread_index);
1090   if (s->session_state >= SESSION_STATE_TRANSPORT_CLOSING)
1091     return;
1092
1093   /* Wait for reply from app before sending notification as the
1094    * accept might be rejected */
1095   if (s->session_state == SESSION_STATE_ACCEPTING)
1096     {
1097       s->session_state = SESSION_STATE_TRANSPORT_CLOSING;
1098       return;
1099     }
1100
1101   s->session_state = SESSION_STATE_TRANSPORT_CLOSING;
1102   app_wrk = app_worker_get (s->app_wrk_index);
1103   app_worker_close_notify (app_wrk, s);
1104 }
1105
1106 /**
1107  * Notification from transport that connection is being deleted
1108  *
1109  * This removes the session if it is still valid. It should be called only on
1110  * previously fully established sessions. For instance failed connects should
1111  * call stream_session_connect_notify and indicate that the connect has
1112  * failed.
1113  */
1114 void
1115 session_transport_delete_notify (transport_connection_t * tc)
1116 {
1117   session_t *s;
1118
1119   /* App might've been removed already */
1120   if (!(s = session_get_if_valid (tc->s_index, tc->thread_index)))
1121     return;
1122
1123   switch (s->session_state)
1124     {
1125     case SESSION_STATE_CREATED:
1126       /* Session was created but accept notification was not yet sent to the
1127        * app. Cleanup everything. */
1128       session_lookup_del_session (s);
1129       segment_manager_dealloc_fifos (s->rx_fifo, s->tx_fifo);
1130       session_free (s);
1131       break;
1132     case SESSION_STATE_ACCEPTING:
1133     case SESSION_STATE_TRANSPORT_CLOSING:
1134     case SESSION_STATE_CLOSING:
1135     case SESSION_STATE_TRANSPORT_CLOSED:
1136       /* If transport finishes or times out before we get a reply
1137        * from the app, mark transport as closed and wait for reply
1138        * before removing the session. Cleanup session table in advance
1139        * because transport will soon be closed and closed sessions
1140        * are assumed to have been removed from the lookup table */
1141       session_lookup_del_session (s);
1142       s->session_state = SESSION_STATE_TRANSPORT_DELETED;
1143       session_cleanup_notify (s, SESSION_CLEANUP_TRANSPORT);
1144       svm_fifo_dequeue_drop_all (s->tx_fifo);
1145       break;
1146     case SESSION_STATE_APP_CLOSED:
1147       /* Cleanup lookup table as transport needs to still be valid.
1148        * Program transport close to ensure that all session events
1149        * have been cleaned up. Once transport close is called, the
1150        * session is just removed because both transport and app have
1151        * confirmed the close*/
1152       session_lookup_del_session (s);
1153       s->session_state = SESSION_STATE_TRANSPORT_DELETED;
1154       session_cleanup_notify (s, SESSION_CLEANUP_TRANSPORT);
1155       svm_fifo_dequeue_drop_all (s->tx_fifo);
1156       session_program_transport_ctrl_evt (s, SESSION_CTRL_EVT_CLOSE);
1157       break;
1158     case SESSION_STATE_TRANSPORT_DELETED:
1159       break;
1160     case SESSION_STATE_CLOSED:
1161       session_cleanup_notify (s, SESSION_CLEANUP_TRANSPORT);
1162       session_delete (s);
1163       break;
1164     default:
1165       clib_warning ("session state %u", s->session_state);
1166       session_cleanup_notify (s, SESSION_CLEANUP_TRANSPORT);
1167       session_delete (s);
1168       break;
1169     }
1170 }
1171
1172 /**
1173  * Notification from transport that it is closed
1174  *
1175  * Should be called by transport, prior to calling delete notify, once it
1176  * knows that no more data will be exchanged. This could serve as an
1177  * early acknowledgment of an active close especially if transport delete
1178  * can be delayed a long time, e.g., tcp time-wait.
1179  */
1180 void
1181 session_transport_closed_notify (transport_connection_t * tc)
1182 {
1183   app_worker_t *app_wrk;
1184   session_t *s;
1185
1186   if (!(s = session_get_if_valid (tc->s_index, tc->thread_index)))
1187     return;
1188
1189   /* Transport thinks that app requested close but it actually didn't.
1190    * Can happen for tcp:
1191    * 1)if fin and rst are received in close succession.
1192    * 2)if app shutdown the connection.  */
1193   if (s->session_state == SESSION_STATE_READY)
1194     {
1195       session_transport_closing_notify (tc);
1196       svm_fifo_dequeue_drop_all (s->tx_fifo);
1197       s->session_state = SESSION_STATE_TRANSPORT_CLOSED;
1198     }
1199   /* If app close has not been received or has not yet resulted in
1200    * a transport close, only mark the session transport as closed */
1201   else if (s->session_state <= SESSION_STATE_CLOSING)
1202     {
1203       s->session_state = SESSION_STATE_TRANSPORT_CLOSED;
1204     }
1205   /* If app also closed, switch to closed */
1206   else if (s->session_state == SESSION_STATE_APP_CLOSED)
1207     s->session_state = SESSION_STATE_CLOSED;
1208
1209   app_wrk = app_worker_get_if_valid (s->app_wrk_index);
1210   if (app_wrk)
1211     app_worker_transport_closed_notify (app_wrk, s);
1212 }
1213
1214 /**
1215  * Notify application that connection has been reset.
1216  */
1217 void
1218 session_transport_reset_notify (transport_connection_t * tc)
1219 {
1220   app_worker_t *app_wrk;
1221   session_t *s;
1222
1223   s = session_get (tc->s_index, tc->thread_index);
1224   svm_fifo_dequeue_drop_all (s->tx_fifo);
1225   if (s->session_state >= SESSION_STATE_TRANSPORT_CLOSING)
1226     return;
1227   if (s->session_state == SESSION_STATE_ACCEPTING)
1228     {
1229       s->session_state = SESSION_STATE_TRANSPORT_CLOSING;
1230       return;
1231     }
1232   s->session_state = SESSION_STATE_TRANSPORT_CLOSING;
1233   app_wrk = app_worker_get (s->app_wrk_index);
1234   app_worker_reset_notify (app_wrk, s);
1235 }
1236
1237 int
1238 session_stream_accept_notify (transport_connection_t * tc)
1239 {
1240   app_worker_t *app_wrk;
1241   session_t *s;
1242
1243   s = session_get (tc->s_index, tc->thread_index);
1244   app_wrk = app_worker_get_if_valid (s->app_wrk_index);
1245   if (!app_wrk)
1246     return -1;
1247   if (s->session_state != SESSION_STATE_CREATED)
1248     return 0;
1249   s->session_state = SESSION_STATE_ACCEPTING;
1250   if (app_worker_accept_notify (app_wrk, s))
1251     {
1252       /* On transport delete, no notifications should be sent. Unless, the
1253        * accept is retried and successful. */
1254       s->session_state = SESSION_STATE_CREATED;
1255       return -1;
1256     }
1257   return 0;
1258 }
1259
1260 /**
1261  * Accept a stream session. Optionally ping the server by callback.
1262  */
1263 int
1264 session_stream_accept (transport_connection_t * tc, u32 listener_index,
1265                        u32 thread_index, u8 notify)
1266 {
1267   session_t *s;
1268   int rv;
1269
1270   s = session_alloc_for_connection (tc);
1271   s->listener_handle = ((u64) thread_index << 32) | (u64) listener_index;
1272   s->session_state = SESSION_STATE_CREATED;
1273
1274   if ((rv = app_worker_init_accepted (s)))
1275     {
1276       session_free (s);
1277       return rv;
1278     }
1279
1280   session_lookup_add_connection (tc, session_handle (s));
1281
1282   /* Shoulder-tap the server */
1283   if (notify)
1284     {
1285       app_worker_t *app_wrk = app_worker_get (s->app_wrk_index);
1286       if ((rv = app_worker_accept_notify (app_wrk, s)))
1287         {
1288           session_lookup_del_session (s);
1289           segment_manager_dealloc_fifos (s->rx_fifo, s->tx_fifo);
1290           session_free (s);
1291           return rv;
1292         }
1293     }
1294
1295   return 0;
1296 }
1297
1298 int
1299 session_dgram_accept (transport_connection_t * tc, u32 listener_index,
1300                       u32 thread_index)
1301 {
1302   app_worker_t *app_wrk;
1303   session_t *s;
1304   int rv;
1305
1306   s = session_alloc_for_connection (tc);
1307   s->listener_handle = ((u64) thread_index << 32) | (u64) listener_index;
1308
1309   if ((rv = app_worker_init_accepted (s)))
1310     {
1311       session_free (s);
1312       return rv;
1313     }
1314
1315   session_lookup_add_connection (tc, session_handle (s));
1316   s->session_state = SESSION_STATE_ACCEPTING;
1317
1318   app_wrk = app_worker_get (s->app_wrk_index);
1319   if ((rv = app_worker_accept_notify (app_wrk, s)))
1320     {
1321       session_lookup_del_session (s);
1322       segment_manager_dealloc_fifos (s->rx_fifo, s->tx_fifo);
1323       session_free (s);
1324       return rv;
1325     }
1326
1327   return 0;
1328 }
1329
1330 int
1331 session_open_cl (session_endpoint_cfg_t *rmt, session_handle_t *rsh)
1332 {
1333   transport_connection_t *tc;
1334   transport_endpoint_cfg_t *tep;
1335   app_worker_t *app_wrk;
1336   session_handle_t sh;
1337   session_t *s;
1338   int rv;
1339
1340   tep = session_endpoint_to_transport_cfg (rmt);
1341   rv = transport_connect (rmt->transport_proto, tep);
1342   if (rv < 0)
1343     {
1344       SESSION_DBG ("Transport failed to open connection.");
1345       return rv;
1346     }
1347
1348   tc = transport_get_half_open (rmt->transport_proto, (u32) rv);
1349
1350   /* For dgram type of service, allocate session and fifos now */
1351   app_wrk = app_worker_get (rmt->app_wrk_index);
1352   s = session_alloc_for_connection (tc);
1353   s->app_wrk_index = app_wrk->wrk_index;
1354   s->session_state = SESSION_STATE_OPENED;
1355   if (app_worker_init_connected (app_wrk, s))
1356     {
1357       session_free (s);
1358       return -1;
1359     }
1360
1361   sh = session_handle (s);
1362   *rsh = sh;
1363
1364   session_lookup_add_connection (tc, sh);
1365   return app_worker_connect_notify (app_wrk, s, SESSION_E_NONE, rmt->opaque);
1366 }
1367
1368 int
1369 session_open_vc (session_endpoint_cfg_t *rmt, session_handle_t *rsh)
1370 {
1371   transport_connection_t *tc;
1372   transport_endpoint_cfg_t *tep;
1373   app_worker_t *app_wrk;
1374   session_t *ho;
1375   int rv;
1376
1377   tep = session_endpoint_to_transport_cfg (rmt);
1378   rv = transport_connect (rmt->transport_proto, tep);
1379   if (rv < 0)
1380     {
1381       SESSION_DBG ("Transport failed to open connection.");
1382       return rv;
1383     }
1384
1385   tc = transport_get_half_open (rmt->transport_proto, (u32) rv);
1386
1387   app_wrk = app_worker_get (rmt->app_wrk_index);
1388
1389   /* If transport offers a vc service, only allocate established
1390    * session once the connection has been established.
1391    * In the meantime allocate half-open session for tracking purposes
1392    * associate half-open connection to it and add session to app-worker
1393    * half-open table. These are needed to allocate the established
1394    * session on transport notification, and to cleanup the half-open
1395    * session if the app detaches before connection establishment.
1396    */
1397   ho = session_alloc_for_half_open (tc);
1398   ho->app_wrk_index = app_wrk->wrk_index;
1399   ho->ho_index = app_worker_add_half_open (app_wrk, session_handle (ho));
1400   ho->opaque = rmt->opaque;
1401   *rsh = session_handle (ho);
1402
1403   if (!(tc->flags & TRANSPORT_CONNECTION_F_NO_LOOKUP))
1404     session_lookup_add_half_open (tc, tc->c_index);
1405
1406   return 0;
1407 }
1408
1409 int
1410 session_open_app (session_endpoint_cfg_t *rmt, session_handle_t *rsh)
1411 {
1412   transport_endpoint_cfg_t *tep_cfg = session_endpoint_to_transport_cfg (rmt);
1413
1414   /* Not supported for now */
1415   *rsh = SESSION_INVALID_HANDLE;
1416   return transport_connect (rmt->transport_proto, tep_cfg);
1417 }
1418
1419 typedef int (*session_open_service_fn) (session_endpoint_cfg_t *,
1420                                         session_handle_t *);
1421
1422 /* *INDENT-OFF* */
1423 static session_open_service_fn session_open_srv_fns[TRANSPORT_N_SERVICES] = {
1424   session_open_vc,
1425   session_open_cl,
1426   session_open_app,
1427 };
1428 /* *INDENT-ON* */
1429
1430 /**
1431  * Ask transport to open connection to remote transport endpoint.
1432  *
1433  * Stores handle for matching request with reply since the call can be
1434  * asynchronous. For instance, for TCP the 3-way handshake must complete
1435  * before reply comes. Session is only created once connection is established.
1436  *
1437  * @param app_index Index of the application requesting the connect
1438  * @param st Session type requested.
1439  * @param tep Remote transport endpoint
1440  * @param opaque Opaque data (typically, api_context) the application expects
1441  *               on open completion.
1442  */
1443 int
1444 session_open (session_endpoint_cfg_t *rmt, session_handle_t *rsh)
1445 {
1446   transport_service_type_t tst;
1447   tst = transport_protocol_service_type (rmt->transport_proto);
1448   return session_open_srv_fns[tst](rmt, rsh);
1449 }
1450
1451 /**
1452  * Ask transport to listen on session endpoint.
1453  *
1454  * @param s Session for which listen will be called. Note that unlike
1455  *          established sessions, listen sessions are not associated to a
1456  *          thread.
1457  * @param sep Local endpoint to be listened on.
1458  */
1459 int
1460 session_listen (session_t * ls, session_endpoint_cfg_t * sep)
1461 {
1462   transport_endpoint_cfg_t *tep;
1463   int tc_index;
1464   u32 s_index;
1465
1466   /* Transport bind/listen */
1467   tep = session_endpoint_to_transport_cfg (sep);
1468   s_index = ls->session_index;
1469   tc_index = transport_start_listen (session_get_transport_proto (ls),
1470                                      s_index, tep);
1471
1472   if (tc_index < 0)
1473     return tc_index;
1474
1475   /* Attach transport to session. Lookup tables are populated by the app
1476    * worker because local tables (for ct sessions) are not backed by a fib */
1477   ls = listen_session_get (s_index);
1478   ls->connection_index = tc_index;
1479   ls->opaque = sep->opaque;
1480
1481   return 0;
1482 }
1483
1484 /**
1485  * Ask transport to stop listening on local transport endpoint.
1486  *
1487  * @param s Session to stop listening on. It must be in state LISTENING.
1488  */
1489 int
1490 session_stop_listen (session_t * s)
1491 {
1492   transport_proto_t tp = session_get_transport_proto (s);
1493   transport_connection_t *tc;
1494
1495   if (s->session_state != SESSION_STATE_LISTENING)
1496     return SESSION_E_NOLISTEN;
1497
1498   tc = transport_get_listener (tp, s->connection_index);
1499
1500   /* If no transport, assume everything was cleaned up already */
1501   if (!tc)
1502     return SESSION_E_NONE;
1503
1504   if (!(tc->flags & TRANSPORT_CONNECTION_F_NO_LOOKUP))
1505     session_lookup_del_connection (tc);
1506
1507   transport_stop_listen (tp, s->connection_index);
1508   return 0;
1509 }
1510
1511 /**
1512  * Initialize session half-closing procedure.
1513  *
1514  * Note that half-closing will not change the state of the session.
1515  */
1516 void
1517 session_half_close (session_t *s)
1518 {
1519   if (!s)
1520     return;
1521
1522   session_program_transport_ctrl_evt (s, SESSION_CTRL_EVT_HALF_CLOSE);
1523 }
1524
1525 /**
1526  * Initialize session closing procedure.
1527  *
1528  * Request is always sent to session node to ensure that all outstanding
1529  * requests are served before transport is notified.
1530  */
1531 void
1532 session_close (session_t * s)
1533 {
1534   if (!s)
1535     return;
1536
1537   if (s->session_state >= SESSION_STATE_CLOSING)
1538     {
1539       /* Session will only be removed once both app and transport
1540        * acknowledge the close */
1541       if (s->session_state == SESSION_STATE_TRANSPORT_CLOSED
1542           || s->session_state == SESSION_STATE_TRANSPORT_DELETED)
1543         session_program_transport_ctrl_evt (s, SESSION_CTRL_EVT_CLOSE);
1544       return;
1545     }
1546
1547   /* App closed so stop propagating dequeue notifications */
1548   svm_fifo_clear_deq_ntf (s->tx_fifo);
1549   s->session_state = SESSION_STATE_CLOSING;
1550   session_program_transport_ctrl_evt (s, SESSION_CTRL_EVT_CLOSE);
1551 }
1552
1553 /**
1554  * Force a close without waiting for data to be flushed
1555  */
1556 void
1557 session_reset (session_t * s)
1558 {
1559   if (s->session_state >= SESSION_STATE_CLOSING)
1560     return;
1561   /* Drop all outstanding tx data */
1562   svm_fifo_dequeue_drop_all (s->tx_fifo);
1563   s->session_state = SESSION_STATE_CLOSING;
1564   session_program_transport_ctrl_evt (s, SESSION_CTRL_EVT_RESET);
1565 }
1566
1567 /**
1568  * Notify transport the session can be half-disconnected.
1569  *
1570  * Must be called from the session's thread.
1571  */
1572 void
1573 session_transport_half_close (session_t *s)
1574 {
1575   /* Only READY session can be half-closed */
1576   if (s->session_state != SESSION_STATE_READY)
1577     {
1578       return;
1579     }
1580
1581   transport_half_close (session_get_transport_proto (s), s->connection_index,
1582                         s->thread_index);
1583 }
1584
1585 /**
1586  * Notify transport the session can be disconnected. This should eventually
1587  * result in a delete notification that allows us to cleanup session state.
1588  * Called for both active/passive disconnects.
1589  *
1590  * Must be called from the session's thread.
1591  */
1592 void
1593 session_transport_close (session_t * s)
1594 {
1595   if (s->session_state >= SESSION_STATE_APP_CLOSED)
1596     {
1597       if (s->session_state == SESSION_STATE_TRANSPORT_CLOSED)
1598         s->session_state = SESSION_STATE_CLOSED;
1599       /* If transport is already deleted, just free the session */
1600       else if (s->session_state >= SESSION_STATE_TRANSPORT_DELETED)
1601         session_free_w_fifos (s);
1602       return;
1603     }
1604
1605   /* If the tx queue wasn't drained, the transport can continue to try
1606    * sending the outstanding data (in closed state it cannot). It MUST however
1607    * at one point, either after sending everything or after a timeout, call
1608    * delete notify. This will finally lead to the complete cleanup of the
1609    * session.
1610    */
1611   s->session_state = SESSION_STATE_APP_CLOSED;
1612
1613   transport_close (session_get_transport_proto (s), s->connection_index,
1614                    s->thread_index);
1615 }
1616
1617 /**
1618  * Force transport close
1619  */
1620 void
1621 session_transport_reset (session_t * s)
1622 {
1623   if (s->session_state >= SESSION_STATE_APP_CLOSED)
1624     {
1625       if (s->session_state == SESSION_STATE_TRANSPORT_CLOSED)
1626         s->session_state = SESSION_STATE_CLOSED;
1627       else if (s->session_state >= SESSION_STATE_TRANSPORT_DELETED)
1628         session_free_w_fifos (s);
1629       return;
1630     }
1631
1632   s->session_state = SESSION_STATE_APP_CLOSED;
1633   transport_reset (session_get_transport_proto (s), s->connection_index,
1634                    s->thread_index);
1635 }
1636
1637 /**
1638  * Cleanup transport and session state.
1639  *
1640  * Notify transport of the cleanup and free the session. This should
1641  * be called only if transport reported some error and is already
1642  * closed.
1643  */
1644 void
1645 session_transport_cleanup (session_t * s)
1646 {
1647   /* Delete from main lookup table before we axe the the transport */
1648   session_lookup_del_session (s);
1649   if (s->session_state != SESSION_STATE_TRANSPORT_DELETED)
1650     transport_cleanup (session_get_transport_proto (s), s->connection_index,
1651                        s->thread_index);
1652   /* Since we called cleanup, no delete notification will come. So, make
1653    * sure the session is properly freed. */
1654   segment_manager_dealloc_fifos (s->rx_fifo, s->tx_fifo);
1655   session_free (s);
1656 }
1657
1658 /**
1659  * Allocate worker mqs in share-able segment
1660  *
1661  * That can only be a newly created memfd segment, that must be mapped
1662  * by all apps/stack users unless private rx mqs are enabled.
1663  */
1664 void
1665 session_vpp_wrk_mqs_alloc (session_main_t *smm)
1666 {
1667   u32 mq_q_length = 2048, evt_size = sizeof (session_event_t);
1668   fifo_segment_t *mqs_seg = &smm->wrk_mqs_segment;
1669   svm_msg_q_cfg_t _cfg, *cfg = &_cfg;
1670   uword mqs_seg_size;
1671   int i;
1672
1673   mq_q_length = clib_max (mq_q_length, smm->configured_wrk_mq_length);
1674
1675   svm_msg_q_ring_cfg_t rc[SESSION_MQ_N_RINGS] = {
1676     { mq_q_length, evt_size, 0 }, { mq_q_length >> 1, 256, 0 }
1677   };
1678   cfg->consumer_pid = 0;
1679   cfg->n_rings = 2;
1680   cfg->q_nitems = mq_q_length;
1681   cfg->ring_cfgs = rc;
1682
1683   /*
1684    * Compute mqs segment size based on rings config and leave space
1685    * for passing extended configuration messages, i.e., data allocated
1686    * outside of the rings. If provided with a config value, accept it
1687    * if larger than minimum size.
1688    */
1689   mqs_seg_size = svm_msg_q_size_to_alloc (cfg) * vec_len (smm->wrk);
1690   mqs_seg_size = mqs_seg_size + (1 << 20);
1691   mqs_seg_size = clib_max (mqs_seg_size, smm->wrk_mqs_segment_size);
1692
1693   mqs_seg->ssvm.ssvm_size = mqs_seg_size;
1694   mqs_seg->ssvm.my_pid = getpid ();
1695   mqs_seg->ssvm.name = format (0, "%s%c", "session: wrk-mqs-segment", 0);
1696
1697   if (ssvm_server_init (&mqs_seg->ssvm, SSVM_SEGMENT_MEMFD))
1698     {
1699       clib_warning ("failed to initialize queue segment");
1700       return;
1701     }
1702
1703   fifo_segment_init (mqs_seg);
1704
1705   /* Special fifo segment that's filled only with mqs */
1706   mqs_seg->h->n_mqs = vec_len (smm->wrk);
1707
1708   for (i = 0; i < vec_len (smm->wrk); i++)
1709     smm->wrk[i].vpp_event_queue = fifo_segment_msg_q_alloc (mqs_seg, i, cfg);
1710 }
1711
1712 fifo_segment_t *
1713 session_main_get_wrk_mqs_segment (void)
1714 {
1715   return &session_main.wrk_mqs_segment;
1716 }
1717
1718 u64
1719 session_segment_handle (session_t * s)
1720 {
1721   svm_fifo_t *f;
1722
1723   if (!s->rx_fifo)
1724     return SESSION_INVALID_HANDLE;
1725
1726   f = s->rx_fifo;
1727   return segment_manager_make_segment_handle (f->segment_manager,
1728                                               f->segment_index);
1729 }
1730
1731 /* *INDENT-OFF* */
1732 static session_fifo_rx_fn *session_tx_fns[TRANSPORT_TX_N_FNS] = {
1733     session_tx_fifo_peek_and_snd,
1734     session_tx_fifo_dequeue_and_snd,
1735     session_tx_fifo_dequeue_internal,
1736     session_tx_fifo_dequeue_and_snd
1737 };
1738 /* *INDENT-ON* */
1739
1740 void
1741 session_register_transport (transport_proto_t transport_proto,
1742                             const transport_proto_vft_t * vft, u8 is_ip4,
1743                             u32 output_node)
1744 {
1745   session_main_t *smm = &session_main;
1746   session_type_t session_type;
1747   u32 next_index = ~0;
1748
1749   session_type = session_type_from_proto_and_ip (transport_proto, is_ip4);
1750
1751   vec_validate (smm->session_type_to_next, session_type);
1752   vec_validate (smm->session_tx_fns, session_type);
1753
1754   if (output_node != ~0)
1755     next_index = vlib_node_add_next (vlib_get_main (),
1756                                      session_queue_node.index, output_node);
1757
1758   smm->session_type_to_next[session_type] = next_index;
1759   smm->session_tx_fns[session_type] =
1760     session_tx_fns[vft->transport_options.tx_type];
1761 }
1762
1763 void
1764 session_register_update_time_fn (session_update_time_fn fn, u8 is_add)
1765 {
1766   session_main_t *smm = &session_main;
1767   session_update_time_fn *fi;
1768   u32 fi_pos = ~0;
1769   u8 found = 0;
1770
1771   vec_foreach (fi, smm->update_time_fns)
1772     {
1773       if (*fi == fn)
1774         {
1775           fi_pos = fi - smm->update_time_fns;
1776           found = 1;
1777           break;
1778         }
1779     }
1780
1781   if (is_add)
1782     {
1783       if (found)
1784         {
1785           clib_warning ("update time fn %p already registered", fn);
1786           return;
1787         }
1788       vec_add1 (smm->update_time_fns, fn);
1789     }
1790   else
1791     {
1792       vec_del1 (smm->update_time_fns, fi_pos);
1793     }
1794 }
1795
1796 transport_proto_t
1797 session_add_transport_proto (void)
1798 {
1799   session_main_t *smm = &session_main;
1800   session_worker_t *wrk;
1801   u32 thread;
1802
1803   smm->last_transport_proto_type += 1;
1804
1805   for (thread = 0; thread < vec_len (smm->wrk); thread++)
1806     {
1807       wrk = session_main_get_worker (thread);
1808       vec_validate (wrk->session_to_enqueue, smm->last_transport_proto_type);
1809     }
1810
1811   return smm->last_transport_proto_type;
1812 }
1813
1814 transport_connection_t *
1815 session_get_transport (session_t * s)
1816 {
1817   if (s->session_state != SESSION_STATE_LISTENING)
1818     return transport_get_connection (session_get_transport_proto (s),
1819                                      s->connection_index, s->thread_index);
1820   else
1821     return transport_get_listener (session_get_transport_proto (s),
1822                                    s->connection_index);
1823 }
1824
1825 void
1826 session_get_endpoint (session_t * s, transport_endpoint_t * tep, u8 is_lcl)
1827 {
1828   if (s->session_state != SESSION_STATE_LISTENING)
1829     return transport_get_endpoint (session_get_transport_proto (s),
1830                                    s->connection_index, s->thread_index, tep,
1831                                    is_lcl);
1832   else
1833     return transport_get_listener_endpoint (session_get_transport_proto (s),
1834                                             s->connection_index, tep, is_lcl);
1835 }
1836
1837 int
1838 session_transport_attribute (session_t *s, u8 is_get,
1839                              transport_endpt_attr_t *attr)
1840 {
1841   if (s->session_state < SESSION_STATE_READY)
1842     return -1;
1843
1844   return transport_connection_attribute (session_get_transport_proto (s),
1845                                          s->connection_index, s->thread_index,
1846                                          is_get, attr);
1847 }
1848
1849 transport_connection_t *
1850 listen_session_get_transport (session_t * s)
1851 {
1852   return transport_get_listener (session_get_transport_proto (s),
1853                                  s->connection_index);
1854 }
1855
1856 void
1857 session_queue_run_on_main_thread (vlib_main_t * vm)
1858 {
1859   ASSERT (vlib_get_thread_index () == 0);
1860   vlib_node_set_interrupt_pending (vm, session_queue_node.index);
1861 }
1862
1863 static void
1864 session_stats_collector_fn (vlib_stats_collector_data_t *d)
1865 {
1866   u32 i, n_workers, n_wrk_sessions, n_sessions = 0;
1867   session_main_t *smm = &session_main;
1868   session_worker_t *wrk;
1869   counter_t **counters;
1870   counter_t *cb;
1871
1872   n_workers = vec_len (smm->wrk);
1873   vlib_stats_validate (d->entry_index, 0, n_workers - 1);
1874   counters = d->entry->data;
1875   cb = counters[0];
1876
1877   for (i = 0; i < vec_len (smm->wrk); i++)
1878     {
1879       wrk = session_main_get_worker (i);
1880       n_wrk_sessions = pool_elts (wrk->sessions);
1881       cb[i] = n_wrk_sessions;
1882       n_sessions += n_wrk_sessions;
1883     }
1884
1885   vlib_stats_set_gauge (d->private_data, n_sessions);
1886 }
1887
1888 static void
1889 session_stats_collector_init (void)
1890 {
1891   vlib_stats_collector_reg_t reg = {};
1892
1893   reg.entry_index =
1894     vlib_stats_add_counter_vector ("/sys/session/sessions_per_worker");
1895   reg.private_data = vlib_stats_add_gauge ("/sys/session/sessions_total");
1896   reg.collect_fn = session_stats_collector_fn;
1897   vlib_stats_register_collector_fn (&reg);
1898   vlib_stats_validate (reg.entry_index, 0, vlib_get_n_threads ());
1899 }
1900
1901 static clib_error_t *
1902 session_manager_main_enable (vlib_main_t * vm)
1903 {
1904   session_main_t *smm = &session_main;
1905   vlib_thread_main_t *vtm = vlib_get_thread_main ();
1906   u32 num_threads, preallocated_sessions_per_worker;
1907   session_worker_t *wrk;
1908   int i;
1909
1910   /* We only initialize once and do not de-initialized on disable */
1911   if (smm->is_initialized)
1912     goto done;
1913
1914   num_threads = 1 /* main thread */  + vtm->n_threads;
1915
1916   if (num_threads < 1)
1917     return clib_error_return (0, "n_thread_stacks not set");
1918
1919   /* Allocate cache line aligned worker contexts */
1920   vec_validate_aligned (smm->wrk, num_threads - 1, CLIB_CACHE_LINE_BYTES);
1921   clib_spinlock_init (&session_main.pool_realloc_lock);
1922
1923   for (i = 0; i < num_threads; i++)
1924     {
1925       wrk = &smm->wrk[i];
1926       wrk->ctrl_head = clib_llist_make_head (wrk->event_elts, evt_list);
1927       wrk->new_head = clib_llist_make_head (wrk->event_elts, evt_list);
1928       wrk->old_head = clib_llist_make_head (wrk->event_elts, evt_list);
1929       wrk->pending_connects = clib_llist_make_head (wrk->event_elts, evt_list);
1930       wrk->evts_pending_main =
1931         clib_llist_make_head (wrk->event_elts, evt_list);
1932       wrk->vm = vlib_get_main_by_index (i);
1933       wrk->last_vlib_time = vlib_time_now (vm);
1934       wrk->last_vlib_us_time = wrk->last_vlib_time * CLIB_US_TIME_FREQ;
1935       wrk->timerfd = -1;
1936       vec_validate (wrk->session_to_enqueue, smm->last_transport_proto_type);
1937
1938       if (!smm->no_adaptive && smm->use_private_rx_mqs)
1939         session_wrk_enable_adaptive_mode (wrk);
1940     }
1941
1942   /* Allocate vpp event queues segment and queue */
1943   session_vpp_wrk_mqs_alloc (smm);
1944
1945   /* Initialize segment manager properties */
1946   segment_manager_main_init ();
1947
1948   /* Preallocate sessions */
1949   if (smm->preallocated_sessions)
1950     {
1951       if (num_threads == 1)
1952         {
1953           pool_init_fixed (smm->wrk[0].sessions, smm->preallocated_sessions);
1954         }
1955       else
1956         {
1957           int j;
1958           preallocated_sessions_per_worker =
1959             (1.1 * (f64) smm->preallocated_sessions /
1960              (f64) (num_threads - 1));
1961
1962           for (j = 1; j < num_threads; j++)
1963             {
1964               pool_init_fixed (smm->wrk[j].sessions,
1965                                preallocated_sessions_per_worker);
1966             }
1967         }
1968     }
1969
1970   session_lookup_init ();
1971   app_namespaces_init ();
1972   transport_init ();
1973   session_stats_collector_init ();
1974   smm->is_initialized = 1;
1975
1976 done:
1977
1978   smm->is_enabled = 1;
1979
1980   /* Enable transports */
1981   transport_enable_disable (vm, 1);
1982   session_debug_init ();
1983
1984   return 0;
1985 }
1986
1987 static void
1988 session_manager_main_disable (vlib_main_t * vm)
1989 {
1990   transport_enable_disable (vm, 0 /* is_en */ );
1991 }
1992
1993 /* in this new callback, cookie hint the index */
1994 void
1995 session_dma_completion_cb (vlib_main_t *vm, struct vlib_dma_batch *batch)
1996 {
1997   session_worker_t *wrk;
1998   wrk = session_main_get_worker (vm->thread_index);
1999   session_dma_transfer *dma_transfer;
2000
2001   dma_transfer = &wrk->dma_trans[wrk->trans_head];
2002   vec_add (wrk->pending_tx_buffers, dma_transfer->pending_tx_buffers,
2003            vec_len (dma_transfer->pending_tx_buffers));
2004   vec_add (wrk->pending_tx_nexts, dma_transfer->pending_tx_nexts,
2005            vec_len (dma_transfer->pending_tx_nexts));
2006   vec_reset_length (dma_transfer->pending_tx_buffers);
2007   vec_reset_length (dma_transfer->pending_tx_nexts);
2008   wrk->trans_head++;
2009   if (wrk->trans_head == wrk->trans_size)
2010     wrk->trans_head = 0;
2011   return;
2012 }
2013
2014 static void
2015 session_prepare_dma_args (vlib_dma_config_t *args)
2016 {
2017   args->max_transfers = DMA_TRANS_SIZE;
2018   args->max_transfer_size = 65536;
2019   args->features = 0;
2020   args->sw_fallback = 1;
2021   args->barrier_before_last = 1;
2022   args->callback_fn = session_dma_completion_cb;
2023 }
2024
2025 static void
2026 session_node_enable_dma (u8 is_en, int n_vlibs)
2027 {
2028   vlib_dma_config_t args;
2029   session_prepare_dma_args (&args);
2030   session_worker_t *wrk;
2031   vlib_main_t *vm;
2032
2033   int config_index = -1;
2034
2035   if (is_en)
2036     {
2037       vm = vlib_get_main_by_index (0);
2038       config_index = vlib_dma_config_add (vm, &args);
2039     }
2040   else
2041     {
2042       vm = vlib_get_main_by_index (0);
2043       wrk = session_main_get_worker (0);
2044       if (wrk->config_index >= 0)
2045         vlib_dma_config_del (vm, wrk->config_index);
2046     }
2047   int i;
2048   for (i = 0; i < n_vlibs; i++)
2049     {
2050       vm = vlib_get_main_by_index (i);
2051       wrk = session_main_get_worker (vm->thread_index);
2052       wrk->config_index = config_index;
2053       if (is_en)
2054         {
2055           if (config_index >= 0)
2056             wrk->dma_enabled = true;
2057           wrk->dma_trans = (session_dma_transfer *) clib_mem_alloc (
2058             sizeof (session_dma_transfer) * DMA_TRANS_SIZE);
2059           bzero (wrk->dma_trans,
2060                  sizeof (session_dma_transfer) * DMA_TRANS_SIZE);
2061         }
2062       else
2063         {
2064           if (wrk->dma_trans)
2065             clib_mem_free (wrk->dma_trans);
2066         }
2067       wrk->trans_head = 0;
2068       wrk->trans_tail = 0;
2069       wrk->trans_size = DMA_TRANS_SIZE;
2070     }
2071 }
2072
2073 void
2074 session_node_enable_disable (u8 is_en)
2075 {
2076   u8 mstate = is_en ? VLIB_NODE_STATE_INTERRUPT : VLIB_NODE_STATE_DISABLED;
2077   u8 state = is_en ? VLIB_NODE_STATE_POLLING : VLIB_NODE_STATE_DISABLED;
2078   session_main_t *sm = &session_main;
2079   vlib_main_t *vm;
2080   vlib_node_t *n;
2081   int n_vlibs, i;
2082
2083   n_vlibs = vlib_get_n_threads ();
2084   for (i = 0; i < n_vlibs; i++)
2085     {
2086       vm = vlib_get_main_by_index (i);
2087       /* main thread with workers and not polling */
2088       if (i == 0 && n_vlibs > 1)
2089         {
2090           vlib_node_set_state (vm, session_queue_node.index, mstate);
2091           if (is_en)
2092             {
2093               session_main_get_worker (0)->state = SESSION_WRK_INTERRUPT;
2094               vlib_node_set_state (vm, session_queue_process_node.index,
2095                                    state);
2096               n = vlib_get_node (vm, session_queue_process_node.index);
2097               vlib_start_process (vm, n->runtime_index);
2098             }
2099           else
2100             {
2101               vlib_process_signal_event_mt (vm,
2102                                             session_queue_process_node.index,
2103                                             SESSION_Q_PROCESS_STOP, 0);
2104             }
2105           if (!sm->poll_main)
2106             continue;
2107         }
2108       vlib_node_set_state (vm, session_queue_node.index, state);
2109     }
2110
2111   if (sm->use_private_rx_mqs)
2112     application_enable_rx_mqs_nodes (is_en);
2113
2114   if (sm->dma_enabled)
2115     session_node_enable_dma (is_en, n_vlibs);
2116 }
2117
2118 clib_error_t *
2119 vnet_session_enable_disable (vlib_main_t * vm, u8 is_en)
2120 {
2121   clib_error_t *error = 0;
2122   if (is_en)
2123     {
2124       if (session_main.is_enabled)
2125         return 0;
2126
2127       error = session_manager_main_enable (vm);
2128       session_node_enable_disable (is_en);
2129     }
2130   else
2131     {
2132       session_main.is_enabled = 0;
2133       session_manager_main_disable (vm);
2134       session_node_enable_disable (is_en);
2135     }
2136
2137   return error;
2138 }
2139
2140 clib_error_t *
2141 session_main_init (vlib_main_t * vm)
2142 {
2143   session_main_t *smm = &session_main;
2144
2145   smm->is_enabled = 0;
2146   smm->session_enable_asap = 0;
2147   smm->poll_main = 0;
2148   smm->use_private_rx_mqs = 0;
2149   smm->no_adaptive = 0;
2150   smm->last_transport_proto_type = TRANSPORT_PROTO_HTTP;
2151
2152   return 0;
2153 }
2154
2155 static clib_error_t *
2156 session_main_loop_init (vlib_main_t * vm)
2157 {
2158   session_main_t *smm = &session_main;
2159   if (smm->session_enable_asap)
2160     {
2161       vlib_worker_thread_barrier_sync (vm);
2162       vnet_session_enable_disable (vm, 1 /* is_en */ );
2163       vlib_worker_thread_barrier_release (vm);
2164     }
2165   return 0;
2166 }
2167
2168 VLIB_INIT_FUNCTION (session_main_init);
2169 VLIB_MAIN_LOOP_ENTER_FUNCTION (session_main_loop_init);
2170
2171 static clib_error_t *
2172 session_config_fn (vlib_main_t * vm, unformat_input_t * input)
2173 {
2174   session_main_t *smm = &session_main;
2175   u32 nitems;
2176   uword tmp;
2177
2178   while (unformat_check_input (input) != UNFORMAT_END_OF_INPUT)
2179     {
2180       if (unformat (input, "wrk-mq-length %d", &nitems))
2181         {
2182           if (nitems >= 2048)
2183             smm->configured_wrk_mq_length = nitems;
2184           else
2185             clib_warning ("event queue length %d too small, ignored", nitems);
2186         }
2187       else if (unformat (input, "wrk-mqs-segment-size %U",
2188                          unformat_memory_size, &smm->wrk_mqs_segment_size))
2189         ;
2190       else if (unformat (input, "preallocated-sessions %d",
2191                          &smm->preallocated_sessions))
2192         ;
2193       else if (unformat (input, "v4-session-table-buckets %d",
2194                          &smm->configured_v4_session_table_buckets))
2195         ;
2196       else if (unformat (input, "v4-halfopen-table-buckets %d",
2197                          &smm->configured_v4_halfopen_table_buckets))
2198         ;
2199       else if (unformat (input, "v6-session-table-buckets %d",
2200                          &smm->configured_v6_session_table_buckets))
2201         ;
2202       else if (unformat (input, "v6-halfopen-table-buckets %d",
2203                          &smm->configured_v6_halfopen_table_buckets))
2204         ;
2205       else if (unformat (input, "v4-session-table-memory %U",
2206                          unformat_memory_size, &tmp))
2207         {
2208           if (tmp >= 0x100000000)
2209             return clib_error_return (0, "memory size %llx (%lld) too large",
2210                                       tmp, tmp);
2211           smm->configured_v4_session_table_memory = tmp;
2212         }
2213       else if (unformat (input, "v4-halfopen-table-memory %U",
2214                          unformat_memory_size, &tmp))
2215         {
2216           if (tmp >= 0x100000000)
2217             return clib_error_return (0, "memory size %llx (%lld) too large",
2218                                       tmp, tmp);
2219           smm->configured_v4_halfopen_table_memory = tmp;
2220         }
2221       else if (unformat (input, "v6-session-table-memory %U",
2222                          unformat_memory_size, &tmp))
2223         {
2224           if (tmp >= 0x100000000)
2225             return clib_error_return (0, "memory size %llx (%lld) too large",
2226                                       tmp, tmp);
2227           smm->configured_v6_session_table_memory = tmp;
2228         }
2229       else if (unformat (input, "v6-halfopen-table-memory %U",
2230                          unformat_memory_size, &tmp))
2231         {
2232           if (tmp >= 0x100000000)
2233             return clib_error_return (0, "memory size %llx (%lld) too large",
2234                                       tmp, tmp);
2235           smm->configured_v6_halfopen_table_memory = tmp;
2236         }
2237       else if (unformat (input, "local-endpoints-table-memory %U",
2238                          unformat_memory_size, &tmp))
2239         {
2240           if (tmp >= 0x100000000)
2241             return clib_error_return (0, "memory size %llx (%lld) too large",
2242                                       tmp, tmp);
2243           smm->local_endpoints_table_memory = tmp;
2244         }
2245       else if (unformat (input, "local-endpoints-table-buckets %d",
2246                          &smm->local_endpoints_table_buckets))
2247         ;
2248       else if (unformat (input, "enable"))
2249         smm->session_enable_asap = 1;
2250       else if (unformat (input, "use-app-socket-api"))
2251         (void) appns_sapi_enable_disable (1 /* is_enable */);
2252       else if (unformat (input, "poll-main"))
2253         smm->poll_main = 1;
2254       else if (unformat (input, "use-private-rx-mqs"))
2255         smm->use_private_rx_mqs = 1;
2256       else if (unformat (input, "no-adaptive"))
2257         smm->no_adaptive = 1;
2258       else if (unformat (input, "use-dma"))
2259         smm->dma_enabled = 1;
2260       /*
2261        * Deprecated but maintained for compatibility
2262        */
2263       else if (unformat (input, "evt_qs_memfd_seg"))
2264         ;
2265       else if (unformat (input, "segment-baseva 0x%lx", &tmp))
2266         ;
2267       else if (unformat (input, "evt_qs_seg_size %U", unformat_memory_size,
2268                          &smm->wrk_mqs_segment_size))
2269         ;
2270       else if (unformat (input, "event-queue-length %d", &nitems))
2271         {
2272           if (nitems >= 2048)
2273             smm->configured_wrk_mq_length = nitems;
2274           else
2275             clib_warning ("event queue length %d too small, ignored", nitems);
2276         }
2277       else
2278         return clib_error_return (0, "unknown input `%U'",
2279                                   format_unformat_error, input);
2280     }
2281   return 0;
2282 }
2283
2284 VLIB_CONFIG_FUNCTION (session_config_fn, "session");
2285
2286 /*
2287  * fd.io coding-style-patch-verification: ON
2288  *
2289  * Local Variables:
2290  * eval: (c-set-style "gnu")
2291  * End:
2292  */