1b9f9de2a1b09daf70eb0f3380145d4cdc7ce807
[vpp.git] / src / vnet / session / session.c
1 /*
2  * Copyright (c) 2017-2019 Cisco and/or its affiliates.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at:
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 /**
16  * @file
17  * @brief Session and session manager
18  */
19
20 #include <vnet/session/session.h>
21 #include <vnet/session/application.h>
22 #include <vnet/dpo/load_balance.h>
23 #include <vnet/fib/ip4_fib.h>
24 #include <vlib/stats/stats.h>
25 #include <vlib/dma/dma.h>
26
27 session_main_t session_main;
28
29 static inline int
30 session_send_evt_to_thread (void *data, void *args, u32 thread_index,
31                             session_evt_type_t evt_type)
32 {
33   session_worker_t *wrk = session_main_get_worker (thread_index);
34   session_event_t *evt;
35   svm_msg_q_msg_t msg;
36   svm_msg_q_t *mq;
37
38   mq = wrk->vpp_event_queue;
39   if (PREDICT_FALSE (svm_msg_q_lock (mq)))
40     return -1;
41   if (PREDICT_FALSE (svm_msg_q_or_ring_is_full (mq, SESSION_MQ_IO_EVT_RING)))
42     {
43       svm_msg_q_unlock (mq);
44       return -2;
45     }
46   switch (evt_type)
47     {
48     case SESSION_CTRL_EVT_RPC:
49       msg = svm_msg_q_alloc_msg_w_ring (mq, SESSION_MQ_IO_EVT_RING);
50       evt = (session_event_t *) svm_msg_q_msg_data (mq, &msg);
51       evt->rpc_args.fp = data;
52       evt->rpc_args.arg = args;
53       break;
54     case SESSION_IO_EVT_RX:
55     case SESSION_IO_EVT_TX:
56     case SESSION_IO_EVT_TX_FLUSH:
57     case SESSION_IO_EVT_BUILTIN_RX:
58       msg = svm_msg_q_alloc_msg_w_ring (mq, SESSION_MQ_IO_EVT_RING);
59       evt = (session_event_t *) svm_msg_q_msg_data (mq, &msg);
60       evt->session_index = *(u32 *) data;
61       break;
62     case SESSION_IO_EVT_TX_MAIN:
63     case SESSION_CTRL_EVT_CLOSE:
64     case SESSION_CTRL_EVT_RESET:
65       msg = svm_msg_q_alloc_msg_w_ring (mq, SESSION_MQ_IO_EVT_RING);
66       evt = (session_event_t *) svm_msg_q_msg_data (mq, &msg);
67       evt->session_handle = session_handle ((session_t *) data);
68       break;
69     default:
70       clib_warning ("evt unhandled!");
71       svm_msg_q_unlock (mq);
72       return -1;
73     }
74   evt->event_type = evt_type;
75
76   svm_msg_q_add_and_unlock (mq, &msg);
77
78   if (PREDICT_FALSE (wrk->state == SESSION_WRK_INTERRUPT))
79     vlib_node_set_interrupt_pending (wrk->vm, session_queue_node.index);
80
81   return 0;
82 }
83
84 int
85 session_send_io_evt_to_thread (svm_fifo_t * f, session_evt_type_t evt_type)
86 {
87   return session_send_evt_to_thread (&f->shr->master_session_index, 0,
88                                      f->master_thread_index, evt_type);
89 }
90
91 int
92 session_send_io_evt_to_thread_custom (void *data, u32 thread_index,
93                                       session_evt_type_t evt_type)
94 {
95   return session_send_evt_to_thread (data, 0, thread_index, evt_type);
96 }
97
98 int
99 session_send_ctrl_evt_to_thread (session_t * s, session_evt_type_t evt_type)
100 {
101   /* only events supported are disconnect, shutdown and reset */
102   ASSERT (evt_type == SESSION_CTRL_EVT_CLOSE ||
103           evt_type == SESSION_CTRL_EVT_HALF_CLOSE ||
104           evt_type == SESSION_CTRL_EVT_RESET);
105   return session_send_evt_to_thread (s, 0, s->thread_index, evt_type);
106 }
107
108 void
109 session_send_rpc_evt_to_thread_force (u32 thread_index, void *fp,
110                                       void *rpc_args)
111 {
112   session_send_evt_to_thread (fp, rpc_args, thread_index,
113                               SESSION_CTRL_EVT_RPC);
114 }
115
116 void
117 session_send_rpc_evt_to_thread (u32 thread_index, void *fp, void *rpc_args)
118 {
119   if (thread_index != vlib_get_thread_index ())
120     session_send_rpc_evt_to_thread_force (thread_index, fp, rpc_args);
121   else
122     {
123       void (*fnp) (void *) = fp;
124       fnp (rpc_args);
125     }
126 }
127
128 void
129 session_add_self_custom_tx_evt (transport_connection_t * tc, u8 has_prio)
130 {
131   session_t *s = session_get (tc->s_index, tc->thread_index);
132
133   ASSERT (s->thread_index == vlib_get_thread_index ());
134   ASSERT (s->session_state != SESSION_STATE_TRANSPORT_DELETED);
135
136   if (!(s->flags & SESSION_F_CUSTOM_TX))
137     {
138       s->flags |= SESSION_F_CUSTOM_TX;
139       if (svm_fifo_set_event (s->tx_fifo)
140           || transport_connection_is_descheduled (tc))
141         {
142           session_evt_elt_t *elt;
143           session_worker_t *wrk;
144
145           wrk = session_main_get_worker (tc->thread_index);
146           if (has_prio)
147             elt = session_evt_alloc_new (wrk);
148           else
149             elt = session_evt_alloc_old (wrk);
150           elt->evt.session_index = tc->s_index;
151           elt->evt.event_type = SESSION_IO_EVT_TX;
152           tc->flags &= ~TRANSPORT_CONNECTION_F_DESCHED;
153
154           if (PREDICT_FALSE (wrk->state == SESSION_WRK_INTERRUPT))
155             vlib_node_set_interrupt_pending (wrk->vm,
156                                              session_queue_node.index);
157         }
158     }
159 }
160
161 void
162 sesssion_reschedule_tx (transport_connection_t * tc)
163 {
164   session_worker_t *wrk = session_main_get_worker (tc->thread_index);
165   session_evt_elt_t *elt;
166
167   ASSERT (tc->thread_index == vlib_get_thread_index ());
168
169   elt = session_evt_alloc_new (wrk);
170   elt->evt.session_index = tc->s_index;
171   elt->evt.event_type = SESSION_IO_EVT_TX;
172
173   if (PREDICT_FALSE (wrk->state == SESSION_WRK_INTERRUPT))
174     vlib_node_set_interrupt_pending (wrk->vm, session_queue_node.index);
175 }
176
177 static void
178 session_program_transport_ctrl_evt (session_t * s, session_evt_type_t evt)
179 {
180   u32 thread_index = vlib_get_thread_index ();
181   session_evt_elt_t *elt;
182   session_worker_t *wrk;
183
184   /* If we are in the handler thread, or being called with the worker barrier
185    * held, just append a new event to pending disconnects vector. */
186   if (vlib_thread_is_main_w_barrier () || thread_index == s->thread_index)
187     {
188       wrk = session_main_get_worker (s->thread_index);
189       elt = session_evt_alloc_ctrl (wrk);
190       clib_memset (&elt->evt, 0, sizeof (session_event_t));
191       elt->evt.session_handle = session_handle (s);
192       elt->evt.event_type = evt;
193
194       if (PREDICT_FALSE (wrk->state == SESSION_WRK_INTERRUPT))
195         vlib_node_set_interrupt_pending (wrk->vm, session_queue_node.index);
196     }
197   else
198     session_send_ctrl_evt_to_thread (s, evt);
199 }
200
201 session_t *
202 session_alloc (u32 thread_index)
203 {
204   session_worker_t *wrk = &session_main.wrk[thread_index];
205   session_t *s;
206
207   pool_get_aligned_safe (wrk->sessions, s, CLIB_CACHE_LINE_BYTES);
208   clib_memset (s, 0, sizeof (*s));
209   s->session_index = s - wrk->sessions;
210   s->thread_index = thread_index;
211   s->app_index = APP_INVALID_INDEX;
212
213   return s;
214 }
215
216 void
217 session_free (session_t * s)
218 {
219   session_worker_t *wrk = &session_main.wrk[s->thread_index];
220
221   SESSION_EVT (SESSION_EVT_FREE, s);
222   if (CLIB_DEBUG)
223     clib_memset (s, 0xFA, sizeof (*s));
224   pool_put (wrk->sessions, s);
225 }
226
227 u8
228 session_is_valid (u32 si, u8 thread_index)
229 {
230   session_t *s;
231   transport_connection_t *tc;
232
233   s = pool_elt_at_index (session_main.wrk[thread_index].sessions, si);
234
235   if (s->thread_index != thread_index || s->session_index != si)
236     return 0;
237
238   if (s->session_state == SESSION_STATE_TRANSPORT_DELETED
239       || s->session_state <= SESSION_STATE_LISTENING)
240     return 1;
241
242   if (s->session_state == SESSION_STATE_CONNECTING &&
243       (s->flags & SESSION_F_HALF_OPEN))
244     return 1;
245
246   tc = session_get_transport (s);
247   if (s->connection_index != tc->c_index
248       || s->thread_index != tc->thread_index || tc->s_index != si)
249     return 0;
250
251   return 1;
252 }
253
254 static void
255 session_cleanup_notify (session_t * s, session_cleanup_ntf_t ntf)
256 {
257   app_worker_t *app_wrk;
258
259   app_wrk = app_worker_get_if_valid (s->app_wrk_index);
260   if (!app_wrk)
261     return;
262   app_worker_cleanup_notify (app_wrk, s, ntf);
263 }
264
265 void
266 session_free_w_fifos (session_t * s)
267 {
268   session_cleanup_notify (s, SESSION_CLEANUP_SESSION);
269   segment_manager_dealloc_fifos (s->rx_fifo, s->tx_fifo);
270   session_free (s);
271 }
272
273 /**
274  * Cleans up session and lookup table.
275  *
276  * Transport connection must still be valid.
277  */
278 static void
279 session_delete (session_t * s)
280 {
281   int rv;
282
283   /* Delete from the main lookup table. */
284   if ((rv = session_lookup_del_session (s)))
285     clib_warning ("session %u hash delete rv %d", s->session_index, rv);
286
287   session_free_w_fifos (s);
288 }
289
290 void
291 session_cleanup_half_open (session_handle_t ho_handle)
292 {
293   session_t *ho = session_get_from_handle (ho_handle);
294
295   /* App transports can migrate their half-opens */
296   if (ho->flags & SESSION_F_IS_MIGRATING)
297     {
298       /* Session still migrating, move to closed state to signal that the
299        * session should be removed. */
300       if (ho->connection_index == ~0)
301         {
302           session_set_state (ho, SESSION_STATE_CLOSED);
303           return;
304         }
305       /* Migrated transports are no longer half-opens */
306       transport_cleanup (session_get_transport_proto (ho),
307                          ho->connection_index, ho->app_index /* overloaded */);
308     }
309   else
310     {
311       /* Cleanup half-open session lookup table if need be */
312       if (ho->session_state != SESSION_STATE_TRANSPORT_CLOSING)
313         {
314           transport_connection_t *tc;
315           tc = transport_get_half_open (session_get_transport_proto (ho),
316                                         ho->connection_index);
317           if (tc && !(tc->flags & TRANSPORT_CONNECTION_F_NO_LOOKUP))
318             session_lookup_del_half_open (tc);
319         }
320       transport_cleanup_half_open (session_get_transport_proto (ho),
321                                    ho->connection_index);
322     }
323   session_free (ho);
324 }
325
326 static void
327 session_half_open_free (session_t *ho)
328 {
329   app_worker_t *app_wrk;
330
331   ASSERT (vlib_get_thread_index () <= transport_cl_thread ());
332   app_wrk = app_worker_get_if_valid (ho->app_wrk_index);
333   if (app_wrk)
334     app_worker_del_half_open (app_wrk, ho);
335   session_free (ho);
336 }
337
338 static void
339 session_half_open_free_rpc (void *args)
340 {
341   session_t *ho = ho_session_get (pointer_to_uword (args));
342   session_half_open_free (ho);
343 }
344
345 void
346 session_half_open_delete_notify (transport_connection_t *tc)
347 {
348   session_t *ho = ho_session_get (tc->s_index);
349
350   /* Cleanup half-open lookup table if need be */
351   if (ho->session_state != SESSION_STATE_TRANSPORT_CLOSING)
352     {
353       if (!(tc->flags & TRANSPORT_CONNECTION_F_NO_LOOKUP))
354         session_lookup_del_half_open (tc);
355     }
356
357   /* Notification from ctrl thread accepted without rpc */
358   if (tc->thread_index == transport_cl_thread ())
359     {
360       session_half_open_free (ho);
361     }
362   else
363     {
364       void *args = uword_to_pointer ((uword) tc->s_index, void *);
365       session_send_rpc_evt_to_thread_force (transport_cl_thread (),
366                                             session_half_open_free_rpc, args);
367     }
368 }
369
370 void
371 session_half_open_migrate_notify (transport_connection_t *tc)
372 {
373   session_t *ho;
374
375   /* Support half-open migrations only for transports with no lookup */
376   ASSERT (tc->flags & TRANSPORT_CONNECTION_F_NO_LOOKUP);
377
378   ho = ho_session_get (tc->s_index);
379   ho->flags |= SESSION_F_IS_MIGRATING;
380   ho->connection_index = ~0;
381 }
382
383 int
384 session_half_open_migrated_notify (transport_connection_t *tc)
385 {
386   session_t *ho;
387
388   ho = ho_session_get (tc->s_index);
389
390   /* App probably detached so the half-open must be cleaned up */
391   if (ho->session_state == SESSION_STATE_CLOSED)
392     {
393       session_half_open_delete_notify (tc);
394       return -1;
395     }
396   ho->connection_index = tc->c_index;
397   /* Overload app index for half-open with new thread */
398   ho->app_index = tc->thread_index;
399   return 0;
400 }
401
402 session_t *
403 session_alloc_for_connection (transport_connection_t * tc)
404 {
405   session_t *s;
406   u32 thread_index = tc->thread_index;
407
408   ASSERT (thread_index == vlib_get_thread_index ()
409           || transport_protocol_is_cl (tc->proto));
410
411   s = session_alloc (thread_index);
412   s->session_type = session_type_from_proto_and_ip (tc->proto, tc->is_ip4);
413   session_set_state (s, SESSION_STATE_CLOSED);
414
415   /* Attach transport to session and vice versa */
416   s->connection_index = tc->c_index;
417   tc->s_index = s->session_index;
418   return s;
419 }
420
421 session_t *
422 session_alloc_for_half_open (transport_connection_t *tc)
423 {
424   session_t *s;
425
426   s = ho_session_alloc ();
427   s->session_type = session_type_from_proto_and_ip (tc->proto, tc->is_ip4);
428   s->connection_index = tc->c_index;
429   tc->s_index = s->session_index;
430   return s;
431 }
432
433 /**
434  * Discards bytes from buffer chain
435  *
436  * It discards n_bytes_to_drop starting at first buffer after chain_b
437  */
438 always_inline void
439 session_enqueue_discard_chain_bytes (vlib_main_t * vm, vlib_buffer_t * b,
440                                      vlib_buffer_t ** chain_b,
441                                      u32 n_bytes_to_drop)
442 {
443   vlib_buffer_t *next = *chain_b;
444   u32 to_drop = n_bytes_to_drop;
445   ASSERT (b->flags & VLIB_BUFFER_NEXT_PRESENT);
446   while (to_drop && (next->flags & VLIB_BUFFER_NEXT_PRESENT))
447     {
448       next = vlib_get_buffer (vm, next->next_buffer);
449       if (next->current_length > to_drop)
450         {
451           vlib_buffer_advance (next, to_drop);
452           to_drop = 0;
453         }
454       else
455         {
456           to_drop -= next->current_length;
457           next->current_length = 0;
458         }
459     }
460   *chain_b = next;
461
462   if (to_drop == 0)
463     b->total_length_not_including_first_buffer -= n_bytes_to_drop;
464 }
465
466 /**
467  * Enqueue buffer chain tail
468  */
469 always_inline int
470 session_enqueue_chain_tail (session_t * s, vlib_buffer_t * b,
471                             u32 offset, u8 is_in_order)
472 {
473   vlib_buffer_t *chain_b;
474   u32 chain_bi, len, diff;
475   vlib_main_t *vm = vlib_get_main ();
476   u8 *data;
477   u32 written = 0;
478   int rv = 0;
479
480   if (is_in_order && offset)
481     {
482       diff = offset - b->current_length;
483       if (diff > b->total_length_not_including_first_buffer)
484         return 0;
485       chain_b = b;
486       session_enqueue_discard_chain_bytes (vm, b, &chain_b, diff);
487       chain_bi = vlib_get_buffer_index (vm, chain_b);
488     }
489   else
490     chain_bi = b->next_buffer;
491
492   do
493     {
494       chain_b = vlib_get_buffer (vm, chain_bi);
495       data = vlib_buffer_get_current (chain_b);
496       len = chain_b->current_length;
497       if (!len)
498         continue;
499       if (is_in_order)
500         {
501           rv = svm_fifo_enqueue (s->rx_fifo, len, data);
502           if (rv == len)
503             {
504               written += rv;
505             }
506           else if (rv < len)
507             {
508               return (rv > 0) ? (written + rv) : written;
509             }
510           else if (rv > len)
511             {
512               written += rv;
513
514               /* written more than what was left in chain */
515               if (written > b->total_length_not_including_first_buffer)
516                 return written;
517
518               /* drop the bytes that have already been delivered */
519               session_enqueue_discard_chain_bytes (vm, b, &chain_b, rv - len);
520             }
521         }
522       else
523         {
524           rv = svm_fifo_enqueue_with_offset (s->rx_fifo, offset, len, data);
525           if (rv)
526             {
527               clib_warning ("failed to enqueue multi-buffer seg");
528               return -1;
529             }
530           offset += len;
531         }
532     }
533   while ((chain_bi = (chain_b->flags & VLIB_BUFFER_NEXT_PRESENT)
534           ? chain_b->next_buffer : 0));
535
536   if (is_in_order)
537     return written;
538
539   return 0;
540 }
541
542 void
543 session_fifo_tuning (session_t * s, svm_fifo_t * f,
544                      session_ft_action_t act, u32 len)
545 {
546   if (s->flags & SESSION_F_CUSTOM_FIFO_TUNING)
547     {
548       app_worker_t *app_wrk = app_worker_get (s->app_wrk_index);
549       app_worker_session_fifo_tuning (app_wrk, s, f, act, len);
550       if (CLIB_ASSERT_ENABLE)
551         {
552           segment_manager_t *sm;
553           sm = segment_manager_get (f->segment_manager);
554           ASSERT (f->shr->size >= 4096);
555           ASSERT (f->shr->size <= sm->max_fifo_size);
556         }
557     }
558 }
559
560 /*
561  * Enqueue data for delivery to session peer. Does not notify peer of enqueue
562  * event but on request can queue notification events for later delivery by
563  * calling stream_server_flush_enqueue_events().
564  *
565  * @param tc Transport connection which is to be enqueued data
566  * @param b Buffer to be enqueued
567  * @param offset Offset at which to start enqueueing if out-of-order
568  * @param queue_event Flag to indicate if peer is to be notified or if event
569  *                    is to be queued. The former is useful when more data is
570  *                    enqueued and only one event is to be generated.
571  * @param is_in_order Flag to indicate if data is in order
572  * @return Number of bytes enqueued or a negative value if enqueueing failed.
573  */
574 int
575 session_enqueue_stream_connection (transport_connection_t * tc,
576                                    vlib_buffer_t * b, u32 offset,
577                                    u8 queue_event, u8 is_in_order)
578 {
579   session_t *s;
580   int enqueued = 0, rv, in_order_off;
581
582   s = session_get (tc->s_index, tc->thread_index);
583
584   if (is_in_order)
585     {
586       enqueued = svm_fifo_enqueue (s->rx_fifo,
587                                    b->current_length,
588                                    vlib_buffer_get_current (b));
589       if (PREDICT_FALSE ((b->flags & VLIB_BUFFER_NEXT_PRESENT)
590                          && enqueued >= 0))
591         {
592           in_order_off = enqueued > b->current_length ? enqueued : 0;
593           rv = session_enqueue_chain_tail (s, b, in_order_off, 1);
594           if (rv > 0)
595             enqueued += rv;
596         }
597     }
598   else
599     {
600       rv = svm_fifo_enqueue_with_offset (s->rx_fifo, offset,
601                                          b->current_length,
602                                          vlib_buffer_get_current (b));
603       if (PREDICT_FALSE ((b->flags & VLIB_BUFFER_NEXT_PRESENT) && !rv))
604         session_enqueue_chain_tail (s, b, offset + b->current_length, 0);
605       /* if something was enqueued, report even this as success for ooo
606        * segment handling */
607       return rv;
608     }
609
610   if (queue_event)
611     {
612       /* Queue RX event on this fifo. Eventually these will need to be flushed
613        * by calling stream_server_flush_enqueue_events () */
614       session_worker_t *wrk;
615
616       wrk = session_main_get_worker (s->thread_index);
617       if (!(s->flags & SESSION_F_RX_EVT))
618         {
619           s->flags |= SESSION_F_RX_EVT;
620           vec_add1 (wrk->session_to_enqueue[tc->proto], s->session_index);
621         }
622
623       session_fifo_tuning (s, s->rx_fifo, SESSION_FT_ACTION_ENQUEUED, 0);
624     }
625
626   return enqueued;
627 }
628
629 int
630 session_enqueue_dgram_connection (session_t * s,
631                                   session_dgram_hdr_t * hdr,
632                                   vlib_buffer_t * b, u8 proto, u8 queue_event)
633 {
634   int rv;
635
636   ASSERT (svm_fifo_max_enqueue_prod (s->rx_fifo)
637           >= b->current_length + sizeof (*hdr));
638
639   if (PREDICT_TRUE (!(b->flags & VLIB_BUFFER_NEXT_PRESENT)))
640     {
641       /* *INDENT-OFF* */
642       svm_fifo_seg_t segs[2] = {
643           { (u8 *) hdr, sizeof (*hdr) },
644           { vlib_buffer_get_current (b), b->current_length }
645       };
646       /* *INDENT-ON* */
647
648       rv = svm_fifo_enqueue_segments (s->rx_fifo, segs, 2,
649                                       0 /* allow_partial */ );
650     }
651   else
652     {
653       vlib_main_t *vm = vlib_get_main ();
654       svm_fifo_seg_t *segs = 0, *seg;
655       vlib_buffer_t *it = b;
656       u32 n_segs = 1;
657
658       vec_add2 (segs, seg, 1);
659       seg->data = (u8 *) hdr;
660       seg->len = sizeof (*hdr);
661       while (it)
662         {
663           vec_add2 (segs, seg, 1);
664           seg->data = vlib_buffer_get_current (it);
665           seg->len = it->current_length;
666           n_segs++;
667           if (!(it->flags & VLIB_BUFFER_NEXT_PRESENT))
668             break;
669           it = vlib_get_buffer (vm, it->next_buffer);
670         }
671       rv = svm_fifo_enqueue_segments (s->rx_fifo, segs, n_segs,
672                                       0 /* allow partial */ );
673       vec_free (segs);
674     }
675
676   if (queue_event && rv > 0)
677     {
678       /* Queue RX event on this fifo. Eventually these will need to be flushed
679        * by calling stream_server_flush_enqueue_events () */
680       session_worker_t *wrk;
681
682       wrk = session_main_get_worker (s->thread_index);
683       if (!(s->flags & SESSION_F_RX_EVT))
684         {
685           s->flags |= SESSION_F_RX_EVT;
686           vec_add1 (wrk->session_to_enqueue[proto], s->session_index);
687         }
688
689       session_fifo_tuning (s, s->rx_fifo, SESSION_FT_ACTION_ENQUEUED, 0);
690     }
691   return rv > 0 ? rv : 0;
692 }
693
694 int
695 session_tx_fifo_peek_bytes (transport_connection_t * tc, u8 * buffer,
696                             u32 offset, u32 max_bytes)
697 {
698   session_t *s = session_get (tc->s_index, tc->thread_index);
699   return svm_fifo_peek (s->tx_fifo, offset, max_bytes, buffer);
700 }
701
702 u32
703 session_tx_fifo_dequeue_drop (transport_connection_t * tc, u32 max_bytes)
704 {
705   session_t *s = session_get (tc->s_index, tc->thread_index);
706   u32 rv;
707
708   rv = svm_fifo_dequeue_drop (s->tx_fifo, max_bytes);
709   session_fifo_tuning (s, s->tx_fifo, SESSION_FT_ACTION_DEQUEUED, rv);
710
711   if (svm_fifo_needs_deq_ntf (s->tx_fifo, max_bytes))
712     session_dequeue_notify (s);
713
714   return rv;
715 }
716
717 static inline int
718 session_notify_subscribers (u32 app_index, session_t * s,
719                             svm_fifo_t * f, session_evt_type_t evt_type)
720 {
721   app_worker_t *app_wrk;
722   application_t *app;
723   int i;
724
725   app = application_get (app_index);
726   if (!app)
727     return -1;
728
729   for (i = 0; i < f->shr->n_subscribers; i++)
730     {
731       app_wrk = application_get_worker (app, f->shr->subscribers[i]);
732       if (!app_wrk)
733         continue;
734       if (app_worker_lock_and_send_event (app_wrk, s, evt_type))
735         return -1;
736     }
737
738   return 0;
739 }
740
741 /**
742  * Notify session peer that new data has been enqueued.
743  *
744  * @param s     Stream session for which the event is to be generated.
745  * @param lock  Flag to indicate if call should lock message queue.
746  *
747  * @return 0 on success or negative number if failed to send notification.
748  */
749 static inline int
750 session_enqueue_notify_inline (session_t * s)
751 {
752   app_worker_t *app_wrk;
753   u32 session_index;
754   u8 n_subscribers;
755   u32 thread_index;
756
757   session_index = s->session_index;
758   thread_index = s->thread_index;
759   n_subscribers = svm_fifo_n_subscribers (s->rx_fifo);
760
761   app_wrk = app_worker_get_if_valid (s->app_wrk_index);
762   if (PREDICT_FALSE (!app_wrk))
763     {
764       SESSION_DBG ("invalid s->app_index = %d", s->app_wrk_index);
765       return 0;
766     }
767
768   SESSION_EVT (SESSION_EVT_ENQ, s, svm_fifo_max_dequeue_prod (s->rx_fifo));
769
770   s->flags &= ~SESSION_F_RX_EVT;
771
772   /* Application didn't confirm accept yet */
773   if (PREDICT_FALSE (s->session_state == SESSION_STATE_ACCEPTING))
774     return 0;
775
776   if (PREDICT_FALSE (app_worker_lock_and_send_event (app_wrk, s,
777                                                      SESSION_IO_EVT_RX)))
778     return -1;
779
780   if (PREDICT_FALSE (n_subscribers))
781     {
782       s = session_get (session_index, thread_index);
783       return session_notify_subscribers (app_wrk->app_index, s,
784                                          s->rx_fifo, SESSION_IO_EVT_RX);
785     }
786
787   return 0;
788 }
789
790 int
791 session_enqueue_notify (session_t * s)
792 {
793   return session_enqueue_notify_inline (s);
794 }
795
796 static void
797 session_enqueue_notify_rpc (void *arg)
798 {
799   u32 session_index = pointer_to_uword (arg);
800   session_t *s;
801
802   s = session_get_if_valid (session_index, vlib_get_thread_index ());
803   if (!s)
804     return;
805
806   session_enqueue_notify (s);
807 }
808
809 /**
810  * Like session_enqueue_notify, but can be called from a thread that does not
811  * own the session.
812  */
813 void
814 session_enqueue_notify_thread (session_handle_t sh)
815 {
816   u32 thread_index = session_thread_from_handle (sh);
817   u32 session_index = session_index_from_handle (sh);
818
819   /*
820    * Pass session index (u32) as opposed to handle (u64) in case pointers
821    * are not 64-bit.
822    */
823   session_send_rpc_evt_to_thread (thread_index,
824                                   session_enqueue_notify_rpc,
825                                   uword_to_pointer (session_index, void *));
826 }
827
828 int
829 session_dequeue_notify (session_t * s)
830 {
831   app_worker_t *app_wrk;
832
833   svm_fifo_clear_deq_ntf (s->tx_fifo);
834
835   app_wrk = app_worker_get_if_valid (s->app_wrk_index);
836   if (PREDICT_FALSE (!app_wrk))
837     return -1;
838
839   if (PREDICT_FALSE (app_worker_lock_and_send_event (app_wrk, s,
840                                                      SESSION_IO_EVT_TX)))
841     return -1;
842
843   if (PREDICT_FALSE (s->tx_fifo->shr->n_subscribers))
844     return session_notify_subscribers (app_wrk->app_index, s,
845                                        s->tx_fifo, SESSION_IO_EVT_TX);
846
847   return 0;
848 }
849
850 /**
851  * Flushes queue of sessions that are to be notified of new data
852  * enqueued events.
853  *
854  * @param thread_index Thread index for which the flush is to be performed.
855  * @return 0 on success or a positive number indicating the number of
856  *         failures due to API queue being full.
857  */
858 int
859 session_main_flush_enqueue_events (u8 transport_proto, u32 thread_index)
860 {
861   session_worker_t *wrk = session_main_get_worker (thread_index);
862   session_t *s;
863   int i, errors = 0;
864   u32 *indices;
865
866   indices = wrk->session_to_enqueue[transport_proto];
867
868   for (i = 0; i < vec_len (indices); i++)
869     {
870       s = session_get_if_valid (indices[i], thread_index);
871       if (PREDICT_FALSE (!s))
872         {
873           errors++;
874           continue;
875         }
876
877       session_fifo_tuning (s, s->rx_fifo, SESSION_FT_ACTION_ENQUEUED,
878                            0 /* TODO/not needed */ );
879
880       if (PREDICT_FALSE (session_enqueue_notify_inline (s)))
881         errors++;
882     }
883
884   vec_reset_length (indices);
885   wrk->session_to_enqueue[transport_proto] = indices;
886
887   return errors;
888 }
889
890 int
891 session_main_flush_all_enqueue_events (u8 transport_proto)
892 {
893   vlib_thread_main_t *vtm = vlib_get_thread_main ();
894   int i, errors = 0;
895   for (i = 0; i < 1 + vtm->n_threads; i++)
896     errors += session_main_flush_enqueue_events (transport_proto, i);
897   return errors;
898 }
899
900 int
901 session_stream_connect_notify (transport_connection_t * tc,
902                                session_error_t err)
903 {
904   u32 opaque = 0, new_ti, new_si;
905   app_worker_t *app_wrk;
906   session_t *s = 0, *ho;
907
908   /*
909    * Cleanup half-open table
910    */
911   session_lookup_del_half_open (tc);
912
913   ho = ho_session_get (tc->s_index);
914   session_set_state (ho, SESSION_STATE_TRANSPORT_CLOSING);
915   opaque = ho->opaque;
916   app_wrk = app_worker_get_if_valid (ho->app_wrk_index);
917   if (!app_wrk)
918     return -1;
919
920   if (err)
921     return app_worker_connect_notify (app_wrk, s, err, opaque);
922
923   s = session_alloc_for_connection (tc);
924   session_set_state (s, SESSION_STATE_CONNECTING);
925   s->app_wrk_index = app_wrk->wrk_index;
926   new_si = s->session_index;
927   new_ti = s->thread_index;
928
929   if ((err = app_worker_init_connected (app_wrk, s)))
930     {
931       session_free (s);
932       app_worker_connect_notify (app_wrk, 0, err, opaque);
933       return -1;
934     }
935
936   s = session_get (new_si, new_ti);
937   session_set_state (s, SESSION_STATE_READY);
938   session_lookup_add_connection (tc, session_handle (s));
939
940   if (app_worker_connect_notify (app_wrk, s, SESSION_E_NONE, opaque))
941     {
942       session_lookup_del_connection (tc);
943       /* Avoid notifying app about rejected session cleanup */
944       s = session_get (new_si, new_ti);
945       segment_manager_dealloc_fifos (s->rx_fifo, s->tx_fifo);
946       session_free (s);
947       return -1;
948     }
949
950   return 0;
951 }
952
953 typedef union session_switch_pool_reply_args_
954 {
955   struct
956   {
957     u32 session_index;
958     u16 thread_index;
959     u8 is_closed;
960   };
961   u64 as_u64;
962 } session_switch_pool_reply_args_t;
963
964 STATIC_ASSERT (sizeof (session_switch_pool_reply_args_t) <= sizeof (uword),
965                "switch pool reply args size");
966
967 static void
968 session_switch_pool_reply (void *arg)
969 {
970   session_switch_pool_reply_args_t rargs;
971   session_t *s;
972
973   rargs.as_u64 = pointer_to_uword (arg);
974   s = session_get_if_valid (rargs.session_index, rargs.thread_index);
975   if (!s)
976     return;
977
978   /* Session closed during migration. Clean everything up */
979   if (rargs.is_closed)
980     {
981       transport_cleanup (session_get_transport_proto (s), s->connection_index,
982                          s->thread_index);
983       segment_manager_dealloc_fifos (s->rx_fifo, s->tx_fifo);
984       session_free (s);
985       return;
986     }
987
988   /* Notify app that it has data on the new session */
989   session_enqueue_notify (s);
990 }
991
992 typedef struct _session_switch_pool_args
993 {
994   u32 session_index;
995   u32 thread_index;
996   u32 new_thread_index;
997   u32 new_session_index;
998 } session_switch_pool_args_t;
999
1000 /**
1001  * Notify old thread of the session pool switch
1002  */
1003 static void
1004 session_switch_pool (void *cb_args)
1005 {
1006   session_switch_pool_args_t *args = (session_switch_pool_args_t *) cb_args;
1007   session_switch_pool_reply_args_t rargs;
1008   session_handle_t new_sh;
1009   segment_manager_t *sm;
1010   app_worker_t *app_wrk;
1011   session_t *s;
1012
1013   ASSERT (args->thread_index == vlib_get_thread_index ());
1014   s = session_get (args->session_index, args->thread_index);
1015
1016   /* Check if session closed during migration */
1017   rargs.is_closed = s->session_state >= SESSION_STATE_TRANSPORT_CLOSING;
1018
1019   transport_cleanup (session_get_transport_proto (s), s->connection_index,
1020                      s->thread_index);
1021
1022   app_wrk = app_worker_get_if_valid (s->app_wrk_index);
1023   if (app_wrk)
1024     {
1025       /* Cleanup fifo segment slice state for fifos */
1026       sm = app_worker_get_connect_segment_manager (app_wrk);
1027       segment_manager_detach_fifo (sm, &s->rx_fifo);
1028       segment_manager_detach_fifo (sm, &s->tx_fifo);
1029
1030       /* Notify app, using old session, about the migration event */
1031       if (!rargs.is_closed)
1032         {
1033           new_sh = session_make_handle (args->new_session_index,
1034                                         args->new_thread_index);
1035           app_worker_migrate_notify (app_wrk, s, new_sh);
1036         }
1037     }
1038
1039   /* Trigger app read and fifo updates on the new thread */
1040   rargs.session_index = args->new_session_index;
1041   rargs.thread_index = args->new_thread_index;
1042   session_send_rpc_evt_to_thread (args->new_thread_index,
1043                                   session_switch_pool_reply,
1044                                   uword_to_pointer (rargs.as_u64, void *));
1045
1046   session_free (s);
1047   clib_mem_free (cb_args);
1048 }
1049
1050 /**
1051  * Move dgram session to the right thread
1052  */
1053 int
1054 session_dgram_connect_notify (transport_connection_t * tc,
1055                               u32 old_thread_index, session_t ** new_session)
1056 {
1057   session_t *new_s;
1058   session_switch_pool_args_t *rpc_args;
1059   segment_manager_t *sm;
1060   app_worker_t *app_wrk;
1061
1062   /*
1063    * Clone half-open session to the right thread.
1064    */
1065   new_s = session_clone_safe (tc->s_index, old_thread_index);
1066   new_s->connection_index = tc->c_index;
1067   session_set_state (new_s, SESSION_STATE_READY);
1068   new_s->flags |= SESSION_F_IS_MIGRATING;
1069
1070   if (!(tc->flags & TRANSPORT_CONNECTION_F_NO_LOOKUP))
1071     session_lookup_add_connection (tc, session_handle (new_s));
1072
1073   app_wrk = app_worker_get_if_valid (new_s->app_wrk_index);
1074   if (app_wrk)
1075     {
1076       /* New set of fifos attached to the same shared memory */
1077       sm = app_worker_get_connect_segment_manager (app_wrk);
1078       segment_manager_attach_fifo (sm, &new_s->rx_fifo, new_s);
1079       segment_manager_attach_fifo (sm, &new_s->tx_fifo, new_s);
1080     }
1081
1082   /*
1083    * Ask thread owning the old session to clean it up and make us the tx
1084    * fifo owner
1085    */
1086   rpc_args = clib_mem_alloc (sizeof (*rpc_args));
1087   rpc_args->new_session_index = new_s->session_index;
1088   rpc_args->new_thread_index = new_s->thread_index;
1089   rpc_args->session_index = tc->s_index;
1090   rpc_args->thread_index = old_thread_index;
1091   session_send_rpc_evt_to_thread (rpc_args->thread_index, session_switch_pool,
1092                                   rpc_args);
1093
1094   tc->s_index = new_s->session_index;
1095   new_s->connection_index = tc->c_index;
1096   *new_session = new_s;
1097   return 0;
1098 }
1099
1100 /**
1101  * Notification from transport that connection is being closed.
1102  *
1103  * A disconnect is sent to application but state is not removed. Once
1104  * disconnect is acknowledged by application, session disconnect is called.
1105  * Ultimately this leads to close being called on transport (passive close).
1106  */
1107 void
1108 session_transport_closing_notify (transport_connection_t * tc)
1109 {
1110   app_worker_t *app_wrk;
1111   session_t *s;
1112
1113   s = session_get (tc->s_index, tc->thread_index);
1114   if (s->session_state >= SESSION_STATE_TRANSPORT_CLOSING)
1115     return;
1116
1117   /* Wait for reply from app before sending notification as the
1118    * accept might be rejected */
1119   if (s->session_state == SESSION_STATE_ACCEPTING)
1120     {
1121       session_set_state (s, SESSION_STATE_TRANSPORT_CLOSING);
1122       return;
1123     }
1124
1125   session_set_state (s, SESSION_STATE_TRANSPORT_CLOSING);
1126   app_wrk = app_worker_get (s->app_wrk_index);
1127   app_worker_close_notify (app_wrk, s);
1128 }
1129
1130 /**
1131  * Notification from transport that connection is being deleted
1132  *
1133  * This removes the session if it is still valid. It should be called only on
1134  * previously fully established sessions. For instance failed connects should
1135  * call stream_session_connect_notify and indicate that the connect has
1136  * failed.
1137  */
1138 void
1139 session_transport_delete_notify (transport_connection_t * tc)
1140 {
1141   session_t *s;
1142
1143   /* App might've been removed already */
1144   if (!(s = session_get_if_valid (tc->s_index, tc->thread_index)))
1145     return;
1146
1147   switch (s->session_state)
1148     {
1149     case SESSION_STATE_CREATED:
1150       /* Session was created but accept notification was not yet sent to the
1151        * app. Cleanup everything. */
1152       session_lookup_del_session (s);
1153       segment_manager_dealloc_fifos (s->rx_fifo, s->tx_fifo);
1154       session_free (s);
1155       break;
1156     case SESSION_STATE_ACCEPTING:
1157     case SESSION_STATE_TRANSPORT_CLOSING:
1158     case SESSION_STATE_CLOSING:
1159     case SESSION_STATE_TRANSPORT_CLOSED:
1160       /* If transport finishes or times out before we get a reply
1161        * from the app, mark transport as closed and wait for reply
1162        * before removing the session. Cleanup session table in advance
1163        * because transport will soon be closed and closed sessions
1164        * are assumed to have been removed from the lookup table */
1165       session_lookup_del_session (s);
1166       session_set_state (s, SESSION_STATE_TRANSPORT_DELETED);
1167       session_cleanup_notify (s, SESSION_CLEANUP_TRANSPORT);
1168       svm_fifo_dequeue_drop_all (s->tx_fifo);
1169       break;
1170     case SESSION_STATE_APP_CLOSED:
1171       /* Cleanup lookup table as transport needs to still be valid.
1172        * Program transport close to ensure that all session events
1173        * have been cleaned up. Once transport close is called, the
1174        * session is just removed because both transport and app have
1175        * confirmed the close*/
1176       session_lookup_del_session (s);
1177       session_set_state (s, SESSION_STATE_TRANSPORT_DELETED);
1178       session_cleanup_notify (s, SESSION_CLEANUP_TRANSPORT);
1179       svm_fifo_dequeue_drop_all (s->tx_fifo);
1180       session_program_transport_ctrl_evt (s, SESSION_CTRL_EVT_CLOSE);
1181       break;
1182     case SESSION_STATE_TRANSPORT_DELETED:
1183       break;
1184     case SESSION_STATE_CLOSED:
1185       session_cleanup_notify (s, SESSION_CLEANUP_TRANSPORT);
1186       session_delete (s);
1187       break;
1188     default:
1189       clib_warning ("session state %u", s->session_state);
1190       session_cleanup_notify (s, SESSION_CLEANUP_TRANSPORT);
1191       session_delete (s);
1192       break;
1193     }
1194 }
1195
1196 /**
1197  * Notification from transport that it is closed
1198  *
1199  * Should be called by transport, prior to calling delete notify, once it
1200  * knows that no more data will be exchanged. This could serve as an
1201  * early acknowledgment of an active close especially if transport delete
1202  * can be delayed a long time, e.g., tcp time-wait.
1203  */
1204 void
1205 session_transport_closed_notify (transport_connection_t * tc)
1206 {
1207   app_worker_t *app_wrk;
1208   session_t *s;
1209
1210   if (!(s = session_get_if_valid (tc->s_index, tc->thread_index)))
1211     return;
1212
1213   /* Transport thinks that app requested close but it actually didn't.
1214    * Can happen for tcp:
1215    * 1)if fin and rst are received in close succession.
1216    * 2)if app shutdown the connection.  */
1217   if (s->session_state == SESSION_STATE_READY)
1218     {
1219       session_transport_closing_notify (tc);
1220       svm_fifo_dequeue_drop_all (s->tx_fifo);
1221       session_set_state (s, SESSION_STATE_TRANSPORT_CLOSED);
1222     }
1223   /* If app close has not been received or has not yet resulted in
1224    * a transport close, only mark the session transport as closed */
1225   else if (s->session_state <= SESSION_STATE_CLOSING)
1226     session_set_state (s, SESSION_STATE_TRANSPORT_CLOSED);
1227   /* If app also closed, switch to closed */
1228   else if (s->session_state == SESSION_STATE_APP_CLOSED)
1229     session_set_state (s, SESSION_STATE_CLOSED);
1230
1231   app_wrk = app_worker_get_if_valid (s->app_wrk_index);
1232   if (app_wrk)
1233     app_worker_transport_closed_notify (app_wrk, s);
1234 }
1235
1236 /**
1237  * Notify application that connection has been reset.
1238  */
1239 void
1240 session_transport_reset_notify (transport_connection_t * tc)
1241 {
1242   app_worker_t *app_wrk;
1243   session_t *s;
1244
1245   s = session_get (tc->s_index, tc->thread_index);
1246   svm_fifo_dequeue_drop_all (s->tx_fifo);
1247   if (s->session_state >= SESSION_STATE_TRANSPORT_CLOSING)
1248     return;
1249   if (s->session_state == SESSION_STATE_ACCEPTING)
1250     {
1251       session_set_state (s, SESSION_STATE_TRANSPORT_CLOSING);
1252       return;
1253     }
1254   session_set_state (s, SESSION_STATE_TRANSPORT_CLOSING);
1255   app_wrk = app_worker_get (s->app_wrk_index);
1256   app_worker_reset_notify (app_wrk, s);
1257 }
1258
1259 int
1260 session_stream_accept_notify (transport_connection_t * tc)
1261 {
1262   app_worker_t *app_wrk;
1263   session_t *s;
1264
1265   s = session_get (tc->s_index, tc->thread_index);
1266   app_wrk = app_worker_get_if_valid (s->app_wrk_index);
1267   if (!app_wrk)
1268     return -1;
1269   if (s->session_state != SESSION_STATE_CREATED)
1270     return 0;
1271   session_set_state (s, SESSION_STATE_ACCEPTING);
1272   if (app_worker_accept_notify (app_wrk, s))
1273     {
1274       /* On transport delete, no notifications should be sent. Unless, the
1275        * accept is retried and successful. */
1276       session_set_state (s, SESSION_STATE_CREATED);
1277       return -1;
1278     }
1279   return 0;
1280 }
1281
1282 /**
1283  * Accept a stream session. Optionally ping the server by callback.
1284  */
1285 int
1286 session_stream_accept (transport_connection_t * tc, u32 listener_index,
1287                        u32 thread_index, u8 notify)
1288 {
1289   session_t *s;
1290   int rv;
1291
1292   s = session_alloc_for_connection (tc);
1293   s->listener_handle = ((u64) thread_index << 32) | (u64) listener_index;
1294   session_set_state (s, SESSION_STATE_CREATED);
1295
1296   if ((rv = app_worker_init_accepted (s)))
1297     {
1298       session_free (s);
1299       return rv;
1300     }
1301
1302   session_lookup_add_connection (tc, session_handle (s));
1303
1304   /* Shoulder-tap the server */
1305   if (notify)
1306     {
1307       app_worker_t *app_wrk = app_worker_get (s->app_wrk_index);
1308       if ((rv = app_worker_accept_notify (app_wrk, s)))
1309         {
1310           session_lookup_del_session (s);
1311           segment_manager_dealloc_fifos (s->rx_fifo, s->tx_fifo);
1312           session_free (s);
1313           return rv;
1314         }
1315     }
1316
1317   return 0;
1318 }
1319
1320 int
1321 session_dgram_accept (transport_connection_t * tc, u32 listener_index,
1322                       u32 thread_index)
1323 {
1324   app_worker_t *app_wrk;
1325   session_t *s;
1326   int rv;
1327
1328   s = session_alloc_for_connection (tc);
1329   s->listener_handle = ((u64) thread_index << 32) | (u64) listener_index;
1330
1331   if ((rv = app_worker_init_accepted (s)))
1332     {
1333       session_free (s);
1334       return rv;
1335     }
1336
1337   session_lookup_add_connection (tc, session_handle (s));
1338   session_set_state (s, SESSION_STATE_ACCEPTING);
1339
1340   app_wrk = app_worker_get (s->app_wrk_index);
1341   if ((rv = app_worker_accept_notify (app_wrk, s)))
1342     {
1343       session_lookup_del_session (s);
1344       segment_manager_dealloc_fifos (s->rx_fifo, s->tx_fifo);
1345       session_free (s);
1346       return rv;
1347     }
1348
1349   return 0;
1350 }
1351
1352 int
1353 session_open_cl (session_endpoint_cfg_t *rmt, session_handle_t *rsh)
1354 {
1355   transport_connection_t *tc;
1356   transport_endpoint_cfg_t *tep;
1357   app_worker_t *app_wrk;
1358   session_handle_t sh;
1359   session_t *s;
1360   int rv;
1361
1362   tep = session_endpoint_to_transport_cfg (rmt);
1363   rv = transport_connect (rmt->transport_proto, tep);
1364   if (rv < 0)
1365     {
1366       SESSION_DBG ("Transport failed to open connection.");
1367       return rv;
1368     }
1369
1370   tc = transport_get_half_open (rmt->transport_proto, (u32) rv);
1371
1372   /* For dgram type of service, allocate session and fifos now */
1373   app_wrk = app_worker_get (rmt->app_wrk_index);
1374   s = session_alloc_for_connection (tc);
1375   s->app_wrk_index = app_wrk->wrk_index;
1376   session_set_state (s, SESSION_STATE_OPENED);
1377   if (app_worker_init_connected (app_wrk, s))
1378     {
1379       session_free (s);
1380       return -1;
1381     }
1382
1383   sh = session_handle (s);
1384   *rsh = sh;
1385
1386   session_lookup_add_connection (tc, sh);
1387   return app_worker_connect_notify (app_wrk, s, SESSION_E_NONE, rmt->opaque);
1388 }
1389
1390 int
1391 session_open_vc (session_endpoint_cfg_t *rmt, session_handle_t *rsh)
1392 {
1393   transport_connection_t *tc;
1394   transport_endpoint_cfg_t *tep;
1395   app_worker_t *app_wrk;
1396   session_t *ho;
1397   int rv;
1398
1399   tep = session_endpoint_to_transport_cfg (rmt);
1400   rv = transport_connect (rmt->transport_proto, tep);
1401   if (rv < 0)
1402     {
1403       SESSION_DBG ("Transport failed to open connection.");
1404       return rv;
1405     }
1406
1407   tc = transport_get_half_open (rmt->transport_proto, (u32) rv);
1408
1409   app_wrk = app_worker_get (rmt->app_wrk_index);
1410
1411   /* If transport offers a vc service, only allocate established
1412    * session once the connection has been established.
1413    * In the meantime allocate half-open session for tracking purposes
1414    * associate half-open connection to it and add session to app-worker
1415    * half-open table. These are needed to allocate the established
1416    * session on transport notification, and to cleanup the half-open
1417    * session if the app detaches before connection establishment.
1418    */
1419   ho = session_alloc_for_half_open (tc);
1420   ho->app_wrk_index = app_wrk->wrk_index;
1421   ho->ho_index = app_worker_add_half_open (app_wrk, session_handle (ho));
1422   ho->opaque = rmt->opaque;
1423   *rsh = session_handle (ho);
1424
1425   if (!(tc->flags & TRANSPORT_CONNECTION_F_NO_LOOKUP))
1426     session_lookup_add_half_open (tc, tc->c_index);
1427
1428   return 0;
1429 }
1430
1431 int
1432 session_open_app (session_endpoint_cfg_t *rmt, session_handle_t *rsh)
1433 {
1434   transport_endpoint_cfg_t *tep_cfg = session_endpoint_to_transport_cfg (rmt);
1435
1436   /* Not supported for now */
1437   *rsh = SESSION_INVALID_HANDLE;
1438   return transport_connect (rmt->transport_proto, tep_cfg);
1439 }
1440
1441 typedef int (*session_open_service_fn) (session_endpoint_cfg_t *,
1442                                         session_handle_t *);
1443
1444 /* *INDENT-OFF* */
1445 static session_open_service_fn session_open_srv_fns[TRANSPORT_N_SERVICES] = {
1446   session_open_vc,
1447   session_open_cl,
1448   session_open_app,
1449 };
1450 /* *INDENT-ON* */
1451
1452 /**
1453  * Ask transport to open connection to remote transport endpoint.
1454  *
1455  * Stores handle for matching request with reply since the call can be
1456  * asynchronous. For instance, for TCP the 3-way handshake must complete
1457  * before reply comes. Session is only created once connection is established.
1458  *
1459  * @param app_index Index of the application requesting the connect
1460  * @param st Session type requested.
1461  * @param tep Remote transport endpoint
1462  * @param opaque Opaque data (typically, api_context) the application expects
1463  *               on open completion.
1464  */
1465 int
1466 session_open (session_endpoint_cfg_t *rmt, session_handle_t *rsh)
1467 {
1468   transport_service_type_t tst;
1469   tst = transport_protocol_service_type (rmt->transport_proto);
1470   return session_open_srv_fns[tst](rmt, rsh);
1471 }
1472
1473 /**
1474  * Ask transport to listen on session endpoint.
1475  *
1476  * @param s Session for which listen will be called. Note that unlike
1477  *          established sessions, listen sessions are not associated to a
1478  *          thread.
1479  * @param sep Local endpoint to be listened on.
1480  */
1481 int
1482 session_listen (session_t * ls, session_endpoint_cfg_t * sep)
1483 {
1484   transport_endpoint_cfg_t *tep;
1485   int tc_index;
1486   u32 s_index;
1487
1488   /* Transport bind/listen */
1489   tep = session_endpoint_to_transport_cfg (sep);
1490   s_index = ls->session_index;
1491   tc_index = transport_start_listen (session_get_transport_proto (ls),
1492                                      s_index, tep);
1493
1494   if (tc_index < 0)
1495     return tc_index;
1496
1497   /* Attach transport to session. Lookup tables are populated by the app
1498    * worker because local tables (for ct sessions) are not backed by a fib */
1499   ls = listen_session_get (s_index);
1500   ls->connection_index = tc_index;
1501   ls->opaque = sep->opaque;
1502
1503   return 0;
1504 }
1505
1506 /**
1507  * Ask transport to stop listening on local transport endpoint.
1508  *
1509  * @param s Session to stop listening on. It must be in state LISTENING.
1510  */
1511 int
1512 session_stop_listen (session_t * s)
1513 {
1514   transport_proto_t tp = session_get_transport_proto (s);
1515   transport_connection_t *tc;
1516
1517   if (s->session_state != SESSION_STATE_LISTENING)
1518     return SESSION_E_NOLISTEN;
1519
1520   tc = transport_get_listener (tp, s->connection_index);
1521
1522   /* If no transport, assume everything was cleaned up already */
1523   if (!tc)
1524     return SESSION_E_NONE;
1525
1526   if (!(tc->flags & TRANSPORT_CONNECTION_F_NO_LOOKUP))
1527     session_lookup_del_connection (tc);
1528
1529   transport_stop_listen (tp, s->connection_index);
1530   return 0;
1531 }
1532
1533 /**
1534  * Initialize session half-closing procedure.
1535  *
1536  * Note that half-closing will not change the state of the session.
1537  */
1538 void
1539 session_half_close (session_t *s)
1540 {
1541   if (!s)
1542     return;
1543
1544   session_program_transport_ctrl_evt (s, SESSION_CTRL_EVT_HALF_CLOSE);
1545 }
1546
1547 /**
1548  * Initialize session closing procedure.
1549  *
1550  * Request is always sent to session node to ensure that all outstanding
1551  * requests are served before transport is notified.
1552  */
1553 void
1554 session_close (session_t * s)
1555 {
1556   if (!s || (s->flags & SESSION_F_APP_CLOSED))
1557     return;
1558
1559   /* Transports can close and delete their state independent of app closes
1560    * and transport initiated state transitions can hide app closes. Instead
1561    * of extending the state machine to support separate tracking of app and
1562    * transport initiated closes, use a flag. */
1563   s->flags |= SESSION_F_APP_CLOSED;
1564
1565   if (s->session_state >= SESSION_STATE_CLOSING)
1566     {
1567       /* Session will only be removed once both app and transport
1568        * acknowledge the close */
1569       if (s->session_state == SESSION_STATE_TRANSPORT_CLOSED
1570           || s->session_state == SESSION_STATE_TRANSPORT_DELETED)
1571         session_program_transport_ctrl_evt (s, SESSION_CTRL_EVT_CLOSE);
1572       return;
1573     }
1574
1575   /* App closed so stop propagating dequeue notifications.
1576    * App might disconnect session before connected, in this case,
1577    * tx_fifo may not be setup yet, so clear only it's inited. */
1578   if (s->tx_fifo)
1579     svm_fifo_clear_deq_ntf (s->tx_fifo);
1580   session_set_state (s, SESSION_STATE_CLOSING);
1581   session_program_transport_ctrl_evt (s, SESSION_CTRL_EVT_CLOSE);
1582 }
1583
1584 /**
1585  * Force a close without waiting for data to be flushed
1586  */
1587 void
1588 session_reset (session_t * s)
1589 {
1590   if (s->session_state >= SESSION_STATE_CLOSING)
1591     return;
1592   /* Drop all outstanding tx data
1593    * App might disconnect session before connected, in this case,
1594    * tx_fifo may not be setup yet, so clear only it's inited. */
1595   if (s->tx_fifo)
1596     svm_fifo_dequeue_drop_all (s->tx_fifo);
1597   session_set_state (s, SESSION_STATE_CLOSING);
1598   session_program_transport_ctrl_evt (s, SESSION_CTRL_EVT_RESET);
1599 }
1600
1601 /**
1602  * Notify transport the session can be half-disconnected.
1603  *
1604  * Must be called from the session's thread.
1605  */
1606 void
1607 session_transport_half_close (session_t *s)
1608 {
1609   /* Only READY session can be half-closed */
1610   if (s->session_state != SESSION_STATE_READY)
1611     {
1612       return;
1613     }
1614
1615   transport_half_close (session_get_transport_proto (s), s->connection_index,
1616                         s->thread_index);
1617 }
1618
1619 /**
1620  * Notify transport the session can be disconnected. This should eventually
1621  * result in a delete notification that allows us to cleanup session state.
1622  * Called for both active/passive disconnects.
1623  *
1624  * Must be called from the session's thread.
1625  */
1626 void
1627 session_transport_close (session_t * s)
1628 {
1629   if (s->session_state >= SESSION_STATE_APP_CLOSED)
1630     {
1631       if (s->session_state == SESSION_STATE_TRANSPORT_CLOSED)
1632         session_set_state (s, SESSION_STATE_CLOSED);
1633       /* If transport is already deleted, just free the session */
1634       else if (s->session_state >= SESSION_STATE_TRANSPORT_DELETED)
1635         session_free_w_fifos (s);
1636       return;
1637     }
1638
1639   /* If the tx queue wasn't drained, the transport can continue to try
1640    * sending the outstanding data (in closed state it cannot). It MUST however
1641    * at one point, either after sending everything or after a timeout, call
1642    * delete notify. This will finally lead to the complete cleanup of the
1643    * session.
1644    */
1645   session_set_state (s, SESSION_STATE_APP_CLOSED);
1646
1647   transport_close (session_get_transport_proto (s), s->connection_index,
1648                    s->thread_index);
1649 }
1650
1651 /**
1652  * Force transport close
1653  */
1654 void
1655 session_transport_reset (session_t * s)
1656 {
1657   if (s->session_state >= SESSION_STATE_APP_CLOSED)
1658     {
1659       if (s->session_state == SESSION_STATE_TRANSPORT_CLOSED)
1660         session_set_state (s, SESSION_STATE_CLOSED);
1661       else if (s->session_state >= SESSION_STATE_TRANSPORT_DELETED)
1662         session_free_w_fifos (s);
1663       return;
1664     }
1665
1666   session_set_state (s, SESSION_STATE_APP_CLOSED);
1667   transport_reset (session_get_transport_proto (s), s->connection_index,
1668                    s->thread_index);
1669 }
1670
1671 /**
1672  * Cleanup transport and session state.
1673  *
1674  * Notify transport of the cleanup and free the session. This should
1675  * be called only if transport reported some error and is already
1676  * closed.
1677  */
1678 void
1679 session_transport_cleanup (session_t * s)
1680 {
1681   /* Delete from main lookup table before we axe the the transport */
1682   session_lookup_del_session (s);
1683   if (s->session_state != SESSION_STATE_TRANSPORT_DELETED)
1684     transport_cleanup (session_get_transport_proto (s), s->connection_index,
1685                        s->thread_index);
1686   /* Since we called cleanup, no delete notification will come. So, make
1687    * sure the session is properly freed. */
1688   segment_manager_dealloc_fifos (s->rx_fifo, s->tx_fifo);
1689   session_free (s);
1690 }
1691
1692 /**
1693  * Allocate worker mqs in share-able segment
1694  *
1695  * That can only be a newly created memfd segment, that must be mapped
1696  * by all apps/stack users unless private rx mqs are enabled.
1697  */
1698 void
1699 session_vpp_wrk_mqs_alloc (session_main_t *smm)
1700 {
1701   u32 mq_q_length = 2048, evt_size = sizeof (session_event_t);
1702   fifo_segment_t *mqs_seg = &smm->wrk_mqs_segment;
1703   svm_msg_q_cfg_t _cfg, *cfg = &_cfg;
1704   uword mqs_seg_size;
1705   int i;
1706
1707   mq_q_length = clib_max (mq_q_length, smm->configured_wrk_mq_length);
1708
1709   svm_msg_q_ring_cfg_t rc[SESSION_MQ_N_RINGS] = {
1710     { mq_q_length, evt_size, 0 }, { mq_q_length >> 1, 256, 0 }
1711   };
1712   cfg->consumer_pid = 0;
1713   cfg->n_rings = 2;
1714   cfg->q_nitems = mq_q_length;
1715   cfg->ring_cfgs = rc;
1716
1717   /*
1718    * Compute mqs segment size based on rings config and leave space
1719    * for passing extended configuration messages, i.e., data allocated
1720    * outside of the rings. If provided with a config value, accept it
1721    * if larger than minimum size.
1722    */
1723   mqs_seg_size = svm_msg_q_size_to_alloc (cfg) * vec_len (smm->wrk);
1724   mqs_seg_size = mqs_seg_size + (1 << 20);
1725   mqs_seg_size = clib_max (mqs_seg_size, smm->wrk_mqs_segment_size);
1726
1727   mqs_seg->ssvm.ssvm_size = mqs_seg_size;
1728   mqs_seg->ssvm.my_pid = getpid ();
1729   mqs_seg->ssvm.name = format (0, "%s%c", "session: wrk-mqs-segment", 0);
1730
1731   if (ssvm_server_init (&mqs_seg->ssvm, SSVM_SEGMENT_MEMFD))
1732     {
1733       clib_warning ("failed to initialize queue segment");
1734       return;
1735     }
1736
1737   fifo_segment_init (mqs_seg);
1738
1739   /* Special fifo segment that's filled only with mqs */
1740   mqs_seg->h->n_mqs = vec_len (smm->wrk);
1741
1742   for (i = 0; i < vec_len (smm->wrk); i++)
1743     smm->wrk[i].vpp_event_queue = fifo_segment_msg_q_alloc (mqs_seg, i, cfg);
1744 }
1745
1746 fifo_segment_t *
1747 session_main_get_wrk_mqs_segment (void)
1748 {
1749   return &session_main.wrk_mqs_segment;
1750 }
1751
1752 u64
1753 session_segment_handle (session_t * s)
1754 {
1755   svm_fifo_t *f;
1756
1757   if (!s->rx_fifo)
1758     return SESSION_INVALID_HANDLE;
1759
1760   f = s->rx_fifo;
1761   return segment_manager_make_segment_handle (f->segment_manager,
1762                                               f->segment_index);
1763 }
1764
1765 /* *INDENT-OFF* */
1766 static session_fifo_rx_fn *session_tx_fns[TRANSPORT_TX_N_FNS] = {
1767     session_tx_fifo_peek_and_snd,
1768     session_tx_fifo_dequeue_and_snd,
1769     session_tx_fifo_dequeue_internal,
1770     session_tx_fifo_dequeue_and_snd
1771 };
1772 /* *INDENT-ON* */
1773
1774 void
1775 session_register_transport (transport_proto_t transport_proto,
1776                             const transport_proto_vft_t * vft, u8 is_ip4,
1777                             u32 output_node)
1778 {
1779   session_main_t *smm = &session_main;
1780   session_type_t session_type;
1781   u32 next_index = ~0;
1782
1783   session_type = session_type_from_proto_and_ip (transport_proto, is_ip4);
1784
1785   vec_validate (smm->session_type_to_next, session_type);
1786   vec_validate (smm->session_tx_fns, session_type);
1787
1788   if (output_node != ~0)
1789     next_index = vlib_node_add_next (vlib_get_main (),
1790                                      session_queue_node.index, output_node);
1791
1792   smm->session_type_to_next[session_type] = next_index;
1793   smm->session_tx_fns[session_type] =
1794     session_tx_fns[vft->transport_options.tx_type];
1795 }
1796
1797 void
1798 session_register_update_time_fn (session_update_time_fn fn, u8 is_add)
1799 {
1800   session_main_t *smm = &session_main;
1801   session_update_time_fn *fi;
1802   u32 fi_pos = ~0;
1803   u8 found = 0;
1804
1805   vec_foreach (fi, smm->update_time_fns)
1806     {
1807       if (*fi == fn)
1808         {
1809           fi_pos = fi - smm->update_time_fns;
1810           found = 1;
1811           break;
1812         }
1813     }
1814
1815   if (is_add)
1816     {
1817       if (found)
1818         {
1819           clib_warning ("update time fn %p already registered", fn);
1820           return;
1821         }
1822       vec_add1 (smm->update_time_fns, fn);
1823     }
1824   else
1825     {
1826       vec_del1 (smm->update_time_fns, fi_pos);
1827     }
1828 }
1829
1830 transport_proto_t
1831 session_add_transport_proto (void)
1832 {
1833   session_main_t *smm = &session_main;
1834   session_worker_t *wrk;
1835   u32 thread;
1836
1837   smm->last_transport_proto_type += 1;
1838
1839   for (thread = 0; thread < vec_len (smm->wrk); thread++)
1840     {
1841       wrk = session_main_get_worker (thread);
1842       vec_validate (wrk->session_to_enqueue, smm->last_transport_proto_type);
1843     }
1844
1845   return smm->last_transport_proto_type;
1846 }
1847
1848 transport_connection_t *
1849 session_get_transport (session_t * s)
1850 {
1851   if (s->session_state != SESSION_STATE_LISTENING)
1852     return transport_get_connection (session_get_transport_proto (s),
1853                                      s->connection_index, s->thread_index);
1854   else
1855     return transport_get_listener (session_get_transport_proto (s),
1856                                    s->connection_index);
1857 }
1858
1859 void
1860 session_get_endpoint (session_t * s, transport_endpoint_t * tep, u8 is_lcl)
1861 {
1862   if (s->session_state != SESSION_STATE_LISTENING)
1863     return transport_get_endpoint (session_get_transport_proto (s),
1864                                    s->connection_index, s->thread_index, tep,
1865                                    is_lcl);
1866   else
1867     return transport_get_listener_endpoint (session_get_transport_proto (s),
1868                                             s->connection_index, tep, is_lcl);
1869 }
1870
1871 int
1872 session_transport_attribute (session_t *s, u8 is_get,
1873                              transport_endpt_attr_t *attr)
1874 {
1875   if (s->session_state < SESSION_STATE_READY)
1876     return -1;
1877
1878   return transport_connection_attribute (session_get_transport_proto (s),
1879                                          s->connection_index, s->thread_index,
1880                                          is_get, attr);
1881 }
1882
1883 transport_connection_t *
1884 listen_session_get_transport (session_t * s)
1885 {
1886   return transport_get_listener (session_get_transport_proto (s),
1887                                  s->connection_index);
1888 }
1889
1890 void
1891 session_queue_run_on_main_thread (vlib_main_t * vm)
1892 {
1893   ASSERT (vlib_get_thread_index () == 0);
1894   vlib_node_set_interrupt_pending (vm, session_queue_node.index);
1895 }
1896
1897 static void
1898 session_stats_collector_fn (vlib_stats_collector_data_t *d)
1899 {
1900   u32 i, n_workers, n_wrk_sessions, n_sessions = 0;
1901   session_main_t *smm = &session_main;
1902   session_worker_t *wrk;
1903   counter_t **counters;
1904   counter_t *cb;
1905
1906   n_workers = vec_len (smm->wrk);
1907   vlib_stats_validate (d->entry_index, 0, n_workers - 1);
1908   counters = d->entry->data;
1909   cb = counters[0];
1910
1911   for (i = 0; i < vec_len (smm->wrk); i++)
1912     {
1913       wrk = session_main_get_worker (i);
1914       n_wrk_sessions = pool_elts (wrk->sessions);
1915       cb[i] = n_wrk_sessions;
1916       n_sessions += n_wrk_sessions;
1917     }
1918
1919   vlib_stats_set_gauge (d->private_data, n_sessions);
1920 }
1921
1922 static void
1923 session_stats_collector_init (void)
1924 {
1925   vlib_stats_collector_reg_t reg = {};
1926
1927   reg.entry_index =
1928     vlib_stats_add_counter_vector ("/sys/session/sessions_per_worker");
1929   reg.private_data = vlib_stats_add_gauge ("/sys/session/sessions_total");
1930   reg.collect_fn = session_stats_collector_fn;
1931   vlib_stats_register_collector_fn (&reg);
1932   vlib_stats_validate (reg.entry_index, 0, vlib_get_n_threads ());
1933 }
1934
1935 static clib_error_t *
1936 session_manager_main_enable (vlib_main_t * vm)
1937 {
1938   session_main_t *smm = &session_main;
1939   vlib_thread_main_t *vtm = vlib_get_thread_main ();
1940   u32 num_threads, preallocated_sessions_per_worker;
1941   session_worker_t *wrk;
1942   int i;
1943
1944   /* We only initialize once and do not de-initialized on disable */
1945   if (smm->is_initialized)
1946     goto done;
1947
1948   num_threads = 1 /* main thread */  + vtm->n_threads;
1949
1950   if (num_threads < 1)
1951     return clib_error_return (0, "n_thread_stacks not set");
1952
1953   /* Allocate cache line aligned worker contexts */
1954   vec_validate_aligned (smm->wrk, num_threads - 1, CLIB_CACHE_LINE_BYTES);
1955   clib_spinlock_init (&session_main.pool_realloc_lock);
1956
1957   for (i = 0; i < num_threads; i++)
1958     {
1959       wrk = &smm->wrk[i];
1960       wrk->ctrl_head = clib_llist_make_head (wrk->event_elts, evt_list);
1961       wrk->new_head = clib_llist_make_head (wrk->event_elts, evt_list);
1962       wrk->old_head = clib_llist_make_head (wrk->event_elts, evt_list);
1963       wrk->pending_connects = clib_llist_make_head (wrk->event_elts, evt_list);
1964       wrk->evts_pending_main =
1965         clib_llist_make_head (wrk->event_elts, evt_list);
1966       wrk->vm = vlib_get_main_by_index (i);
1967       wrk->last_vlib_time = vlib_time_now (vm);
1968       wrk->last_vlib_us_time = wrk->last_vlib_time * CLIB_US_TIME_FREQ;
1969       wrk->timerfd = -1;
1970       vec_validate (wrk->session_to_enqueue, smm->last_transport_proto_type);
1971
1972       if (!smm->no_adaptive && smm->use_private_rx_mqs)
1973         session_wrk_enable_adaptive_mode (wrk);
1974     }
1975
1976   /* Allocate vpp event queues segment and queue */
1977   session_vpp_wrk_mqs_alloc (smm);
1978
1979   /* Initialize segment manager properties */
1980   segment_manager_main_init ();
1981
1982   /* Preallocate sessions */
1983   if (smm->preallocated_sessions)
1984     {
1985       if (num_threads == 1)
1986         {
1987           pool_init_fixed (smm->wrk[0].sessions, smm->preallocated_sessions);
1988         }
1989       else
1990         {
1991           int j;
1992           preallocated_sessions_per_worker =
1993             (1.1 * (f64) smm->preallocated_sessions /
1994              (f64) (num_threads - 1));
1995
1996           for (j = 1; j < num_threads; j++)
1997             {
1998               pool_init_fixed (smm->wrk[j].sessions,
1999                                preallocated_sessions_per_worker);
2000             }
2001         }
2002     }
2003
2004   session_lookup_init ();
2005   app_namespaces_init ();
2006   transport_init ();
2007   session_stats_collector_init ();
2008   smm->is_initialized = 1;
2009
2010 done:
2011
2012   smm->is_enabled = 1;
2013
2014   /* Enable transports */
2015   transport_enable_disable (vm, 1);
2016   session_debug_init ();
2017
2018   return 0;
2019 }
2020
2021 static void
2022 session_manager_main_disable (vlib_main_t * vm)
2023 {
2024   transport_enable_disable (vm, 0 /* is_en */ );
2025 }
2026
2027 /* in this new callback, cookie hint the index */
2028 void
2029 session_dma_completion_cb (vlib_main_t *vm, struct vlib_dma_batch *batch)
2030 {
2031   session_worker_t *wrk;
2032   wrk = session_main_get_worker (vm->thread_index);
2033   session_dma_transfer *dma_transfer;
2034
2035   dma_transfer = &wrk->dma_trans[wrk->trans_head];
2036   vec_add (wrk->pending_tx_buffers, dma_transfer->pending_tx_buffers,
2037            vec_len (dma_transfer->pending_tx_buffers));
2038   vec_add (wrk->pending_tx_nexts, dma_transfer->pending_tx_nexts,
2039            vec_len (dma_transfer->pending_tx_nexts));
2040   vec_reset_length (dma_transfer->pending_tx_buffers);
2041   vec_reset_length (dma_transfer->pending_tx_nexts);
2042   wrk->trans_head++;
2043   if (wrk->trans_head == wrk->trans_size)
2044     wrk->trans_head = 0;
2045   return;
2046 }
2047
2048 static void
2049 session_prepare_dma_args (vlib_dma_config_t *args)
2050 {
2051   args->max_batches = 16;
2052   args->max_transfers = DMA_TRANS_SIZE;
2053   args->max_transfer_size = 65536;
2054   args->features = 0;
2055   args->sw_fallback = 1;
2056   args->barrier_before_last = 1;
2057   args->callback_fn = session_dma_completion_cb;
2058 }
2059
2060 static void
2061 session_node_enable_dma (u8 is_en, int n_vlibs)
2062 {
2063   vlib_dma_config_t args;
2064   session_prepare_dma_args (&args);
2065   session_worker_t *wrk;
2066   vlib_main_t *vm;
2067
2068   int config_index = -1;
2069
2070   if (is_en)
2071     {
2072       vm = vlib_get_main_by_index (0);
2073       config_index = vlib_dma_config_add (vm, &args);
2074     }
2075   else
2076     {
2077       vm = vlib_get_main_by_index (0);
2078       wrk = session_main_get_worker (0);
2079       if (wrk->config_index >= 0)
2080         vlib_dma_config_del (vm, wrk->config_index);
2081     }
2082   int i;
2083   for (i = 0; i < n_vlibs; i++)
2084     {
2085       vm = vlib_get_main_by_index (i);
2086       wrk = session_main_get_worker (vm->thread_index);
2087       wrk->config_index = config_index;
2088       if (is_en)
2089         {
2090           if (config_index >= 0)
2091             wrk->dma_enabled = true;
2092           wrk->dma_trans = (session_dma_transfer *) clib_mem_alloc (
2093             sizeof (session_dma_transfer) * DMA_TRANS_SIZE);
2094           bzero (wrk->dma_trans,
2095                  sizeof (session_dma_transfer) * DMA_TRANS_SIZE);
2096         }
2097       else
2098         {
2099           if (wrk->dma_trans)
2100             clib_mem_free (wrk->dma_trans);
2101         }
2102       wrk->trans_head = 0;
2103       wrk->trans_tail = 0;
2104       wrk->trans_size = DMA_TRANS_SIZE;
2105     }
2106 }
2107
2108 void
2109 session_node_enable_disable (u8 is_en)
2110 {
2111   u8 mstate = is_en ? VLIB_NODE_STATE_INTERRUPT : VLIB_NODE_STATE_DISABLED;
2112   u8 state = is_en ? VLIB_NODE_STATE_POLLING : VLIB_NODE_STATE_DISABLED;
2113   session_main_t *sm = &session_main;
2114   vlib_main_t *vm;
2115   vlib_node_t *n;
2116   int n_vlibs, i;
2117
2118   n_vlibs = vlib_get_n_threads ();
2119   for (i = 0; i < n_vlibs; i++)
2120     {
2121       vm = vlib_get_main_by_index (i);
2122       /* main thread with workers and not polling */
2123       if (i == 0 && n_vlibs > 1)
2124         {
2125           vlib_node_set_state (vm, session_queue_node.index, mstate);
2126           if (is_en)
2127             {
2128               session_main_get_worker (0)->state = SESSION_WRK_INTERRUPT;
2129               vlib_node_set_state (vm, session_queue_process_node.index,
2130                                    state);
2131               n = vlib_get_node (vm, session_queue_process_node.index);
2132               vlib_start_process (vm, n->runtime_index);
2133             }
2134           else
2135             {
2136               vlib_process_signal_event_mt (vm,
2137                                             session_queue_process_node.index,
2138                                             SESSION_Q_PROCESS_STOP, 0);
2139             }
2140           if (!sm->poll_main)
2141             continue;
2142         }
2143       vlib_node_set_state (vm, session_queue_node.index, state);
2144     }
2145
2146   if (sm->use_private_rx_mqs)
2147     application_enable_rx_mqs_nodes (is_en);
2148
2149   if (sm->dma_enabled)
2150     session_node_enable_dma (is_en, n_vlibs);
2151 }
2152
2153 clib_error_t *
2154 vnet_session_enable_disable (vlib_main_t * vm, u8 is_en)
2155 {
2156   clib_error_t *error = 0;
2157   if (is_en)
2158     {
2159       if (session_main.is_enabled)
2160         return 0;
2161
2162       error = session_manager_main_enable (vm);
2163       session_node_enable_disable (is_en);
2164     }
2165   else
2166     {
2167       session_main.is_enabled = 0;
2168       session_manager_main_disable (vm);
2169       session_node_enable_disable (is_en);
2170     }
2171
2172   return error;
2173 }
2174
2175 clib_error_t *
2176 session_main_init (vlib_main_t * vm)
2177 {
2178   session_main_t *smm = &session_main;
2179
2180   smm->is_enabled = 0;
2181   smm->session_enable_asap = 0;
2182   smm->poll_main = 0;
2183   smm->use_private_rx_mqs = 0;
2184   smm->no_adaptive = 0;
2185   smm->last_transport_proto_type = TRANSPORT_PROTO_HTTP;
2186
2187   return 0;
2188 }
2189
2190 static clib_error_t *
2191 session_main_loop_init (vlib_main_t * vm)
2192 {
2193   session_main_t *smm = &session_main;
2194   if (smm->session_enable_asap)
2195     {
2196       vlib_worker_thread_barrier_sync (vm);
2197       vnet_session_enable_disable (vm, 1 /* is_en */ );
2198       vlib_worker_thread_barrier_release (vm);
2199     }
2200   return 0;
2201 }
2202
2203 VLIB_INIT_FUNCTION (session_main_init);
2204 VLIB_MAIN_LOOP_ENTER_FUNCTION (session_main_loop_init);
2205
2206 static clib_error_t *
2207 session_config_fn (vlib_main_t * vm, unformat_input_t * input)
2208 {
2209   session_main_t *smm = &session_main;
2210   u32 nitems;
2211   uword tmp;
2212
2213   while (unformat_check_input (input) != UNFORMAT_END_OF_INPUT)
2214     {
2215       if (unformat (input, "wrk-mq-length %d", &nitems))
2216         {
2217           if (nitems >= 2048)
2218             smm->configured_wrk_mq_length = nitems;
2219           else
2220             clib_warning ("event queue length %d too small, ignored", nitems);
2221         }
2222       else if (unformat (input, "wrk-mqs-segment-size %U",
2223                          unformat_memory_size, &smm->wrk_mqs_segment_size))
2224         ;
2225       else if (unformat (input, "preallocated-sessions %d",
2226                          &smm->preallocated_sessions))
2227         ;
2228       else if (unformat (input, "v4-session-table-buckets %d",
2229                          &smm->configured_v4_session_table_buckets))
2230         ;
2231       else if (unformat (input, "v4-halfopen-table-buckets %d",
2232                          &smm->configured_v4_halfopen_table_buckets))
2233         ;
2234       else if (unformat (input, "v6-session-table-buckets %d",
2235                          &smm->configured_v6_session_table_buckets))
2236         ;
2237       else if (unformat (input, "v6-halfopen-table-buckets %d",
2238                          &smm->configured_v6_halfopen_table_buckets))
2239         ;
2240       else if (unformat (input, "v4-session-table-memory %U",
2241                          unformat_memory_size, &tmp))
2242         {
2243           if (tmp >= 0x100000000)
2244             return clib_error_return (0, "memory size %llx (%lld) too large",
2245                                       tmp, tmp);
2246           smm->configured_v4_session_table_memory = tmp;
2247         }
2248       else if (unformat (input, "v4-halfopen-table-memory %U",
2249                          unformat_memory_size, &tmp))
2250         {
2251           if (tmp >= 0x100000000)
2252             return clib_error_return (0, "memory size %llx (%lld) too large",
2253                                       tmp, tmp);
2254           smm->configured_v4_halfopen_table_memory = tmp;
2255         }
2256       else if (unformat (input, "v6-session-table-memory %U",
2257                          unformat_memory_size, &tmp))
2258         {
2259           if (tmp >= 0x100000000)
2260             return clib_error_return (0, "memory size %llx (%lld) too large",
2261                                       tmp, tmp);
2262           smm->configured_v6_session_table_memory = tmp;
2263         }
2264       else if (unformat (input, "v6-halfopen-table-memory %U",
2265                          unformat_memory_size, &tmp))
2266         {
2267           if (tmp >= 0x100000000)
2268             return clib_error_return (0, "memory size %llx (%lld) too large",
2269                                       tmp, tmp);
2270           smm->configured_v6_halfopen_table_memory = tmp;
2271         }
2272       else if (unformat (input, "local-endpoints-table-memory %U",
2273                          unformat_memory_size, &tmp))
2274         {
2275           if (tmp >= 0x100000000)
2276             return clib_error_return (0, "memory size %llx (%lld) too large",
2277                                       tmp, tmp);
2278           smm->local_endpoints_table_memory = tmp;
2279         }
2280       else if (unformat (input, "local-endpoints-table-buckets %d",
2281                          &smm->local_endpoints_table_buckets))
2282         ;
2283       else if (unformat (input, "enable"))
2284         smm->session_enable_asap = 1;
2285       else if (unformat (input, "use-app-socket-api"))
2286         (void) appns_sapi_enable_disable (1 /* is_enable */);
2287       else if (unformat (input, "poll-main"))
2288         smm->poll_main = 1;
2289       else if (unformat (input, "use-private-rx-mqs"))
2290         smm->use_private_rx_mqs = 1;
2291       else if (unformat (input, "no-adaptive"))
2292         smm->no_adaptive = 1;
2293       else if (unformat (input, "use-dma"))
2294         smm->dma_enabled = 1;
2295       /*
2296        * Deprecated but maintained for compatibility
2297        */
2298       else if (unformat (input, "evt_qs_memfd_seg"))
2299         ;
2300       else if (unformat (input, "segment-baseva 0x%lx", &tmp))
2301         ;
2302       else if (unformat (input, "evt_qs_seg_size %U", unformat_memory_size,
2303                          &smm->wrk_mqs_segment_size))
2304         ;
2305       else if (unformat (input, "event-queue-length %d", &nitems))
2306         {
2307           if (nitems >= 2048)
2308             smm->configured_wrk_mq_length = nitems;
2309           else
2310             clib_warning ("event queue length %d too small, ignored", nitems);
2311         }
2312       else
2313         return clib_error_return (0, "unknown input `%U'",
2314                                   format_unformat_error, input);
2315     }
2316   return 0;
2317 }
2318
2319 VLIB_CONFIG_FUNCTION (session_config_fn, "session");
2320
2321 /*
2322  * fd.io coding-style-patch-verification: ON
2323  *
2324  * Local Variables:
2325  * eval: (c-set-style "gnu")
2326  * End:
2327  */