session: move connects to first worker
[vpp.git] / src / vnet / session / session.c
1 /*
2  * Copyright (c) 2017-2019 Cisco and/or its affiliates.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at:
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 /**
16  * @file
17  * @brief Session and session manager
18  */
19
20 #include <vnet/session/session.h>
21 #include <vnet/session/application.h>
22 #include <vnet/dpo/load_balance.h>
23 #include <vnet/fib/ip4_fib.h>
24 #include <vlib/stats/stats.h>
25 #include <vlib/dma/dma.h>
26
27 session_main_t session_main;
28
29 static inline int
30 session_send_evt_to_thread (void *data, void *args, u32 thread_index,
31                             session_evt_type_t evt_type)
32 {
33   session_worker_t *wrk = session_main_get_worker (thread_index);
34   session_event_t *evt;
35   svm_msg_q_msg_t msg;
36   svm_msg_q_t *mq;
37
38   mq = wrk->vpp_event_queue;
39   if (PREDICT_FALSE (svm_msg_q_lock (mq)))
40     return -1;
41   if (PREDICT_FALSE (svm_msg_q_or_ring_is_full (mq, SESSION_MQ_IO_EVT_RING)))
42     {
43       svm_msg_q_unlock (mq);
44       return -2;
45     }
46   switch (evt_type)
47     {
48     case SESSION_CTRL_EVT_RPC:
49       msg = svm_msg_q_alloc_msg_w_ring (mq, SESSION_MQ_IO_EVT_RING);
50       evt = (session_event_t *) svm_msg_q_msg_data (mq, &msg);
51       evt->rpc_args.fp = data;
52       evt->rpc_args.arg = args;
53       break;
54     case SESSION_IO_EVT_RX:
55     case SESSION_IO_EVT_TX:
56     case SESSION_IO_EVT_TX_FLUSH:
57     case SESSION_IO_EVT_BUILTIN_RX:
58       msg = svm_msg_q_alloc_msg_w_ring (mq, SESSION_MQ_IO_EVT_RING);
59       evt = (session_event_t *) svm_msg_q_msg_data (mq, &msg);
60       evt->session_index = *(u32 *) data;
61       break;
62     case SESSION_IO_EVT_BUILTIN_TX:
63     case SESSION_CTRL_EVT_CLOSE:
64     case SESSION_CTRL_EVT_RESET:
65       msg = svm_msg_q_alloc_msg_w_ring (mq, SESSION_MQ_IO_EVT_RING);
66       evt = (session_event_t *) svm_msg_q_msg_data (mq, &msg);
67       evt->session_handle = session_handle ((session_t *) data);
68       break;
69     default:
70       clib_warning ("evt unhandled!");
71       svm_msg_q_unlock (mq);
72       return -1;
73     }
74   evt->event_type = evt_type;
75
76   svm_msg_q_add_and_unlock (mq, &msg);
77
78   if (PREDICT_FALSE (wrk->state == SESSION_WRK_INTERRUPT))
79     vlib_node_set_interrupt_pending (wrk->vm, session_queue_node.index);
80
81   return 0;
82 }
83
84 int
85 session_send_io_evt_to_thread (svm_fifo_t * f, session_evt_type_t evt_type)
86 {
87   return session_send_evt_to_thread (&f->shr->master_session_index, 0,
88                                      f->master_thread_index, evt_type);
89 }
90
91 int
92 session_send_io_evt_to_thread_custom (void *data, u32 thread_index,
93                                       session_evt_type_t evt_type)
94 {
95   return session_send_evt_to_thread (data, 0, thread_index, evt_type);
96 }
97
98 int
99 session_send_ctrl_evt_to_thread (session_t * s, session_evt_type_t evt_type)
100 {
101   /* only events supported are disconnect, shutdown and reset */
102   ASSERT (evt_type == SESSION_CTRL_EVT_CLOSE ||
103           evt_type == SESSION_CTRL_EVT_HALF_CLOSE ||
104           evt_type == SESSION_CTRL_EVT_RESET);
105   return session_send_evt_to_thread (s, 0, s->thread_index, evt_type);
106 }
107
108 void
109 session_send_rpc_evt_to_thread_force (u32 thread_index, void *fp,
110                                       void *rpc_args)
111 {
112   session_send_evt_to_thread (fp, rpc_args, thread_index,
113                               SESSION_CTRL_EVT_RPC);
114 }
115
116 void
117 session_send_rpc_evt_to_thread (u32 thread_index, void *fp, void *rpc_args)
118 {
119   if (thread_index != vlib_get_thread_index ())
120     session_send_rpc_evt_to_thread_force (thread_index, fp, rpc_args);
121   else
122     {
123       void (*fnp) (void *) = fp;
124       fnp (rpc_args);
125     }
126 }
127
128 void
129 session_add_self_custom_tx_evt (transport_connection_t * tc, u8 has_prio)
130 {
131   session_t *s = session_get (tc->s_index, tc->thread_index);
132
133   ASSERT (s->thread_index == vlib_get_thread_index ());
134   ASSERT (s->session_state != SESSION_STATE_TRANSPORT_DELETED);
135
136   if (!(s->flags & SESSION_F_CUSTOM_TX))
137     {
138       s->flags |= SESSION_F_CUSTOM_TX;
139       if (svm_fifo_set_event (s->tx_fifo)
140           || transport_connection_is_descheduled (tc))
141         {
142           session_evt_elt_t *elt;
143           session_worker_t *wrk;
144
145           wrk = session_main_get_worker (tc->thread_index);
146           if (has_prio)
147             elt = session_evt_alloc_new (wrk);
148           else
149             elt = session_evt_alloc_old (wrk);
150           elt->evt.session_index = tc->s_index;
151           elt->evt.event_type = SESSION_IO_EVT_TX;
152           tc->flags &= ~TRANSPORT_CONNECTION_F_DESCHED;
153
154           if (PREDICT_FALSE (wrk->state == SESSION_WRK_INTERRUPT))
155             vlib_node_set_interrupt_pending (wrk->vm,
156                                              session_queue_node.index);
157         }
158     }
159 }
160
161 void
162 sesssion_reschedule_tx (transport_connection_t * tc)
163 {
164   session_worker_t *wrk = session_main_get_worker (tc->thread_index);
165   session_evt_elt_t *elt;
166
167   ASSERT (tc->thread_index == vlib_get_thread_index ());
168
169   elt = session_evt_alloc_new (wrk);
170   elt->evt.session_index = tc->s_index;
171   elt->evt.event_type = SESSION_IO_EVT_TX;
172
173   if (PREDICT_FALSE (wrk->state == SESSION_WRK_INTERRUPT))
174     vlib_node_set_interrupt_pending (wrk->vm, session_queue_node.index);
175 }
176
177 static void
178 session_program_transport_ctrl_evt (session_t * s, session_evt_type_t evt)
179 {
180   u32 thread_index = vlib_get_thread_index ();
181   session_evt_elt_t *elt;
182   session_worker_t *wrk;
183
184   /* If we are in the handler thread, or being called with the worker barrier
185    * held, just append a new event to pending disconnects vector. */
186   if (vlib_thread_is_main_w_barrier () || thread_index == s->thread_index)
187     {
188       wrk = session_main_get_worker (s->thread_index);
189       elt = session_evt_alloc_ctrl (wrk);
190       clib_memset (&elt->evt, 0, sizeof (session_event_t));
191       elt->evt.session_handle = session_handle (s);
192       elt->evt.event_type = evt;
193
194       if (PREDICT_FALSE (wrk->state == SESSION_WRK_INTERRUPT))
195         vlib_node_set_interrupt_pending (wrk->vm, session_queue_node.index);
196     }
197   else
198     session_send_ctrl_evt_to_thread (s, evt);
199 }
200
201 session_t *
202 session_alloc (u32 thread_index)
203 {
204   session_worker_t *wrk = &session_main.wrk[thread_index];
205   session_t *s;
206
207   pool_get_aligned_safe (wrk->sessions, s, CLIB_CACHE_LINE_BYTES);
208   clib_memset (s, 0, sizeof (*s));
209   s->session_index = s - wrk->sessions;
210   s->thread_index = thread_index;
211   s->app_index = APP_INVALID_INDEX;
212
213   return s;
214 }
215
216 void
217 session_free (session_t * s)
218 {
219   session_worker_t *wrk = &session_main.wrk[s->thread_index];
220
221   SESSION_EVT (SESSION_EVT_FREE, s);
222   if (CLIB_DEBUG)
223     clib_memset (s, 0xFA, sizeof (*s));
224   pool_put (wrk->sessions, s);
225 }
226
227 u8
228 session_is_valid (u32 si, u8 thread_index)
229 {
230   session_t *s;
231   transport_connection_t *tc;
232
233   s = pool_elt_at_index (session_main.wrk[thread_index].sessions, si);
234
235   if (s->thread_index != thread_index || s->session_index != si)
236     return 0;
237
238   if (s->session_state == SESSION_STATE_TRANSPORT_DELETED
239       || s->session_state <= SESSION_STATE_LISTENING)
240     return 1;
241
242   if (s->session_state == SESSION_STATE_CONNECTING &&
243       (s->flags & SESSION_F_HALF_OPEN))
244     return 1;
245
246   tc = session_get_transport (s);
247   if (s->connection_index != tc->c_index
248       || s->thread_index != tc->thread_index || tc->s_index != si)
249     return 0;
250
251   return 1;
252 }
253
254 static void
255 session_cleanup_notify (session_t * s, session_cleanup_ntf_t ntf)
256 {
257   app_worker_t *app_wrk;
258
259   app_wrk = app_worker_get_if_valid (s->app_wrk_index);
260   if (!app_wrk)
261     return;
262   app_worker_cleanup_notify (app_wrk, s, ntf);
263 }
264
265 void
266 session_free_w_fifos (session_t * s)
267 {
268   session_cleanup_notify (s, SESSION_CLEANUP_SESSION);
269   segment_manager_dealloc_fifos (s->rx_fifo, s->tx_fifo);
270   session_free (s);
271 }
272
273 /**
274  * Cleans up session and lookup table.
275  *
276  * Transport connection must still be valid.
277  */
278 static void
279 session_delete (session_t * s)
280 {
281   int rv;
282
283   /* Delete from the main lookup table. */
284   if ((rv = session_lookup_del_session (s)))
285     clib_warning ("session %u hash delete rv %d", s->session_index, rv);
286
287   session_free_w_fifos (s);
288 }
289
290 void
291 session_cleanup_half_open (session_handle_t ho_handle)
292 {
293   session_t *ho = session_get_from_handle (ho_handle);
294
295   /* App transports can migrate their half-opens */
296   if (ho->flags & SESSION_F_IS_MIGRATING)
297     {
298       /* Session still migrating, move to closed state to signal that the
299        * session should be removed. */
300       if (ho->connection_index == ~0)
301         {
302           session_set_state (ho, SESSION_STATE_CLOSED);
303           return;
304         }
305       /* Migrated transports are no longer half-opens */
306       transport_cleanup (session_get_transport_proto (ho),
307                          ho->connection_index, ho->app_index /* overloaded */);
308     }
309   else
310     transport_cleanup_half_open (session_get_transport_proto (ho),
311                                  ho->connection_index);
312   session_free (ho);
313 }
314
315 static void
316 session_half_open_free (session_t *ho)
317 {
318   app_worker_t *app_wrk;
319
320   ASSERT (vlib_get_thread_index () <= 1);
321   app_wrk = app_worker_get (ho->app_wrk_index);
322   app_worker_del_half_open (app_wrk, ho);
323   session_free (ho);
324 }
325
326 static void
327 session_half_open_free_rpc (void *args)
328 {
329   session_t *ho = ho_session_get (pointer_to_uword (args));
330   session_half_open_free (ho);
331 }
332
333 void
334 session_half_open_delete_notify (transport_connection_t *tc)
335 {
336   /* Notification from ctrl thread accepted without rpc */
337   if (tc->thread_index == transport_cl_thread ())
338     {
339       session_half_open_free (ho_session_get (tc->s_index));
340     }
341   else
342     {
343       void *args = uword_to_pointer ((uword) tc->s_index, void *);
344       session_send_rpc_evt_to_thread_force (transport_cl_thread (),
345                                             session_half_open_free_rpc, args);
346     }
347 }
348
349 void
350 session_half_open_migrate_notify (transport_connection_t *tc)
351 {
352   session_t *ho;
353
354   ho = ho_session_get (tc->s_index);
355   ho->flags |= SESSION_F_IS_MIGRATING;
356   ho->connection_index = ~0;
357 }
358
359 int
360 session_half_open_migrated_notify (transport_connection_t *tc)
361 {
362   session_t *ho;
363
364   ho = ho_session_get (tc->s_index);
365
366   /* App probably detached so the half-open must be cleaned up */
367   if (ho->session_state == SESSION_STATE_CLOSED)
368     {
369       session_half_open_delete_notify (tc);
370       return -1;
371     }
372   ho->connection_index = tc->c_index;
373   /* Overload app index for half-open with new thread */
374   ho->app_index = tc->thread_index;
375   return 0;
376 }
377
378 session_t *
379 session_alloc_for_connection (transport_connection_t * tc)
380 {
381   session_t *s;
382   u32 thread_index = tc->thread_index;
383
384   ASSERT (thread_index == vlib_get_thread_index ()
385           || transport_protocol_is_cl (tc->proto));
386
387   s = session_alloc (thread_index);
388   s->session_type = session_type_from_proto_and_ip (tc->proto, tc->is_ip4);
389   session_set_state (s, SESSION_STATE_CLOSED);
390
391   /* Attach transport to session and vice versa */
392   s->connection_index = tc->c_index;
393   tc->s_index = s->session_index;
394   return s;
395 }
396
397 session_t *
398 session_alloc_for_half_open (transport_connection_t *tc)
399 {
400   session_t *s;
401
402   s = ho_session_alloc ();
403   s->session_type = session_type_from_proto_and_ip (tc->proto, tc->is_ip4);
404   s->connection_index = tc->c_index;
405   tc->s_index = s->session_index;
406   return s;
407 }
408
409 /**
410  * Discards bytes from buffer chain
411  *
412  * It discards n_bytes_to_drop starting at first buffer after chain_b
413  */
414 always_inline void
415 session_enqueue_discard_chain_bytes (vlib_main_t * vm, vlib_buffer_t * b,
416                                      vlib_buffer_t ** chain_b,
417                                      u32 n_bytes_to_drop)
418 {
419   vlib_buffer_t *next = *chain_b;
420   u32 to_drop = n_bytes_to_drop;
421   ASSERT (b->flags & VLIB_BUFFER_NEXT_PRESENT);
422   while (to_drop && (next->flags & VLIB_BUFFER_NEXT_PRESENT))
423     {
424       next = vlib_get_buffer (vm, next->next_buffer);
425       if (next->current_length > to_drop)
426         {
427           vlib_buffer_advance (next, to_drop);
428           to_drop = 0;
429         }
430       else
431         {
432           to_drop -= next->current_length;
433           next->current_length = 0;
434         }
435     }
436   *chain_b = next;
437
438   if (to_drop == 0)
439     b->total_length_not_including_first_buffer -= n_bytes_to_drop;
440 }
441
442 /**
443  * Enqueue buffer chain tail
444  */
445 always_inline int
446 session_enqueue_chain_tail (session_t * s, vlib_buffer_t * b,
447                             u32 offset, u8 is_in_order)
448 {
449   vlib_buffer_t *chain_b;
450   u32 chain_bi, len, diff;
451   vlib_main_t *vm = vlib_get_main ();
452   u8 *data;
453   u32 written = 0;
454   int rv = 0;
455
456   if (is_in_order && offset)
457     {
458       diff = offset - b->current_length;
459       if (diff > b->total_length_not_including_first_buffer)
460         return 0;
461       chain_b = b;
462       session_enqueue_discard_chain_bytes (vm, b, &chain_b, diff);
463       chain_bi = vlib_get_buffer_index (vm, chain_b);
464     }
465   else
466     chain_bi = b->next_buffer;
467
468   do
469     {
470       chain_b = vlib_get_buffer (vm, chain_bi);
471       data = vlib_buffer_get_current (chain_b);
472       len = chain_b->current_length;
473       if (!len)
474         continue;
475       if (is_in_order)
476         {
477           rv = svm_fifo_enqueue (s->rx_fifo, len, data);
478           if (rv == len)
479             {
480               written += rv;
481             }
482           else if (rv < len)
483             {
484               return (rv > 0) ? (written + rv) : written;
485             }
486           else if (rv > len)
487             {
488               written += rv;
489
490               /* written more than what was left in chain */
491               if (written > b->total_length_not_including_first_buffer)
492                 return written;
493
494               /* drop the bytes that have already been delivered */
495               session_enqueue_discard_chain_bytes (vm, b, &chain_b, rv - len);
496             }
497         }
498       else
499         {
500           rv = svm_fifo_enqueue_with_offset (s->rx_fifo, offset, len, data);
501           if (rv)
502             {
503               clib_warning ("failed to enqueue multi-buffer seg");
504               return -1;
505             }
506           offset += len;
507         }
508     }
509   while ((chain_bi = (chain_b->flags & VLIB_BUFFER_NEXT_PRESENT)
510           ? chain_b->next_buffer : 0));
511
512   if (is_in_order)
513     return written;
514
515   return 0;
516 }
517
518 void
519 session_fifo_tuning (session_t * s, svm_fifo_t * f,
520                      session_ft_action_t act, u32 len)
521 {
522   if (s->flags & SESSION_F_CUSTOM_FIFO_TUNING)
523     {
524       app_worker_t *app_wrk = app_worker_get (s->app_wrk_index);
525       app_worker_session_fifo_tuning (app_wrk, s, f, act, len);
526       if (CLIB_ASSERT_ENABLE)
527         {
528           segment_manager_t *sm;
529           sm = segment_manager_get (f->segment_manager);
530           ASSERT (f->shr->size >= 4096);
531           ASSERT (f->shr->size <= sm->max_fifo_size);
532         }
533     }
534 }
535
536 /*
537  * Enqueue data for delivery to session peer. Does not notify peer of enqueue
538  * event but on request can queue notification events for later delivery by
539  * calling stream_server_flush_enqueue_events().
540  *
541  * @param tc Transport connection which is to be enqueued data
542  * @param b Buffer to be enqueued
543  * @param offset Offset at which to start enqueueing if out-of-order
544  * @param queue_event Flag to indicate if peer is to be notified or if event
545  *                    is to be queued. The former is useful when more data is
546  *                    enqueued and only one event is to be generated.
547  * @param is_in_order Flag to indicate if data is in order
548  * @return Number of bytes enqueued or a negative value if enqueueing failed.
549  */
550 int
551 session_enqueue_stream_connection (transport_connection_t * tc,
552                                    vlib_buffer_t * b, u32 offset,
553                                    u8 queue_event, u8 is_in_order)
554 {
555   session_t *s;
556   int enqueued = 0, rv, in_order_off;
557
558   s = session_get (tc->s_index, tc->thread_index);
559
560   if (is_in_order)
561     {
562       enqueued = svm_fifo_enqueue (s->rx_fifo,
563                                    b->current_length,
564                                    vlib_buffer_get_current (b));
565       if (PREDICT_FALSE ((b->flags & VLIB_BUFFER_NEXT_PRESENT)
566                          && enqueued >= 0))
567         {
568           in_order_off = enqueued > b->current_length ? enqueued : 0;
569           rv = session_enqueue_chain_tail (s, b, in_order_off, 1);
570           if (rv > 0)
571             enqueued += rv;
572         }
573     }
574   else
575     {
576       rv = svm_fifo_enqueue_with_offset (s->rx_fifo, offset,
577                                          b->current_length,
578                                          vlib_buffer_get_current (b));
579       if (PREDICT_FALSE ((b->flags & VLIB_BUFFER_NEXT_PRESENT) && !rv))
580         session_enqueue_chain_tail (s, b, offset + b->current_length, 0);
581       /* if something was enqueued, report even this as success for ooo
582        * segment handling */
583       return rv;
584     }
585
586   if (queue_event)
587     {
588       /* Queue RX event on this fifo. Eventually these will need to be flushed
589        * by calling stream_server_flush_enqueue_events () */
590       session_worker_t *wrk;
591
592       wrk = session_main_get_worker (s->thread_index);
593       if (!(s->flags & SESSION_F_RX_EVT))
594         {
595           s->flags |= SESSION_F_RX_EVT;
596           vec_add1 (wrk->session_to_enqueue[tc->proto], s->session_index);
597         }
598
599       session_fifo_tuning (s, s->rx_fifo, SESSION_FT_ACTION_ENQUEUED, 0);
600     }
601
602   return enqueued;
603 }
604
605 int
606 session_enqueue_dgram_connection (session_t * s,
607                                   session_dgram_hdr_t * hdr,
608                                   vlib_buffer_t * b, u8 proto, u8 queue_event)
609 {
610   int rv;
611
612   ASSERT (svm_fifo_max_enqueue_prod (s->rx_fifo)
613           >= b->current_length + sizeof (*hdr));
614
615   if (PREDICT_TRUE (!(b->flags & VLIB_BUFFER_NEXT_PRESENT)))
616     {
617       /* *INDENT-OFF* */
618       svm_fifo_seg_t segs[2] = {
619           { (u8 *) hdr, sizeof (*hdr) },
620           { vlib_buffer_get_current (b), b->current_length }
621       };
622       /* *INDENT-ON* */
623
624       rv = svm_fifo_enqueue_segments (s->rx_fifo, segs, 2,
625                                       0 /* allow_partial */ );
626     }
627   else
628     {
629       vlib_main_t *vm = vlib_get_main ();
630       svm_fifo_seg_t *segs = 0, *seg;
631       vlib_buffer_t *it = b;
632       u32 n_segs = 1;
633
634       vec_add2 (segs, seg, 1);
635       seg->data = (u8 *) hdr;
636       seg->len = sizeof (*hdr);
637       while (it)
638         {
639           vec_add2 (segs, seg, 1);
640           seg->data = vlib_buffer_get_current (it);
641           seg->len = it->current_length;
642           n_segs++;
643           if (!(it->flags & VLIB_BUFFER_NEXT_PRESENT))
644             break;
645           it = vlib_get_buffer (vm, it->next_buffer);
646         }
647       rv = svm_fifo_enqueue_segments (s->rx_fifo, segs, n_segs,
648                                       0 /* allow partial */ );
649       vec_free (segs);
650     }
651
652   if (queue_event && rv > 0)
653     {
654       /* Queue RX event on this fifo. Eventually these will need to be flushed
655        * by calling stream_server_flush_enqueue_events () */
656       session_worker_t *wrk;
657
658       wrk = session_main_get_worker (s->thread_index);
659       if (!(s->flags & SESSION_F_RX_EVT))
660         {
661           s->flags |= SESSION_F_RX_EVT;
662           vec_add1 (wrk->session_to_enqueue[proto], s->session_index);
663         }
664
665       session_fifo_tuning (s, s->rx_fifo, SESSION_FT_ACTION_ENQUEUED, 0);
666     }
667   return rv > 0 ? rv : 0;
668 }
669
670 int
671 session_tx_fifo_peek_bytes (transport_connection_t * tc, u8 * buffer,
672                             u32 offset, u32 max_bytes)
673 {
674   session_t *s = session_get (tc->s_index, tc->thread_index);
675   return svm_fifo_peek (s->tx_fifo, offset, max_bytes, buffer);
676 }
677
678 u32
679 session_tx_fifo_dequeue_drop (transport_connection_t * tc, u32 max_bytes)
680 {
681   session_t *s = session_get (tc->s_index, tc->thread_index);
682   u32 rv;
683
684   rv = svm_fifo_dequeue_drop (s->tx_fifo, max_bytes);
685   session_fifo_tuning (s, s->tx_fifo, SESSION_FT_ACTION_DEQUEUED, rv);
686
687   if (svm_fifo_needs_deq_ntf (s->tx_fifo, max_bytes))
688     session_dequeue_notify (s);
689
690   return rv;
691 }
692
693 static inline int
694 session_notify_subscribers (u32 app_index, session_t * s,
695                             svm_fifo_t * f, session_evt_type_t evt_type)
696 {
697   app_worker_t *app_wrk;
698   application_t *app;
699   int i;
700
701   app = application_get (app_index);
702   if (!app)
703     return -1;
704
705   for (i = 0; i < f->shr->n_subscribers; i++)
706     {
707       app_wrk = application_get_worker (app, f->shr->subscribers[i]);
708       if (!app_wrk)
709         continue;
710       if (app_worker_lock_and_send_event (app_wrk, s, evt_type))
711         return -1;
712     }
713
714   return 0;
715 }
716
717 /**
718  * Notify session peer that new data has been enqueued.
719  *
720  * @param s     Stream session for which the event is to be generated.
721  * @param lock  Flag to indicate if call should lock message queue.
722  *
723  * @return 0 on success or negative number if failed to send notification.
724  */
725 static inline int
726 session_enqueue_notify_inline (session_t * s)
727 {
728   app_worker_t *app_wrk;
729   u32 session_index;
730   u8 n_subscribers;
731
732   session_index = s->session_index;
733   n_subscribers = svm_fifo_n_subscribers (s->rx_fifo);
734
735   app_wrk = app_worker_get_if_valid (s->app_wrk_index);
736   if (PREDICT_FALSE (!app_wrk))
737     {
738       SESSION_DBG ("invalid s->app_index = %d", s->app_wrk_index);
739       return 0;
740     }
741
742   SESSION_EVT (SESSION_EVT_ENQ, s, svm_fifo_max_dequeue_prod (s->rx_fifo));
743
744   s->flags &= ~SESSION_F_RX_EVT;
745
746   /* Application didn't confirm accept yet */
747   if (PREDICT_FALSE (s->session_state == SESSION_STATE_ACCEPTING))
748     return 0;
749
750   if (PREDICT_FALSE (app_worker_lock_and_send_event (app_wrk, s,
751                                                      SESSION_IO_EVT_RX)))
752     return -1;
753
754   if (PREDICT_FALSE (n_subscribers))
755     {
756       s = session_get (session_index, vlib_get_thread_index ());
757       return session_notify_subscribers (app_wrk->app_index, s,
758                                          s->rx_fifo, SESSION_IO_EVT_RX);
759     }
760
761   return 0;
762 }
763
764 int
765 session_enqueue_notify (session_t * s)
766 {
767   return session_enqueue_notify_inline (s);
768 }
769
770 static void
771 session_enqueue_notify_rpc (void *arg)
772 {
773   u32 session_index = pointer_to_uword (arg);
774   session_t *s;
775
776   s = session_get_if_valid (session_index, vlib_get_thread_index ());
777   if (!s)
778     return;
779
780   session_enqueue_notify (s);
781 }
782
783 /**
784  * Like session_enqueue_notify, but can be called from a thread that does not
785  * own the session.
786  */
787 void
788 session_enqueue_notify_thread (session_handle_t sh)
789 {
790   u32 thread_index = session_thread_from_handle (sh);
791   u32 session_index = session_index_from_handle (sh);
792
793   /*
794    * Pass session index (u32) as opposed to handle (u64) in case pointers
795    * are not 64-bit.
796    */
797   session_send_rpc_evt_to_thread (thread_index,
798                                   session_enqueue_notify_rpc,
799                                   uword_to_pointer (session_index, void *));
800 }
801
802 int
803 session_dequeue_notify (session_t * s)
804 {
805   app_worker_t *app_wrk;
806
807   svm_fifo_clear_deq_ntf (s->tx_fifo);
808
809   app_wrk = app_worker_get_if_valid (s->app_wrk_index);
810   if (PREDICT_FALSE (!app_wrk))
811     return -1;
812
813   if (PREDICT_FALSE (app_worker_lock_and_send_event (app_wrk, s,
814                                                      SESSION_IO_EVT_TX)))
815     return -1;
816
817   if (PREDICT_FALSE (s->tx_fifo->shr->n_subscribers))
818     return session_notify_subscribers (app_wrk->app_index, s,
819                                        s->tx_fifo, SESSION_IO_EVT_TX);
820
821   return 0;
822 }
823
824 /**
825  * Flushes queue of sessions that are to be notified of new data
826  * enqueued events.
827  *
828  * @param thread_index Thread index for which the flush is to be performed.
829  * @return 0 on success or a positive number indicating the number of
830  *         failures due to API queue being full.
831  */
832 int
833 session_main_flush_enqueue_events (u8 transport_proto, u32 thread_index)
834 {
835   session_worker_t *wrk = session_main_get_worker (thread_index);
836   session_t *s;
837   int i, errors = 0;
838   u32 *indices;
839
840   indices = wrk->session_to_enqueue[transport_proto];
841
842   for (i = 0; i < vec_len (indices); i++)
843     {
844       s = session_get_if_valid (indices[i], thread_index);
845       if (PREDICT_FALSE (!s))
846         {
847           errors++;
848           continue;
849         }
850
851       session_fifo_tuning (s, s->rx_fifo, SESSION_FT_ACTION_ENQUEUED,
852                            0 /* TODO/not needed */ );
853
854       if (PREDICT_FALSE (session_enqueue_notify_inline (s)))
855         errors++;
856     }
857
858   vec_reset_length (indices);
859   wrk->session_to_enqueue[transport_proto] = indices;
860
861   return errors;
862 }
863
864 int
865 session_main_flush_all_enqueue_events (u8 transport_proto)
866 {
867   vlib_thread_main_t *vtm = vlib_get_thread_main ();
868   int i, errors = 0;
869   for (i = 0; i < 1 + vtm->n_threads; i++)
870     errors += session_main_flush_enqueue_events (transport_proto, i);
871   return errors;
872 }
873
874 int
875 session_stream_connect_notify (transport_connection_t * tc,
876                                session_error_t err)
877 {
878   u32 opaque = 0, new_ti, new_si;
879   app_worker_t *app_wrk;
880   session_t *s = 0, *ho;
881
882   /*
883    * Cleanup half-open table
884    */
885   session_lookup_del_half_open (tc);
886
887   ho = ho_session_get (tc->s_index);
888   opaque = ho->opaque;
889   app_wrk = app_worker_get_if_valid (ho->app_wrk_index);
890   if (!app_wrk)
891     return -1;
892
893   if (err)
894     return app_worker_connect_notify (app_wrk, s, err, opaque);
895
896   s = session_alloc_for_connection (tc);
897   session_set_state (s, SESSION_STATE_CONNECTING);
898   s->app_wrk_index = app_wrk->wrk_index;
899   new_si = s->session_index;
900   new_ti = s->thread_index;
901
902   if ((err = app_worker_init_connected (app_wrk, s)))
903     {
904       session_free (s);
905       app_worker_connect_notify (app_wrk, 0, err, opaque);
906       return -1;
907     }
908
909   s = session_get (new_si, new_ti);
910   session_set_state (s, SESSION_STATE_READY);
911   session_lookup_add_connection (tc, session_handle (s));
912
913   if (app_worker_connect_notify (app_wrk, s, SESSION_E_NONE, opaque))
914     {
915       session_lookup_del_connection (tc);
916       /* Avoid notifying app about rejected session cleanup */
917       s = session_get (new_si, new_ti);
918       segment_manager_dealloc_fifos (s->rx_fifo, s->tx_fifo);
919       session_free (s);
920       return -1;
921     }
922
923   return 0;
924 }
925
926 typedef union session_switch_pool_reply_args_
927 {
928   struct
929   {
930     u32 session_index;
931     u16 thread_index;
932     u8 is_closed;
933   };
934   u64 as_u64;
935 } session_switch_pool_reply_args_t;
936
937 STATIC_ASSERT (sizeof (session_switch_pool_reply_args_t) <= sizeof (uword),
938                "switch pool reply args size");
939
940 static void
941 session_switch_pool_reply (void *arg)
942 {
943   session_switch_pool_reply_args_t rargs;
944   session_t *s;
945
946   rargs.as_u64 = pointer_to_uword (arg);
947   s = session_get_if_valid (rargs.session_index, rargs.thread_index);
948   if (!s)
949     return;
950
951   /* Session closed during migration. Clean everything up */
952   if (rargs.is_closed)
953     {
954       transport_cleanup (session_get_transport_proto (s), s->connection_index,
955                          s->thread_index);
956       segment_manager_dealloc_fifos (s->rx_fifo, s->tx_fifo);
957       session_free (s);
958       return;
959     }
960
961   /* Notify app that it has data on the new session */
962   session_enqueue_notify (s);
963 }
964
965 typedef struct _session_switch_pool_args
966 {
967   u32 session_index;
968   u32 thread_index;
969   u32 new_thread_index;
970   u32 new_session_index;
971 } session_switch_pool_args_t;
972
973 /**
974  * Notify old thread of the session pool switch
975  */
976 static void
977 session_switch_pool (void *cb_args)
978 {
979   session_switch_pool_args_t *args = (session_switch_pool_args_t *) cb_args;
980   session_switch_pool_reply_args_t rargs;
981   session_handle_t new_sh;
982   segment_manager_t *sm;
983   app_worker_t *app_wrk;
984   session_t *s;
985
986   ASSERT (args->thread_index == vlib_get_thread_index ());
987   s = session_get (args->session_index, args->thread_index);
988
989   /* Check if session closed during migration */
990   rargs.is_closed = s->session_state >= SESSION_STATE_TRANSPORT_CLOSING;
991
992   transport_cleanup (session_get_transport_proto (s), s->connection_index,
993                      s->thread_index);
994
995   app_wrk = app_worker_get_if_valid (s->app_wrk_index);
996   if (app_wrk)
997     {
998       /* Cleanup fifo segment slice state for fifos */
999       sm = app_worker_get_connect_segment_manager (app_wrk);
1000       segment_manager_detach_fifo (sm, &s->rx_fifo);
1001       segment_manager_detach_fifo (sm, &s->tx_fifo);
1002
1003       /* Notify app, using old session, about the migration event */
1004       if (!rargs.is_closed)
1005         {
1006           new_sh = session_make_handle (args->new_session_index,
1007                                         args->new_thread_index);
1008           app_worker_migrate_notify (app_wrk, s, new_sh);
1009         }
1010     }
1011
1012   /* Trigger app read and fifo updates on the new thread */
1013   rargs.session_index = args->new_session_index;
1014   rargs.thread_index = args->new_thread_index;
1015   session_send_rpc_evt_to_thread (args->new_thread_index,
1016                                   session_switch_pool_reply,
1017                                   uword_to_pointer (rargs.as_u64, void *));
1018
1019   session_free (s);
1020   clib_mem_free (cb_args);
1021 }
1022
1023 /**
1024  * Move dgram session to the right thread
1025  */
1026 int
1027 session_dgram_connect_notify (transport_connection_t * tc,
1028                               u32 old_thread_index, session_t ** new_session)
1029 {
1030   session_t *new_s;
1031   session_switch_pool_args_t *rpc_args;
1032   segment_manager_t *sm;
1033   app_worker_t *app_wrk;
1034
1035   /*
1036    * Clone half-open session to the right thread.
1037    */
1038   new_s = session_clone_safe (tc->s_index, old_thread_index);
1039   new_s->connection_index = tc->c_index;
1040   session_set_state (new_s, SESSION_STATE_READY);
1041   new_s->flags |= SESSION_F_IS_MIGRATING;
1042
1043   if (!(tc->flags & TRANSPORT_CONNECTION_F_NO_LOOKUP))
1044     session_lookup_add_connection (tc, session_handle (new_s));
1045
1046   app_wrk = app_worker_get_if_valid (new_s->app_wrk_index);
1047   if (app_wrk)
1048     {
1049       /* New set of fifos attached to the same shared memory */
1050       sm = app_worker_get_connect_segment_manager (app_wrk);
1051       segment_manager_attach_fifo (sm, &new_s->rx_fifo, new_s);
1052       segment_manager_attach_fifo (sm, &new_s->tx_fifo, new_s);
1053     }
1054
1055   /*
1056    * Ask thread owning the old session to clean it up and make us the tx
1057    * fifo owner
1058    */
1059   rpc_args = clib_mem_alloc (sizeof (*rpc_args));
1060   rpc_args->new_session_index = new_s->session_index;
1061   rpc_args->new_thread_index = new_s->thread_index;
1062   rpc_args->session_index = tc->s_index;
1063   rpc_args->thread_index = old_thread_index;
1064   session_send_rpc_evt_to_thread (rpc_args->thread_index, session_switch_pool,
1065                                   rpc_args);
1066
1067   tc->s_index = new_s->session_index;
1068   new_s->connection_index = tc->c_index;
1069   *new_session = new_s;
1070   return 0;
1071 }
1072
1073 /**
1074  * Notification from transport that connection is being closed.
1075  *
1076  * A disconnect is sent to application but state is not removed. Once
1077  * disconnect is acknowledged by application, session disconnect is called.
1078  * Ultimately this leads to close being called on transport (passive close).
1079  */
1080 void
1081 session_transport_closing_notify (transport_connection_t * tc)
1082 {
1083   app_worker_t *app_wrk;
1084   session_t *s;
1085
1086   s = session_get (tc->s_index, tc->thread_index);
1087   if (s->session_state >= SESSION_STATE_TRANSPORT_CLOSING)
1088     return;
1089
1090   /* Wait for reply from app before sending notification as the
1091    * accept might be rejected */
1092   if (s->session_state == SESSION_STATE_ACCEPTING)
1093     {
1094       session_set_state (s, SESSION_STATE_TRANSPORT_CLOSING);
1095       return;
1096     }
1097
1098   session_set_state (s, SESSION_STATE_TRANSPORT_CLOSING);
1099   app_wrk = app_worker_get (s->app_wrk_index);
1100   app_worker_close_notify (app_wrk, s);
1101 }
1102
1103 /**
1104  * Notification from transport that connection is being deleted
1105  *
1106  * This removes the session if it is still valid. It should be called only on
1107  * previously fully established sessions. For instance failed connects should
1108  * call stream_session_connect_notify and indicate that the connect has
1109  * failed.
1110  */
1111 void
1112 session_transport_delete_notify (transport_connection_t * tc)
1113 {
1114   session_t *s;
1115
1116   /* App might've been removed already */
1117   if (!(s = session_get_if_valid (tc->s_index, tc->thread_index)))
1118     return;
1119
1120   switch (s->session_state)
1121     {
1122     case SESSION_STATE_CREATED:
1123       /* Session was created but accept notification was not yet sent to the
1124        * app. Cleanup everything. */
1125       session_lookup_del_session (s);
1126       segment_manager_dealloc_fifos (s->rx_fifo, s->tx_fifo);
1127       session_free (s);
1128       break;
1129     case SESSION_STATE_ACCEPTING:
1130     case SESSION_STATE_TRANSPORT_CLOSING:
1131     case SESSION_STATE_CLOSING:
1132     case SESSION_STATE_TRANSPORT_CLOSED:
1133       /* If transport finishes or times out before we get a reply
1134        * from the app, mark transport as closed and wait for reply
1135        * before removing the session. Cleanup session table in advance
1136        * because transport will soon be closed and closed sessions
1137        * are assumed to have been removed from the lookup table */
1138       session_lookup_del_session (s);
1139       session_set_state (s, SESSION_STATE_TRANSPORT_DELETED);
1140       session_cleanup_notify (s, SESSION_CLEANUP_TRANSPORT);
1141       svm_fifo_dequeue_drop_all (s->tx_fifo);
1142       break;
1143     case SESSION_STATE_APP_CLOSED:
1144       /* Cleanup lookup table as transport needs to still be valid.
1145        * Program transport close to ensure that all session events
1146        * have been cleaned up. Once transport close is called, the
1147        * session is just removed because both transport and app have
1148        * confirmed the close*/
1149       session_lookup_del_session (s);
1150       session_set_state (s, SESSION_STATE_TRANSPORT_DELETED);
1151       session_cleanup_notify (s, SESSION_CLEANUP_TRANSPORT);
1152       svm_fifo_dequeue_drop_all (s->tx_fifo);
1153       session_program_transport_ctrl_evt (s, SESSION_CTRL_EVT_CLOSE);
1154       break;
1155     case SESSION_STATE_TRANSPORT_DELETED:
1156       break;
1157     case SESSION_STATE_CLOSED:
1158       session_cleanup_notify (s, SESSION_CLEANUP_TRANSPORT);
1159       session_delete (s);
1160       break;
1161     default:
1162       clib_warning ("session state %u", s->session_state);
1163       session_cleanup_notify (s, SESSION_CLEANUP_TRANSPORT);
1164       session_delete (s);
1165       break;
1166     }
1167 }
1168
1169 /**
1170  * Notification from transport that it is closed
1171  *
1172  * Should be called by transport, prior to calling delete notify, once it
1173  * knows that no more data will be exchanged. This could serve as an
1174  * early acknowledgment of an active close especially if transport delete
1175  * can be delayed a long time, e.g., tcp time-wait.
1176  */
1177 void
1178 session_transport_closed_notify (transport_connection_t * tc)
1179 {
1180   app_worker_t *app_wrk;
1181   session_t *s;
1182
1183   if (!(s = session_get_if_valid (tc->s_index, tc->thread_index)))
1184     return;
1185
1186   /* Transport thinks that app requested close but it actually didn't.
1187    * Can happen for tcp:
1188    * 1)if fin and rst are received in close succession.
1189    * 2)if app shutdown the connection.  */
1190   if (s->session_state == SESSION_STATE_READY)
1191     {
1192       session_transport_closing_notify (tc);
1193       svm_fifo_dequeue_drop_all (s->tx_fifo);
1194       session_set_state (s, SESSION_STATE_TRANSPORT_CLOSED);
1195     }
1196   /* If app close has not been received or has not yet resulted in
1197    * a transport close, only mark the session transport as closed */
1198   else if (s->session_state <= SESSION_STATE_CLOSING)
1199     session_set_state (s, SESSION_STATE_TRANSPORT_CLOSED);
1200   /* If app also closed, switch to closed */
1201   else if (s->session_state == SESSION_STATE_APP_CLOSED)
1202     session_set_state (s, SESSION_STATE_CLOSED);
1203
1204   app_wrk = app_worker_get_if_valid (s->app_wrk_index);
1205   if (app_wrk)
1206     app_worker_transport_closed_notify (app_wrk, s);
1207 }
1208
1209 /**
1210  * Notify application that connection has been reset.
1211  */
1212 void
1213 session_transport_reset_notify (transport_connection_t * tc)
1214 {
1215   app_worker_t *app_wrk;
1216   session_t *s;
1217
1218   s = session_get (tc->s_index, tc->thread_index);
1219   svm_fifo_dequeue_drop_all (s->tx_fifo);
1220   if (s->session_state >= SESSION_STATE_TRANSPORT_CLOSING)
1221     return;
1222   if (s->session_state == SESSION_STATE_ACCEPTING)
1223     {
1224       session_set_state (s, SESSION_STATE_TRANSPORT_CLOSING);
1225       return;
1226     }
1227   session_set_state (s, SESSION_STATE_TRANSPORT_CLOSING);
1228   app_wrk = app_worker_get (s->app_wrk_index);
1229   app_worker_reset_notify (app_wrk, s);
1230 }
1231
1232 int
1233 session_stream_accept_notify (transport_connection_t * tc)
1234 {
1235   app_worker_t *app_wrk;
1236   session_t *s;
1237
1238   s = session_get (tc->s_index, tc->thread_index);
1239   app_wrk = app_worker_get_if_valid (s->app_wrk_index);
1240   if (!app_wrk)
1241     return -1;
1242   if (s->session_state != SESSION_STATE_CREATED)
1243     return 0;
1244   session_set_state (s, SESSION_STATE_ACCEPTING);
1245   if (app_worker_accept_notify (app_wrk, s))
1246     {
1247       /* On transport delete, no notifications should be sent. Unless, the
1248        * accept is retried and successful. */
1249       session_set_state (s, SESSION_STATE_CREATED);
1250       return -1;
1251     }
1252   return 0;
1253 }
1254
1255 /**
1256  * Accept a stream session. Optionally ping the server by callback.
1257  */
1258 int
1259 session_stream_accept (transport_connection_t * tc, u32 listener_index,
1260                        u32 thread_index, u8 notify)
1261 {
1262   session_t *s;
1263   int rv;
1264
1265   s = session_alloc_for_connection (tc);
1266   s->listener_handle = ((u64) thread_index << 32) | (u64) listener_index;
1267   session_set_state (s, SESSION_STATE_CREATED);
1268
1269   if ((rv = app_worker_init_accepted (s)))
1270     {
1271       session_free (s);
1272       return rv;
1273     }
1274
1275   session_lookup_add_connection (tc, session_handle (s));
1276
1277   /* Shoulder-tap the server */
1278   if (notify)
1279     {
1280       app_worker_t *app_wrk = app_worker_get (s->app_wrk_index);
1281       if ((rv = app_worker_accept_notify (app_wrk, s)))
1282         {
1283           session_lookup_del_session (s);
1284           segment_manager_dealloc_fifos (s->rx_fifo, s->tx_fifo);
1285           session_free (s);
1286           return rv;
1287         }
1288     }
1289
1290   return 0;
1291 }
1292
1293 int
1294 session_dgram_accept (transport_connection_t * tc, u32 listener_index,
1295                       u32 thread_index)
1296 {
1297   app_worker_t *app_wrk;
1298   session_t *s;
1299   int rv;
1300
1301   s = session_alloc_for_connection (tc);
1302   s->listener_handle = ((u64) thread_index << 32) | (u64) listener_index;
1303
1304   if ((rv = app_worker_init_accepted (s)))
1305     {
1306       session_free (s);
1307       return rv;
1308     }
1309
1310   session_lookup_add_connection (tc, session_handle (s));
1311   session_set_state (s, SESSION_STATE_ACCEPTING);
1312
1313   app_wrk = app_worker_get (s->app_wrk_index);
1314   if ((rv = app_worker_accept_notify (app_wrk, s)))
1315     {
1316       session_lookup_del_session (s);
1317       segment_manager_dealloc_fifos (s->rx_fifo, s->tx_fifo);
1318       session_free (s);
1319       return rv;
1320     }
1321
1322   return 0;
1323 }
1324
1325 int
1326 session_open_cl (session_endpoint_cfg_t *rmt, session_handle_t *rsh)
1327 {
1328   transport_connection_t *tc;
1329   transport_endpoint_cfg_t *tep;
1330   app_worker_t *app_wrk;
1331   session_handle_t sh;
1332   session_t *s;
1333   int rv;
1334
1335   tep = session_endpoint_to_transport_cfg (rmt);
1336   rv = transport_connect (rmt->transport_proto, tep);
1337   if (rv < 0)
1338     {
1339       SESSION_DBG ("Transport failed to open connection.");
1340       return rv;
1341     }
1342
1343   tc = transport_get_half_open (rmt->transport_proto, (u32) rv);
1344
1345   /* For dgram type of service, allocate session and fifos now */
1346   app_wrk = app_worker_get (rmt->app_wrk_index);
1347   s = session_alloc_for_connection (tc);
1348   s->app_wrk_index = app_wrk->wrk_index;
1349   session_set_state (s, SESSION_STATE_OPENED);
1350   if (app_worker_init_connected (app_wrk, s))
1351     {
1352       session_free (s);
1353       return -1;
1354     }
1355
1356   sh = session_handle (s);
1357   *rsh = sh;
1358
1359   session_lookup_add_connection (tc, sh);
1360   return app_worker_connect_notify (app_wrk, s, SESSION_E_NONE, rmt->opaque);
1361 }
1362
1363 int
1364 session_open_vc (session_endpoint_cfg_t *rmt, session_handle_t *rsh)
1365 {
1366   transport_connection_t *tc;
1367   transport_endpoint_cfg_t *tep;
1368   app_worker_t *app_wrk;
1369   session_t *ho;
1370   int rv;
1371
1372   tep = session_endpoint_to_transport_cfg (rmt);
1373   rv = transport_connect (rmt->transport_proto, tep);
1374   if (rv < 0)
1375     {
1376       SESSION_DBG ("Transport failed to open connection.");
1377       return rv;
1378     }
1379
1380   tc = transport_get_half_open (rmt->transport_proto, (u32) rv);
1381
1382   app_wrk = app_worker_get (rmt->app_wrk_index);
1383
1384   /* If transport offers a vc service, only allocate established
1385    * session once the connection has been established.
1386    * In the meantime allocate half-open session for tracking purposes
1387    * associate half-open connection to it and add session to app-worker
1388    * half-open table. These are needed to allocate the established
1389    * session on transport notification, and to cleanup the half-open
1390    * session if the app detaches before connection establishment.
1391    */
1392   ho = session_alloc_for_half_open (tc);
1393   ho->app_wrk_index = app_wrk->wrk_index;
1394   ho->ho_index = app_worker_add_half_open (app_wrk, session_handle (ho));
1395   ho->opaque = rmt->opaque;
1396   *rsh = session_handle (ho);
1397
1398   if (!(tc->flags & TRANSPORT_CONNECTION_F_NO_LOOKUP))
1399     session_lookup_add_half_open (tc, tc->c_index);
1400
1401   return 0;
1402 }
1403
1404 int
1405 session_open_app (session_endpoint_cfg_t *rmt, session_handle_t *rsh)
1406 {
1407   transport_endpoint_cfg_t *tep_cfg = session_endpoint_to_transport_cfg (rmt);
1408
1409   /* Not supported for now */
1410   *rsh = SESSION_INVALID_HANDLE;
1411   return transport_connect (rmt->transport_proto, tep_cfg);
1412 }
1413
1414 typedef int (*session_open_service_fn) (session_endpoint_cfg_t *,
1415                                         session_handle_t *);
1416
1417 /* *INDENT-OFF* */
1418 static session_open_service_fn session_open_srv_fns[TRANSPORT_N_SERVICES] = {
1419   session_open_vc,
1420   session_open_cl,
1421   session_open_app,
1422 };
1423 /* *INDENT-ON* */
1424
1425 /**
1426  * Ask transport to open connection to remote transport endpoint.
1427  *
1428  * Stores handle for matching request with reply since the call can be
1429  * asynchronous. For instance, for TCP the 3-way handshake must complete
1430  * before reply comes. Session is only created once connection is established.
1431  *
1432  * @param app_index Index of the application requesting the connect
1433  * @param st Session type requested.
1434  * @param tep Remote transport endpoint
1435  * @param opaque Opaque data (typically, api_context) the application expects
1436  *               on open completion.
1437  */
1438 int
1439 session_open (session_endpoint_cfg_t *rmt, session_handle_t *rsh)
1440 {
1441   transport_service_type_t tst;
1442   tst = transport_protocol_service_type (rmt->transport_proto);
1443   return session_open_srv_fns[tst](rmt, rsh);
1444 }
1445
1446 /**
1447  * Ask transport to listen on session endpoint.
1448  *
1449  * @param s Session for which listen will be called. Note that unlike
1450  *          established sessions, listen sessions are not associated to a
1451  *          thread.
1452  * @param sep Local endpoint to be listened on.
1453  */
1454 int
1455 session_listen (session_t * ls, session_endpoint_cfg_t * sep)
1456 {
1457   transport_endpoint_cfg_t *tep;
1458   int tc_index;
1459   u32 s_index;
1460
1461   /* Transport bind/listen */
1462   tep = session_endpoint_to_transport_cfg (sep);
1463   s_index = ls->session_index;
1464   tc_index = transport_start_listen (session_get_transport_proto (ls),
1465                                      s_index, tep);
1466
1467   if (tc_index < 0)
1468     return tc_index;
1469
1470   /* Attach transport to session. Lookup tables are populated by the app
1471    * worker because local tables (for ct sessions) are not backed by a fib */
1472   ls = listen_session_get (s_index);
1473   ls->connection_index = tc_index;
1474   ls->opaque = sep->opaque;
1475
1476   return 0;
1477 }
1478
1479 /**
1480  * Ask transport to stop listening on local transport endpoint.
1481  *
1482  * @param s Session to stop listening on. It must be in state LISTENING.
1483  */
1484 int
1485 session_stop_listen (session_t * s)
1486 {
1487   transport_proto_t tp = session_get_transport_proto (s);
1488   transport_connection_t *tc;
1489
1490   if (s->session_state != SESSION_STATE_LISTENING)
1491     return SESSION_E_NOLISTEN;
1492
1493   tc = transport_get_listener (tp, s->connection_index);
1494
1495   /* If no transport, assume everything was cleaned up already */
1496   if (!tc)
1497     return SESSION_E_NONE;
1498
1499   if (!(tc->flags & TRANSPORT_CONNECTION_F_NO_LOOKUP))
1500     session_lookup_del_connection (tc);
1501
1502   transport_stop_listen (tp, s->connection_index);
1503   return 0;
1504 }
1505
1506 /**
1507  * Initialize session half-closing procedure.
1508  *
1509  * Note that half-closing will not change the state of the session.
1510  */
1511 void
1512 session_half_close (session_t *s)
1513 {
1514   if (!s)
1515     return;
1516
1517   session_program_transport_ctrl_evt (s, SESSION_CTRL_EVT_HALF_CLOSE);
1518 }
1519
1520 /**
1521  * Initialize session closing procedure.
1522  *
1523  * Request is always sent to session node to ensure that all outstanding
1524  * requests are served before transport is notified.
1525  */
1526 void
1527 session_close (session_t * s)
1528 {
1529   if (!s)
1530     return;
1531
1532   if (s->session_state >= SESSION_STATE_CLOSING)
1533     {
1534       /* Session will only be removed once both app and transport
1535        * acknowledge the close */
1536       if (s->session_state == SESSION_STATE_TRANSPORT_CLOSED
1537           || s->session_state == SESSION_STATE_TRANSPORT_DELETED)
1538         session_program_transport_ctrl_evt (s, SESSION_CTRL_EVT_CLOSE);
1539       return;
1540     }
1541
1542   /* App closed so stop propagating dequeue notifications.
1543    * App might disconnect session before connected, in this case,
1544    * tx_fifo may not be setup yet, so clear only it's inited. */
1545   if (s->tx_fifo)
1546     svm_fifo_clear_deq_ntf (s->tx_fifo);
1547   session_set_state (s, SESSION_STATE_CLOSING);
1548   session_program_transport_ctrl_evt (s, SESSION_CTRL_EVT_CLOSE);
1549 }
1550
1551 /**
1552  * Force a close without waiting for data to be flushed
1553  */
1554 void
1555 session_reset (session_t * s)
1556 {
1557   if (s->session_state >= SESSION_STATE_CLOSING)
1558     return;
1559   /* Drop all outstanding tx data
1560    * App might disconnect session before connected, in this case,
1561    * tx_fifo may not be setup yet, so clear only it's inited. */
1562   if (s->tx_fifo)
1563     svm_fifo_dequeue_drop_all (s->tx_fifo);
1564   session_set_state (s, SESSION_STATE_CLOSING);
1565   session_program_transport_ctrl_evt (s, SESSION_CTRL_EVT_RESET);
1566 }
1567
1568 /**
1569  * Notify transport the session can be half-disconnected.
1570  *
1571  * Must be called from the session's thread.
1572  */
1573 void
1574 session_transport_half_close (session_t *s)
1575 {
1576   /* Only READY session can be half-closed */
1577   if (s->session_state != SESSION_STATE_READY)
1578     {
1579       return;
1580     }
1581
1582   transport_half_close (session_get_transport_proto (s), s->connection_index,
1583                         s->thread_index);
1584 }
1585
1586 /**
1587  * Notify transport the session can be disconnected. This should eventually
1588  * result in a delete notification that allows us to cleanup session state.
1589  * Called for both active/passive disconnects.
1590  *
1591  * Must be called from the session's thread.
1592  */
1593 void
1594 session_transport_close (session_t * s)
1595 {
1596   if (s->session_state >= SESSION_STATE_APP_CLOSED)
1597     {
1598       if (s->session_state == SESSION_STATE_TRANSPORT_CLOSED)
1599         session_set_state (s, SESSION_STATE_CLOSED);
1600       /* If transport is already deleted, just free the session */
1601       else if (s->session_state >= SESSION_STATE_TRANSPORT_DELETED)
1602         session_free_w_fifos (s);
1603       return;
1604     }
1605
1606   /* If the tx queue wasn't drained, the transport can continue to try
1607    * sending the outstanding data (in closed state it cannot). It MUST however
1608    * at one point, either after sending everything or after a timeout, call
1609    * delete notify. This will finally lead to the complete cleanup of the
1610    * session.
1611    */
1612   session_set_state (s, SESSION_STATE_APP_CLOSED);
1613
1614   transport_close (session_get_transport_proto (s), s->connection_index,
1615                    s->thread_index);
1616 }
1617
1618 /**
1619  * Force transport close
1620  */
1621 void
1622 session_transport_reset (session_t * s)
1623 {
1624   if (s->session_state >= SESSION_STATE_APP_CLOSED)
1625     {
1626       if (s->session_state == SESSION_STATE_TRANSPORT_CLOSED)
1627         session_set_state (s, SESSION_STATE_CLOSED);
1628       else if (s->session_state >= SESSION_STATE_TRANSPORT_DELETED)
1629         session_free_w_fifos (s);
1630       return;
1631     }
1632
1633   session_set_state (s, SESSION_STATE_APP_CLOSED);
1634   transport_reset (session_get_transport_proto (s), s->connection_index,
1635                    s->thread_index);
1636 }
1637
1638 /**
1639  * Cleanup transport and session state.
1640  *
1641  * Notify transport of the cleanup and free the session. This should
1642  * be called only if transport reported some error and is already
1643  * closed.
1644  */
1645 void
1646 session_transport_cleanup (session_t * s)
1647 {
1648   /* Delete from main lookup table before we axe the the transport */
1649   session_lookup_del_session (s);
1650   if (s->session_state != SESSION_STATE_TRANSPORT_DELETED)
1651     transport_cleanup (session_get_transport_proto (s), s->connection_index,
1652                        s->thread_index);
1653   /* Since we called cleanup, no delete notification will come. So, make
1654    * sure the session is properly freed. */
1655   segment_manager_dealloc_fifos (s->rx_fifo, s->tx_fifo);
1656   session_free (s);
1657 }
1658
1659 /**
1660  * Allocate worker mqs in share-able segment
1661  *
1662  * That can only be a newly created memfd segment, that must be mapped
1663  * by all apps/stack users unless private rx mqs are enabled.
1664  */
1665 void
1666 session_vpp_wrk_mqs_alloc (session_main_t *smm)
1667 {
1668   u32 mq_q_length = 2048, evt_size = sizeof (session_event_t);
1669   fifo_segment_t *mqs_seg = &smm->wrk_mqs_segment;
1670   svm_msg_q_cfg_t _cfg, *cfg = &_cfg;
1671   uword mqs_seg_size;
1672   int i;
1673
1674   mq_q_length = clib_max (mq_q_length, smm->configured_wrk_mq_length);
1675
1676   svm_msg_q_ring_cfg_t rc[SESSION_MQ_N_RINGS] = {
1677     { mq_q_length, evt_size, 0 }, { mq_q_length >> 1, 256, 0 }
1678   };
1679   cfg->consumer_pid = 0;
1680   cfg->n_rings = 2;
1681   cfg->q_nitems = mq_q_length;
1682   cfg->ring_cfgs = rc;
1683
1684   /*
1685    * Compute mqs segment size based on rings config and leave space
1686    * for passing extended configuration messages, i.e., data allocated
1687    * outside of the rings. If provided with a config value, accept it
1688    * if larger than minimum size.
1689    */
1690   mqs_seg_size = svm_msg_q_size_to_alloc (cfg) * vec_len (smm->wrk);
1691   mqs_seg_size = mqs_seg_size + (1 << 20);
1692   mqs_seg_size = clib_max (mqs_seg_size, smm->wrk_mqs_segment_size);
1693
1694   mqs_seg->ssvm.ssvm_size = mqs_seg_size;
1695   mqs_seg->ssvm.my_pid = getpid ();
1696   mqs_seg->ssvm.name = format (0, "%s%c", "session: wrk-mqs-segment", 0);
1697
1698   if (ssvm_server_init (&mqs_seg->ssvm, SSVM_SEGMENT_MEMFD))
1699     {
1700       clib_warning ("failed to initialize queue segment");
1701       return;
1702     }
1703
1704   fifo_segment_init (mqs_seg);
1705
1706   /* Special fifo segment that's filled only with mqs */
1707   mqs_seg->h->n_mqs = vec_len (smm->wrk);
1708
1709   for (i = 0; i < vec_len (smm->wrk); i++)
1710     smm->wrk[i].vpp_event_queue = fifo_segment_msg_q_alloc (mqs_seg, i, cfg);
1711 }
1712
1713 fifo_segment_t *
1714 session_main_get_wrk_mqs_segment (void)
1715 {
1716   return &session_main.wrk_mqs_segment;
1717 }
1718
1719 u64
1720 session_segment_handle (session_t * s)
1721 {
1722   svm_fifo_t *f;
1723
1724   if (!s->rx_fifo)
1725     return SESSION_INVALID_HANDLE;
1726
1727   f = s->rx_fifo;
1728   return segment_manager_make_segment_handle (f->segment_manager,
1729                                               f->segment_index);
1730 }
1731
1732 /* *INDENT-OFF* */
1733 static session_fifo_rx_fn *session_tx_fns[TRANSPORT_TX_N_FNS] = {
1734     session_tx_fifo_peek_and_snd,
1735     session_tx_fifo_dequeue_and_snd,
1736     session_tx_fifo_dequeue_internal,
1737     session_tx_fifo_dequeue_and_snd
1738 };
1739 /* *INDENT-ON* */
1740
1741 void
1742 session_register_transport (transport_proto_t transport_proto,
1743                             const transport_proto_vft_t * vft, u8 is_ip4,
1744                             u32 output_node)
1745 {
1746   session_main_t *smm = &session_main;
1747   session_type_t session_type;
1748   u32 next_index = ~0;
1749
1750   session_type = session_type_from_proto_and_ip (transport_proto, is_ip4);
1751
1752   vec_validate (smm->session_type_to_next, session_type);
1753   vec_validate (smm->session_tx_fns, session_type);
1754
1755   if (output_node != ~0)
1756     next_index = vlib_node_add_next (vlib_get_main (),
1757                                      session_queue_node.index, output_node);
1758
1759   smm->session_type_to_next[session_type] = next_index;
1760   smm->session_tx_fns[session_type] =
1761     session_tx_fns[vft->transport_options.tx_type];
1762 }
1763
1764 void
1765 session_register_update_time_fn (session_update_time_fn fn, u8 is_add)
1766 {
1767   session_main_t *smm = &session_main;
1768   session_update_time_fn *fi;
1769   u32 fi_pos = ~0;
1770   u8 found = 0;
1771
1772   vec_foreach (fi, smm->update_time_fns)
1773     {
1774       if (*fi == fn)
1775         {
1776           fi_pos = fi - smm->update_time_fns;
1777           found = 1;
1778           break;
1779         }
1780     }
1781
1782   if (is_add)
1783     {
1784       if (found)
1785         {
1786           clib_warning ("update time fn %p already registered", fn);
1787           return;
1788         }
1789       vec_add1 (smm->update_time_fns, fn);
1790     }
1791   else
1792     {
1793       vec_del1 (smm->update_time_fns, fi_pos);
1794     }
1795 }
1796
1797 transport_proto_t
1798 session_add_transport_proto (void)
1799 {
1800   session_main_t *smm = &session_main;
1801   session_worker_t *wrk;
1802   u32 thread;
1803
1804   smm->last_transport_proto_type += 1;
1805
1806   for (thread = 0; thread < vec_len (smm->wrk); thread++)
1807     {
1808       wrk = session_main_get_worker (thread);
1809       vec_validate (wrk->session_to_enqueue, smm->last_transport_proto_type);
1810     }
1811
1812   return smm->last_transport_proto_type;
1813 }
1814
1815 transport_connection_t *
1816 session_get_transport (session_t * s)
1817 {
1818   if (s->session_state != SESSION_STATE_LISTENING)
1819     return transport_get_connection (session_get_transport_proto (s),
1820                                      s->connection_index, s->thread_index);
1821   else
1822     return transport_get_listener (session_get_transport_proto (s),
1823                                    s->connection_index);
1824 }
1825
1826 void
1827 session_get_endpoint (session_t * s, transport_endpoint_t * tep, u8 is_lcl)
1828 {
1829   if (s->session_state != SESSION_STATE_LISTENING)
1830     return transport_get_endpoint (session_get_transport_proto (s),
1831                                    s->connection_index, s->thread_index, tep,
1832                                    is_lcl);
1833   else
1834     return transport_get_listener_endpoint (session_get_transport_proto (s),
1835                                             s->connection_index, tep, is_lcl);
1836 }
1837
1838 int
1839 session_transport_attribute (session_t *s, u8 is_get,
1840                              transport_endpt_attr_t *attr)
1841 {
1842   if (s->session_state < SESSION_STATE_READY)
1843     return -1;
1844
1845   return transport_connection_attribute (session_get_transport_proto (s),
1846                                          s->connection_index, s->thread_index,
1847                                          is_get, attr);
1848 }
1849
1850 transport_connection_t *
1851 listen_session_get_transport (session_t * s)
1852 {
1853   return transport_get_listener (session_get_transport_proto (s),
1854                                  s->connection_index);
1855 }
1856
1857 void
1858 session_queue_run_on_main_thread (vlib_main_t * vm)
1859 {
1860   ASSERT (vlib_get_thread_index () == 0);
1861   vlib_node_set_interrupt_pending (vm, session_queue_node.index);
1862 }
1863
1864 static void
1865 session_stats_collector_fn (vlib_stats_collector_data_t *d)
1866 {
1867   u32 i, n_workers, n_wrk_sessions, n_sessions = 0;
1868   session_main_t *smm = &session_main;
1869   session_worker_t *wrk;
1870   counter_t **counters;
1871   counter_t *cb;
1872
1873   n_workers = vec_len (smm->wrk);
1874   vlib_stats_validate (d->entry_index, 0, n_workers - 1);
1875   counters = d->entry->data;
1876   cb = counters[0];
1877
1878   for (i = 0; i < vec_len (smm->wrk); i++)
1879     {
1880       wrk = session_main_get_worker (i);
1881       n_wrk_sessions = pool_elts (wrk->sessions);
1882       cb[i] = n_wrk_sessions;
1883       n_sessions += n_wrk_sessions;
1884     }
1885
1886   vlib_stats_set_gauge (d->private_data, n_sessions);
1887 }
1888
1889 static void
1890 session_stats_collector_init (void)
1891 {
1892   vlib_stats_collector_reg_t reg = {};
1893
1894   reg.entry_index =
1895     vlib_stats_add_counter_vector ("/sys/session/sessions_per_worker");
1896   reg.private_data = vlib_stats_add_gauge ("/sys/session/sessions_total");
1897   reg.collect_fn = session_stats_collector_fn;
1898   vlib_stats_register_collector_fn (&reg);
1899   vlib_stats_validate (reg.entry_index, 0, vlib_get_n_threads ());
1900 }
1901
1902 static clib_error_t *
1903 session_manager_main_enable (vlib_main_t * vm)
1904 {
1905   session_main_t *smm = &session_main;
1906   vlib_thread_main_t *vtm = vlib_get_thread_main ();
1907   u32 num_threads, preallocated_sessions_per_worker;
1908   session_worker_t *wrk;
1909   int i;
1910
1911   /* We only initialize once and do not de-initialized on disable */
1912   if (smm->is_initialized)
1913     goto done;
1914
1915   num_threads = 1 /* main thread */  + vtm->n_threads;
1916
1917   if (num_threads < 1)
1918     return clib_error_return (0, "n_thread_stacks not set");
1919
1920   /* Allocate cache line aligned worker contexts */
1921   vec_validate_aligned (smm->wrk, num_threads - 1, CLIB_CACHE_LINE_BYTES);
1922   clib_spinlock_init (&session_main.pool_realloc_lock);
1923
1924   for (i = 0; i < num_threads; i++)
1925     {
1926       wrk = &smm->wrk[i];
1927       wrk->ctrl_head = clib_llist_make_head (wrk->event_elts, evt_list);
1928       wrk->new_head = clib_llist_make_head (wrk->event_elts, evt_list);
1929       wrk->old_head = clib_llist_make_head (wrk->event_elts, evt_list);
1930       wrk->pending_connects = clib_llist_make_head (wrk->event_elts, evt_list);
1931       wrk->evts_pending_main =
1932         clib_llist_make_head (wrk->event_elts, evt_list);
1933       wrk->vm = vlib_get_main_by_index (i);
1934       wrk->last_vlib_time = vlib_time_now (vm);
1935       wrk->last_vlib_us_time = wrk->last_vlib_time * CLIB_US_TIME_FREQ;
1936       wrk->timerfd = -1;
1937       vec_validate (wrk->session_to_enqueue, smm->last_transport_proto_type);
1938
1939       if (!smm->no_adaptive && smm->use_private_rx_mqs)
1940         session_wrk_enable_adaptive_mode (wrk);
1941     }
1942
1943   /* Allocate vpp event queues segment and queue */
1944   session_vpp_wrk_mqs_alloc (smm);
1945
1946   /* Initialize segment manager properties */
1947   segment_manager_main_init ();
1948
1949   /* Preallocate sessions */
1950   if (smm->preallocated_sessions)
1951     {
1952       if (num_threads == 1)
1953         {
1954           pool_init_fixed (smm->wrk[0].sessions, smm->preallocated_sessions);
1955         }
1956       else
1957         {
1958           int j;
1959           preallocated_sessions_per_worker =
1960             (1.1 * (f64) smm->preallocated_sessions /
1961              (f64) (num_threads - 1));
1962
1963           for (j = 1; j < num_threads; j++)
1964             {
1965               pool_init_fixed (smm->wrk[j].sessions,
1966                                preallocated_sessions_per_worker);
1967             }
1968         }
1969     }
1970
1971   session_lookup_init ();
1972   app_namespaces_init ();
1973   transport_init ();
1974   session_stats_collector_init ();
1975   smm->is_initialized = 1;
1976
1977 done:
1978
1979   smm->is_enabled = 1;
1980
1981   /* Enable transports */
1982   transport_enable_disable (vm, 1);
1983   session_debug_init ();
1984
1985   return 0;
1986 }
1987
1988 static void
1989 session_manager_main_disable (vlib_main_t * vm)
1990 {
1991   transport_enable_disable (vm, 0 /* is_en */ );
1992 }
1993
1994 /* in this new callback, cookie hint the index */
1995 void
1996 session_dma_completion_cb (vlib_main_t *vm, struct vlib_dma_batch *batch)
1997 {
1998   session_worker_t *wrk;
1999   wrk = session_main_get_worker (vm->thread_index);
2000   session_dma_transfer *dma_transfer;
2001
2002   dma_transfer = &wrk->dma_trans[wrk->trans_head];
2003   vec_add (wrk->pending_tx_buffers, dma_transfer->pending_tx_buffers,
2004            vec_len (dma_transfer->pending_tx_buffers));
2005   vec_add (wrk->pending_tx_nexts, dma_transfer->pending_tx_nexts,
2006            vec_len (dma_transfer->pending_tx_nexts));
2007   vec_reset_length (dma_transfer->pending_tx_buffers);
2008   vec_reset_length (dma_transfer->pending_tx_nexts);
2009   wrk->trans_head++;
2010   if (wrk->trans_head == wrk->trans_size)
2011     wrk->trans_head = 0;
2012   return;
2013 }
2014
2015 static void
2016 session_prepare_dma_args (vlib_dma_config_t *args)
2017 {
2018   args->max_transfers = DMA_TRANS_SIZE;
2019   args->max_transfer_size = 65536;
2020   args->features = 0;
2021   args->sw_fallback = 1;
2022   args->barrier_before_last = 1;
2023   args->callback_fn = session_dma_completion_cb;
2024 }
2025
2026 static void
2027 session_node_enable_dma (u8 is_en, int n_vlibs)
2028 {
2029   vlib_dma_config_t args;
2030   session_prepare_dma_args (&args);
2031   session_worker_t *wrk;
2032   vlib_main_t *vm;
2033
2034   int config_index = -1;
2035
2036   if (is_en)
2037     {
2038       vm = vlib_get_main_by_index (0);
2039       config_index = vlib_dma_config_add (vm, &args);
2040     }
2041   else
2042     {
2043       vm = vlib_get_main_by_index (0);
2044       wrk = session_main_get_worker (0);
2045       if (wrk->config_index >= 0)
2046         vlib_dma_config_del (vm, wrk->config_index);
2047     }
2048   int i;
2049   for (i = 0; i < n_vlibs; i++)
2050     {
2051       vm = vlib_get_main_by_index (i);
2052       wrk = session_main_get_worker (vm->thread_index);
2053       wrk->config_index = config_index;
2054       if (is_en)
2055         {
2056           if (config_index >= 0)
2057             wrk->dma_enabled = true;
2058           wrk->dma_trans = (session_dma_transfer *) clib_mem_alloc (
2059             sizeof (session_dma_transfer) * DMA_TRANS_SIZE);
2060           bzero (wrk->dma_trans,
2061                  sizeof (session_dma_transfer) * DMA_TRANS_SIZE);
2062         }
2063       else
2064         {
2065           if (wrk->dma_trans)
2066             clib_mem_free (wrk->dma_trans);
2067         }
2068       wrk->trans_head = 0;
2069       wrk->trans_tail = 0;
2070       wrk->trans_size = DMA_TRANS_SIZE;
2071     }
2072 }
2073
2074 void
2075 session_node_enable_disable (u8 is_en)
2076 {
2077   u8 mstate = is_en ? VLIB_NODE_STATE_INTERRUPT : VLIB_NODE_STATE_DISABLED;
2078   u8 state = is_en ? VLIB_NODE_STATE_POLLING : VLIB_NODE_STATE_DISABLED;
2079   session_main_t *sm = &session_main;
2080   vlib_main_t *vm;
2081   vlib_node_t *n;
2082   int n_vlibs, i;
2083
2084   n_vlibs = vlib_get_n_threads ();
2085   for (i = 0; i < n_vlibs; i++)
2086     {
2087       vm = vlib_get_main_by_index (i);
2088       /* main thread with workers and not polling */
2089       if (i == 0 && n_vlibs > 1)
2090         {
2091           vlib_node_set_state (vm, session_queue_node.index, mstate);
2092           if (is_en)
2093             {
2094               session_main_get_worker (0)->state = SESSION_WRK_INTERRUPT;
2095               vlib_node_set_state (vm, session_queue_process_node.index,
2096                                    state);
2097               n = vlib_get_node (vm, session_queue_process_node.index);
2098               vlib_start_process (vm, n->runtime_index);
2099             }
2100           else
2101             {
2102               vlib_process_signal_event_mt (vm,
2103                                             session_queue_process_node.index,
2104                                             SESSION_Q_PROCESS_STOP, 0);
2105             }
2106           if (!sm->poll_main)
2107             continue;
2108         }
2109       vlib_node_set_state (vm, session_queue_node.index, state);
2110     }
2111
2112   if (sm->use_private_rx_mqs)
2113     application_enable_rx_mqs_nodes (is_en);
2114
2115   if (sm->dma_enabled)
2116     session_node_enable_dma (is_en, n_vlibs);
2117 }
2118
2119 clib_error_t *
2120 vnet_session_enable_disable (vlib_main_t * vm, u8 is_en)
2121 {
2122   clib_error_t *error = 0;
2123   if (is_en)
2124     {
2125       if (session_main.is_enabled)
2126         return 0;
2127
2128       error = session_manager_main_enable (vm);
2129       session_node_enable_disable (is_en);
2130     }
2131   else
2132     {
2133       session_main.is_enabled = 0;
2134       session_manager_main_disable (vm);
2135       session_node_enable_disable (is_en);
2136     }
2137
2138   return error;
2139 }
2140
2141 clib_error_t *
2142 session_main_init (vlib_main_t * vm)
2143 {
2144   session_main_t *smm = &session_main;
2145
2146   smm->is_enabled = 0;
2147   smm->session_enable_asap = 0;
2148   smm->poll_main = 0;
2149   smm->use_private_rx_mqs = 0;
2150   smm->no_adaptive = 0;
2151   smm->last_transport_proto_type = TRANSPORT_PROTO_HTTP;
2152
2153   return 0;
2154 }
2155
2156 static clib_error_t *
2157 session_main_loop_init (vlib_main_t * vm)
2158 {
2159   session_main_t *smm = &session_main;
2160   if (smm->session_enable_asap)
2161     {
2162       vlib_worker_thread_barrier_sync (vm);
2163       vnet_session_enable_disable (vm, 1 /* is_en */ );
2164       vlib_worker_thread_barrier_release (vm);
2165     }
2166   return 0;
2167 }
2168
2169 VLIB_INIT_FUNCTION (session_main_init);
2170 VLIB_MAIN_LOOP_ENTER_FUNCTION (session_main_loop_init);
2171
2172 static clib_error_t *
2173 session_config_fn (vlib_main_t * vm, unformat_input_t * input)
2174 {
2175   session_main_t *smm = &session_main;
2176   u32 nitems;
2177   uword tmp;
2178
2179   while (unformat_check_input (input) != UNFORMAT_END_OF_INPUT)
2180     {
2181       if (unformat (input, "wrk-mq-length %d", &nitems))
2182         {
2183           if (nitems >= 2048)
2184             smm->configured_wrk_mq_length = nitems;
2185           else
2186             clib_warning ("event queue length %d too small, ignored", nitems);
2187         }
2188       else if (unformat (input, "wrk-mqs-segment-size %U",
2189                          unformat_memory_size, &smm->wrk_mqs_segment_size))
2190         ;
2191       else if (unformat (input, "preallocated-sessions %d",
2192                          &smm->preallocated_sessions))
2193         ;
2194       else if (unformat (input, "v4-session-table-buckets %d",
2195                          &smm->configured_v4_session_table_buckets))
2196         ;
2197       else if (unformat (input, "v4-halfopen-table-buckets %d",
2198                          &smm->configured_v4_halfopen_table_buckets))
2199         ;
2200       else if (unformat (input, "v6-session-table-buckets %d",
2201                          &smm->configured_v6_session_table_buckets))
2202         ;
2203       else if (unformat (input, "v6-halfopen-table-buckets %d",
2204                          &smm->configured_v6_halfopen_table_buckets))
2205         ;
2206       else if (unformat (input, "v4-session-table-memory %U",
2207                          unformat_memory_size, &tmp))
2208         {
2209           if (tmp >= 0x100000000)
2210             return clib_error_return (0, "memory size %llx (%lld) too large",
2211                                       tmp, tmp);
2212           smm->configured_v4_session_table_memory = tmp;
2213         }
2214       else if (unformat (input, "v4-halfopen-table-memory %U",
2215                          unformat_memory_size, &tmp))
2216         {
2217           if (tmp >= 0x100000000)
2218             return clib_error_return (0, "memory size %llx (%lld) too large",
2219                                       tmp, tmp);
2220           smm->configured_v4_halfopen_table_memory = tmp;
2221         }
2222       else if (unformat (input, "v6-session-table-memory %U",
2223                          unformat_memory_size, &tmp))
2224         {
2225           if (tmp >= 0x100000000)
2226             return clib_error_return (0, "memory size %llx (%lld) too large",
2227                                       tmp, tmp);
2228           smm->configured_v6_session_table_memory = tmp;
2229         }
2230       else if (unformat (input, "v6-halfopen-table-memory %U",
2231                          unformat_memory_size, &tmp))
2232         {
2233           if (tmp >= 0x100000000)
2234             return clib_error_return (0, "memory size %llx (%lld) too large",
2235                                       tmp, tmp);
2236           smm->configured_v6_halfopen_table_memory = tmp;
2237         }
2238       else if (unformat (input, "local-endpoints-table-memory %U",
2239                          unformat_memory_size, &tmp))
2240         {
2241           if (tmp >= 0x100000000)
2242             return clib_error_return (0, "memory size %llx (%lld) too large",
2243                                       tmp, tmp);
2244           smm->local_endpoints_table_memory = tmp;
2245         }
2246       else if (unformat (input, "local-endpoints-table-buckets %d",
2247                          &smm->local_endpoints_table_buckets))
2248         ;
2249       else if (unformat (input, "enable"))
2250         smm->session_enable_asap = 1;
2251       else if (unformat (input, "use-app-socket-api"))
2252         (void) appns_sapi_enable_disable (1 /* is_enable */);
2253       else if (unformat (input, "poll-main"))
2254         smm->poll_main = 1;
2255       else if (unformat (input, "use-private-rx-mqs"))
2256         smm->use_private_rx_mqs = 1;
2257       else if (unformat (input, "no-adaptive"))
2258         smm->no_adaptive = 1;
2259       else if (unformat (input, "use-dma"))
2260         smm->dma_enabled = 1;
2261       /*
2262        * Deprecated but maintained for compatibility
2263        */
2264       else if (unformat (input, "evt_qs_memfd_seg"))
2265         ;
2266       else if (unformat (input, "segment-baseva 0x%lx", &tmp))
2267         ;
2268       else if (unformat (input, "evt_qs_seg_size %U", unformat_memory_size,
2269                          &smm->wrk_mqs_segment_size))
2270         ;
2271       else if (unformat (input, "event-queue-length %d", &nitems))
2272         {
2273           if (nitems >= 2048)
2274             smm->configured_wrk_mq_length = nitems;
2275           else
2276             clib_warning ("event queue length %d too small, ignored", nitems);
2277         }
2278       else
2279         return clib_error_return (0, "unknown input `%U'",
2280                                   format_unformat_error, input);
2281     }
2282   return 0;
2283 }
2284
2285 VLIB_CONFIG_FUNCTION (session_config_fn, "session");
2286
2287 /*
2288  * fd.io coding-style-patch-verification: ON
2289  *
2290  * Local Variables:
2291  * eval: (c-set-style "gnu")
2292  * End:
2293  */