session: support dma option
[vpp.git] / src / vnet / session / session.c
1 /*
2  * Copyright (c) 2017-2019 Cisco and/or its affiliates.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at:
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 /**
16  * @file
17  * @brief Session and session manager
18  */
19
20 #include <vnet/session/session.h>
21 #include <vnet/session/application.h>
22 #include <vnet/dpo/load_balance.h>
23 #include <vnet/fib/ip4_fib.h>
24 #include <vlib/stats/stats.h>
25 #include <vlib/dma/dma.h>
26
27 session_main_t session_main;
28
29 static inline int
30 session_send_evt_to_thread (void *data, void *args, u32 thread_index,
31                             session_evt_type_t evt_type)
32 {
33   session_worker_t *wrk = session_main_get_worker (thread_index);
34   session_event_t *evt;
35   svm_msg_q_msg_t msg;
36   svm_msg_q_t *mq;
37
38   mq = wrk->vpp_event_queue;
39   if (PREDICT_FALSE (svm_msg_q_lock (mq)))
40     return -1;
41   if (PREDICT_FALSE (svm_msg_q_or_ring_is_full (mq, SESSION_MQ_IO_EVT_RING)))
42     {
43       svm_msg_q_unlock (mq);
44       return -2;
45     }
46   switch (evt_type)
47     {
48     case SESSION_CTRL_EVT_RPC:
49       msg = svm_msg_q_alloc_msg_w_ring (mq, SESSION_MQ_IO_EVT_RING);
50       evt = (session_event_t *) svm_msg_q_msg_data (mq, &msg);
51       evt->rpc_args.fp = data;
52       evt->rpc_args.arg = args;
53       break;
54     case SESSION_IO_EVT_RX:
55     case SESSION_IO_EVT_TX:
56     case SESSION_IO_EVT_TX_FLUSH:
57     case SESSION_IO_EVT_BUILTIN_RX:
58       msg = svm_msg_q_alloc_msg_w_ring (mq, SESSION_MQ_IO_EVT_RING);
59       evt = (session_event_t *) svm_msg_q_msg_data (mq, &msg);
60       evt->session_index = *(u32 *) data;
61       break;
62     case SESSION_IO_EVT_BUILTIN_TX:
63     case SESSION_CTRL_EVT_CLOSE:
64     case SESSION_CTRL_EVT_RESET:
65       msg = svm_msg_q_alloc_msg_w_ring (mq, SESSION_MQ_IO_EVT_RING);
66       evt = (session_event_t *) svm_msg_q_msg_data (mq, &msg);
67       evt->session_handle = session_handle ((session_t *) data);
68       break;
69     default:
70       clib_warning ("evt unhandled!");
71       svm_msg_q_unlock (mq);
72       return -1;
73     }
74   evt->event_type = evt_type;
75
76   svm_msg_q_add_and_unlock (mq, &msg);
77
78   if (PREDICT_FALSE (wrk->state == SESSION_WRK_INTERRUPT))
79     vlib_node_set_interrupt_pending (wrk->vm, session_queue_node.index);
80
81   return 0;
82 }
83
84 int
85 session_send_io_evt_to_thread (svm_fifo_t * f, session_evt_type_t evt_type)
86 {
87   return session_send_evt_to_thread (&f->shr->master_session_index, 0,
88                                      f->master_thread_index, evt_type);
89 }
90
91 int
92 session_send_io_evt_to_thread_custom (void *data, u32 thread_index,
93                                       session_evt_type_t evt_type)
94 {
95   return session_send_evt_to_thread (data, 0, thread_index, evt_type);
96 }
97
98 int
99 session_send_ctrl_evt_to_thread (session_t * s, session_evt_type_t evt_type)
100 {
101   /* only events supported are disconnect, shutdown and reset */
102   ASSERT (evt_type == SESSION_CTRL_EVT_CLOSE ||
103           evt_type == SESSION_CTRL_EVT_HALF_CLOSE ||
104           evt_type == SESSION_CTRL_EVT_RESET);
105   return session_send_evt_to_thread (s, 0, s->thread_index, evt_type);
106 }
107
108 void
109 session_send_rpc_evt_to_thread_force (u32 thread_index, void *fp,
110                                       void *rpc_args)
111 {
112   session_send_evt_to_thread (fp, rpc_args, thread_index,
113                               SESSION_CTRL_EVT_RPC);
114 }
115
116 void
117 session_send_rpc_evt_to_thread (u32 thread_index, void *fp, void *rpc_args)
118 {
119   if (thread_index != vlib_get_thread_index ())
120     session_send_rpc_evt_to_thread_force (thread_index, fp, rpc_args);
121   else
122     {
123       void (*fnp) (void *) = fp;
124       fnp (rpc_args);
125     }
126 }
127
128 void
129 session_add_self_custom_tx_evt (transport_connection_t * tc, u8 has_prio)
130 {
131   session_t *s = session_get (tc->s_index, tc->thread_index);
132
133   ASSERT (s->thread_index == vlib_get_thread_index ());
134   ASSERT (s->session_state != SESSION_STATE_TRANSPORT_DELETED);
135
136   if (!(s->flags & SESSION_F_CUSTOM_TX))
137     {
138       s->flags |= SESSION_F_CUSTOM_TX;
139       if (svm_fifo_set_event (s->tx_fifo)
140           || transport_connection_is_descheduled (tc))
141         {
142           session_evt_elt_t *elt;
143           session_worker_t *wrk;
144
145           wrk = session_main_get_worker (tc->thread_index);
146           if (has_prio)
147             elt = session_evt_alloc_new (wrk);
148           else
149             elt = session_evt_alloc_old (wrk);
150           elt->evt.session_index = tc->s_index;
151           elt->evt.event_type = SESSION_IO_EVT_TX;
152           tc->flags &= ~TRANSPORT_CONNECTION_F_DESCHED;
153
154           if (PREDICT_FALSE (wrk->state == SESSION_WRK_INTERRUPT))
155             vlib_node_set_interrupt_pending (wrk->vm,
156                                              session_queue_node.index);
157         }
158     }
159 }
160
161 void
162 sesssion_reschedule_tx (transport_connection_t * tc)
163 {
164   session_worker_t *wrk = session_main_get_worker (tc->thread_index);
165   session_evt_elt_t *elt;
166
167   ASSERT (tc->thread_index == vlib_get_thread_index ());
168
169   elt = session_evt_alloc_new (wrk);
170   elt->evt.session_index = tc->s_index;
171   elt->evt.event_type = SESSION_IO_EVT_TX;
172
173   if (PREDICT_FALSE (wrk->state == SESSION_WRK_INTERRUPT))
174     vlib_node_set_interrupt_pending (wrk->vm, session_queue_node.index);
175 }
176
177 static void
178 session_program_transport_ctrl_evt (session_t * s, session_evt_type_t evt)
179 {
180   u32 thread_index = vlib_get_thread_index ();
181   session_evt_elt_t *elt;
182   session_worker_t *wrk;
183
184   /* If we are in the handler thread, or being called with the worker barrier
185    * held, just append a new event to pending disconnects vector. */
186   if (vlib_thread_is_main_w_barrier () || thread_index == s->thread_index)
187     {
188       wrk = session_main_get_worker (s->thread_index);
189       elt = session_evt_alloc_ctrl (wrk);
190       clib_memset (&elt->evt, 0, sizeof (session_event_t));
191       elt->evt.session_handle = session_handle (s);
192       elt->evt.event_type = evt;
193
194       if (PREDICT_FALSE (wrk->state == SESSION_WRK_INTERRUPT))
195         vlib_node_set_interrupt_pending (wrk->vm, session_queue_node.index);
196     }
197   else
198     session_send_ctrl_evt_to_thread (s, evt);
199 }
200
201 session_t *
202 session_alloc (u32 thread_index)
203 {
204   session_worker_t *wrk = &session_main.wrk[thread_index];
205   session_t *s;
206
207   pool_get_aligned_safe (wrk->sessions, s, CLIB_CACHE_LINE_BYTES);
208   clib_memset (s, 0, sizeof (*s));
209   s->session_index = s - wrk->sessions;
210   s->thread_index = thread_index;
211   s->app_index = APP_INVALID_INDEX;
212
213   return s;
214 }
215
216 void
217 session_free (session_t * s)
218 {
219   if (CLIB_DEBUG)
220     {
221       u8 thread_index = s->thread_index;
222       clib_memset (s, 0xFA, sizeof (*s));
223       pool_put (session_main.wrk[thread_index].sessions, s);
224       return;
225     }
226   SESSION_EVT (SESSION_EVT_FREE, s);
227   pool_put (session_main.wrk[s->thread_index].sessions, s);
228 }
229
230 u8
231 session_is_valid (u32 si, u8 thread_index)
232 {
233   session_t *s;
234   transport_connection_t *tc;
235
236   s = pool_elt_at_index (session_main.wrk[thread_index].sessions, si);
237
238   if (s->thread_index != thread_index || s->session_index != si)
239     return 0;
240
241   if (s->session_state == SESSION_STATE_TRANSPORT_DELETED
242       || s->session_state <= SESSION_STATE_LISTENING)
243     return 1;
244
245   if (s->session_state == SESSION_STATE_CONNECTING &&
246       (s->flags & SESSION_F_HALF_OPEN))
247     return 1;
248
249   tc = session_get_transport (s);
250   if (s->connection_index != tc->c_index
251       || s->thread_index != tc->thread_index || tc->s_index != si)
252     return 0;
253
254   return 1;
255 }
256
257 static void
258 session_cleanup_notify (session_t * s, session_cleanup_ntf_t ntf)
259 {
260   app_worker_t *app_wrk;
261
262   app_wrk = app_worker_get_if_valid (s->app_wrk_index);
263   if (!app_wrk)
264     return;
265   app_worker_cleanup_notify (app_wrk, s, ntf);
266 }
267
268 void
269 session_free_w_fifos (session_t * s)
270 {
271   session_cleanup_notify (s, SESSION_CLEANUP_SESSION);
272   segment_manager_dealloc_fifos (s->rx_fifo, s->tx_fifo);
273   session_free (s);
274 }
275
276 /**
277  * Cleans up session and lookup table.
278  *
279  * Transport connection must still be valid.
280  */
281 static void
282 session_delete (session_t * s)
283 {
284   int rv;
285
286   /* Delete from the main lookup table. */
287   if ((rv = session_lookup_del_session (s)))
288     clib_warning ("session %u hash delete rv %d", s->session_index, rv);
289
290   session_free_w_fifos (s);
291 }
292
293 void
294 session_cleanup_half_open (session_handle_t ho_handle)
295 {
296   session_t *ho = session_get_from_handle (ho_handle);
297
298   /* App transports can migrate their half-opens */
299   if (ho->flags & SESSION_F_IS_MIGRATING)
300     {
301       /* Session still migrating, move to closed state to signal that the
302        * session should be removed. */
303       if (ho->connection_index == ~0)
304         {
305           ho->session_state = SESSION_STATE_CLOSED;
306           return;
307         }
308       /* Migrated transports are no longer half-opens */
309       transport_cleanup (session_get_transport_proto (ho),
310                          ho->connection_index, ho->app_index /* overloaded */);
311     }
312   else
313     transport_cleanup_half_open (session_get_transport_proto (ho),
314                                  ho->connection_index);
315   session_free (ho);
316 }
317
318 static void
319 session_half_open_free (session_t *ho)
320 {
321   app_worker_t *app_wrk;
322
323   ASSERT (vlib_get_thread_index () <= 1);
324   app_wrk = app_worker_get (ho->app_wrk_index);
325   app_worker_del_half_open (app_wrk, ho);
326   session_free (ho);
327 }
328
329 static void
330 session_half_open_free_rpc (void *args)
331 {
332   session_t *ho = ho_session_get (pointer_to_uword (args));
333   session_half_open_free (ho);
334 }
335
336 void
337 session_half_open_delete_notify (transport_connection_t *tc)
338 {
339   /* Notification from ctrl thread accepted without rpc */
340   if (!tc->thread_index)
341     {
342       session_half_open_free (ho_session_get (tc->s_index));
343     }
344   else
345     {
346       void *args = uword_to_pointer ((uword) tc->s_index, void *);
347       session_send_rpc_evt_to_thread_force (0, session_half_open_free_rpc,
348                                             args);
349     }
350 }
351
352 void
353 session_half_open_migrate_notify (transport_connection_t *tc)
354 {
355   session_t *ho;
356
357   ho = ho_session_get (tc->s_index);
358   ho->flags |= SESSION_F_IS_MIGRATING;
359   ho->connection_index = ~0;
360 }
361
362 int
363 session_half_open_migrated_notify (transport_connection_t *tc)
364 {
365   session_t *ho;
366
367   ho = ho_session_get (tc->s_index);
368
369   /* App probably detached so the half-open must be cleaned up */
370   if (ho->session_state == SESSION_STATE_CLOSED)
371     {
372       session_half_open_delete_notify (tc);
373       return -1;
374     }
375   ho->connection_index = tc->c_index;
376   /* Overload app index for half-open with new thread */
377   ho->app_index = tc->thread_index;
378   return 0;
379 }
380
381 session_t *
382 session_alloc_for_connection (transport_connection_t * tc)
383 {
384   session_t *s;
385   u32 thread_index = tc->thread_index;
386
387   ASSERT (thread_index == vlib_get_thread_index ()
388           || transport_protocol_is_cl (tc->proto));
389
390   s = session_alloc (thread_index);
391   s->session_type = session_type_from_proto_and_ip (tc->proto, tc->is_ip4);
392   s->session_state = SESSION_STATE_CLOSED;
393
394   /* Attach transport to session and vice versa */
395   s->connection_index = tc->c_index;
396   tc->s_index = s->session_index;
397   return s;
398 }
399
400 session_t *
401 session_alloc_for_half_open (transport_connection_t *tc)
402 {
403   session_t *s;
404
405   s = ho_session_alloc ();
406   s->session_type = session_type_from_proto_and_ip (tc->proto, tc->is_ip4);
407   s->connection_index = tc->c_index;
408   tc->s_index = s->session_index;
409   return s;
410 }
411
412 /**
413  * Discards bytes from buffer chain
414  *
415  * It discards n_bytes_to_drop starting at first buffer after chain_b
416  */
417 always_inline void
418 session_enqueue_discard_chain_bytes (vlib_main_t * vm, vlib_buffer_t * b,
419                                      vlib_buffer_t ** chain_b,
420                                      u32 n_bytes_to_drop)
421 {
422   vlib_buffer_t *next = *chain_b;
423   u32 to_drop = n_bytes_to_drop;
424   ASSERT (b->flags & VLIB_BUFFER_NEXT_PRESENT);
425   while (to_drop && (next->flags & VLIB_BUFFER_NEXT_PRESENT))
426     {
427       next = vlib_get_buffer (vm, next->next_buffer);
428       if (next->current_length > to_drop)
429         {
430           vlib_buffer_advance (next, to_drop);
431           to_drop = 0;
432         }
433       else
434         {
435           to_drop -= next->current_length;
436           next->current_length = 0;
437         }
438     }
439   *chain_b = next;
440
441   if (to_drop == 0)
442     b->total_length_not_including_first_buffer -= n_bytes_to_drop;
443 }
444
445 /**
446  * Enqueue buffer chain tail
447  */
448 always_inline int
449 session_enqueue_chain_tail (session_t * s, vlib_buffer_t * b,
450                             u32 offset, u8 is_in_order)
451 {
452   vlib_buffer_t *chain_b;
453   u32 chain_bi, len, diff;
454   vlib_main_t *vm = vlib_get_main ();
455   u8 *data;
456   u32 written = 0;
457   int rv = 0;
458
459   if (is_in_order && offset)
460     {
461       diff = offset - b->current_length;
462       if (diff > b->total_length_not_including_first_buffer)
463         return 0;
464       chain_b = b;
465       session_enqueue_discard_chain_bytes (vm, b, &chain_b, diff);
466       chain_bi = vlib_get_buffer_index (vm, chain_b);
467     }
468   else
469     chain_bi = b->next_buffer;
470
471   do
472     {
473       chain_b = vlib_get_buffer (vm, chain_bi);
474       data = vlib_buffer_get_current (chain_b);
475       len = chain_b->current_length;
476       if (!len)
477         continue;
478       if (is_in_order)
479         {
480           rv = svm_fifo_enqueue (s->rx_fifo, len, data);
481           if (rv == len)
482             {
483               written += rv;
484             }
485           else if (rv < len)
486             {
487               return (rv > 0) ? (written + rv) : written;
488             }
489           else if (rv > len)
490             {
491               written += rv;
492
493               /* written more than what was left in chain */
494               if (written > b->total_length_not_including_first_buffer)
495                 return written;
496
497               /* drop the bytes that have already been delivered */
498               session_enqueue_discard_chain_bytes (vm, b, &chain_b, rv - len);
499             }
500         }
501       else
502         {
503           rv = svm_fifo_enqueue_with_offset (s->rx_fifo, offset, len, data);
504           if (rv)
505             {
506               clib_warning ("failed to enqueue multi-buffer seg");
507               return -1;
508             }
509           offset += len;
510         }
511     }
512   while ((chain_bi = (chain_b->flags & VLIB_BUFFER_NEXT_PRESENT)
513           ? chain_b->next_buffer : 0));
514
515   if (is_in_order)
516     return written;
517
518   return 0;
519 }
520
521 void
522 session_fifo_tuning (session_t * s, svm_fifo_t * f,
523                      session_ft_action_t act, u32 len)
524 {
525   if (s->flags & SESSION_F_CUSTOM_FIFO_TUNING)
526     {
527       app_worker_t *app_wrk = app_worker_get (s->app_wrk_index);
528       app_worker_session_fifo_tuning (app_wrk, s, f, act, len);
529       if (CLIB_ASSERT_ENABLE)
530         {
531           segment_manager_t *sm;
532           sm = segment_manager_get (f->segment_manager);
533           ASSERT (f->shr->size >= 4096);
534           ASSERT (f->shr->size <= sm->max_fifo_size);
535         }
536     }
537 }
538
539 /*
540  * Enqueue data for delivery to session peer. Does not notify peer of enqueue
541  * event but on request can queue notification events for later delivery by
542  * calling stream_server_flush_enqueue_events().
543  *
544  * @param tc Transport connection which is to be enqueued data
545  * @param b Buffer to be enqueued
546  * @param offset Offset at which to start enqueueing if out-of-order
547  * @param queue_event Flag to indicate if peer is to be notified or if event
548  *                    is to be queued. The former is useful when more data is
549  *                    enqueued and only one event is to be generated.
550  * @param is_in_order Flag to indicate if data is in order
551  * @return Number of bytes enqueued or a negative value if enqueueing failed.
552  */
553 int
554 session_enqueue_stream_connection (transport_connection_t * tc,
555                                    vlib_buffer_t * b, u32 offset,
556                                    u8 queue_event, u8 is_in_order)
557 {
558   session_t *s;
559   int enqueued = 0, rv, in_order_off;
560
561   s = session_get (tc->s_index, tc->thread_index);
562
563   if (is_in_order)
564     {
565       enqueued = svm_fifo_enqueue (s->rx_fifo,
566                                    b->current_length,
567                                    vlib_buffer_get_current (b));
568       if (PREDICT_FALSE ((b->flags & VLIB_BUFFER_NEXT_PRESENT)
569                          && enqueued >= 0))
570         {
571           in_order_off = enqueued > b->current_length ? enqueued : 0;
572           rv = session_enqueue_chain_tail (s, b, in_order_off, 1);
573           if (rv > 0)
574             enqueued += rv;
575         }
576     }
577   else
578     {
579       rv = svm_fifo_enqueue_with_offset (s->rx_fifo, offset,
580                                          b->current_length,
581                                          vlib_buffer_get_current (b));
582       if (PREDICT_FALSE ((b->flags & VLIB_BUFFER_NEXT_PRESENT) && !rv))
583         session_enqueue_chain_tail (s, b, offset + b->current_length, 0);
584       /* if something was enqueued, report even this as success for ooo
585        * segment handling */
586       return rv;
587     }
588
589   if (queue_event)
590     {
591       /* Queue RX event on this fifo. Eventually these will need to be flushed
592        * by calling stream_server_flush_enqueue_events () */
593       session_worker_t *wrk;
594
595       wrk = session_main_get_worker (s->thread_index);
596       if (!(s->flags & SESSION_F_RX_EVT))
597         {
598           s->flags |= SESSION_F_RX_EVT;
599           vec_add1 (wrk->session_to_enqueue[tc->proto], s->session_index);
600         }
601
602       session_fifo_tuning (s, s->rx_fifo, SESSION_FT_ACTION_ENQUEUED, 0);
603     }
604
605   return enqueued;
606 }
607
608 int
609 session_enqueue_dgram_connection (session_t * s,
610                                   session_dgram_hdr_t * hdr,
611                                   vlib_buffer_t * b, u8 proto, u8 queue_event)
612 {
613   int rv;
614
615   ASSERT (svm_fifo_max_enqueue_prod (s->rx_fifo)
616           >= b->current_length + sizeof (*hdr));
617
618   if (PREDICT_TRUE (!(b->flags & VLIB_BUFFER_NEXT_PRESENT)))
619     {
620       /* *INDENT-OFF* */
621       svm_fifo_seg_t segs[2] = {
622           { (u8 *) hdr, sizeof (*hdr) },
623           { vlib_buffer_get_current (b), b->current_length }
624       };
625       /* *INDENT-ON* */
626
627       rv = svm_fifo_enqueue_segments (s->rx_fifo, segs, 2,
628                                       0 /* allow_partial */ );
629     }
630   else
631     {
632       vlib_main_t *vm = vlib_get_main ();
633       svm_fifo_seg_t *segs = 0, *seg;
634       vlib_buffer_t *it = b;
635       u32 n_segs = 1;
636
637       vec_add2 (segs, seg, 1);
638       seg->data = (u8 *) hdr;
639       seg->len = sizeof (*hdr);
640       while (it)
641         {
642           vec_add2 (segs, seg, 1);
643           seg->data = vlib_buffer_get_current (it);
644           seg->len = it->current_length;
645           n_segs++;
646           if (!(it->flags & VLIB_BUFFER_NEXT_PRESENT))
647             break;
648           it = vlib_get_buffer (vm, it->next_buffer);
649         }
650       rv = svm_fifo_enqueue_segments (s->rx_fifo, segs, n_segs,
651                                       0 /* allow partial */ );
652       vec_free (segs);
653     }
654
655   if (queue_event && rv > 0)
656     {
657       /* Queue RX event on this fifo. Eventually these will need to be flushed
658        * by calling stream_server_flush_enqueue_events () */
659       session_worker_t *wrk;
660
661       wrk = session_main_get_worker (s->thread_index);
662       if (!(s->flags & SESSION_F_RX_EVT))
663         {
664           s->flags |= SESSION_F_RX_EVT;
665           vec_add1 (wrk->session_to_enqueue[proto], s->session_index);
666         }
667
668       session_fifo_tuning (s, s->rx_fifo, SESSION_FT_ACTION_ENQUEUED, 0);
669     }
670   return rv > 0 ? rv : 0;
671 }
672
673 int
674 session_tx_fifo_peek_bytes (transport_connection_t * tc, u8 * buffer,
675                             u32 offset, u32 max_bytes)
676 {
677   session_t *s = session_get (tc->s_index, tc->thread_index);
678   return svm_fifo_peek (s->tx_fifo, offset, max_bytes, buffer);
679 }
680
681 u32
682 session_tx_fifo_dequeue_drop (transport_connection_t * tc, u32 max_bytes)
683 {
684   session_t *s = session_get (tc->s_index, tc->thread_index);
685   u32 rv;
686
687   rv = svm_fifo_dequeue_drop (s->tx_fifo, max_bytes);
688   session_fifo_tuning (s, s->tx_fifo, SESSION_FT_ACTION_DEQUEUED, rv);
689
690   if (svm_fifo_needs_deq_ntf (s->tx_fifo, max_bytes))
691     session_dequeue_notify (s);
692
693   return rv;
694 }
695
696 static inline int
697 session_notify_subscribers (u32 app_index, session_t * s,
698                             svm_fifo_t * f, session_evt_type_t evt_type)
699 {
700   app_worker_t *app_wrk;
701   application_t *app;
702   int i;
703
704   app = application_get (app_index);
705   if (!app)
706     return -1;
707
708   for (i = 0; i < f->shr->n_subscribers; i++)
709     {
710       app_wrk = application_get_worker (app, f->shr->subscribers[i]);
711       if (!app_wrk)
712         continue;
713       if (app_worker_lock_and_send_event (app_wrk, s, evt_type))
714         return -1;
715     }
716
717   return 0;
718 }
719
720 /**
721  * Notify session peer that new data has been enqueued.
722  *
723  * @param s     Stream session for which the event is to be generated.
724  * @param lock  Flag to indicate if call should lock message queue.
725  *
726  * @return 0 on success or negative number if failed to send notification.
727  */
728 static inline int
729 session_enqueue_notify_inline (session_t * s)
730 {
731   app_worker_t *app_wrk;
732   u32 session_index;
733   u8 n_subscribers;
734
735   session_index = s->session_index;
736   n_subscribers = svm_fifo_n_subscribers (s->rx_fifo);
737
738   app_wrk = app_worker_get_if_valid (s->app_wrk_index);
739   if (PREDICT_FALSE (!app_wrk))
740     {
741       SESSION_DBG ("invalid s->app_index = %d", s->app_wrk_index);
742       return 0;
743     }
744
745   SESSION_EVT (SESSION_EVT_ENQ, s, svm_fifo_max_dequeue_prod (s->rx_fifo));
746
747   s->flags &= ~SESSION_F_RX_EVT;
748
749   /* Application didn't confirm accept yet */
750   if (PREDICT_FALSE (s->session_state == SESSION_STATE_ACCEPTING))
751     return 0;
752
753   if (PREDICT_FALSE (app_worker_lock_and_send_event (app_wrk, s,
754                                                      SESSION_IO_EVT_RX)))
755     return -1;
756
757   if (PREDICT_FALSE (n_subscribers))
758     {
759       s = session_get (session_index, vlib_get_thread_index ());
760       return session_notify_subscribers (app_wrk->app_index, s,
761                                          s->rx_fifo, SESSION_IO_EVT_RX);
762     }
763
764   return 0;
765 }
766
767 int
768 session_enqueue_notify (session_t * s)
769 {
770   return session_enqueue_notify_inline (s);
771 }
772
773 static void
774 session_enqueue_notify_rpc (void *arg)
775 {
776   u32 session_index = pointer_to_uword (arg);
777   session_t *s;
778
779   s = session_get_if_valid (session_index, vlib_get_thread_index ());
780   if (!s)
781     return;
782
783   session_enqueue_notify (s);
784 }
785
786 /**
787  * Like session_enqueue_notify, but can be called from a thread that does not
788  * own the session.
789  */
790 void
791 session_enqueue_notify_thread (session_handle_t sh)
792 {
793   u32 thread_index = session_thread_from_handle (sh);
794   u32 session_index = session_index_from_handle (sh);
795
796   /*
797    * Pass session index (u32) as opposed to handle (u64) in case pointers
798    * are not 64-bit.
799    */
800   session_send_rpc_evt_to_thread (thread_index,
801                                   session_enqueue_notify_rpc,
802                                   uword_to_pointer (session_index, void *));
803 }
804
805 int
806 session_dequeue_notify (session_t * s)
807 {
808   app_worker_t *app_wrk;
809
810   svm_fifo_clear_deq_ntf (s->tx_fifo);
811
812   app_wrk = app_worker_get_if_valid (s->app_wrk_index);
813   if (PREDICT_FALSE (!app_wrk))
814     return -1;
815
816   if (PREDICT_FALSE (app_worker_lock_and_send_event (app_wrk, s,
817                                                      SESSION_IO_EVT_TX)))
818     return -1;
819
820   if (PREDICT_FALSE (s->tx_fifo->shr->n_subscribers))
821     return session_notify_subscribers (app_wrk->app_index, s,
822                                        s->tx_fifo, SESSION_IO_EVT_TX);
823
824   return 0;
825 }
826
827 /**
828  * Flushes queue of sessions that are to be notified of new data
829  * enqueued events.
830  *
831  * @param thread_index Thread index for which the flush is to be performed.
832  * @return 0 on success or a positive number indicating the number of
833  *         failures due to API queue being full.
834  */
835 int
836 session_main_flush_enqueue_events (u8 transport_proto, u32 thread_index)
837 {
838   session_worker_t *wrk = session_main_get_worker (thread_index);
839   session_t *s;
840   int i, errors = 0;
841   u32 *indices;
842
843   indices = wrk->session_to_enqueue[transport_proto];
844
845   for (i = 0; i < vec_len (indices); i++)
846     {
847       s = session_get_if_valid (indices[i], thread_index);
848       if (PREDICT_FALSE (!s))
849         {
850           errors++;
851           continue;
852         }
853
854       session_fifo_tuning (s, s->rx_fifo, SESSION_FT_ACTION_ENQUEUED,
855                            0 /* TODO/not needed */ );
856
857       if (PREDICT_FALSE (session_enqueue_notify_inline (s)))
858         errors++;
859     }
860
861   vec_reset_length (indices);
862   wrk->session_to_enqueue[transport_proto] = indices;
863
864   return errors;
865 }
866
867 int
868 session_main_flush_all_enqueue_events (u8 transport_proto)
869 {
870   vlib_thread_main_t *vtm = vlib_get_thread_main ();
871   int i, errors = 0;
872   for (i = 0; i < 1 + vtm->n_threads; i++)
873     errors += session_main_flush_enqueue_events (transport_proto, i);
874   return errors;
875 }
876
877 int
878 session_stream_connect_notify (transport_connection_t * tc,
879                                session_error_t err)
880 {
881   u32 opaque = 0, new_ti, new_si;
882   app_worker_t *app_wrk;
883   session_t *s = 0, *ho;
884
885   /*
886    * Cleanup half-open table
887    */
888   session_lookup_del_half_open (tc);
889
890   ho = ho_session_get (tc->s_index);
891   opaque = ho->opaque;
892   app_wrk = app_worker_get_if_valid (ho->app_wrk_index);
893   if (!app_wrk)
894     return -1;
895
896   if (err)
897     return app_worker_connect_notify (app_wrk, s, err, opaque);
898
899   s = session_alloc_for_connection (tc);
900   s->session_state = SESSION_STATE_CONNECTING;
901   s->app_wrk_index = app_wrk->wrk_index;
902   new_si = s->session_index;
903   new_ti = s->thread_index;
904
905   if ((err = app_worker_init_connected (app_wrk, s)))
906     {
907       session_free (s);
908       app_worker_connect_notify (app_wrk, 0, err, opaque);
909       return -1;
910     }
911
912   s = session_get (new_si, new_ti);
913   s->session_state = SESSION_STATE_READY;
914   session_lookup_add_connection (tc, session_handle (s));
915
916   if (app_worker_connect_notify (app_wrk, s, SESSION_E_NONE, opaque))
917     {
918       session_lookup_del_connection (tc);
919       /* Avoid notifying app about rejected session cleanup */
920       s = session_get (new_si, new_ti);
921       segment_manager_dealloc_fifos (s->rx_fifo, s->tx_fifo);
922       session_free (s);
923       return -1;
924     }
925
926   return 0;
927 }
928
929 typedef union session_switch_pool_reply_args_
930 {
931   struct
932   {
933     u32 session_index;
934     u16 thread_index;
935     u8 is_closed;
936   };
937   u64 as_u64;
938 } session_switch_pool_reply_args_t;
939
940 STATIC_ASSERT (sizeof (session_switch_pool_reply_args_t) <= sizeof (uword),
941                "switch pool reply args size");
942
943 static void
944 session_switch_pool_reply (void *arg)
945 {
946   session_switch_pool_reply_args_t rargs;
947   session_t *s;
948
949   rargs.as_u64 = pointer_to_uword (arg);
950   s = session_get_if_valid (rargs.session_index, rargs.thread_index);
951   if (!s)
952     return;
953
954   /* Session closed during migration. Clean everything up */
955   if (rargs.is_closed)
956     {
957       transport_cleanup (session_get_transport_proto (s), s->connection_index,
958                          s->thread_index);
959       segment_manager_dealloc_fifos (s->rx_fifo, s->tx_fifo);
960       session_free (s);
961       return;
962     }
963
964   /* Notify app that it has data on the new session */
965   session_enqueue_notify (s);
966 }
967
968 typedef struct _session_switch_pool_args
969 {
970   u32 session_index;
971   u32 thread_index;
972   u32 new_thread_index;
973   u32 new_session_index;
974 } session_switch_pool_args_t;
975
976 /**
977  * Notify old thread of the session pool switch
978  */
979 static void
980 session_switch_pool (void *cb_args)
981 {
982   session_switch_pool_args_t *args = (session_switch_pool_args_t *) cb_args;
983   session_switch_pool_reply_args_t rargs;
984   session_handle_t new_sh;
985   segment_manager_t *sm;
986   app_worker_t *app_wrk;
987   session_t *s;
988
989   ASSERT (args->thread_index == vlib_get_thread_index ());
990   s = session_get (args->session_index, args->thread_index);
991
992   /* Check if session closed during migration */
993   rargs.is_closed = s->session_state >= SESSION_STATE_TRANSPORT_CLOSING;
994
995   transport_cleanup (session_get_transport_proto (s), s->connection_index,
996                      s->thread_index);
997
998   app_wrk = app_worker_get_if_valid (s->app_wrk_index);
999   if (app_wrk)
1000     {
1001       /* Cleanup fifo segment slice state for fifos */
1002       sm = app_worker_get_connect_segment_manager (app_wrk);
1003       segment_manager_detach_fifo (sm, &s->rx_fifo);
1004       segment_manager_detach_fifo (sm, &s->tx_fifo);
1005
1006       /* Notify app, using old session, about the migration event */
1007       if (!rargs.is_closed)
1008         {
1009           new_sh = session_make_handle (args->new_session_index,
1010                                         args->new_thread_index);
1011           app_worker_migrate_notify (app_wrk, s, new_sh);
1012         }
1013     }
1014
1015   /* Trigger app read and fifo updates on the new thread */
1016   rargs.session_index = args->new_session_index;
1017   rargs.thread_index = args->new_thread_index;
1018   session_send_rpc_evt_to_thread (args->new_thread_index,
1019                                   session_switch_pool_reply,
1020                                   uword_to_pointer (rargs.as_u64, void *));
1021
1022   session_free (s);
1023   clib_mem_free (cb_args);
1024 }
1025
1026 /**
1027  * Move dgram session to the right thread
1028  */
1029 int
1030 session_dgram_connect_notify (transport_connection_t * tc,
1031                               u32 old_thread_index, session_t ** new_session)
1032 {
1033   session_t *new_s;
1034   session_switch_pool_args_t *rpc_args;
1035   segment_manager_t *sm;
1036   app_worker_t *app_wrk;
1037
1038   /*
1039    * Clone half-open session to the right thread.
1040    */
1041   new_s = session_clone_safe (tc->s_index, old_thread_index);
1042   new_s->connection_index = tc->c_index;
1043   new_s->session_state = SESSION_STATE_READY;
1044   new_s->flags |= SESSION_F_IS_MIGRATING;
1045
1046   if (!(tc->flags & TRANSPORT_CONNECTION_F_NO_LOOKUP))
1047     session_lookup_add_connection (tc, session_handle (new_s));
1048
1049   app_wrk = app_worker_get_if_valid (new_s->app_wrk_index);
1050   if (app_wrk)
1051     {
1052       /* New set of fifos attached to the same shared memory */
1053       sm = app_worker_get_connect_segment_manager (app_wrk);
1054       segment_manager_attach_fifo (sm, &new_s->rx_fifo, new_s);
1055       segment_manager_attach_fifo (sm, &new_s->tx_fifo, new_s);
1056     }
1057
1058   /*
1059    * Ask thread owning the old session to clean it up and make us the tx
1060    * fifo owner
1061    */
1062   rpc_args = clib_mem_alloc (sizeof (*rpc_args));
1063   rpc_args->new_session_index = new_s->session_index;
1064   rpc_args->new_thread_index = new_s->thread_index;
1065   rpc_args->session_index = tc->s_index;
1066   rpc_args->thread_index = old_thread_index;
1067   session_send_rpc_evt_to_thread (rpc_args->thread_index, session_switch_pool,
1068                                   rpc_args);
1069
1070   tc->s_index = new_s->session_index;
1071   new_s->connection_index = tc->c_index;
1072   *new_session = new_s;
1073   return 0;
1074 }
1075
1076 /**
1077  * Notification from transport that connection is being closed.
1078  *
1079  * A disconnect is sent to application but state is not removed. Once
1080  * disconnect is acknowledged by application, session disconnect is called.
1081  * Ultimately this leads to close being called on transport (passive close).
1082  */
1083 void
1084 session_transport_closing_notify (transport_connection_t * tc)
1085 {
1086   app_worker_t *app_wrk;
1087   session_t *s;
1088
1089   s = session_get (tc->s_index, tc->thread_index);
1090   if (s->session_state >= SESSION_STATE_TRANSPORT_CLOSING)
1091     return;
1092
1093   /* Wait for reply from app before sending notification as the
1094    * accept might be rejected */
1095   if (s->session_state == SESSION_STATE_ACCEPTING)
1096     {
1097       s->session_state = SESSION_STATE_TRANSPORT_CLOSING;
1098       return;
1099     }
1100
1101   s->session_state = SESSION_STATE_TRANSPORT_CLOSING;
1102   app_wrk = app_worker_get (s->app_wrk_index);
1103   app_worker_close_notify (app_wrk, s);
1104 }
1105
1106 /**
1107  * Notification from transport that connection is being deleted
1108  *
1109  * This removes the session if it is still valid. It should be called only on
1110  * previously fully established sessions. For instance failed connects should
1111  * call stream_session_connect_notify and indicate that the connect has
1112  * failed.
1113  */
1114 void
1115 session_transport_delete_notify (transport_connection_t * tc)
1116 {
1117   session_t *s;
1118
1119   /* App might've been removed already */
1120   if (!(s = session_get_if_valid (tc->s_index, tc->thread_index)))
1121     return;
1122
1123   switch (s->session_state)
1124     {
1125     case SESSION_STATE_CREATED:
1126       /* Session was created but accept notification was not yet sent to the
1127        * app. Cleanup everything. */
1128       session_lookup_del_session (s);
1129       segment_manager_dealloc_fifos (s->rx_fifo, s->tx_fifo);
1130       session_free (s);
1131       break;
1132     case SESSION_STATE_ACCEPTING:
1133     case SESSION_STATE_TRANSPORT_CLOSING:
1134     case SESSION_STATE_CLOSING:
1135     case SESSION_STATE_TRANSPORT_CLOSED:
1136       /* If transport finishes or times out before we get a reply
1137        * from the app, mark transport as closed and wait for reply
1138        * before removing the session. Cleanup session table in advance
1139        * because transport will soon be closed and closed sessions
1140        * are assumed to have been removed from the lookup table */
1141       session_lookup_del_session (s);
1142       s->session_state = SESSION_STATE_TRANSPORT_DELETED;
1143       session_cleanup_notify (s, SESSION_CLEANUP_TRANSPORT);
1144       svm_fifo_dequeue_drop_all (s->tx_fifo);
1145       break;
1146     case SESSION_STATE_APP_CLOSED:
1147       /* Cleanup lookup table as transport needs to still be valid.
1148        * Program transport close to ensure that all session events
1149        * have been cleaned up. Once transport close is called, the
1150        * session is just removed because both transport and app have
1151        * confirmed the close*/
1152       session_lookup_del_session (s);
1153       s->session_state = SESSION_STATE_TRANSPORT_DELETED;
1154       session_cleanup_notify (s, SESSION_CLEANUP_TRANSPORT);
1155       svm_fifo_dequeue_drop_all (s->tx_fifo);
1156       session_program_transport_ctrl_evt (s, SESSION_CTRL_EVT_CLOSE);
1157       break;
1158     case SESSION_STATE_TRANSPORT_DELETED:
1159       break;
1160     case SESSION_STATE_CLOSED:
1161       session_cleanup_notify (s, SESSION_CLEANUP_TRANSPORT);
1162       session_delete (s);
1163       break;
1164     default:
1165       clib_warning ("session state %u", s->session_state);
1166       session_cleanup_notify (s, SESSION_CLEANUP_TRANSPORT);
1167       session_delete (s);
1168       break;
1169     }
1170 }
1171
1172 /**
1173  * Notification from transport that it is closed
1174  *
1175  * Should be called by transport, prior to calling delete notify, once it
1176  * knows that no more data will be exchanged. This could serve as an
1177  * early acknowledgment of an active close especially if transport delete
1178  * can be delayed a long time, e.g., tcp time-wait.
1179  */
1180 void
1181 session_transport_closed_notify (transport_connection_t * tc)
1182 {
1183   app_worker_t *app_wrk;
1184   session_t *s;
1185
1186   if (!(s = session_get_if_valid (tc->s_index, tc->thread_index)))
1187     return;
1188
1189   /* Transport thinks that app requested close but it actually didn't.
1190    * Can happen for tcp:
1191    * 1)if fin and rst are received in close succession.
1192    * 2)if app shutdown the connection.  */
1193   if (s->session_state == SESSION_STATE_READY)
1194     {
1195       session_transport_closing_notify (tc);
1196       svm_fifo_dequeue_drop_all (s->tx_fifo);
1197       s->session_state = SESSION_STATE_TRANSPORT_CLOSED;
1198     }
1199   /* If app close has not been received or has not yet resulted in
1200    * a transport close, only mark the session transport as closed */
1201   else if (s->session_state <= SESSION_STATE_CLOSING)
1202     {
1203       s->session_state = SESSION_STATE_TRANSPORT_CLOSED;
1204     }
1205   /* If app also closed, switch to closed */
1206   else if (s->session_state == SESSION_STATE_APP_CLOSED)
1207     s->session_state = SESSION_STATE_CLOSED;
1208
1209   app_wrk = app_worker_get_if_valid (s->app_wrk_index);
1210   if (app_wrk)
1211     app_worker_transport_closed_notify (app_wrk, s);
1212 }
1213
1214 /**
1215  * Notify application that connection has been reset.
1216  */
1217 void
1218 session_transport_reset_notify (transport_connection_t * tc)
1219 {
1220   app_worker_t *app_wrk;
1221   session_t *s;
1222
1223   s = session_get (tc->s_index, tc->thread_index);
1224   svm_fifo_dequeue_drop_all (s->tx_fifo);
1225   if (s->session_state >= SESSION_STATE_TRANSPORT_CLOSING)
1226     return;
1227   if (s->session_state == SESSION_STATE_ACCEPTING)
1228     {
1229       s->session_state = SESSION_STATE_TRANSPORT_CLOSING;
1230       return;
1231     }
1232   s->session_state = SESSION_STATE_TRANSPORT_CLOSING;
1233   app_wrk = app_worker_get (s->app_wrk_index);
1234   app_worker_reset_notify (app_wrk, s);
1235 }
1236
1237 int
1238 session_stream_accept_notify (transport_connection_t * tc)
1239 {
1240   app_worker_t *app_wrk;
1241   session_t *s;
1242
1243   s = session_get (tc->s_index, tc->thread_index);
1244   app_wrk = app_worker_get_if_valid (s->app_wrk_index);
1245   if (!app_wrk)
1246     return -1;
1247   if (s->session_state != SESSION_STATE_CREATED)
1248     return 0;
1249   s->session_state = SESSION_STATE_ACCEPTING;
1250   if (app_worker_accept_notify (app_wrk, s))
1251     {
1252       /* On transport delete, no notifications should be sent. Unless, the
1253        * accept is retried and successful. */
1254       s->session_state = SESSION_STATE_CREATED;
1255       return -1;
1256     }
1257   return 0;
1258 }
1259
1260 /**
1261  * Accept a stream session. Optionally ping the server by callback.
1262  */
1263 int
1264 session_stream_accept (transport_connection_t * tc, u32 listener_index,
1265                        u32 thread_index, u8 notify)
1266 {
1267   session_t *s;
1268   int rv;
1269
1270   s = session_alloc_for_connection (tc);
1271   s->listener_handle = ((u64) thread_index << 32) | (u64) listener_index;
1272   s->session_state = SESSION_STATE_CREATED;
1273
1274   if ((rv = app_worker_init_accepted (s)))
1275     {
1276       session_free (s);
1277       return rv;
1278     }
1279
1280   session_lookup_add_connection (tc, session_handle (s));
1281
1282   /* Shoulder-tap the server */
1283   if (notify)
1284     {
1285       app_worker_t *app_wrk = app_worker_get (s->app_wrk_index);
1286       if ((rv = app_worker_accept_notify (app_wrk, s)))
1287         {
1288           session_lookup_del_session (s);
1289           segment_manager_dealloc_fifos (s->rx_fifo, s->tx_fifo);
1290           session_free (s);
1291           return rv;
1292         }
1293     }
1294
1295   return 0;
1296 }
1297
1298 int
1299 session_dgram_accept (transport_connection_t * tc, u32 listener_index,
1300                       u32 thread_index)
1301 {
1302   app_worker_t *app_wrk;
1303   session_t *s;
1304   int rv;
1305
1306   s = session_alloc_for_connection (tc);
1307   s->listener_handle = ((u64) thread_index << 32) | (u64) listener_index;
1308
1309   if ((rv = app_worker_init_accepted (s)))
1310     {
1311       session_free (s);
1312       return rv;
1313     }
1314
1315   session_lookup_add_connection (tc, session_handle (s));
1316   s->session_state = SESSION_STATE_ACCEPTING;
1317
1318   app_wrk = app_worker_get (s->app_wrk_index);
1319   if ((rv = app_worker_accept_notify (app_wrk, s)))
1320     {
1321       session_lookup_del_session (s);
1322       segment_manager_dealloc_fifos (s->rx_fifo, s->tx_fifo);
1323       session_free (s);
1324       return rv;
1325     }
1326
1327   return 0;
1328 }
1329
1330 int
1331 session_open_cl (session_endpoint_cfg_t *rmt, session_handle_t *rsh)
1332 {
1333   transport_connection_t *tc;
1334   transport_endpoint_cfg_t *tep;
1335   app_worker_t *app_wrk;
1336   session_handle_t sh;
1337   session_t *s;
1338   int rv;
1339
1340   tep = session_endpoint_to_transport_cfg (rmt);
1341   rv = transport_connect (rmt->transport_proto, tep);
1342   if (rv < 0)
1343     {
1344       SESSION_DBG ("Transport failed to open connection.");
1345       return rv;
1346     }
1347
1348   tc = transport_get_half_open (rmt->transport_proto, (u32) rv);
1349
1350   /* For dgram type of service, allocate session and fifos now */
1351   app_wrk = app_worker_get (rmt->app_wrk_index);
1352   s = session_alloc_for_connection (tc);
1353   s->app_wrk_index = app_wrk->wrk_index;
1354   s->session_state = SESSION_STATE_OPENED;
1355   if (app_worker_init_connected (app_wrk, s))
1356     {
1357       session_free (s);
1358       return -1;
1359     }
1360
1361   sh = session_handle (s);
1362   *rsh = sh;
1363
1364   session_lookup_add_connection (tc, sh);
1365   return app_worker_connect_notify (app_wrk, s, SESSION_E_NONE, rmt->opaque);
1366 }
1367
1368 int
1369 session_open_vc (session_endpoint_cfg_t *rmt, session_handle_t *rsh)
1370 {
1371   transport_connection_t *tc;
1372   transport_endpoint_cfg_t *tep;
1373   app_worker_t *app_wrk;
1374   session_t *ho;
1375   int rv;
1376
1377   tep = session_endpoint_to_transport_cfg (rmt);
1378   rv = transport_connect (rmt->transport_proto, tep);
1379   if (rv < 0)
1380     {
1381       SESSION_DBG ("Transport failed to open connection.");
1382       return rv;
1383     }
1384
1385   tc = transport_get_half_open (rmt->transport_proto, (u32) rv);
1386
1387   app_wrk = app_worker_get (rmt->app_wrk_index);
1388
1389   /* If transport offers a vc service, only allocate established
1390    * session once the connection has been established.
1391    * In the meantime allocate half-open session for tracking purposes
1392    * associate half-open connection to it and add session to app-worker
1393    * half-open table. These are needed to allocate the established
1394    * session on transport notification, and to cleanup the half-open
1395    * session if the app detaches before connection establishment.
1396    */
1397   ho = session_alloc_for_half_open (tc);
1398   ho->app_wrk_index = app_wrk->wrk_index;
1399   ho->ho_index = app_worker_add_half_open (app_wrk, session_handle (ho));
1400   ho->opaque = rmt->opaque;
1401   *rsh = session_handle (ho);
1402
1403   if (!(tc->flags & TRANSPORT_CONNECTION_F_NO_LOOKUP))
1404     session_lookup_add_half_open (tc, tc->c_index);
1405
1406   return 0;
1407 }
1408
1409 int
1410 session_open_app (session_endpoint_cfg_t *rmt, session_handle_t *rsh)
1411 {
1412   transport_endpoint_cfg_t *tep_cfg = session_endpoint_to_transport_cfg (rmt);
1413
1414   /* Not supported for now */
1415   *rsh = SESSION_INVALID_HANDLE;
1416   return transport_connect (rmt->transport_proto, tep_cfg);
1417 }
1418
1419 typedef int (*session_open_service_fn) (session_endpoint_cfg_t *,
1420                                         session_handle_t *);
1421
1422 /* *INDENT-OFF* */
1423 static session_open_service_fn session_open_srv_fns[TRANSPORT_N_SERVICES] = {
1424   session_open_vc,
1425   session_open_cl,
1426   session_open_app,
1427 };
1428 /* *INDENT-ON* */
1429
1430 /**
1431  * Ask transport to open connection to remote transport endpoint.
1432  *
1433  * Stores handle for matching request with reply since the call can be
1434  * asynchronous. For instance, for TCP the 3-way handshake must complete
1435  * before reply comes. Session is only created once connection is established.
1436  *
1437  * @param app_index Index of the application requesting the connect
1438  * @param st Session type requested.
1439  * @param tep Remote transport endpoint
1440  * @param opaque Opaque data (typically, api_context) the application expects
1441  *               on open completion.
1442  */
1443 int
1444 session_open (session_endpoint_cfg_t *rmt, session_handle_t *rsh)
1445 {
1446   transport_service_type_t tst;
1447   tst = transport_protocol_service_type (rmt->transport_proto);
1448   return session_open_srv_fns[tst](rmt, rsh);
1449 }
1450
1451 /**
1452  * Ask transport to listen on session endpoint.
1453  *
1454  * @param s Session for which listen will be called. Note that unlike
1455  *          established sessions, listen sessions are not associated to a
1456  *          thread.
1457  * @param sep Local endpoint to be listened on.
1458  */
1459 int
1460 session_listen (session_t * ls, session_endpoint_cfg_t * sep)
1461 {
1462   transport_endpoint_cfg_t *tep;
1463   int tc_index;
1464   u32 s_index;
1465
1466   /* Transport bind/listen */
1467   tep = session_endpoint_to_transport_cfg (sep);
1468   s_index = ls->session_index;
1469   tc_index = transport_start_listen (session_get_transport_proto (ls),
1470                                      s_index, tep);
1471
1472   if (tc_index < 0)
1473     return tc_index;
1474
1475   /* Attach transport to session. Lookup tables are populated by the app
1476    * worker because local tables (for ct sessions) are not backed by a fib */
1477   ls = listen_session_get (s_index);
1478   ls->connection_index = tc_index;
1479
1480   return 0;
1481 }
1482
1483 /**
1484  * Ask transport to stop listening on local transport endpoint.
1485  *
1486  * @param s Session to stop listening on. It must be in state LISTENING.
1487  */
1488 int
1489 session_stop_listen (session_t * s)
1490 {
1491   transport_proto_t tp = session_get_transport_proto (s);
1492   transport_connection_t *tc;
1493
1494   if (s->session_state != SESSION_STATE_LISTENING)
1495     return SESSION_E_NOLISTEN;
1496
1497   tc = transport_get_listener (tp, s->connection_index);
1498
1499   /* If no transport, assume everything was cleaned up already */
1500   if (!tc)
1501     return SESSION_E_NONE;
1502
1503   if (!(tc->flags & TRANSPORT_CONNECTION_F_NO_LOOKUP))
1504     session_lookup_del_connection (tc);
1505
1506   transport_stop_listen (tp, s->connection_index);
1507   return 0;
1508 }
1509
1510 /**
1511  * Initialize session half-closing procedure.
1512  *
1513  * Note that half-closing will not change the state of the session.
1514  */
1515 void
1516 session_half_close (session_t *s)
1517 {
1518   if (!s)
1519     return;
1520
1521   session_program_transport_ctrl_evt (s, SESSION_CTRL_EVT_HALF_CLOSE);
1522 }
1523
1524 /**
1525  * Initialize session closing procedure.
1526  *
1527  * Request is always sent to session node to ensure that all outstanding
1528  * requests are served before transport is notified.
1529  */
1530 void
1531 session_close (session_t * s)
1532 {
1533   if (!s)
1534     return;
1535
1536   if (s->session_state >= SESSION_STATE_CLOSING)
1537     {
1538       /* Session will only be removed once both app and transport
1539        * acknowledge the close */
1540       if (s->session_state == SESSION_STATE_TRANSPORT_CLOSED
1541           || s->session_state == SESSION_STATE_TRANSPORT_DELETED)
1542         session_program_transport_ctrl_evt (s, SESSION_CTRL_EVT_CLOSE);
1543       return;
1544     }
1545
1546   /* App closed so stop propagating dequeue notifications */
1547   svm_fifo_clear_deq_ntf (s->tx_fifo);
1548   s->session_state = SESSION_STATE_CLOSING;
1549   session_program_transport_ctrl_evt (s, SESSION_CTRL_EVT_CLOSE);
1550 }
1551
1552 /**
1553  * Force a close without waiting for data to be flushed
1554  */
1555 void
1556 session_reset (session_t * s)
1557 {
1558   if (s->session_state >= SESSION_STATE_CLOSING)
1559     return;
1560   /* Drop all outstanding tx data */
1561   svm_fifo_dequeue_drop_all (s->tx_fifo);
1562   s->session_state = SESSION_STATE_CLOSING;
1563   session_program_transport_ctrl_evt (s, SESSION_CTRL_EVT_RESET);
1564 }
1565
1566 /**
1567  * Notify transport the session can be half-disconnected.
1568  *
1569  * Must be called from the session's thread.
1570  */
1571 void
1572 session_transport_half_close (session_t *s)
1573 {
1574   /* Only READY session can be half-closed */
1575   if (s->session_state != SESSION_STATE_READY)
1576     {
1577       return;
1578     }
1579
1580   transport_half_close (session_get_transport_proto (s), s->connection_index,
1581                         s->thread_index);
1582 }
1583
1584 /**
1585  * Notify transport the session can be disconnected. This should eventually
1586  * result in a delete notification that allows us to cleanup session state.
1587  * Called for both active/passive disconnects.
1588  *
1589  * Must be called from the session's thread.
1590  */
1591 void
1592 session_transport_close (session_t * s)
1593 {
1594   if (s->session_state >= SESSION_STATE_APP_CLOSED)
1595     {
1596       if (s->session_state == SESSION_STATE_TRANSPORT_CLOSED)
1597         s->session_state = SESSION_STATE_CLOSED;
1598       /* If transport is already deleted, just free the session */
1599       else if (s->session_state >= SESSION_STATE_TRANSPORT_DELETED)
1600         session_free_w_fifos (s);
1601       return;
1602     }
1603
1604   /* If the tx queue wasn't drained, the transport can continue to try
1605    * sending the outstanding data (in closed state it cannot). It MUST however
1606    * at one point, either after sending everything or after a timeout, call
1607    * delete notify. This will finally lead to the complete cleanup of the
1608    * session.
1609    */
1610   s->session_state = SESSION_STATE_APP_CLOSED;
1611
1612   transport_close (session_get_transport_proto (s), s->connection_index,
1613                    s->thread_index);
1614 }
1615
1616 /**
1617  * Force transport close
1618  */
1619 void
1620 session_transport_reset (session_t * s)
1621 {
1622   if (s->session_state >= SESSION_STATE_APP_CLOSED)
1623     {
1624       if (s->session_state == SESSION_STATE_TRANSPORT_CLOSED)
1625         s->session_state = SESSION_STATE_CLOSED;
1626       else if (s->session_state >= SESSION_STATE_TRANSPORT_DELETED)
1627         session_free_w_fifos (s);
1628       return;
1629     }
1630
1631   s->session_state = SESSION_STATE_APP_CLOSED;
1632   transport_reset (session_get_transport_proto (s), s->connection_index,
1633                    s->thread_index);
1634 }
1635
1636 /**
1637  * Cleanup transport and session state.
1638  *
1639  * Notify transport of the cleanup and free the session. This should
1640  * be called only if transport reported some error and is already
1641  * closed.
1642  */
1643 void
1644 session_transport_cleanup (session_t * s)
1645 {
1646   /* Delete from main lookup table before we axe the the transport */
1647   session_lookup_del_session (s);
1648   if (s->session_state != SESSION_STATE_TRANSPORT_DELETED)
1649     transport_cleanup (session_get_transport_proto (s), s->connection_index,
1650                        s->thread_index);
1651   /* Since we called cleanup, no delete notification will come. So, make
1652    * sure the session is properly freed. */
1653   segment_manager_dealloc_fifos (s->rx_fifo, s->tx_fifo);
1654   session_free (s);
1655 }
1656
1657 /**
1658  * Allocate worker mqs in share-able segment
1659  *
1660  * That can only be a newly created memfd segment, that must be mapped
1661  * by all apps/stack users unless private rx mqs are enabled.
1662  */
1663 void
1664 session_vpp_wrk_mqs_alloc (session_main_t *smm)
1665 {
1666   u32 mq_q_length = 2048, evt_size = sizeof (session_event_t);
1667   fifo_segment_t *mqs_seg = &smm->wrk_mqs_segment;
1668   svm_msg_q_cfg_t _cfg, *cfg = &_cfg;
1669   uword mqs_seg_size;
1670   int i;
1671
1672   mq_q_length = clib_max (mq_q_length, smm->configured_wrk_mq_length);
1673
1674   svm_msg_q_ring_cfg_t rc[SESSION_MQ_N_RINGS] = {
1675     { mq_q_length, evt_size, 0 }, { mq_q_length >> 1, 256, 0 }
1676   };
1677   cfg->consumer_pid = 0;
1678   cfg->n_rings = 2;
1679   cfg->q_nitems = mq_q_length;
1680   cfg->ring_cfgs = rc;
1681
1682   /*
1683    * Compute mqs segment size based on rings config and leave space
1684    * for passing extended configuration messages, i.e., data allocated
1685    * outside of the rings. If provided with a config value, accept it
1686    * if larger than minimum size.
1687    */
1688   mqs_seg_size = svm_msg_q_size_to_alloc (cfg) * vec_len (smm->wrk);
1689   mqs_seg_size = mqs_seg_size + (1 << 20);
1690   mqs_seg_size = clib_max (mqs_seg_size, smm->wrk_mqs_segment_size);
1691
1692   mqs_seg->ssvm.ssvm_size = mqs_seg_size;
1693   mqs_seg->ssvm.my_pid = getpid ();
1694   mqs_seg->ssvm.name = format (0, "%s%c", "session: wrk-mqs-segment", 0);
1695
1696   if (ssvm_server_init (&mqs_seg->ssvm, SSVM_SEGMENT_MEMFD))
1697     {
1698       clib_warning ("failed to initialize queue segment");
1699       return;
1700     }
1701
1702   fifo_segment_init (mqs_seg);
1703
1704   /* Special fifo segment that's filled only with mqs */
1705   mqs_seg->h->n_mqs = vec_len (smm->wrk);
1706
1707   for (i = 0; i < vec_len (smm->wrk); i++)
1708     smm->wrk[i].vpp_event_queue = fifo_segment_msg_q_alloc (mqs_seg, i, cfg);
1709 }
1710
1711 fifo_segment_t *
1712 session_main_get_wrk_mqs_segment (void)
1713 {
1714   return &session_main.wrk_mqs_segment;
1715 }
1716
1717 u64
1718 session_segment_handle (session_t * s)
1719 {
1720   svm_fifo_t *f;
1721
1722   if (!s->rx_fifo)
1723     return SESSION_INVALID_HANDLE;
1724
1725   f = s->rx_fifo;
1726   return segment_manager_make_segment_handle (f->segment_manager,
1727                                               f->segment_index);
1728 }
1729
1730 /* *INDENT-OFF* */
1731 static session_fifo_rx_fn *session_tx_fns[TRANSPORT_TX_N_FNS] = {
1732     session_tx_fifo_peek_and_snd,
1733     session_tx_fifo_dequeue_and_snd,
1734     session_tx_fifo_dequeue_internal,
1735     session_tx_fifo_dequeue_and_snd
1736 };
1737 /* *INDENT-ON* */
1738
1739 void
1740 session_register_transport (transport_proto_t transport_proto,
1741                             const transport_proto_vft_t * vft, u8 is_ip4,
1742                             u32 output_node)
1743 {
1744   session_main_t *smm = &session_main;
1745   session_type_t session_type;
1746   u32 next_index = ~0;
1747
1748   session_type = session_type_from_proto_and_ip (transport_proto, is_ip4);
1749
1750   vec_validate (smm->session_type_to_next, session_type);
1751   vec_validate (smm->session_tx_fns, session_type);
1752
1753   if (output_node != ~0)
1754     next_index = vlib_node_add_next (vlib_get_main (),
1755                                      session_queue_node.index, output_node);
1756
1757   smm->session_type_to_next[session_type] = next_index;
1758   smm->session_tx_fns[session_type] =
1759     session_tx_fns[vft->transport_options.tx_type];
1760 }
1761
1762 void
1763 session_register_update_time_fn (session_update_time_fn fn, u8 is_add)
1764 {
1765   session_main_t *smm = &session_main;
1766   session_update_time_fn *fi;
1767   u32 fi_pos = ~0;
1768   u8 found = 0;
1769
1770   vec_foreach (fi, smm->update_time_fns)
1771     {
1772       if (*fi == fn)
1773         {
1774           fi_pos = fi - smm->update_time_fns;
1775           found = 1;
1776           break;
1777         }
1778     }
1779
1780   if (is_add)
1781     {
1782       if (found)
1783         {
1784           clib_warning ("update time fn %p already registered", fn);
1785           return;
1786         }
1787       vec_add1 (smm->update_time_fns, fn);
1788     }
1789   else
1790     {
1791       vec_del1 (smm->update_time_fns, fi_pos);
1792     }
1793 }
1794
1795 transport_proto_t
1796 session_add_transport_proto (void)
1797 {
1798   session_main_t *smm = &session_main;
1799   session_worker_t *wrk;
1800   u32 thread;
1801
1802   smm->last_transport_proto_type += 1;
1803
1804   for (thread = 0; thread < vec_len (smm->wrk); thread++)
1805     {
1806       wrk = session_main_get_worker (thread);
1807       vec_validate (wrk->session_to_enqueue, smm->last_transport_proto_type);
1808     }
1809
1810   return smm->last_transport_proto_type;
1811 }
1812
1813 transport_connection_t *
1814 session_get_transport (session_t * s)
1815 {
1816   if (s->session_state != SESSION_STATE_LISTENING)
1817     return transport_get_connection (session_get_transport_proto (s),
1818                                      s->connection_index, s->thread_index);
1819   else
1820     return transport_get_listener (session_get_transport_proto (s),
1821                                    s->connection_index);
1822 }
1823
1824 void
1825 session_get_endpoint (session_t * s, transport_endpoint_t * tep, u8 is_lcl)
1826 {
1827   if (s->session_state != SESSION_STATE_LISTENING)
1828     return transport_get_endpoint (session_get_transport_proto (s),
1829                                    s->connection_index, s->thread_index, tep,
1830                                    is_lcl);
1831   else
1832     return transport_get_listener_endpoint (session_get_transport_proto (s),
1833                                             s->connection_index, tep, is_lcl);
1834 }
1835
1836 int
1837 session_transport_attribute (session_t *s, u8 is_get,
1838                              transport_endpt_attr_t *attr)
1839 {
1840   if (s->session_state < SESSION_STATE_READY)
1841     return -1;
1842
1843   return transport_connection_attribute (session_get_transport_proto (s),
1844                                          s->connection_index, s->thread_index,
1845                                          is_get, attr);
1846 }
1847
1848 transport_connection_t *
1849 listen_session_get_transport (session_t * s)
1850 {
1851   return transport_get_listener (session_get_transport_proto (s),
1852                                  s->connection_index);
1853 }
1854
1855 void
1856 session_queue_run_on_main_thread (vlib_main_t * vm)
1857 {
1858   ASSERT (vlib_get_thread_index () == 0);
1859   vlib_node_set_interrupt_pending (vm, session_queue_node.index);
1860 }
1861
1862 static void
1863 session_stats_collector_fn (vlib_stats_collector_data_t *d)
1864 {
1865   u32 i, n_workers, n_wrk_sessions, n_sessions = 0;
1866   session_main_t *smm = &session_main;
1867   session_worker_t *wrk;
1868   counter_t **counters;
1869   counter_t *cb;
1870
1871   n_workers = vec_len (smm->wrk);
1872   vlib_stats_validate (d->entry_index, 0, n_workers - 1);
1873   counters = d->entry->data;
1874   cb = counters[0];
1875
1876   for (i = 0; i < vec_len (smm->wrk); i++)
1877     {
1878       wrk = session_main_get_worker (i);
1879       n_wrk_sessions = pool_elts (wrk->sessions);
1880       cb[i] = n_wrk_sessions;
1881       n_sessions += n_wrk_sessions;
1882     }
1883
1884   vlib_stats_set_gauge (d->private_data, n_sessions);
1885 }
1886
1887 static void
1888 session_stats_collector_init (void)
1889 {
1890   vlib_stats_collector_reg_t reg = {};
1891
1892   reg.entry_index =
1893     vlib_stats_add_counter_vector ("/sys/session/sessions_per_worker");
1894   reg.private_data = vlib_stats_add_gauge ("/sys/session/sessions_total");
1895   reg.collect_fn = session_stats_collector_fn;
1896   vlib_stats_register_collector_fn (&reg);
1897   vlib_stats_validate (reg.entry_index, 0, vlib_get_n_threads ());
1898 }
1899
1900 static clib_error_t *
1901 session_manager_main_enable (vlib_main_t * vm)
1902 {
1903   session_main_t *smm = &session_main;
1904   vlib_thread_main_t *vtm = vlib_get_thread_main ();
1905   u32 num_threads, preallocated_sessions_per_worker;
1906   session_worker_t *wrk;
1907   int i;
1908
1909   /* We only initialize once and do not de-initialized on disable */
1910   if (smm->is_initialized)
1911     goto done;
1912
1913   num_threads = 1 /* main thread */  + vtm->n_threads;
1914
1915   if (num_threads < 1)
1916     return clib_error_return (0, "n_thread_stacks not set");
1917
1918   /* Allocate cache line aligned worker contexts */
1919   vec_validate_aligned (smm->wrk, num_threads - 1, CLIB_CACHE_LINE_BYTES);
1920   clib_spinlock_init (&session_main.pool_realloc_lock);
1921
1922   for (i = 0; i < num_threads; i++)
1923     {
1924       wrk = &smm->wrk[i];
1925       wrk->ctrl_head = clib_llist_make_head (wrk->event_elts, evt_list);
1926       wrk->new_head = clib_llist_make_head (wrk->event_elts, evt_list);
1927       wrk->old_head = clib_llist_make_head (wrk->event_elts, evt_list);
1928       wrk->pending_connects = clib_llist_make_head (wrk->event_elts, evt_list);
1929       wrk->evts_pending_main =
1930         clib_llist_make_head (wrk->event_elts, evt_list);
1931       wrk->vm = vlib_get_main_by_index (i);
1932       wrk->last_vlib_time = vlib_time_now (vm);
1933       wrk->last_vlib_us_time = wrk->last_vlib_time * CLIB_US_TIME_FREQ;
1934       wrk->timerfd = -1;
1935       vec_validate (wrk->session_to_enqueue, smm->last_transport_proto_type);
1936
1937       if (!smm->no_adaptive && smm->use_private_rx_mqs)
1938         session_wrk_enable_adaptive_mode (wrk);
1939     }
1940
1941   /* Allocate vpp event queues segment and queue */
1942   session_vpp_wrk_mqs_alloc (smm);
1943
1944   /* Initialize segment manager properties */
1945   segment_manager_main_init ();
1946
1947   /* Preallocate sessions */
1948   if (smm->preallocated_sessions)
1949     {
1950       if (num_threads == 1)
1951         {
1952           pool_init_fixed (smm->wrk[0].sessions, smm->preallocated_sessions);
1953         }
1954       else
1955         {
1956           int j;
1957           preallocated_sessions_per_worker =
1958             (1.1 * (f64) smm->preallocated_sessions /
1959              (f64) (num_threads - 1));
1960
1961           for (j = 1; j < num_threads; j++)
1962             {
1963               pool_init_fixed (smm->wrk[j].sessions,
1964                                preallocated_sessions_per_worker);
1965             }
1966         }
1967     }
1968
1969   session_lookup_init ();
1970   app_namespaces_init ();
1971   transport_init ();
1972   session_stats_collector_init ();
1973   smm->is_initialized = 1;
1974
1975 done:
1976
1977   smm->is_enabled = 1;
1978
1979   /* Enable transports */
1980   transport_enable_disable (vm, 1);
1981   session_debug_init ();
1982
1983   return 0;
1984 }
1985
1986 static void
1987 session_manager_main_disable (vlib_main_t * vm)
1988 {
1989   transport_enable_disable (vm, 0 /* is_en */ );
1990 }
1991
1992 /* in this new callback, cookie hint the index */
1993 void
1994 session_dma_completion_cb (vlib_main_t *vm, struct vlib_dma_batch *batch)
1995 {
1996   session_worker_t *wrk;
1997   wrk = session_main_get_worker (vm->thread_index);
1998   session_dma_transfer *dma_transfer;
1999
2000   dma_transfer = &wrk->dma_trans[wrk->trans_head];
2001   vec_add (wrk->pending_tx_buffers, dma_transfer->pending_tx_buffers,
2002            vec_len (dma_transfer->pending_tx_buffers));
2003   vec_add (wrk->pending_tx_nexts, dma_transfer->pending_tx_nexts,
2004            vec_len (dma_transfer->pending_tx_nexts));
2005   vec_reset_length (dma_transfer->pending_tx_buffers);
2006   vec_reset_length (dma_transfer->pending_tx_nexts);
2007   wrk->trans_head++;
2008   if (wrk->trans_head == wrk->trans_size)
2009     wrk->trans_head = 0;
2010   return;
2011 }
2012
2013 static void
2014 session_prepare_dma_args (vlib_dma_config_t *args)
2015 {
2016   args->max_transfers = DMA_TRANS_SIZE;
2017   args->max_transfer_size = 65536;
2018   args->features = 0;
2019   args->sw_fallback = 1;
2020   args->barrier_before_last = 1;
2021   args->callback_fn = session_dma_completion_cb;
2022 }
2023
2024 static void
2025 session_node_enable_dma (u8 is_en, int n_vlibs)
2026 {
2027   vlib_dma_config_t args;
2028   session_prepare_dma_args (&args);
2029   session_worker_t *wrk;
2030   vlib_main_t *vm;
2031
2032   int config_index = -1;
2033
2034   if (is_en)
2035     {
2036       vm = vlib_get_main_by_index (0);
2037       config_index = vlib_dma_config_add (vm, &args);
2038     }
2039   else
2040     {
2041       vm = vlib_get_main_by_index (0);
2042       wrk = session_main_get_worker (0);
2043       if (wrk->config_index >= 0)
2044         vlib_dma_config_del (vm, wrk->config_index);
2045     }
2046   int i;
2047   for (i = 0; i < n_vlibs; i++)
2048     {
2049       vm = vlib_get_main_by_index (i);
2050       wrk = session_main_get_worker (vm->thread_index);
2051       wrk->config_index = config_index;
2052       if (is_en)
2053         {
2054           if (config_index >= 0)
2055             wrk->dma_enabled = true;
2056           wrk->dma_trans = (session_dma_transfer *) clib_mem_alloc (
2057             sizeof (session_dma_transfer) * DMA_TRANS_SIZE);
2058           bzero (wrk->dma_trans,
2059                  sizeof (session_dma_transfer) * DMA_TRANS_SIZE);
2060         }
2061       else
2062         {
2063           if (wrk->dma_trans)
2064             clib_mem_free (wrk->dma_trans);
2065         }
2066       wrk->trans_head = 0;
2067       wrk->trans_tail = 0;
2068       wrk->trans_size = DMA_TRANS_SIZE;
2069     }
2070 }
2071
2072 void
2073 session_node_enable_disable (u8 is_en)
2074 {
2075   u8 mstate = is_en ? VLIB_NODE_STATE_INTERRUPT : VLIB_NODE_STATE_DISABLED;
2076   u8 state = is_en ? VLIB_NODE_STATE_POLLING : VLIB_NODE_STATE_DISABLED;
2077   session_main_t *sm = &session_main;
2078   vlib_main_t *vm;
2079   vlib_node_t *n;
2080   int n_vlibs, i;
2081
2082   n_vlibs = vlib_get_n_threads ();
2083   for (i = 0; i < n_vlibs; i++)
2084     {
2085       vm = vlib_get_main_by_index (i);
2086       /* main thread with workers and not polling */
2087       if (i == 0 && n_vlibs > 1)
2088         {
2089           vlib_node_set_state (vm, session_queue_node.index, mstate);
2090           if (is_en)
2091             {
2092               session_main_get_worker (0)->state = SESSION_WRK_INTERRUPT;
2093               vlib_node_set_state (vm, session_queue_process_node.index,
2094                                    state);
2095               n = vlib_get_node (vm, session_queue_process_node.index);
2096               vlib_start_process (vm, n->runtime_index);
2097             }
2098           else
2099             {
2100               vlib_process_signal_event_mt (vm,
2101                                             session_queue_process_node.index,
2102                                             SESSION_Q_PROCESS_STOP, 0);
2103             }
2104           if (!sm->poll_main)
2105             continue;
2106         }
2107       vlib_node_set_state (vm, session_queue_node.index, state);
2108     }
2109
2110   if (sm->use_private_rx_mqs)
2111     application_enable_rx_mqs_nodes (is_en);
2112
2113   if (sm->dma_enabled)
2114     session_node_enable_dma (is_en, n_vlibs);
2115 }
2116
2117 clib_error_t *
2118 vnet_session_enable_disable (vlib_main_t * vm, u8 is_en)
2119 {
2120   clib_error_t *error = 0;
2121   if (is_en)
2122     {
2123       if (session_main.is_enabled)
2124         return 0;
2125
2126       error = session_manager_main_enable (vm);
2127       session_node_enable_disable (is_en);
2128     }
2129   else
2130     {
2131       session_main.is_enabled = 0;
2132       session_manager_main_disable (vm);
2133       session_node_enable_disable (is_en);
2134     }
2135
2136   return error;
2137 }
2138
2139 clib_error_t *
2140 session_main_init (vlib_main_t * vm)
2141 {
2142   session_main_t *smm = &session_main;
2143
2144   smm->is_enabled = 0;
2145   smm->session_enable_asap = 0;
2146   smm->poll_main = 0;
2147   smm->use_private_rx_mqs = 0;
2148   smm->no_adaptive = 0;
2149   smm->last_transport_proto_type = TRANSPORT_PROTO_HTTP;
2150
2151   return 0;
2152 }
2153
2154 static clib_error_t *
2155 session_main_loop_init (vlib_main_t * vm)
2156 {
2157   session_main_t *smm = &session_main;
2158   if (smm->session_enable_asap)
2159     {
2160       vlib_worker_thread_barrier_sync (vm);
2161       vnet_session_enable_disable (vm, 1 /* is_en */ );
2162       vlib_worker_thread_barrier_release (vm);
2163     }
2164   return 0;
2165 }
2166
2167 VLIB_INIT_FUNCTION (session_main_init);
2168 VLIB_MAIN_LOOP_ENTER_FUNCTION (session_main_loop_init);
2169
2170 static clib_error_t *
2171 session_config_fn (vlib_main_t * vm, unformat_input_t * input)
2172 {
2173   session_main_t *smm = &session_main;
2174   u32 nitems;
2175   uword tmp;
2176
2177   while (unformat_check_input (input) != UNFORMAT_END_OF_INPUT)
2178     {
2179       if (unformat (input, "wrk-mq-length %d", &nitems))
2180         {
2181           if (nitems >= 2048)
2182             smm->configured_wrk_mq_length = nitems;
2183           else
2184             clib_warning ("event queue length %d too small, ignored", nitems);
2185         }
2186       else if (unformat (input, "wrk-mqs-segment-size %U",
2187                          unformat_memory_size, &smm->wrk_mqs_segment_size))
2188         ;
2189       else if (unformat (input, "preallocated-sessions %d",
2190                          &smm->preallocated_sessions))
2191         ;
2192       else if (unformat (input, "v4-session-table-buckets %d",
2193                          &smm->configured_v4_session_table_buckets))
2194         ;
2195       else if (unformat (input, "v4-halfopen-table-buckets %d",
2196                          &smm->configured_v4_halfopen_table_buckets))
2197         ;
2198       else if (unformat (input, "v6-session-table-buckets %d",
2199                          &smm->configured_v6_session_table_buckets))
2200         ;
2201       else if (unformat (input, "v6-halfopen-table-buckets %d",
2202                          &smm->configured_v6_halfopen_table_buckets))
2203         ;
2204       else if (unformat (input, "v4-session-table-memory %U",
2205                          unformat_memory_size, &tmp))
2206         {
2207           if (tmp >= 0x100000000)
2208             return clib_error_return (0, "memory size %llx (%lld) too large",
2209                                       tmp, tmp);
2210           smm->configured_v4_session_table_memory = tmp;
2211         }
2212       else if (unformat (input, "v4-halfopen-table-memory %U",
2213                          unformat_memory_size, &tmp))
2214         {
2215           if (tmp >= 0x100000000)
2216             return clib_error_return (0, "memory size %llx (%lld) too large",
2217                                       tmp, tmp);
2218           smm->configured_v4_halfopen_table_memory = tmp;
2219         }
2220       else if (unformat (input, "v6-session-table-memory %U",
2221                          unformat_memory_size, &tmp))
2222         {
2223           if (tmp >= 0x100000000)
2224             return clib_error_return (0, "memory size %llx (%lld) too large",
2225                                       tmp, tmp);
2226           smm->configured_v6_session_table_memory = tmp;
2227         }
2228       else if (unformat (input, "v6-halfopen-table-memory %U",
2229                          unformat_memory_size, &tmp))
2230         {
2231           if (tmp >= 0x100000000)
2232             return clib_error_return (0, "memory size %llx (%lld) too large",
2233                                       tmp, tmp);
2234           smm->configured_v6_halfopen_table_memory = tmp;
2235         }
2236       else if (unformat (input, "local-endpoints-table-memory %U",
2237                          unformat_memory_size, &tmp))
2238         {
2239           if (tmp >= 0x100000000)
2240             return clib_error_return (0, "memory size %llx (%lld) too large",
2241                                       tmp, tmp);
2242           smm->local_endpoints_table_memory = tmp;
2243         }
2244       else if (unformat (input, "local-endpoints-table-buckets %d",
2245                          &smm->local_endpoints_table_buckets))
2246         ;
2247       else if (unformat (input, "enable"))
2248         smm->session_enable_asap = 1;
2249       else if (unformat (input, "use-app-socket-api"))
2250         (void) appns_sapi_enable_disable (1 /* is_enable */);
2251       else if (unformat (input, "poll-main"))
2252         smm->poll_main = 1;
2253       else if (unformat (input, "use-private-rx-mqs"))
2254         smm->use_private_rx_mqs = 1;
2255       else if (unformat (input, "no-adaptive"))
2256         smm->no_adaptive = 1;
2257       else if (unformat (input, "use-dma"))
2258         smm->dma_enabled = 1;
2259       /*
2260        * Deprecated but maintained for compatibility
2261        */
2262       else if (unformat (input, "evt_qs_memfd_seg"))
2263         ;
2264       else if (unformat (input, "segment-baseva 0x%lx", &tmp))
2265         ;
2266       else if (unformat (input, "evt_qs_seg_size %U", unformat_memory_size,
2267                          &smm->wrk_mqs_segment_size))
2268         ;
2269       else if (unformat (input, "event-queue-length %d", &nitems))
2270         {
2271           if (nitems >= 2048)
2272             smm->configured_wrk_mq_length = nitems;
2273           else
2274             clib_warning ("event queue length %d too small, ignored", nitems);
2275         }
2276       else
2277         return clib_error_return (0, "unknown input `%U'",
2278                                   format_unformat_error, input);
2279     }
2280   return 0;
2281 }
2282
2283 VLIB_CONFIG_FUNCTION (session_config_fn, "session");
2284
2285 /*
2286  * fd.io coding-style-patch-verification: ON
2287  *
2288  * Local Variables:
2289  * eval: (c-set-style "gnu")
2290  * End:
2291  */