fe05dd21f6854b80fbe93fa5b791df74dcbf4167
[vpp.git] / src / vnet / session / session.c
1 /*
2  * Copyright (c) 2017-2019 Cisco and/or its affiliates.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at:
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 /**
16  * @file
17  * @brief Session and session manager
18  */
19
20 #include <vnet/session/session.h>
21 #include <vnet/session/application.h>
22 #include <vnet/dpo/load_balance.h>
23 #include <vnet/fib/ip4_fib.h>
24 #include <vlib/stats/stats.h>
25 #include <vlib/dma/dma.h>
26
27 session_main_t session_main;
28
29 static inline int
30 session_send_evt_to_thread (void *data, void *args, u32 thread_index,
31                             session_evt_type_t evt_type)
32 {
33   session_worker_t *wrk = session_main_get_worker (thread_index);
34   session_event_t *evt;
35   svm_msg_q_msg_t msg;
36   svm_msg_q_t *mq;
37
38   mq = wrk->vpp_event_queue;
39   if (PREDICT_FALSE (svm_msg_q_lock (mq)))
40     return -1;
41   if (PREDICT_FALSE (svm_msg_q_or_ring_is_full (mq, SESSION_MQ_IO_EVT_RING)))
42     {
43       svm_msg_q_unlock (mq);
44       return -2;
45     }
46   switch (evt_type)
47     {
48     case SESSION_CTRL_EVT_RPC:
49       msg = svm_msg_q_alloc_msg_w_ring (mq, SESSION_MQ_IO_EVT_RING);
50       evt = (session_event_t *) svm_msg_q_msg_data (mq, &msg);
51       evt->rpc_args.fp = data;
52       evt->rpc_args.arg = args;
53       break;
54     case SESSION_IO_EVT_RX:
55     case SESSION_IO_EVT_TX:
56     case SESSION_IO_EVT_TX_FLUSH:
57     case SESSION_IO_EVT_BUILTIN_RX:
58       msg = svm_msg_q_alloc_msg_w_ring (mq, SESSION_MQ_IO_EVT_RING);
59       evt = (session_event_t *) svm_msg_q_msg_data (mq, &msg);
60       evt->session_index = *(u32 *) data;
61       break;
62     case SESSION_IO_EVT_BUILTIN_TX:
63     case SESSION_CTRL_EVT_CLOSE:
64     case SESSION_CTRL_EVT_RESET:
65       msg = svm_msg_q_alloc_msg_w_ring (mq, SESSION_MQ_IO_EVT_RING);
66       evt = (session_event_t *) svm_msg_q_msg_data (mq, &msg);
67       evt->session_handle = session_handle ((session_t *) data);
68       break;
69     default:
70       clib_warning ("evt unhandled!");
71       svm_msg_q_unlock (mq);
72       return -1;
73     }
74   evt->event_type = evt_type;
75
76   svm_msg_q_add_and_unlock (mq, &msg);
77
78   if (PREDICT_FALSE (wrk->state == SESSION_WRK_INTERRUPT))
79     vlib_node_set_interrupt_pending (wrk->vm, session_queue_node.index);
80
81   return 0;
82 }
83
84 int
85 session_send_io_evt_to_thread (svm_fifo_t * f, session_evt_type_t evt_type)
86 {
87   return session_send_evt_to_thread (&f->shr->master_session_index, 0,
88                                      f->master_thread_index, evt_type);
89 }
90
91 int
92 session_send_io_evt_to_thread_custom (void *data, u32 thread_index,
93                                       session_evt_type_t evt_type)
94 {
95   return session_send_evt_to_thread (data, 0, thread_index, evt_type);
96 }
97
98 int
99 session_send_ctrl_evt_to_thread (session_t * s, session_evt_type_t evt_type)
100 {
101   /* only events supported are disconnect, shutdown and reset */
102   ASSERT (evt_type == SESSION_CTRL_EVT_CLOSE ||
103           evt_type == SESSION_CTRL_EVT_HALF_CLOSE ||
104           evt_type == SESSION_CTRL_EVT_RESET);
105   return session_send_evt_to_thread (s, 0, s->thread_index, evt_type);
106 }
107
108 void
109 session_send_rpc_evt_to_thread_force (u32 thread_index, void *fp,
110                                       void *rpc_args)
111 {
112   session_send_evt_to_thread (fp, rpc_args, thread_index,
113                               SESSION_CTRL_EVT_RPC);
114 }
115
116 void
117 session_send_rpc_evt_to_thread (u32 thread_index, void *fp, void *rpc_args)
118 {
119   if (thread_index != vlib_get_thread_index ())
120     session_send_rpc_evt_to_thread_force (thread_index, fp, rpc_args);
121   else
122     {
123       void (*fnp) (void *) = fp;
124       fnp (rpc_args);
125     }
126 }
127
128 void
129 session_add_self_custom_tx_evt (transport_connection_t * tc, u8 has_prio)
130 {
131   session_t *s = session_get (tc->s_index, tc->thread_index);
132
133   ASSERT (s->thread_index == vlib_get_thread_index ());
134   ASSERT (s->session_state != SESSION_STATE_TRANSPORT_DELETED);
135
136   if (!(s->flags & SESSION_F_CUSTOM_TX))
137     {
138       s->flags |= SESSION_F_CUSTOM_TX;
139       if (svm_fifo_set_event (s->tx_fifo)
140           || transport_connection_is_descheduled (tc))
141         {
142           session_evt_elt_t *elt;
143           session_worker_t *wrk;
144
145           wrk = session_main_get_worker (tc->thread_index);
146           if (has_prio)
147             elt = session_evt_alloc_new (wrk);
148           else
149             elt = session_evt_alloc_old (wrk);
150           elt->evt.session_index = tc->s_index;
151           elt->evt.event_type = SESSION_IO_EVT_TX;
152           tc->flags &= ~TRANSPORT_CONNECTION_F_DESCHED;
153
154           if (PREDICT_FALSE (wrk->state == SESSION_WRK_INTERRUPT))
155             vlib_node_set_interrupt_pending (wrk->vm,
156                                              session_queue_node.index);
157         }
158     }
159 }
160
161 void
162 sesssion_reschedule_tx (transport_connection_t * tc)
163 {
164   session_worker_t *wrk = session_main_get_worker (tc->thread_index);
165   session_evt_elt_t *elt;
166
167   ASSERT (tc->thread_index == vlib_get_thread_index ());
168
169   elt = session_evt_alloc_new (wrk);
170   elt->evt.session_index = tc->s_index;
171   elt->evt.event_type = SESSION_IO_EVT_TX;
172
173   if (PREDICT_FALSE (wrk->state == SESSION_WRK_INTERRUPT))
174     vlib_node_set_interrupt_pending (wrk->vm, session_queue_node.index);
175 }
176
177 static void
178 session_program_transport_ctrl_evt (session_t * s, session_evt_type_t evt)
179 {
180   u32 thread_index = vlib_get_thread_index ();
181   session_evt_elt_t *elt;
182   session_worker_t *wrk;
183
184   /* If we are in the handler thread, or being called with the worker barrier
185    * held, just append a new event to pending disconnects vector. */
186   if (vlib_thread_is_main_w_barrier () || thread_index == s->thread_index)
187     {
188       wrk = session_main_get_worker (s->thread_index);
189       elt = session_evt_alloc_ctrl (wrk);
190       clib_memset (&elt->evt, 0, sizeof (session_event_t));
191       elt->evt.session_handle = session_handle (s);
192       elt->evt.event_type = evt;
193
194       if (PREDICT_FALSE (wrk->state == SESSION_WRK_INTERRUPT))
195         vlib_node_set_interrupt_pending (wrk->vm, session_queue_node.index);
196     }
197   else
198     session_send_ctrl_evt_to_thread (s, evt);
199 }
200
201 session_t *
202 session_alloc (u32 thread_index)
203 {
204   session_worker_t *wrk = &session_main.wrk[thread_index];
205   session_t *s;
206
207   pool_get_aligned_safe (wrk->sessions, s, CLIB_CACHE_LINE_BYTES);
208   clib_memset (s, 0, sizeof (*s));
209   s->session_index = s - wrk->sessions;
210   s->thread_index = thread_index;
211   s->app_index = APP_INVALID_INDEX;
212
213   return s;
214 }
215
216 void
217 session_free (session_t * s)
218 {
219   session_worker_t *wrk = &session_main.wrk[s->thread_index];
220
221   SESSION_EVT (SESSION_EVT_FREE, s);
222   if (CLIB_DEBUG)
223     clib_memset (s, 0xFA, sizeof (*s));
224   pool_put (wrk->sessions, s);
225 }
226
227 u8
228 session_is_valid (u32 si, u8 thread_index)
229 {
230   session_t *s;
231   transport_connection_t *tc;
232
233   s = pool_elt_at_index (session_main.wrk[thread_index].sessions, si);
234
235   if (s->thread_index != thread_index || s->session_index != si)
236     return 0;
237
238   if (s->session_state == SESSION_STATE_TRANSPORT_DELETED
239       || s->session_state <= SESSION_STATE_LISTENING)
240     return 1;
241
242   if (s->session_state == SESSION_STATE_CONNECTING &&
243       (s->flags & SESSION_F_HALF_OPEN))
244     return 1;
245
246   tc = session_get_transport (s);
247   if (s->connection_index != tc->c_index
248       || s->thread_index != tc->thread_index || tc->s_index != si)
249     return 0;
250
251   return 1;
252 }
253
254 static void
255 session_cleanup_notify (session_t * s, session_cleanup_ntf_t ntf)
256 {
257   app_worker_t *app_wrk;
258
259   app_wrk = app_worker_get_if_valid (s->app_wrk_index);
260   if (!app_wrk)
261     return;
262   app_worker_cleanup_notify (app_wrk, s, ntf);
263 }
264
265 void
266 session_free_w_fifos (session_t * s)
267 {
268   session_cleanup_notify (s, SESSION_CLEANUP_SESSION);
269   segment_manager_dealloc_fifos (s->rx_fifo, s->tx_fifo);
270   session_free (s);
271 }
272
273 /**
274  * Cleans up session and lookup table.
275  *
276  * Transport connection must still be valid.
277  */
278 static void
279 session_delete (session_t * s)
280 {
281   int rv;
282
283   /* Delete from the main lookup table. */
284   if ((rv = session_lookup_del_session (s)))
285     clib_warning ("session %u hash delete rv %d", s->session_index, rv);
286
287   session_free_w_fifos (s);
288 }
289
290 void
291 session_cleanup_half_open (session_handle_t ho_handle)
292 {
293   session_t *ho = session_get_from_handle (ho_handle);
294
295   /* App transports can migrate their half-opens */
296   if (ho->flags & SESSION_F_IS_MIGRATING)
297     {
298       /* Session still migrating, move to closed state to signal that the
299        * session should be removed. */
300       if (ho->connection_index == ~0)
301         {
302           session_set_state (ho, SESSION_STATE_CLOSED);
303           return;
304         }
305       /* Migrated transports are no longer half-opens */
306       transport_cleanup (session_get_transport_proto (ho),
307                          ho->connection_index, ho->app_index /* overloaded */);
308     }
309   else
310     transport_cleanup_half_open (session_get_transport_proto (ho),
311                                  ho->connection_index);
312   session_free (ho);
313 }
314
315 static void
316 session_half_open_free (session_t *ho)
317 {
318   app_worker_t *app_wrk;
319
320   ASSERT (vlib_get_thread_index () <= 1);
321   app_wrk = app_worker_get (ho->app_wrk_index);
322   app_worker_del_half_open (app_wrk, ho);
323   session_free (ho);
324 }
325
326 static void
327 session_half_open_free_rpc (void *args)
328 {
329   session_t *ho = ho_session_get (pointer_to_uword (args));
330   session_half_open_free (ho);
331 }
332
333 void
334 session_half_open_delete_notify (transport_connection_t *tc)
335 {
336   /* Notification from ctrl thread accepted without rpc */
337   if (tc->thread_index == transport_cl_thread ())
338     {
339       session_half_open_free (ho_session_get (tc->s_index));
340     }
341   else
342     {
343       void *args = uword_to_pointer ((uword) tc->s_index, void *);
344       session_send_rpc_evt_to_thread_force (transport_cl_thread (),
345                                             session_half_open_free_rpc, args);
346     }
347 }
348
349 void
350 session_half_open_migrate_notify (transport_connection_t *tc)
351 {
352   session_t *ho;
353
354   ho = ho_session_get (tc->s_index);
355   ho->flags |= SESSION_F_IS_MIGRATING;
356   ho->connection_index = ~0;
357 }
358
359 int
360 session_half_open_migrated_notify (transport_connection_t *tc)
361 {
362   session_t *ho;
363
364   ho = ho_session_get (tc->s_index);
365
366   /* App probably detached so the half-open must be cleaned up */
367   if (ho->session_state == SESSION_STATE_CLOSED)
368     {
369       session_half_open_delete_notify (tc);
370       return -1;
371     }
372   ho->connection_index = tc->c_index;
373   /* Overload app index for half-open with new thread */
374   ho->app_index = tc->thread_index;
375   return 0;
376 }
377
378 session_t *
379 session_alloc_for_connection (transport_connection_t * tc)
380 {
381   session_t *s;
382   u32 thread_index = tc->thread_index;
383
384   ASSERT (thread_index == vlib_get_thread_index ()
385           || transport_protocol_is_cl (tc->proto));
386
387   s = session_alloc (thread_index);
388   s->session_type = session_type_from_proto_and_ip (tc->proto, tc->is_ip4);
389   session_set_state (s, SESSION_STATE_CLOSED);
390
391   /* Attach transport to session and vice versa */
392   s->connection_index = tc->c_index;
393   tc->s_index = s->session_index;
394   return s;
395 }
396
397 session_t *
398 session_alloc_for_half_open (transport_connection_t *tc)
399 {
400   session_t *s;
401
402   s = ho_session_alloc ();
403   s->session_type = session_type_from_proto_and_ip (tc->proto, tc->is_ip4);
404   s->connection_index = tc->c_index;
405   tc->s_index = s->session_index;
406   return s;
407 }
408
409 /**
410  * Discards bytes from buffer chain
411  *
412  * It discards n_bytes_to_drop starting at first buffer after chain_b
413  */
414 always_inline void
415 session_enqueue_discard_chain_bytes (vlib_main_t * vm, vlib_buffer_t * b,
416                                      vlib_buffer_t ** chain_b,
417                                      u32 n_bytes_to_drop)
418 {
419   vlib_buffer_t *next = *chain_b;
420   u32 to_drop = n_bytes_to_drop;
421   ASSERT (b->flags & VLIB_BUFFER_NEXT_PRESENT);
422   while (to_drop && (next->flags & VLIB_BUFFER_NEXT_PRESENT))
423     {
424       next = vlib_get_buffer (vm, next->next_buffer);
425       if (next->current_length > to_drop)
426         {
427           vlib_buffer_advance (next, to_drop);
428           to_drop = 0;
429         }
430       else
431         {
432           to_drop -= next->current_length;
433           next->current_length = 0;
434         }
435     }
436   *chain_b = next;
437
438   if (to_drop == 0)
439     b->total_length_not_including_first_buffer -= n_bytes_to_drop;
440 }
441
442 /**
443  * Enqueue buffer chain tail
444  */
445 always_inline int
446 session_enqueue_chain_tail (session_t * s, vlib_buffer_t * b,
447                             u32 offset, u8 is_in_order)
448 {
449   vlib_buffer_t *chain_b;
450   u32 chain_bi, len, diff;
451   vlib_main_t *vm = vlib_get_main ();
452   u8 *data;
453   u32 written = 0;
454   int rv = 0;
455
456   if (is_in_order && offset)
457     {
458       diff = offset - b->current_length;
459       if (diff > b->total_length_not_including_first_buffer)
460         return 0;
461       chain_b = b;
462       session_enqueue_discard_chain_bytes (vm, b, &chain_b, diff);
463       chain_bi = vlib_get_buffer_index (vm, chain_b);
464     }
465   else
466     chain_bi = b->next_buffer;
467
468   do
469     {
470       chain_b = vlib_get_buffer (vm, chain_bi);
471       data = vlib_buffer_get_current (chain_b);
472       len = chain_b->current_length;
473       if (!len)
474         continue;
475       if (is_in_order)
476         {
477           rv = svm_fifo_enqueue (s->rx_fifo, len, data);
478           if (rv == len)
479             {
480               written += rv;
481             }
482           else if (rv < len)
483             {
484               return (rv > 0) ? (written + rv) : written;
485             }
486           else if (rv > len)
487             {
488               written += rv;
489
490               /* written more than what was left in chain */
491               if (written > b->total_length_not_including_first_buffer)
492                 return written;
493
494               /* drop the bytes that have already been delivered */
495               session_enqueue_discard_chain_bytes (vm, b, &chain_b, rv - len);
496             }
497         }
498       else
499         {
500           rv = svm_fifo_enqueue_with_offset (s->rx_fifo, offset, len, data);
501           if (rv)
502             {
503               clib_warning ("failed to enqueue multi-buffer seg");
504               return -1;
505             }
506           offset += len;
507         }
508     }
509   while ((chain_bi = (chain_b->flags & VLIB_BUFFER_NEXT_PRESENT)
510           ? chain_b->next_buffer : 0));
511
512   if (is_in_order)
513     return written;
514
515   return 0;
516 }
517
518 void
519 session_fifo_tuning (session_t * s, svm_fifo_t * f,
520                      session_ft_action_t act, u32 len)
521 {
522   if (s->flags & SESSION_F_CUSTOM_FIFO_TUNING)
523     {
524       app_worker_t *app_wrk = app_worker_get (s->app_wrk_index);
525       app_worker_session_fifo_tuning (app_wrk, s, f, act, len);
526       if (CLIB_ASSERT_ENABLE)
527         {
528           segment_manager_t *sm;
529           sm = segment_manager_get (f->segment_manager);
530           ASSERT (f->shr->size >= 4096);
531           ASSERT (f->shr->size <= sm->max_fifo_size);
532         }
533     }
534 }
535
536 /*
537  * Enqueue data for delivery to session peer. Does not notify peer of enqueue
538  * event but on request can queue notification events for later delivery by
539  * calling stream_server_flush_enqueue_events().
540  *
541  * @param tc Transport connection which is to be enqueued data
542  * @param b Buffer to be enqueued
543  * @param offset Offset at which to start enqueueing if out-of-order
544  * @param queue_event Flag to indicate if peer is to be notified or if event
545  *                    is to be queued. The former is useful when more data is
546  *                    enqueued and only one event is to be generated.
547  * @param is_in_order Flag to indicate if data is in order
548  * @return Number of bytes enqueued or a negative value if enqueueing failed.
549  */
550 int
551 session_enqueue_stream_connection (transport_connection_t * tc,
552                                    vlib_buffer_t * b, u32 offset,
553                                    u8 queue_event, u8 is_in_order)
554 {
555   session_t *s;
556   int enqueued = 0, rv, in_order_off;
557
558   s = session_get (tc->s_index, tc->thread_index);
559
560   if (is_in_order)
561     {
562       enqueued = svm_fifo_enqueue (s->rx_fifo,
563                                    b->current_length,
564                                    vlib_buffer_get_current (b));
565       if (PREDICT_FALSE ((b->flags & VLIB_BUFFER_NEXT_PRESENT)
566                          && enqueued >= 0))
567         {
568           in_order_off = enqueued > b->current_length ? enqueued : 0;
569           rv = session_enqueue_chain_tail (s, b, in_order_off, 1);
570           if (rv > 0)
571             enqueued += rv;
572         }
573     }
574   else
575     {
576       rv = svm_fifo_enqueue_with_offset (s->rx_fifo, offset,
577                                          b->current_length,
578                                          vlib_buffer_get_current (b));
579       if (PREDICT_FALSE ((b->flags & VLIB_BUFFER_NEXT_PRESENT) && !rv))
580         session_enqueue_chain_tail (s, b, offset + b->current_length, 0);
581       /* if something was enqueued, report even this as success for ooo
582        * segment handling */
583       return rv;
584     }
585
586   if (queue_event)
587     {
588       /* Queue RX event on this fifo. Eventually these will need to be flushed
589        * by calling stream_server_flush_enqueue_events () */
590       session_worker_t *wrk;
591
592       wrk = session_main_get_worker (s->thread_index);
593       if (!(s->flags & SESSION_F_RX_EVT))
594         {
595           s->flags |= SESSION_F_RX_EVT;
596           vec_add1 (wrk->session_to_enqueue[tc->proto], s->session_index);
597         }
598
599       session_fifo_tuning (s, s->rx_fifo, SESSION_FT_ACTION_ENQUEUED, 0);
600     }
601
602   return enqueued;
603 }
604
605 int
606 session_enqueue_dgram_connection (session_t * s,
607                                   session_dgram_hdr_t * hdr,
608                                   vlib_buffer_t * b, u8 proto, u8 queue_event)
609 {
610   int rv;
611
612   ASSERT (svm_fifo_max_enqueue_prod (s->rx_fifo)
613           >= b->current_length + sizeof (*hdr));
614
615   if (PREDICT_TRUE (!(b->flags & VLIB_BUFFER_NEXT_PRESENT)))
616     {
617       /* *INDENT-OFF* */
618       svm_fifo_seg_t segs[2] = {
619           { (u8 *) hdr, sizeof (*hdr) },
620           { vlib_buffer_get_current (b), b->current_length }
621       };
622       /* *INDENT-ON* */
623
624       rv = svm_fifo_enqueue_segments (s->rx_fifo, segs, 2,
625                                       0 /* allow_partial */ );
626     }
627   else
628     {
629       vlib_main_t *vm = vlib_get_main ();
630       svm_fifo_seg_t *segs = 0, *seg;
631       vlib_buffer_t *it = b;
632       u32 n_segs = 1;
633
634       vec_add2 (segs, seg, 1);
635       seg->data = (u8 *) hdr;
636       seg->len = sizeof (*hdr);
637       while (it)
638         {
639           vec_add2 (segs, seg, 1);
640           seg->data = vlib_buffer_get_current (it);
641           seg->len = it->current_length;
642           n_segs++;
643           if (!(it->flags & VLIB_BUFFER_NEXT_PRESENT))
644             break;
645           it = vlib_get_buffer (vm, it->next_buffer);
646         }
647       rv = svm_fifo_enqueue_segments (s->rx_fifo, segs, n_segs,
648                                       0 /* allow partial */ );
649       vec_free (segs);
650     }
651
652   if (queue_event && rv > 0)
653     {
654       /* Queue RX event on this fifo. Eventually these will need to be flushed
655        * by calling stream_server_flush_enqueue_events () */
656       session_worker_t *wrk;
657
658       wrk = session_main_get_worker (s->thread_index);
659       if (!(s->flags & SESSION_F_RX_EVT))
660         {
661           s->flags |= SESSION_F_RX_EVT;
662           vec_add1 (wrk->session_to_enqueue[proto], s->session_index);
663         }
664
665       session_fifo_tuning (s, s->rx_fifo, SESSION_FT_ACTION_ENQUEUED, 0);
666     }
667   return rv > 0 ? rv : 0;
668 }
669
670 int
671 session_tx_fifo_peek_bytes (transport_connection_t * tc, u8 * buffer,
672                             u32 offset, u32 max_bytes)
673 {
674   session_t *s = session_get (tc->s_index, tc->thread_index);
675   return svm_fifo_peek (s->tx_fifo, offset, max_bytes, buffer);
676 }
677
678 u32
679 session_tx_fifo_dequeue_drop (transport_connection_t * tc, u32 max_bytes)
680 {
681   session_t *s = session_get (tc->s_index, tc->thread_index);
682   u32 rv;
683
684   rv = svm_fifo_dequeue_drop (s->tx_fifo, max_bytes);
685   session_fifo_tuning (s, s->tx_fifo, SESSION_FT_ACTION_DEQUEUED, rv);
686
687   if (svm_fifo_needs_deq_ntf (s->tx_fifo, max_bytes))
688     session_dequeue_notify (s);
689
690   return rv;
691 }
692
693 static inline int
694 session_notify_subscribers (u32 app_index, session_t * s,
695                             svm_fifo_t * f, session_evt_type_t evt_type)
696 {
697   app_worker_t *app_wrk;
698   application_t *app;
699   int i;
700
701   app = application_get (app_index);
702   if (!app)
703     return -1;
704
705   for (i = 0; i < f->shr->n_subscribers; i++)
706     {
707       app_wrk = application_get_worker (app, f->shr->subscribers[i]);
708       if (!app_wrk)
709         continue;
710       if (app_worker_lock_and_send_event (app_wrk, s, evt_type))
711         return -1;
712     }
713
714   return 0;
715 }
716
717 /**
718  * Notify session peer that new data has been enqueued.
719  *
720  * @param s     Stream session for which the event is to be generated.
721  * @param lock  Flag to indicate if call should lock message queue.
722  *
723  * @return 0 on success or negative number if failed to send notification.
724  */
725 static inline int
726 session_enqueue_notify_inline (session_t * s)
727 {
728   app_worker_t *app_wrk;
729   u32 session_index;
730   u8 n_subscribers;
731   u32 thread_index;
732
733   session_index = s->session_index;
734   thread_index = s->thread_index;
735   n_subscribers = svm_fifo_n_subscribers (s->rx_fifo);
736
737   app_wrk = app_worker_get_if_valid (s->app_wrk_index);
738   if (PREDICT_FALSE (!app_wrk))
739     {
740       SESSION_DBG ("invalid s->app_index = %d", s->app_wrk_index);
741       return 0;
742     }
743
744   SESSION_EVT (SESSION_EVT_ENQ, s, svm_fifo_max_dequeue_prod (s->rx_fifo));
745
746   s->flags &= ~SESSION_F_RX_EVT;
747
748   /* Application didn't confirm accept yet */
749   if (PREDICT_FALSE (s->session_state == SESSION_STATE_ACCEPTING))
750     return 0;
751
752   if (PREDICT_FALSE (app_worker_lock_and_send_event (app_wrk, s,
753                                                      SESSION_IO_EVT_RX)))
754     return -1;
755
756   if (PREDICT_FALSE (n_subscribers))
757     {
758       s = session_get (session_index, thread_index);
759       return session_notify_subscribers (app_wrk->app_index, s,
760                                          s->rx_fifo, SESSION_IO_EVT_RX);
761     }
762
763   return 0;
764 }
765
766 int
767 session_enqueue_notify (session_t * s)
768 {
769   return session_enqueue_notify_inline (s);
770 }
771
772 static void
773 session_enqueue_notify_rpc (void *arg)
774 {
775   u32 session_index = pointer_to_uword (arg);
776   session_t *s;
777
778   s = session_get_if_valid (session_index, vlib_get_thread_index ());
779   if (!s)
780     return;
781
782   session_enqueue_notify (s);
783 }
784
785 /**
786  * Like session_enqueue_notify, but can be called from a thread that does not
787  * own the session.
788  */
789 void
790 session_enqueue_notify_thread (session_handle_t sh)
791 {
792   u32 thread_index = session_thread_from_handle (sh);
793   u32 session_index = session_index_from_handle (sh);
794
795   /*
796    * Pass session index (u32) as opposed to handle (u64) in case pointers
797    * are not 64-bit.
798    */
799   session_send_rpc_evt_to_thread (thread_index,
800                                   session_enqueue_notify_rpc,
801                                   uword_to_pointer (session_index, void *));
802 }
803
804 int
805 session_dequeue_notify (session_t * s)
806 {
807   app_worker_t *app_wrk;
808
809   svm_fifo_clear_deq_ntf (s->tx_fifo);
810
811   app_wrk = app_worker_get_if_valid (s->app_wrk_index);
812   if (PREDICT_FALSE (!app_wrk))
813     return -1;
814
815   if (PREDICT_FALSE (app_worker_lock_and_send_event (app_wrk, s,
816                                                      SESSION_IO_EVT_TX)))
817     return -1;
818
819   if (PREDICT_FALSE (s->tx_fifo->shr->n_subscribers))
820     return session_notify_subscribers (app_wrk->app_index, s,
821                                        s->tx_fifo, SESSION_IO_EVT_TX);
822
823   return 0;
824 }
825
826 /**
827  * Flushes queue of sessions that are to be notified of new data
828  * enqueued events.
829  *
830  * @param thread_index Thread index for which the flush is to be performed.
831  * @return 0 on success or a positive number indicating the number of
832  *         failures due to API queue being full.
833  */
834 int
835 session_main_flush_enqueue_events (u8 transport_proto, u32 thread_index)
836 {
837   session_worker_t *wrk = session_main_get_worker (thread_index);
838   session_t *s;
839   int i, errors = 0;
840   u32 *indices;
841
842   indices = wrk->session_to_enqueue[transport_proto];
843
844   for (i = 0; i < vec_len (indices); i++)
845     {
846       s = session_get_if_valid (indices[i], thread_index);
847       if (PREDICT_FALSE (!s))
848         {
849           errors++;
850           continue;
851         }
852
853       session_fifo_tuning (s, s->rx_fifo, SESSION_FT_ACTION_ENQUEUED,
854                            0 /* TODO/not needed */ );
855
856       if (PREDICT_FALSE (session_enqueue_notify_inline (s)))
857         errors++;
858     }
859
860   vec_reset_length (indices);
861   wrk->session_to_enqueue[transport_proto] = indices;
862
863   return errors;
864 }
865
866 int
867 session_main_flush_all_enqueue_events (u8 transport_proto)
868 {
869   vlib_thread_main_t *vtm = vlib_get_thread_main ();
870   int i, errors = 0;
871   for (i = 0; i < 1 + vtm->n_threads; i++)
872     errors += session_main_flush_enqueue_events (transport_proto, i);
873   return errors;
874 }
875
876 int
877 session_stream_connect_notify (transport_connection_t * tc,
878                                session_error_t err)
879 {
880   u32 opaque = 0, new_ti, new_si;
881   app_worker_t *app_wrk;
882   session_t *s = 0, *ho;
883
884   /*
885    * Cleanup half-open table
886    */
887   session_lookup_del_half_open (tc);
888
889   ho = ho_session_get (tc->s_index);
890   opaque = ho->opaque;
891   app_wrk = app_worker_get_if_valid (ho->app_wrk_index);
892   if (!app_wrk)
893     return -1;
894
895   if (err)
896     return app_worker_connect_notify (app_wrk, s, err, opaque);
897
898   s = session_alloc_for_connection (tc);
899   session_set_state (s, SESSION_STATE_CONNECTING);
900   s->app_wrk_index = app_wrk->wrk_index;
901   new_si = s->session_index;
902   new_ti = s->thread_index;
903
904   if ((err = app_worker_init_connected (app_wrk, s)))
905     {
906       session_free (s);
907       app_worker_connect_notify (app_wrk, 0, err, opaque);
908       return -1;
909     }
910
911   s = session_get (new_si, new_ti);
912   session_set_state (s, SESSION_STATE_READY);
913   session_lookup_add_connection (tc, session_handle (s));
914
915   if (app_worker_connect_notify (app_wrk, s, SESSION_E_NONE, opaque))
916     {
917       session_lookup_del_connection (tc);
918       /* Avoid notifying app about rejected session cleanup */
919       s = session_get (new_si, new_ti);
920       segment_manager_dealloc_fifos (s->rx_fifo, s->tx_fifo);
921       session_free (s);
922       return -1;
923     }
924
925   return 0;
926 }
927
928 typedef union session_switch_pool_reply_args_
929 {
930   struct
931   {
932     u32 session_index;
933     u16 thread_index;
934     u8 is_closed;
935   };
936   u64 as_u64;
937 } session_switch_pool_reply_args_t;
938
939 STATIC_ASSERT (sizeof (session_switch_pool_reply_args_t) <= sizeof (uword),
940                "switch pool reply args size");
941
942 static void
943 session_switch_pool_reply (void *arg)
944 {
945   session_switch_pool_reply_args_t rargs;
946   session_t *s;
947
948   rargs.as_u64 = pointer_to_uword (arg);
949   s = session_get_if_valid (rargs.session_index, rargs.thread_index);
950   if (!s)
951     return;
952
953   /* Session closed during migration. Clean everything up */
954   if (rargs.is_closed)
955     {
956       transport_cleanup (session_get_transport_proto (s), s->connection_index,
957                          s->thread_index);
958       segment_manager_dealloc_fifos (s->rx_fifo, s->tx_fifo);
959       session_free (s);
960       return;
961     }
962
963   /* Notify app that it has data on the new session */
964   session_enqueue_notify (s);
965 }
966
967 typedef struct _session_switch_pool_args
968 {
969   u32 session_index;
970   u32 thread_index;
971   u32 new_thread_index;
972   u32 new_session_index;
973 } session_switch_pool_args_t;
974
975 /**
976  * Notify old thread of the session pool switch
977  */
978 static void
979 session_switch_pool (void *cb_args)
980 {
981   session_switch_pool_args_t *args = (session_switch_pool_args_t *) cb_args;
982   session_switch_pool_reply_args_t rargs;
983   session_handle_t new_sh;
984   segment_manager_t *sm;
985   app_worker_t *app_wrk;
986   session_t *s;
987
988   ASSERT (args->thread_index == vlib_get_thread_index ());
989   s = session_get (args->session_index, args->thread_index);
990
991   /* Check if session closed during migration */
992   rargs.is_closed = s->session_state >= SESSION_STATE_TRANSPORT_CLOSING;
993
994   transport_cleanup (session_get_transport_proto (s), s->connection_index,
995                      s->thread_index);
996
997   app_wrk = app_worker_get_if_valid (s->app_wrk_index);
998   if (app_wrk)
999     {
1000       /* Cleanup fifo segment slice state for fifos */
1001       sm = app_worker_get_connect_segment_manager (app_wrk);
1002       segment_manager_detach_fifo (sm, &s->rx_fifo);
1003       segment_manager_detach_fifo (sm, &s->tx_fifo);
1004
1005       /* Notify app, using old session, about the migration event */
1006       if (!rargs.is_closed)
1007         {
1008           new_sh = session_make_handle (args->new_session_index,
1009                                         args->new_thread_index);
1010           app_worker_migrate_notify (app_wrk, s, new_sh);
1011         }
1012     }
1013
1014   /* Trigger app read and fifo updates on the new thread */
1015   rargs.session_index = args->new_session_index;
1016   rargs.thread_index = args->new_thread_index;
1017   session_send_rpc_evt_to_thread (args->new_thread_index,
1018                                   session_switch_pool_reply,
1019                                   uword_to_pointer (rargs.as_u64, void *));
1020
1021   session_free (s);
1022   clib_mem_free (cb_args);
1023 }
1024
1025 /**
1026  * Move dgram session to the right thread
1027  */
1028 int
1029 session_dgram_connect_notify (transport_connection_t * tc,
1030                               u32 old_thread_index, session_t ** new_session)
1031 {
1032   session_t *new_s;
1033   session_switch_pool_args_t *rpc_args;
1034   segment_manager_t *sm;
1035   app_worker_t *app_wrk;
1036
1037   /*
1038    * Clone half-open session to the right thread.
1039    */
1040   new_s = session_clone_safe (tc->s_index, old_thread_index);
1041   new_s->connection_index = tc->c_index;
1042   session_set_state (new_s, SESSION_STATE_READY);
1043   new_s->flags |= SESSION_F_IS_MIGRATING;
1044
1045   if (!(tc->flags & TRANSPORT_CONNECTION_F_NO_LOOKUP))
1046     session_lookup_add_connection (tc, session_handle (new_s));
1047
1048   app_wrk = app_worker_get_if_valid (new_s->app_wrk_index);
1049   if (app_wrk)
1050     {
1051       /* New set of fifos attached to the same shared memory */
1052       sm = app_worker_get_connect_segment_manager (app_wrk);
1053       segment_manager_attach_fifo (sm, &new_s->rx_fifo, new_s);
1054       segment_manager_attach_fifo (sm, &new_s->tx_fifo, new_s);
1055     }
1056
1057   /*
1058    * Ask thread owning the old session to clean it up and make us the tx
1059    * fifo owner
1060    */
1061   rpc_args = clib_mem_alloc (sizeof (*rpc_args));
1062   rpc_args->new_session_index = new_s->session_index;
1063   rpc_args->new_thread_index = new_s->thread_index;
1064   rpc_args->session_index = tc->s_index;
1065   rpc_args->thread_index = old_thread_index;
1066   session_send_rpc_evt_to_thread (rpc_args->thread_index, session_switch_pool,
1067                                   rpc_args);
1068
1069   tc->s_index = new_s->session_index;
1070   new_s->connection_index = tc->c_index;
1071   *new_session = new_s;
1072   return 0;
1073 }
1074
1075 /**
1076  * Notification from transport that connection is being closed.
1077  *
1078  * A disconnect is sent to application but state is not removed. Once
1079  * disconnect is acknowledged by application, session disconnect is called.
1080  * Ultimately this leads to close being called on transport (passive close).
1081  */
1082 void
1083 session_transport_closing_notify (transport_connection_t * tc)
1084 {
1085   app_worker_t *app_wrk;
1086   session_t *s;
1087
1088   s = session_get (tc->s_index, tc->thread_index);
1089   if (s->session_state >= SESSION_STATE_TRANSPORT_CLOSING)
1090     return;
1091
1092   /* Wait for reply from app before sending notification as the
1093    * accept might be rejected */
1094   if (s->session_state == SESSION_STATE_ACCEPTING)
1095     {
1096       session_set_state (s, SESSION_STATE_TRANSPORT_CLOSING);
1097       return;
1098     }
1099
1100   session_set_state (s, SESSION_STATE_TRANSPORT_CLOSING);
1101   app_wrk = app_worker_get (s->app_wrk_index);
1102   app_worker_close_notify (app_wrk, s);
1103 }
1104
1105 /**
1106  * Notification from transport that connection is being deleted
1107  *
1108  * This removes the session if it is still valid. It should be called only on
1109  * previously fully established sessions. For instance failed connects should
1110  * call stream_session_connect_notify and indicate that the connect has
1111  * failed.
1112  */
1113 void
1114 session_transport_delete_notify (transport_connection_t * tc)
1115 {
1116   session_t *s;
1117
1118   /* App might've been removed already */
1119   if (!(s = session_get_if_valid (tc->s_index, tc->thread_index)))
1120     return;
1121
1122   switch (s->session_state)
1123     {
1124     case SESSION_STATE_CREATED:
1125       /* Session was created but accept notification was not yet sent to the
1126        * app. Cleanup everything. */
1127       session_lookup_del_session (s);
1128       segment_manager_dealloc_fifos (s->rx_fifo, s->tx_fifo);
1129       session_free (s);
1130       break;
1131     case SESSION_STATE_ACCEPTING:
1132     case SESSION_STATE_TRANSPORT_CLOSING:
1133     case SESSION_STATE_CLOSING:
1134     case SESSION_STATE_TRANSPORT_CLOSED:
1135       /* If transport finishes or times out before we get a reply
1136        * from the app, mark transport as closed and wait for reply
1137        * before removing the session. Cleanup session table in advance
1138        * because transport will soon be closed and closed sessions
1139        * are assumed to have been removed from the lookup table */
1140       session_lookup_del_session (s);
1141       session_set_state (s, SESSION_STATE_TRANSPORT_DELETED);
1142       session_cleanup_notify (s, SESSION_CLEANUP_TRANSPORT);
1143       svm_fifo_dequeue_drop_all (s->tx_fifo);
1144       break;
1145     case SESSION_STATE_APP_CLOSED:
1146       /* Cleanup lookup table as transport needs to still be valid.
1147        * Program transport close to ensure that all session events
1148        * have been cleaned up. Once transport close is called, the
1149        * session is just removed because both transport and app have
1150        * confirmed the close*/
1151       session_lookup_del_session (s);
1152       session_set_state (s, SESSION_STATE_TRANSPORT_DELETED);
1153       session_cleanup_notify (s, SESSION_CLEANUP_TRANSPORT);
1154       svm_fifo_dequeue_drop_all (s->tx_fifo);
1155       session_program_transport_ctrl_evt (s, SESSION_CTRL_EVT_CLOSE);
1156       break;
1157     case SESSION_STATE_TRANSPORT_DELETED:
1158       break;
1159     case SESSION_STATE_CLOSED:
1160       session_cleanup_notify (s, SESSION_CLEANUP_TRANSPORT);
1161       session_delete (s);
1162       break;
1163     default:
1164       clib_warning ("session state %u", s->session_state);
1165       session_cleanup_notify (s, SESSION_CLEANUP_TRANSPORT);
1166       session_delete (s);
1167       break;
1168     }
1169 }
1170
1171 /**
1172  * Notification from transport that it is closed
1173  *
1174  * Should be called by transport, prior to calling delete notify, once it
1175  * knows that no more data will be exchanged. This could serve as an
1176  * early acknowledgment of an active close especially if transport delete
1177  * can be delayed a long time, e.g., tcp time-wait.
1178  */
1179 void
1180 session_transport_closed_notify (transport_connection_t * tc)
1181 {
1182   app_worker_t *app_wrk;
1183   session_t *s;
1184
1185   if (!(s = session_get_if_valid (tc->s_index, tc->thread_index)))
1186     return;
1187
1188   /* Transport thinks that app requested close but it actually didn't.
1189    * Can happen for tcp:
1190    * 1)if fin and rst are received in close succession.
1191    * 2)if app shutdown the connection.  */
1192   if (s->session_state == SESSION_STATE_READY)
1193     {
1194       session_transport_closing_notify (tc);
1195       svm_fifo_dequeue_drop_all (s->tx_fifo);
1196       session_set_state (s, SESSION_STATE_TRANSPORT_CLOSED);
1197     }
1198   /* If app close has not been received or has not yet resulted in
1199    * a transport close, only mark the session transport as closed */
1200   else if (s->session_state <= SESSION_STATE_CLOSING)
1201     session_set_state (s, SESSION_STATE_TRANSPORT_CLOSED);
1202   /* If app also closed, switch to closed */
1203   else if (s->session_state == SESSION_STATE_APP_CLOSED)
1204     session_set_state (s, SESSION_STATE_CLOSED);
1205
1206   app_wrk = app_worker_get_if_valid (s->app_wrk_index);
1207   if (app_wrk)
1208     app_worker_transport_closed_notify (app_wrk, s);
1209 }
1210
1211 /**
1212  * Notify application that connection has been reset.
1213  */
1214 void
1215 session_transport_reset_notify (transport_connection_t * tc)
1216 {
1217   app_worker_t *app_wrk;
1218   session_t *s;
1219
1220   s = session_get (tc->s_index, tc->thread_index);
1221   svm_fifo_dequeue_drop_all (s->tx_fifo);
1222   if (s->session_state >= SESSION_STATE_TRANSPORT_CLOSING)
1223     return;
1224   if (s->session_state == SESSION_STATE_ACCEPTING)
1225     {
1226       session_set_state (s, SESSION_STATE_TRANSPORT_CLOSING);
1227       return;
1228     }
1229   session_set_state (s, SESSION_STATE_TRANSPORT_CLOSING);
1230   app_wrk = app_worker_get (s->app_wrk_index);
1231   app_worker_reset_notify (app_wrk, s);
1232 }
1233
1234 int
1235 session_stream_accept_notify (transport_connection_t * tc)
1236 {
1237   app_worker_t *app_wrk;
1238   session_t *s;
1239
1240   s = session_get (tc->s_index, tc->thread_index);
1241   app_wrk = app_worker_get_if_valid (s->app_wrk_index);
1242   if (!app_wrk)
1243     return -1;
1244   if (s->session_state != SESSION_STATE_CREATED)
1245     return 0;
1246   session_set_state (s, SESSION_STATE_ACCEPTING);
1247   if (app_worker_accept_notify (app_wrk, s))
1248     {
1249       /* On transport delete, no notifications should be sent. Unless, the
1250        * accept is retried and successful. */
1251       session_set_state (s, SESSION_STATE_CREATED);
1252       return -1;
1253     }
1254   return 0;
1255 }
1256
1257 /**
1258  * Accept a stream session. Optionally ping the server by callback.
1259  */
1260 int
1261 session_stream_accept (transport_connection_t * tc, u32 listener_index,
1262                        u32 thread_index, u8 notify)
1263 {
1264   session_t *s;
1265   int rv;
1266
1267   s = session_alloc_for_connection (tc);
1268   s->listener_handle = ((u64) thread_index << 32) | (u64) listener_index;
1269   session_set_state (s, SESSION_STATE_CREATED);
1270
1271   if ((rv = app_worker_init_accepted (s)))
1272     {
1273       session_free (s);
1274       return rv;
1275     }
1276
1277   session_lookup_add_connection (tc, session_handle (s));
1278
1279   /* Shoulder-tap the server */
1280   if (notify)
1281     {
1282       app_worker_t *app_wrk = app_worker_get (s->app_wrk_index);
1283       if ((rv = app_worker_accept_notify (app_wrk, s)))
1284         {
1285           session_lookup_del_session (s);
1286           segment_manager_dealloc_fifos (s->rx_fifo, s->tx_fifo);
1287           session_free (s);
1288           return rv;
1289         }
1290     }
1291
1292   return 0;
1293 }
1294
1295 int
1296 session_dgram_accept (transport_connection_t * tc, u32 listener_index,
1297                       u32 thread_index)
1298 {
1299   app_worker_t *app_wrk;
1300   session_t *s;
1301   int rv;
1302
1303   s = session_alloc_for_connection (tc);
1304   s->listener_handle = ((u64) thread_index << 32) | (u64) listener_index;
1305
1306   if ((rv = app_worker_init_accepted (s)))
1307     {
1308       session_free (s);
1309       return rv;
1310     }
1311
1312   session_lookup_add_connection (tc, session_handle (s));
1313   session_set_state (s, SESSION_STATE_ACCEPTING);
1314
1315   app_wrk = app_worker_get (s->app_wrk_index);
1316   if ((rv = app_worker_accept_notify (app_wrk, s)))
1317     {
1318       session_lookup_del_session (s);
1319       segment_manager_dealloc_fifos (s->rx_fifo, s->tx_fifo);
1320       session_free (s);
1321       return rv;
1322     }
1323
1324   return 0;
1325 }
1326
1327 int
1328 session_open_cl (session_endpoint_cfg_t *rmt, session_handle_t *rsh)
1329 {
1330   transport_connection_t *tc;
1331   transport_endpoint_cfg_t *tep;
1332   app_worker_t *app_wrk;
1333   session_handle_t sh;
1334   session_t *s;
1335   int rv;
1336
1337   tep = session_endpoint_to_transport_cfg (rmt);
1338   rv = transport_connect (rmt->transport_proto, tep);
1339   if (rv < 0)
1340     {
1341       SESSION_DBG ("Transport failed to open connection.");
1342       return rv;
1343     }
1344
1345   tc = transport_get_half_open (rmt->transport_proto, (u32) rv);
1346
1347   /* For dgram type of service, allocate session and fifos now */
1348   app_wrk = app_worker_get (rmt->app_wrk_index);
1349   s = session_alloc_for_connection (tc);
1350   s->app_wrk_index = app_wrk->wrk_index;
1351   session_set_state (s, SESSION_STATE_OPENED);
1352   if (app_worker_init_connected (app_wrk, s))
1353     {
1354       session_free (s);
1355       return -1;
1356     }
1357
1358   sh = session_handle (s);
1359   *rsh = sh;
1360
1361   session_lookup_add_connection (tc, sh);
1362   return app_worker_connect_notify (app_wrk, s, SESSION_E_NONE, rmt->opaque);
1363 }
1364
1365 int
1366 session_open_vc (session_endpoint_cfg_t *rmt, session_handle_t *rsh)
1367 {
1368   transport_connection_t *tc;
1369   transport_endpoint_cfg_t *tep;
1370   app_worker_t *app_wrk;
1371   session_t *ho;
1372   int rv;
1373
1374   tep = session_endpoint_to_transport_cfg (rmt);
1375   rv = transport_connect (rmt->transport_proto, tep);
1376   if (rv < 0)
1377     {
1378       SESSION_DBG ("Transport failed to open connection.");
1379       return rv;
1380     }
1381
1382   tc = transport_get_half_open (rmt->transport_proto, (u32) rv);
1383
1384   app_wrk = app_worker_get (rmt->app_wrk_index);
1385
1386   /* If transport offers a vc service, only allocate established
1387    * session once the connection has been established.
1388    * In the meantime allocate half-open session for tracking purposes
1389    * associate half-open connection to it and add session to app-worker
1390    * half-open table. These are needed to allocate the established
1391    * session on transport notification, and to cleanup the half-open
1392    * session if the app detaches before connection establishment.
1393    */
1394   ho = session_alloc_for_half_open (tc);
1395   ho->app_wrk_index = app_wrk->wrk_index;
1396   ho->ho_index = app_worker_add_half_open (app_wrk, session_handle (ho));
1397   ho->opaque = rmt->opaque;
1398   *rsh = session_handle (ho);
1399
1400   if (!(tc->flags & TRANSPORT_CONNECTION_F_NO_LOOKUP))
1401     session_lookup_add_half_open (tc, tc->c_index);
1402
1403   return 0;
1404 }
1405
1406 int
1407 session_open_app (session_endpoint_cfg_t *rmt, session_handle_t *rsh)
1408 {
1409   transport_endpoint_cfg_t *tep_cfg = session_endpoint_to_transport_cfg (rmt);
1410
1411   /* Not supported for now */
1412   *rsh = SESSION_INVALID_HANDLE;
1413   return transport_connect (rmt->transport_proto, tep_cfg);
1414 }
1415
1416 typedef int (*session_open_service_fn) (session_endpoint_cfg_t *,
1417                                         session_handle_t *);
1418
1419 /* *INDENT-OFF* */
1420 static session_open_service_fn session_open_srv_fns[TRANSPORT_N_SERVICES] = {
1421   session_open_vc,
1422   session_open_cl,
1423   session_open_app,
1424 };
1425 /* *INDENT-ON* */
1426
1427 /**
1428  * Ask transport to open connection to remote transport endpoint.
1429  *
1430  * Stores handle for matching request with reply since the call can be
1431  * asynchronous. For instance, for TCP the 3-way handshake must complete
1432  * before reply comes. Session is only created once connection is established.
1433  *
1434  * @param app_index Index of the application requesting the connect
1435  * @param st Session type requested.
1436  * @param tep Remote transport endpoint
1437  * @param opaque Opaque data (typically, api_context) the application expects
1438  *               on open completion.
1439  */
1440 int
1441 session_open (session_endpoint_cfg_t *rmt, session_handle_t *rsh)
1442 {
1443   transport_service_type_t tst;
1444   tst = transport_protocol_service_type (rmt->transport_proto);
1445   return session_open_srv_fns[tst](rmt, rsh);
1446 }
1447
1448 /**
1449  * Ask transport to listen on session endpoint.
1450  *
1451  * @param s Session for which listen will be called. Note that unlike
1452  *          established sessions, listen sessions are not associated to a
1453  *          thread.
1454  * @param sep Local endpoint to be listened on.
1455  */
1456 int
1457 session_listen (session_t * ls, session_endpoint_cfg_t * sep)
1458 {
1459   transport_endpoint_cfg_t *tep;
1460   int tc_index;
1461   u32 s_index;
1462
1463   /* Transport bind/listen */
1464   tep = session_endpoint_to_transport_cfg (sep);
1465   s_index = ls->session_index;
1466   tc_index = transport_start_listen (session_get_transport_proto (ls),
1467                                      s_index, tep);
1468
1469   if (tc_index < 0)
1470     return tc_index;
1471
1472   /* Attach transport to session. Lookup tables are populated by the app
1473    * worker because local tables (for ct sessions) are not backed by a fib */
1474   ls = listen_session_get (s_index);
1475   ls->connection_index = tc_index;
1476   ls->opaque = sep->opaque;
1477
1478   return 0;
1479 }
1480
1481 /**
1482  * Ask transport to stop listening on local transport endpoint.
1483  *
1484  * @param s Session to stop listening on. It must be in state LISTENING.
1485  */
1486 int
1487 session_stop_listen (session_t * s)
1488 {
1489   transport_proto_t tp = session_get_transport_proto (s);
1490   transport_connection_t *tc;
1491
1492   if (s->session_state != SESSION_STATE_LISTENING)
1493     return SESSION_E_NOLISTEN;
1494
1495   tc = transport_get_listener (tp, s->connection_index);
1496
1497   /* If no transport, assume everything was cleaned up already */
1498   if (!tc)
1499     return SESSION_E_NONE;
1500
1501   if (!(tc->flags & TRANSPORT_CONNECTION_F_NO_LOOKUP))
1502     session_lookup_del_connection (tc);
1503
1504   transport_stop_listen (tp, s->connection_index);
1505   return 0;
1506 }
1507
1508 /**
1509  * Initialize session half-closing procedure.
1510  *
1511  * Note that half-closing will not change the state of the session.
1512  */
1513 void
1514 session_half_close (session_t *s)
1515 {
1516   if (!s)
1517     return;
1518
1519   session_program_transport_ctrl_evt (s, SESSION_CTRL_EVT_HALF_CLOSE);
1520 }
1521
1522 /**
1523  * Initialize session closing procedure.
1524  *
1525  * Request is always sent to session node to ensure that all outstanding
1526  * requests are served before transport is notified.
1527  */
1528 void
1529 session_close (session_t * s)
1530 {
1531   if (!s || (s->flags & SESSION_F_APP_CLOSED))
1532     return;
1533
1534   /* Transports can close and delete their state independent of app closes
1535    * and transport initiated state transitions can hide app closes. Instead
1536    * of extending the state machine to support separate tracking of app and
1537    * transport initiated closes, use a flag. */
1538   s->flags |= SESSION_F_APP_CLOSED;
1539
1540   if (s->session_state >= SESSION_STATE_CLOSING)
1541     {
1542       /* Session will only be removed once both app and transport
1543        * acknowledge the close */
1544       if (s->session_state == SESSION_STATE_TRANSPORT_CLOSED
1545           || s->session_state == SESSION_STATE_TRANSPORT_DELETED)
1546         session_program_transport_ctrl_evt (s, SESSION_CTRL_EVT_CLOSE);
1547       return;
1548     }
1549
1550   /* App closed so stop propagating dequeue notifications.
1551    * App might disconnect session before connected, in this case,
1552    * tx_fifo may not be setup yet, so clear only it's inited. */
1553   if (s->tx_fifo)
1554     svm_fifo_clear_deq_ntf (s->tx_fifo);
1555   session_set_state (s, SESSION_STATE_CLOSING);
1556   session_program_transport_ctrl_evt (s, SESSION_CTRL_EVT_CLOSE);
1557 }
1558
1559 /**
1560  * Force a close without waiting for data to be flushed
1561  */
1562 void
1563 session_reset (session_t * s)
1564 {
1565   if (s->session_state >= SESSION_STATE_CLOSING)
1566     return;
1567   /* Drop all outstanding tx data
1568    * App might disconnect session before connected, in this case,
1569    * tx_fifo may not be setup yet, so clear only it's inited. */
1570   if (s->tx_fifo)
1571     svm_fifo_dequeue_drop_all (s->tx_fifo);
1572   session_set_state (s, SESSION_STATE_CLOSING);
1573   session_program_transport_ctrl_evt (s, SESSION_CTRL_EVT_RESET);
1574 }
1575
1576 /**
1577  * Notify transport the session can be half-disconnected.
1578  *
1579  * Must be called from the session's thread.
1580  */
1581 void
1582 session_transport_half_close (session_t *s)
1583 {
1584   /* Only READY session can be half-closed */
1585   if (s->session_state != SESSION_STATE_READY)
1586     {
1587       return;
1588     }
1589
1590   transport_half_close (session_get_transport_proto (s), s->connection_index,
1591                         s->thread_index);
1592 }
1593
1594 /**
1595  * Notify transport the session can be disconnected. This should eventually
1596  * result in a delete notification that allows us to cleanup session state.
1597  * Called for both active/passive disconnects.
1598  *
1599  * Must be called from the session's thread.
1600  */
1601 void
1602 session_transport_close (session_t * s)
1603 {
1604   if (s->session_state >= SESSION_STATE_APP_CLOSED)
1605     {
1606       if (s->session_state == SESSION_STATE_TRANSPORT_CLOSED)
1607         session_set_state (s, SESSION_STATE_CLOSED);
1608       /* If transport is already deleted, just free the session */
1609       else if (s->session_state >= SESSION_STATE_TRANSPORT_DELETED)
1610         session_free_w_fifos (s);
1611       return;
1612     }
1613
1614   /* If the tx queue wasn't drained, the transport can continue to try
1615    * sending the outstanding data (in closed state it cannot). It MUST however
1616    * at one point, either after sending everything or after a timeout, call
1617    * delete notify. This will finally lead to the complete cleanup of the
1618    * session.
1619    */
1620   session_set_state (s, SESSION_STATE_APP_CLOSED);
1621
1622   transport_close (session_get_transport_proto (s), s->connection_index,
1623                    s->thread_index);
1624 }
1625
1626 /**
1627  * Force transport close
1628  */
1629 void
1630 session_transport_reset (session_t * s)
1631 {
1632   if (s->session_state >= SESSION_STATE_APP_CLOSED)
1633     {
1634       if (s->session_state == SESSION_STATE_TRANSPORT_CLOSED)
1635         session_set_state (s, SESSION_STATE_CLOSED);
1636       else if (s->session_state >= SESSION_STATE_TRANSPORT_DELETED)
1637         session_free_w_fifos (s);
1638       return;
1639     }
1640
1641   session_set_state (s, SESSION_STATE_APP_CLOSED);
1642   transport_reset (session_get_transport_proto (s), s->connection_index,
1643                    s->thread_index);
1644 }
1645
1646 /**
1647  * Cleanup transport and session state.
1648  *
1649  * Notify transport of the cleanup and free the session. This should
1650  * be called only if transport reported some error and is already
1651  * closed.
1652  */
1653 void
1654 session_transport_cleanup (session_t * s)
1655 {
1656   /* Delete from main lookup table before we axe the the transport */
1657   session_lookup_del_session (s);
1658   if (s->session_state != SESSION_STATE_TRANSPORT_DELETED)
1659     transport_cleanup (session_get_transport_proto (s), s->connection_index,
1660                        s->thread_index);
1661   /* Since we called cleanup, no delete notification will come. So, make
1662    * sure the session is properly freed. */
1663   segment_manager_dealloc_fifos (s->rx_fifo, s->tx_fifo);
1664   session_free (s);
1665 }
1666
1667 /**
1668  * Allocate worker mqs in share-able segment
1669  *
1670  * That can only be a newly created memfd segment, that must be mapped
1671  * by all apps/stack users unless private rx mqs are enabled.
1672  */
1673 void
1674 session_vpp_wrk_mqs_alloc (session_main_t *smm)
1675 {
1676   u32 mq_q_length = 2048, evt_size = sizeof (session_event_t);
1677   fifo_segment_t *mqs_seg = &smm->wrk_mqs_segment;
1678   svm_msg_q_cfg_t _cfg, *cfg = &_cfg;
1679   uword mqs_seg_size;
1680   int i;
1681
1682   mq_q_length = clib_max (mq_q_length, smm->configured_wrk_mq_length);
1683
1684   svm_msg_q_ring_cfg_t rc[SESSION_MQ_N_RINGS] = {
1685     { mq_q_length, evt_size, 0 }, { mq_q_length >> 1, 256, 0 }
1686   };
1687   cfg->consumer_pid = 0;
1688   cfg->n_rings = 2;
1689   cfg->q_nitems = mq_q_length;
1690   cfg->ring_cfgs = rc;
1691
1692   /*
1693    * Compute mqs segment size based on rings config and leave space
1694    * for passing extended configuration messages, i.e., data allocated
1695    * outside of the rings. If provided with a config value, accept it
1696    * if larger than minimum size.
1697    */
1698   mqs_seg_size = svm_msg_q_size_to_alloc (cfg) * vec_len (smm->wrk);
1699   mqs_seg_size = mqs_seg_size + (1 << 20);
1700   mqs_seg_size = clib_max (mqs_seg_size, smm->wrk_mqs_segment_size);
1701
1702   mqs_seg->ssvm.ssvm_size = mqs_seg_size;
1703   mqs_seg->ssvm.my_pid = getpid ();
1704   mqs_seg->ssvm.name = format (0, "%s%c", "session: wrk-mqs-segment", 0);
1705
1706   if (ssvm_server_init (&mqs_seg->ssvm, SSVM_SEGMENT_MEMFD))
1707     {
1708       clib_warning ("failed to initialize queue segment");
1709       return;
1710     }
1711
1712   fifo_segment_init (mqs_seg);
1713
1714   /* Special fifo segment that's filled only with mqs */
1715   mqs_seg->h->n_mqs = vec_len (smm->wrk);
1716
1717   for (i = 0; i < vec_len (smm->wrk); i++)
1718     smm->wrk[i].vpp_event_queue = fifo_segment_msg_q_alloc (mqs_seg, i, cfg);
1719 }
1720
1721 fifo_segment_t *
1722 session_main_get_wrk_mqs_segment (void)
1723 {
1724   return &session_main.wrk_mqs_segment;
1725 }
1726
1727 u64
1728 session_segment_handle (session_t * s)
1729 {
1730   svm_fifo_t *f;
1731
1732   if (!s->rx_fifo)
1733     return SESSION_INVALID_HANDLE;
1734
1735   f = s->rx_fifo;
1736   return segment_manager_make_segment_handle (f->segment_manager,
1737                                               f->segment_index);
1738 }
1739
1740 /* *INDENT-OFF* */
1741 static session_fifo_rx_fn *session_tx_fns[TRANSPORT_TX_N_FNS] = {
1742     session_tx_fifo_peek_and_snd,
1743     session_tx_fifo_dequeue_and_snd,
1744     session_tx_fifo_dequeue_internal,
1745     session_tx_fifo_dequeue_and_snd
1746 };
1747 /* *INDENT-ON* */
1748
1749 void
1750 session_register_transport (transport_proto_t transport_proto,
1751                             const transport_proto_vft_t * vft, u8 is_ip4,
1752                             u32 output_node)
1753 {
1754   session_main_t *smm = &session_main;
1755   session_type_t session_type;
1756   u32 next_index = ~0;
1757
1758   session_type = session_type_from_proto_and_ip (transport_proto, is_ip4);
1759
1760   vec_validate (smm->session_type_to_next, session_type);
1761   vec_validate (smm->session_tx_fns, session_type);
1762
1763   if (output_node != ~0)
1764     next_index = vlib_node_add_next (vlib_get_main (),
1765                                      session_queue_node.index, output_node);
1766
1767   smm->session_type_to_next[session_type] = next_index;
1768   smm->session_tx_fns[session_type] =
1769     session_tx_fns[vft->transport_options.tx_type];
1770 }
1771
1772 void
1773 session_register_update_time_fn (session_update_time_fn fn, u8 is_add)
1774 {
1775   session_main_t *smm = &session_main;
1776   session_update_time_fn *fi;
1777   u32 fi_pos = ~0;
1778   u8 found = 0;
1779
1780   vec_foreach (fi, smm->update_time_fns)
1781     {
1782       if (*fi == fn)
1783         {
1784           fi_pos = fi - smm->update_time_fns;
1785           found = 1;
1786           break;
1787         }
1788     }
1789
1790   if (is_add)
1791     {
1792       if (found)
1793         {
1794           clib_warning ("update time fn %p already registered", fn);
1795           return;
1796         }
1797       vec_add1 (smm->update_time_fns, fn);
1798     }
1799   else
1800     {
1801       vec_del1 (smm->update_time_fns, fi_pos);
1802     }
1803 }
1804
1805 transport_proto_t
1806 session_add_transport_proto (void)
1807 {
1808   session_main_t *smm = &session_main;
1809   session_worker_t *wrk;
1810   u32 thread;
1811
1812   smm->last_transport_proto_type += 1;
1813
1814   for (thread = 0; thread < vec_len (smm->wrk); thread++)
1815     {
1816       wrk = session_main_get_worker (thread);
1817       vec_validate (wrk->session_to_enqueue, smm->last_transport_proto_type);
1818     }
1819
1820   return smm->last_transport_proto_type;
1821 }
1822
1823 transport_connection_t *
1824 session_get_transport (session_t * s)
1825 {
1826   if (s->session_state != SESSION_STATE_LISTENING)
1827     return transport_get_connection (session_get_transport_proto (s),
1828                                      s->connection_index, s->thread_index);
1829   else
1830     return transport_get_listener (session_get_transport_proto (s),
1831                                    s->connection_index);
1832 }
1833
1834 void
1835 session_get_endpoint (session_t * s, transport_endpoint_t * tep, u8 is_lcl)
1836 {
1837   if (s->session_state != SESSION_STATE_LISTENING)
1838     return transport_get_endpoint (session_get_transport_proto (s),
1839                                    s->connection_index, s->thread_index, tep,
1840                                    is_lcl);
1841   else
1842     return transport_get_listener_endpoint (session_get_transport_proto (s),
1843                                             s->connection_index, tep, is_lcl);
1844 }
1845
1846 int
1847 session_transport_attribute (session_t *s, u8 is_get,
1848                              transport_endpt_attr_t *attr)
1849 {
1850   if (s->session_state < SESSION_STATE_READY)
1851     return -1;
1852
1853   return transport_connection_attribute (session_get_transport_proto (s),
1854                                          s->connection_index, s->thread_index,
1855                                          is_get, attr);
1856 }
1857
1858 transport_connection_t *
1859 listen_session_get_transport (session_t * s)
1860 {
1861   return transport_get_listener (session_get_transport_proto (s),
1862                                  s->connection_index);
1863 }
1864
1865 void
1866 session_queue_run_on_main_thread (vlib_main_t * vm)
1867 {
1868   ASSERT (vlib_get_thread_index () == 0);
1869   vlib_node_set_interrupt_pending (vm, session_queue_node.index);
1870 }
1871
1872 static void
1873 session_stats_collector_fn (vlib_stats_collector_data_t *d)
1874 {
1875   u32 i, n_workers, n_wrk_sessions, n_sessions = 0;
1876   session_main_t *smm = &session_main;
1877   session_worker_t *wrk;
1878   counter_t **counters;
1879   counter_t *cb;
1880
1881   n_workers = vec_len (smm->wrk);
1882   vlib_stats_validate (d->entry_index, 0, n_workers - 1);
1883   counters = d->entry->data;
1884   cb = counters[0];
1885
1886   for (i = 0; i < vec_len (smm->wrk); i++)
1887     {
1888       wrk = session_main_get_worker (i);
1889       n_wrk_sessions = pool_elts (wrk->sessions);
1890       cb[i] = n_wrk_sessions;
1891       n_sessions += n_wrk_sessions;
1892     }
1893
1894   vlib_stats_set_gauge (d->private_data, n_sessions);
1895 }
1896
1897 static void
1898 session_stats_collector_init (void)
1899 {
1900   vlib_stats_collector_reg_t reg = {};
1901
1902   reg.entry_index =
1903     vlib_stats_add_counter_vector ("/sys/session/sessions_per_worker");
1904   reg.private_data = vlib_stats_add_gauge ("/sys/session/sessions_total");
1905   reg.collect_fn = session_stats_collector_fn;
1906   vlib_stats_register_collector_fn (&reg);
1907   vlib_stats_validate (reg.entry_index, 0, vlib_get_n_threads ());
1908 }
1909
1910 static clib_error_t *
1911 session_manager_main_enable (vlib_main_t * vm)
1912 {
1913   session_main_t *smm = &session_main;
1914   vlib_thread_main_t *vtm = vlib_get_thread_main ();
1915   u32 num_threads, preallocated_sessions_per_worker;
1916   session_worker_t *wrk;
1917   int i;
1918
1919   /* We only initialize once and do not de-initialized on disable */
1920   if (smm->is_initialized)
1921     goto done;
1922
1923   num_threads = 1 /* main thread */  + vtm->n_threads;
1924
1925   if (num_threads < 1)
1926     return clib_error_return (0, "n_thread_stacks not set");
1927
1928   /* Allocate cache line aligned worker contexts */
1929   vec_validate_aligned (smm->wrk, num_threads - 1, CLIB_CACHE_LINE_BYTES);
1930   clib_spinlock_init (&session_main.pool_realloc_lock);
1931
1932   for (i = 0; i < num_threads; i++)
1933     {
1934       wrk = &smm->wrk[i];
1935       wrk->ctrl_head = clib_llist_make_head (wrk->event_elts, evt_list);
1936       wrk->new_head = clib_llist_make_head (wrk->event_elts, evt_list);
1937       wrk->old_head = clib_llist_make_head (wrk->event_elts, evt_list);
1938       wrk->pending_connects = clib_llist_make_head (wrk->event_elts, evt_list);
1939       wrk->evts_pending_main =
1940         clib_llist_make_head (wrk->event_elts, evt_list);
1941       wrk->vm = vlib_get_main_by_index (i);
1942       wrk->last_vlib_time = vlib_time_now (vm);
1943       wrk->last_vlib_us_time = wrk->last_vlib_time * CLIB_US_TIME_FREQ;
1944       wrk->timerfd = -1;
1945       vec_validate (wrk->session_to_enqueue, smm->last_transport_proto_type);
1946
1947       if (!smm->no_adaptive && smm->use_private_rx_mqs)
1948         session_wrk_enable_adaptive_mode (wrk);
1949     }
1950
1951   /* Allocate vpp event queues segment and queue */
1952   session_vpp_wrk_mqs_alloc (smm);
1953
1954   /* Initialize segment manager properties */
1955   segment_manager_main_init ();
1956
1957   /* Preallocate sessions */
1958   if (smm->preallocated_sessions)
1959     {
1960       if (num_threads == 1)
1961         {
1962           pool_init_fixed (smm->wrk[0].sessions, smm->preallocated_sessions);
1963         }
1964       else
1965         {
1966           int j;
1967           preallocated_sessions_per_worker =
1968             (1.1 * (f64) smm->preallocated_sessions /
1969              (f64) (num_threads - 1));
1970
1971           for (j = 1; j < num_threads; j++)
1972             {
1973               pool_init_fixed (smm->wrk[j].sessions,
1974                                preallocated_sessions_per_worker);
1975             }
1976         }
1977     }
1978
1979   session_lookup_init ();
1980   app_namespaces_init ();
1981   transport_init ();
1982   session_stats_collector_init ();
1983   smm->is_initialized = 1;
1984
1985 done:
1986
1987   smm->is_enabled = 1;
1988
1989   /* Enable transports */
1990   transport_enable_disable (vm, 1);
1991   session_debug_init ();
1992
1993   return 0;
1994 }
1995
1996 static void
1997 session_manager_main_disable (vlib_main_t * vm)
1998 {
1999   transport_enable_disable (vm, 0 /* is_en */ );
2000 }
2001
2002 /* in this new callback, cookie hint the index */
2003 void
2004 session_dma_completion_cb (vlib_main_t *vm, struct vlib_dma_batch *batch)
2005 {
2006   session_worker_t *wrk;
2007   wrk = session_main_get_worker (vm->thread_index);
2008   session_dma_transfer *dma_transfer;
2009
2010   dma_transfer = &wrk->dma_trans[wrk->trans_head];
2011   vec_add (wrk->pending_tx_buffers, dma_transfer->pending_tx_buffers,
2012            vec_len (dma_transfer->pending_tx_buffers));
2013   vec_add (wrk->pending_tx_nexts, dma_transfer->pending_tx_nexts,
2014            vec_len (dma_transfer->pending_tx_nexts));
2015   vec_reset_length (dma_transfer->pending_tx_buffers);
2016   vec_reset_length (dma_transfer->pending_tx_nexts);
2017   wrk->trans_head++;
2018   if (wrk->trans_head == wrk->trans_size)
2019     wrk->trans_head = 0;
2020   return;
2021 }
2022
2023 static void
2024 session_prepare_dma_args (vlib_dma_config_t *args)
2025 {
2026   args->max_batches = 16;
2027   args->max_transfers = DMA_TRANS_SIZE;
2028   args->max_transfer_size = 65536;
2029   args->features = 0;
2030   args->sw_fallback = 1;
2031   args->barrier_before_last = 1;
2032   args->callback_fn = session_dma_completion_cb;
2033 }
2034
2035 static void
2036 session_node_enable_dma (u8 is_en, int n_vlibs)
2037 {
2038   vlib_dma_config_t args;
2039   session_prepare_dma_args (&args);
2040   session_worker_t *wrk;
2041   vlib_main_t *vm;
2042
2043   int config_index = -1;
2044
2045   if (is_en)
2046     {
2047       vm = vlib_get_main_by_index (0);
2048       config_index = vlib_dma_config_add (vm, &args);
2049     }
2050   else
2051     {
2052       vm = vlib_get_main_by_index (0);
2053       wrk = session_main_get_worker (0);
2054       if (wrk->config_index >= 0)
2055         vlib_dma_config_del (vm, wrk->config_index);
2056     }
2057   int i;
2058   for (i = 0; i < n_vlibs; i++)
2059     {
2060       vm = vlib_get_main_by_index (i);
2061       wrk = session_main_get_worker (vm->thread_index);
2062       wrk->config_index = config_index;
2063       if (is_en)
2064         {
2065           if (config_index >= 0)
2066             wrk->dma_enabled = true;
2067           wrk->dma_trans = (session_dma_transfer *) clib_mem_alloc (
2068             sizeof (session_dma_transfer) * DMA_TRANS_SIZE);
2069           bzero (wrk->dma_trans,
2070                  sizeof (session_dma_transfer) * DMA_TRANS_SIZE);
2071         }
2072       else
2073         {
2074           if (wrk->dma_trans)
2075             clib_mem_free (wrk->dma_trans);
2076         }
2077       wrk->trans_head = 0;
2078       wrk->trans_tail = 0;
2079       wrk->trans_size = DMA_TRANS_SIZE;
2080     }
2081 }
2082
2083 void
2084 session_node_enable_disable (u8 is_en)
2085 {
2086   u8 mstate = is_en ? VLIB_NODE_STATE_INTERRUPT : VLIB_NODE_STATE_DISABLED;
2087   u8 state = is_en ? VLIB_NODE_STATE_POLLING : VLIB_NODE_STATE_DISABLED;
2088   session_main_t *sm = &session_main;
2089   vlib_main_t *vm;
2090   vlib_node_t *n;
2091   int n_vlibs, i;
2092
2093   n_vlibs = vlib_get_n_threads ();
2094   for (i = 0; i < n_vlibs; i++)
2095     {
2096       vm = vlib_get_main_by_index (i);
2097       /* main thread with workers and not polling */
2098       if (i == 0 && n_vlibs > 1)
2099         {
2100           vlib_node_set_state (vm, session_queue_node.index, mstate);
2101           if (is_en)
2102             {
2103               session_main_get_worker (0)->state = SESSION_WRK_INTERRUPT;
2104               vlib_node_set_state (vm, session_queue_process_node.index,
2105                                    state);
2106               n = vlib_get_node (vm, session_queue_process_node.index);
2107               vlib_start_process (vm, n->runtime_index);
2108             }
2109           else
2110             {
2111               vlib_process_signal_event_mt (vm,
2112                                             session_queue_process_node.index,
2113                                             SESSION_Q_PROCESS_STOP, 0);
2114             }
2115           if (!sm->poll_main)
2116             continue;
2117         }
2118       vlib_node_set_state (vm, session_queue_node.index, state);
2119     }
2120
2121   if (sm->use_private_rx_mqs)
2122     application_enable_rx_mqs_nodes (is_en);
2123
2124   if (sm->dma_enabled)
2125     session_node_enable_dma (is_en, n_vlibs);
2126 }
2127
2128 clib_error_t *
2129 vnet_session_enable_disable (vlib_main_t * vm, u8 is_en)
2130 {
2131   clib_error_t *error = 0;
2132   if (is_en)
2133     {
2134       if (session_main.is_enabled)
2135         return 0;
2136
2137       error = session_manager_main_enable (vm);
2138       session_node_enable_disable (is_en);
2139     }
2140   else
2141     {
2142       session_main.is_enabled = 0;
2143       session_manager_main_disable (vm);
2144       session_node_enable_disable (is_en);
2145     }
2146
2147   return error;
2148 }
2149
2150 clib_error_t *
2151 session_main_init (vlib_main_t * vm)
2152 {
2153   session_main_t *smm = &session_main;
2154
2155   smm->is_enabled = 0;
2156   smm->session_enable_asap = 0;
2157   smm->poll_main = 0;
2158   smm->use_private_rx_mqs = 0;
2159   smm->no_adaptive = 0;
2160   smm->last_transport_proto_type = TRANSPORT_PROTO_HTTP;
2161
2162   return 0;
2163 }
2164
2165 static clib_error_t *
2166 session_main_loop_init (vlib_main_t * vm)
2167 {
2168   session_main_t *smm = &session_main;
2169   if (smm->session_enable_asap)
2170     {
2171       vlib_worker_thread_barrier_sync (vm);
2172       vnet_session_enable_disable (vm, 1 /* is_en */ );
2173       vlib_worker_thread_barrier_release (vm);
2174     }
2175   return 0;
2176 }
2177
2178 VLIB_INIT_FUNCTION (session_main_init);
2179 VLIB_MAIN_LOOP_ENTER_FUNCTION (session_main_loop_init);
2180
2181 static clib_error_t *
2182 session_config_fn (vlib_main_t * vm, unformat_input_t * input)
2183 {
2184   session_main_t *smm = &session_main;
2185   u32 nitems;
2186   uword tmp;
2187
2188   while (unformat_check_input (input) != UNFORMAT_END_OF_INPUT)
2189     {
2190       if (unformat (input, "wrk-mq-length %d", &nitems))
2191         {
2192           if (nitems >= 2048)
2193             smm->configured_wrk_mq_length = nitems;
2194           else
2195             clib_warning ("event queue length %d too small, ignored", nitems);
2196         }
2197       else if (unformat (input, "wrk-mqs-segment-size %U",
2198                          unformat_memory_size, &smm->wrk_mqs_segment_size))
2199         ;
2200       else if (unformat (input, "preallocated-sessions %d",
2201                          &smm->preallocated_sessions))
2202         ;
2203       else if (unformat (input, "v4-session-table-buckets %d",
2204                          &smm->configured_v4_session_table_buckets))
2205         ;
2206       else if (unformat (input, "v4-halfopen-table-buckets %d",
2207                          &smm->configured_v4_halfopen_table_buckets))
2208         ;
2209       else if (unformat (input, "v6-session-table-buckets %d",
2210                          &smm->configured_v6_session_table_buckets))
2211         ;
2212       else if (unformat (input, "v6-halfopen-table-buckets %d",
2213                          &smm->configured_v6_halfopen_table_buckets))
2214         ;
2215       else if (unformat (input, "v4-session-table-memory %U",
2216                          unformat_memory_size, &tmp))
2217         {
2218           if (tmp >= 0x100000000)
2219             return clib_error_return (0, "memory size %llx (%lld) too large",
2220                                       tmp, tmp);
2221           smm->configured_v4_session_table_memory = tmp;
2222         }
2223       else if (unformat (input, "v4-halfopen-table-memory %U",
2224                          unformat_memory_size, &tmp))
2225         {
2226           if (tmp >= 0x100000000)
2227             return clib_error_return (0, "memory size %llx (%lld) too large",
2228                                       tmp, tmp);
2229           smm->configured_v4_halfopen_table_memory = tmp;
2230         }
2231       else if (unformat (input, "v6-session-table-memory %U",
2232                          unformat_memory_size, &tmp))
2233         {
2234           if (tmp >= 0x100000000)
2235             return clib_error_return (0, "memory size %llx (%lld) too large",
2236                                       tmp, tmp);
2237           smm->configured_v6_session_table_memory = tmp;
2238         }
2239       else if (unformat (input, "v6-halfopen-table-memory %U",
2240                          unformat_memory_size, &tmp))
2241         {
2242           if (tmp >= 0x100000000)
2243             return clib_error_return (0, "memory size %llx (%lld) too large",
2244                                       tmp, tmp);
2245           smm->configured_v6_halfopen_table_memory = tmp;
2246         }
2247       else if (unformat (input, "local-endpoints-table-memory %U",
2248                          unformat_memory_size, &tmp))
2249         {
2250           if (tmp >= 0x100000000)
2251             return clib_error_return (0, "memory size %llx (%lld) too large",
2252                                       tmp, tmp);
2253           smm->local_endpoints_table_memory = tmp;
2254         }
2255       else if (unformat (input, "local-endpoints-table-buckets %d",
2256                          &smm->local_endpoints_table_buckets))
2257         ;
2258       else if (unformat (input, "enable"))
2259         smm->session_enable_asap = 1;
2260       else if (unformat (input, "use-app-socket-api"))
2261         (void) appns_sapi_enable_disable (1 /* is_enable */);
2262       else if (unformat (input, "poll-main"))
2263         smm->poll_main = 1;
2264       else if (unformat (input, "use-private-rx-mqs"))
2265         smm->use_private_rx_mqs = 1;
2266       else if (unformat (input, "no-adaptive"))
2267         smm->no_adaptive = 1;
2268       else if (unformat (input, "use-dma"))
2269         smm->dma_enabled = 1;
2270       /*
2271        * Deprecated but maintained for compatibility
2272        */
2273       else if (unformat (input, "evt_qs_memfd_seg"))
2274         ;
2275       else if (unformat (input, "segment-baseva 0x%lx", &tmp))
2276         ;
2277       else if (unformat (input, "evt_qs_seg_size %U", unformat_memory_size,
2278                          &smm->wrk_mqs_segment_size))
2279         ;
2280       else if (unformat (input, "event-queue-length %d", &nitems))
2281         {
2282           if (nitems >= 2048)
2283             smm->configured_wrk_mq_length = nitems;
2284           else
2285             clib_warning ("event queue length %d too small, ignored", nitems);
2286         }
2287       else
2288         return clib_error_return (0, "unknown input `%U'",
2289                                   format_unformat_error, input);
2290     }
2291   return 0;
2292 }
2293
2294 VLIB_CONFIG_FUNCTION (session_config_fn, "session");
2295
2296 /*
2297  * fd.io coding-style-patch-verification: ON
2298  *
2299  * Local Variables:
2300  * eval: (c-set-style "gnu")
2301  * End:
2302  */