session svm: fix mq producer wait on q and ring
[vpp.git] / src / vnet / session / session.c
1 /*
2  * Copyright (c) 2017-2019 Cisco and/or its affiliates.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at:
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 /**
16  * @file
17  * @brief Session and session manager
18  */
19
20 #include <vnet/session/session.h>
21 #include <vnet/session/application.h>
22 #include <vnet/dpo/load_balance.h>
23 #include <vnet/fib/ip4_fib.h>
24
25 session_main_t session_main;
26
27 static inline int
28 session_send_evt_to_thread (void *data, void *args, u32 thread_index,
29                             session_evt_type_t evt_type)
30 {
31   session_worker_t *wrk = session_main_get_worker (thread_index);
32   session_event_t *evt;
33   svm_msg_q_msg_t msg;
34   svm_msg_q_t *mq;
35
36   mq = wrk->vpp_event_queue;
37   if (PREDICT_FALSE (svm_msg_q_lock (mq)))
38     return -1;
39   if (PREDICT_FALSE (svm_msg_q_or_ring_is_full (mq, SESSION_MQ_IO_EVT_RING)))
40     {
41       svm_msg_q_unlock (mq);
42       return -2;
43     }
44   switch (evt_type)
45     {
46     case SESSION_CTRL_EVT_RPC:
47       msg = svm_msg_q_alloc_msg_w_ring (mq, SESSION_MQ_IO_EVT_RING);
48       evt = (session_event_t *) svm_msg_q_msg_data (mq, &msg);
49       evt->rpc_args.fp = data;
50       evt->rpc_args.arg = args;
51       break;
52     case SESSION_IO_EVT_RX:
53     case SESSION_IO_EVT_TX:
54     case SESSION_IO_EVT_TX_FLUSH:
55     case SESSION_IO_EVT_BUILTIN_RX:
56       msg = svm_msg_q_alloc_msg_w_ring (mq, SESSION_MQ_IO_EVT_RING);
57       evt = (session_event_t *) svm_msg_q_msg_data (mq, &msg);
58       evt->session_index = *(u32 *) data;
59       break;
60     case SESSION_IO_EVT_BUILTIN_TX:
61     case SESSION_CTRL_EVT_CLOSE:
62     case SESSION_CTRL_EVT_RESET:
63       msg = svm_msg_q_alloc_msg_w_ring (mq, SESSION_MQ_IO_EVT_RING);
64       evt = (session_event_t *) svm_msg_q_msg_data (mq, &msg);
65       evt->session_handle = session_handle ((session_t *) data);
66       break;
67     default:
68       clib_warning ("evt unhandled!");
69       svm_msg_q_unlock (mq);
70       return -1;
71     }
72   evt->event_type = evt_type;
73
74   svm_msg_q_add_and_unlock (mq, &msg);
75
76   if (PREDICT_FALSE (wrk->state == SESSION_WRK_INTERRUPT))
77     vlib_node_set_interrupt_pending (wrk->vm, session_queue_node.index);
78
79   return 0;
80 }
81
82 int
83 session_send_io_evt_to_thread (svm_fifo_t * f, session_evt_type_t evt_type)
84 {
85   return session_send_evt_to_thread (&f->shr->master_session_index, 0,
86                                      f->master_thread_index, evt_type);
87 }
88
89 int
90 session_send_io_evt_to_thread_custom (void *data, u32 thread_index,
91                                       session_evt_type_t evt_type)
92 {
93   return session_send_evt_to_thread (data, 0, thread_index, evt_type);
94 }
95
96 int
97 session_send_ctrl_evt_to_thread (session_t * s, session_evt_type_t evt_type)
98 {
99   /* only events supported are disconnect, shutdown and reset */
100   ASSERT (evt_type == SESSION_CTRL_EVT_CLOSE ||
101           evt_type == SESSION_CTRL_EVT_HALF_CLOSE ||
102           evt_type == SESSION_CTRL_EVT_RESET);
103   return session_send_evt_to_thread (s, 0, s->thread_index, evt_type);
104 }
105
106 void
107 session_send_rpc_evt_to_thread_force (u32 thread_index, void *fp,
108                                       void *rpc_args)
109 {
110   session_send_evt_to_thread (fp, rpc_args, thread_index,
111                               SESSION_CTRL_EVT_RPC);
112 }
113
114 void
115 session_send_rpc_evt_to_thread (u32 thread_index, void *fp, void *rpc_args)
116 {
117   if (thread_index != vlib_get_thread_index ())
118     session_send_rpc_evt_to_thread_force (thread_index, fp, rpc_args);
119   else
120     {
121       void (*fnp) (void *) = fp;
122       fnp (rpc_args);
123     }
124 }
125
126 void
127 session_add_self_custom_tx_evt (transport_connection_t * tc, u8 has_prio)
128 {
129   session_t *s = session_get (tc->s_index, tc->thread_index);
130
131   ASSERT (s->thread_index == vlib_get_thread_index ());
132   ASSERT (s->session_state != SESSION_STATE_TRANSPORT_DELETED);
133
134   if (!(s->flags & SESSION_F_CUSTOM_TX))
135     {
136       s->flags |= SESSION_F_CUSTOM_TX;
137       if (svm_fifo_set_event (s->tx_fifo)
138           || transport_connection_is_descheduled (tc))
139         {
140           session_evt_elt_t *elt;
141           session_worker_t *wrk;
142
143           wrk = session_main_get_worker (tc->thread_index);
144           if (has_prio)
145             elt = session_evt_alloc_new (wrk);
146           else
147             elt = session_evt_alloc_old (wrk);
148           elt->evt.session_index = tc->s_index;
149           elt->evt.event_type = SESSION_IO_EVT_TX;
150           tc->flags &= ~TRANSPORT_CONNECTION_F_DESCHED;
151
152           if (PREDICT_FALSE (wrk->state == SESSION_WRK_INTERRUPT))
153             vlib_node_set_interrupt_pending (wrk->vm,
154                                              session_queue_node.index);
155         }
156     }
157 }
158
159 void
160 sesssion_reschedule_tx (transport_connection_t * tc)
161 {
162   session_worker_t *wrk = session_main_get_worker (tc->thread_index);
163   session_evt_elt_t *elt;
164
165   ASSERT (tc->thread_index == vlib_get_thread_index ());
166
167   elt = session_evt_alloc_new (wrk);
168   elt->evt.session_index = tc->s_index;
169   elt->evt.event_type = SESSION_IO_EVT_TX;
170
171   if (PREDICT_FALSE (wrk->state == SESSION_WRK_INTERRUPT))
172     vlib_node_set_interrupt_pending (wrk->vm, session_queue_node.index);
173 }
174
175 static void
176 session_program_transport_ctrl_evt (session_t * s, session_evt_type_t evt)
177 {
178   u32 thread_index = vlib_get_thread_index ();
179   session_evt_elt_t *elt;
180   session_worker_t *wrk;
181
182   /* If we are in the handler thread, or being called with the worker barrier
183    * held, just append a new event to pending disconnects vector. */
184   if (vlib_thread_is_main_w_barrier () || thread_index == s->thread_index)
185     {
186       wrk = session_main_get_worker (s->thread_index);
187       elt = session_evt_alloc_ctrl (wrk);
188       clib_memset (&elt->evt, 0, sizeof (session_event_t));
189       elt->evt.session_handle = session_handle (s);
190       elt->evt.event_type = evt;
191
192       if (PREDICT_FALSE (wrk->state == SESSION_WRK_INTERRUPT))
193         vlib_node_set_interrupt_pending (wrk->vm, session_queue_node.index);
194     }
195   else
196     session_send_ctrl_evt_to_thread (s, evt);
197 }
198
199 session_t *
200 session_alloc (u32 thread_index)
201 {
202   session_worker_t *wrk = &session_main.wrk[thread_index];
203   session_t *s;
204
205   pool_get_aligned_safe (wrk->sessions, s, CLIB_CACHE_LINE_BYTES);
206   clib_memset (s, 0, sizeof (*s));
207   s->session_index = s - wrk->sessions;
208   s->thread_index = thread_index;
209   s->app_index = APP_INVALID_INDEX;
210
211   return s;
212 }
213
214 void
215 session_free (session_t * s)
216 {
217   if (CLIB_DEBUG)
218     {
219       u8 thread_index = s->thread_index;
220       clib_memset (s, 0xFA, sizeof (*s));
221       pool_put (session_main.wrk[thread_index].sessions, s);
222       return;
223     }
224   SESSION_EVT (SESSION_EVT_FREE, s);
225   pool_put (session_main.wrk[s->thread_index].sessions, s);
226 }
227
228 u8
229 session_is_valid (u32 si, u8 thread_index)
230 {
231   session_t *s;
232   transport_connection_t *tc;
233
234   s = pool_elt_at_index (session_main.wrk[thread_index].sessions, si);
235
236   if (s->thread_index != thread_index || s->session_index != si)
237     return 0;
238
239   if (s->session_state == SESSION_STATE_TRANSPORT_DELETED
240       || s->session_state <= SESSION_STATE_LISTENING)
241     return 1;
242
243   if (s->session_state == SESSION_STATE_CONNECTING &&
244       (s->flags & SESSION_F_HALF_OPEN))
245     return 1;
246
247   tc = session_get_transport (s);
248   if (s->connection_index != tc->c_index
249       || s->thread_index != tc->thread_index || tc->s_index != si)
250     return 0;
251
252   return 1;
253 }
254
255 static void
256 session_cleanup_notify (session_t * s, session_cleanup_ntf_t ntf)
257 {
258   app_worker_t *app_wrk;
259
260   app_wrk = app_worker_get_if_valid (s->app_wrk_index);
261   if (!app_wrk)
262     return;
263   app_worker_cleanup_notify (app_wrk, s, ntf);
264 }
265
266 void
267 session_free_w_fifos (session_t * s)
268 {
269   session_cleanup_notify (s, SESSION_CLEANUP_SESSION);
270   segment_manager_dealloc_fifos (s->rx_fifo, s->tx_fifo);
271   session_free (s);
272 }
273
274 /**
275  * Cleans up session and lookup table.
276  *
277  * Transport connection must still be valid.
278  */
279 static void
280 session_delete (session_t * s)
281 {
282   int rv;
283
284   /* Delete from the main lookup table. */
285   if ((rv = session_lookup_del_session (s)))
286     clib_warning ("session %u hash delete rv %d", s->session_index, rv);
287
288   session_free_w_fifos (s);
289 }
290
291 void
292 session_cleanup_half_open (session_handle_t ho_handle)
293 {
294   session_t *ho = session_get_from_handle (ho_handle);
295
296   /* App transports can migrate their half-opens */
297   if (ho->flags & SESSION_F_IS_MIGRATING)
298     {
299       /* Session still migrating, move to closed state to signal that the
300        * session should be removed. */
301       if (ho->connection_index == ~0)
302         {
303           ho->session_state = SESSION_STATE_CLOSED;
304           return;
305         }
306       /* Migrated transports are no longer half-opens */
307       transport_cleanup (session_get_transport_proto (ho),
308                          ho->connection_index, ho->app_index /* overloaded */);
309     }
310   else
311     transport_cleanup_half_open (session_get_transport_proto (ho),
312                                  ho->connection_index);
313   session_free (ho);
314 }
315
316 static void
317 session_half_open_free (session_t *ho)
318 {
319   app_worker_t *app_wrk;
320
321   ASSERT (vlib_get_thread_index () <= 1);
322   app_wrk = app_worker_get (ho->app_wrk_index);
323   app_worker_del_half_open (app_wrk, ho);
324   session_free (ho);
325 }
326
327 static void
328 session_half_open_free_rpc (void *args)
329 {
330   session_t *ho = ho_session_get (pointer_to_uword (args));
331   session_half_open_free (ho);
332 }
333
334 void
335 session_half_open_delete_notify (transport_connection_t *tc)
336 {
337   /* Notification from ctrl thread accepted without rpc */
338   if (!tc->thread_index)
339     {
340       session_half_open_free (ho_session_get (tc->s_index));
341     }
342   else
343     {
344       void *args = uword_to_pointer ((uword) tc->s_index, void *);
345       session_send_rpc_evt_to_thread_force (0, session_half_open_free_rpc,
346                                             args);
347     }
348 }
349
350 void
351 session_half_open_migrate_notify (transport_connection_t *tc)
352 {
353   session_t *ho;
354
355   ho = ho_session_get (tc->s_index);
356   ho->flags |= SESSION_F_IS_MIGRATING;
357   ho->connection_index = ~0;
358 }
359
360 int
361 session_half_open_migrated_notify (transport_connection_t *tc)
362 {
363   session_t *ho;
364
365   ho = ho_session_get (tc->s_index);
366
367   /* App probably detached so the half-open must be cleaned up */
368   if (ho->session_state == SESSION_STATE_CLOSED)
369     {
370       session_half_open_delete_notify (tc);
371       return -1;
372     }
373   ho->connection_index = tc->c_index;
374   /* Overload app index for half-open with new thread */
375   ho->app_index = tc->thread_index;
376   return 0;
377 }
378
379 session_t *
380 session_alloc_for_connection (transport_connection_t * tc)
381 {
382   session_t *s;
383   u32 thread_index = tc->thread_index;
384
385   ASSERT (thread_index == vlib_get_thread_index ()
386           || transport_protocol_is_cl (tc->proto));
387
388   s = session_alloc (thread_index);
389   s->session_type = session_type_from_proto_and_ip (tc->proto, tc->is_ip4);
390   s->session_state = SESSION_STATE_CLOSED;
391
392   /* Attach transport to session and vice versa */
393   s->connection_index = tc->c_index;
394   tc->s_index = s->session_index;
395   return s;
396 }
397
398 session_t *
399 session_alloc_for_half_open (transport_connection_t *tc)
400 {
401   session_t *s;
402
403   s = ho_session_alloc ();
404   s->session_type = session_type_from_proto_and_ip (tc->proto, tc->is_ip4);
405   s->connection_index = tc->c_index;
406   tc->s_index = s->session_index;
407   return s;
408 }
409
410 /**
411  * Discards bytes from buffer chain
412  *
413  * It discards n_bytes_to_drop starting at first buffer after chain_b
414  */
415 always_inline void
416 session_enqueue_discard_chain_bytes (vlib_main_t * vm, vlib_buffer_t * b,
417                                      vlib_buffer_t ** chain_b,
418                                      u32 n_bytes_to_drop)
419 {
420   vlib_buffer_t *next = *chain_b;
421   u32 to_drop = n_bytes_to_drop;
422   ASSERT (b->flags & VLIB_BUFFER_NEXT_PRESENT);
423   while (to_drop && (next->flags & VLIB_BUFFER_NEXT_PRESENT))
424     {
425       next = vlib_get_buffer (vm, next->next_buffer);
426       if (next->current_length > to_drop)
427         {
428           vlib_buffer_advance (next, to_drop);
429           to_drop = 0;
430         }
431       else
432         {
433           to_drop -= next->current_length;
434           next->current_length = 0;
435         }
436     }
437   *chain_b = next;
438
439   if (to_drop == 0)
440     b->total_length_not_including_first_buffer -= n_bytes_to_drop;
441 }
442
443 /**
444  * Enqueue buffer chain tail
445  */
446 always_inline int
447 session_enqueue_chain_tail (session_t * s, vlib_buffer_t * b,
448                             u32 offset, u8 is_in_order)
449 {
450   vlib_buffer_t *chain_b;
451   u32 chain_bi, len, diff;
452   vlib_main_t *vm = vlib_get_main ();
453   u8 *data;
454   u32 written = 0;
455   int rv = 0;
456
457   if (is_in_order && offset)
458     {
459       diff = offset - b->current_length;
460       if (diff > b->total_length_not_including_first_buffer)
461         return 0;
462       chain_b = b;
463       session_enqueue_discard_chain_bytes (vm, b, &chain_b, diff);
464       chain_bi = vlib_get_buffer_index (vm, chain_b);
465     }
466   else
467     chain_bi = b->next_buffer;
468
469   do
470     {
471       chain_b = vlib_get_buffer (vm, chain_bi);
472       data = vlib_buffer_get_current (chain_b);
473       len = chain_b->current_length;
474       if (!len)
475         continue;
476       if (is_in_order)
477         {
478           rv = svm_fifo_enqueue (s->rx_fifo, len, data);
479           if (rv == len)
480             {
481               written += rv;
482             }
483           else if (rv < len)
484             {
485               return (rv > 0) ? (written + rv) : written;
486             }
487           else if (rv > len)
488             {
489               written += rv;
490
491               /* written more than what was left in chain */
492               if (written > b->total_length_not_including_first_buffer)
493                 return written;
494
495               /* drop the bytes that have already been delivered */
496               session_enqueue_discard_chain_bytes (vm, b, &chain_b, rv - len);
497             }
498         }
499       else
500         {
501           rv = svm_fifo_enqueue_with_offset (s->rx_fifo, offset, len, data);
502           if (rv)
503             {
504               clib_warning ("failed to enqueue multi-buffer seg");
505               return -1;
506             }
507           offset += len;
508         }
509     }
510   while ((chain_bi = (chain_b->flags & VLIB_BUFFER_NEXT_PRESENT)
511           ? chain_b->next_buffer : 0));
512
513   if (is_in_order)
514     return written;
515
516   return 0;
517 }
518
519 void
520 session_fifo_tuning (session_t * s, svm_fifo_t * f,
521                      session_ft_action_t act, u32 len)
522 {
523   if (s->flags & SESSION_F_CUSTOM_FIFO_TUNING)
524     {
525       app_worker_t *app_wrk = app_worker_get (s->app_wrk_index);
526       app_worker_session_fifo_tuning (app_wrk, s, f, act, len);
527       if (CLIB_ASSERT_ENABLE)
528         {
529           segment_manager_t *sm;
530           sm = segment_manager_get (f->segment_manager);
531           ASSERT (f->shr->size >= 4096);
532           ASSERT (f->shr->size <= sm->max_fifo_size);
533         }
534     }
535 }
536
537 /*
538  * Enqueue data for delivery to session peer. Does not notify peer of enqueue
539  * event but on request can queue notification events for later delivery by
540  * calling stream_server_flush_enqueue_events().
541  *
542  * @param tc Transport connection which is to be enqueued data
543  * @param b Buffer to be enqueued
544  * @param offset Offset at which to start enqueueing if out-of-order
545  * @param queue_event Flag to indicate if peer is to be notified or if event
546  *                    is to be queued. The former is useful when more data is
547  *                    enqueued and only one event is to be generated.
548  * @param is_in_order Flag to indicate if data is in order
549  * @return Number of bytes enqueued or a negative value if enqueueing failed.
550  */
551 int
552 session_enqueue_stream_connection (transport_connection_t * tc,
553                                    vlib_buffer_t * b, u32 offset,
554                                    u8 queue_event, u8 is_in_order)
555 {
556   session_t *s;
557   int enqueued = 0, rv, in_order_off;
558
559   s = session_get (tc->s_index, tc->thread_index);
560
561   if (is_in_order)
562     {
563       enqueued = svm_fifo_enqueue (s->rx_fifo,
564                                    b->current_length,
565                                    vlib_buffer_get_current (b));
566       if (PREDICT_FALSE ((b->flags & VLIB_BUFFER_NEXT_PRESENT)
567                          && enqueued >= 0))
568         {
569           in_order_off = enqueued > b->current_length ? enqueued : 0;
570           rv = session_enqueue_chain_tail (s, b, in_order_off, 1);
571           if (rv > 0)
572             enqueued += rv;
573         }
574     }
575   else
576     {
577       rv = svm_fifo_enqueue_with_offset (s->rx_fifo, offset,
578                                          b->current_length,
579                                          vlib_buffer_get_current (b));
580       if (PREDICT_FALSE ((b->flags & VLIB_BUFFER_NEXT_PRESENT) && !rv))
581         session_enqueue_chain_tail (s, b, offset + b->current_length, 0);
582       /* if something was enqueued, report even this as success for ooo
583        * segment handling */
584       return rv;
585     }
586
587   if (queue_event)
588     {
589       /* Queue RX event on this fifo. Eventually these will need to be flushed
590        * by calling stream_server_flush_enqueue_events () */
591       session_worker_t *wrk;
592
593       wrk = session_main_get_worker (s->thread_index);
594       if (!(s->flags & SESSION_F_RX_EVT))
595         {
596           s->flags |= SESSION_F_RX_EVT;
597           vec_add1 (wrk->session_to_enqueue[tc->proto], s->session_index);
598         }
599
600       session_fifo_tuning (s, s->rx_fifo, SESSION_FT_ACTION_ENQUEUED, 0);
601     }
602
603   return enqueued;
604 }
605
606 int
607 session_enqueue_dgram_connection (session_t * s,
608                                   session_dgram_hdr_t * hdr,
609                                   vlib_buffer_t * b, u8 proto, u8 queue_event)
610 {
611   int rv;
612
613   ASSERT (svm_fifo_max_enqueue_prod (s->rx_fifo)
614           >= b->current_length + sizeof (*hdr));
615
616   if (PREDICT_TRUE (!(b->flags & VLIB_BUFFER_NEXT_PRESENT)))
617     {
618       /* *INDENT-OFF* */
619       svm_fifo_seg_t segs[2] = {
620           { (u8 *) hdr, sizeof (*hdr) },
621           { vlib_buffer_get_current (b), b->current_length }
622       };
623       /* *INDENT-ON* */
624
625       rv = svm_fifo_enqueue_segments (s->rx_fifo, segs, 2,
626                                       0 /* allow_partial */ );
627     }
628   else
629     {
630       vlib_main_t *vm = vlib_get_main ();
631       svm_fifo_seg_t *segs = 0, *seg;
632       vlib_buffer_t *it = b;
633       u32 n_segs = 1;
634
635       vec_add2 (segs, seg, 1);
636       seg->data = (u8 *) hdr;
637       seg->len = sizeof (*hdr);
638       while (it)
639         {
640           vec_add2 (segs, seg, 1);
641           seg->data = vlib_buffer_get_current (it);
642           seg->len = it->current_length;
643           n_segs++;
644           if (!(it->flags & VLIB_BUFFER_NEXT_PRESENT))
645             break;
646           it = vlib_get_buffer (vm, it->next_buffer);
647         }
648       rv = svm_fifo_enqueue_segments (s->rx_fifo, segs, n_segs,
649                                       0 /* allow partial */ );
650       vec_free (segs);
651     }
652
653   if (queue_event && rv > 0)
654     {
655       /* Queue RX event on this fifo. Eventually these will need to be flushed
656        * by calling stream_server_flush_enqueue_events () */
657       session_worker_t *wrk;
658
659       wrk = session_main_get_worker (s->thread_index);
660       if (!(s->flags & SESSION_F_RX_EVT))
661         {
662           s->flags |= SESSION_F_RX_EVT;
663           vec_add1 (wrk->session_to_enqueue[proto], s->session_index);
664         }
665
666       session_fifo_tuning (s, s->rx_fifo, SESSION_FT_ACTION_ENQUEUED, 0);
667     }
668   return rv > 0 ? rv : 0;
669 }
670
671 int
672 session_tx_fifo_peek_bytes (transport_connection_t * tc, u8 * buffer,
673                             u32 offset, u32 max_bytes)
674 {
675   session_t *s = session_get (tc->s_index, tc->thread_index);
676   return svm_fifo_peek (s->tx_fifo, offset, max_bytes, buffer);
677 }
678
679 u32
680 session_tx_fifo_dequeue_drop (transport_connection_t * tc, u32 max_bytes)
681 {
682   session_t *s = session_get (tc->s_index, tc->thread_index);
683   u32 rv;
684
685   rv = svm_fifo_dequeue_drop (s->tx_fifo, max_bytes);
686   session_fifo_tuning (s, s->tx_fifo, SESSION_FT_ACTION_DEQUEUED, rv);
687
688   if (svm_fifo_needs_deq_ntf (s->tx_fifo, max_bytes))
689     session_dequeue_notify (s);
690
691   return rv;
692 }
693
694 static inline int
695 session_notify_subscribers (u32 app_index, session_t * s,
696                             svm_fifo_t * f, session_evt_type_t evt_type)
697 {
698   app_worker_t *app_wrk;
699   application_t *app;
700   int i;
701
702   app = application_get (app_index);
703   if (!app)
704     return -1;
705
706   for (i = 0; i < f->shr->n_subscribers; i++)
707     {
708       app_wrk = application_get_worker (app, f->shr->subscribers[i]);
709       if (!app_wrk)
710         continue;
711       if (app_worker_lock_and_send_event (app_wrk, s, evt_type))
712         return -1;
713     }
714
715   return 0;
716 }
717
718 /**
719  * Notify session peer that new data has been enqueued.
720  *
721  * @param s     Stream session for which the event is to be generated.
722  * @param lock  Flag to indicate if call should lock message queue.
723  *
724  * @return 0 on success or negative number if failed to send notification.
725  */
726 static inline int
727 session_enqueue_notify_inline (session_t * s)
728 {
729   app_worker_t *app_wrk;
730   u32 session_index;
731   u8 n_subscribers;
732
733   session_index = s->session_index;
734   n_subscribers = svm_fifo_n_subscribers (s->rx_fifo);
735
736   app_wrk = app_worker_get_if_valid (s->app_wrk_index);
737   if (PREDICT_FALSE (!app_wrk))
738     {
739       SESSION_DBG ("invalid s->app_index = %d", s->app_wrk_index);
740       return 0;
741     }
742
743   SESSION_EVT (SESSION_EVT_ENQ, s, svm_fifo_max_dequeue_prod (s->rx_fifo));
744
745   s->flags &= ~SESSION_F_RX_EVT;
746
747   /* Application didn't confirm accept yet */
748   if (PREDICT_FALSE (s->session_state == SESSION_STATE_ACCEPTING))
749     return 0;
750
751   if (PREDICT_FALSE (app_worker_lock_and_send_event (app_wrk, s,
752                                                      SESSION_IO_EVT_RX)))
753     return -1;
754
755   if (PREDICT_FALSE (n_subscribers))
756     {
757       s = session_get (session_index, vlib_get_thread_index ());
758       return session_notify_subscribers (app_wrk->app_index, s,
759                                          s->rx_fifo, SESSION_IO_EVT_RX);
760     }
761
762   return 0;
763 }
764
765 int
766 session_enqueue_notify (session_t * s)
767 {
768   return session_enqueue_notify_inline (s);
769 }
770
771 static void
772 session_enqueue_notify_rpc (void *arg)
773 {
774   u32 session_index = pointer_to_uword (arg);
775   session_t *s;
776
777   s = session_get_if_valid (session_index, vlib_get_thread_index ());
778   if (!s)
779     return;
780
781   session_enqueue_notify (s);
782 }
783
784 /**
785  * Like session_enqueue_notify, but can be called from a thread that does not
786  * own the session.
787  */
788 void
789 session_enqueue_notify_thread (session_handle_t sh)
790 {
791   u32 thread_index = session_thread_from_handle (sh);
792   u32 session_index = session_index_from_handle (sh);
793
794   /*
795    * Pass session index (u32) as opposed to handle (u64) in case pointers
796    * are not 64-bit.
797    */
798   session_send_rpc_evt_to_thread (thread_index,
799                                   session_enqueue_notify_rpc,
800                                   uword_to_pointer (session_index, void *));
801 }
802
803 int
804 session_dequeue_notify (session_t * s)
805 {
806   app_worker_t *app_wrk;
807
808   svm_fifo_clear_deq_ntf (s->tx_fifo);
809
810   app_wrk = app_worker_get_if_valid (s->app_wrk_index);
811   if (PREDICT_FALSE (!app_wrk))
812     return -1;
813
814   if (PREDICT_FALSE (app_worker_lock_and_send_event (app_wrk, s,
815                                                      SESSION_IO_EVT_TX)))
816     return -1;
817
818   if (PREDICT_FALSE (s->tx_fifo->shr->n_subscribers))
819     return session_notify_subscribers (app_wrk->app_index, s,
820                                        s->tx_fifo, SESSION_IO_EVT_TX);
821
822   return 0;
823 }
824
825 /**
826  * Flushes queue of sessions that are to be notified of new data
827  * enqueued events.
828  *
829  * @param thread_index Thread index for which the flush is to be performed.
830  * @return 0 on success or a positive number indicating the number of
831  *         failures due to API queue being full.
832  */
833 int
834 session_main_flush_enqueue_events (u8 transport_proto, u32 thread_index)
835 {
836   session_worker_t *wrk = session_main_get_worker (thread_index);
837   session_t *s;
838   int i, errors = 0;
839   u32 *indices;
840
841   indices = wrk->session_to_enqueue[transport_proto];
842
843   for (i = 0; i < vec_len (indices); i++)
844     {
845       s = session_get_if_valid (indices[i], thread_index);
846       if (PREDICT_FALSE (!s))
847         {
848           errors++;
849           continue;
850         }
851
852       session_fifo_tuning (s, s->rx_fifo, SESSION_FT_ACTION_ENQUEUED,
853                            0 /* TODO/not needed */ );
854
855       if (PREDICT_FALSE (session_enqueue_notify_inline (s)))
856         errors++;
857     }
858
859   vec_reset_length (indices);
860   wrk->session_to_enqueue[transport_proto] = indices;
861
862   return errors;
863 }
864
865 int
866 session_main_flush_all_enqueue_events (u8 transport_proto)
867 {
868   vlib_thread_main_t *vtm = vlib_get_thread_main ();
869   int i, errors = 0;
870   for (i = 0; i < 1 + vtm->n_threads; i++)
871     errors += session_main_flush_enqueue_events (transport_proto, i);
872   return errors;
873 }
874
875 int
876 session_stream_connect_notify (transport_connection_t * tc,
877                                session_error_t err)
878 {
879   u32 opaque = 0, new_ti, new_si;
880   app_worker_t *app_wrk;
881   session_t *s = 0, *ho;
882
883   /*
884    * Cleanup half-open table
885    */
886   session_lookup_del_half_open (tc);
887
888   ho = ho_session_get (tc->s_index);
889   opaque = ho->opaque;
890   app_wrk = app_worker_get_if_valid (ho->app_wrk_index);
891   if (!app_wrk)
892     return -1;
893
894   if (err)
895     return app_worker_connect_notify (app_wrk, s, err, opaque);
896
897   s = session_alloc_for_connection (tc);
898   s->session_state = SESSION_STATE_CONNECTING;
899   s->app_wrk_index = app_wrk->wrk_index;
900   new_si = s->session_index;
901   new_ti = s->thread_index;
902
903   if ((err = app_worker_init_connected (app_wrk, s)))
904     {
905       session_free (s);
906       app_worker_connect_notify (app_wrk, 0, err, opaque);
907       return -1;
908     }
909
910   s = session_get (new_si, new_ti);
911   s->session_state = SESSION_STATE_READY;
912   session_lookup_add_connection (tc, session_handle (s));
913
914   if (app_worker_connect_notify (app_wrk, s, SESSION_E_NONE, opaque))
915     {
916       session_lookup_del_connection (tc);
917       /* Avoid notifying app about rejected session cleanup */
918       s = session_get (new_si, new_ti);
919       segment_manager_dealloc_fifos (s->rx_fifo, s->tx_fifo);
920       session_free (s);
921       return -1;
922     }
923
924   return 0;
925 }
926
927 typedef union session_switch_pool_reply_args_
928 {
929   struct
930   {
931     u32 session_index;
932     u16 thread_index;
933     u8 is_closed;
934   };
935   u64 as_u64;
936 } session_switch_pool_reply_args_t;
937
938 STATIC_ASSERT (sizeof (session_switch_pool_reply_args_t) <= sizeof (uword),
939                "switch pool reply args size");
940
941 static void
942 session_switch_pool_reply (void *arg)
943 {
944   session_switch_pool_reply_args_t rargs;
945   session_t *s;
946
947   rargs.as_u64 = pointer_to_uword (arg);
948   s = session_get_if_valid (rargs.session_index, rargs.thread_index);
949   if (!s)
950     return;
951
952   /* Session closed during migration. Clean everything up */
953   if (rargs.is_closed)
954     {
955       transport_cleanup (session_get_transport_proto (s), s->connection_index,
956                          s->thread_index);
957       segment_manager_dealloc_fifos (s->rx_fifo, s->tx_fifo);
958       session_free (s);
959       return;
960     }
961
962   /* Notify app that it has data on the new session */
963   session_enqueue_notify (s);
964 }
965
966 typedef struct _session_switch_pool_args
967 {
968   u32 session_index;
969   u32 thread_index;
970   u32 new_thread_index;
971   u32 new_session_index;
972 } session_switch_pool_args_t;
973
974 /**
975  * Notify old thread of the session pool switch
976  */
977 static void
978 session_switch_pool (void *cb_args)
979 {
980   session_switch_pool_args_t *args = (session_switch_pool_args_t *) cb_args;
981   session_switch_pool_reply_args_t rargs;
982   session_handle_t new_sh;
983   segment_manager_t *sm;
984   app_worker_t *app_wrk;
985   session_t *s;
986
987   ASSERT (args->thread_index == vlib_get_thread_index ());
988   s = session_get (args->session_index, args->thread_index);
989
990   /* Check if session closed during migration */
991   rargs.is_closed = s->session_state >= SESSION_STATE_TRANSPORT_CLOSING;
992
993   transport_cleanup (session_get_transport_proto (s), s->connection_index,
994                      s->thread_index);
995
996   app_wrk = app_worker_get_if_valid (s->app_wrk_index);
997   if (app_wrk)
998     {
999       /* Cleanup fifo segment slice state for fifos */
1000       sm = app_worker_get_connect_segment_manager (app_wrk);
1001       segment_manager_detach_fifo (sm, &s->rx_fifo);
1002       segment_manager_detach_fifo (sm, &s->tx_fifo);
1003
1004       /* Notify app, using old session, about the migration event */
1005       if (!rargs.is_closed)
1006         {
1007           new_sh = session_make_handle (args->new_session_index,
1008                                         args->new_thread_index);
1009           app_worker_migrate_notify (app_wrk, s, new_sh);
1010         }
1011     }
1012
1013   /* Trigger app read and fifo updates on the new thread */
1014   rargs.session_index = args->new_session_index;
1015   rargs.thread_index = args->new_thread_index;
1016   session_send_rpc_evt_to_thread (args->new_thread_index,
1017                                   session_switch_pool_reply,
1018                                   uword_to_pointer (rargs.as_u64, void *));
1019
1020   session_free (s);
1021   clib_mem_free (cb_args);
1022 }
1023
1024 /**
1025  * Move dgram session to the right thread
1026  */
1027 int
1028 session_dgram_connect_notify (transport_connection_t * tc,
1029                               u32 old_thread_index, session_t ** new_session)
1030 {
1031   session_t *new_s;
1032   session_switch_pool_args_t *rpc_args;
1033   segment_manager_t *sm;
1034   app_worker_t *app_wrk;
1035
1036   /*
1037    * Clone half-open session to the right thread.
1038    */
1039   new_s = session_clone_safe (tc->s_index, old_thread_index);
1040   new_s->connection_index = tc->c_index;
1041   new_s->session_state = SESSION_STATE_READY;
1042   new_s->flags |= SESSION_F_IS_MIGRATING;
1043
1044   if (!(tc->flags & TRANSPORT_CONNECTION_F_NO_LOOKUP))
1045     session_lookup_add_connection (tc, session_handle (new_s));
1046
1047   app_wrk = app_worker_get_if_valid (new_s->app_wrk_index);
1048   if (app_wrk)
1049     {
1050       /* New set of fifos attached to the same shared memory */
1051       sm = app_worker_get_connect_segment_manager (app_wrk);
1052       segment_manager_attach_fifo (sm, &new_s->rx_fifo, new_s);
1053       segment_manager_attach_fifo (sm, &new_s->tx_fifo, new_s);
1054     }
1055
1056   /*
1057    * Ask thread owning the old session to clean it up and make us the tx
1058    * fifo owner
1059    */
1060   rpc_args = clib_mem_alloc (sizeof (*rpc_args));
1061   rpc_args->new_session_index = new_s->session_index;
1062   rpc_args->new_thread_index = new_s->thread_index;
1063   rpc_args->session_index = tc->s_index;
1064   rpc_args->thread_index = old_thread_index;
1065   session_send_rpc_evt_to_thread (rpc_args->thread_index, session_switch_pool,
1066                                   rpc_args);
1067
1068   tc->s_index = new_s->session_index;
1069   new_s->connection_index = tc->c_index;
1070   *new_session = new_s;
1071   return 0;
1072 }
1073
1074 /**
1075  * Notification from transport that connection is being closed.
1076  *
1077  * A disconnect is sent to application but state is not removed. Once
1078  * disconnect is acknowledged by application, session disconnect is called.
1079  * Ultimately this leads to close being called on transport (passive close).
1080  */
1081 void
1082 session_transport_closing_notify (transport_connection_t * tc)
1083 {
1084   app_worker_t *app_wrk;
1085   session_t *s;
1086
1087   s = session_get (tc->s_index, tc->thread_index);
1088   if (s->session_state >= SESSION_STATE_TRANSPORT_CLOSING)
1089     return;
1090
1091   /* Wait for reply from app before sending notification as the
1092    * accept might be rejected */
1093   if (s->session_state == SESSION_STATE_ACCEPTING)
1094     {
1095       s->session_state = SESSION_STATE_TRANSPORT_CLOSING;
1096       return;
1097     }
1098
1099   s->session_state = SESSION_STATE_TRANSPORT_CLOSING;
1100   app_wrk = app_worker_get (s->app_wrk_index);
1101   app_worker_close_notify (app_wrk, s);
1102 }
1103
1104 /**
1105  * Notification from transport that connection is being deleted
1106  *
1107  * This removes the session if it is still valid. It should be called only on
1108  * previously fully established sessions. For instance failed connects should
1109  * call stream_session_connect_notify and indicate that the connect has
1110  * failed.
1111  */
1112 void
1113 session_transport_delete_notify (transport_connection_t * tc)
1114 {
1115   session_t *s;
1116
1117   /* App might've been removed already */
1118   if (!(s = session_get_if_valid (tc->s_index, tc->thread_index)))
1119     return;
1120
1121   switch (s->session_state)
1122     {
1123     case SESSION_STATE_CREATED:
1124       /* Session was created but accept notification was not yet sent to the
1125        * app. Cleanup everything. */
1126       session_lookup_del_session (s);
1127       segment_manager_dealloc_fifos (s->rx_fifo, s->tx_fifo);
1128       session_free (s);
1129       break;
1130     case SESSION_STATE_ACCEPTING:
1131     case SESSION_STATE_TRANSPORT_CLOSING:
1132     case SESSION_STATE_CLOSING:
1133     case SESSION_STATE_TRANSPORT_CLOSED:
1134       /* If transport finishes or times out before we get a reply
1135        * from the app, mark transport as closed and wait for reply
1136        * before removing the session. Cleanup session table in advance
1137        * because transport will soon be closed and closed sessions
1138        * are assumed to have been removed from the lookup table */
1139       session_lookup_del_session (s);
1140       s->session_state = SESSION_STATE_TRANSPORT_DELETED;
1141       session_cleanup_notify (s, SESSION_CLEANUP_TRANSPORT);
1142       svm_fifo_dequeue_drop_all (s->tx_fifo);
1143       break;
1144     case SESSION_STATE_APP_CLOSED:
1145       /* Cleanup lookup table as transport needs to still be valid.
1146        * Program transport close to ensure that all session events
1147        * have been cleaned up. Once transport close is called, the
1148        * session is just removed because both transport and app have
1149        * confirmed the close*/
1150       session_lookup_del_session (s);
1151       s->session_state = SESSION_STATE_TRANSPORT_DELETED;
1152       session_cleanup_notify (s, SESSION_CLEANUP_TRANSPORT);
1153       svm_fifo_dequeue_drop_all (s->tx_fifo);
1154       session_program_transport_ctrl_evt (s, SESSION_CTRL_EVT_CLOSE);
1155       break;
1156     case SESSION_STATE_TRANSPORT_DELETED:
1157       break;
1158     case SESSION_STATE_CLOSED:
1159       session_cleanup_notify (s, SESSION_CLEANUP_TRANSPORT);
1160       session_delete (s);
1161       break;
1162     default:
1163       clib_warning ("session state %u", s->session_state);
1164       session_cleanup_notify (s, SESSION_CLEANUP_TRANSPORT);
1165       session_delete (s);
1166       break;
1167     }
1168 }
1169
1170 /**
1171  * Notification from transport that it is closed
1172  *
1173  * Should be called by transport, prior to calling delete notify, once it
1174  * knows that no more data will be exchanged. This could serve as an
1175  * early acknowledgment of an active close especially if transport delete
1176  * can be delayed a long time, e.g., tcp time-wait.
1177  */
1178 void
1179 session_transport_closed_notify (transport_connection_t * tc)
1180 {
1181   app_worker_t *app_wrk;
1182   session_t *s;
1183
1184   if (!(s = session_get_if_valid (tc->s_index, tc->thread_index)))
1185     return;
1186
1187   /* Transport thinks that app requested close but it actually didn't.
1188    * Can happen for tcp:
1189    * 1)if fin and rst are received in close succession.
1190    * 2)if app shutdown the connection.  */
1191   if (s->session_state == SESSION_STATE_READY)
1192     {
1193       session_transport_closing_notify (tc);
1194       svm_fifo_dequeue_drop_all (s->tx_fifo);
1195       s->session_state = SESSION_STATE_TRANSPORT_CLOSED;
1196     }
1197   /* If app close has not been received or has not yet resulted in
1198    * a transport close, only mark the session transport as closed */
1199   else if (s->session_state <= SESSION_STATE_CLOSING)
1200     {
1201       s->session_state = SESSION_STATE_TRANSPORT_CLOSED;
1202     }
1203   /* If app also closed, switch to closed */
1204   else if (s->session_state == SESSION_STATE_APP_CLOSED)
1205     s->session_state = SESSION_STATE_CLOSED;
1206
1207   app_wrk = app_worker_get_if_valid (s->app_wrk_index);
1208   if (app_wrk)
1209     app_worker_transport_closed_notify (app_wrk, s);
1210 }
1211
1212 /**
1213  * Notify application that connection has been reset.
1214  */
1215 void
1216 session_transport_reset_notify (transport_connection_t * tc)
1217 {
1218   app_worker_t *app_wrk;
1219   session_t *s;
1220
1221   s = session_get (tc->s_index, tc->thread_index);
1222   svm_fifo_dequeue_drop_all (s->tx_fifo);
1223   if (s->session_state >= SESSION_STATE_TRANSPORT_CLOSING)
1224     return;
1225   if (s->session_state == SESSION_STATE_ACCEPTING)
1226     {
1227       s->session_state = SESSION_STATE_TRANSPORT_CLOSING;
1228       return;
1229     }
1230   s->session_state = SESSION_STATE_TRANSPORT_CLOSING;
1231   app_wrk = app_worker_get (s->app_wrk_index);
1232   app_worker_reset_notify (app_wrk, s);
1233 }
1234
1235 int
1236 session_stream_accept_notify (transport_connection_t * tc)
1237 {
1238   app_worker_t *app_wrk;
1239   session_t *s;
1240
1241   s = session_get (tc->s_index, tc->thread_index);
1242   app_wrk = app_worker_get_if_valid (s->app_wrk_index);
1243   if (!app_wrk)
1244     return -1;
1245   if (s->session_state != SESSION_STATE_CREATED)
1246     return 0;
1247   s->session_state = SESSION_STATE_ACCEPTING;
1248   if (app_worker_accept_notify (app_wrk, s))
1249     {
1250       /* On transport delete, no notifications should be sent. Unless, the
1251        * accept is retried and successful. */
1252       s->session_state = SESSION_STATE_CREATED;
1253       return -1;
1254     }
1255   return 0;
1256 }
1257
1258 /**
1259  * Accept a stream session. Optionally ping the server by callback.
1260  */
1261 int
1262 session_stream_accept (transport_connection_t * tc, u32 listener_index,
1263                        u32 thread_index, u8 notify)
1264 {
1265   session_t *s;
1266   int rv;
1267
1268   s = session_alloc_for_connection (tc);
1269   s->listener_handle = ((u64) thread_index << 32) | (u64) listener_index;
1270   s->session_state = SESSION_STATE_CREATED;
1271
1272   if ((rv = app_worker_init_accepted (s)))
1273     {
1274       session_free (s);
1275       return rv;
1276     }
1277
1278   session_lookup_add_connection (tc, session_handle (s));
1279
1280   /* Shoulder-tap the server */
1281   if (notify)
1282     {
1283       app_worker_t *app_wrk = app_worker_get (s->app_wrk_index);
1284       if ((rv = app_worker_accept_notify (app_wrk, s)))
1285         {
1286           session_lookup_del_session (s);
1287           segment_manager_dealloc_fifos (s->rx_fifo, s->tx_fifo);
1288           session_free (s);
1289           return rv;
1290         }
1291     }
1292
1293   return 0;
1294 }
1295
1296 int
1297 session_dgram_accept (transport_connection_t * tc, u32 listener_index,
1298                       u32 thread_index)
1299 {
1300   app_worker_t *app_wrk;
1301   session_t *s;
1302   int rv;
1303
1304   s = session_alloc_for_connection (tc);
1305   s->listener_handle = ((u64) thread_index << 32) | (u64) listener_index;
1306
1307   if ((rv = app_worker_init_accepted (s)))
1308     {
1309       session_free (s);
1310       return rv;
1311     }
1312
1313   session_lookup_add_connection (tc, session_handle (s));
1314
1315   app_wrk = app_worker_get (s->app_wrk_index);
1316   if ((rv = app_worker_accept_notify (app_wrk, s)))
1317     {
1318       session_lookup_del_session (s);
1319       segment_manager_dealloc_fifos (s->rx_fifo, s->tx_fifo);
1320       session_free (s);
1321       return rv;
1322     }
1323
1324   return 0;
1325 }
1326
1327 int
1328 session_open_cl (session_endpoint_cfg_t *rmt, session_handle_t *rsh)
1329 {
1330   transport_connection_t *tc;
1331   transport_endpoint_cfg_t *tep;
1332   app_worker_t *app_wrk;
1333   session_handle_t sh;
1334   session_t *s;
1335   int rv;
1336
1337   tep = session_endpoint_to_transport_cfg (rmt);
1338   rv = transport_connect (rmt->transport_proto, tep);
1339   if (rv < 0)
1340     {
1341       SESSION_DBG ("Transport failed to open connection.");
1342       return rv;
1343     }
1344
1345   tc = transport_get_half_open (rmt->transport_proto, (u32) rv);
1346
1347   /* For dgram type of service, allocate session and fifos now */
1348   app_wrk = app_worker_get (rmt->app_wrk_index);
1349   s = session_alloc_for_connection (tc);
1350   s->app_wrk_index = app_wrk->wrk_index;
1351   s->session_state = SESSION_STATE_OPENED;
1352   if (app_worker_init_connected (app_wrk, s))
1353     {
1354       session_free (s);
1355       return -1;
1356     }
1357
1358   sh = session_handle (s);
1359   *rsh = sh;
1360
1361   session_lookup_add_connection (tc, sh);
1362   return app_worker_connect_notify (app_wrk, s, SESSION_E_NONE, rmt->opaque);
1363 }
1364
1365 int
1366 session_open_vc (session_endpoint_cfg_t *rmt, session_handle_t *rsh)
1367 {
1368   transport_connection_t *tc;
1369   transport_endpoint_cfg_t *tep;
1370   app_worker_t *app_wrk;
1371   session_t *ho;
1372   int rv;
1373
1374   tep = session_endpoint_to_transport_cfg (rmt);
1375   rv = transport_connect (rmt->transport_proto, tep);
1376   if (rv < 0)
1377     {
1378       SESSION_DBG ("Transport failed to open connection.");
1379       return rv;
1380     }
1381
1382   tc = transport_get_half_open (rmt->transport_proto, (u32) rv);
1383
1384   app_wrk = app_worker_get (rmt->app_wrk_index);
1385
1386   /* If transport offers a vc service, only allocate established
1387    * session once the connection has been established.
1388    * In the meantime allocate half-open session for tracking purposes
1389    * associate half-open connection to it and add session to app-worker
1390    * half-open table. These are needed to allocate the established
1391    * session on transport notification, and to cleanup the half-open
1392    * session if the app detaches before connection establishment.
1393    */
1394   ho = session_alloc_for_half_open (tc);
1395   ho->app_wrk_index = app_wrk->wrk_index;
1396   ho->ho_index = app_worker_add_half_open (app_wrk, session_handle (ho));
1397   ho->opaque = rmt->opaque;
1398   *rsh = session_handle (ho);
1399
1400   if (!(tc->flags & TRANSPORT_CONNECTION_F_NO_LOOKUP))
1401     session_lookup_add_half_open (tc, tc->c_index);
1402
1403   return 0;
1404 }
1405
1406 int
1407 session_open_app (session_endpoint_cfg_t *rmt, session_handle_t *rsh)
1408 {
1409   transport_endpoint_cfg_t *tep_cfg = session_endpoint_to_transport_cfg (rmt);
1410
1411   /* Not supported for now */
1412   *rsh = SESSION_INVALID_HANDLE;
1413   return transport_connect (rmt->transport_proto, tep_cfg);
1414 }
1415
1416 typedef int (*session_open_service_fn) (session_endpoint_cfg_t *,
1417                                         session_handle_t *);
1418
1419 /* *INDENT-OFF* */
1420 static session_open_service_fn session_open_srv_fns[TRANSPORT_N_SERVICES] = {
1421   session_open_vc,
1422   session_open_cl,
1423   session_open_app,
1424 };
1425 /* *INDENT-ON* */
1426
1427 /**
1428  * Ask transport to open connection to remote transport endpoint.
1429  *
1430  * Stores handle for matching request with reply since the call can be
1431  * asynchronous. For instance, for TCP the 3-way handshake must complete
1432  * before reply comes. Session is only created once connection is established.
1433  *
1434  * @param app_index Index of the application requesting the connect
1435  * @param st Session type requested.
1436  * @param tep Remote transport endpoint
1437  * @param opaque Opaque data (typically, api_context) the application expects
1438  *               on open completion.
1439  */
1440 int
1441 session_open (session_endpoint_cfg_t *rmt, session_handle_t *rsh)
1442 {
1443   transport_service_type_t tst;
1444   tst = transport_protocol_service_type (rmt->transport_proto);
1445   return session_open_srv_fns[tst](rmt, rsh);
1446 }
1447
1448 /**
1449  * Ask transport to listen on session endpoint.
1450  *
1451  * @param s Session for which listen will be called. Note that unlike
1452  *          established sessions, listen sessions are not associated to a
1453  *          thread.
1454  * @param sep Local endpoint to be listened on.
1455  */
1456 int
1457 session_listen (session_t * ls, session_endpoint_cfg_t * sep)
1458 {
1459   transport_endpoint_cfg_t *tep;
1460   int tc_index;
1461   u32 s_index;
1462
1463   /* Transport bind/listen */
1464   tep = session_endpoint_to_transport_cfg (sep);
1465   s_index = ls->session_index;
1466   tc_index = transport_start_listen (session_get_transport_proto (ls),
1467                                      s_index, tep);
1468
1469   if (tc_index < 0)
1470     return tc_index;
1471
1472   /* Attach transport to session. Lookup tables are populated by the app
1473    * worker because local tables (for ct sessions) are not backed by a fib */
1474   ls = listen_session_get (s_index);
1475   ls->connection_index = tc_index;
1476
1477   return 0;
1478 }
1479
1480 /**
1481  * Ask transport to stop listening on local transport endpoint.
1482  *
1483  * @param s Session to stop listening on. It must be in state LISTENING.
1484  */
1485 int
1486 session_stop_listen (session_t * s)
1487 {
1488   transport_proto_t tp = session_get_transport_proto (s);
1489   transport_connection_t *tc;
1490
1491   if (s->session_state != SESSION_STATE_LISTENING)
1492     return SESSION_E_NOLISTEN;
1493
1494   tc = transport_get_listener (tp, s->connection_index);
1495
1496   /* If no transport, assume everything was cleaned up already */
1497   if (!tc)
1498     return SESSION_E_NONE;
1499
1500   if (!(tc->flags & TRANSPORT_CONNECTION_F_NO_LOOKUP))
1501     session_lookup_del_connection (tc);
1502
1503   transport_stop_listen (tp, s->connection_index);
1504   return 0;
1505 }
1506
1507 /**
1508  * Initialize session half-closing procedure.
1509  *
1510  * Note that half-closing will not change the state of the session.
1511  */
1512 void
1513 session_half_close (session_t *s)
1514 {
1515   if (!s)
1516     return;
1517
1518   session_program_transport_ctrl_evt (s, SESSION_CTRL_EVT_HALF_CLOSE);
1519 }
1520
1521 /**
1522  * Initialize session closing procedure.
1523  *
1524  * Request is always sent to session node to ensure that all outstanding
1525  * requests are served before transport is notified.
1526  */
1527 void
1528 session_close (session_t * s)
1529 {
1530   if (!s)
1531     return;
1532
1533   if (s->session_state >= SESSION_STATE_CLOSING)
1534     {
1535       /* Session will only be removed once both app and transport
1536        * acknowledge the close */
1537       if (s->session_state == SESSION_STATE_TRANSPORT_CLOSED
1538           || s->session_state == SESSION_STATE_TRANSPORT_DELETED)
1539         session_program_transport_ctrl_evt (s, SESSION_CTRL_EVT_CLOSE);
1540       return;
1541     }
1542
1543   /* App closed so stop propagating dequeue notifications */
1544   svm_fifo_clear_deq_ntf (s->tx_fifo);
1545   s->session_state = SESSION_STATE_CLOSING;
1546   session_program_transport_ctrl_evt (s, SESSION_CTRL_EVT_CLOSE);
1547 }
1548
1549 /**
1550  * Force a close without waiting for data to be flushed
1551  */
1552 void
1553 session_reset (session_t * s)
1554 {
1555   if (s->session_state >= SESSION_STATE_CLOSING)
1556     return;
1557   /* Drop all outstanding tx data */
1558   svm_fifo_dequeue_drop_all (s->tx_fifo);
1559   s->session_state = SESSION_STATE_CLOSING;
1560   session_program_transport_ctrl_evt (s, SESSION_CTRL_EVT_RESET);
1561 }
1562
1563 /**
1564  * Notify transport the session can be half-disconnected.
1565  *
1566  * Must be called from the session's thread.
1567  */
1568 void
1569 session_transport_half_close (session_t *s)
1570 {
1571   /* Only READY session can be half-closed */
1572   if (s->session_state != SESSION_STATE_READY)
1573     {
1574       return;
1575     }
1576
1577   transport_half_close (session_get_transport_proto (s), s->connection_index,
1578                         s->thread_index);
1579 }
1580
1581 /**
1582  * Notify transport the session can be disconnected. This should eventually
1583  * result in a delete notification that allows us to cleanup session state.
1584  * Called for both active/passive disconnects.
1585  *
1586  * Must be called from the session's thread.
1587  */
1588 void
1589 session_transport_close (session_t * s)
1590 {
1591   if (s->session_state >= SESSION_STATE_APP_CLOSED)
1592     {
1593       if (s->session_state == SESSION_STATE_TRANSPORT_CLOSED)
1594         s->session_state = SESSION_STATE_CLOSED;
1595       /* If transport is already deleted, just free the session */
1596       else if (s->session_state >= SESSION_STATE_TRANSPORT_DELETED)
1597         session_free_w_fifos (s);
1598       return;
1599     }
1600
1601   /* If the tx queue wasn't drained, the transport can continue to try
1602    * sending the outstanding data (in closed state it cannot). It MUST however
1603    * at one point, either after sending everything or after a timeout, call
1604    * delete notify. This will finally lead to the complete cleanup of the
1605    * session.
1606    */
1607   s->session_state = SESSION_STATE_APP_CLOSED;
1608
1609   transport_close (session_get_transport_proto (s), s->connection_index,
1610                    s->thread_index);
1611 }
1612
1613 /**
1614  * Force transport close
1615  */
1616 void
1617 session_transport_reset (session_t * s)
1618 {
1619   if (s->session_state >= SESSION_STATE_APP_CLOSED)
1620     {
1621       if (s->session_state == SESSION_STATE_TRANSPORT_CLOSED)
1622         s->session_state = SESSION_STATE_CLOSED;
1623       else if (s->session_state >= SESSION_STATE_TRANSPORT_DELETED)
1624         session_free_w_fifos (s);
1625       return;
1626     }
1627
1628   s->session_state = SESSION_STATE_APP_CLOSED;
1629   transport_reset (session_get_transport_proto (s), s->connection_index,
1630                    s->thread_index);
1631 }
1632
1633 /**
1634  * Cleanup transport and session state.
1635  *
1636  * Notify transport of the cleanup and free the session. This should
1637  * be called only if transport reported some error and is already
1638  * closed.
1639  */
1640 void
1641 session_transport_cleanup (session_t * s)
1642 {
1643   /* Delete from main lookup table before we axe the the transport */
1644   session_lookup_del_session (s);
1645   if (s->session_state != SESSION_STATE_TRANSPORT_DELETED)
1646     transport_cleanup (session_get_transport_proto (s), s->connection_index,
1647                        s->thread_index);
1648   /* Since we called cleanup, no delete notification will come. So, make
1649    * sure the session is properly freed. */
1650   segment_manager_dealloc_fifos (s->rx_fifo, s->tx_fifo);
1651   session_free (s);
1652 }
1653
1654 /**
1655  * Allocate worker mqs in share-able segment
1656  *
1657  * That can only be a newly created memfd segment, that must be mapped
1658  * by all apps/stack users unless private rx mqs are enabled.
1659  */
1660 void
1661 session_vpp_wrk_mqs_alloc (session_main_t *smm)
1662 {
1663   u32 mq_q_length = 2048, evt_size = sizeof (session_event_t);
1664   fifo_segment_t *mqs_seg = &smm->wrk_mqs_segment;
1665   svm_msg_q_cfg_t _cfg, *cfg = &_cfg;
1666   uword mqs_seg_size;
1667   int i;
1668
1669   mq_q_length = clib_max (mq_q_length, smm->configured_wrk_mq_length);
1670
1671   svm_msg_q_ring_cfg_t rc[SESSION_MQ_N_RINGS] = {
1672     { mq_q_length, evt_size, 0 }, { mq_q_length >> 1, 256, 0 }
1673   };
1674   cfg->consumer_pid = 0;
1675   cfg->n_rings = 2;
1676   cfg->q_nitems = mq_q_length;
1677   cfg->ring_cfgs = rc;
1678
1679   /*
1680    * Compute mqs segment size based on rings config and leave space
1681    * for passing extended configuration messages, i.e., data allocated
1682    * outside of the rings. If provided with a config value, accept it
1683    * if larger than minimum size.
1684    */
1685   mqs_seg_size = svm_msg_q_size_to_alloc (cfg) * vec_len (smm->wrk);
1686   mqs_seg_size = mqs_seg_size + (32 << 10);
1687   mqs_seg_size = clib_max (mqs_seg_size, smm->wrk_mqs_segment_size);
1688
1689   mqs_seg->ssvm.ssvm_size = mqs_seg_size;
1690   mqs_seg->ssvm.my_pid = getpid ();
1691   mqs_seg->ssvm.name = format (0, "%s%c", "session: wrk-mqs-segment", 0);
1692
1693   if (ssvm_server_init (&mqs_seg->ssvm, SSVM_SEGMENT_MEMFD))
1694     {
1695       clib_warning ("failed to initialize queue segment");
1696       return;
1697     }
1698
1699   fifo_segment_init (mqs_seg);
1700
1701   /* Special fifo segment that's filled only with mqs */
1702   mqs_seg->h->n_mqs = vec_len (smm->wrk);
1703
1704   for (i = 0; i < vec_len (smm->wrk); i++)
1705     smm->wrk[i].vpp_event_queue = fifo_segment_msg_q_alloc (mqs_seg, i, cfg);
1706 }
1707
1708 fifo_segment_t *
1709 session_main_get_wrk_mqs_segment (void)
1710 {
1711   return &session_main.wrk_mqs_segment;
1712 }
1713
1714 u64
1715 session_segment_handle (session_t * s)
1716 {
1717   svm_fifo_t *f;
1718
1719   if (!s->rx_fifo)
1720     return SESSION_INVALID_HANDLE;
1721
1722   f = s->rx_fifo;
1723   return segment_manager_make_segment_handle (f->segment_manager,
1724                                               f->segment_index);
1725 }
1726
1727 /* *INDENT-OFF* */
1728 static session_fifo_rx_fn *session_tx_fns[TRANSPORT_TX_N_FNS] = {
1729     session_tx_fifo_peek_and_snd,
1730     session_tx_fifo_dequeue_and_snd,
1731     session_tx_fifo_dequeue_internal,
1732     session_tx_fifo_dequeue_and_snd
1733 };
1734 /* *INDENT-ON* */
1735
1736 void
1737 session_register_transport (transport_proto_t transport_proto,
1738                             const transport_proto_vft_t * vft, u8 is_ip4,
1739                             u32 output_node)
1740 {
1741   session_main_t *smm = &session_main;
1742   session_type_t session_type;
1743   u32 next_index = ~0;
1744
1745   session_type = session_type_from_proto_and_ip (transport_proto, is_ip4);
1746
1747   vec_validate (smm->session_type_to_next, session_type);
1748   vec_validate (smm->session_tx_fns, session_type);
1749
1750   if (output_node != ~0)
1751     next_index = vlib_node_add_next (vlib_get_main (),
1752                                      session_queue_node.index, output_node);
1753
1754   smm->session_type_to_next[session_type] = next_index;
1755   smm->session_tx_fns[session_type] =
1756     session_tx_fns[vft->transport_options.tx_type];
1757 }
1758
1759 void
1760 session_register_update_time_fn (session_update_time_fn fn, u8 is_add)
1761 {
1762   session_main_t *smm = &session_main;
1763   session_update_time_fn *fi;
1764   u32 fi_pos = ~0;
1765   u8 found = 0;
1766
1767   vec_foreach (fi, smm->update_time_fns)
1768     {
1769       if (*fi == fn)
1770         {
1771           fi_pos = fi - smm->update_time_fns;
1772           found = 1;
1773           break;
1774         }
1775     }
1776
1777   if (is_add)
1778     {
1779       if (found)
1780         {
1781           clib_warning ("update time fn %p already registered", fn);
1782           return;
1783         }
1784       vec_add1 (smm->update_time_fns, fn);
1785     }
1786   else
1787     {
1788       vec_del1 (smm->update_time_fns, fi_pos);
1789     }
1790 }
1791
1792 transport_proto_t
1793 session_add_transport_proto (void)
1794 {
1795   session_main_t *smm = &session_main;
1796   session_worker_t *wrk;
1797   u32 thread;
1798
1799   smm->last_transport_proto_type += 1;
1800
1801   for (thread = 0; thread < vec_len (smm->wrk); thread++)
1802     {
1803       wrk = session_main_get_worker (thread);
1804       vec_validate (wrk->session_to_enqueue, smm->last_transport_proto_type);
1805     }
1806
1807   return smm->last_transport_proto_type;
1808 }
1809
1810 transport_connection_t *
1811 session_get_transport (session_t * s)
1812 {
1813   if (s->session_state != SESSION_STATE_LISTENING)
1814     return transport_get_connection (session_get_transport_proto (s),
1815                                      s->connection_index, s->thread_index);
1816   else
1817     return transport_get_listener (session_get_transport_proto (s),
1818                                    s->connection_index);
1819 }
1820
1821 void
1822 session_get_endpoint (session_t * s, transport_endpoint_t * tep, u8 is_lcl)
1823 {
1824   if (s->session_state != SESSION_STATE_LISTENING)
1825     return transport_get_endpoint (session_get_transport_proto (s),
1826                                    s->connection_index, s->thread_index, tep,
1827                                    is_lcl);
1828   else
1829     return transport_get_listener_endpoint (session_get_transport_proto (s),
1830                                             s->connection_index, tep, is_lcl);
1831 }
1832
1833 int
1834 session_transport_attribute (session_t *s, u8 is_get,
1835                              transport_endpt_attr_t *attr)
1836 {
1837   if (s->session_state < SESSION_STATE_READY)
1838     return -1;
1839
1840   return transport_connection_attribute (session_get_transport_proto (s),
1841                                          s->connection_index, s->thread_index,
1842                                          is_get, attr);
1843 }
1844
1845 transport_connection_t *
1846 listen_session_get_transport (session_t * s)
1847 {
1848   return transport_get_listener (session_get_transport_proto (s),
1849                                  s->connection_index);
1850 }
1851
1852 void
1853 session_queue_run_on_main_thread (vlib_main_t * vm)
1854 {
1855   ASSERT (vlib_get_thread_index () == 0);
1856   vlib_node_set_interrupt_pending (vm, session_queue_node.index);
1857 }
1858
1859 static clib_error_t *
1860 session_manager_main_enable (vlib_main_t * vm)
1861 {
1862   session_main_t *smm = &session_main;
1863   vlib_thread_main_t *vtm = vlib_get_thread_main ();
1864   u32 num_threads, preallocated_sessions_per_worker;
1865   session_worker_t *wrk;
1866   int i;
1867
1868   /* We only initialize once and do not de-initialized on disable */
1869   if (smm->is_initialized)
1870     goto done;
1871
1872   num_threads = 1 /* main thread */  + vtm->n_threads;
1873
1874   if (num_threads < 1)
1875     return clib_error_return (0, "n_thread_stacks not set");
1876
1877   /* Allocate cache line aligned worker contexts */
1878   vec_validate_aligned (smm->wrk, num_threads - 1, CLIB_CACHE_LINE_BYTES);
1879   clib_spinlock_init (&session_main.pool_realloc_lock);
1880
1881   for (i = 0; i < num_threads; i++)
1882     {
1883       wrk = &smm->wrk[i];
1884       wrk->ctrl_head = clib_llist_make_head (wrk->event_elts, evt_list);
1885       wrk->new_head = clib_llist_make_head (wrk->event_elts, evt_list);
1886       wrk->old_head = clib_llist_make_head (wrk->event_elts, evt_list);
1887       wrk->pending_connects = clib_llist_make_head (wrk->event_elts, evt_list);
1888       wrk->evts_pending_main =
1889         clib_llist_make_head (wrk->event_elts, evt_list);
1890       wrk->vm = vlib_get_main_by_index (i);
1891       wrk->last_vlib_time = vlib_time_now (vm);
1892       wrk->last_vlib_us_time = wrk->last_vlib_time * CLIB_US_TIME_FREQ;
1893       wrk->timerfd = -1;
1894       vec_validate (wrk->session_to_enqueue, smm->last_transport_proto_type);
1895
1896       if (!smm->no_adaptive && smm->use_private_rx_mqs)
1897         session_wrk_enable_adaptive_mode (wrk);
1898     }
1899
1900   /* Allocate vpp event queues segment and queue */
1901   session_vpp_wrk_mqs_alloc (smm);
1902
1903   /* Initialize segment manager properties */
1904   segment_manager_main_init ();
1905
1906   /* Preallocate sessions */
1907   if (smm->preallocated_sessions)
1908     {
1909       if (num_threads == 1)
1910         {
1911           pool_init_fixed (smm->wrk[0].sessions, smm->preallocated_sessions);
1912         }
1913       else
1914         {
1915           int j;
1916           preallocated_sessions_per_worker =
1917             (1.1 * (f64) smm->preallocated_sessions /
1918              (f64) (num_threads - 1));
1919
1920           for (j = 1; j < num_threads; j++)
1921             {
1922               pool_init_fixed (smm->wrk[j].sessions,
1923                                preallocated_sessions_per_worker);
1924             }
1925         }
1926     }
1927
1928   session_lookup_init ();
1929   app_namespaces_init ();
1930   transport_init ();
1931   smm->is_initialized = 1;
1932
1933 done:
1934
1935   smm->is_enabled = 1;
1936
1937   /* Enable transports */
1938   transport_enable_disable (vm, 1);
1939   session_debug_init ();
1940
1941   return 0;
1942 }
1943
1944 static void
1945 session_manager_main_disable (vlib_main_t * vm)
1946 {
1947   transport_enable_disable (vm, 0 /* is_en */ );
1948 }
1949
1950 void
1951 session_node_enable_disable (u8 is_en)
1952 {
1953   u8 mstate = is_en ? VLIB_NODE_STATE_INTERRUPT : VLIB_NODE_STATE_DISABLED;
1954   u8 state = is_en ? VLIB_NODE_STATE_POLLING : VLIB_NODE_STATE_DISABLED;
1955   session_main_t *sm = &session_main;
1956   vlib_main_t *vm;
1957   vlib_node_t *n;
1958   int n_vlibs, i;
1959
1960   n_vlibs = vlib_get_n_threads ();
1961   for (i = 0; i < n_vlibs; i++)
1962     {
1963       vm = vlib_get_main_by_index (i);
1964       /* main thread with workers and not polling */
1965       if (i == 0 && n_vlibs > 1)
1966         {
1967           vlib_node_set_state (vm, session_queue_node.index, mstate);
1968           if (is_en)
1969             {
1970               session_main_get_worker (0)->state = SESSION_WRK_INTERRUPT;
1971               vlib_node_set_state (vm, session_queue_process_node.index,
1972                                    state);
1973               n = vlib_get_node (vm, session_queue_process_node.index);
1974               vlib_start_process (vm, n->runtime_index);
1975             }
1976           else
1977             {
1978               vlib_process_signal_event_mt (vm,
1979                                             session_queue_process_node.index,
1980                                             SESSION_Q_PROCESS_STOP, 0);
1981             }
1982           if (!sm->poll_main)
1983             continue;
1984         }
1985       vlib_node_set_state (vm, session_queue_node.index, state);
1986     }
1987
1988   if (sm->use_private_rx_mqs)
1989     application_enable_rx_mqs_nodes (is_en);
1990 }
1991
1992 clib_error_t *
1993 vnet_session_enable_disable (vlib_main_t * vm, u8 is_en)
1994 {
1995   clib_error_t *error = 0;
1996   if (is_en)
1997     {
1998       if (session_main.is_enabled)
1999         return 0;
2000
2001       error = session_manager_main_enable (vm);
2002       session_node_enable_disable (is_en);
2003     }
2004   else
2005     {
2006       session_main.is_enabled = 0;
2007       session_manager_main_disable (vm);
2008       session_node_enable_disable (is_en);
2009     }
2010
2011   return error;
2012 }
2013
2014 clib_error_t *
2015 session_main_init (vlib_main_t * vm)
2016 {
2017   session_main_t *smm = &session_main;
2018
2019   smm->is_enabled = 0;
2020   smm->session_enable_asap = 0;
2021   smm->poll_main = 0;
2022   smm->use_private_rx_mqs = 0;
2023   smm->no_adaptive = 0;
2024   smm->last_transport_proto_type = TRANSPORT_PROTO_HTTP;
2025
2026   return 0;
2027 }
2028
2029 static clib_error_t *
2030 session_main_loop_init (vlib_main_t * vm)
2031 {
2032   session_main_t *smm = &session_main;
2033   if (smm->session_enable_asap)
2034     {
2035       vlib_worker_thread_barrier_sync (vm);
2036       vnet_session_enable_disable (vm, 1 /* is_en */ );
2037       vlib_worker_thread_barrier_release (vm);
2038     }
2039   return 0;
2040 }
2041
2042 VLIB_INIT_FUNCTION (session_main_init);
2043 VLIB_MAIN_LOOP_ENTER_FUNCTION (session_main_loop_init);
2044
2045 static clib_error_t *
2046 session_config_fn (vlib_main_t * vm, unformat_input_t * input)
2047 {
2048   session_main_t *smm = &session_main;
2049   u32 nitems;
2050   uword tmp;
2051
2052   while (unformat_check_input (input) != UNFORMAT_END_OF_INPUT)
2053     {
2054       if (unformat (input, "wrk-mq-length %d", &nitems))
2055         {
2056           if (nitems >= 2048)
2057             smm->configured_wrk_mq_length = nitems;
2058           else
2059             clib_warning ("event queue length %d too small, ignored", nitems);
2060         }
2061       else if (unformat (input, "preallocated-sessions %d",
2062                          &smm->preallocated_sessions))
2063         ;
2064       else if (unformat (input, "v4-session-table-buckets %d",
2065                          &smm->configured_v4_session_table_buckets))
2066         ;
2067       else if (unformat (input, "v4-halfopen-table-buckets %d",
2068                          &smm->configured_v4_halfopen_table_buckets))
2069         ;
2070       else if (unformat (input, "v6-session-table-buckets %d",
2071                          &smm->configured_v6_session_table_buckets))
2072         ;
2073       else if (unformat (input, "v6-halfopen-table-buckets %d",
2074                          &smm->configured_v6_halfopen_table_buckets))
2075         ;
2076       else if (unformat (input, "v4-session-table-memory %U",
2077                          unformat_memory_size, &tmp))
2078         {
2079           if (tmp >= 0x100000000)
2080             return clib_error_return (0, "memory size %llx (%lld) too large",
2081                                       tmp, tmp);
2082           smm->configured_v4_session_table_memory = tmp;
2083         }
2084       else if (unformat (input, "v4-halfopen-table-memory %U",
2085                          unformat_memory_size, &tmp))
2086         {
2087           if (tmp >= 0x100000000)
2088             return clib_error_return (0, "memory size %llx (%lld) too large",
2089                                       tmp, tmp);
2090           smm->configured_v4_halfopen_table_memory = tmp;
2091         }
2092       else if (unformat (input, "v6-session-table-memory %U",
2093                          unformat_memory_size, &tmp))
2094         {
2095           if (tmp >= 0x100000000)
2096             return clib_error_return (0, "memory size %llx (%lld) too large",
2097                                       tmp, tmp);
2098           smm->configured_v6_session_table_memory = tmp;
2099         }
2100       else if (unformat (input, "v6-halfopen-table-memory %U",
2101                          unformat_memory_size, &tmp))
2102         {
2103           if (tmp >= 0x100000000)
2104             return clib_error_return (0, "memory size %llx (%lld) too large",
2105                                       tmp, tmp);
2106           smm->configured_v6_halfopen_table_memory = tmp;
2107         }
2108       else if (unformat (input, "local-endpoints-table-memory %U",
2109                          unformat_memory_size, &tmp))
2110         {
2111           if (tmp >= 0x100000000)
2112             return clib_error_return (0, "memory size %llx (%lld) too large",
2113                                       tmp, tmp);
2114           smm->local_endpoints_table_memory = tmp;
2115         }
2116       else if (unformat (input, "local-endpoints-table-buckets %d",
2117                          &smm->local_endpoints_table_buckets))
2118         ;
2119       else if (unformat (input, "enable"))
2120         smm->session_enable_asap = 1;
2121       else if (unformat (input, "use-app-socket-api"))
2122         (void) appns_sapi_enable_disable (1 /* is_enable */);
2123       else if (unformat (input, "poll-main"))
2124         smm->poll_main = 1;
2125       else if (unformat (input, "use-private-rx-mqs"))
2126         smm->use_private_rx_mqs = 1;
2127       else if (unformat (input, "no-adaptive"))
2128         smm->no_adaptive = 1;
2129       /*
2130        * Deprecated but maintained for compatibility
2131        */
2132       else if (unformat (input, "evt_qs_memfd_seg"))
2133         ;
2134       else if (unformat (input, "segment-baseva 0x%lx", &tmp))
2135         ;
2136       else if (unformat (input, "evt_qs_seg_size %U", unformat_memory_size,
2137                          &tmp))
2138         ;
2139       else if (unformat (input, "event-queue-length %d", &nitems))
2140         {
2141           if (nitems >= 2048)
2142             smm->configured_wrk_mq_length = nitems;
2143           else
2144             clib_warning ("event queue length %d too small, ignored", nitems);
2145         }
2146       else
2147         return clib_error_return (0, "unknown input `%U'",
2148                                   format_unformat_error, input);
2149     }
2150   return 0;
2151 }
2152
2153 VLIB_CONFIG_FUNCTION (session_config_fn, "session");
2154
2155 /*
2156  * fd.io coding-style-patch-verification: ON
2157  *
2158  * Local Variables:
2159  * eval: (c-set-style "gnu")
2160  * End:
2161  */