a07082b766c0b9ef5442c37694339713947df512
[vpp.git] / src / vnet / session / session.c
1 /*
2  * Copyright (c) 2017-2019 Cisco and/or its affiliates.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at:
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 /**
16  * @file
17  * @brief Session and session manager
18  */
19
20 #include <vnet/session/session.h>
21 #include <vnet/session/application.h>
22 #include <vnet/dpo/load_balance.h>
23 #include <vnet/fib/ip4_fib.h>
24 #include <vlib/stats/stats.h>
25 #include <vlib/dma/dma.h>
26
27 session_main_t session_main;
28
29 static inline int
30 session_send_evt_to_thread (void *data, void *args, u32 thread_index,
31                             session_evt_type_t evt_type)
32 {
33   session_worker_t *wrk = session_main_get_worker (thread_index);
34   session_event_t *evt;
35   svm_msg_q_msg_t msg;
36   svm_msg_q_t *mq;
37
38   mq = wrk->vpp_event_queue;
39   if (PREDICT_FALSE (svm_msg_q_lock (mq)))
40     return -1;
41   if (PREDICT_FALSE (svm_msg_q_or_ring_is_full (mq, SESSION_MQ_IO_EVT_RING)))
42     {
43       svm_msg_q_unlock (mq);
44       return -2;
45     }
46   switch (evt_type)
47     {
48     case SESSION_CTRL_EVT_RPC:
49       msg = svm_msg_q_alloc_msg_w_ring (mq, SESSION_MQ_IO_EVT_RING);
50       evt = (session_event_t *) svm_msg_q_msg_data (mq, &msg);
51       evt->rpc_args.fp = data;
52       evt->rpc_args.arg = args;
53       break;
54     case SESSION_IO_EVT_RX:
55     case SESSION_IO_EVT_TX:
56     case SESSION_IO_EVT_TX_FLUSH:
57     case SESSION_IO_EVT_BUILTIN_RX:
58       msg = svm_msg_q_alloc_msg_w_ring (mq, SESSION_MQ_IO_EVT_RING);
59       evt = (session_event_t *) svm_msg_q_msg_data (mq, &msg);
60       evt->session_index = *(u32 *) data;
61       break;
62     case SESSION_IO_EVT_TX_MAIN:
63     case SESSION_CTRL_EVT_CLOSE:
64     case SESSION_CTRL_EVT_RESET:
65       msg = svm_msg_q_alloc_msg_w_ring (mq, SESSION_MQ_IO_EVT_RING);
66       evt = (session_event_t *) svm_msg_q_msg_data (mq, &msg);
67       evt->session_handle = session_handle ((session_t *) data);
68       break;
69     default:
70       clib_warning ("evt unhandled!");
71       svm_msg_q_unlock (mq);
72       return -1;
73     }
74   evt->event_type = evt_type;
75
76   svm_msg_q_add_and_unlock (mq, &msg);
77
78   if (PREDICT_FALSE (wrk->state == SESSION_WRK_INTERRUPT))
79     vlib_node_set_interrupt_pending (wrk->vm, session_queue_node.index);
80
81   return 0;
82 }
83
84 int
85 session_send_io_evt_to_thread (svm_fifo_t * f, session_evt_type_t evt_type)
86 {
87   return session_send_evt_to_thread (&f->shr->master_session_index, 0,
88                                      f->master_thread_index, evt_type);
89 }
90
91 int
92 session_send_io_evt_to_thread_custom (void *data, u32 thread_index,
93                                       session_evt_type_t evt_type)
94 {
95   return session_send_evt_to_thread (data, 0, thread_index, evt_type);
96 }
97
98 int
99 session_send_ctrl_evt_to_thread (session_t * s, session_evt_type_t evt_type)
100 {
101   /* only events supported are disconnect, shutdown and reset */
102   ASSERT (evt_type == SESSION_CTRL_EVT_CLOSE ||
103           evt_type == SESSION_CTRL_EVT_HALF_CLOSE ||
104           evt_type == SESSION_CTRL_EVT_RESET);
105   return session_send_evt_to_thread (s, 0, s->thread_index, evt_type);
106 }
107
108 void
109 session_send_rpc_evt_to_thread_force (u32 thread_index, void *fp,
110                                       void *rpc_args)
111 {
112   session_send_evt_to_thread (fp, rpc_args, thread_index,
113                               SESSION_CTRL_EVT_RPC);
114 }
115
116 void
117 session_send_rpc_evt_to_thread (u32 thread_index, void *fp, void *rpc_args)
118 {
119   if (thread_index != vlib_get_thread_index ())
120     session_send_rpc_evt_to_thread_force (thread_index, fp, rpc_args);
121   else
122     {
123       void (*fnp) (void *) = fp;
124       fnp (rpc_args);
125     }
126 }
127
128 void
129 session_add_self_custom_tx_evt (transport_connection_t * tc, u8 has_prio)
130 {
131   session_t *s = session_get (tc->s_index, tc->thread_index);
132
133   ASSERT (s->thread_index == vlib_get_thread_index ());
134   ASSERT (s->session_state != SESSION_STATE_TRANSPORT_DELETED);
135
136   if (!(s->flags & SESSION_F_CUSTOM_TX))
137     {
138       s->flags |= SESSION_F_CUSTOM_TX;
139       if (svm_fifo_set_event (s->tx_fifo)
140           || transport_connection_is_descheduled (tc))
141         {
142           session_evt_elt_t *elt;
143           session_worker_t *wrk;
144
145           wrk = session_main_get_worker (tc->thread_index);
146           if (has_prio)
147             elt = session_evt_alloc_new (wrk);
148           else
149             elt = session_evt_alloc_old (wrk);
150           elt->evt.session_index = tc->s_index;
151           elt->evt.event_type = SESSION_IO_EVT_TX;
152           tc->flags &= ~TRANSPORT_CONNECTION_F_DESCHED;
153
154           if (PREDICT_FALSE (wrk->state == SESSION_WRK_INTERRUPT))
155             vlib_node_set_interrupt_pending (wrk->vm,
156                                              session_queue_node.index);
157         }
158     }
159 }
160
161 void
162 sesssion_reschedule_tx (transport_connection_t * tc)
163 {
164   session_worker_t *wrk = session_main_get_worker (tc->thread_index);
165   session_evt_elt_t *elt;
166
167   ASSERT (tc->thread_index == vlib_get_thread_index ());
168
169   elt = session_evt_alloc_new (wrk);
170   elt->evt.session_index = tc->s_index;
171   elt->evt.event_type = SESSION_IO_EVT_TX;
172
173   if (PREDICT_FALSE (wrk->state == SESSION_WRK_INTERRUPT))
174     vlib_node_set_interrupt_pending (wrk->vm, session_queue_node.index);
175 }
176
177 static void
178 session_program_transport_ctrl_evt (session_t * s, session_evt_type_t evt)
179 {
180   u32 thread_index = vlib_get_thread_index ();
181   session_evt_elt_t *elt;
182   session_worker_t *wrk;
183
184   /* If we are in the handler thread, or being called with the worker barrier
185    * held, just append a new event to pending disconnects vector. */
186   if (vlib_thread_is_main_w_barrier () || thread_index == s->thread_index)
187     {
188       wrk = session_main_get_worker (s->thread_index);
189       elt = session_evt_alloc_ctrl (wrk);
190       clib_memset (&elt->evt, 0, sizeof (session_event_t));
191       elt->evt.session_handle = session_handle (s);
192       elt->evt.event_type = evt;
193
194       if (PREDICT_FALSE (wrk->state == SESSION_WRK_INTERRUPT))
195         vlib_node_set_interrupt_pending (wrk->vm, session_queue_node.index);
196     }
197   else
198     session_send_ctrl_evt_to_thread (s, evt);
199 }
200
201 session_t *
202 session_alloc (u32 thread_index)
203 {
204   session_worker_t *wrk = &session_main.wrk[thread_index];
205   session_t *s;
206
207   pool_get_aligned_safe (wrk->sessions, s, CLIB_CACHE_LINE_BYTES);
208   clib_memset (s, 0, sizeof (*s));
209   s->session_index = s - wrk->sessions;
210   s->thread_index = thread_index;
211   s->app_index = APP_INVALID_INDEX;
212
213   return s;
214 }
215
216 void
217 session_free (session_t * s)
218 {
219   session_worker_t *wrk = &session_main.wrk[s->thread_index];
220
221   SESSION_EVT (SESSION_EVT_FREE, s);
222   if (CLIB_DEBUG)
223     clib_memset (s, 0xFA, sizeof (*s));
224   pool_put (wrk->sessions, s);
225 }
226
227 u8
228 session_is_valid (u32 si, u8 thread_index)
229 {
230   session_t *s;
231   transport_connection_t *tc;
232
233   s = pool_elt_at_index (session_main.wrk[thread_index].sessions, si);
234
235   if (s->thread_index != thread_index || s->session_index != si)
236     return 0;
237
238   if (s->session_state == SESSION_STATE_TRANSPORT_DELETED
239       || s->session_state <= SESSION_STATE_LISTENING)
240     return 1;
241
242   if (s->session_state == SESSION_STATE_CONNECTING &&
243       (s->flags & SESSION_F_HALF_OPEN))
244     return 1;
245
246   tc = session_get_transport (s);
247   if (s->connection_index != tc->c_index
248       || s->thread_index != tc->thread_index || tc->s_index != si)
249     return 0;
250
251   return 1;
252 }
253
254 static void
255 session_cleanup_notify (session_t * s, session_cleanup_ntf_t ntf)
256 {
257   app_worker_t *app_wrk;
258
259   app_wrk = app_worker_get_if_valid (s->app_wrk_index);
260   if (!app_wrk)
261     return;
262   app_worker_cleanup_notify (app_wrk, s, ntf);
263 }
264
265 void
266 session_free_w_fifos (session_t * s)
267 {
268   session_cleanup_notify (s, SESSION_CLEANUP_SESSION);
269   segment_manager_dealloc_fifos (s->rx_fifo, s->tx_fifo);
270   session_free (s);
271 }
272
273 /**
274  * Cleans up session and lookup table.
275  *
276  * Transport connection must still be valid.
277  */
278 static void
279 session_delete (session_t * s)
280 {
281   int rv;
282
283   /* Delete from the main lookup table. */
284   if ((rv = session_lookup_del_session (s)))
285     clib_warning ("session %u hash delete rv %d", s->session_index, rv);
286
287   session_free_w_fifos (s);
288 }
289
290 void
291 session_cleanup_half_open (session_handle_t ho_handle)
292 {
293   session_t *ho = session_get_from_handle (ho_handle);
294
295   /* App transports can migrate their half-opens */
296   if (ho->flags & SESSION_F_IS_MIGRATING)
297     {
298       /* Session still migrating, move to closed state to signal that the
299        * session should be removed. */
300       if (ho->connection_index == ~0)
301         {
302           session_set_state (ho, SESSION_STATE_CLOSED);
303           return;
304         }
305       /* Migrated transports are no longer half-opens */
306       transport_cleanup (session_get_transport_proto (ho),
307                          ho->connection_index, ho->app_index /* overloaded */);
308     }
309   else
310     {
311       /* Cleanup half-open session lookup table if need be */
312       if (ho->session_state != SESSION_STATE_TRANSPORT_CLOSING)
313         {
314           transport_connection_t *tc;
315           tc = transport_get_half_open (session_get_transport_proto (ho),
316                                         ho->connection_index);
317           if (tc && !(tc->flags & TRANSPORT_CONNECTION_F_NO_LOOKUP))
318             session_lookup_del_half_open (tc);
319         }
320       transport_cleanup_half_open (session_get_transport_proto (ho),
321                                    ho->connection_index);
322     }
323   session_free (ho);
324 }
325
326 static void
327 session_half_open_free (session_t *ho)
328 {
329   app_worker_t *app_wrk;
330
331   ASSERT (vlib_get_thread_index () <= transport_cl_thread ());
332   app_wrk = app_worker_get_if_valid (ho->app_wrk_index);
333   if (app_wrk)
334     app_worker_del_half_open (app_wrk, ho);
335   session_free (ho);
336 }
337
338 static void
339 session_half_open_free_rpc (void *args)
340 {
341   session_t *ho = ho_session_get (pointer_to_uword (args));
342   session_half_open_free (ho);
343 }
344
345 void
346 session_half_open_delete_notify (transport_connection_t *tc)
347 {
348   session_t *ho = ho_session_get (tc->s_index);
349
350   /* Cleanup half-open lookup table if need be */
351   if (ho->session_state != SESSION_STATE_TRANSPORT_CLOSING)
352     {
353       if (!(tc->flags & TRANSPORT_CONNECTION_F_NO_LOOKUP))
354         session_lookup_del_half_open (tc);
355     }
356
357   /* Notification from ctrl thread accepted without rpc */
358   if (tc->thread_index == transport_cl_thread ())
359     {
360       session_half_open_free (ho);
361     }
362   else
363     {
364       void *args = uword_to_pointer ((uword) tc->s_index, void *);
365       session_send_rpc_evt_to_thread_force (transport_cl_thread (),
366                                             session_half_open_free_rpc, args);
367     }
368 }
369
370 void
371 session_half_open_migrate_notify (transport_connection_t *tc)
372 {
373   session_t *ho;
374
375   /* Support half-open migrations only for transports with no lookup */
376   ASSERT (tc->flags & TRANSPORT_CONNECTION_F_NO_LOOKUP);
377
378   ho = ho_session_get (tc->s_index);
379   ho->flags |= SESSION_F_IS_MIGRATING;
380   ho->connection_index = ~0;
381 }
382
383 int
384 session_half_open_migrated_notify (transport_connection_t *tc)
385 {
386   session_t *ho;
387
388   ho = ho_session_get (tc->s_index);
389
390   /* App probably detached so the half-open must be cleaned up */
391   if (ho->session_state == SESSION_STATE_CLOSED)
392     {
393       session_half_open_delete_notify (tc);
394       return -1;
395     }
396   ho->connection_index = tc->c_index;
397   /* Overload app index for half-open with new thread */
398   ho->app_index = tc->thread_index;
399   return 0;
400 }
401
402 session_t *
403 session_alloc_for_connection (transport_connection_t * tc)
404 {
405   session_t *s;
406   u32 thread_index = tc->thread_index;
407
408   ASSERT (thread_index == vlib_get_thread_index ()
409           || transport_protocol_is_cl (tc->proto));
410
411   s = session_alloc (thread_index);
412   s->session_type = session_type_from_proto_and_ip (tc->proto, tc->is_ip4);
413   session_set_state (s, SESSION_STATE_CLOSED);
414
415   /* Attach transport to session and vice versa */
416   s->connection_index = tc->c_index;
417   tc->s_index = s->session_index;
418   return s;
419 }
420
421 session_t *
422 session_alloc_for_half_open (transport_connection_t *tc)
423 {
424   session_t *s;
425
426   s = ho_session_alloc ();
427   s->session_type = session_type_from_proto_and_ip (tc->proto, tc->is_ip4);
428   s->connection_index = tc->c_index;
429   tc->s_index = s->session_index;
430   return s;
431 }
432
433 /**
434  * Discards bytes from buffer chain
435  *
436  * It discards n_bytes_to_drop starting at first buffer after chain_b
437  */
438 always_inline void
439 session_enqueue_discard_chain_bytes (vlib_main_t * vm, vlib_buffer_t * b,
440                                      vlib_buffer_t ** chain_b,
441                                      u32 n_bytes_to_drop)
442 {
443   vlib_buffer_t *next = *chain_b;
444   u32 to_drop = n_bytes_to_drop;
445   ASSERT (b->flags & VLIB_BUFFER_NEXT_PRESENT);
446   while (to_drop && (next->flags & VLIB_BUFFER_NEXT_PRESENT))
447     {
448       next = vlib_get_buffer (vm, next->next_buffer);
449       if (next->current_length > to_drop)
450         {
451           vlib_buffer_advance (next, to_drop);
452           to_drop = 0;
453         }
454       else
455         {
456           to_drop -= next->current_length;
457           next->current_length = 0;
458         }
459     }
460   *chain_b = next;
461
462   if (to_drop == 0)
463     b->total_length_not_including_first_buffer -= n_bytes_to_drop;
464 }
465
466 /**
467  * Enqueue buffer chain tail
468  */
469 always_inline int
470 session_enqueue_chain_tail (session_t * s, vlib_buffer_t * b,
471                             u32 offset, u8 is_in_order)
472 {
473   vlib_buffer_t *chain_b;
474   u32 chain_bi, len, diff;
475   vlib_main_t *vm = vlib_get_main ();
476   u8 *data;
477   u32 written = 0;
478   int rv = 0;
479
480   if (is_in_order && offset)
481     {
482       diff = offset - b->current_length;
483       if (diff > b->total_length_not_including_first_buffer)
484         return 0;
485       chain_b = b;
486       session_enqueue_discard_chain_bytes (vm, b, &chain_b, diff);
487       chain_bi = vlib_get_buffer_index (vm, chain_b);
488     }
489   else
490     chain_bi = b->next_buffer;
491
492   do
493     {
494       chain_b = vlib_get_buffer (vm, chain_bi);
495       data = vlib_buffer_get_current (chain_b);
496       len = chain_b->current_length;
497       if (!len)
498         continue;
499       if (is_in_order)
500         {
501           rv = svm_fifo_enqueue (s->rx_fifo, len, data);
502           if (rv == len)
503             {
504               written += rv;
505             }
506           else if (rv < len)
507             {
508               return (rv > 0) ? (written + rv) : written;
509             }
510           else if (rv > len)
511             {
512               written += rv;
513
514               /* written more than what was left in chain */
515               if (written > b->total_length_not_including_first_buffer)
516                 return written;
517
518               /* drop the bytes that have already been delivered */
519               session_enqueue_discard_chain_bytes (vm, b, &chain_b, rv - len);
520             }
521         }
522       else
523         {
524           rv = svm_fifo_enqueue_with_offset (s->rx_fifo, offset, len, data);
525           if (rv)
526             {
527               clib_warning ("failed to enqueue multi-buffer seg");
528               return -1;
529             }
530           offset += len;
531         }
532     }
533   while ((chain_bi = (chain_b->flags & VLIB_BUFFER_NEXT_PRESENT)
534           ? chain_b->next_buffer : 0));
535
536   if (is_in_order)
537     return written;
538
539   return 0;
540 }
541
542 void
543 session_fifo_tuning (session_t * s, svm_fifo_t * f,
544                      session_ft_action_t act, u32 len)
545 {
546   if (s->flags & SESSION_F_CUSTOM_FIFO_TUNING)
547     {
548       app_worker_t *app_wrk = app_worker_get (s->app_wrk_index);
549       app_worker_session_fifo_tuning (app_wrk, s, f, act, len);
550       if (CLIB_ASSERT_ENABLE)
551         {
552           segment_manager_t *sm;
553           sm = segment_manager_get (f->segment_manager);
554           ASSERT (f->shr->size >= 4096);
555           ASSERT (f->shr->size <= sm->max_fifo_size);
556         }
557     }
558 }
559
560 /*
561  * Enqueue data for delivery to session peer. Does not notify peer of enqueue
562  * event but on request can queue notification events for later delivery by
563  * calling stream_server_flush_enqueue_events().
564  *
565  * @param tc Transport connection which is to be enqueued data
566  * @param b Buffer to be enqueued
567  * @param offset Offset at which to start enqueueing if out-of-order
568  * @param queue_event Flag to indicate if peer is to be notified or if event
569  *                    is to be queued. The former is useful when more data is
570  *                    enqueued and only one event is to be generated.
571  * @param is_in_order Flag to indicate if data is in order
572  * @return Number of bytes enqueued or a negative value if enqueueing failed.
573  */
574 int
575 session_enqueue_stream_connection (transport_connection_t * tc,
576                                    vlib_buffer_t * b, u32 offset,
577                                    u8 queue_event, u8 is_in_order)
578 {
579   session_t *s;
580   int enqueued = 0, rv, in_order_off;
581
582   s = session_get (tc->s_index, tc->thread_index);
583
584   if (is_in_order)
585     {
586       enqueued = svm_fifo_enqueue (s->rx_fifo,
587                                    b->current_length,
588                                    vlib_buffer_get_current (b));
589       if (PREDICT_FALSE ((b->flags & VLIB_BUFFER_NEXT_PRESENT)
590                          && enqueued >= 0))
591         {
592           in_order_off = enqueued > b->current_length ? enqueued : 0;
593           rv = session_enqueue_chain_tail (s, b, in_order_off, 1);
594           if (rv > 0)
595             enqueued += rv;
596         }
597     }
598   else
599     {
600       rv = svm_fifo_enqueue_with_offset (s->rx_fifo, offset,
601                                          b->current_length,
602                                          vlib_buffer_get_current (b));
603       if (PREDICT_FALSE ((b->flags & VLIB_BUFFER_NEXT_PRESENT) && !rv))
604         session_enqueue_chain_tail (s, b, offset + b->current_length, 0);
605       /* if something was enqueued, report even this as success for ooo
606        * segment handling */
607       return rv;
608     }
609
610   if (queue_event)
611     {
612       /* Queue RX event on this fifo. Eventually these will need to be flushed
613        * by calling stream_server_flush_enqueue_events () */
614       session_worker_t *wrk;
615
616       wrk = session_main_get_worker (s->thread_index);
617       if (!(s->flags & SESSION_F_RX_EVT))
618         {
619           s->flags |= SESSION_F_RX_EVT;
620           vec_add1 (wrk->session_to_enqueue[tc->proto], s->session_index);
621         }
622
623       session_fifo_tuning (s, s->rx_fifo, SESSION_FT_ACTION_ENQUEUED, 0);
624     }
625
626   return enqueued;
627 }
628
629 int
630 session_enqueue_dgram_connection (session_t * s,
631                                   session_dgram_hdr_t * hdr,
632                                   vlib_buffer_t * b, u8 proto, u8 queue_event)
633 {
634   int rv;
635
636   ASSERT (svm_fifo_max_enqueue_prod (s->rx_fifo)
637           >= b->current_length + sizeof (*hdr));
638
639   if (PREDICT_TRUE (!(b->flags & VLIB_BUFFER_NEXT_PRESENT)))
640     {
641       /* *INDENT-OFF* */
642       svm_fifo_seg_t segs[2] = {
643           { (u8 *) hdr, sizeof (*hdr) },
644           { vlib_buffer_get_current (b), b->current_length }
645       };
646       /* *INDENT-ON* */
647
648       rv = svm_fifo_enqueue_segments (s->rx_fifo, segs, 2,
649                                       0 /* allow_partial */ );
650     }
651   else
652     {
653       vlib_main_t *vm = vlib_get_main ();
654       svm_fifo_seg_t *segs = 0, *seg;
655       vlib_buffer_t *it = b;
656       u32 n_segs = 1;
657
658       vec_add2 (segs, seg, 1);
659       seg->data = (u8 *) hdr;
660       seg->len = sizeof (*hdr);
661       while (it)
662         {
663           vec_add2 (segs, seg, 1);
664           seg->data = vlib_buffer_get_current (it);
665           seg->len = it->current_length;
666           n_segs++;
667           if (!(it->flags & VLIB_BUFFER_NEXT_PRESENT))
668             break;
669           it = vlib_get_buffer (vm, it->next_buffer);
670         }
671       rv = svm_fifo_enqueue_segments (s->rx_fifo, segs, n_segs,
672                                       0 /* allow partial */ );
673       vec_free (segs);
674     }
675
676   if (queue_event && rv > 0)
677     {
678       /* Queue RX event on this fifo. Eventually these will need to be flushed
679        * by calling stream_server_flush_enqueue_events () */
680       session_worker_t *wrk;
681
682       wrk = session_main_get_worker (s->thread_index);
683       if (!(s->flags & SESSION_F_RX_EVT))
684         {
685           s->flags |= SESSION_F_RX_EVT;
686           vec_add1 (wrk->session_to_enqueue[proto], s->session_index);
687         }
688
689       session_fifo_tuning (s, s->rx_fifo, SESSION_FT_ACTION_ENQUEUED, 0);
690     }
691   return rv > 0 ? rv : 0;
692 }
693
694 int
695 session_tx_fifo_peek_bytes (transport_connection_t * tc, u8 * buffer,
696                             u32 offset, u32 max_bytes)
697 {
698   session_t *s = session_get (tc->s_index, tc->thread_index);
699   return svm_fifo_peek (s->tx_fifo, offset, max_bytes, buffer);
700 }
701
702 u32
703 session_tx_fifo_dequeue_drop (transport_connection_t * tc, u32 max_bytes)
704 {
705   session_t *s = session_get (tc->s_index, tc->thread_index);
706   u32 rv;
707
708   rv = svm_fifo_dequeue_drop (s->tx_fifo, max_bytes);
709   session_fifo_tuning (s, s->tx_fifo, SESSION_FT_ACTION_DEQUEUED, rv);
710
711   if (svm_fifo_needs_deq_ntf (s->tx_fifo, max_bytes))
712     session_dequeue_notify (s);
713
714   return rv;
715 }
716
717 static inline int
718 session_notify_subscribers (u32 app_index, session_t * s,
719                             svm_fifo_t * f, session_evt_type_t evt_type)
720 {
721   app_worker_t *app_wrk;
722   application_t *app;
723   int i;
724
725   app = application_get (app_index);
726   if (!app)
727     return -1;
728
729   for (i = 0; i < f->shr->n_subscribers; i++)
730     {
731       app_wrk = application_get_worker (app, f->shr->subscribers[i]);
732       if (!app_wrk)
733         continue;
734       if (app_worker_lock_and_send_event (app_wrk, s, evt_type))
735         return -1;
736     }
737
738   return 0;
739 }
740
741 /**
742  * Notify session peer that new data has been enqueued.
743  *
744  * @param s     Stream session for which the event is to be generated.
745  * @param lock  Flag to indicate if call should lock message queue.
746  *
747  * @return 0 on success or negative number if failed to send notification.
748  */
749 static inline int
750 session_enqueue_notify_inline (session_t * s)
751 {
752   app_worker_t *app_wrk;
753   u32 session_index;
754   u8 n_subscribers;
755   u32 thread_index;
756
757   session_index = s->session_index;
758   thread_index = s->thread_index;
759   n_subscribers = svm_fifo_n_subscribers (s->rx_fifo);
760
761   app_wrk = app_worker_get_if_valid (s->app_wrk_index);
762   if (PREDICT_FALSE (!app_wrk))
763     {
764       SESSION_DBG ("invalid s->app_index = %d", s->app_wrk_index);
765       return 0;
766     }
767
768   SESSION_EVT (SESSION_EVT_ENQ, s, svm_fifo_max_dequeue_prod (s->rx_fifo));
769
770   s->flags &= ~SESSION_F_RX_EVT;
771
772   /* Application didn't confirm accept yet */
773   if (PREDICT_FALSE (s->session_state == SESSION_STATE_ACCEPTING))
774     return 0;
775
776   if (PREDICT_FALSE (app_worker_lock_and_send_event (app_wrk, s,
777                                                      SESSION_IO_EVT_RX)))
778     return -1;
779
780   if (PREDICT_FALSE (n_subscribers))
781     {
782       s = session_get (session_index, thread_index);
783       return session_notify_subscribers (app_wrk->app_index, s,
784                                          s->rx_fifo, SESSION_IO_EVT_RX);
785     }
786
787   return 0;
788 }
789
790 int
791 session_enqueue_notify (session_t * s)
792 {
793   return session_enqueue_notify_inline (s);
794 }
795
796 static void
797 session_enqueue_notify_rpc (void *arg)
798 {
799   u32 session_index = pointer_to_uword (arg);
800   session_t *s;
801
802   s = session_get_if_valid (session_index, vlib_get_thread_index ());
803   if (!s)
804     return;
805
806   session_enqueue_notify (s);
807 }
808
809 /**
810  * Like session_enqueue_notify, but can be called from a thread that does not
811  * own the session.
812  */
813 void
814 session_enqueue_notify_thread (session_handle_t sh)
815 {
816   u32 thread_index = session_thread_from_handle (sh);
817   u32 session_index = session_index_from_handle (sh);
818
819   /*
820    * Pass session index (u32) as opposed to handle (u64) in case pointers
821    * are not 64-bit.
822    */
823   session_send_rpc_evt_to_thread (thread_index,
824                                   session_enqueue_notify_rpc,
825                                   uword_to_pointer (session_index, void *));
826 }
827
828 int
829 session_dequeue_notify (session_t * s)
830 {
831   app_worker_t *app_wrk;
832
833   svm_fifo_clear_deq_ntf (s->tx_fifo);
834
835   app_wrk = app_worker_get_if_valid (s->app_wrk_index);
836   if (PREDICT_FALSE (!app_wrk))
837     return -1;
838
839   if (PREDICT_FALSE (app_worker_lock_and_send_event (app_wrk, s,
840                                                      SESSION_IO_EVT_TX)))
841     return -1;
842
843   if (PREDICT_FALSE (s->tx_fifo->shr->n_subscribers))
844     return session_notify_subscribers (app_wrk->app_index, s,
845                                        s->tx_fifo, SESSION_IO_EVT_TX);
846
847   return 0;
848 }
849
850 /**
851  * Flushes queue of sessions that are to be notified of new data
852  * enqueued events.
853  *
854  * @param thread_index Thread index for which the flush is to be performed.
855  * @return 0 on success or a positive number indicating the number of
856  *         failures due to API queue being full.
857  */
858 int
859 session_main_flush_enqueue_events (u8 transport_proto, u32 thread_index)
860 {
861   session_worker_t *wrk = session_main_get_worker (thread_index);
862   session_t *s;
863   int i, errors = 0;
864   u32 *indices;
865
866   indices = wrk->session_to_enqueue[transport_proto];
867
868   for (i = 0; i < vec_len (indices); i++)
869     {
870       s = session_get_if_valid (indices[i], thread_index);
871       if (PREDICT_FALSE (!s))
872         {
873           errors++;
874           continue;
875         }
876
877       session_fifo_tuning (s, s->rx_fifo, SESSION_FT_ACTION_ENQUEUED,
878                            0 /* TODO/not needed */ );
879
880       if (PREDICT_FALSE (session_enqueue_notify_inline (s)))
881         errors++;
882     }
883
884   vec_reset_length (indices);
885   wrk->session_to_enqueue[transport_proto] = indices;
886
887   return errors;
888 }
889
890 int
891 session_main_flush_all_enqueue_events (u8 transport_proto)
892 {
893   vlib_thread_main_t *vtm = vlib_get_thread_main ();
894   int i, errors = 0;
895   for (i = 0; i < 1 + vtm->n_threads; i++)
896     errors += session_main_flush_enqueue_events (transport_proto, i);
897   return errors;
898 }
899
900 int
901 session_stream_connect_notify (transport_connection_t * tc,
902                                session_error_t err)
903 {
904   u32 opaque = 0, new_ti, new_si;
905   app_worker_t *app_wrk;
906   session_t *s = 0, *ho;
907
908   /*
909    * Cleanup half-open table
910    */
911   session_lookup_del_half_open (tc);
912
913   ho = ho_session_get (tc->s_index);
914   session_set_state (ho, SESSION_STATE_TRANSPORT_CLOSING);
915   opaque = ho->opaque;
916   app_wrk = app_worker_get_if_valid (ho->app_wrk_index);
917   if (!app_wrk)
918     return -1;
919
920   if (err)
921     return app_worker_connect_notify (app_wrk, s, err, opaque);
922
923   s = session_alloc_for_connection (tc);
924   session_set_state (s, SESSION_STATE_CONNECTING);
925   s->app_wrk_index = app_wrk->wrk_index;
926   new_si = s->session_index;
927   new_ti = s->thread_index;
928
929   if ((err = app_worker_init_connected (app_wrk, s)))
930     {
931       session_free (s);
932       app_worker_connect_notify (app_wrk, 0, err, opaque);
933       return -1;
934     }
935
936   s = session_get (new_si, new_ti);
937   session_set_state (s, SESSION_STATE_READY);
938   session_lookup_add_connection (tc, session_handle (s));
939
940   if (app_worker_connect_notify (app_wrk, s, SESSION_E_NONE, opaque))
941     {
942       session_lookup_del_connection (tc);
943       /* Avoid notifying app about rejected session cleanup */
944       s = session_get (new_si, new_ti);
945       segment_manager_dealloc_fifos (s->rx_fifo, s->tx_fifo);
946       session_free (s);
947       return -1;
948     }
949
950   return 0;
951 }
952
953 typedef union session_switch_pool_reply_args_
954 {
955   struct
956   {
957     u32 session_index;
958     u16 thread_index;
959     u8 is_closed;
960   };
961   u64 as_u64;
962 } session_switch_pool_reply_args_t;
963
964 STATIC_ASSERT (sizeof (session_switch_pool_reply_args_t) <= sizeof (uword),
965                "switch pool reply args size");
966
967 static void
968 session_switch_pool_reply (void *arg)
969 {
970   session_switch_pool_reply_args_t rargs;
971   session_t *s;
972
973   rargs.as_u64 = pointer_to_uword (arg);
974   s = session_get_if_valid (rargs.session_index, rargs.thread_index);
975   if (!s)
976     return;
977
978   /* Session closed during migration. Clean everything up */
979   if (rargs.is_closed)
980     {
981       transport_cleanup (session_get_transport_proto (s), s->connection_index,
982                          s->thread_index);
983       segment_manager_dealloc_fifos (s->rx_fifo, s->tx_fifo);
984       session_free (s);
985       return;
986     }
987
988   /* Notify app that it has data on the new session */
989   session_enqueue_notify (s);
990 }
991
992 typedef struct _session_switch_pool_args
993 {
994   u32 session_index;
995   u32 thread_index;
996   u32 new_thread_index;
997   u32 new_session_index;
998 } session_switch_pool_args_t;
999
1000 /**
1001  * Notify old thread of the session pool switch
1002  */
1003 static void
1004 session_switch_pool (void *cb_args)
1005 {
1006   session_switch_pool_args_t *args = (session_switch_pool_args_t *) cb_args;
1007   session_switch_pool_reply_args_t rargs;
1008   session_handle_t new_sh;
1009   segment_manager_t *sm;
1010   app_worker_t *app_wrk;
1011   session_t *s;
1012
1013   ASSERT (args->thread_index == vlib_get_thread_index ());
1014   s = session_get (args->session_index, args->thread_index);
1015
1016   /* Check if session closed during migration */
1017   rargs.is_closed = s->session_state >= SESSION_STATE_TRANSPORT_CLOSING;
1018
1019   transport_cleanup (session_get_transport_proto (s), s->connection_index,
1020                      s->thread_index);
1021
1022   app_wrk = app_worker_get_if_valid (s->app_wrk_index);
1023   if (app_wrk)
1024     {
1025       /* Cleanup fifo segment slice state for fifos */
1026       sm = app_worker_get_connect_segment_manager (app_wrk);
1027       segment_manager_detach_fifo (sm, &s->rx_fifo);
1028       segment_manager_detach_fifo (sm, &s->tx_fifo);
1029
1030       /* Notify app, using old session, about the migration event */
1031       if (!rargs.is_closed)
1032         {
1033           new_sh = session_make_handle (args->new_session_index,
1034                                         args->new_thread_index);
1035           app_worker_migrate_notify (app_wrk, s, new_sh);
1036         }
1037     }
1038
1039   /* Trigger app read and fifo updates on the new thread */
1040   rargs.session_index = args->new_session_index;
1041   rargs.thread_index = args->new_thread_index;
1042   session_send_rpc_evt_to_thread (args->new_thread_index,
1043                                   session_switch_pool_reply,
1044                                   uword_to_pointer (rargs.as_u64, void *));
1045
1046   session_free (s);
1047   clib_mem_free (cb_args);
1048 }
1049
1050 /**
1051  * Move dgram session to the right thread
1052  */
1053 int
1054 session_dgram_connect_notify (transport_connection_t * tc,
1055                               u32 old_thread_index, session_t ** new_session)
1056 {
1057   session_t *new_s;
1058   session_switch_pool_args_t *rpc_args;
1059   segment_manager_t *sm;
1060   app_worker_t *app_wrk;
1061
1062   /*
1063    * Clone half-open session to the right thread.
1064    */
1065   new_s = session_clone_safe (tc->s_index, old_thread_index);
1066   new_s->connection_index = tc->c_index;
1067   session_set_state (new_s, SESSION_STATE_READY);
1068   new_s->flags |= SESSION_F_IS_MIGRATING;
1069
1070   if (!(tc->flags & TRANSPORT_CONNECTION_F_NO_LOOKUP))
1071     session_lookup_add_connection (tc, session_handle (new_s));
1072
1073   app_wrk = app_worker_get_if_valid (new_s->app_wrk_index);
1074   if (app_wrk)
1075     {
1076       /* New set of fifos attached to the same shared memory */
1077       sm = app_worker_get_connect_segment_manager (app_wrk);
1078       segment_manager_attach_fifo (sm, &new_s->rx_fifo, new_s);
1079       segment_manager_attach_fifo (sm, &new_s->tx_fifo, new_s);
1080     }
1081
1082   /*
1083    * Ask thread owning the old session to clean it up and make us the tx
1084    * fifo owner
1085    */
1086   rpc_args = clib_mem_alloc (sizeof (*rpc_args));
1087   rpc_args->new_session_index = new_s->session_index;
1088   rpc_args->new_thread_index = new_s->thread_index;
1089   rpc_args->session_index = tc->s_index;
1090   rpc_args->thread_index = old_thread_index;
1091   session_send_rpc_evt_to_thread (rpc_args->thread_index, session_switch_pool,
1092                                   rpc_args);
1093
1094   tc->s_index = new_s->session_index;
1095   new_s->connection_index = tc->c_index;
1096   *new_session = new_s;
1097   return 0;
1098 }
1099
1100 /**
1101  * Notification from transport that connection is being closed.
1102  *
1103  * A disconnect is sent to application but state is not removed. Once
1104  * disconnect is acknowledged by application, session disconnect is called.
1105  * Ultimately this leads to close being called on transport (passive close).
1106  */
1107 void
1108 session_transport_closing_notify (transport_connection_t * tc)
1109 {
1110   app_worker_t *app_wrk;
1111   session_t *s;
1112
1113   s = session_get (tc->s_index, tc->thread_index);
1114   if (s->session_state >= SESSION_STATE_TRANSPORT_CLOSING)
1115     return;
1116
1117   /* Wait for reply from app before sending notification as the
1118    * accept might be rejected */
1119   if (s->session_state == SESSION_STATE_ACCEPTING)
1120     {
1121       session_set_state (s, SESSION_STATE_TRANSPORT_CLOSING);
1122       return;
1123     }
1124
1125   session_set_state (s, SESSION_STATE_TRANSPORT_CLOSING);
1126   app_wrk = app_worker_get (s->app_wrk_index);
1127   app_worker_close_notify (app_wrk, s);
1128 }
1129
1130 /**
1131  * Notification from transport that connection is being deleted
1132  *
1133  * This removes the session if it is still valid. It should be called only on
1134  * previously fully established sessions. For instance failed connects should
1135  * call stream_session_connect_notify and indicate that the connect has
1136  * failed.
1137  */
1138 void
1139 session_transport_delete_notify (transport_connection_t * tc)
1140 {
1141   session_t *s;
1142
1143   /* App might've been removed already */
1144   if (!(s = session_get_if_valid (tc->s_index, tc->thread_index)))
1145     return;
1146
1147   switch (s->session_state)
1148     {
1149     case SESSION_STATE_CREATED:
1150       /* Session was created but accept notification was not yet sent to the
1151        * app. Cleanup everything. */
1152       session_lookup_del_session (s);
1153       segment_manager_dealloc_fifos (s->rx_fifo, s->tx_fifo);
1154       session_free (s);
1155       break;
1156     case SESSION_STATE_ACCEPTING:
1157     case SESSION_STATE_TRANSPORT_CLOSING:
1158     case SESSION_STATE_CLOSING:
1159     case SESSION_STATE_TRANSPORT_CLOSED:
1160       /* If transport finishes or times out before we get a reply
1161        * from the app, mark transport as closed and wait for reply
1162        * before removing the session. Cleanup session table in advance
1163        * because transport will soon be closed and closed sessions
1164        * are assumed to have been removed from the lookup table */
1165       session_lookup_del_session (s);
1166       session_set_state (s, SESSION_STATE_TRANSPORT_DELETED);
1167       session_cleanup_notify (s, SESSION_CLEANUP_TRANSPORT);
1168       svm_fifo_dequeue_drop_all (s->tx_fifo);
1169       break;
1170     case SESSION_STATE_APP_CLOSED:
1171       /* Cleanup lookup table as transport needs to still be valid.
1172        * Program transport close to ensure that all session events
1173        * have been cleaned up. Once transport close is called, the
1174        * session is just removed because both transport and app have
1175        * confirmed the close*/
1176       session_lookup_del_session (s);
1177       session_set_state (s, SESSION_STATE_TRANSPORT_DELETED);
1178       session_cleanup_notify (s, SESSION_CLEANUP_TRANSPORT);
1179       svm_fifo_dequeue_drop_all (s->tx_fifo);
1180       session_program_transport_ctrl_evt (s, SESSION_CTRL_EVT_CLOSE);
1181       break;
1182     case SESSION_STATE_TRANSPORT_DELETED:
1183       break;
1184     case SESSION_STATE_CLOSED:
1185       session_cleanup_notify (s, SESSION_CLEANUP_TRANSPORT);
1186       session_delete (s);
1187       break;
1188     default:
1189       clib_warning ("session state %u", s->session_state);
1190       session_cleanup_notify (s, SESSION_CLEANUP_TRANSPORT);
1191       session_delete (s);
1192       break;
1193     }
1194 }
1195
1196 /**
1197  * Notification from transport that it is closed
1198  *
1199  * Should be called by transport, prior to calling delete notify, once it
1200  * knows that no more data will be exchanged. This could serve as an
1201  * early acknowledgment of an active close especially if transport delete
1202  * can be delayed a long time, e.g., tcp time-wait.
1203  */
1204 void
1205 session_transport_closed_notify (transport_connection_t * tc)
1206 {
1207   app_worker_t *app_wrk;
1208   session_t *s;
1209
1210   if (!(s = session_get_if_valid (tc->s_index, tc->thread_index)))
1211     return;
1212
1213   /* Transport thinks that app requested close but it actually didn't.
1214    * Can happen for tcp:
1215    * 1)if fin and rst are received in close succession.
1216    * 2)if app shutdown the connection.  */
1217   if (s->session_state == SESSION_STATE_READY)
1218     {
1219       session_transport_closing_notify (tc);
1220       svm_fifo_dequeue_drop_all (s->tx_fifo);
1221       session_set_state (s, SESSION_STATE_TRANSPORT_CLOSED);
1222     }
1223   /* If app close has not been received or has not yet resulted in
1224    * a transport close, only mark the session transport as closed */
1225   else if (s->session_state <= SESSION_STATE_CLOSING)
1226     session_set_state (s, SESSION_STATE_TRANSPORT_CLOSED);
1227   /* If app also closed, switch to closed */
1228   else if (s->session_state == SESSION_STATE_APP_CLOSED)
1229     session_set_state (s, SESSION_STATE_CLOSED);
1230
1231   app_wrk = app_worker_get_if_valid (s->app_wrk_index);
1232   if (app_wrk)
1233     app_worker_transport_closed_notify (app_wrk, s);
1234 }
1235
1236 /**
1237  * Notify application that connection has been reset.
1238  */
1239 void
1240 session_transport_reset_notify (transport_connection_t * tc)
1241 {
1242   app_worker_t *app_wrk;
1243   session_t *s;
1244
1245   s = session_get (tc->s_index, tc->thread_index);
1246   svm_fifo_dequeue_drop_all (s->tx_fifo);
1247   if (s->session_state >= SESSION_STATE_TRANSPORT_CLOSING)
1248     return;
1249   if (s->session_state == SESSION_STATE_ACCEPTING)
1250     {
1251       session_set_state (s, SESSION_STATE_TRANSPORT_CLOSING);
1252       return;
1253     }
1254   session_set_state (s, SESSION_STATE_TRANSPORT_CLOSING);
1255   app_wrk = app_worker_get (s->app_wrk_index);
1256   app_worker_reset_notify (app_wrk, s);
1257 }
1258
1259 int
1260 session_stream_accept_notify (transport_connection_t * tc)
1261 {
1262   app_worker_t *app_wrk;
1263   session_t *s;
1264
1265   s = session_get (tc->s_index, tc->thread_index);
1266   app_wrk = app_worker_get_if_valid (s->app_wrk_index);
1267   if (!app_wrk)
1268     return -1;
1269   if (s->session_state != SESSION_STATE_CREATED)
1270     return 0;
1271   session_set_state (s, SESSION_STATE_ACCEPTING);
1272   if (app_worker_accept_notify (app_wrk, s))
1273     {
1274       /* On transport delete, no notifications should be sent. Unless, the
1275        * accept is retried and successful. */
1276       session_set_state (s, SESSION_STATE_CREATED);
1277       return -1;
1278     }
1279   return 0;
1280 }
1281
1282 /**
1283  * Accept a stream session. Optionally ping the server by callback.
1284  */
1285 int
1286 session_stream_accept (transport_connection_t * tc, u32 listener_index,
1287                        u32 thread_index, u8 notify)
1288 {
1289   session_t *s;
1290   int rv;
1291
1292   s = session_alloc_for_connection (tc);
1293   s->listener_handle = ((u64) thread_index << 32) | (u64) listener_index;
1294   session_set_state (s, SESSION_STATE_CREATED);
1295
1296   if ((rv = app_worker_init_accepted (s)))
1297     {
1298       session_free (s);
1299       return rv;
1300     }
1301
1302   session_lookup_add_connection (tc, session_handle (s));
1303
1304   /* Shoulder-tap the server */
1305   if (notify)
1306     {
1307       app_worker_t *app_wrk = app_worker_get (s->app_wrk_index);
1308       if ((rv = app_worker_accept_notify (app_wrk, s)))
1309         {
1310           session_lookup_del_session (s);
1311           segment_manager_dealloc_fifos (s->rx_fifo, s->tx_fifo);
1312           session_free (s);
1313           return rv;
1314         }
1315     }
1316
1317   return 0;
1318 }
1319
1320 int
1321 session_dgram_accept (transport_connection_t * tc, u32 listener_index,
1322                       u32 thread_index)
1323 {
1324   app_worker_t *app_wrk;
1325   session_t *s;
1326   int rv;
1327
1328   s = session_alloc_for_connection (tc);
1329   s->listener_handle = ((u64) thread_index << 32) | (u64) listener_index;
1330
1331   if ((rv = app_worker_init_accepted (s)))
1332     {
1333       session_free (s);
1334       return rv;
1335     }
1336
1337   session_lookup_add_connection (tc, session_handle (s));
1338   session_set_state (s, SESSION_STATE_ACCEPTING);
1339
1340   app_wrk = app_worker_get (s->app_wrk_index);
1341   if ((rv = app_worker_accept_notify (app_wrk, s)))
1342     {
1343       session_lookup_del_session (s);
1344       segment_manager_dealloc_fifos (s->rx_fifo, s->tx_fifo);
1345       session_free (s);
1346       return rv;
1347     }
1348
1349   return 0;
1350 }
1351
1352 int
1353 session_open_cl (session_endpoint_cfg_t *rmt, session_handle_t *rsh)
1354 {
1355   transport_connection_t *tc;
1356   transport_endpoint_cfg_t *tep;
1357   app_worker_t *app_wrk;
1358   session_handle_t sh;
1359   session_t *s;
1360   int rv;
1361
1362   tep = session_endpoint_to_transport_cfg (rmt);
1363   rv = transport_connect (rmt->transport_proto, tep);
1364   if (rv < 0)
1365     {
1366       SESSION_DBG ("Transport failed to open connection.");
1367       return rv;
1368     }
1369
1370   tc = transport_get_half_open (rmt->transport_proto, (u32) rv);
1371
1372   /* For dgram type of service, allocate session and fifos now */
1373   app_wrk = app_worker_get (rmt->app_wrk_index);
1374   s = session_alloc_for_connection (tc);
1375   s->app_wrk_index = app_wrk->wrk_index;
1376   session_set_state (s, SESSION_STATE_OPENED);
1377   if (app_worker_init_connected (app_wrk, s))
1378     {
1379       session_free (s);
1380       return -1;
1381     }
1382
1383   sh = session_handle (s);
1384   *rsh = sh;
1385
1386   session_lookup_add_connection (tc, sh);
1387   return app_worker_connect_notify (app_wrk, s, SESSION_E_NONE, rmt->opaque);
1388 }
1389
1390 int
1391 session_open_vc (session_endpoint_cfg_t *rmt, session_handle_t *rsh)
1392 {
1393   transport_connection_t *tc;
1394   transport_endpoint_cfg_t *tep;
1395   app_worker_t *app_wrk;
1396   session_t *ho;
1397   int rv;
1398
1399   tep = session_endpoint_to_transport_cfg (rmt);
1400   rv = transport_connect (rmt->transport_proto, tep);
1401   if (rv < 0)
1402     {
1403       SESSION_DBG ("Transport failed to open connection.");
1404       return rv;
1405     }
1406
1407   tc = transport_get_half_open (rmt->transport_proto, (u32) rv);
1408
1409   app_wrk = app_worker_get (rmt->app_wrk_index);
1410
1411   /* If transport offers a vc service, only allocate established
1412    * session once the connection has been established.
1413    * In the meantime allocate half-open session for tracking purposes
1414    * associate half-open connection to it and add session to app-worker
1415    * half-open table. These are needed to allocate the established
1416    * session on transport notification, and to cleanup the half-open
1417    * session if the app detaches before connection establishment.
1418    */
1419   ho = session_alloc_for_half_open (tc);
1420   ho->app_wrk_index = app_wrk->wrk_index;
1421   ho->ho_index = app_worker_add_half_open (app_wrk, session_handle (ho));
1422   ho->opaque = rmt->opaque;
1423   *rsh = session_handle (ho);
1424
1425   if (!(tc->flags & TRANSPORT_CONNECTION_F_NO_LOOKUP))
1426     session_lookup_add_half_open (tc, tc->c_index);
1427
1428   return 0;
1429 }
1430
1431 int
1432 session_open_app (session_endpoint_cfg_t *rmt, session_handle_t *rsh)
1433 {
1434   transport_endpoint_cfg_t *tep_cfg = session_endpoint_to_transport_cfg (rmt);
1435
1436   /* Not supported for now */
1437   *rsh = SESSION_INVALID_HANDLE;
1438   return transport_connect (rmt->transport_proto, tep_cfg);
1439 }
1440
1441 typedef int (*session_open_service_fn) (session_endpoint_cfg_t *,
1442                                         session_handle_t *);
1443
1444 /* *INDENT-OFF* */
1445 static session_open_service_fn session_open_srv_fns[TRANSPORT_N_SERVICES] = {
1446   session_open_vc,
1447   session_open_cl,
1448   session_open_app,
1449 };
1450 /* *INDENT-ON* */
1451
1452 /**
1453  * Ask transport to open connection to remote transport endpoint.
1454  *
1455  * Stores handle for matching request with reply since the call can be
1456  * asynchronous. For instance, for TCP the 3-way handshake must complete
1457  * before reply comes. Session is only created once connection is established.
1458  *
1459  * @param app_index Index of the application requesting the connect
1460  * @param st Session type requested.
1461  * @param tep Remote transport endpoint
1462  * @param opaque Opaque data (typically, api_context) the application expects
1463  *               on open completion.
1464  */
1465 int
1466 session_open (session_endpoint_cfg_t *rmt, session_handle_t *rsh)
1467 {
1468   transport_service_type_t tst;
1469   tst = transport_protocol_service_type (rmt->transport_proto);
1470   return session_open_srv_fns[tst](rmt, rsh);
1471 }
1472
1473 /**
1474  * Ask transport to listen on session endpoint.
1475  *
1476  * @param s Session for which listen will be called. Note that unlike
1477  *          established sessions, listen sessions are not associated to a
1478  *          thread.
1479  * @param sep Local endpoint to be listened on.
1480  */
1481 int
1482 session_listen (session_t * ls, session_endpoint_cfg_t * sep)
1483 {
1484   transport_endpoint_cfg_t *tep;
1485   int tc_index;
1486   u32 s_index;
1487
1488   /* Transport bind/listen */
1489   tep = session_endpoint_to_transport_cfg (sep);
1490   s_index = ls->session_index;
1491   tc_index = transport_start_listen (session_get_transport_proto (ls),
1492                                      s_index, tep);
1493
1494   if (tc_index < 0)
1495     return tc_index;
1496
1497   /* Attach transport to session. Lookup tables are populated by the app
1498    * worker because local tables (for ct sessions) are not backed by a fib */
1499   ls = listen_session_get (s_index);
1500   ls->connection_index = tc_index;
1501   ls->opaque = sep->opaque;
1502
1503   return 0;
1504 }
1505
1506 /**
1507  * Ask transport to stop listening on local transport endpoint.
1508  *
1509  * @param s Session to stop listening on. It must be in state LISTENING.
1510  */
1511 int
1512 session_stop_listen (session_t * s)
1513 {
1514   transport_proto_t tp = session_get_transport_proto (s);
1515   transport_connection_t *tc;
1516
1517   if (s->session_state != SESSION_STATE_LISTENING)
1518     return SESSION_E_NOLISTEN;
1519
1520   tc = transport_get_listener (tp, s->connection_index);
1521
1522   /* If no transport, assume everything was cleaned up already */
1523   if (!tc)
1524     return SESSION_E_NONE;
1525
1526   if (!(tc->flags & TRANSPORT_CONNECTION_F_NO_LOOKUP))
1527     {
1528       if (transport_connection_is_cless (tc))
1529         {
1530           clib_memset (&tc->rmt_ip, 0, sizeof (tc->rmt_ip));
1531           tc->rmt_port = 0;
1532         }
1533       session_lookup_del_connection (tc);
1534     }
1535
1536   transport_stop_listen (tp, s->connection_index);
1537   return 0;
1538 }
1539
1540 /**
1541  * Initialize session half-closing procedure.
1542  *
1543  * Note that half-closing will not change the state of the session.
1544  */
1545 void
1546 session_half_close (session_t *s)
1547 {
1548   if (!s)
1549     return;
1550
1551   session_program_transport_ctrl_evt (s, SESSION_CTRL_EVT_HALF_CLOSE);
1552 }
1553
1554 /**
1555  * Initialize session closing procedure.
1556  *
1557  * Request is always sent to session node to ensure that all outstanding
1558  * requests are served before transport is notified.
1559  */
1560 void
1561 session_close (session_t * s)
1562 {
1563   if (!s || (s->flags & SESSION_F_APP_CLOSED))
1564     return;
1565
1566   /* Transports can close and delete their state independent of app closes
1567    * and transport initiated state transitions can hide app closes. Instead
1568    * of extending the state machine to support separate tracking of app and
1569    * transport initiated closes, use a flag. */
1570   s->flags |= SESSION_F_APP_CLOSED;
1571
1572   if (s->session_state >= SESSION_STATE_CLOSING)
1573     {
1574       /* Session will only be removed once both app and transport
1575        * acknowledge the close */
1576       if (s->session_state == SESSION_STATE_TRANSPORT_CLOSED
1577           || s->session_state == SESSION_STATE_TRANSPORT_DELETED)
1578         session_program_transport_ctrl_evt (s, SESSION_CTRL_EVT_CLOSE);
1579       return;
1580     }
1581
1582   /* App closed so stop propagating dequeue notifications.
1583    * App might disconnect session before connected, in this case,
1584    * tx_fifo may not be setup yet, so clear only it's inited. */
1585   if (s->tx_fifo)
1586     svm_fifo_clear_deq_ntf (s->tx_fifo);
1587   session_set_state (s, SESSION_STATE_CLOSING);
1588   session_program_transport_ctrl_evt (s, SESSION_CTRL_EVT_CLOSE);
1589 }
1590
1591 /**
1592  * Force a close without waiting for data to be flushed
1593  */
1594 void
1595 session_reset (session_t * s)
1596 {
1597   if (s->session_state >= SESSION_STATE_CLOSING)
1598     return;
1599   /* Drop all outstanding tx data
1600    * App might disconnect session before connected, in this case,
1601    * tx_fifo may not be setup yet, so clear only it's inited. */
1602   if (s->tx_fifo)
1603     svm_fifo_dequeue_drop_all (s->tx_fifo);
1604   session_set_state (s, SESSION_STATE_CLOSING);
1605   session_program_transport_ctrl_evt (s, SESSION_CTRL_EVT_RESET);
1606 }
1607
1608 /**
1609  * Notify transport the session can be half-disconnected.
1610  *
1611  * Must be called from the session's thread.
1612  */
1613 void
1614 session_transport_half_close (session_t *s)
1615 {
1616   /* Only READY session can be half-closed */
1617   if (s->session_state != SESSION_STATE_READY)
1618     {
1619       return;
1620     }
1621
1622   transport_half_close (session_get_transport_proto (s), s->connection_index,
1623                         s->thread_index);
1624 }
1625
1626 /**
1627  * Notify transport the session can be disconnected. This should eventually
1628  * result in a delete notification that allows us to cleanup session state.
1629  * Called for both active/passive disconnects.
1630  *
1631  * Must be called from the session's thread.
1632  */
1633 void
1634 session_transport_close (session_t * s)
1635 {
1636   if (s->session_state >= SESSION_STATE_APP_CLOSED)
1637     {
1638       if (s->session_state == SESSION_STATE_TRANSPORT_CLOSED)
1639         session_set_state (s, SESSION_STATE_CLOSED);
1640       /* If transport is already deleted, just free the session */
1641       else if (s->session_state >= SESSION_STATE_TRANSPORT_DELETED)
1642         session_free_w_fifos (s);
1643       return;
1644     }
1645
1646   /* If the tx queue wasn't drained, the transport can continue to try
1647    * sending the outstanding data (in closed state it cannot). It MUST however
1648    * at one point, either after sending everything or after a timeout, call
1649    * delete notify. This will finally lead to the complete cleanup of the
1650    * session.
1651    */
1652   session_set_state (s, SESSION_STATE_APP_CLOSED);
1653
1654   transport_close (session_get_transport_proto (s), s->connection_index,
1655                    s->thread_index);
1656 }
1657
1658 /**
1659  * Force transport close
1660  */
1661 void
1662 session_transport_reset (session_t * s)
1663 {
1664   if (s->session_state >= SESSION_STATE_APP_CLOSED)
1665     {
1666       if (s->session_state == SESSION_STATE_TRANSPORT_CLOSED)
1667         session_set_state (s, SESSION_STATE_CLOSED);
1668       else if (s->session_state >= SESSION_STATE_TRANSPORT_DELETED)
1669         session_free_w_fifos (s);
1670       return;
1671     }
1672
1673   session_set_state (s, SESSION_STATE_APP_CLOSED);
1674   transport_reset (session_get_transport_proto (s), s->connection_index,
1675                    s->thread_index);
1676 }
1677
1678 /**
1679  * Cleanup transport and session state.
1680  *
1681  * Notify transport of the cleanup and free the session. This should
1682  * be called only if transport reported some error and is already
1683  * closed.
1684  */
1685 void
1686 session_transport_cleanup (session_t * s)
1687 {
1688   /* Delete from main lookup table before we axe the the transport */
1689   session_lookup_del_session (s);
1690   if (s->session_state != SESSION_STATE_TRANSPORT_DELETED)
1691     transport_cleanup (session_get_transport_proto (s), s->connection_index,
1692                        s->thread_index);
1693   /* Since we called cleanup, no delete notification will come. So, make
1694    * sure the session is properly freed. */
1695   segment_manager_dealloc_fifos (s->rx_fifo, s->tx_fifo);
1696   session_free (s);
1697 }
1698
1699 /**
1700  * Allocate worker mqs in share-able segment
1701  *
1702  * That can only be a newly created memfd segment, that must be mapped
1703  * by all apps/stack users unless private rx mqs are enabled.
1704  */
1705 void
1706 session_vpp_wrk_mqs_alloc (session_main_t *smm)
1707 {
1708   u32 mq_q_length = 2048, evt_size = sizeof (session_event_t);
1709   fifo_segment_t *mqs_seg = &smm->wrk_mqs_segment;
1710   svm_msg_q_cfg_t _cfg, *cfg = &_cfg;
1711   uword mqs_seg_size;
1712   int i;
1713
1714   mq_q_length = clib_max (mq_q_length, smm->configured_wrk_mq_length);
1715
1716   svm_msg_q_ring_cfg_t rc[SESSION_MQ_N_RINGS] = {
1717     { mq_q_length, evt_size, 0 }, { mq_q_length >> 1, 256, 0 }
1718   };
1719   cfg->consumer_pid = 0;
1720   cfg->n_rings = 2;
1721   cfg->q_nitems = mq_q_length;
1722   cfg->ring_cfgs = rc;
1723
1724   /*
1725    * Compute mqs segment size based on rings config and leave space
1726    * for passing extended configuration messages, i.e., data allocated
1727    * outside of the rings. If provided with a config value, accept it
1728    * if larger than minimum size.
1729    */
1730   mqs_seg_size = svm_msg_q_size_to_alloc (cfg) * vec_len (smm->wrk);
1731   mqs_seg_size = mqs_seg_size + (1 << 20);
1732   mqs_seg_size = clib_max (mqs_seg_size, smm->wrk_mqs_segment_size);
1733
1734   mqs_seg->ssvm.ssvm_size = mqs_seg_size;
1735   mqs_seg->ssvm.my_pid = getpid ();
1736   mqs_seg->ssvm.name = format (0, "%s%c", "session: wrk-mqs-segment", 0);
1737
1738   if (ssvm_server_init (&mqs_seg->ssvm, SSVM_SEGMENT_MEMFD))
1739     {
1740       clib_warning ("failed to initialize queue segment");
1741       return;
1742     }
1743
1744   fifo_segment_init (mqs_seg);
1745
1746   /* Special fifo segment that's filled only with mqs */
1747   mqs_seg->h->n_mqs = vec_len (smm->wrk);
1748
1749   for (i = 0; i < vec_len (smm->wrk); i++)
1750     smm->wrk[i].vpp_event_queue = fifo_segment_msg_q_alloc (mqs_seg, i, cfg);
1751 }
1752
1753 fifo_segment_t *
1754 session_main_get_wrk_mqs_segment (void)
1755 {
1756   return &session_main.wrk_mqs_segment;
1757 }
1758
1759 u64
1760 session_segment_handle (session_t * s)
1761 {
1762   svm_fifo_t *f;
1763
1764   if (!s->rx_fifo)
1765     return SESSION_INVALID_HANDLE;
1766
1767   f = s->rx_fifo;
1768   return segment_manager_make_segment_handle (f->segment_manager,
1769                                               f->segment_index);
1770 }
1771
1772 /* *INDENT-OFF* */
1773 static session_fifo_rx_fn *session_tx_fns[TRANSPORT_TX_N_FNS] = {
1774     session_tx_fifo_peek_and_snd,
1775     session_tx_fifo_dequeue_and_snd,
1776     session_tx_fifo_dequeue_internal,
1777     session_tx_fifo_dequeue_and_snd
1778 };
1779 /* *INDENT-ON* */
1780
1781 void
1782 session_register_transport (transport_proto_t transport_proto,
1783                             const transport_proto_vft_t * vft, u8 is_ip4,
1784                             u32 output_node)
1785 {
1786   session_main_t *smm = &session_main;
1787   session_type_t session_type;
1788   u32 next_index = ~0;
1789
1790   session_type = session_type_from_proto_and_ip (transport_proto, is_ip4);
1791
1792   vec_validate (smm->session_type_to_next, session_type);
1793   vec_validate (smm->session_tx_fns, session_type);
1794
1795   if (output_node != ~0)
1796     next_index = vlib_node_add_next (vlib_get_main (),
1797                                      session_queue_node.index, output_node);
1798
1799   smm->session_type_to_next[session_type] = next_index;
1800   smm->session_tx_fns[session_type] =
1801     session_tx_fns[vft->transport_options.tx_type];
1802 }
1803
1804 void
1805 session_register_update_time_fn (session_update_time_fn fn, u8 is_add)
1806 {
1807   session_main_t *smm = &session_main;
1808   session_update_time_fn *fi;
1809   u32 fi_pos = ~0;
1810   u8 found = 0;
1811
1812   vec_foreach (fi, smm->update_time_fns)
1813     {
1814       if (*fi == fn)
1815         {
1816           fi_pos = fi - smm->update_time_fns;
1817           found = 1;
1818           break;
1819         }
1820     }
1821
1822   if (is_add)
1823     {
1824       if (found)
1825         {
1826           clib_warning ("update time fn %p already registered", fn);
1827           return;
1828         }
1829       vec_add1 (smm->update_time_fns, fn);
1830     }
1831   else
1832     {
1833       vec_del1 (smm->update_time_fns, fi_pos);
1834     }
1835 }
1836
1837 transport_proto_t
1838 session_add_transport_proto (void)
1839 {
1840   session_main_t *smm = &session_main;
1841   session_worker_t *wrk;
1842   u32 thread;
1843
1844   smm->last_transport_proto_type += 1;
1845
1846   for (thread = 0; thread < vec_len (smm->wrk); thread++)
1847     {
1848       wrk = session_main_get_worker (thread);
1849       vec_validate (wrk->session_to_enqueue, smm->last_transport_proto_type);
1850     }
1851
1852   return smm->last_transport_proto_type;
1853 }
1854
1855 transport_connection_t *
1856 session_get_transport (session_t * s)
1857 {
1858   if (s->session_state != SESSION_STATE_LISTENING)
1859     return transport_get_connection (session_get_transport_proto (s),
1860                                      s->connection_index, s->thread_index);
1861   else
1862     return transport_get_listener (session_get_transport_proto (s),
1863                                    s->connection_index);
1864 }
1865
1866 void
1867 session_get_endpoint (session_t * s, transport_endpoint_t * tep, u8 is_lcl)
1868 {
1869   if (s->session_state != SESSION_STATE_LISTENING)
1870     return transport_get_endpoint (session_get_transport_proto (s),
1871                                    s->connection_index, s->thread_index, tep,
1872                                    is_lcl);
1873   else
1874     return transport_get_listener_endpoint (session_get_transport_proto (s),
1875                                             s->connection_index, tep, is_lcl);
1876 }
1877
1878 int
1879 session_transport_attribute (session_t *s, u8 is_get,
1880                              transport_endpt_attr_t *attr)
1881 {
1882   if (s->session_state < SESSION_STATE_READY)
1883     return -1;
1884
1885   return transport_connection_attribute (session_get_transport_proto (s),
1886                                          s->connection_index, s->thread_index,
1887                                          is_get, attr);
1888 }
1889
1890 transport_connection_t *
1891 listen_session_get_transport (session_t * s)
1892 {
1893   return transport_get_listener (session_get_transport_proto (s),
1894                                  s->connection_index);
1895 }
1896
1897 void
1898 session_queue_run_on_main_thread (vlib_main_t * vm)
1899 {
1900   ASSERT (vlib_get_thread_index () == 0);
1901   vlib_node_set_interrupt_pending (vm, session_queue_node.index);
1902 }
1903
1904 static void
1905 session_stats_collector_fn (vlib_stats_collector_data_t *d)
1906 {
1907   u32 i, n_workers, n_wrk_sessions, n_sessions = 0;
1908   session_main_t *smm = &session_main;
1909   session_worker_t *wrk;
1910   counter_t **counters;
1911   counter_t *cb;
1912
1913   n_workers = vec_len (smm->wrk);
1914   vlib_stats_validate (d->entry_index, 0, n_workers - 1);
1915   counters = d->entry->data;
1916   cb = counters[0];
1917
1918   for (i = 0; i < vec_len (smm->wrk); i++)
1919     {
1920       wrk = session_main_get_worker (i);
1921       n_wrk_sessions = pool_elts (wrk->sessions);
1922       cb[i] = n_wrk_sessions;
1923       n_sessions += n_wrk_sessions;
1924     }
1925
1926   vlib_stats_set_gauge (d->private_data, n_sessions);
1927 }
1928
1929 static void
1930 session_stats_collector_init (void)
1931 {
1932   vlib_stats_collector_reg_t reg = {};
1933
1934   reg.entry_index =
1935     vlib_stats_add_counter_vector ("/sys/session/sessions_per_worker");
1936   reg.private_data = vlib_stats_add_gauge ("/sys/session/sessions_total");
1937   reg.collect_fn = session_stats_collector_fn;
1938   vlib_stats_register_collector_fn (&reg);
1939   vlib_stats_validate (reg.entry_index, 0, vlib_get_n_threads ());
1940 }
1941
1942 static clib_error_t *
1943 session_manager_main_enable (vlib_main_t * vm)
1944 {
1945   session_main_t *smm = &session_main;
1946   vlib_thread_main_t *vtm = vlib_get_thread_main ();
1947   u32 num_threads, preallocated_sessions_per_worker;
1948   session_worker_t *wrk;
1949   int i;
1950
1951   /* We only initialize once and do not de-initialized on disable */
1952   if (smm->is_initialized)
1953     goto done;
1954
1955   num_threads = 1 /* main thread */  + vtm->n_threads;
1956
1957   if (num_threads < 1)
1958     return clib_error_return (0, "n_thread_stacks not set");
1959
1960   /* Allocate cache line aligned worker contexts */
1961   vec_validate_aligned (smm->wrk, num_threads - 1, CLIB_CACHE_LINE_BYTES);
1962   clib_spinlock_init (&session_main.pool_realloc_lock);
1963
1964   for (i = 0; i < num_threads; i++)
1965     {
1966       wrk = &smm->wrk[i];
1967       wrk->ctrl_head = clib_llist_make_head (wrk->event_elts, evt_list);
1968       wrk->new_head = clib_llist_make_head (wrk->event_elts, evt_list);
1969       wrk->old_head = clib_llist_make_head (wrk->event_elts, evt_list);
1970       wrk->pending_connects = clib_llist_make_head (wrk->event_elts, evt_list);
1971       wrk->evts_pending_main =
1972         clib_llist_make_head (wrk->event_elts, evt_list);
1973       wrk->vm = vlib_get_main_by_index (i);
1974       wrk->last_vlib_time = vlib_time_now (vm);
1975       wrk->last_vlib_us_time = wrk->last_vlib_time * CLIB_US_TIME_FREQ;
1976       wrk->timerfd = -1;
1977       vec_validate (wrk->session_to_enqueue, smm->last_transport_proto_type);
1978
1979       if (!smm->no_adaptive && smm->use_private_rx_mqs)
1980         session_wrk_enable_adaptive_mode (wrk);
1981     }
1982
1983   /* Allocate vpp event queues segment and queue */
1984   session_vpp_wrk_mqs_alloc (smm);
1985
1986   /* Initialize segment manager properties */
1987   segment_manager_main_init ();
1988
1989   /* Preallocate sessions */
1990   if (smm->preallocated_sessions)
1991     {
1992       if (num_threads == 1)
1993         {
1994           pool_init_fixed (smm->wrk[0].sessions, smm->preallocated_sessions);
1995         }
1996       else
1997         {
1998           int j;
1999           preallocated_sessions_per_worker =
2000             (1.1 * (f64) smm->preallocated_sessions /
2001              (f64) (num_threads - 1));
2002
2003           for (j = 1; j < num_threads; j++)
2004             {
2005               pool_init_fixed (smm->wrk[j].sessions,
2006                                preallocated_sessions_per_worker);
2007             }
2008         }
2009     }
2010
2011   session_lookup_init ();
2012   app_namespaces_init ();
2013   transport_init ();
2014   session_stats_collector_init ();
2015   smm->is_initialized = 1;
2016
2017 done:
2018
2019   smm->is_enabled = 1;
2020
2021   /* Enable transports */
2022   transport_enable_disable (vm, 1);
2023   session_debug_init ();
2024
2025   return 0;
2026 }
2027
2028 static void
2029 session_manager_main_disable (vlib_main_t * vm)
2030 {
2031   transport_enable_disable (vm, 0 /* is_en */ );
2032 }
2033
2034 /* in this new callback, cookie hint the index */
2035 void
2036 session_dma_completion_cb (vlib_main_t *vm, struct vlib_dma_batch *batch)
2037 {
2038   session_worker_t *wrk;
2039   wrk = session_main_get_worker (vm->thread_index);
2040   session_dma_transfer *dma_transfer;
2041
2042   dma_transfer = &wrk->dma_trans[wrk->trans_head];
2043   vec_add (wrk->pending_tx_buffers, dma_transfer->pending_tx_buffers,
2044            vec_len (dma_transfer->pending_tx_buffers));
2045   vec_add (wrk->pending_tx_nexts, dma_transfer->pending_tx_nexts,
2046            vec_len (dma_transfer->pending_tx_nexts));
2047   vec_reset_length (dma_transfer->pending_tx_buffers);
2048   vec_reset_length (dma_transfer->pending_tx_nexts);
2049   wrk->trans_head++;
2050   if (wrk->trans_head == wrk->trans_size)
2051     wrk->trans_head = 0;
2052   return;
2053 }
2054
2055 static void
2056 session_prepare_dma_args (vlib_dma_config_t *args)
2057 {
2058   args->max_batches = 16;
2059   args->max_transfers = DMA_TRANS_SIZE;
2060   args->max_transfer_size = 65536;
2061   args->features = 0;
2062   args->sw_fallback = 1;
2063   args->barrier_before_last = 1;
2064   args->callback_fn = session_dma_completion_cb;
2065 }
2066
2067 static void
2068 session_node_enable_dma (u8 is_en, int n_vlibs)
2069 {
2070   vlib_dma_config_t args;
2071   session_prepare_dma_args (&args);
2072   session_worker_t *wrk;
2073   vlib_main_t *vm;
2074
2075   int config_index = -1;
2076
2077   if (is_en)
2078     {
2079       vm = vlib_get_main_by_index (0);
2080       config_index = vlib_dma_config_add (vm, &args);
2081     }
2082   else
2083     {
2084       vm = vlib_get_main_by_index (0);
2085       wrk = session_main_get_worker (0);
2086       if (wrk->config_index >= 0)
2087         vlib_dma_config_del (vm, wrk->config_index);
2088     }
2089   int i;
2090   for (i = 0; i < n_vlibs; i++)
2091     {
2092       vm = vlib_get_main_by_index (i);
2093       wrk = session_main_get_worker (vm->thread_index);
2094       wrk->config_index = config_index;
2095       if (is_en)
2096         {
2097           if (config_index >= 0)
2098             wrk->dma_enabled = true;
2099           wrk->dma_trans = (session_dma_transfer *) clib_mem_alloc (
2100             sizeof (session_dma_transfer) * DMA_TRANS_SIZE);
2101           bzero (wrk->dma_trans,
2102                  sizeof (session_dma_transfer) * DMA_TRANS_SIZE);
2103         }
2104       else
2105         {
2106           if (wrk->dma_trans)
2107             clib_mem_free (wrk->dma_trans);
2108         }
2109       wrk->trans_head = 0;
2110       wrk->trans_tail = 0;
2111       wrk->trans_size = DMA_TRANS_SIZE;
2112     }
2113 }
2114
2115 void
2116 session_node_enable_disable (u8 is_en)
2117 {
2118   u8 mstate = is_en ? VLIB_NODE_STATE_INTERRUPT : VLIB_NODE_STATE_DISABLED;
2119   u8 state = is_en ? VLIB_NODE_STATE_POLLING : VLIB_NODE_STATE_DISABLED;
2120   session_main_t *sm = &session_main;
2121   vlib_main_t *vm;
2122   vlib_node_t *n;
2123   int n_vlibs, i;
2124
2125   n_vlibs = vlib_get_n_threads ();
2126   for (i = 0; i < n_vlibs; i++)
2127     {
2128       vm = vlib_get_main_by_index (i);
2129       /* main thread with workers and not polling */
2130       if (i == 0 && n_vlibs > 1)
2131         {
2132           vlib_node_set_state (vm, session_queue_node.index, mstate);
2133           if (is_en)
2134             {
2135               session_main_get_worker (0)->state = SESSION_WRK_INTERRUPT;
2136               vlib_node_set_state (vm, session_queue_process_node.index,
2137                                    state);
2138               n = vlib_get_node (vm, session_queue_process_node.index);
2139               vlib_start_process (vm, n->runtime_index);
2140             }
2141           else
2142             {
2143               vlib_process_signal_event_mt (vm,
2144                                             session_queue_process_node.index,
2145                                             SESSION_Q_PROCESS_STOP, 0);
2146             }
2147           if (!sm->poll_main)
2148             continue;
2149         }
2150       vlib_node_set_state (vm, session_queue_node.index, state);
2151     }
2152
2153   if (sm->use_private_rx_mqs)
2154     application_enable_rx_mqs_nodes (is_en);
2155
2156   if (sm->dma_enabled)
2157     session_node_enable_dma (is_en, n_vlibs);
2158 }
2159
2160 clib_error_t *
2161 vnet_session_enable_disable (vlib_main_t * vm, u8 is_en)
2162 {
2163   clib_error_t *error = 0;
2164   if (is_en)
2165     {
2166       if (session_main.is_enabled)
2167         return 0;
2168
2169       error = session_manager_main_enable (vm);
2170       session_node_enable_disable (is_en);
2171     }
2172   else
2173     {
2174       session_main.is_enabled = 0;
2175       session_manager_main_disable (vm);
2176       session_node_enable_disable (is_en);
2177     }
2178
2179   return error;
2180 }
2181
2182 clib_error_t *
2183 session_main_init (vlib_main_t * vm)
2184 {
2185   session_main_t *smm = &session_main;
2186
2187   smm->is_enabled = 0;
2188   smm->session_enable_asap = 0;
2189   smm->poll_main = 0;
2190   smm->use_private_rx_mqs = 0;
2191   smm->no_adaptive = 0;
2192   smm->last_transport_proto_type = TRANSPORT_PROTO_HTTP;
2193
2194   return 0;
2195 }
2196
2197 static clib_error_t *
2198 session_main_loop_init (vlib_main_t * vm)
2199 {
2200   session_main_t *smm = &session_main;
2201   if (smm->session_enable_asap)
2202     {
2203       vlib_worker_thread_barrier_sync (vm);
2204       vnet_session_enable_disable (vm, 1 /* is_en */ );
2205       vlib_worker_thread_barrier_release (vm);
2206     }
2207   return 0;
2208 }
2209
2210 VLIB_INIT_FUNCTION (session_main_init);
2211 VLIB_MAIN_LOOP_ENTER_FUNCTION (session_main_loop_init);
2212
2213 static clib_error_t *
2214 session_config_fn (vlib_main_t * vm, unformat_input_t * input)
2215 {
2216   session_main_t *smm = &session_main;
2217   u32 nitems;
2218   uword tmp;
2219
2220   while (unformat_check_input (input) != UNFORMAT_END_OF_INPUT)
2221     {
2222       if (unformat (input, "wrk-mq-length %d", &nitems))
2223         {
2224           if (nitems >= 2048)
2225             smm->configured_wrk_mq_length = nitems;
2226           else
2227             clib_warning ("event queue length %d too small, ignored", nitems);
2228         }
2229       else if (unformat (input, "wrk-mqs-segment-size %U",
2230                          unformat_memory_size, &smm->wrk_mqs_segment_size))
2231         ;
2232       else if (unformat (input, "preallocated-sessions %d",
2233                          &smm->preallocated_sessions))
2234         ;
2235       else if (unformat (input, "v4-session-table-buckets %d",
2236                          &smm->configured_v4_session_table_buckets))
2237         ;
2238       else if (unformat (input, "v4-halfopen-table-buckets %d",
2239                          &smm->configured_v4_halfopen_table_buckets))
2240         ;
2241       else if (unformat (input, "v6-session-table-buckets %d",
2242                          &smm->configured_v6_session_table_buckets))
2243         ;
2244       else if (unformat (input, "v6-halfopen-table-buckets %d",
2245                          &smm->configured_v6_halfopen_table_buckets))
2246         ;
2247       else if (unformat (input, "v4-session-table-memory %U",
2248                          unformat_memory_size, &tmp))
2249         {
2250           if (tmp >= 0x100000000)
2251             return clib_error_return (0, "memory size %llx (%lld) too large",
2252                                       tmp, tmp);
2253           smm->configured_v4_session_table_memory = tmp;
2254         }
2255       else if (unformat (input, "v4-halfopen-table-memory %U",
2256                          unformat_memory_size, &tmp))
2257         {
2258           if (tmp >= 0x100000000)
2259             return clib_error_return (0, "memory size %llx (%lld) too large",
2260                                       tmp, tmp);
2261           smm->configured_v4_halfopen_table_memory = tmp;
2262         }
2263       else if (unformat (input, "v6-session-table-memory %U",
2264                          unformat_memory_size, &tmp))
2265         {
2266           if (tmp >= 0x100000000)
2267             return clib_error_return (0, "memory size %llx (%lld) too large",
2268                                       tmp, tmp);
2269           smm->configured_v6_session_table_memory = tmp;
2270         }
2271       else if (unformat (input, "v6-halfopen-table-memory %U",
2272                          unformat_memory_size, &tmp))
2273         {
2274           if (tmp >= 0x100000000)
2275             return clib_error_return (0, "memory size %llx (%lld) too large",
2276                                       tmp, tmp);
2277           smm->configured_v6_halfopen_table_memory = tmp;
2278         }
2279       else if (unformat (input, "local-endpoints-table-memory %U",
2280                          unformat_memory_size, &tmp))
2281         {
2282           if (tmp >= 0x100000000)
2283             return clib_error_return (0, "memory size %llx (%lld) too large",
2284                                       tmp, tmp);
2285           smm->local_endpoints_table_memory = tmp;
2286         }
2287       else if (unformat (input, "local-endpoints-table-buckets %d",
2288                          &smm->local_endpoints_table_buckets))
2289         ;
2290       else if (unformat (input, "enable"))
2291         smm->session_enable_asap = 1;
2292       else if (unformat (input, "use-app-socket-api"))
2293         (void) appns_sapi_enable_disable (1 /* is_enable */);
2294       else if (unformat (input, "poll-main"))
2295         smm->poll_main = 1;
2296       else if (unformat (input, "use-private-rx-mqs"))
2297         smm->use_private_rx_mqs = 1;
2298       else if (unformat (input, "no-adaptive"))
2299         smm->no_adaptive = 1;
2300       else if (unformat (input, "use-dma"))
2301         smm->dma_enabled = 1;
2302       /*
2303        * Deprecated but maintained for compatibility
2304        */
2305       else if (unformat (input, "evt_qs_memfd_seg"))
2306         ;
2307       else if (unformat (input, "segment-baseva 0x%lx", &tmp))
2308         ;
2309       else if (unformat (input, "evt_qs_seg_size %U", unformat_memory_size,
2310                          &smm->wrk_mqs_segment_size))
2311         ;
2312       else if (unformat (input, "event-queue-length %d", &nitems))
2313         {
2314           if (nitems >= 2048)
2315             smm->configured_wrk_mq_length = nitems;
2316           else
2317             clib_warning ("event queue length %d too small, ignored", nitems);
2318         }
2319       else
2320         return clib_error_return (0, "unknown input `%U'",
2321                                   format_unformat_error, input);
2322     }
2323   return 0;
2324 }
2325
2326 VLIB_CONFIG_FUNCTION (session_config_fn, "session");
2327
2328 /*
2329  * fd.io coding-style-patch-verification: ON
2330  *
2331  * Local Variables:
2332  * eval: (c-set-style "gnu")
2333  * End:
2334  */