4c57b1ac9bb832706d3f8bdb87313c694ce2004c
[vpp.git] / src / vnet / session / session.c
1 /*
2  * Copyright (c) 2017-2019 Cisco and/or its affiliates.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at:
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 /**
16  * @file
17  * @brief Session and session manager
18  */
19
20 #include <vnet/session/session.h>
21 #include <vnet/session/application.h>
22 #include <vnet/dpo/load_balance.h>
23 #include <vnet/fib/ip4_fib.h>
24 #include <vlib/stats/stats.h>
25 #include <vlib/dma/dma.h>
26
27 session_main_t session_main;
28
29 static inline int
30 session_send_evt_to_thread (void *data, void *args, u32 thread_index,
31                             session_evt_type_t evt_type)
32 {
33   session_worker_t *wrk = session_main_get_worker (thread_index);
34   session_event_t *evt;
35   svm_msg_q_msg_t msg;
36   svm_msg_q_t *mq;
37
38   mq = wrk->vpp_event_queue;
39   if (PREDICT_FALSE (svm_msg_q_lock (mq)))
40     return -1;
41   if (PREDICT_FALSE (svm_msg_q_or_ring_is_full (mq, SESSION_MQ_IO_EVT_RING)))
42     {
43       svm_msg_q_unlock (mq);
44       return -2;
45     }
46   switch (evt_type)
47     {
48     case SESSION_CTRL_EVT_RPC:
49       msg = svm_msg_q_alloc_msg_w_ring (mq, SESSION_MQ_IO_EVT_RING);
50       evt = (session_event_t *) svm_msg_q_msg_data (mq, &msg);
51       evt->rpc_args.fp = data;
52       evt->rpc_args.arg = args;
53       break;
54     case SESSION_IO_EVT_RX:
55     case SESSION_IO_EVT_TX:
56     case SESSION_IO_EVT_TX_FLUSH:
57     case SESSION_IO_EVT_BUILTIN_RX:
58       msg = svm_msg_q_alloc_msg_w_ring (mq, SESSION_MQ_IO_EVT_RING);
59       evt = (session_event_t *) svm_msg_q_msg_data (mq, &msg);
60       evt->session_index = *(u32 *) data;
61       break;
62     case SESSION_IO_EVT_BUILTIN_TX:
63     case SESSION_CTRL_EVT_CLOSE:
64     case SESSION_CTRL_EVT_RESET:
65       msg = svm_msg_q_alloc_msg_w_ring (mq, SESSION_MQ_IO_EVT_RING);
66       evt = (session_event_t *) svm_msg_q_msg_data (mq, &msg);
67       evt->session_handle = session_handle ((session_t *) data);
68       break;
69     default:
70       clib_warning ("evt unhandled!");
71       svm_msg_q_unlock (mq);
72       return -1;
73     }
74   evt->event_type = evt_type;
75
76   svm_msg_q_add_and_unlock (mq, &msg);
77
78   if (PREDICT_FALSE (wrk->state == SESSION_WRK_INTERRUPT))
79     vlib_node_set_interrupt_pending (wrk->vm, session_queue_node.index);
80
81   return 0;
82 }
83
84 int
85 session_send_io_evt_to_thread (svm_fifo_t * f, session_evt_type_t evt_type)
86 {
87   return session_send_evt_to_thread (&f->shr->master_session_index, 0,
88                                      f->master_thread_index, evt_type);
89 }
90
91 int
92 session_send_io_evt_to_thread_custom (void *data, u32 thread_index,
93                                       session_evt_type_t evt_type)
94 {
95   return session_send_evt_to_thread (data, 0, thread_index, evt_type);
96 }
97
98 int
99 session_send_ctrl_evt_to_thread (session_t * s, session_evt_type_t evt_type)
100 {
101   /* only events supported are disconnect, shutdown and reset */
102   ASSERT (evt_type == SESSION_CTRL_EVT_CLOSE ||
103           evt_type == SESSION_CTRL_EVT_HALF_CLOSE ||
104           evt_type == SESSION_CTRL_EVT_RESET);
105   return session_send_evt_to_thread (s, 0, s->thread_index, evt_type);
106 }
107
108 void
109 session_send_rpc_evt_to_thread_force (u32 thread_index, void *fp,
110                                       void *rpc_args)
111 {
112   session_send_evt_to_thread (fp, rpc_args, thread_index,
113                               SESSION_CTRL_EVT_RPC);
114 }
115
116 void
117 session_send_rpc_evt_to_thread (u32 thread_index, void *fp, void *rpc_args)
118 {
119   if (thread_index != vlib_get_thread_index ())
120     session_send_rpc_evt_to_thread_force (thread_index, fp, rpc_args);
121   else
122     {
123       void (*fnp) (void *) = fp;
124       fnp (rpc_args);
125     }
126 }
127
128 void
129 session_add_self_custom_tx_evt (transport_connection_t * tc, u8 has_prio)
130 {
131   session_t *s = session_get (tc->s_index, tc->thread_index);
132
133   ASSERT (s->thread_index == vlib_get_thread_index ());
134   ASSERT (s->session_state != SESSION_STATE_TRANSPORT_DELETED);
135
136   if (!(s->flags & SESSION_F_CUSTOM_TX))
137     {
138       s->flags |= SESSION_F_CUSTOM_TX;
139       if (svm_fifo_set_event (s->tx_fifo)
140           || transport_connection_is_descheduled (tc))
141         {
142           session_evt_elt_t *elt;
143           session_worker_t *wrk;
144
145           wrk = session_main_get_worker (tc->thread_index);
146           if (has_prio)
147             elt = session_evt_alloc_new (wrk);
148           else
149             elt = session_evt_alloc_old (wrk);
150           elt->evt.session_index = tc->s_index;
151           elt->evt.event_type = SESSION_IO_EVT_TX;
152           tc->flags &= ~TRANSPORT_CONNECTION_F_DESCHED;
153
154           if (PREDICT_FALSE (wrk->state == SESSION_WRK_INTERRUPT))
155             vlib_node_set_interrupt_pending (wrk->vm,
156                                              session_queue_node.index);
157         }
158     }
159 }
160
161 void
162 sesssion_reschedule_tx (transport_connection_t * tc)
163 {
164   session_worker_t *wrk = session_main_get_worker (tc->thread_index);
165   session_evt_elt_t *elt;
166
167   ASSERT (tc->thread_index == vlib_get_thread_index ());
168
169   elt = session_evt_alloc_new (wrk);
170   elt->evt.session_index = tc->s_index;
171   elt->evt.event_type = SESSION_IO_EVT_TX;
172
173   if (PREDICT_FALSE (wrk->state == SESSION_WRK_INTERRUPT))
174     vlib_node_set_interrupt_pending (wrk->vm, session_queue_node.index);
175 }
176
177 static void
178 session_program_transport_ctrl_evt (session_t * s, session_evt_type_t evt)
179 {
180   u32 thread_index = vlib_get_thread_index ();
181   session_evt_elt_t *elt;
182   session_worker_t *wrk;
183
184   /* If we are in the handler thread, or being called with the worker barrier
185    * held, just append a new event to pending disconnects vector. */
186   if (vlib_thread_is_main_w_barrier () || thread_index == s->thread_index)
187     {
188       wrk = session_main_get_worker (s->thread_index);
189       elt = session_evt_alloc_ctrl (wrk);
190       clib_memset (&elt->evt, 0, sizeof (session_event_t));
191       elt->evt.session_handle = session_handle (s);
192       elt->evt.event_type = evt;
193
194       if (PREDICT_FALSE (wrk->state == SESSION_WRK_INTERRUPT))
195         vlib_node_set_interrupt_pending (wrk->vm, session_queue_node.index);
196     }
197   else
198     session_send_ctrl_evt_to_thread (s, evt);
199 }
200
201 session_t *
202 session_alloc (u32 thread_index)
203 {
204   session_worker_t *wrk = &session_main.wrk[thread_index];
205   session_t *s;
206
207   pool_get_aligned_safe (wrk->sessions, s, CLIB_CACHE_LINE_BYTES);
208   clib_memset (s, 0, sizeof (*s));
209   s->session_index = s - wrk->sessions;
210   s->thread_index = thread_index;
211   s->app_index = APP_INVALID_INDEX;
212
213   return s;
214 }
215
216 void
217 session_free (session_t * s)
218 {
219   session_worker_t *wrk = &session_main.wrk[s->thread_index];
220
221   SESSION_EVT (SESSION_EVT_FREE, s);
222   if (CLIB_DEBUG)
223     clib_memset (s, 0xFA, sizeof (*s));
224   pool_put (wrk->sessions, s);
225 }
226
227 u8
228 session_is_valid (u32 si, u8 thread_index)
229 {
230   session_t *s;
231   transport_connection_t *tc;
232
233   s = pool_elt_at_index (session_main.wrk[thread_index].sessions, si);
234
235   if (s->thread_index != thread_index || s->session_index != si)
236     return 0;
237
238   if (s->session_state == SESSION_STATE_TRANSPORT_DELETED
239       || s->session_state <= SESSION_STATE_LISTENING)
240     return 1;
241
242   if (s->session_state == SESSION_STATE_CONNECTING &&
243       (s->flags & SESSION_F_HALF_OPEN))
244     return 1;
245
246   tc = session_get_transport (s);
247   if (s->connection_index != tc->c_index
248       || s->thread_index != tc->thread_index || tc->s_index != si)
249     return 0;
250
251   return 1;
252 }
253
254 static void
255 session_cleanup_notify (session_t * s, session_cleanup_ntf_t ntf)
256 {
257   app_worker_t *app_wrk;
258
259   app_wrk = app_worker_get_if_valid (s->app_wrk_index);
260   if (!app_wrk)
261     return;
262   app_worker_cleanup_notify (app_wrk, s, ntf);
263 }
264
265 void
266 session_free_w_fifos (session_t * s)
267 {
268   session_cleanup_notify (s, SESSION_CLEANUP_SESSION);
269   segment_manager_dealloc_fifos (s->rx_fifo, s->tx_fifo);
270   session_free (s);
271 }
272
273 /**
274  * Cleans up session and lookup table.
275  *
276  * Transport connection must still be valid.
277  */
278 static void
279 session_delete (session_t * s)
280 {
281   int rv;
282
283   /* Delete from the main lookup table. */
284   if ((rv = session_lookup_del_session (s)))
285     clib_warning ("session %u hash delete rv %d", s->session_index, rv);
286
287   session_free_w_fifos (s);
288 }
289
290 void
291 session_cleanup_half_open (session_handle_t ho_handle)
292 {
293   session_t *ho = session_get_from_handle (ho_handle);
294
295   /* App transports can migrate their half-opens */
296   if (ho->flags & SESSION_F_IS_MIGRATING)
297     {
298       /* Session still migrating, move to closed state to signal that the
299        * session should be removed. */
300       if (ho->connection_index == ~0)
301         {
302           ho->session_state = SESSION_STATE_CLOSED;
303           return;
304         }
305       /* Migrated transports are no longer half-opens */
306       transport_cleanup (session_get_transport_proto (ho),
307                          ho->connection_index, ho->app_index /* overloaded */);
308     }
309   else
310     transport_cleanup_half_open (session_get_transport_proto (ho),
311                                  ho->connection_index);
312   session_free (ho);
313 }
314
315 static void
316 session_half_open_free (session_t *ho)
317 {
318   app_worker_t *app_wrk;
319
320   ASSERT (vlib_get_thread_index () <= 1);
321   app_wrk = app_worker_get (ho->app_wrk_index);
322   app_worker_del_half_open (app_wrk, ho);
323   session_free (ho);
324 }
325
326 static void
327 session_half_open_free_rpc (void *args)
328 {
329   session_t *ho = ho_session_get (pointer_to_uword (args));
330   session_half_open_free (ho);
331 }
332
333 void
334 session_half_open_delete_notify (transport_connection_t *tc)
335 {
336   /* Notification from ctrl thread accepted without rpc */
337   if (!tc->thread_index)
338     {
339       session_half_open_free (ho_session_get (tc->s_index));
340     }
341   else
342     {
343       void *args = uword_to_pointer ((uword) tc->s_index, void *);
344       session_send_rpc_evt_to_thread_force (0, session_half_open_free_rpc,
345                                             args);
346     }
347 }
348
349 void
350 session_half_open_migrate_notify (transport_connection_t *tc)
351 {
352   session_t *ho;
353
354   ho = ho_session_get (tc->s_index);
355   ho->flags |= SESSION_F_IS_MIGRATING;
356   ho->connection_index = ~0;
357 }
358
359 int
360 session_half_open_migrated_notify (transport_connection_t *tc)
361 {
362   session_t *ho;
363
364   ho = ho_session_get (tc->s_index);
365
366   /* App probably detached so the half-open must be cleaned up */
367   if (ho->session_state == SESSION_STATE_CLOSED)
368     {
369       session_half_open_delete_notify (tc);
370       return -1;
371     }
372   ho->connection_index = tc->c_index;
373   /* Overload app index for half-open with new thread */
374   ho->app_index = tc->thread_index;
375   return 0;
376 }
377
378 session_t *
379 session_alloc_for_connection (transport_connection_t * tc)
380 {
381   session_t *s;
382   u32 thread_index = tc->thread_index;
383
384   ASSERT (thread_index == vlib_get_thread_index ()
385           || transport_protocol_is_cl (tc->proto));
386
387   s = session_alloc (thread_index);
388   s->session_type = session_type_from_proto_and_ip (tc->proto, tc->is_ip4);
389   s->session_state = SESSION_STATE_CLOSED;
390
391   /* Attach transport to session and vice versa */
392   s->connection_index = tc->c_index;
393   tc->s_index = s->session_index;
394   return s;
395 }
396
397 session_t *
398 session_alloc_for_half_open (transport_connection_t *tc)
399 {
400   session_t *s;
401
402   s = ho_session_alloc ();
403   s->session_type = session_type_from_proto_and_ip (tc->proto, tc->is_ip4);
404   s->connection_index = tc->c_index;
405   tc->s_index = s->session_index;
406   return s;
407 }
408
409 /**
410  * Discards bytes from buffer chain
411  *
412  * It discards n_bytes_to_drop starting at first buffer after chain_b
413  */
414 always_inline void
415 session_enqueue_discard_chain_bytes (vlib_main_t * vm, vlib_buffer_t * b,
416                                      vlib_buffer_t ** chain_b,
417                                      u32 n_bytes_to_drop)
418 {
419   vlib_buffer_t *next = *chain_b;
420   u32 to_drop = n_bytes_to_drop;
421   ASSERT (b->flags & VLIB_BUFFER_NEXT_PRESENT);
422   while (to_drop && (next->flags & VLIB_BUFFER_NEXT_PRESENT))
423     {
424       next = vlib_get_buffer (vm, next->next_buffer);
425       if (next->current_length > to_drop)
426         {
427           vlib_buffer_advance (next, to_drop);
428           to_drop = 0;
429         }
430       else
431         {
432           to_drop -= next->current_length;
433           next->current_length = 0;
434         }
435     }
436   *chain_b = next;
437
438   if (to_drop == 0)
439     b->total_length_not_including_first_buffer -= n_bytes_to_drop;
440 }
441
442 /**
443  * Enqueue buffer chain tail
444  */
445 always_inline int
446 session_enqueue_chain_tail (session_t * s, vlib_buffer_t * b,
447                             u32 offset, u8 is_in_order)
448 {
449   vlib_buffer_t *chain_b;
450   u32 chain_bi, len, diff;
451   vlib_main_t *vm = vlib_get_main ();
452   u8 *data;
453   u32 written = 0;
454   int rv = 0;
455
456   if (is_in_order && offset)
457     {
458       diff = offset - b->current_length;
459       if (diff > b->total_length_not_including_first_buffer)
460         return 0;
461       chain_b = b;
462       session_enqueue_discard_chain_bytes (vm, b, &chain_b, diff);
463       chain_bi = vlib_get_buffer_index (vm, chain_b);
464     }
465   else
466     chain_bi = b->next_buffer;
467
468   do
469     {
470       chain_b = vlib_get_buffer (vm, chain_bi);
471       data = vlib_buffer_get_current (chain_b);
472       len = chain_b->current_length;
473       if (!len)
474         continue;
475       if (is_in_order)
476         {
477           rv = svm_fifo_enqueue (s->rx_fifo, len, data);
478           if (rv == len)
479             {
480               written += rv;
481             }
482           else if (rv < len)
483             {
484               return (rv > 0) ? (written + rv) : written;
485             }
486           else if (rv > len)
487             {
488               written += rv;
489
490               /* written more than what was left in chain */
491               if (written > b->total_length_not_including_first_buffer)
492                 return written;
493
494               /* drop the bytes that have already been delivered */
495               session_enqueue_discard_chain_bytes (vm, b, &chain_b, rv - len);
496             }
497         }
498       else
499         {
500           rv = svm_fifo_enqueue_with_offset (s->rx_fifo, offset, len, data);
501           if (rv)
502             {
503               clib_warning ("failed to enqueue multi-buffer seg");
504               return -1;
505             }
506           offset += len;
507         }
508     }
509   while ((chain_bi = (chain_b->flags & VLIB_BUFFER_NEXT_PRESENT)
510           ? chain_b->next_buffer : 0));
511
512   if (is_in_order)
513     return written;
514
515   return 0;
516 }
517
518 void
519 session_fifo_tuning (session_t * s, svm_fifo_t * f,
520                      session_ft_action_t act, u32 len)
521 {
522   if (s->flags & SESSION_F_CUSTOM_FIFO_TUNING)
523     {
524       app_worker_t *app_wrk = app_worker_get (s->app_wrk_index);
525       app_worker_session_fifo_tuning (app_wrk, s, f, act, len);
526       if (CLIB_ASSERT_ENABLE)
527         {
528           segment_manager_t *sm;
529           sm = segment_manager_get (f->segment_manager);
530           ASSERT (f->shr->size >= 4096);
531           ASSERT (f->shr->size <= sm->max_fifo_size);
532         }
533     }
534 }
535
536 /*
537  * Enqueue data for delivery to session peer. Does not notify peer of enqueue
538  * event but on request can queue notification events for later delivery by
539  * calling stream_server_flush_enqueue_events().
540  *
541  * @param tc Transport connection which is to be enqueued data
542  * @param b Buffer to be enqueued
543  * @param offset Offset at which to start enqueueing if out-of-order
544  * @param queue_event Flag to indicate if peer is to be notified or if event
545  *                    is to be queued. The former is useful when more data is
546  *                    enqueued and only one event is to be generated.
547  * @param is_in_order Flag to indicate if data is in order
548  * @return Number of bytes enqueued or a negative value if enqueueing failed.
549  */
550 int
551 session_enqueue_stream_connection (transport_connection_t * tc,
552                                    vlib_buffer_t * b, u32 offset,
553                                    u8 queue_event, u8 is_in_order)
554 {
555   session_t *s;
556   int enqueued = 0, rv, in_order_off;
557
558   s = session_get (tc->s_index, tc->thread_index);
559
560   if (is_in_order)
561     {
562       enqueued = svm_fifo_enqueue (s->rx_fifo,
563                                    b->current_length,
564                                    vlib_buffer_get_current (b));
565       if (PREDICT_FALSE ((b->flags & VLIB_BUFFER_NEXT_PRESENT)
566                          && enqueued >= 0))
567         {
568           in_order_off = enqueued > b->current_length ? enqueued : 0;
569           rv = session_enqueue_chain_tail (s, b, in_order_off, 1);
570           if (rv > 0)
571             enqueued += rv;
572         }
573     }
574   else
575     {
576       rv = svm_fifo_enqueue_with_offset (s->rx_fifo, offset,
577                                          b->current_length,
578                                          vlib_buffer_get_current (b));
579       if (PREDICT_FALSE ((b->flags & VLIB_BUFFER_NEXT_PRESENT) && !rv))
580         session_enqueue_chain_tail (s, b, offset + b->current_length, 0);
581       /* if something was enqueued, report even this as success for ooo
582        * segment handling */
583       return rv;
584     }
585
586   if (queue_event)
587     {
588       /* Queue RX event on this fifo. Eventually these will need to be flushed
589        * by calling stream_server_flush_enqueue_events () */
590       session_worker_t *wrk;
591
592       wrk = session_main_get_worker (s->thread_index);
593       if (!(s->flags & SESSION_F_RX_EVT))
594         {
595           s->flags |= SESSION_F_RX_EVT;
596           vec_add1 (wrk->session_to_enqueue[tc->proto], s->session_index);
597         }
598
599       session_fifo_tuning (s, s->rx_fifo, SESSION_FT_ACTION_ENQUEUED, 0);
600     }
601
602   return enqueued;
603 }
604
605 int
606 session_enqueue_dgram_connection (session_t * s,
607                                   session_dgram_hdr_t * hdr,
608                                   vlib_buffer_t * b, u8 proto, u8 queue_event)
609 {
610   int rv;
611
612   ASSERT (svm_fifo_max_enqueue_prod (s->rx_fifo)
613           >= b->current_length + sizeof (*hdr));
614
615   if (PREDICT_TRUE (!(b->flags & VLIB_BUFFER_NEXT_PRESENT)))
616     {
617       /* *INDENT-OFF* */
618       svm_fifo_seg_t segs[2] = {
619           { (u8 *) hdr, sizeof (*hdr) },
620           { vlib_buffer_get_current (b), b->current_length }
621       };
622       /* *INDENT-ON* */
623
624       rv = svm_fifo_enqueue_segments (s->rx_fifo, segs, 2,
625                                       0 /* allow_partial */ );
626     }
627   else
628     {
629       vlib_main_t *vm = vlib_get_main ();
630       svm_fifo_seg_t *segs = 0, *seg;
631       vlib_buffer_t *it = b;
632       u32 n_segs = 1;
633
634       vec_add2 (segs, seg, 1);
635       seg->data = (u8 *) hdr;
636       seg->len = sizeof (*hdr);
637       while (it)
638         {
639           vec_add2 (segs, seg, 1);
640           seg->data = vlib_buffer_get_current (it);
641           seg->len = it->current_length;
642           n_segs++;
643           if (!(it->flags & VLIB_BUFFER_NEXT_PRESENT))
644             break;
645           it = vlib_get_buffer (vm, it->next_buffer);
646         }
647       rv = svm_fifo_enqueue_segments (s->rx_fifo, segs, n_segs,
648                                       0 /* allow partial */ );
649       vec_free (segs);
650     }
651
652   if (queue_event && rv > 0)
653     {
654       /* Queue RX event on this fifo. Eventually these will need to be flushed
655        * by calling stream_server_flush_enqueue_events () */
656       session_worker_t *wrk;
657
658       wrk = session_main_get_worker (s->thread_index);
659       if (!(s->flags & SESSION_F_RX_EVT))
660         {
661           s->flags |= SESSION_F_RX_EVT;
662           vec_add1 (wrk->session_to_enqueue[proto], s->session_index);
663         }
664
665       session_fifo_tuning (s, s->rx_fifo, SESSION_FT_ACTION_ENQUEUED, 0);
666     }
667   return rv > 0 ? rv : 0;
668 }
669
670 int
671 session_tx_fifo_peek_bytes (transport_connection_t * tc, u8 * buffer,
672                             u32 offset, u32 max_bytes)
673 {
674   session_t *s = session_get (tc->s_index, tc->thread_index);
675   return svm_fifo_peek (s->tx_fifo, offset, max_bytes, buffer);
676 }
677
678 u32
679 session_tx_fifo_dequeue_drop (transport_connection_t * tc, u32 max_bytes)
680 {
681   session_t *s = session_get (tc->s_index, tc->thread_index);
682   u32 rv;
683
684   rv = svm_fifo_dequeue_drop (s->tx_fifo, max_bytes);
685   session_fifo_tuning (s, s->tx_fifo, SESSION_FT_ACTION_DEQUEUED, rv);
686
687   if (svm_fifo_needs_deq_ntf (s->tx_fifo, max_bytes))
688     session_dequeue_notify (s);
689
690   return rv;
691 }
692
693 static inline int
694 session_notify_subscribers (u32 app_index, session_t * s,
695                             svm_fifo_t * f, session_evt_type_t evt_type)
696 {
697   app_worker_t *app_wrk;
698   application_t *app;
699   int i;
700
701   app = application_get (app_index);
702   if (!app)
703     return -1;
704
705   for (i = 0; i < f->shr->n_subscribers; i++)
706     {
707       app_wrk = application_get_worker (app, f->shr->subscribers[i]);
708       if (!app_wrk)
709         continue;
710       if (app_worker_lock_and_send_event (app_wrk, s, evt_type))
711         return -1;
712     }
713
714   return 0;
715 }
716
717 /**
718  * Notify session peer that new data has been enqueued.
719  *
720  * @param s     Stream session for which the event is to be generated.
721  * @param lock  Flag to indicate if call should lock message queue.
722  *
723  * @return 0 on success or negative number if failed to send notification.
724  */
725 static inline int
726 session_enqueue_notify_inline (session_t * s)
727 {
728   app_worker_t *app_wrk;
729   u32 session_index;
730   u8 n_subscribers;
731
732   session_index = s->session_index;
733   n_subscribers = svm_fifo_n_subscribers (s->rx_fifo);
734
735   app_wrk = app_worker_get_if_valid (s->app_wrk_index);
736   if (PREDICT_FALSE (!app_wrk))
737     {
738       SESSION_DBG ("invalid s->app_index = %d", s->app_wrk_index);
739       return 0;
740     }
741
742   SESSION_EVT (SESSION_EVT_ENQ, s, svm_fifo_max_dequeue_prod (s->rx_fifo));
743
744   s->flags &= ~SESSION_F_RX_EVT;
745
746   /* Application didn't confirm accept yet */
747   if (PREDICT_FALSE (s->session_state == SESSION_STATE_ACCEPTING))
748     return 0;
749
750   if (PREDICT_FALSE (app_worker_lock_and_send_event (app_wrk, s,
751                                                      SESSION_IO_EVT_RX)))
752     return -1;
753
754   if (PREDICT_FALSE (n_subscribers))
755     {
756       s = session_get (session_index, vlib_get_thread_index ());
757       return session_notify_subscribers (app_wrk->app_index, s,
758                                          s->rx_fifo, SESSION_IO_EVT_RX);
759     }
760
761   return 0;
762 }
763
764 int
765 session_enqueue_notify (session_t * s)
766 {
767   return session_enqueue_notify_inline (s);
768 }
769
770 static void
771 session_enqueue_notify_rpc (void *arg)
772 {
773   u32 session_index = pointer_to_uword (arg);
774   session_t *s;
775
776   s = session_get_if_valid (session_index, vlib_get_thread_index ());
777   if (!s)
778     return;
779
780   session_enqueue_notify (s);
781 }
782
783 /**
784  * Like session_enqueue_notify, but can be called from a thread that does not
785  * own the session.
786  */
787 void
788 session_enqueue_notify_thread (session_handle_t sh)
789 {
790   u32 thread_index = session_thread_from_handle (sh);
791   u32 session_index = session_index_from_handle (sh);
792
793   /*
794    * Pass session index (u32) as opposed to handle (u64) in case pointers
795    * are not 64-bit.
796    */
797   session_send_rpc_evt_to_thread (thread_index,
798                                   session_enqueue_notify_rpc,
799                                   uword_to_pointer (session_index, void *));
800 }
801
802 int
803 session_dequeue_notify (session_t * s)
804 {
805   app_worker_t *app_wrk;
806
807   svm_fifo_clear_deq_ntf (s->tx_fifo);
808
809   app_wrk = app_worker_get_if_valid (s->app_wrk_index);
810   if (PREDICT_FALSE (!app_wrk))
811     return -1;
812
813   if (PREDICT_FALSE (app_worker_lock_and_send_event (app_wrk, s,
814                                                      SESSION_IO_EVT_TX)))
815     return -1;
816
817   if (PREDICT_FALSE (s->tx_fifo->shr->n_subscribers))
818     return session_notify_subscribers (app_wrk->app_index, s,
819                                        s->tx_fifo, SESSION_IO_EVT_TX);
820
821   return 0;
822 }
823
824 /**
825  * Flushes queue of sessions that are to be notified of new data
826  * enqueued events.
827  *
828  * @param thread_index Thread index for which the flush is to be performed.
829  * @return 0 on success or a positive number indicating the number of
830  *         failures due to API queue being full.
831  */
832 int
833 session_main_flush_enqueue_events (u8 transport_proto, u32 thread_index)
834 {
835   session_worker_t *wrk = session_main_get_worker (thread_index);
836   session_t *s;
837   int i, errors = 0;
838   u32 *indices;
839
840   indices = wrk->session_to_enqueue[transport_proto];
841
842   for (i = 0; i < vec_len (indices); i++)
843     {
844       s = session_get_if_valid (indices[i], thread_index);
845       if (PREDICT_FALSE (!s))
846         {
847           errors++;
848           continue;
849         }
850
851       session_fifo_tuning (s, s->rx_fifo, SESSION_FT_ACTION_ENQUEUED,
852                            0 /* TODO/not needed */ );
853
854       if (PREDICT_FALSE (session_enqueue_notify_inline (s)))
855         errors++;
856     }
857
858   vec_reset_length (indices);
859   wrk->session_to_enqueue[transport_proto] = indices;
860
861   return errors;
862 }
863
864 int
865 session_main_flush_all_enqueue_events (u8 transport_proto)
866 {
867   vlib_thread_main_t *vtm = vlib_get_thread_main ();
868   int i, errors = 0;
869   for (i = 0; i < 1 + vtm->n_threads; i++)
870     errors += session_main_flush_enqueue_events (transport_proto, i);
871   return errors;
872 }
873
874 int
875 session_stream_connect_notify (transport_connection_t * tc,
876                                session_error_t err)
877 {
878   u32 opaque = 0, new_ti, new_si;
879   app_worker_t *app_wrk;
880   session_t *s = 0, *ho;
881
882   /*
883    * Cleanup half-open table
884    */
885   session_lookup_del_half_open (tc);
886
887   ho = ho_session_get (tc->s_index);
888   opaque = ho->opaque;
889   app_wrk = app_worker_get_if_valid (ho->app_wrk_index);
890   if (!app_wrk)
891     return -1;
892
893   if (err)
894     return app_worker_connect_notify (app_wrk, s, err, opaque);
895
896   s = session_alloc_for_connection (tc);
897   s->session_state = SESSION_STATE_CONNECTING;
898   s->app_wrk_index = app_wrk->wrk_index;
899   new_si = s->session_index;
900   new_ti = s->thread_index;
901
902   if ((err = app_worker_init_connected (app_wrk, s)))
903     {
904       session_free (s);
905       app_worker_connect_notify (app_wrk, 0, err, opaque);
906       return -1;
907     }
908
909   s = session_get (new_si, new_ti);
910   s->session_state = SESSION_STATE_READY;
911   session_lookup_add_connection (tc, session_handle (s));
912
913   if (app_worker_connect_notify (app_wrk, s, SESSION_E_NONE, opaque))
914     {
915       session_lookup_del_connection (tc);
916       /* Avoid notifying app about rejected session cleanup */
917       s = session_get (new_si, new_ti);
918       segment_manager_dealloc_fifos (s->rx_fifo, s->tx_fifo);
919       session_free (s);
920       return -1;
921     }
922
923   return 0;
924 }
925
926 typedef union session_switch_pool_reply_args_
927 {
928   struct
929   {
930     u32 session_index;
931     u16 thread_index;
932     u8 is_closed;
933   };
934   u64 as_u64;
935 } session_switch_pool_reply_args_t;
936
937 STATIC_ASSERT (sizeof (session_switch_pool_reply_args_t) <= sizeof (uword),
938                "switch pool reply args size");
939
940 static void
941 session_switch_pool_reply (void *arg)
942 {
943   session_switch_pool_reply_args_t rargs;
944   session_t *s;
945
946   rargs.as_u64 = pointer_to_uword (arg);
947   s = session_get_if_valid (rargs.session_index, rargs.thread_index);
948   if (!s)
949     return;
950
951   /* Session closed during migration. Clean everything up */
952   if (rargs.is_closed)
953     {
954       transport_cleanup (session_get_transport_proto (s), s->connection_index,
955                          s->thread_index);
956       segment_manager_dealloc_fifos (s->rx_fifo, s->tx_fifo);
957       session_free (s);
958       return;
959     }
960
961   /* Notify app that it has data on the new session */
962   session_enqueue_notify (s);
963 }
964
965 typedef struct _session_switch_pool_args
966 {
967   u32 session_index;
968   u32 thread_index;
969   u32 new_thread_index;
970   u32 new_session_index;
971 } session_switch_pool_args_t;
972
973 /**
974  * Notify old thread of the session pool switch
975  */
976 static void
977 session_switch_pool (void *cb_args)
978 {
979   session_switch_pool_args_t *args = (session_switch_pool_args_t *) cb_args;
980   session_switch_pool_reply_args_t rargs;
981   session_handle_t new_sh;
982   segment_manager_t *sm;
983   app_worker_t *app_wrk;
984   session_t *s;
985
986   ASSERT (args->thread_index == vlib_get_thread_index ());
987   s = session_get (args->session_index, args->thread_index);
988
989   /* Check if session closed during migration */
990   rargs.is_closed = s->session_state >= SESSION_STATE_TRANSPORT_CLOSING;
991
992   transport_cleanup (session_get_transport_proto (s), s->connection_index,
993                      s->thread_index);
994
995   app_wrk = app_worker_get_if_valid (s->app_wrk_index);
996   if (app_wrk)
997     {
998       /* Cleanup fifo segment slice state for fifos */
999       sm = app_worker_get_connect_segment_manager (app_wrk);
1000       segment_manager_detach_fifo (sm, &s->rx_fifo);
1001       segment_manager_detach_fifo (sm, &s->tx_fifo);
1002
1003       /* Notify app, using old session, about the migration event */
1004       if (!rargs.is_closed)
1005         {
1006           new_sh = session_make_handle (args->new_session_index,
1007                                         args->new_thread_index);
1008           app_worker_migrate_notify (app_wrk, s, new_sh);
1009         }
1010     }
1011
1012   /* Trigger app read and fifo updates on the new thread */
1013   rargs.session_index = args->new_session_index;
1014   rargs.thread_index = args->new_thread_index;
1015   session_send_rpc_evt_to_thread (args->new_thread_index,
1016                                   session_switch_pool_reply,
1017                                   uword_to_pointer (rargs.as_u64, void *));
1018
1019   session_free (s);
1020   clib_mem_free (cb_args);
1021 }
1022
1023 /**
1024  * Move dgram session to the right thread
1025  */
1026 int
1027 session_dgram_connect_notify (transport_connection_t * tc,
1028                               u32 old_thread_index, session_t ** new_session)
1029 {
1030   session_t *new_s;
1031   session_switch_pool_args_t *rpc_args;
1032   segment_manager_t *sm;
1033   app_worker_t *app_wrk;
1034
1035   /*
1036    * Clone half-open session to the right thread.
1037    */
1038   new_s = session_clone_safe (tc->s_index, old_thread_index);
1039   new_s->connection_index = tc->c_index;
1040   new_s->session_state = SESSION_STATE_READY;
1041   new_s->flags |= SESSION_F_IS_MIGRATING;
1042
1043   if (!(tc->flags & TRANSPORT_CONNECTION_F_NO_LOOKUP))
1044     session_lookup_add_connection (tc, session_handle (new_s));
1045
1046   app_wrk = app_worker_get_if_valid (new_s->app_wrk_index);
1047   if (app_wrk)
1048     {
1049       /* New set of fifos attached to the same shared memory */
1050       sm = app_worker_get_connect_segment_manager (app_wrk);
1051       segment_manager_attach_fifo (sm, &new_s->rx_fifo, new_s);
1052       segment_manager_attach_fifo (sm, &new_s->tx_fifo, new_s);
1053     }
1054
1055   /*
1056    * Ask thread owning the old session to clean it up and make us the tx
1057    * fifo owner
1058    */
1059   rpc_args = clib_mem_alloc (sizeof (*rpc_args));
1060   rpc_args->new_session_index = new_s->session_index;
1061   rpc_args->new_thread_index = new_s->thread_index;
1062   rpc_args->session_index = tc->s_index;
1063   rpc_args->thread_index = old_thread_index;
1064   session_send_rpc_evt_to_thread (rpc_args->thread_index, session_switch_pool,
1065                                   rpc_args);
1066
1067   tc->s_index = new_s->session_index;
1068   new_s->connection_index = tc->c_index;
1069   *new_session = new_s;
1070   return 0;
1071 }
1072
1073 /**
1074  * Notification from transport that connection is being closed.
1075  *
1076  * A disconnect is sent to application but state is not removed. Once
1077  * disconnect is acknowledged by application, session disconnect is called.
1078  * Ultimately this leads to close being called on transport (passive close).
1079  */
1080 void
1081 session_transport_closing_notify (transport_connection_t * tc)
1082 {
1083   app_worker_t *app_wrk;
1084   session_t *s;
1085
1086   s = session_get (tc->s_index, tc->thread_index);
1087   if (s->session_state >= SESSION_STATE_TRANSPORT_CLOSING)
1088     return;
1089
1090   /* Wait for reply from app before sending notification as the
1091    * accept might be rejected */
1092   if (s->session_state == SESSION_STATE_ACCEPTING)
1093     {
1094       s->session_state = SESSION_STATE_TRANSPORT_CLOSING;
1095       return;
1096     }
1097
1098   s->session_state = SESSION_STATE_TRANSPORT_CLOSING;
1099   app_wrk = app_worker_get (s->app_wrk_index);
1100   app_worker_close_notify (app_wrk, s);
1101 }
1102
1103 /**
1104  * Notification from transport that connection is being deleted
1105  *
1106  * This removes the session if it is still valid. It should be called only on
1107  * previously fully established sessions. For instance failed connects should
1108  * call stream_session_connect_notify and indicate that the connect has
1109  * failed.
1110  */
1111 void
1112 session_transport_delete_notify (transport_connection_t * tc)
1113 {
1114   session_t *s;
1115
1116   /* App might've been removed already */
1117   if (!(s = session_get_if_valid (tc->s_index, tc->thread_index)))
1118     return;
1119
1120   switch (s->session_state)
1121     {
1122     case SESSION_STATE_CREATED:
1123       /* Session was created but accept notification was not yet sent to the
1124        * app. Cleanup everything. */
1125       session_lookup_del_session (s);
1126       segment_manager_dealloc_fifos (s->rx_fifo, s->tx_fifo);
1127       session_free (s);
1128       break;
1129     case SESSION_STATE_ACCEPTING:
1130     case SESSION_STATE_TRANSPORT_CLOSING:
1131     case SESSION_STATE_CLOSING:
1132     case SESSION_STATE_TRANSPORT_CLOSED:
1133       /* If transport finishes or times out before we get a reply
1134        * from the app, mark transport as closed and wait for reply
1135        * before removing the session. Cleanup session table in advance
1136        * because transport will soon be closed and closed sessions
1137        * are assumed to have been removed from the lookup table */
1138       session_lookup_del_session (s);
1139       s->session_state = SESSION_STATE_TRANSPORT_DELETED;
1140       session_cleanup_notify (s, SESSION_CLEANUP_TRANSPORT);
1141       svm_fifo_dequeue_drop_all (s->tx_fifo);
1142       break;
1143     case SESSION_STATE_APP_CLOSED:
1144       /* Cleanup lookup table as transport needs to still be valid.
1145        * Program transport close to ensure that all session events
1146        * have been cleaned up. Once transport close is called, the
1147        * session is just removed because both transport and app have
1148        * confirmed the close*/
1149       session_lookup_del_session (s);
1150       s->session_state = SESSION_STATE_TRANSPORT_DELETED;
1151       session_cleanup_notify (s, SESSION_CLEANUP_TRANSPORT);
1152       svm_fifo_dequeue_drop_all (s->tx_fifo);
1153       session_program_transport_ctrl_evt (s, SESSION_CTRL_EVT_CLOSE);
1154       break;
1155     case SESSION_STATE_TRANSPORT_DELETED:
1156       break;
1157     case SESSION_STATE_CLOSED:
1158       session_cleanup_notify (s, SESSION_CLEANUP_TRANSPORT);
1159       session_delete (s);
1160       break;
1161     default:
1162       clib_warning ("session state %u", s->session_state);
1163       session_cleanup_notify (s, SESSION_CLEANUP_TRANSPORT);
1164       session_delete (s);
1165       break;
1166     }
1167 }
1168
1169 /**
1170  * Notification from transport that it is closed
1171  *
1172  * Should be called by transport, prior to calling delete notify, once it
1173  * knows that no more data will be exchanged. This could serve as an
1174  * early acknowledgment of an active close especially if transport delete
1175  * can be delayed a long time, e.g., tcp time-wait.
1176  */
1177 void
1178 session_transport_closed_notify (transport_connection_t * tc)
1179 {
1180   app_worker_t *app_wrk;
1181   session_t *s;
1182
1183   if (!(s = session_get_if_valid (tc->s_index, tc->thread_index)))
1184     return;
1185
1186   /* Transport thinks that app requested close but it actually didn't.
1187    * Can happen for tcp:
1188    * 1)if fin and rst are received in close succession.
1189    * 2)if app shutdown the connection.  */
1190   if (s->session_state == SESSION_STATE_READY)
1191     {
1192       session_transport_closing_notify (tc);
1193       svm_fifo_dequeue_drop_all (s->tx_fifo);
1194       s->session_state = SESSION_STATE_TRANSPORT_CLOSED;
1195     }
1196   /* If app close has not been received or has not yet resulted in
1197    * a transport close, only mark the session transport as closed */
1198   else if (s->session_state <= SESSION_STATE_CLOSING)
1199     {
1200       s->session_state = SESSION_STATE_TRANSPORT_CLOSED;
1201     }
1202   /* If app also closed, switch to closed */
1203   else if (s->session_state == SESSION_STATE_APP_CLOSED)
1204     s->session_state = SESSION_STATE_CLOSED;
1205
1206   app_wrk = app_worker_get_if_valid (s->app_wrk_index);
1207   if (app_wrk)
1208     app_worker_transport_closed_notify (app_wrk, s);
1209 }
1210
1211 /**
1212  * Notify application that connection has been reset.
1213  */
1214 void
1215 session_transport_reset_notify (transport_connection_t * tc)
1216 {
1217   app_worker_t *app_wrk;
1218   session_t *s;
1219
1220   s = session_get (tc->s_index, tc->thread_index);
1221   svm_fifo_dequeue_drop_all (s->tx_fifo);
1222   if (s->session_state >= SESSION_STATE_TRANSPORT_CLOSING)
1223     return;
1224   if (s->session_state == SESSION_STATE_ACCEPTING)
1225     {
1226       s->session_state = SESSION_STATE_TRANSPORT_CLOSING;
1227       return;
1228     }
1229   s->session_state = SESSION_STATE_TRANSPORT_CLOSING;
1230   app_wrk = app_worker_get (s->app_wrk_index);
1231   app_worker_reset_notify (app_wrk, s);
1232 }
1233
1234 int
1235 session_stream_accept_notify (transport_connection_t * tc)
1236 {
1237   app_worker_t *app_wrk;
1238   session_t *s;
1239
1240   s = session_get (tc->s_index, tc->thread_index);
1241   app_wrk = app_worker_get_if_valid (s->app_wrk_index);
1242   if (!app_wrk)
1243     return -1;
1244   if (s->session_state != SESSION_STATE_CREATED)
1245     return 0;
1246   s->session_state = SESSION_STATE_ACCEPTING;
1247   if (app_worker_accept_notify (app_wrk, s))
1248     {
1249       /* On transport delete, no notifications should be sent. Unless, the
1250        * accept is retried and successful. */
1251       s->session_state = SESSION_STATE_CREATED;
1252       return -1;
1253     }
1254   return 0;
1255 }
1256
1257 /**
1258  * Accept a stream session. Optionally ping the server by callback.
1259  */
1260 int
1261 session_stream_accept (transport_connection_t * tc, u32 listener_index,
1262                        u32 thread_index, u8 notify)
1263 {
1264   session_t *s;
1265   int rv;
1266
1267   s = session_alloc_for_connection (tc);
1268   s->listener_handle = ((u64) thread_index << 32) | (u64) listener_index;
1269   s->session_state = SESSION_STATE_CREATED;
1270
1271   if ((rv = app_worker_init_accepted (s)))
1272     {
1273       session_free (s);
1274       return rv;
1275     }
1276
1277   session_lookup_add_connection (tc, session_handle (s));
1278
1279   /* Shoulder-tap the server */
1280   if (notify)
1281     {
1282       app_worker_t *app_wrk = app_worker_get (s->app_wrk_index);
1283       if ((rv = app_worker_accept_notify (app_wrk, s)))
1284         {
1285           session_lookup_del_session (s);
1286           segment_manager_dealloc_fifos (s->rx_fifo, s->tx_fifo);
1287           session_free (s);
1288           return rv;
1289         }
1290     }
1291
1292   return 0;
1293 }
1294
1295 int
1296 session_dgram_accept (transport_connection_t * tc, u32 listener_index,
1297                       u32 thread_index)
1298 {
1299   app_worker_t *app_wrk;
1300   session_t *s;
1301   int rv;
1302
1303   s = session_alloc_for_connection (tc);
1304   s->listener_handle = ((u64) thread_index << 32) | (u64) listener_index;
1305
1306   if ((rv = app_worker_init_accepted (s)))
1307     {
1308       session_free (s);
1309       return rv;
1310     }
1311
1312   session_lookup_add_connection (tc, session_handle (s));
1313   s->session_state = SESSION_STATE_ACCEPTING;
1314
1315   app_wrk = app_worker_get (s->app_wrk_index);
1316   if ((rv = app_worker_accept_notify (app_wrk, s)))
1317     {
1318       session_lookup_del_session (s);
1319       segment_manager_dealloc_fifos (s->rx_fifo, s->tx_fifo);
1320       session_free (s);
1321       return rv;
1322     }
1323
1324   return 0;
1325 }
1326
1327 int
1328 session_open_cl (session_endpoint_cfg_t *rmt, session_handle_t *rsh)
1329 {
1330   transport_connection_t *tc;
1331   transport_endpoint_cfg_t *tep;
1332   app_worker_t *app_wrk;
1333   session_handle_t sh;
1334   session_t *s;
1335   int rv;
1336
1337   tep = session_endpoint_to_transport_cfg (rmt);
1338   rv = transport_connect (rmt->transport_proto, tep);
1339   if (rv < 0)
1340     {
1341       SESSION_DBG ("Transport failed to open connection.");
1342       return rv;
1343     }
1344
1345   tc = transport_get_half_open (rmt->transport_proto, (u32) rv);
1346
1347   /* For dgram type of service, allocate session and fifos now */
1348   app_wrk = app_worker_get (rmt->app_wrk_index);
1349   s = session_alloc_for_connection (tc);
1350   s->app_wrk_index = app_wrk->wrk_index;
1351   s->session_state = SESSION_STATE_OPENED;
1352   if (app_worker_init_connected (app_wrk, s))
1353     {
1354       session_free (s);
1355       return -1;
1356     }
1357
1358   sh = session_handle (s);
1359   *rsh = sh;
1360
1361   session_lookup_add_connection (tc, sh);
1362   return app_worker_connect_notify (app_wrk, s, SESSION_E_NONE, rmt->opaque);
1363 }
1364
1365 int
1366 session_open_vc (session_endpoint_cfg_t *rmt, session_handle_t *rsh)
1367 {
1368   transport_connection_t *tc;
1369   transport_endpoint_cfg_t *tep;
1370   app_worker_t *app_wrk;
1371   session_t *ho;
1372   int rv;
1373
1374   tep = session_endpoint_to_transport_cfg (rmt);
1375   rv = transport_connect (rmt->transport_proto, tep);
1376   if (rv < 0)
1377     {
1378       SESSION_DBG ("Transport failed to open connection.");
1379       return rv;
1380     }
1381
1382   tc = transport_get_half_open (rmt->transport_proto, (u32) rv);
1383
1384   app_wrk = app_worker_get (rmt->app_wrk_index);
1385
1386   /* If transport offers a vc service, only allocate established
1387    * session once the connection has been established.
1388    * In the meantime allocate half-open session for tracking purposes
1389    * associate half-open connection to it and add session to app-worker
1390    * half-open table. These are needed to allocate the established
1391    * session on transport notification, and to cleanup the half-open
1392    * session if the app detaches before connection establishment.
1393    */
1394   ho = session_alloc_for_half_open (tc);
1395   ho->app_wrk_index = app_wrk->wrk_index;
1396   ho->ho_index = app_worker_add_half_open (app_wrk, session_handle (ho));
1397   ho->opaque = rmt->opaque;
1398   *rsh = session_handle (ho);
1399
1400   if (!(tc->flags & TRANSPORT_CONNECTION_F_NO_LOOKUP))
1401     session_lookup_add_half_open (tc, tc->c_index);
1402
1403   return 0;
1404 }
1405
1406 int
1407 session_open_app (session_endpoint_cfg_t *rmt, session_handle_t *rsh)
1408 {
1409   transport_endpoint_cfg_t *tep_cfg = session_endpoint_to_transport_cfg (rmt);
1410
1411   /* Not supported for now */
1412   *rsh = SESSION_INVALID_HANDLE;
1413   return transport_connect (rmt->transport_proto, tep_cfg);
1414 }
1415
1416 typedef int (*session_open_service_fn) (session_endpoint_cfg_t *,
1417                                         session_handle_t *);
1418
1419 /* *INDENT-OFF* */
1420 static session_open_service_fn session_open_srv_fns[TRANSPORT_N_SERVICES] = {
1421   session_open_vc,
1422   session_open_cl,
1423   session_open_app,
1424 };
1425 /* *INDENT-ON* */
1426
1427 /**
1428  * Ask transport to open connection to remote transport endpoint.
1429  *
1430  * Stores handle for matching request with reply since the call can be
1431  * asynchronous. For instance, for TCP the 3-way handshake must complete
1432  * before reply comes. Session is only created once connection is established.
1433  *
1434  * @param app_index Index of the application requesting the connect
1435  * @param st Session type requested.
1436  * @param tep Remote transport endpoint
1437  * @param opaque Opaque data (typically, api_context) the application expects
1438  *               on open completion.
1439  */
1440 int
1441 session_open (session_endpoint_cfg_t *rmt, session_handle_t *rsh)
1442 {
1443   transport_service_type_t tst;
1444   tst = transport_protocol_service_type (rmt->transport_proto);
1445   return session_open_srv_fns[tst](rmt, rsh);
1446 }
1447
1448 /**
1449  * Ask transport to listen on session endpoint.
1450  *
1451  * @param s Session for which listen will be called. Note that unlike
1452  *          established sessions, listen sessions are not associated to a
1453  *          thread.
1454  * @param sep Local endpoint to be listened on.
1455  */
1456 int
1457 session_listen (session_t * ls, session_endpoint_cfg_t * sep)
1458 {
1459   transport_endpoint_cfg_t *tep;
1460   int tc_index;
1461   u32 s_index;
1462
1463   /* Transport bind/listen */
1464   tep = session_endpoint_to_transport_cfg (sep);
1465   s_index = ls->session_index;
1466   tc_index = transport_start_listen (session_get_transport_proto (ls),
1467                                      s_index, tep);
1468
1469   if (tc_index < 0)
1470     return tc_index;
1471
1472   /* Attach transport to session. Lookup tables are populated by the app
1473    * worker because local tables (for ct sessions) are not backed by a fib */
1474   ls = listen_session_get (s_index);
1475   ls->connection_index = tc_index;
1476   ls->opaque = sep->opaque;
1477
1478   return 0;
1479 }
1480
1481 /**
1482  * Ask transport to stop listening on local transport endpoint.
1483  *
1484  * @param s Session to stop listening on. It must be in state LISTENING.
1485  */
1486 int
1487 session_stop_listen (session_t * s)
1488 {
1489   transport_proto_t tp = session_get_transport_proto (s);
1490   transport_connection_t *tc;
1491
1492   if (s->session_state != SESSION_STATE_LISTENING)
1493     return SESSION_E_NOLISTEN;
1494
1495   tc = transport_get_listener (tp, s->connection_index);
1496
1497   /* If no transport, assume everything was cleaned up already */
1498   if (!tc)
1499     return SESSION_E_NONE;
1500
1501   if (!(tc->flags & TRANSPORT_CONNECTION_F_NO_LOOKUP))
1502     session_lookup_del_connection (tc);
1503
1504   transport_stop_listen (tp, s->connection_index);
1505   return 0;
1506 }
1507
1508 /**
1509  * Initialize session half-closing procedure.
1510  *
1511  * Note that half-closing will not change the state of the session.
1512  */
1513 void
1514 session_half_close (session_t *s)
1515 {
1516   if (!s)
1517     return;
1518
1519   session_program_transport_ctrl_evt (s, SESSION_CTRL_EVT_HALF_CLOSE);
1520 }
1521
1522 /**
1523  * Initialize session closing procedure.
1524  *
1525  * Request is always sent to session node to ensure that all outstanding
1526  * requests are served before transport is notified.
1527  */
1528 void
1529 session_close (session_t * s)
1530 {
1531   if (!s)
1532     return;
1533
1534   if (s->session_state >= SESSION_STATE_CLOSING)
1535     {
1536       /* Session will only be removed once both app and transport
1537        * acknowledge the close */
1538       if (s->session_state == SESSION_STATE_TRANSPORT_CLOSED
1539           || s->session_state == SESSION_STATE_TRANSPORT_DELETED)
1540         session_program_transport_ctrl_evt (s, SESSION_CTRL_EVT_CLOSE);
1541       return;
1542     }
1543
1544   /* App closed so stop propagating dequeue notifications */
1545   svm_fifo_clear_deq_ntf (s->tx_fifo);
1546   s->session_state = SESSION_STATE_CLOSING;
1547   session_program_transport_ctrl_evt (s, SESSION_CTRL_EVT_CLOSE);
1548 }
1549
1550 /**
1551  * Force a close without waiting for data to be flushed
1552  */
1553 void
1554 session_reset (session_t * s)
1555 {
1556   if (s->session_state >= SESSION_STATE_CLOSING)
1557     return;
1558   /* Drop all outstanding tx data */
1559   svm_fifo_dequeue_drop_all (s->tx_fifo);
1560   s->session_state = SESSION_STATE_CLOSING;
1561   session_program_transport_ctrl_evt (s, SESSION_CTRL_EVT_RESET);
1562 }
1563
1564 /**
1565  * Notify transport the session can be half-disconnected.
1566  *
1567  * Must be called from the session's thread.
1568  */
1569 void
1570 session_transport_half_close (session_t *s)
1571 {
1572   /* Only READY session can be half-closed */
1573   if (s->session_state != SESSION_STATE_READY)
1574     {
1575       return;
1576     }
1577
1578   transport_half_close (session_get_transport_proto (s), s->connection_index,
1579                         s->thread_index);
1580 }
1581
1582 /**
1583  * Notify transport the session can be disconnected. This should eventually
1584  * result in a delete notification that allows us to cleanup session state.
1585  * Called for both active/passive disconnects.
1586  *
1587  * Must be called from the session's thread.
1588  */
1589 void
1590 session_transport_close (session_t * s)
1591 {
1592   if (s->session_state >= SESSION_STATE_APP_CLOSED)
1593     {
1594       if (s->session_state == SESSION_STATE_TRANSPORT_CLOSED)
1595         s->session_state = SESSION_STATE_CLOSED;
1596       /* If transport is already deleted, just free the session */
1597       else if (s->session_state >= SESSION_STATE_TRANSPORT_DELETED)
1598         session_free_w_fifos (s);
1599       return;
1600     }
1601
1602   /* If the tx queue wasn't drained, the transport can continue to try
1603    * sending the outstanding data (in closed state it cannot). It MUST however
1604    * at one point, either after sending everything or after a timeout, call
1605    * delete notify. This will finally lead to the complete cleanup of the
1606    * session.
1607    */
1608   s->session_state = SESSION_STATE_APP_CLOSED;
1609
1610   transport_close (session_get_transport_proto (s), s->connection_index,
1611                    s->thread_index);
1612 }
1613
1614 /**
1615  * Force transport close
1616  */
1617 void
1618 session_transport_reset (session_t * s)
1619 {
1620   if (s->session_state >= SESSION_STATE_APP_CLOSED)
1621     {
1622       if (s->session_state == SESSION_STATE_TRANSPORT_CLOSED)
1623         s->session_state = SESSION_STATE_CLOSED;
1624       else if (s->session_state >= SESSION_STATE_TRANSPORT_DELETED)
1625         session_free_w_fifos (s);
1626       return;
1627     }
1628
1629   s->session_state = SESSION_STATE_APP_CLOSED;
1630   transport_reset (session_get_transport_proto (s), s->connection_index,
1631                    s->thread_index);
1632 }
1633
1634 /**
1635  * Cleanup transport and session state.
1636  *
1637  * Notify transport of the cleanup and free the session. This should
1638  * be called only if transport reported some error and is already
1639  * closed.
1640  */
1641 void
1642 session_transport_cleanup (session_t * s)
1643 {
1644   /* Delete from main lookup table before we axe the the transport */
1645   session_lookup_del_session (s);
1646   if (s->session_state != SESSION_STATE_TRANSPORT_DELETED)
1647     transport_cleanup (session_get_transport_proto (s), s->connection_index,
1648                        s->thread_index);
1649   /* Since we called cleanup, no delete notification will come. So, make
1650    * sure the session is properly freed. */
1651   segment_manager_dealloc_fifos (s->rx_fifo, s->tx_fifo);
1652   session_free (s);
1653 }
1654
1655 /**
1656  * Allocate worker mqs in share-able segment
1657  *
1658  * That can only be a newly created memfd segment, that must be mapped
1659  * by all apps/stack users unless private rx mqs are enabled.
1660  */
1661 void
1662 session_vpp_wrk_mqs_alloc (session_main_t *smm)
1663 {
1664   u32 mq_q_length = 2048, evt_size = sizeof (session_event_t);
1665   fifo_segment_t *mqs_seg = &smm->wrk_mqs_segment;
1666   svm_msg_q_cfg_t _cfg, *cfg = &_cfg;
1667   uword mqs_seg_size;
1668   int i;
1669
1670   mq_q_length = clib_max (mq_q_length, smm->configured_wrk_mq_length);
1671
1672   svm_msg_q_ring_cfg_t rc[SESSION_MQ_N_RINGS] = {
1673     { mq_q_length, evt_size, 0 }, { mq_q_length >> 1, 256, 0 }
1674   };
1675   cfg->consumer_pid = 0;
1676   cfg->n_rings = 2;
1677   cfg->q_nitems = mq_q_length;
1678   cfg->ring_cfgs = rc;
1679
1680   /*
1681    * Compute mqs segment size based on rings config and leave space
1682    * for passing extended configuration messages, i.e., data allocated
1683    * outside of the rings. If provided with a config value, accept it
1684    * if larger than minimum size.
1685    */
1686   mqs_seg_size = svm_msg_q_size_to_alloc (cfg) * vec_len (smm->wrk);
1687   mqs_seg_size = mqs_seg_size + (1 << 20);
1688   mqs_seg_size = clib_max (mqs_seg_size, smm->wrk_mqs_segment_size);
1689
1690   mqs_seg->ssvm.ssvm_size = mqs_seg_size;
1691   mqs_seg->ssvm.my_pid = getpid ();
1692   mqs_seg->ssvm.name = format (0, "%s%c", "session: wrk-mqs-segment", 0);
1693
1694   if (ssvm_server_init (&mqs_seg->ssvm, SSVM_SEGMENT_MEMFD))
1695     {
1696       clib_warning ("failed to initialize queue segment");
1697       return;
1698     }
1699
1700   fifo_segment_init (mqs_seg);
1701
1702   /* Special fifo segment that's filled only with mqs */
1703   mqs_seg->h->n_mqs = vec_len (smm->wrk);
1704
1705   for (i = 0; i < vec_len (smm->wrk); i++)
1706     smm->wrk[i].vpp_event_queue = fifo_segment_msg_q_alloc (mqs_seg, i, cfg);
1707 }
1708
1709 fifo_segment_t *
1710 session_main_get_wrk_mqs_segment (void)
1711 {
1712   return &session_main.wrk_mqs_segment;
1713 }
1714
1715 u64
1716 session_segment_handle (session_t * s)
1717 {
1718   svm_fifo_t *f;
1719
1720   if (!s->rx_fifo)
1721     return SESSION_INVALID_HANDLE;
1722
1723   f = s->rx_fifo;
1724   return segment_manager_make_segment_handle (f->segment_manager,
1725                                               f->segment_index);
1726 }
1727
1728 /* *INDENT-OFF* */
1729 static session_fifo_rx_fn *session_tx_fns[TRANSPORT_TX_N_FNS] = {
1730     session_tx_fifo_peek_and_snd,
1731     session_tx_fifo_dequeue_and_snd,
1732     session_tx_fifo_dequeue_internal,
1733     session_tx_fifo_dequeue_and_snd
1734 };
1735 /* *INDENT-ON* */
1736
1737 void
1738 session_register_transport (transport_proto_t transport_proto,
1739                             const transport_proto_vft_t * vft, u8 is_ip4,
1740                             u32 output_node)
1741 {
1742   session_main_t *smm = &session_main;
1743   session_type_t session_type;
1744   u32 next_index = ~0;
1745
1746   session_type = session_type_from_proto_and_ip (transport_proto, is_ip4);
1747
1748   vec_validate (smm->session_type_to_next, session_type);
1749   vec_validate (smm->session_tx_fns, session_type);
1750
1751   if (output_node != ~0)
1752     next_index = vlib_node_add_next (vlib_get_main (),
1753                                      session_queue_node.index, output_node);
1754
1755   smm->session_type_to_next[session_type] = next_index;
1756   smm->session_tx_fns[session_type] =
1757     session_tx_fns[vft->transport_options.tx_type];
1758 }
1759
1760 void
1761 session_register_update_time_fn (session_update_time_fn fn, u8 is_add)
1762 {
1763   session_main_t *smm = &session_main;
1764   session_update_time_fn *fi;
1765   u32 fi_pos = ~0;
1766   u8 found = 0;
1767
1768   vec_foreach (fi, smm->update_time_fns)
1769     {
1770       if (*fi == fn)
1771         {
1772           fi_pos = fi - smm->update_time_fns;
1773           found = 1;
1774           break;
1775         }
1776     }
1777
1778   if (is_add)
1779     {
1780       if (found)
1781         {
1782           clib_warning ("update time fn %p already registered", fn);
1783           return;
1784         }
1785       vec_add1 (smm->update_time_fns, fn);
1786     }
1787   else
1788     {
1789       vec_del1 (smm->update_time_fns, fi_pos);
1790     }
1791 }
1792
1793 transport_proto_t
1794 session_add_transport_proto (void)
1795 {
1796   session_main_t *smm = &session_main;
1797   session_worker_t *wrk;
1798   u32 thread;
1799
1800   smm->last_transport_proto_type += 1;
1801
1802   for (thread = 0; thread < vec_len (smm->wrk); thread++)
1803     {
1804       wrk = session_main_get_worker (thread);
1805       vec_validate (wrk->session_to_enqueue, smm->last_transport_proto_type);
1806     }
1807
1808   return smm->last_transport_proto_type;
1809 }
1810
1811 transport_connection_t *
1812 session_get_transport (session_t * s)
1813 {
1814   if (s->session_state != SESSION_STATE_LISTENING)
1815     return transport_get_connection (session_get_transport_proto (s),
1816                                      s->connection_index, s->thread_index);
1817   else
1818     return transport_get_listener (session_get_transport_proto (s),
1819                                    s->connection_index);
1820 }
1821
1822 void
1823 session_get_endpoint (session_t * s, transport_endpoint_t * tep, u8 is_lcl)
1824 {
1825   if (s->session_state != SESSION_STATE_LISTENING)
1826     return transport_get_endpoint (session_get_transport_proto (s),
1827                                    s->connection_index, s->thread_index, tep,
1828                                    is_lcl);
1829   else
1830     return transport_get_listener_endpoint (session_get_transport_proto (s),
1831                                             s->connection_index, tep, is_lcl);
1832 }
1833
1834 int
1835 session_transport_attribute (session_t *s, u8 is_get,
1836                              transport_endpt_attr_t *attr)
1837 {
1838   if (s->session_state < SESSION_STATE_READY)
1839     return -1;
1840
1841   return transport_connection_attribute (session_get_transport_proto (s),
1842                                          s->connection_index, s->thread_index,
1843                                          is_get, attr);
1844 }
1845
1846 transport_connection_t *
1847 listen_session_get_transport (session_t * s)
1848 {
1849   return transport_get_listener (session_get_transport_proto (s),
1850                                  s->connection_index);
1851 }
1852
1853 void
1854 session_queue_run_on_main_thread (vlib_main_t * vm)
1855 {
1856   ASSERT (vlib_get_thread_index () == 0);
1857   vlib_node_set_interrupt_pending (vm, session_queue_node.index);
1858 }
1859
1860 static void
1861 session_stats_collector_fn (vlib_stats_collector_data_t *d)
1862 {
1863   u32 i, n_workers, n_wrk_sessions, n_sessions = 0;
1864   session_main_t *smm = &session_main;
1865   session_worker_t *wrk;
1866   counter_t **counters;
1867   counter_t *cb;
1868
1869   n_workers = vec_len (smm->wrk);
1870   vlib_stats_validate (d->entry_index, 0, n_workers - 1);
1871   counters = d->entry->data;
1872   cb = counters[0];
1873
1874   for (i = 0; i < vec_len (smm->wrk); i++)
1875     {
1876       wrk = session_main_get_worker (i);
1877       n_wrk_sessions = pool_elts (wrk->sessions);
1878       cb[i] = n_wrk_sessions;
1879       n_sessions += n_wrk_sessions;
1880     }
1881
1882   vlib_stats_set_gauge (d->private_data, n_sessions);
1883 }
1884
1885 static void
1886 session_stats_collector_init (void)
1887 {
1888   vlib_stats_collector_reg_t reg = {};
1889
1890   reg.entry_index =
1891     vlib_stats_add_counter_vector ("/sys/session/sessions_per_worker");
1892   reg.private_data = vlib_stats_add_gauge ("/sys/session/sessions_total");
1893   reg.collect_fn = session_stats_collector_fn;
1894   vlib_stats_register_collector_fn (&reg);
1895   vlib_stats_validate (reg.entry_index, 0, vlib_get_n_threads ());
1896 }
1897
1898 static clib_error_t *
1899 session_manager_main_enable (vlib_main_t * vm)
1900 {
1901   session_main_t *smm = &session_main;
1902   vlib_thread_main_t *vtm = vlib_get_thread_main ();
1903   u32 num_threads, preallocated_sessions_per_worker;
1904   session_worker_t *wrk;
1905   int i;
1906
1907   /* We only initialize once and do not de-initialized on disable */
1908   if (smm->is_initialized)
1909     goto done;
1910
1911   num_threads = 1 /* main thread */  + vtm->n_threads;
1912
1913   if (num_threads < 1)
1914     return clib_error_return (0, "n_thread_stacks not set");
1915
1916   /* Allocate cache line aligned worker contexts */
1917   vec_validate_aligned (smm->wrk, num_threads - 1, CLIB_CACHE_LINE_BYTES);
1918   clib_spinlock_init (&session_main.pool_realloc_lock);
1919
1920   for (i = 0; i < num_threads; i++)
1921     {
1922       wrk = &smm->wrk[i];
1923       wrk->ctrl_head = clib_llist_make_head (wrk->event_elts, evt_list);
1924       wrk->new_head = clib_llist_make_head (wrk->event_elts, evt_list);
1925       wrk->old_head = clib_llist_make_head (wrk->event_elts, evt_list);
1926       wrk->pending_connects = clib_llist_make_head (wrk->event_elts, evt_list);
1927       wrk->evts_pending_main =
1928         clib_llist_make_head (wrk->event_elts, evt_list);
1929       wrk->vm = vlib_get_main_by_index (i);
1930       wrk->last_vlib_time = vlib_time_now (vm);
1931       wrk->last_vlib_us_time = wrk->last_vlib_time * CLIB_US_TIME_FREQ;
1932       wrk->timerfd = -1;
1933       vec_validate (wrk->session_to_enqueue, smm->last_transport_proto_type);
1934
1935       if (!smm->no_adaptive && smm->use_private_rx_mqs)
1936         session_wrk_enable_adaptive_mode (wrk);
1937     }
1938
1939   /* Allocate vpp event queues segment and queue */
1940   session_vpp_wrk_mqs_alloc (smm);
1941
1942   /* Initialize segment manager properties */
1943   segment_manager_main_init ();
1944
1945   /* Preallocate sessions */
1946   if (smm->preallocated_sessions)
1947     {
1948       if (num_threads == 1)
1949         {
1950           pool_init_fixed (smm->wrk[0].sessions, smm->preallocated_sessions);
1951         }
1952       else
1953         {
1954           int j;
1955           preallocated_sessions_per_worker =
1956             (1.1 * (f64) smm->preallocated_sessions /
1957              (f64) (num_threads - 1));
1958
1959           for (j = 1; j < num_threads; j++)
1960             {
1961               pool_init_fixed (smm->wrk[j].sessions,
1962                                preallocated_sessions_per_worker);
1963             }
1964         }
1965     }
1966
1967   session_lookup_init ();
1968   app_namespaces_init ();
1969   transport_init ();
1970   session_stats_collector_init ();
1971   smm->is_initialized = 1;
1972
1973 done:
1974
1975   smm->is_enabled = 1;
1976
1977   /* Enable transports */
1978   transport_enable_disable (vm, 1);
1979   session_debug_init ();
1980
1981   return 0;
1982 }
1983
1984 static void
1985 session_manager_main_disable (vlib_main_t * vm)
1986 {
1987   transport_enable_disable (vm, 0 /* is_en */ );
1988 }
1989
1990 /* in this new callback, cookie hint the index */
1991 void
1992 session_dma_completion_cb (vlib_main_t *vm, struct vlib_dma_batch *batch)
1993 {
1994   session_worker_t *wrk;
1995   wrk = session_main_get_worker (vm->thread_index);
1996   session_dma_transfer *dma_transfer;
1997
1998   dma_transfer = &wrk->dma_trans[wrk->trans_head];
1999   vec_add (wrk->pending_tx_buffers, dma_transfer->pending_tx_buffers,
2000            vec_len (dma_transfer->pending_tx_buffers));
2001   vec_add (wrk->pending_tx_nexts, dma_transfer->pending_tx_nexts,
2002            vec_len (dma_transfer->pending_tx_nexts));
2003   vec_reset_length (dma_transfer->pending_tx_buffers);
2004   vec_reset_length (dma_transfer->pending_tx_nexts);
2005   wrk->trans_head++;
2006   if (wrk->trans_head == wrk->trans_size)
2007     wrk->trans_head = 0;
2008   return;
2009 }
2010
2011 static void
2012 session_prepare_dma_args (vlib_dma_config_t *args)
2013 {
2014   args->max_transfers = DMA_TRANS_SIZE;
2015   args->max_transfer_size = 65536;
2016   args->features = 0;
2017   args->sw_fallback = 1;
2018   args->barrier_before_last = 1;
2019   args->callback_fn = session_dma_completion_cb;
2020 }
2021
2022 static void
2023 session_node_enable_dma (u8 is_en, int n_vlibs)
2024 {
2025   vlib_dma_config_t args;
2026   session_prepare_dma_args (&args);
2027   session_worker_t *wrk;
2028   vlib_main_t *vm;
2029
2030   int config_index = -1;
2031
2032   if (is_en)
2033     {
2034       vm = vlib_get_main_by_index (0);
2035       config_index = vlib_dma_config_add (vm, &args);
2036     }
2037   else
2038     {
2039       vm = vlib_get_main_by_index (0);
2040       wrk = session_main_get_worker (0);
2041       if (wrk->config_index >= 0)
2042         vlib_dma_config_del (vm, wrk->config_index);
2043     }
2044   int i;
2045   for (i = 0; i < n_vlibs; i++)
2046     {
2047       vm = vlib_get_main_by_index (i);
2048       wrk = session_main_get_worker (vm->thread_index);
2049       wrk->config_index = config_index;
2050       if (is_en)
2051         {
2052           if (config_index >= 0)
2053             wrk->dma_enabled = true;
2054           wrk->dma_trans = (session_dma_transfer *) clib_mem_alloc (
2055             sizeof (session_dma_transfer) * DMA_TRANS_SIZE);
2056           bzero (wrk->dma_trans,
2057                  sizeof (session_dma_transfer) * DMA_TRANS_SIZE);
2058         }
2059       else
2060         {
2061           if (wrk->dma_trans)
2062             clib_mem_free (wrk->dma_trans);
2063         }
2064       wrk->trans_head = 0;
2065       wrk->trans_tail = 0;
2066       wrk->trans_size = DMA_TRANS_SIZE;
2067     }
2068 }
2069
2070 void
2071 session_node_enable_disable (u8 is_en)
2072 {
2073   u8 mstate = is_en ? VLIB_NODE_STATE_INTERRUPT : VLIB_NODE_STATE_DISABLED;
2074   u8 state = is_en ? VLIB_NODE_STATE_POLLING : VLIB_NODE_STATE_DISABLED;
2075   session_main_t *sm = &session_main;
2076   vlib_main_t *vm;
2077   vlib_node_t *n;
2078   int n_vlibs, i;
2079
2080   n_vlibs = vlib_get_n_threads ();
2081   for (i = 0; i < n_vlibs; i++)
2082     {
2083       vm = vlib_get_main_by_index (i);
2084       /* main thread with workers and not polling */
2085       if (i == 0 && n_vlibs > 1)
2086         {
2087           vlib_node_set_state (vm, session_queue_node.index, mstate);
2088           if (is_en)
2089             {
2090               session_main_get_worker (0)->state = SESSION_WRK_INTERRUPT;
2091               vlib_node_set_state (vm, session_queue_process_node.index,
2092                                    state);
2093               n = vlib_get_node (vm, session_queue_process_node.index);
2094               vlib_start_process (vm, n->runtime_index);
2095             }
2096           else
2097             {
2098               vlib_process_signal_event_mt (vm,
2099                                             session_queue_process_node.index,
2100                                             SESSION_Q_PROCESS_STOP, 0);
2101             }
2102           if (!sm->poll_main)
2103             continue;
2104         }
2105       vlib_node_set_state (vm, session_queue_node.index, state);
2106     }
2107
2108   if (sm->use_private_rx_mqs)
2109     application_enable_rx_mqs_nodes (is_en);
2110
2111   if (sm->dma_enabled)
2112     session_node_enable_dma (is_en, n_vlibs);
2113 }
2114
2115 clib_error_t *
2116 vnet_session_enable_disable (vlib_main_t * vm, u8 is_en)
2117 {
2118   clib_error_t *error = 0;
2119   if (is_en)
2120     {
2121       if (session_main.is_enabled)
2122         return 0;
2123
2124       error = session_manager_main_enable (vm);
2125       session_node_enable_disable (is_en);
2126     }
2127   else
2128     {
2129       session_main.is_enabled = 0;
2130       session_manager_main_disable (vm);
2131       session_node_enable_disable (is_en);
2132     }
2133
2134   return error;
2135 }
2136
2137 clib_error_t *
2138 session_main_init (vlib_main_t * vm)
2139 {
2140   session_main_t *smm = &session_main;
2141
2142   smm->is_enabled = 0;
2143   smm->session_enable_asap = 0;
2144   smm->poll_main = 0;
2145   smm->use_private_rx_mqs = 0;
2146   smm->no_adaptive = 0;
2147   smm->last_transport_proto_type = TRANSPORT_PROTO_HTTP;
2148
2149   return 0;
2150 }
2151
2152 static clib_error_t *
2153 session_main_loop_init (vlib_main_t * vm)
2154 {
2155   session_main_t *smm = &session_main;
2156   if (smm->session_enable_asap)
2157     {
2158       vlib_worker_thread_barrier_sync (vm);
2159       vnet_session_enable_disable (vm, 1 /* is_en */ );
2160       vlib_worker_thread_barrier_release (vm);
2161     }
2162   return 0;
2163 }
2164
2165 VLIB_INIT_FUNCTION (session_main_init);
2166 VLIB_MAIN_LOOP_ENTER_FUNCTION (session_main_loop_init);
2167
2168 static clib_error_t *
2169 session_config_fn (vlib_main_t * vm, unformat_input_t * input)
2170 {
2171   session_main_t *smm = &session_main;
2172   u32 nitems;
2173   uword tmp;
2174
2175   while (unformat_check_input (input) != UNFORMAT_END_OF_INPUT)
2176     {
2177       if (unformat (input, "wrk-mq-length %d", &nitems))
2178         {
2179           if (nitems >= 2048)
2180             smm->configured_wrk_mq_length = nitems;
2181           else
2182             clib_warning ("event queue length %d too small, ignored", nitems);
2183         }
2184       else if (unformat (input, "wrk-mqs-segment-size %U",
2185                          unformat_memory_size, &smm->wrk_mqs_segment_size))
2186         ;
2187       else if (unformat (input, "preallocated-sessions %d",
2188                          &smm->preallocated_sessions))
2189         ;
2190       else if (unformat (input, "v4-session-table-buckets %d",
2191                          &smm->configured_v4_session_table_buckets))
2192         ;
2193       else if (unformat (input, "v4-halfopen-table-buckets %d",
2194                          &smm->configured_v4_halfopen_table_buckets))
2195         ;
2196       else if (unformat (input, "v6-session-table-buckets %d",
2197                          &smm->configured_v6_session_table_buckets))
2198         ;
2199       else if (unformat (input, "v6-halfopen-table-buckets %d",
2200                          &smm->configured_v6_halfopen_table_buckets))
2201         ;
2202       else if (unformat (input, "v4-session-table-memory %U",
2203                          unformat_memory_size, &tmp))
2204         {
2205           if (tmp >= 0x100000000)
2206             return clib_error_return (0, "memory size %llx (%lld) too large",
2207                                       tmp, tmp);
2208           smm->configured_v4_session_table_memory = tmp;
2209         }
2210       else if (unformat (input, "v4-halfopen-table-memory %U",
2211                          unformat_memory_size, &tmp))
2212         {
2213           if (tmp >= 0x100000000)
2214             return clib_error_return (0, "memory size %llx (%lld) too large",
2215                                       tmp, tmp);
2216           smm->configured_v4_halfopen_table_memory = tmp;
2217         }
2218       else if (unformat (input, "v6-session-table-memory %U",
2219                          unformat_memory_size, &tmp))
2220         {
2221           if (tmp >= 0x100000000)
2222             return clib_error_return (0, "memory size %llx (%lld) too large",
2223                                       tmp, tmp);
2224           smm->configured_v6_session_table_memory = tmp;
2225         }
2226       else if (unformat (input, "v6-halfopen-table-memory %U",
2227                          unformat_memory_size, &tmp))
2228         {
2229           if (tmp >= 0x100000000)
2230             return clib_error_return (0, "memory size %llx (%lld) too large",
2231                                       tmp, tmp);
2232           smm->configured_v6_halfopen_table_memory = tmp;
2233         }
2234       else if (unformat (input, "local-endpoints-table-memory %U",
2235                          unformat_memory_size, &tmp))
2236         {
2237           if (tmp >= 0x100000000)
2238             return clib_error_return (0, "memory size %llx (%lld) too large",
2239                                       tmp, tmp);
2240           smm->local_endpoints_table_memory = tmp;
2241         }
2242       else if (unformat (input, "local-endpoints-table-buckets %d",
2243                          &smm->local_endpoints_table_buckets))
2244         ;
2245       else if (unformat (input, "enable"))
2246         smm->session_enable_asap = 1;
2247       else if (unformat (input, "use-app-socket-api"))
2248         (void) appns_sapi_enable_disable (1 /* is_enable */);
2249       else if (unformat (input, "poll-main"))
2250         smm->poll_main = 1;
2251       else if (unformat (input, "use-private-rx-mqs"))
2252         smm->use_private_rx_mqs = 1;
2253       else if (unformat (input, "no-adaptive"))
2254         smm->no_adaptive = 1;
2255       else if (unformat (input, "use-dma"))
2256         smm->dma_enabled = 1;
2257       /*
2258        * Deprecated but maintained for compatibility
2259        */
2260       else if (unformat (input, "evt_qs_memfd_seg"))
2261         ;
2262       else if (unformat (input, "segment-baseva 0x%lx", &tmp))
2263         ;
2264       else if (unformat (input, "evt_qs_seg_size %U", unformat_memory_size,
2265                          &smm->wrk_mqs_segment_size))
2266         ;
2267       else if (unformat (input, "event-queue-length %d", &nitems))
2268         {
2269           if (nitems >= 2048)
2270             smm->configured_wrk_mq_length = nitems;
2271           else
2272             clib_warning ("event queue length %d too small, ignored", nitems);
2273         }
2274       else
2275         return clib_error_return (0, "unknown input `%U'",
2276                                   format_unformat_error, input);
2277     }
2278   return 0;
2279 }
2280
2281 VLIB_CONFIG_FUNCTION (session_config_fn, "session");
2282
2283 /*
2284  * fd.io coding-style-patch-verification: ON
2285  *
2286  * Local Variables:
2287  * eval: (c-set-style "gnu")
2288  * End:
2289  */