session: use safe realloc for pools
[vpp.git] / src / vnet / session / session.c
1 /*
2  * Copyright (c) 2017-2019 Cisco and/or its affiliates.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at:
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 /**
16  * @file
17  * @brief Session and session manager
18  */
19
20 #include <vnet/session/session.h>
21 #include <vnet/session/application.h>
22 #include <vnet/dpo/load_balance.h>
23 #include <vnet/fib/ip4_fib.h>
24
25 session_main_t session_main;
26
27 static inline int
28 session_send_evt_to_thread (void *data, void *args, u32 thread_index,
29                             session_evt_type_t evt_type)
30 {
31   session_worker_t *wrk = session_main_get_worker (thread_index);
32   session_event_t *evt;
33   svm_msg_q_msg_t msg;
34   svm_msg_q_t *mq;
35
36   mq = wrk->vpp_event_queue;
37   if (PREDICT_FALSE (svm_msg_q_lock (mq)))
38     return -1;
39   if (PREDICT_FALSE (svm_msg_q_is_full (mq)
40                      || svm_msg_q_ring_is_full (mq, SESSION_MQ_IO_EVT_RING)))
41     {
42       svm_msg_q_unlock (mq);
43       return -2;
44     }
45   switch (evt_type)
46     {
47     case SESSION_CTRL_EVT_RPC:
48       msg = svm_msg_q_alloc_msg_w_ring (mq, SESSION_MQ_IO_EVT_RING);
49       evt = (session_event_t *) svm_msg_q_msg_data (mq, &msg);
50       evt->rpc_args.fp = data;
51       evt->rpc_args.arg = args;
52       break;
53     case SESSION_IO_EVT_RX:
54     case SESSION_IO_EVT_TX:
55     case SESSION_IO_EVT_TX_FLUSH:
56     case SESSION_IO_EVT_BUILTIN_RX:
57       msg = svm_msg_q_alloc_msg_w_ring (mq, SESSION_MQ_IO_EVT_RING);
58       evt = (session_event_t *) svm_msg_q_msg_data (mq, &msg);
59       evt->session_index = *(u32 *) data;
60       break;
61     case SESSION_IO_EVT_BUILTIN_TX:
62     case SESSION_CTRL_EVT_CLOSE:
63     case SESSION_CTRL_EVT_RESET:
64       msg = svm_msg_q_alloc_msg_w_ring (mq, SESSION_MQ_IO_EVT_RING);
65       evt = (session_event_t *) svm_msg_q_msg_data (mq, &msg);
66       evt->session_handle = session_handle ((session_t *) data);
67       break;
68     default:
69       clib_warning ("evt unhandled!");
70       svm_msg_q_unlock (mq);
71       return -1;
72     }
73   evt->event_type = evt_type;
74
75   svm_msg_q_add_and_unlock (mq, &msg);
76
77   if (PREDICT_FALSE (wrk->state == SESSION_WRK_INTERRUPT))
78     vlib_node_set_interrupt_pending (wrk->vm, session_queue_node.index);
79
80   return 0;
81 }
82
83 int
84 session_send_io_evt_to_thread (svm_fifo_t * f, session_evt_type_t evt_type)
85 {
86   return session_send_evt_to_thread (&f->shr->master_session_index, 0,
87                                      f->master_thread_index, evt_type);
88 }
89
90 int
91 session_send_io_evt_to_thread_custom (void *data, u32 thread_index,
92                                       session_evt_type_t evt_type)
93 {
94   return session_send_evt_to_thread (data, 0, thread_index, evt_type);
95 }
96
97 int
98 session_send_ctrl_evt_to_thread (session_t * s, session_evt_type_t evt_type)
99 {
100   /* only events supported are disconnect, shutdown and reset */
101   ASSERT (evt_type == SESSION_CTRL_EVT_CLOSE ||
102           evt_type == SESSION_CTRL_EVT_HALF_CLOSE ||
103           evt_type == SESSION_CTRL_EVT_RESET);
104   return session_send_evt_to_thread (s, 0, s->thread_index, evt_type);
105 }
106
107 void
108 session_send_rpc_evt_to_thread_force (u32 thread_index, void *fp,
109                                       void *rpc_args)
110 {
111   session_send_evt_to_thread (fp, rpc_args, thread_index,
112                               SESSION_CTRL_EVT_RPC);
113 }
114
115 void
116 session_send_rpc_evt_to_thread (u32 thread_index, void *fp, void *rpc_args)
117 {
118   if (thread_index != vlib_get_thread_index ())
119     session_send_rpc_evt_to_thread_force (thread_index, fp, rpc_args);
120   else
121     {
122       void (*fnp) (void *) = fp;
123       fnp (rpc_args);
124     }
125 }
126
127 void
128 session_add_self_custom_tx_evt (transport_connection_t * tc, u8 has_prio)
129 {
130   session_t *s = session_get (tc->s_index, tc->thread_index);
131
132   ASSERT (s->thread_index == vlib_get_thread_index ());
133   ASSERT (s->session_state != SESSION_STATE_TRANSPORT_DELETED);
134
135   if (!(s->flags & SESSION_F_CUSTOM_TX))
136     {
137       s->flags |= SESSION_F_CUSTOM_TX;
138       if (svm_fifo_set_event (s->tx_fifo)
139           || transport_connection_is_descheduled (tc))
140         {
141           session_evt_elt_t *elt;
142           session_worker_t *wrk;
143
144           wrk = session_main_get_worker (tc->thread_index);
145           if (has_prio)
146             elt = session_evt_alloc_new (wrk);
147           else
148             elt = session_evt_alloc_old (wrk);
149           elt->evt.session_index = tc->s_index;
150           elt->evt.event_type = SESSION_IO_EVT_TX;
151           tc->flags &= ~TRANSPORT_CONNECTION_F_DESCHED;
152
153           if (PREDICT_FALSE (wrk->state == SESSION_WRK_INTERRUPT))
154             vlib_node_set_interrupt_pending (wrk->vm,
155                                              session_queue_node.index);
156         }
157     }
158 }
159
160 void
161 sesssion_reschedule_tx (transport_connection_t * tc)
162 {
163   session_worker_t *wrk = session_main_get_worker (tc->thread_index);
164   session_evt_elt_t *elt;
165
166   ASSERT (tc->thread_index == vlib_get_thread_index ());
167
168   elt = session_evt_alloc_new (wrk);
169   elt->evt.session_index = tc->s_index;
170   elt->evt.event_type = SESSION_IO_EVT_TX;
171
172   if (PREDICT_FALSE (wrk->state == SESSION_WRK_INTERRUPT))
173     vlib_node_set_interrupt_pending (wrk->vm, session_queue_node.index);
174 }
175
176 static void
177 session_program_transport_ctrl_evt (session_t * s, session_evt_type_t evt)
178 {
179   u32 thread_index = vlib_get_thread_index ();
180   session_evt_elt_t *elt;
181   session_worker_t *wrk;
182
183   /* If we are in the handler thread, or being called with the worker barrier
184    * held, just append a new event to pending disconnects vector. */
185   if (vlib_thread_is_main_w_barrier () || thread_index == s->thread_index)
186     {
187       wrk = session_main_get_worker (s->thread_index);
188       elt = session_evt_alloc_ctrl (wrk);
189       clib_memset (&elt->evt, 0, sizeof (session_event_t));
190       elt->evt.session_handle = session_handle (s);
191       elt->evt.event_type = evt;
192
193       if (PREDICT_FALSE (wrk->state == SESSION_WRK_INTERRUPT))
194         vlib_node_set_interrupt_pending (wrk->vm, session_queue_node.index);
195     }
196   else
197     session_send_ctrl_evt_to_thread (s, evt);
198 }
199
200 static void
201 session_pool_realloc_rpc (void *rpc_args)
202 {
203   session_worker_t *wrk;
204   u32 thread_index;
205
206   thread_index = pointer_to_uword (rpc_args);
207   wrk = &session_main.wrk[thread_index];
208
209   pool_realloc_safe_aligned (wrk->sessions, CLIB_CACHE_LINE_BYTES);
210 }
211
212 session_t *
213 session_alloc (u32 thread_index)
214 {
215   session_worker_t *wrk = &session_main.wrk[thread_index];
216   session_t *s;
217
218   pool_get_aligned_safe (wrk->sessions, s, thread_index,
219                          session_pool_realloc_rpc, CLIB_CACHE_LINE_BYTES);
220   clib_memset (s, 0, sizeof (*s));
221   s->session_index = s - wrk->sessions;
222   s->thread_index = thread_index;
223   s->app_index = APP_INVALID_INDEX;
224
225   return s;
226 }
227
228 void
229 session_free (session_t * s)
230 {
231   if (CLIB_DEBUG)
232     {
233       u8 thread_index = s->thread_index;
234       clib_memset (s, 0xFA, sizeof (*s));
235       pool_put (session_main.wrk[thread_index].sessions, s);
236       return;
237     }
238   SESSION_EVT (SESSION_EVT_FREE, s);
239   pool_put (session_main.wrk[s->thread_index].sessions, s);
240 }
241
242 u8
243 session_is_valid (u32 si, u8 thread_index)
244 {
245   session_t *s;
246   transport_connection_t *tc;
247
248   s = pool_elt_at_index (session_main.wrk[thread_index].sessions, si);
249
250   if (s->thread_index != thread_index || s->session_index != si)
251     return 0;
252
253   if (s->session_state == SESSION_STATE_TRANSPORT_DELETED
254       || s->session_state <= SESSION_STATE_LISTENING)
255     return 1;
256
257   if (s->session_state == SESSION_STATE_CONNECTING &&
258       (s->flags & SESSION_F_HALF_OPEN))
259     return 1;
260
261   tc = session_get_transport (s);
262   if (s->connection_index != tc->c_index
263       || s->thread_index != tc->thread_index || tc->s_index != si)
264     return 0;
265
266   return 1;
267 }
268
269 static void
270 session_cleanup_notify (session_t * s, session_cleanup_ntf_t ntf)
271 {
272   app_worker_t *app_wrk;
273
274   app_wrk = app_worker_get_if_valid (s->app_wrk_index);
275   if (!app_wrk)
276     return;
277   app_worker_cleanup_notify (app_wrk, s, ntf);
278 }
279
280 void
281 session_free_w_fifos (session_t * s)
282 {
283   session_cleanup_notify (s, SESSION_CLEANUP_SESSION);
284   segment_manager_dealloc_fifos (s->rx_fifo, s->tx_fifo);
285   session_free (s);
286 }
287
288 /**
289  * Cleans up session and lookup table.
290  *
291  * Transport connection must still be valid.
292  */
293 static void
294 session_delete (session_t * s)
295 {
296   int rv;
297
298   /* Delete from the main lookup table. */
299   if ((rv = session_lookup_del_session (s)))
300     clib_warning ("session %u hash delete rv %d", s->session_index, rv);
301
302   session_free_w_fifos (s);
303 }
304
305 void
306 session_cleanup_half_open (session_handle_t ho_handle)
307 {
308   session_t *ho = session_get_from_handle (ho_handle);
309
310   /* App transports can migrate their half-opens */
311   if (ho->flags & SESSION_F_IS_MIGRATING)
312     {
313       /* Session still migrating, move to closed state to signal that the
314        * session should be removed. */
315       if (ho->connection_index == ~0)
316         {
317           ho->session_state = SESSION_STATE_CLOSED;
318           return;
319         }
320       /* Migrated transports are no longer half-opens */
321       transport_cleanup (session_get_transport_proto (ho),
322                          ho->connection_index, ho->app_index /* overloaded */);
323     }
324   else
325     transport_cleanup_half_open (session_get_transport_proto (ho),
326                                  ho->connection_index);
327   session_free (ho);
328 }
329
330 static void
331 session_half_open_free (session_t *ho)
332 {
333   app_worker_t *app_wrk;
334
335   ASSERT (vlib_get_thread_index () <= 1);
336   app_wrk = app_worker_get (ho->app_wrk_index);
337   app_worker_del_half_open (app_wrk, ho);
338   session_free (ho);
339 }
340
341 static void
342 session_half_open_free_rpc (void *args)
343 {
344   session_t *ho = ho_session_get (pointer_to_uword (args));
345   session_half_open_free (ho);
346 }
347
348 void
349 session_half_open_delete_notify (transport_connection_t *tc)
350 {
351   /* Notification from ctrl thread accepted without rpc */
352   if (!tc->thread_index)
353     {
354       session_half_open_free (ho_session_get (tc->s_index));
355     }
356   else
357     {
358       void *args = uword_to_pointer ((uword) tc->s_index, void *);
359       session_send_rpc_evt_to_thread_force (0, session_half_open_free_rpc,
360                                             args);
361     }
362 }
363
364 void
365 session_half_open_migrate_notify (transport_connection_t *tc)
366 {
367   session_t *ho;
368
369   ho = ho_session_get (tc->s_index);
370   ho->flags |= SESSION_F_IS_MIGRATING;
371   ho->connection_index = ~0;
372 }
373
374 int
375 session_half_open_migrated_notify (transport_connection_t *tc)
376 {
377   session_t *ho;
378
379   ho = ho_session_get (tc->s_index);
380
381   /* App probably detached so the half-open must be cleaned up */
382   if (ho->session_state == SESSION_STATE_CLOSED)
383     {
384       session_half_open_delete_notify (tc);
385       return -1;
386     }
387   ho->connection_index = tc->c_index;
388   /* Overload app index for half-open with new thread */
389   ho->app_index = tc->thread_index;
390   return 0;
391 }
392
393 session_t *
394 session_alloc_for_connection (transport_connection_t * tc)
395 {
396   session_t *s;
397   u32 thread_index = tc->thread_index;
398
399   ASSERT (thread_index == vlib_get_thread_index ()
400           || transport_protocol_is_cl (tc->proto));
401
402   s = session_alloc (thread_index);
403   s->session_type = session_type_from_proto_and_ip (tc->proto, tc->is_ip4);
404   s->session_state = SESSION_STATE_CLOSED;
405
406   /* Attach transport to session and vice versa */
407   s->connection_index = tc->c_index;
408   tc->s_index = s->session_index;
409   return s;
410 }
411
412 session_t *
413 session_alloc_for_half_open (transport_connection_t *tc)
414 {
415   session_t *s;
416
417   s = ho_session_alloc ();
418   s->session_type = session_type_from_proto_and_ip (tc->proto, tc->is_ip4);
419   s->connection_index = tc->c_index;
420   tc->s_index = s->session_index;
421   return s;
422 }
423
424 /**
425  * Discards bytes from buffer chain
426  *
427  * It discards n_bytes_to_drop starting at first buffer after chain_b
428  */
429 always_inline void
430 session_enqueue_discard_chain_bytes (vlib_main_t * vm, vlib_buffer_t * b,
431                                      vlib_buffer_t ** chain_b,
432                                      u32 n_bytes_to_drop)
433 {
434   vlib_buffer_t *next = *chain_b;
435   u32 to_drop = n_bytes_to_drop;
436   ASSERT (b->flags & VLIB_BUFFER_NEXT_PRESENT);
437   while (to_drop && (next->flags & VLIB_BUFFER_NEXT_PRESENT))
438     {
439       next = vlib_get_buffer (vm, next->next_buffer);
440       if (next->current_length > to_drop)
441         {
442           vlib_buffer_advance (next, to_drop);
443           to_drop = 0;
444         }
445       else
446         {
447           to_drop -= next->current_length;
448           next->current_length = 0;
449         }
450     }
451   *chain_b = next;
452
453   if (to_drop == 0)
454     b->total_length_not_including_first_buffer -= n_bytes_to_drop;
455 }
456
457 /**
458  * Enqueue buffer chain tail
459  */
460 always_inline int
461 session_enqueue_chain_tail (session_t * s, vlib_buffer_t * b,
462                             u32 offset, u8 is_in_order)
463 {
464   vlib_buffer_t *chain_b;
465   u32 chain_bi, len, diff;
466   vlib_main_t *vm = vlib_get_main ();
467   u8 *data;
468   u32 written = 0;
469   int rv = 0;
470
471   if (is_in_order && offset)
472     {
473       diff = offset - b->current_length;
474       if (diff > b->total_length_not_including_first_buffer)
475         return 0;
476       chain_b = b;
477       session_enqueue_discard_chain_bytes (vm, b, &chain_b, diff);
478       chain_bi = vlib_get_buffer_index (vm, chain_b);
479     }
480   else
481     chain_bi = b->next_buffer;
482
483   do
484     {
485       chain_b = vlib_get_buffer (vm, chain_bi);
486       data = vlib_buffer_get_current (chain_b);
487       len = chain_b->current_length;
488       if (!len)
489         continue;
490       if (is_in_order)
491         {
492           rv = svm_fifo_enqueue (s->rx_fifo, len, data);
493           if (rv == len)
494             {
495               written += rv;
496             }
497           else if (rv < len)
498             {
499               return (rv > 0) ? (written + rv) : written;
500             }
501           else if (rv > len)
502             {
503               written += rv;
504
505               /* written more than what was left in chain */
506               if (written > b->total_length_not_including_first_buffer)
507                 return written;
508
509               /* drop the bytes that have already been delivered */
510               session_enqueue_discard_chain_bytes (vm, b, &chain_b, rv - len);
511             }
512         }
513       else
514         {
515           rv = svm_fifo_enqueue_with_offset (s->rx_fifo, offset, len, data);
516           if (rv)
517             {
518               clib_warning ("failed to enqueue multi-buffer seg");
519               return -1;
520             }
521           offset += len;
522         }
523     }
524   while ((chain_bi = (chain_b->flags & VLIB_BUFFER_NEXT_PRESENT)
525           ? chain_b->next_buffer : 0));
526
527   if (is_in_order)
528     return written;
529
530   return 0;
531 }
532
533 void
534 session_fifo_tuning (session_t * s, svm_fifo_t * f,
535                      session_ft_action_t act, u32 len)
536 {
537   if (s->flags & SESSION_F_CUSTOM_FIFO_TUNING)
538     {
539       app_worker_t *app_wrk = app_worker_get (s->app_wrk_index);
540       app_worker_session_fifo_tuning (app_wrk, s, f, act, len);
541       if (CLIB_ASSERT_ENABLE)
542         {
543           segment_manager_t *sm;
544           sm = segment_manager_get (f->segment_manager);
545           ASSERT (f->shr->size >= 4096);
546           ASSERT (f->shr->size <= sm->max_fifo_size);
547         }
548     }
549 }
550
551 /*
552  * Enqueue data for delivery to session peer. Does not notify peer of enqueue
553  * event but on request can queue notification events for later delivery by
554  * calling stream_server_flush_enqueue_events().
555  *
556  * @param tc Transport connection which is to be enqueued data
557  * @param b Buffer to be enqueued
558  * @param offset Offset at which to start enqueueing if out-of-order
559  * @param queue_event Flag to indicate if peer is to be notified or if event
560  *                    is to be queued. The former is useful when more data is
561  *                    enqueued and only one event is to be generated.
562  * @param is_in_order Flag to indicate if data is in order
563  * @return Number of bytes enqueued or a negative value if enqueueing failed.
564  */
565 int
566 session_enqueue_stream_connection (transport_connection_t * tc,
567                                    vlib_buffer_t * b, u32 offset,
568                                    u8 queue_event, u8 is_in_order)
569 {
570   session_t *s;
571   int enqueued = 0, rv, in_order_off;
572
573   s = session_get (tc->s_index, tc->thread_index);
574
575   if (is_in_order)
576     {
577       enqueued = svm_fifo_enqueue (s->rx_fifo,
578                                    b->current_length,
579                                    vlib_buffer_get_current (b));
580       if (PREDICT_FALSE ((b->flags & VLIB_BUFFER_NEXT_PRESENT)
581                          && enqueued >= 0))
582         {
583           in_order_off = enqueued > b->current_length ? enqueued : 0;
584           rv = session_enqueue_chain_tail (s, b, in_order_off, 1);
585           if (rv > 0)
586             enqueued += rv;
587         }
588     }
589   else
590     {
591       rv = svm_fifo_enqueue_with_offset (s->rx_fifo, offset,
592                                          b->current_length,
593                                          vlib_buffer_get_current (b));
594       if (PREDICT_FALSE ((b->flags & VLIB_BUFFER_NEXT_PRESENT) && !rv))
595         session_enqueue_chain_tail (s, b, offset + b->current_length, 0);
596       /* if something was enqueued, report even this as success for ooo
597        * segment handling */
598       return rv;
599     }
600
601   if (queue_event)
602     {
603       /* Queue RX event on this fifo. Eventually these will need to be flushed
604        * by calling stream_server_flush_enqueue_events () */
605       session_worker_t *wrk;
606
607       wrk = session_main_get_worker (s->thread_index);
608       if (!(s->flags & SESSION_F_RX_EVT))
609         {
610           s->flags |= SESSION_F_RX_EVT;
611           vec_add1 (wrk->session_to_enqueue[tc->proto], s->session_index);
612         }
613
614       session_fifo_tuning (s, s->rx_fifo, SESSION_FT_ACTION_ENQUEUED, 0);
615     }
616
617   return enqueued;
618 }
619
620 int
621 session_enqueue_dgram_connection (session_t * s,
622                                   session_dgram_hdr_t * hdr,
623                                   vlib_buffer_t * b, u8 proto, u8 queue_event)
624 {
625   int rv;
626
627   ASSERT (svm_fifo_max_enqueue_prod (s->rx_fifo)
628           >= b->current_length + sizeof (*hdr));
629
630   if (PREDICT_TRUE (!(b->flags & VLIB_BUFFER_NEXT_PRESENT)))
631     {
632       /* *INDENT-OFF* */
633       svm_fifo_seg_t segs[2] = {
634           { (u8 *) hdr, sizeof (*hdr) },
635           { vlib_buffer_get_current (b), b->current_length }
636       };
637       /* *INDENT-ON* */
638
639       rv = svm_fifo_enqueue_segments (s->rx_fifo, segs, 2,
640                                       0 /* allow_partial */ );
641     }
642   else
643     {
644       vlib_main_t *vm = vlib_get_main ();
645       svm_fifo_seg_t *segs = 0, *seg;
646       vlib_buffer_t *it = b;
647       u32 n_segs = 1;
648
649       vec_add2 (segs, seg, 1);
650       seg->data = (u8 *) hdr;
651       seg->len = sizeof (*hdr);
652       while (it)
653         {
654           vec_add2 (segs, seg, 1);
655           seg->data = vlib_buffer_get_current (it);
656           seg->len = it->current_length;
657           n_segs++;
658           if (!(it->flags & VLIB_BUFFER_NEXT_PRESENT))
659             break;
660           it = vlib_get_buffer (vm, it->next_buffer);
661         }
662       rv = svm_fifo_enqueue_segments (s->rx_fifo, segs, n_segs,
663                                       0 /* allow partial */ );
664       vec_free (segs);
665     }
666
667   if (queue_event && rv > 0)
668     {
669       /* Queue RX event on this fifo. Eventually these will need to be flushed
670        * by calling stream_server_flush_enqueue_events () */
671       session_worker_t *wrk;
672
673       wrk = session_main_get_worker (s->thread_index);
674       if (!(s->flags & SESSION_F_RX_EVT))
675         {
676           s->flags |= SESSION_F_RX_EVT;
677           vec_add1 (wrk->session_to_enqueue[proto], s->session_index);
678         }
679
680       session_fifo_tuning (s, s->rx_fifo, SESSION_FT_ACTION_ENQUEUED, 0);
681     }
682   return rv > 0 ? rv : 0;
683 }
684
685 int
686 session_tx_fifo_peek_bytes (transport_connection_t * tc, u8 * buffer,
687                             u32 offset, u32 max_bytes)
688 {
689   session_t *s = session_get (tc->s_index, tc->thread_index);
690   return svm_fifo_peek (s->tx_fifo, offset, max_bytes, buffer);
691 }
692
693 u32
694 session_tx_fifo_dequeue_drop (transport_connection_t * tc, u32 max_bytes)
695 {
696   session_t *s = session_get (tc->s_index, tc->thread_index);
697   u32 rv;
698
699   rv = svm_fifo_dequeue_drop (s->tx_fifo, max_bytes);
700   session_fifo_tuning (s, s->tx_fifo, SESSION_FT_ACTION_DEQUEUED, rv);
701
702   if (svm_fifo_needs_deq_ntf (s->tx_fifo, max_bytes))
703     session_dequeue_notify (s);
704
705   return rv;
706 }
707
708 static inline int
709 session_notify_subscribers (u32 app_index, session_t * s,
710                             svm_fifo_t * f, session_evt_type_t evt_type)
711 {
712   app_worker_t *app_wrk;
713   application_t *app;
714   int i;
715
716   app = application_get (app_index);
717   if (!app)
718     return -1;
719
720   for (i = 0; i < f->shr->n_subscribers; i++)
721     {
722       app_wrk = application_get_worker (app, f->shr->subscribers[i]);
723       if (!app_wrk)
724         continue;
725       if (app_worker_lock_and_send_event (app_wrk, s, evt_type))
726         return -1;
727     }
728
729   return 0;
730 }
731
732 /**
733  * Notify session peer that new data has been enqueued.
734  *
735  * @param s     Stream session for which the event is to be generated.
736  * @param lock  Flag to indicate if call should lock message queue.
737  *
738  * @return 0 on success or negative number if failed to send notification.
739  */
740 static inline int
741 session_enqueue_notify_inline (session_t * s)
742 {
743   app_worker_t *app_wrk;
744   u32 session_index;
745   u8 n_subscribers;
746
747   session_index = s->session_index;
748   n_subscribers = svm_fifo_n_subscribers (s->rx_fifo);
749
750   app_wrk = app_worker_get_if_valid (s->app_wrk_index);
751   if (PREDICT_FALSE (!app_wrk))
752     {
753       SESSION_DBG ("invalid s->app_index = %d", s->app_wrk_index);
754       return 0;
755     }
756
757   SESSION_EVT (SESSION_EVT_ENQ, s, svm_fifo_max_dequeue_prod (s->rx_fifo));
758
759   s->flags &= ~SESSION_F_RX_EVT;
760
761   /* Application didn't confirm accept yet */
762   if (PREDICT_FALSE (s->session_state == SESSION_STATE_ACCEPTING))
763     return 0;
764
765   if (PREDICT_FALSE (app_worker_lock_and_send_event (app_wrk, s,
766                                                      SESSION_IO_EVT_RX)))
767     return -1;
768
769   if (PREDICT_FALSE (n_subscribers))
770     {
771       s = session_get (session_index, vlib_get_thread_index ());
772       return session_notify_subscribers (app_wrk->app_index, s,
773                                          s->rx_fifo, SESSION_IO_EVT_RX);
774     }
775
776   return 0;
777 }
778
779 int
780 session_enqueue_notify (session_t * s)
781 {
782   return session_enqueue_notify_inline (s);
783 }
784
785 static void
786 session_enqueue_notify_rpc (void *arg)
787 {
788   u32 session_index = pointer_to_uword (arg);
789   session_t *s;
790
791   s = session_get_if_valid (session_index, vlib_get_thread_index ());
792   if (!s)
793     return;
794
795   session_enqueue_notify (s);
796 }
797
798 /**
799  * Like session_enqueue_notify, but can be called from a thread that does not
800  * own the session.
801  */
802 void
803 session_enqueue_notify_thread (session_handle_t sh)
804 {
805   u32 thread_index = session_thread_from_handle (sh);
806   u32 session_index = session_index_from_handle (sh);
807
808   /*
809    * Pass session index (u32) as opposed to handle (u64) in case pointers
810    * are not 64-bit.
811    */
812   session_send_rpc_evt_to_thread (thread_index,
813                                   session_enqueue_notify_rpc,
814                                   uword_to_pointer (session_index, void *));
815 }
816
817 int
818 session_dequeue_notify (session_t * s)
819 {
820   app_worker_t *app_wrk;
821
822   svm_fifo_clear_deq_ntf (s->tx_fifo);
823
824   app_wrk = app_worker_get_if_valid (s->app_wrk_index);
825   if (PREDICT_FALSE (!app_wrk))
826     return -1;
827
828   if (PREDICT_FALSE (app_worker_lock_and_send_event (app_wrk, s,
829                                                      SESSION_IO_EVT_TX)))
830     return -1;
831
832   if (PREDICT_FALSE (s->tx_fifo->shr->n_subscribers))
833     return session_notify_subscribers (app_wrk->app_index, s,
834                                        s->tx_fifo, SESSION_IO_EVT_TX);
835
836   return 0;
837 }
838
839 /**
840  * Flushes queue of sessions that are to be notified of new data
841  * enqueued events.
842  *
843  * @param thread_index Thread index for which the flush is to be performed.
844  * @return 0 on success or a positive number indicating the number of
845  *         failures due to API queue being full.
846  */
847 int
848 session_main_flush_enqueue_events (u8 transport_proto, u32 thread_index)
849 {
850   session_worker_t *wrk = session_main_get_worker (thread_index);
851   session_t *s;
852   int i, errors = 0;
853   u32 *indices;
854
855   indices = wrk->session_to_enqueue[transport_proto];
856
857   for (i = 0; i < vec_len (indices); i++)
858     {
859       s = session_get_if_valid (indices[i], thread_index);
860       if (PREDICT_FALSE (!s))
861         {
862           errors++;
863           continue;
864         }
865
866       session_fifo_tuning (s, s->rx_fifo, SESSION_FT_ACTION_ENQUEUED,
867                            0 /* TODO/not needed */ );
868
869       if (PREDICT_FALSE (session_enqueue_notify_inline (s)))
870         errors++;
871     }
872
873   vec_reset_length (indices);
874   wrk->session_to_enqueue[transport_proto] = indices;
875
876   return errors;
877 }
878
879 int
880 session_main_flush_all_enqueue_events (u8 transport_proto)
881 {
882   vlib_thread_main_t *vtm = vlib_get_thread_main ();
883   int i, errors = 0;
884   for (i = 0; i < 1 + vtm->n_threads; i++)
885     errors += session_main_flush_enqueue_events (transport_proto, i);
886   return errors;
887 }
888
889 int
890 session_stream_connect_notify (transport_connection_t * tc,
891                                session_error_t err)
892 {
893   u32 opaque = 0, new_ti, new_si;
894   app_worker_t *app_wrk;
895   session_t *s = 0, *ho;
896
897   /*
898    * Cleanup half-open table
899    */
900   session_lookup_del_half_open (tc);
901
902   ho = ho_session_get (tc->s_index);
903   opaque = ho->opaque;
904   app_wrk = app_worker_get_if_valid (ho->app_wrk_index);
905   if (!app_wrk)
906     return -1;
907
908   if (err)
909     return app_worker_connect_notify (app_wrk, s, err, opaque);
910
911   s = session_alloc_for_connection (tc);
912   s->session_state = SESSION_STATE_CONNECTING;
913   s->app_wrk_index = app_wrk->wrk_index;
914   new_si = s->session_index;
915   new_ti = s->thread_index;
916
917   if ((err = app_worker_init_connected (app_wrk, s)))
918     {
919       session_free (s);
920       app_worker_connect_notify (app_wrk, 0, err, opaque);
921       return -1;
922     }
923
924   s = session_get (new_si, new_ti);
925   s->session_state = SESSION_STATE_READY;
926   session_lookup_add_connection (tc, session_handle (s));
927
928   if (app_worker_connect_notify (app_wrk, s, SESSION_E_NONE, opaque))
929     {
930       session_lookup_del_connection (tc);
931       /* Avoid notifying app about rejected session cleanup */
932       s = session_get (new_si, new_ti);
933       segment_manager_dealloc_fifos (s->rx_fifo, s->tx_fifo);
934       session_free (s);
935       return -1;
936     }
937
938   return 0;
939 }
940
941 typedef union session_switch_pool_reply_args_
942 {
943   struct
944   {
945     u32 session_index;
946     u16 thread_index;
947     u8 is_closed;
948   };
949   u64 as_u64;
950 } session_switch_pool_reply_args_t;
951
952 STATIC_ASSERT (sizeof (session_switch_pool_reply_args_t) <= sizeof (uword),
953                "switch pool reply args size");
954
955 static void
956 session_switch_pool_reply (void *arg)
957 {
958   session_switch_pool_reply_args_t rargs;
959   session_t *s;
960
961   rargs.as_u64 = pointer_to_uword (arg);
962   s = session_get_if_valid (rargs.session_index, rargs.thread_index);
963   if (!s)
964     return;
965
966   /* Session closed during migration. Clean everything up */
967   if (rargs.is_closed)
968     {
969       transport_cleanup (session_get_transport_proto (s), s->connection_index,
970                          s->thread_index);
971       segment_manager_dealloc_fifos (s->rx_fifo, s->tx_fifo);
972       session_free (s);
973       return;
974     }
975
976   /* Notify app that it has data on the new session */
977   session_enqueue_notify (s);
978 }
979
980 typedef struct _session_switch_pool_args
981 {
982   u32 session_index;
983   u32 thread_index;
984   u32 new_thread_index;
985   u32 new_session_index;
986 } session_switch_pool_args_t;
987
988 /**
989  * Notify old thread of the session pool switch
990  */
991 static void
992 session_switch_pool (void *cb_args)
993 {
994   session_switch_pool_args_t *args = (session_switch_pool_args_t *) cb_args;
995   session_switch_pool_reply_args_t rargs;
996   session_handle_t new_sh;
997   segment_manager_t *sm;
998   app_worker_t *app_wrk;
999   session_t *s;
1000
1001   ASSERT (args->thread_index == vlib_get_thread_index ());
1002   s = session_get (args->session_index, args->thread_index);
1003
1004   /* Check if session closed during migration */
1005   rargs.is_closed = s->session_state >= SESSION_STATE_TRANSPORT_CLOSING;
1006
1007   transport_cleanup (session_get_transport_proto (s), s->connection_index,
1008                      s->thread_index);
1009
1010   app_wrk = app_worker_get_if_valid (s->app_wrk_index);
1011   if (app_wrk)
1012     {
1013       /* Cleanup fifo segment slice state for fifos */
1014       sm = app_worker_get_connect_segment_manager (app_wrk);
1015       segment_manager_detach_fifo (sm, &s->rx_fifo);
1016       segment_manager_detach_fifo (sm, &s->tx_fifo);
1017
1018       /* Notify app, using old session, about the migration event */
1019       if (!rargs.is_closed)
1020         {
1021           new_sh = session_make_handle (args->new_session_index,
1022                                         args->new_thread_index);
1023           app_worker_migrate_notify (app_wrk, s, new_sh);
1024         }
1025     }
1026
1027   /* Trigger app read and fifo updates on the new thread */
1028   rargs.session_index = args->new_session_index;
1029   rargs.thread_index = args->new_thread_index;
1030   session_send_rpc_evt_to_thread (args->new_thread_index,
1031                                   session_switch_pool_reply,
1032                                   uword_to_pointer (rargs.as_u64, void *));
1033
1034   session_free (s);
1035   clib_mem_free (cb_args);
1036 }
1037
1038 /**
1039  * Move dgram session to the right thread
1040  */
1041 int
1042 session_dgram_connect_notify (transport_connection_t * tc,
1043                               u32 old_thread_index, session_t ** new_session)
1044 {
1045   session_t *new_s;
1046   session_switch_pool_args_t *rpc_args;
1047   segment_manager_t *sm;
1048   app_worker_t *app_wrk;
1049
1050   /*
1051    * Clone half-open session to the right thread.
1052    */
1053   new_s = session_clone_safe (tc->s_index, old_thread_index);
1054   new_s->connection_index = tc->c_index;
1055   new_s->session_state = SESSION_STATE_READY;
1056   new_s->flags |= SESSION_F_IS_MIGRATING;
1057
1058   if (!(tc->flags & TRANSPORT_CONNECTION_F_NO_LOOKUP))
1059     session_lookup_add_connection (tc, session_handle (new_s));
1060
1061   app_wrk = app_worker_get_if_valid (new_s->app_wrk_index);
1062   if (app_wrk)
1063     {
1064       /* New set of fifos attached to the same shared memory */
1065       sm = app_worker_get_connect_segment_manager (app_wrk);
1066       segment_manager_attach_fifo (sm, &new_s->rx_fifo, new_s);
1067       segment_manager_attach_fifo (sm, &new_s->tx_fifo, new_s);
1068     }
1069
1070   /*
1071    * Ask thread owning the old session to clean it up and make us the tx
1072    * fifo owner
1073    */
1074   rpc_args = clib_mem_alloc (sizeof (*rpc_args));
1075   rpc_args->new_session_index = new_s->session_index;
1076   rpc_args->new_thread_index = new_s->thread_index;
1077   rpc_args->session_index = tc->s_index;
1078   rpc_args->thread_index = old_thread_index;
1079   session_send_rpc_evt_to_thread (rpc_args->thread_index, session_switch_pool,
1080                                   rpc_args);
1081
1082   tc->s_index = new_s->session_index;
1083   new_s->connection_index = tc->c_index;
1084   *new_session = new_s;
1085   return 0;
1086 }
1087
1088 /**
1089  * Notification from transport that connection is being closed.
1090  *
1091  * A disconnect is sent to application but state is not removed. Once
1092  * disconnect is acknowledged by application, session disconnect is called.
1093  * Ultimately this leads to close being called on transport (passive close).
1094  */
1095 void
1096 session_transport_closing_notify (transport_connection_t * tc)
1097 {
1098   app_worker_t *app_wrk;
1099   session_t *s;
1100
1101   s = session_get (tc->s_index, tc->thread_index);
1102   if (s->session_state >= SESSION_STATE_TRANSPORT_CLOSING)
1103     return;
1104
1105   /* Wait for reply from app before sending notification as the
1106    * accept might be rejected */
1107   if (s->session_state == SESSION_STATE_ACCEPTING)
1108     {
1109       s->session_state = SESSION_STATE_TRANSPORT_CLOSING;
1110       return;
1111     }
1112
1113   s->session_state = SESSION_STATE_TRANSPORT_CLOSING;
1114   app_wrk = app_worker_get (s->app_wrk_index);
1115   app_worker_close_notify (app_wrk, s);
1116 }
1117
1118 /**
1119  * Notification from transport that connection is being deleted
1120  *
1121  * This removes the session if it is still valid. It should be called only on
1122  * previously fully established sessions. For instance failed connects should
1123  * call stream_session_connect_notify and indicate that the connect has
1124  * failed.
1125  */
1126 void
1127 session_transport_delete_notify (transport_connection_t * tc)
1128 {
1129   session_t *s;
1130
1131   /* App might've been removed already */
1132   if (!(s = session_get_if_valid (tc->s_index, tc->thread_index)))
1133     return;
1134
1135   switch (s->session_state)
1136     {
1137     case SESSION_STATE_CREATED:
1138       /* Session was created but accept notification was not yet sent to the
1139        * app. Cleanup everything. */
1140       session_lookup_del_session (s);
1141       segment_manager_dealloc_fifos (s->rx_fifo, s->tx_fifo);
1142       session_free (s);
1143       break;
1144     case SESSION_STATE_ACCEPTING:
1145     case SESSION_STATE_TRANSPORT_CLOSING:
1146     case SESSION_STATE_CLOSING:
1147     case SESSION_STATE_TRANSPORT_CLOSED:
1148       /* If transport finishes or times out before we get a reply
1149        * from the app, mark transport as closed and wait for reply
1150        * before removing the session. Cleanup session table in advance
1151        * because transport will soon be closed and closed sessions
1152        * are assumed to have been removed from the lookup table */
1153       session_lookup_del_session (s);
1154       s->session_state = SESSION_STATE_TRANSPORT_DELETED;
1155       session_cleanup_notify (s, SESSION_CLEANUP_TRANSPORT);
1156       svm_fifo_dequeue_drop_all (s->tx_fifo);
1157       break;
1158     case SESSION_STATE_APP_CLOSED:
1159       /* Cleanup lookup table as transport needs to still be valid.
1160        * Program transport close to ensure that all session events
1161        * have been cleaned up. Once transport close is called, the
1162        * session is just removed because both transport and app have
1163        * confirmed the close*/
1164       session_lookup_del_session (s);
1165       s->session_state = SESSION_STATE_TRANSPORT_DELETED;
1166       session_cleanup_notify (s, SESSION_CLEANUP_TRANSPORT);
1167       svm_fifo_dequeue_drop_all (s->tx_fifo);
1168       session_program_transport_ctrl_evt (s, SESSION_CTRL_EVT_CLOSE);
1169       break;
1170     case SESSION_STATE_TRANSPORT_DELETED:
1171       break;
1172     case SESSION_STATE_CLOSED:
1173       session_cleanup_notify (s, SESSION_CLEANUP_TRANSPORT);
1174       session_delete (s);
1175       break;
1176     default:
1177       clib_warning ("session state %u", s->session_state);
1178       session_cleanup_notify (s, SESSION_CLEANUP_TRANSPORT);
1179       session_delete (s);
1180       break;
1181     }
1182 }
1183
1184 /**
1185  * Notification from transport that it is closed
1186  *
1187  * Should be called by transport, prior to calling delete notify, once it
1188  * knows that no more data will be exchanged. This could serve as an
1189  * early acknowledgment of an active close especially if transport delete
1190  * can be delayed a long time, e.g., tcp time-wait.
1191  */
1192 void
1193 session_transport_closed_notify (transport_connection_t * tc)
1194 {
1195   app_worker_t *app_wrk;
1196   session_t *s;
1197
1198   if (!(s = session_get_if_valid (tc->s_index, tc->thread_index)))
1199     return;
1200
1201   /* Transport thinks that app requested close but it actually didn't.
1202    * Can happen for tcp:
1203    * 1)if fin and rst are received in close succession.
1204    * 2)if app shutdown the connection.  */
1205   if (s->session_state == SESSION_STATE_READY)
1206     {
1207       session_transport_closing_notify (tc);
1208       svm_fifo_dequeue_drop_all (s->tx_fifo);
1209       s->session_state = SESSION_STATE_TRANSPORT_CLOSED;
1210     }
1211   /* If app close has not been received or has not yet resulted in
1212    * a transport close, only mark the session transport as closed */
1213   else if (s->session_state <= SESSION_STATE_CLOSING)
1214     {
1215       s->session_state = SESSION_STATE_TRANSPORT_CLOSED;
1216     }
1217   /* If app also closed, switch to closed */
1218   else if (s->session_state == SESSION_STATE_APP_CLOSED)
1219     s->session_state = SESSION_STATE_CLOSED;
1220
1221   app_wrk = app_worker_get_if_valid (s->app_wrk_index);
1222   if (app_wrk)
1223     app_worker_transport_closed_notify (app_wrk, s);
1224 }
1225
1226 /**
1227  * Notify application that connection has been reset.
1228  */
1229 void
1230 session_transport_reset_notify (transport_connection_t * tc)
1231 {
1232   app_worker_t *app_wrk;
1233   session_t *s;
1234
1235   s = session_get (tc->s_index, tc->thread_index);
1236   svm_fifo_dequeue_drop_all (s->tx_fifo);
1237   if (s->session_state >= SESSION_STATE_TRANSPORT_CLOSING)
1238     return;
1239   if (s->session_state == SESSION_STATE_ACCEPTING)
1240     {
1241       s->session_state = SESSION_STATE_TRANSPORT_CLOSING;
1242       return;
1243     }
1244   s->session_state = SESSION_STATE_TRANSPORT_CLOSING;
1245   app_wrk = app_worker_get (s->app_wrk_index);
1246   app_worker_reset_notify (app_wrk, s);
1247 }
1248
1249 int
1250 session_stream_accept_notify (transport_connection_t * tc)
1251 {
1252   app_worker_t *app_wrk;
1253   session_t *s;
1254
1255   s = session_get (tc->s_index, tc->thread_index);
1256   app_wrk = app_worker_get_if_valid (s->app_wrk_index);
1257   if (!app_wrk)
1258     return -1;
1259   if (s->session_state != SESSION_STATE_CREATED)
1260     return 0;
1261   s->session_state = SESSION_STATE_ACCEPTING;
1262   if (app_worker_accept_notify (app_wrk, s))
1263     {
1264       /* On transport delete, no notifications should be sent. Unless, the
1265        * accept is retried and successful. */
1266       s->session_state = SESSION_STATE_CREATED;
1267       return -1;
1268     }
1269   return 0;
1270 }
1271
1272 /**
1273  * Accept a stream session. Optionally ping the server by callback.
1274  */
1275 int
1276 session_stream_accept (transport_connection_t * tc, u32 listener_index,
1277                        u32 thread_index, u8 notify)
1278 {
1279   session_t *s;
1280   int rv;
1281
1282   s = session_alloc_for_connection (tc);
1283   s->listener_handle = ((u64) thread_index << 32) | (u64) listener_index;
1284   s->session_state = SESSION_STATE_CREATED;
1285
1286   if ((rv = app_worker_init_accepted (s)))
1287     {
1288       session_free (s);
1289       return rv;
1290     }
1291
1292   session_lookup_add_connection (tc, session_handle (s));
1293
1294   /* Shoulder-tap the server */
1295   if (notify)
1296     {
1297       app_worker_t *app_wrk = app_worker_get (s->app_wrk_index);
1298       if ((rv = app_worker_accept_notify (app_wrk, s)))
1299         {
1300           session_lookup_del_session (s);
1301           segment_manager_dealloc_fifos (s->rx_fifo, s->tx_fifo);
1302           session_free (s);
1303           return rv;
1304         }
1305     }
1306
1307   return 0;
1308 }
1309
1310 int
1311 session_dgram_accept (transport_connection_t * tc, u32 listener_index,
1312                       u32 thread_index)
1313 {
1314   app_worker_t *app_wrk;
1315   session_t *s;
1316   int rv;
1317
1318   s = session_alloc_for_connection (tc);
1319   s->listener_handle = ((u64) thread_index << 32) | (u64) listener_index;
1320
1321   if ((rv = app_worker_init_accepted (s)))
1322     {
1323       session_free (s);
1324       return rv;
1325     }
1326
1327   session_lookup_add_connection (tc, session_handle (s));
1328
1329   app_wrk = app_worker_get (s->app_wrk_index);
1330   if ((rv = app_worker_accept_notify (app_wrk, s)))
1331     {
1332       session_lookup_del_session (s);
1333       segment_manager_dealloc_fifos (s->rx_fifo, s->tx_fifo);
1334       session_free (s);
1335       return rv;
1336     }
1337
1338   return 0;
1339 }
1340
1341 int
1342 session_open_cl (session_endpoint_cfg_t *rmt, session_handle_t *rsh)
1343 {
1344   transport_connection_t *tc;
1345   transport_endpoint_cfg_t *tep;
1346   app_worker_t *app_wrk;
1347   session_handle_t sh;
1348   session_t *s;
1349   int rv;
1350
1351   tep = session_endpoint_to_transport_cfg (rmt);
1352   rv = transport_connect (rmt->transport_proto, tep);
1353   if (rv < 0)
1354     {
1355       SESSION_DBG ("Transport failed to open connection.");
1356       return rv;
1357     }
1358
1359   tc = transport_get_half_open (rmt->transport_proto, (u32) rv);
1360
1361   /* For dgram type of service, allocate session and fifos now */
1362   app_wrk = app_worker_get (rmt->app_wrk_index);
1363   s = session_alloc_for_connection (tc);
1364   s->app_wrk_index = app_wrk->wrk_index;
1365   s->session_state = SESSION_STATE_OPENED;
1366   if (app_worker_init_connected (app_wrk, s))
1367     {
1368       session_free (s);
1369       return -1;
1370     }
1371
1372   sh = session_handle (s);
1373   *rsh = sh;
1374
1375   session_lookup_add_connection (tc, sh);
1376   return app_worker_connect_notify (app_wrk, s, SESSION_E_NONE, rmt->opaque);
1377 }
1378
1379 int
1380 session_open_vc (session_endpoint_cfg_t *rmt, session_handle_t *rsh)
1381 {
1382   transport_connection_t *tc;
1383   transport_endpoint_cfg_t *tep;
1384   app_worker_t *app_wrk;
1385   session_t *ho;
1386   int rv;
1387
1388   tep = session_endpoint_to_transport_cfg (rmt);
1389   rv = transport_connect (rmt->transport_proto, tep);
1390   if (rv < 0)
1391     {
1392       SESSION_DBG ("Transport failed to open connection.");
1393       return rv;
1394     }
1395
1396   tc = transport_get_half_open (rmt->transport_proto, (u32) rv);
1397
1398   app_wrk = app_worker_get (rmt->app_wrk_index);
1399
1400   /* If transport offers a vc service, only allocate established
1401    * session once the connection has been established.
1402    * In the meantime allocate half-open session for tracking purposes
1403    * associate half-open connection to it and add session to app-worker
1404    * half-open table. These are needed to allocate the established
1405    * session on transport notification, and to cleanup the half-open
1406    * session if the app detaches before connection establishment.
1407    */
1408   ho = session_alloc_for_half_open (tc);
1409   ho->app_wrk_index = app_wrk->wrk_index;
1410   ho->ho_index = app_worker_add_half_open (app_wrk, session_handle (ho));
1411   ho->opaque = rmt->opaque;
1412   *rsh = session_handle (ho);
1413
1414   if (!(tc->flags & TRANSPORT_CONNECTION_F_NO_LOOKUP))
1415     session_lookup_add_half_open (tc, tc->c_index);
1416
1417   return 0;
1418 }
1419
1420 int
1421 session_open_app (session_endpoint_cfg_t *rmt, session_handle_t *rsh)
1422 {
1423   transport_endpoint_cfg_t *tep_cfg = session_endpoint_to_transport_cfg (rmt);
1424
1425   /* Not supported for now */
1426   *rsh = SESSION_INVALID_HANDLE;
1427   return transport_connect (rmt->transport_proto, tep_cfg);
1428 }
1429
1430 typedef int (*session_open_service_fn) (session_endpoint_cfg_t *,
1431                                         session_handle_t *);
1432
1433 /* *INDENT-OFF* */
1434 static session_open_service_fn session_open_srv_fns[TRANSPORT_N_SERVICES] = {
1435   session_open_vc,
1436   session_open_cl,
1437   session_open_app,
1438 };
1439 /* *INDENT-ON* */
1440
1441 /**
1442  * Ask transport to open connection to remote transport endpoint.
1443  *
1444  * Stores handle for matching request with reply since the call can be
1445  * asynchronous. For instance, for TCP the 3-way handshake must complete
1446  * before reply comes. Session is only created once connection is established.
1447  *
1448  * @param app_index Index of the application requesting the connect
1449  * @param st Session type requested.
1450  * @param tep Remote transport endpoint
1451  * @param opaque Opaque data (typically, api_context) the application expects
1452  *               on open completion.
1453  */
1454 int
1455 session_open (session_endpoint_cfg_t *rmt, session_handle_t *rsh)
1456 {
1457   transport_service_type_t tst;
1458   tst = transport_protocol_service_type (rmt->transport_proto);
1459   return session_open_srv_fns[tst](rmt, rsh);
1460 }
1461
1462 /**
1463  * Ask transport to listen on session endpoint.
1464  *
1465  * @param s Session for which listen will be called. Note that unlike
1466  *          established sessions, listen sessions are not associated to a
1467  *          thread.
1468  * @param sep Local endpoint to be listened on.
1469  */
1470 int
1471 session_listen (session_t * ls, session_endpoint_cfg_t * sep)
1472 {
1473   transport_endpoint_cfg_t *tep;
1474   int tc_index;
1475   u32 s_index;
1476
1477   /* Transport bind/listen */
1478   tep = session_endpoint_to_transport_cfg (sep);
1479   s_index = ls->session_index;
1480   tc_index = transport_start_listen (session_get_transport_proto (ls),
1481                                      s_index, tep);
1482
1483   if (tc_index < 0)
1484     return tc_index;
1485
1486   /* Attach transport to session. Lookup tables are populated by the app
1487    * worker because local tables (for ct sessions) are not backed by a fib */
1488   ls = listen_session_get (s_index);
1489   ls->connection_index = tc_index;
1490
1491   return 0;
1492 }
1493
1494 /**
1495  * Ask transport to stop listening on local transport endpoint.
1496  *
1497  * @param s Session to stop listening on. It must be in state LISTENING.
1498  */
1499 int
1500 session_stop_listen (session_t * s)
1501 {
1502   transport_proto_t tp = session_get_transport_proto (s);
1503   transport_connection_t *tc;
1504
1505   if (s->session_state != SESSION_STATE_LISTENING)
1506     return SESSION_E_NOLISTEN;
1507
1508   tc = transport_get_listener (tp, s->connection_index);
1509
1510   /* If no transport, assume everything was cleaned up already */
1511   if (!tc)
1512     return SESSION_E_NONE;
1513
1514   if (!(tc->flags & TRANSPORT_CONNECTION_F_NO_LOOKUP))
1515     session_lookup_del_connection (tc);
1516
1517   transport_stop_listen (tp, s->connection_index);
1518   return 0;
1519 }
1520
1521 /**
1522  * Initialize session half-closing procedure.
1523  *
1524  * Note that half-closing will not change the state of the session.
1525  */
1526 void
1527 session_half_close (session_t *s)
1528 {
1529   if (!s)
1530     return;
1531
1532   session_program_transport_ctrl_evt (s, SESSION_CTRL_EVT_HALF_CLOSE);
1533 }
1534
1535 /**
1536  * Initialize session closing procedure.
1537  *
1538  * Request is always sent to session node to ensure that all outstanding
1539  * requests are served before transport is notified.
1540  */
1541 void
1542 session_close (session_t * s)
1543 {
1544   if (!s)
1545     return;
1546
1547   if (s->session_state >= SESSION_STATE_CLOSING)
1548     {
1549       /* Session will only be removed once both app and transport
1550        * acknowledge the close */
1551       if (s->session_state == SESSION_STATE_TRANSPORT_CLOSED
1552           || s->session_state == SESSION_STATE_TRANSPORT_DELETED)
1553         session_program_transport_ctrl_evt (s, SESSION_CTRL_EVT_CLOSE);
1554       return;
1555     }
1556
1557   /* App closed so stop propagating dequeue notifications */
1558   svm_fifo_clear_deq_ntf (s->tx_fifo);
1559   s->session_state = SESSION_STATE_CLOSING;
1560   session_program_transport_ctrl_evt (s, SESSION_CTRL_EVT_CLOSE);
1561 }
1562
1563 /**
1564  * Force a close without waiting for data to be flushed
1565  */
1566 void
1567 session_reset (session_t * s)
1568 {
1569   if (s->session_state >= SESSION_STATE_CLOSING)
1570     return;
1571   /* Drop all outstanding tx data */
1572   svm_fifo_dequeue_drop_all (s->tx_fifo);
1573   s->session_state = SESSION_STATE_CLOSING;
1574   session_program_transport_ctrl_evt (s, SESSION_CTRL_EVT_RESET);
1575 }
1576
1577 /**
1578  * Notify transport the session can be half-disconnected.
1579  *
1580  * Must be called from the session's thread.
1581  */
1582 void
1583 session_transport_half_close (session_t *s)
1584 {
1585   /* Only READY session can be half-closed */
1586   if (s->session_state != SESSION_STATE_READY)
1587     {
1588       return;
1589     }
1590
1591   transport_half_close (session_get_transport_proto (s), s->connection_index,
1592                         s->thread_index);
1593 }
1594
1595 /**
1596  * Notify transport the session can be disconnected. This should eventually
1597  * result in a delete notification that allows us to cleanup session state.
1598  * Called for both active/passive disconnects.
1599  *
1600  * Must be called from the session's thread.
1601  */
1602 void
1603 session_transport_close (session_t * s)
1604 {
1605   if (s->session_state >= SESSION_STATE_APP_CLOSED)
1606     {
1607       if (s->session_state == SESSION_STATE_TRANSPORT_CLOSED)
1608         s->session_state = SESSION_STATE_CLOSED;
1609       /* If transport is already deleted, just free the session */
1610       else if (s->session_state >= SESSION_STATE_TRANSPORT_DELETED)
1611         session_free_w_fifos (s);
1612       return;
1613     }
1614
1615   /* If the tx queue wasn't drained, the transport can continue to try
1616    * sending the outstanding data (in closed state it cannot). It MUST however
1617    * at one point, either after sending everything or after a timeout, call
1618    * delete notify. This will finally lead to the complete cleanup of the
1619    * session.
1620    */
1621   s->session_state = SESSION_STATE_APP_CLOSED;
1622
1623   transport_close (session_get_transport_proto (s), s->connection_index,
1624                    s->thread_index);
1625 }
1626
1627 /**
1628  * Force transport close
1629  */
1630 void
1631 session_transport_reset (session_t * s)
1632 {
1633   if (s->session_state >= SESSION_STATE_APP_CLOSED)
1634     {
1635       if (s->session_state == SESSION_STATE_TRANSPORT_CLOSED)
1636         s->session_state = SESSION_STATE_CLOSED;
1637       else if (s->session_state >= SESSION_STATE_TRANSPORT_DELETED)
1638         session_free_w_fifos (s);
1639       return;
1640     }
1641
1642   s->session_state = SESSION_STATE_APP_CLOSED;
1643   transport_reset (session_get_transport_proto (s), s->connection_index,
1644                    s->thread_index);
1645 }
1646
1647 /**
1648  * Cleanup transport and session state.
1649  *
1650  * Notify transport of the cleanup and free the session. This should
1651  * be called only if transport reported some error and is already
1652  * closed.
1653  */
1654 void
1655 session_transport_cleanup (session_t * s)
1656 {
1657   /* Delete from main lookup table before we axe the the transport */
1658   session_lookup_del_session (s);
1659   if (s->session_state != SESSION_STATE_TRANSPORT_DELETED)
1660     transport_cleanup (session_get_transport_proto (s), s->connection_index,
1661                        s->thread_index);
1662   /* Since we called cleanup, no delete notification will come. So, make
1663    * sure the session is properly freed. */
1664   segment_manager_dealloc_fifos (s->rx_fifo, s->tx_fifo);
1665   session_free (s);
1666 }
1667
1668 /**
1669  * Allocate worker mqs in share-able segment
1670  *
1671  * That can only be a newly created memfd segment, that must be mapped
1672  * by all apps/stack users unless private rx mqs are enabled.
1673  */
1674 void
1675 session_vpp_wrk_mqs_alloc (session_main_t *smm)
1676 {
1677   u32 mq_q_length = 2048, evt_size = sizeof (session_event_t);
1678   fifo_segment_t *mqs_seg = &smm->wrk_mqs_segment;
1679   svm_msg_q_cfg_t _cfg, *cfg = &_cfg;
1680   uword mqs_seg_size;
1681   int i;
1682
1683   mq_q_length = clib_max (mq_q_length, smm->configured_wrk_mq_length);
1684
1685   svm_msg_q_ring_cfg_t rc[SESSION_MQ_N_RINGS] = {
1686     { mq_q_length, evt_size, 0 }, { mq_q_length >> 1, 256, 0 }
1687   };
1688   cfg->consumer_pid = 0;
1689   cfg->n_rings = 2;
1690   cfg->q_nitems = mq_q_length;
1691   cfg->ring_cfgs = rc;
1692
1693   /*
1694    * Compute mqs segment size based on rings config and leave space
1695    * for passing extended configuration messages, i.e., data allocated
1696    * outside of the rings. If provided with a config value, accept it
1697    * if larger than minimum size.
1698    */
1699   mqs_seg_size = svm_msg_q_size_to_alloc (cfg) * vec_len (smm->wrk);
1700   mqs_seg_size = mqs_seg_size + (32 << 10);
1701   mqs_seg_size = clib_max (mqs_seg_size, smm->wrk_mqs_segment_size);
1702
1703   mqs_seg->ssvm.ssvm_size = mqs_seg_size;
1704   mqs_seg->ssvm.my_pid = getpid ();
1705   mqs_seg->ssvm.name = format (0, "%s%c", "session: wrk-mqs-segment", 0);
1706
1707   if (ssvm_server_init (&mqs_seg->ssvm, SSVM_SEGMENT_MEMFD))
1708     {
1709       clib_warning ("failed to initialize queue segment");
1710       return;
1711     }
1712
1713   fifo_segment_init (mqs_seg);
1714
1715   /* Special fifo segment that's filled only with mqs */
1716   mqs_seg->h->n_mqs = vec_len (smm->wrk);
1717
1718   for (i = 0; i < vec_len (smm->wrk); i++)
1719     smm->wrk[i].vpp_event_queue = fifo_segment_msg_q_alloc (mqs_seg, i, cfg);
1720 }
1721
1722 fifo_segment_t *
1723 session_main_get_wrk_mqs_segment (void)
1724 {
1725   return &session_main.wrk_mqs_segment;
1726 }
1727
1728 u64
1729 session_segment_handle (session_t * s)
1730 {
1731   svm_fifo_t *f;
1732
1733   if (!s->rx_fifo)
1734     return SESSION_INVALID_HANDLE;
1735
1736   f = s->rx_fifo;
1737   return segment_manager_make_segment_handle (f->segment_manager,
1738                                               f->segment_index);
1739 }
1740
1741 /* *INDENT-OFF* */
1742 static session_fifo_rx_fn *session_tx_fns[TRANSPORT_TX_N_FNS] = {
1743     session_tx_fifo_peek_and_snd,
1744     session_tx_fifo_dequeue_and_snd,
1745     session_tx_fifo_dequeue_internal,
1746     session_tx_fifo_dequeue_and_snd
1747 };
1748 /* *INDENT-ON* */
1749
1750 void
1751 session_register_transport (transport_proto_t transport_proto,
1752                             const transport_proto_vft_t * vft, u8 is_ip4,
1753                             u32 output_node)
1754 {
1755   session_main_t *smm = &session_main;
1756   session_type_t session_type;
1757   u32 next_index = ~0;
1758
1759   session_type = session_type_from_proto_and_ip (transport_proto, is_ip4);
1760
1761   vec_validate (smm->session_type_to_next, session_type);
1762   vec_validate (smm->session_tx_fns, session_type);
1763
1764   if (output_node != ~0)
1765     next_index = vlib_node_add_next (vlib_get_main (),
1766                                      session_queue_node.index, output_node);
1767
1768   smm->session_type_to_next[session_type] = next_index;
1769   smm->session_tx_fns[session_type] =
1770     session_tx_fns[vft->transport_options.tx_type];
1771 }
1772
1773 void
1774 session_register_update_time_fn (session_update_time_fn fn, u8 is_add)
1775 {
1776   session_main_t *smm = &session_main;
1777   session_update_time_fn *fi;
1778   u32 fi_pos = ~0;
1779   u8 found = 0;
1780
1781   vec_foreach (fi, smm->update_time_fns)
1782     {
1783       if (*fi == fn)
1784         {
1785           fi_pos = fi - smm->update_time_fns;
1786           found = 1;
1787           break;
1788         }
1789     }
1790
1791   if (is_add)
1792     {
1793       if (found)
1794         {
1795           clib_warning ("update time fn %p already registered", fn);
1796           return;
1797         }
1798       vec_add1 (smm->update_time_fns, fn);
1799     }
1800   else
1801     {
1802       vec_del1 (smm->update_time_fns, fi_pos);
1803     }
1804 }
1805
1806 transport_proto_t
1807 session_add_transport_proto (void)
1808 {
1809   session_main_t *smm = &session_main;
1810   session_worker_t *wrk;
1811   u32 thread;
1812
1813   smm->last_transport_proto_type += 1;
1814
1815   for (thread = 0; thread < vec_len (smm->wrk); thread++)
1816     {
1817       wrk = session_main_get_worker (thread);
1818       vec_validate (wrk->session_to_enqueue, smm->last_transport_proto_type);
1819     }
1820
1821   return smm->last_transport_proto_type;
1822 }
1823
1824 transport_connection_t *
1825 session_get_transport (session_t * s)
1826 {
1827   if (s->session_state != SESSION_STATE_LISTENING)
1828     return transport_get_connection (session_get_transport_proto (s),
1829                                      s->connection_index, s->thread_index);
1830   else
1831     return transport_get_listener (session_get_transport_proto (s),
1832                                    s->connection_index);
1833 }
1834
1835 void
1836 session_get_endpoint (session_t * s, transport_endpoint_t * tep, u8 is_lcl)
1837 {
1838   if (s->session_state != SESSION_STATE_LISTENING)
1839     return transport_get_endpoint (session_get_transport_proto (s),
1840                                    s->connection_index, s->thread_index, tep,
1841                                    is_lcl);
1842   else
1843     return transport_get_listener_endpoint (session_get_transport_proto (s),
1844                                             s->connection_index, tep, is_lcl);
1845 }
1846
1847 int
1848 session_transport_attribute (session_t *s, u8 is_get,
1849                              transport_endpt_attr_t *attr)
1850 {
1851   if (s->session_state < SESSION_STATE_READY)
1852     return -1;
1853
1854   return transport_connection_attribute (session_get_transport_proto (s),
1855                                          s->connection_index, s->thread_index,
1856                                          is_get, attr);
1857 }
1858
1859 transport_connection_t *
1860 listen_session_get_transport (session_t * s)
1861 {
1862   return transport_get_listener (session_get_transport_proto (s),
1863                                  s->connection_index);
1864 }
1865
1866 void
1867 session_queue_run_on_main_thread (vlib_main_t * vm)
1868 {
1869   ASSERT (vlib_get_thread_index () == 0);
1870   vlib_node_set_interrupt_pending (vm, session_queue_node.index);
1871 }
1872
1873 static clib_error_t *
1874 session_manager_main_enable (vlib_main_t * vm)
1875 {
1876   session_main_t *smm = &session_main;
1877   vlib_thread_main_t *vtm = vlib_get_thread_main ();
1878   u32 num_threads, preallocated_sessions_per_worker;
1879   session_worker_t *wrk;
1880   int i;
1881
1882   /* We only initialize once and do not de-initialized on disable */
1883   if (smm->is_initialized)
1884     goto done;
1885
1886   num_threads = 1 /* main thread */  + vtm->n_threads;
1887
1888   if (num_threads < 1)
1889     return clib_error_return (0, "n_thread_stacks not set");
1890
1891   /* Allocate cache line aligned worker contexts */
1892   vec_validate_aligned (smm->wrk, num_threads - 1, CLIB_CACHE_LINE_BYTES);
1893   clib_spinlock_init (&session_main.pool_realloc_lock);
1894
1895   for (i = 0; i < num_threads; i++)
1896     {
1897       wrk = &smm->wrk[i];
1898       wrk->ctrl_head = clib_llist_make_head (wrk->event_elts, evt_list);
1899       wrk->new_head = clib_llist_make_head (wrk->event_elts, evt_list);
1900       wrk->old_head = clib_llist_make_head (wrk->event_elts, evt_list);
1901       wrk->pending_connects = clib_llist_make_head (wrk->event_elts, evt_list);
1902       wrk->evts_pending_main =
1903         clib_llist_make_head (wrk->event_elts, evt_list);
1904       wrk->vm = vlib_get_main_by_index (i);
1905       wrk->last_vlib_time = vlib_time_now (vm);
1906       wrk->last_vlib_us_time = wrk->last_vlib_time * CLIB_US_TIME_FREQ;
1907       wrk->timerfd = -1;
1908       vec_validate (wrk->session_to_enqueue, smm->last_transport_proto_type);
1909
1910       if (!smm->no_adaptive && smm->use_private_rx_mqs)
1911         session_wrk_enable_adaptive_mode (wrk);
1912     }
1913
1914   /* Allocate vpp event queues segment and queue */
1915   session_vpp_wrk_mqs_alloc (smm);
1916
1917   /* Initialize segment manager properties */
1918   segment_manager_main_init ();
1919
1920   /* Preallocate sessions */
1921   if (smm->preallocated_sessions)
1922     {
1923       if (num_threads == 1)
1924         {
1925           pool_init_fixed (smm->wrk[0].sessions, smm->preallocated_sessions);
1926         }
1927       else
1928         {
1929           int j;
1930           preallocated_sessions_per_worker =
1931             (1.1 * (f64) smm->preallocated_sessions /
1932              (f64) (num_threads - 1));
1933
1934           for (j = 1; j < num_threads; j++)
1935             {
1936               pool_init_fixed (smm->wrk[j].sessions,
1937                                preallocated_sessions_per_worker);
1938             }
1939         }
1940     }
1941
1942   session_lookup_init ();
1943   app_namespaces_init ();
1944   transport_init ();
1945   smm->is_initialized = 1;
1946
1947 done:
1948
1949   smm->is_enabled = 1;
1950
1951   /* Enable transports */
1952   transport_enable_disable (vm, 1);
1953   session_debug_init ();
1954
1955   return 0;
1956 }
1957
1958 static void
1959 session_manager_main_disable (vlib_main_t * vm)
1960 {
1961   transport_enable_disable (vm, 0 /* is_en */ );
1962 }
1963
1964 void
1965 session_node_enable_disable (u8 is_en)
1966 {
1967   u8 mstate = is_en ? VLIB_NODE_STATE_INTERRUPT : VLIB_NODE_STATE_DISABLED;
1968   u8 state = is_en ? VLIB_NODE_STATE_POLLING : VLIB_NODE_STATE_DISABLED;
1969   session_main_t *sm = &session_main;
1970   vlib_main_t *vm;
1971   vlib_node_t *n;
1972   int n_vlibs, i;
1973
1974   n_vlibs = vlib_get_n_threads ();
1975   for (i = 0; i < n_vlibs; i++)
1976     {
1977       vm = vlib_get_main_by_index (i);
1978       /* main thread with workers and not polling */
1979       if (i == 0 && n_vlibs > 1)
1980         {
1981           vlib_node_set_state (vm, session_queue_node.index, mstate);
1982           if (is_en)
1983             {
1984               session_main_get_worker (0)->state = SESSION_WRK_INTERRUPT;
1985               vlib_node_set_state (vm, session_queue_process_node.index,
1986                                    state);
1987               n = vlib_get_node (vm, session_queue_process_node.index);
1988               vlib_start_process (vm, n->runtime_index);
1989             }
1990           else
1991             {
1992               vlib_process_signal_event_mt (vm,
1993                                             session_queue_process_node.index,
1994                                             SESSION_Q_PROCESS_STOP, 0);
1995             }
1996           if (!sm->poll_main)
1997             continue;
1998         }
1999       vlib_node_set_state (vm, session_queue_node.index, state);
2000     }
2001
2002   if (sm->use_private_rx_mqs)
2003     application_enable_rx_mqs_nodes (is_en);
2004 }
2005
2006 clib_error_t *
2007 vnet_session_enable_disable (vlib_main_t * vm, u8 is_en)
2008 {
2009   clib_error_t *error = 0;
2010   if (is_en)
2011     {
2012       if (session_main.is_enabled)
2013         return 0;
2014
2015       error = session_manager_main_enable (vm);
2016       session_node_enable_disable (is_en);
2017     }
2018   else
2019     {
2020       session_main.is_enabled = 0;
2021       session_manager_main_disable (vm);
2022       session_node_enable_disable (is_en);
2023     }
2024
2025   return error;
2026 }
2027
2028 clib_error_t *
2029 session_main_init (vlib_main_t * vm)
2030 {
2031   session_main_t *smm = &session_main;
2032
2033   smm->is_enabled = 0;
2034   smm->session_enable_asap = 0;
2035   smm->poll_main = 0;
2036   smm->use_private_rx_mqs = 0;
2037   smm->no_adaptive = 0;
2038   smm->last_transport_proto_type = TRANSPORT_PROTO_HTTP;
2039
2040   return 0;
2041 }
2042
2043 static clib_error_t *
2044 session_main_loop_init (vlib_main_t * vm)
2045 {
2046   session_main_t *smm = &session_main;
2047   if (smm->session_enable_asap)
2048     {
2049       vlib_worker_thread_barrier_sync (vm);
2050       vnet_session_enable_disable (vm, 1 /* is_en */ );
2051       vlib_worker_thread_barrier_release (vm);
2052     }
2053   return 0;
2054 }
2055
2056 VLIB_INIT_FUNCTION (session_main_init);
2057 VLIB_MAIN_LOOP_ENTER_FUNCTION (session_main_loop_init);
2058
2059 static clib_error_t *
2060 session_config_fn (vlib_main_t * vm, unformat_input_t * input)
2061 {
2062   session_main_t *smm = &session_main;
2063   u32 nitems;
2064   uword tmp;
2065
2066   while (unformat_check_input (input) != UNFORMAT_END_OF_INPUT)
2067     {
2068       if (unformat (input, "wrk-mq-length %d", &nitems))
2069         {
2070           if (nitems >= 2048)
2071             smm->configured_wrk_mq_length = nitems;
2072           else
2073             clib_warning ("event queue length %d too small, ignored", nitems);
2074         }
2075       else if (unformat (input, "preallocated-sessions %d",
2076                          &smm->preallocated_sessions))
2077         ;
2078       else if (unformat (input, "v4-session-table-buckets %d",
2079                          &smm->configured_v4_session_table_buckets))
2080         ;
2081       else if (unformat (input, "v4-halfopen-table-buckets %d",
2082                          &smm->configured_v4_halfopen_table_buckets))
2083         ;
2084       else if (unformat (input, "v6-session-table-buckets %d",
2085                          &smm->configured_v6_session_table_buckets))
2086         ;
2087       else if (unformat (input, "v6-halfopen-table-buckets %d",
2088                          &smm->configured_v6_halfopen_table_buckets))
2089         ;
2090       else if (unformat (input, "v4-session-table-memory %U",
2091                          unformat_memory_size, &tmp))
2092         {
2093           if (tmp >= 0x100000000)
2094             return clib_error_return (0, "memory size %llx (%lld) too large",
2095                                       tmp, tmp);
2096           smm->configured_v4_session_table_memory = tmp;
2097         }
2098       else if (unformat (input, "v4-halfopen-table-memory %U",
2099                          unformat_memory_size, &tmp))
2100         {
2101           if (tmp >= 0x100000000)
2102             return clib_error_return (0, "memory size %llx (%lld) too large",
2103                                       tmp, tmp);
2104           smm->configured_v4_halfopen_table_memory = tmp;
2105         }
2106       else if (unformat (input, "v6-session-table-memory %U",
2107                          unformat_memory_size, &tmp))
2108         {
2109           if (tmp >= 0x100000000)
2110             return clib_error_return (0, "memory size %llx (%lld) too large",
2111                                       tmp, tmp);
2112           smm->configured_v6_session_table_memory = tmp;
2113         }
2114       else if (unformat (input, "v6-halfopen-table-memory %U",
2115                          unformat_memory_size, &tmp))
2116         {
2117           if (tmp >= 0x100000000)
2118             return clib_error_return (0, "memory size %llx (%lld) too large",
2119                                       tmp, tmp);
2120           smm->configured_v6_halfopen_table_memory = tmp;
2121         }
2122       else if (unformat (input, "local-endpoints-table-memory %U",
2123                          unformat_memory_size, &tmp))
2124         {
2125           if (tmp >= 0x100000000)
2126             return clib_error_return (0, "memory size %llx (%lld) too large",
2127                                       tmp, tmp);
2128           smm->local_endpoints_table_memory = tmp;
2129         }
2130       else if (unformat (input, "local-endpoints-table-buckets %d",
2131                          &smm->local_endpoints_table_buckets))
2132         ;
2133       else if (unformat (input, "enable"))
2134         smm->session_enable_asap = 1;
2135       else if (unformat (input, "use-app-socket-api"))
2136         (void) appns_sapi_enable_disable (1 /* is_enable */);
2137       else if (unformat (input, "poll-main"))
2138         smm->poll_main = 1;
2139       else if (unformat (input, "use-private-rx-mqs"))
2140         smm->use_private_rx_mqs = 1;
2141       else if (unformat (input, "no-adaptive"))
2142         smm->no_adaptive = 1;
2143       /*
2144        * Deprecated but maintained for compatibility
2145        */
2146       else if (unformat (input, "evt_qs_memfd_seg"))
2147         ;
2148       else if (unformat (input, "segment-baseva 0x%lx", &tmp))
2149         ;
2150       else if (unformat (input, "evt_qs_seg_size %U", unformat_memory_size,
2151                          &tmp))
2152         ;
2153       else if (unformat (input, "event-queue-length %d", &nitems))
2154         {
2155           if (nitems >= 2048)
2156             smm->configured_wrk_mq_length = nitems;
2157           else
2158             clib_warning ("event queue length %d too small, ignored", nitems);
2159         }
2160       else
2161         return clib_error_return (0, "unknown input `%U'",
2162                                   format_unformat_error, input);
2163     }
2164   return 0;
2165 }
2166
2167 VLIB_CONFIG_FUNCTION (session_config_fn, "session");
2168
2169 /*
2170  * fd.io coding-style-patch-verification: ON
2171  *
2172  * Local Variables:
2173  * eval: (c-set-style "gnu")
2174  * End:
2175  */