5d070240152d2767c553dbdfb9de2154d2ec0d49
[vpp.git] / src / vnet / session / session.c
1 /*
2  * Copyright (c) 2017-2019 Cisco and/or its affiliates.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at:
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 /**
16  * @file
17  * @brief Session and session manager
18  */
19
20 #include <vnet/session/session.h>
21 #include <vnet/session/application.h>
22 #include <vnet/dpo/load_balance.h>
23 #include <vnet/fib/ip4_fib.h>
24
25 session_main_t session_main;
26
27 static inline int
28 session_send_evt_to_thread (void *data, void *args, u32 thread_index,
29                             session_evt_type_t evt_type)
30 {
31   session_worker_t *wrk = session_main_get_worker (thread_index);
32   session_event_t *evt;
33   svm_msg_q_msg_t msg;
34   svm_msg_q_t *mq;
35
36   mq = wrk->vpp_event_queue;
37   if (PREDICT_FALSE (svm_msg_q_lock (mq)))
38     return -1;
39   if (PREDICT_FALSE (svm_msg_q_is_full (mq)
40                      || svm_msg_q_ring_is_full (mq, SESSION_MQ_IO_EVT_RING)))
41     {
42       svm_msg_q_unlock (mq);
43       return -2;
44     }
45   switch (evt_type)
46     {
47     case SESSION_CTRL_EVT_RPC:
48       msg = svm_msg_q_alloc_msg_w_ring (mq, SESSION_MQ_IO_EVT_RING);
49       evt = (session_event_t *) svm_msg_q_msg_data (mq, &msg);
50       evt->rpc_args.fp = data;
51       evt->rpc_args.arg = args;
52       break;
53     case SESSION_IO_EVT_RX:
54     case SESSION_IO_EVT_TX:
55     case SESSION_IO_EVT_TX_FLUSH:
56     case SESSION_IO_EVT_BUILTIN_RX:
57       msg = svm_msg_q_alloc_msg_w_ring (mq, SESSION_MQ_IO_EVT_RING);
58       evt = (session_event_t *) svm_msg_q_msg_data (mq, &msg);
59       evt->session_index = *(u32 *) data;
60       break;
61     case SESSION_IO_EVT_BUILTIN_TX:
62     case SESSION_CTRL_EVT_CLOSE:
63     case SESSION_CTRL_EVT_RESET:
64       msg = svm_msg_q_alloc_msg_w_ring (mq, SESSION_MQ_IO_EVT_RING);
65       evt = (session_event_t *) svm_msg_q_msg_data (mq, &msg);
66       evt->session_handle = session_handle ((session_t *) data);
67       break;
68     default:
69       clib_warning ("evt unhandled!");
70       svm_msg_q_unlock (mq);
71       return -1;
72     }
73   evt->event_type = evt_type;
74
75   svm_msg_q_add_and_unlock (mq, &msg);
76
77   if (PREDICT_FALSE (wrk->state == SESSION_WRK_INTERRUPT))
78     vlib_node_set_interrupt_pending (wrk->vm, session_queue_node.index);
79
80   return 0;
81 }
82
83 int
84 session_send_io_evt_to_thread (svm_fifo_t * f, session_evt_type_t evt_type)
85 {
86   return session_send_evt_to_thread (&f->shr->master_session_index, 0,
87                                      f->master_thread_index, evt_type);
88 }
89
90 int
91 session_send_io_evt_to_thread_custom (void *data, u32 thread_index,
92                                       session_evt_type_t evt_type)
93 {
94   return session_send_evt_to_thread (data, 0, thread_index, evt_type);
95 }
96
97 int
98 session_send_ctrl_evt_to_thread (session_t * s, session_evt_type_t evt_type)
99 {
100   /* only events supported are disconnect, shutdown and reset */
101   ASSERT (evt_type == SESSION_CTRL_EVT_CLOSE ||
102           evt_type == SESSION_CTRL_EVT_HALF_CLOSE ||
103           evt_type == SESSION_CTRL_EVT_RESET);
104   return session_send_evt_to_thread (s, 0, s->thread_index, evt_type);
105 }
106
107 void
108 session_send_rpc_evt_to_thread_force (u32 thread_index, void *fp,
109                                       void *rpc_args)
110 {
111   session_send_evt_to_thread (fp, rpc_args, thread_index,
112                               SESSION_CTRL_EVT_RPC);
113 }
114
115 void
116 session_send_rpc_evt_to_thread (u32 thread_index, void *fp, void *rpc_args)
117 {
118   if (thread_index != vlib_get_thread_index ())
119     session_send_rpc_evt_to_thread_force (thread_index, fp, rpc_args);
120   else
121     {
122       void (*fnp) (void *) = fp;
123       fnp (rpc_args);
124     }
125 }
126
127 void
128 session_add_self_custom_tx_evt (transport_connection_t * tc, u8 has_prio)
129 {
130   session_t *s = session_get (tc->s_index, tc->thread_index);
131
132   ASSERT (s->thread_index == vlib_get_thread_index ());
133   ASSERT (s->session_state != SESSION_STATE_TRANSPORT_DELETED);
134
135   if (!(s->flags & SESSION_F_CUSTOM_TX))
136     {
137       s->flags |= SESSION_F_CUSTOM_TX;
138       if (svm_fifo_set_event (s->tx_fifo)
139           || transport_connection_is_descheduled (tc))
140         {
141           session_evt_elt_t *elt;
142           session_worker_t *wrk;
143
144           wrk = session_main_get_worker (tc->thread_index);
145           if (has_prio)
146             elt = session_evt_alloc_new (wrk);
147           else
148             elt = session_evt_alloc_old (wrk);
149           elt->evt.session_index = tc->s_index;
150           elt->evt.event_type = SESSION_IO_EVT_TX;
151           tc->flags &= ~TRANSPORT_CONNECTION_F_DESCHED;
152
153           if (PREDICT_FALSE (wrk->state == SESSION_WRK_INTERRUPT))
154             vlib_node_set_interrupt_pending (wrk->vm,
155                                              session_queue_node.index);
156         }
157     }
158 }
159
160 void
161 sesssion_reschedule_tx (transport_connection_t * tc)
162 {
163   session_worker_t *wrk = session_main_get_worker (tc->thread_index);
164   session_evt_elt_t *elt;
165
166   ASSERT (tc->thread_index == vlib_get_thread_index ());
167
168   elt = session_evt_alloc_new (wrk);
169   elt->evt.session_index = tc->s_index;
170   elt->evt.event_type = SESSION_IO_EVT_TX;
171
172   if (PREDICT_FALSE (wrk->state == SESSION_WRK_INTERRUPT))
173     vlib_node_set_interrupt_pending (wrk->vm, session_queue_node.index);
174 }
175
176 static void
177 session_program_transport_ctrl_evt (session_t * s, session_evt_type_t evt)
178 {
179   u32 thread_index = vlib_get_thread_index ();
180   session_evt_elt_t *elt;
181   session_worker_t *wrk;
182
183   /* If we are in the handler thread, or being called with the worker barrier
184    * held, just append a new event to pending disconnects vector. */
185   if (vlib_thread_is_main_w_barrier () || thread_index == s->thread_index)
186     {
187       wrk = session_main_get_worker (s->thread_index);
188       elt = session_evt_alloc_ctrl (wrk);
189       clib_memset (&elt->evt, 0, sizeof (session_event_t));
190       elt->evt.session_handle = session_handle (s);
191       elt->evt.event_type = evt;
192
193       if (PREDICT_FALSE (wrk->state == SESSION_WRK_INTERRUPT))
194         vlib_node_set_interrupt_pending (wrk->vm, session_queue_node.index);
195     }
196   else
197     session_send_ctrl_evt_to_thread (s, evt);
198 }
199
200 session_t *
201 session_alloc (u32 thread_index)
202 {
203   session_worker_t *wrk = &session_main.wrk[thread_index];
204   session_t *s;
205   u8 will_expand = pool_get_will_expand (wrk->sessions);
206
207   /* If we have peekers, let them finish */
208   if (PREDICT_FALSE (will_expand && vlib_num_workers ()))
209     {
210       clib_rwlock_writer_lock (&wrk->peekers_rw_locks);
211       pool_get_aligned (wrk->sessions, s, CLIB_CACHE_LINE_BYTES);
212       clib_rwlock_writer_unlock (&wrk->peekers_rw_locks);
213     }
214   else
215     {
216       pool_get_aligned (wrk->sessions, s, CLIB_CACHE_LINE_BYTES);
217     }
218   clib_memset (s, 0, sizeof (*s));
219   s->session_index = s - wrk->sessions;
220   s->thread_index = thread_index;
221   s->app_index = APP_INVALID_INDEX;
222   return s;
223 }
224
225 void
226 session_free (session_t * s)
227 {
228   if (CLIB_DEBUG)
229     {
230       u8 thread_index = s->thread_index;
231       clib_memset (s, 0xFA, sizeof (*s));
232       pool_put (session_main.wrk[thread_index].sessions, s);
233       return;
234     }
235   SESSION_EVT (SESSION_EVT_FREE, s);
236   pool_put (session_main.wrk[s->thread_index].sessions, s);
237 }
238
239 u8
240 session_is_valid (u32 si, u8 thread_index)
241 {
242   session_t *s;
243   transport_connection_t *tc;
244
245   s = pool_elt_at_index (session_main.wrk[thread_index].sessions, si);
246
247   if (s->thread_index != thread_index || s->session_index != si)
248     return 0;
249
250   if (s->session_state == SESSION_STATE_TRANSPORT_DELETED
251       || s->session_state <= SESSION_STATE_LISTENING)
252     return 1;
253
254   if (s->session_state == SESSION_STATE_CONNECTING &&
255       (s->flags & SESSION_F_HALF_OPEN))
256     return 1;
257
258   tc = session_get_transport (s);
259   if (s->connection_index != tc->c_index
260       || s->thread_index != tc->thread_index || tc->s_index != si)
261     return 0;
262
263   return 1;
264 }
265
266 static void
267 session_cleanup_notify (session_t * s, session_cleanup_ntf_t ntf)
268 {
269   app_worker_t *app_wrk;
270
271   app_wrk = app_worker_get_if_valid (s->app_wrk_index);
272   if (!app_wrk)
273     return;
274   app_worker_cleanup_notify (app_wrk, s, ntf);
275 }
276
277 void
278 session_free_w_fifos (session_t * s)
279 {
280   session_cleanup_notify (s, SESSION_CLEANUP_SESSION);
281   segment_manager_dealloc_fifos (s->rx_fifo, s->tx_fifo);
282   session_free (s);
283 }
284
285 /**
286  * Cleans up session and lookup table.
287  *
288  * Transport connection must still be valid.
289  */
290 static void
291 session_delete (session_t * s)
292 {
293   int rv;
294
295   /* Delete from the main lookup table. */
296   if ((rv = session_lookup_del_session (s)))
297     clib_warning ("session %u hash delete rv %d", s->session_index, rv);
298
299   session_free_w_fifos (s);
300 }
301
302 void
303 session_cleanup_half_open (session_handle_t ho_handle)
304 {
305   session_t *ho = session_get_from_handle (ho_handle);
306
307   /* App transports can migrate their half-opens */
308   if (ho->flags & SESSION_F_IS_MIGRATING)
309     {
310       /* Session still migrating, move to closed state to signal that the
311        * session should be removed. */
312       if (ho->connection_index == ~0)
313         {
314           ho->session_state = SESSION_STATE_CLOSED;
315           return;
316         }
317       /* Migrated transports are no longer half-opens */
318       transport_cleanup (session_get_transport_proto (ho),
319                          ho->connection_index, ho->app_index /* overloaded */);
320     }
321   else
322     transport_cleanup_half_open (session_get_transport_proto (ho),
323                                  ho->connection_index);
324   session_free (ho);
325 }
326
327 static void
328 session_half_open_free (session_t *ho)
329 {
330   app_worker_t *app_wrk;
331
332   ASSERT (vlib_get_thread_index () <= 1);
333   app_wrk = app_worker_get (ho->app_wrk_index);
334   app_worker_del_half_open (app_wrk, ho);
335   session_free (ho);
336 }
337
338 static void
339 session_half_open_free_rpc (void *args)
340 {
341   session_t *ho = ho_session_get (pointer_to_uword (args));
342   session_half_open_free (ho);
343 }
344
345 void
346 session_half_open_delete_notify (transport_connection_t *tc)
347 {
348   /* Notification from ctrl thread accepted without rpc */
349   if (!tc->thread_index)
350     {
351       session_half_open_free (ho_session_get (tc->s_index));
352     }
353   else
354     {
355       void *args = uword_to_pointer ((uword) tc->s_index, void *);
356       session_send_rpc_evt_to_thread_force (0, session_half_open_free_rpc,
357                                             args);
358     }
359 }
360
361 void
362 session_half_open_migrate_notify (transport_connection_t *tc)
363 {
364   session_t *ho;
365
366   ho = ho_session_get (tc->s_index);
367   ho->flags |= SESSION_F_IS_MIGRATING;
368   ho->connection_index = ~0;
369 }
370
371 int
372 session_half_open_migrated_notify (transport_connection_t *tc)
373 {
374   session_t *ho;
375
376   ho = ho_session_get (tc->s_index);
377
378   /* App probably detached so the half-open must be cleaned up */
379   if (ho->session_state == SESSION_STATE_CLOSED)
380     {
381       session_half_open_delete_notify (tc);
382       return -1;
383     }
384   ho->connection_index = tc->c_index;
385   /* Overload app index for half-open with new thread */
386   ho->app_index = tc->thread_index;
387   return 0;
388 }
389
390 session_t *
391 session_alloc_for_connection (transport_connection_t * tc)
392 {
393   session_t *s;
394   u32 thread_index = tc->thread_index;
395
396   ASSERT (thread_index == vlib_get_thread_index ()
397           || transport_protocol_is_cl (tc->proto));
398
399   s = session_alloc (thread_index);
400   s->session_type = session_type_from_proto_and_ip (tc->proto, tc->is_ip4);
401   s->session_state = SESSION_STATE_CLOSED;
402
403   /* Attach transport to session and vice versa */
404   s->connection_index = tc->c_index;
405   tc->s_index = s->session_index;
406   return s;
407 }
408
409 session_t *
410 session_alloc_for_half_open (transport_connection_t *tc)
411 {
412   session_t *s;
413
414   s = ho_session_alloc ();
415   s->session_type = session_type_from_proto_and_ip (tc->proto, tc->is_ip4);
416   s->connection_index = tc->c_index;
417   tc->s_index = s->session_index;
418   return s;
419 }
420
421 /**
422  * Discards bytes from buffer chain
423  *
424  * It discards n_bytes_to_drop starting at first buffer after chain_b
425  */
426 always_inline void
427 session_enqueue_discard_chain_bytes (vlib_main_t * vm, vlib_buffer_t * b,
428                                      vlib_buffer_t ** chain_b,
429                                      u32 n_bytes_to_drop)
430 {
431   vlib_buffer_t *next = *chain_b;
432   u32 to_drop = n_bytes_to_drop;
433   ASSERT (b->flags & VLIB_BUFFER_NEXT_PRESENT);
434   while (to_drop && (next->flags & VLIB_BUFFER_NEXT_PRESENT))
435     {
436       next = vlib_get_buffer (vm, next->next_buffer);
437       if (next->current_length > to_drop)
438         {
439           vlib_buffer_advance (next, to_drop);
440           to_drop = 0;
441         }
442       else
443         {
444           to_drop -= next->current_length;
445           next->current_length = 0;
446         }
447     }
448   *chain_b = next;
449
450   if (to_drop == 0)
451     b->total_length_not_including_first_buffer -= n_bytes_to_drop;
452 }
453
454 /**
455  * Enqueue buffer chain tail
456  */
457 always_inline int
458 session_enqueue_chain_tail (session_t * s, vlib_buffer_t * b,
459                             u32 offset, u8 is_in_order)
460 {
461   vlib_buffer_t *chain_b;
462   u32 chain_bi, len, diff;
463   vlib_main_t *vm = vlib_get_main ();
464   u8 *data;
465   u32 written = 0;
466   int rv = 0;
467
468   if (is_in_order && offset)
469     {
470       diff = offset - b->current_length;
471       if (diff > b->total_length_not_including_first_buffer)
472         return 0;
473       chain_b = b;
474       session_enqueue_discard_chain_bytes (vm, b, &chain_b, diff);
475       chain_bi = vlib_get_buffer_index (vm, chain_b);
476     }
477   else
478     chain_bi = b->next_buffer;
479
480   do
481     {
482       chain_b = vlib_get_buffer (vm, chain_bi);
483       data = vlib_buffer_get_current (chain_b);
484       len = chain_b->current_length;
485       if (!len)
486         continue;
487       if (is_in_order)
488         {
489           rv = svm_fifo_enqueue (s->rx_fifo, len, data);
490           if (rv == len)
491             {
492               written += rv;
493             }
494           else if (rv < len)
495             {
496               return (rv > 0) ? (written + rv) : written;
497             }
498           else if (rv > len)
499             {
500               written += rv;
501
502               /* written more than what was left in chain */
503               if (written > b->total_length_not_including_first_buffer)
504                 return written;
505
506               /* drop the bytes that have already been delivered */
507               session_enqueue_discard_chain_bytes (vm, b, &chain_b, rv - len);
508             }
509         }
510       else
511         {
512           rv = svm_fifo_enqueue_with_offset (s->rx_fifo, offset, len, data);
513           if (rv)
514             {
515               clib_warning ("failed to enqueue multi-buffer seg");
516               return -1;
517             }
518           offset += len;
519         }
520     }
521   while ((chain_bi = (chain_b->flags & VLIB_BUFFER_NEXT_PRESENT)
522           ? chain_b->next_buffer : 0));
523
524   if (is_in_order)
525     return written;
526
527   return 0;
528 }
529
530 void
531 session_fifo_tuning (session_t * s, svm_fifo_t * f,
532                      session_ft_action_t act, u32 len)
533 {
534   if (s->flags & SESSION_F_CUSTOM_FIFO_TUNING)
535     {
536       app_worker_t *app_wrk = app_worker_get (s->app_wrk_index);
537       app_worker_session_fifo_tuning (app_wrk, s, f, act, len);
538       if (CLIB_ASSERT_ENABLE)
539         {
540           segment_manager_t *sm;
541           sm = segment_manager_get (f->segment_manager);
542           ASSERT (f->shr->size >= 4096);
543           ASSERT (f->shr->size <= sm->max_fifo_size);
544         }
545     }
546 }
547
548 /*
549  * Enqueue data for delivery to session peer. Does not notify peer of enqueue
550  * event but on request can queue notification events for later delivery by
551  * calling stream_server_flush_enqueue_events().
552  *
553  * @param tc Transport connection which is to be enqueued data
554  * @param b Buffer to be enqueued
555  * @param offset Offset at which to start enqueueing if out-of-order
556  * @param queue_event Flag to indicate if peer is to be notified or if event
557  *                    is to be queued. The former is useful when more data is
558  *                    enqueued and only one event is to be generated.
559  * @param is_in_order Flag to indicate if data is in order
560  * @return Number of bytes enqueued or a negative value if enqueueing failed.
561  */
562 int
563 session_enqueue_stream_connection (transport_connection_t * tc,
564                                    vlib_buffer_t * b, u32 offset,
565                                    u8 queue_event, u8 is_in_order)
566 {
567   session_t *s;
568   int enqueued = 0, rv, in_order_off;
569
570   s = session_get (tc->s_index, tc->thread_index);
571
572   if (is_in_order)
573     {
574       enqueued = svm_fifo_enqueue (s->rx_fifo,
575                                    b->current_length,
576                                    vlib_buffer_get_current (b));
577       if (PREDICT_FALSE ((b->flags & VLIB_BUFFER_NEXT_PRESENT)
578                          && enqueued >= 0))
579         {
580           in_order_off = enqueued > b->current_length ? enqueued : 0;
581           rv = session_enqueue_chain_tail (s, b, in_order_off, 1);
582           if (rv > 0)
583             enqueued += rv;
584         }
585     }
586   else
587     {
588       rv = svm_fifo_enqueue_with_offset (s->rx_fifo, offset,
589                                          b->current_length,
590                                          vlib_buffer_get_current (b));
591       if (PREDICT_FALSE ((b->flags & VLIB_BUFFER_NEXT_PRESENT) && !rv))
592         session_enqueue_chain_tail (s, b, offset + b->current_length, 0);
593       /* if something was enqueued, report even this as success for ooo
594        * segment handling */
595       return rv;
596     }
597
598   if (queue_event)
599     {
600       /* Queue RX event on this fifo. Eventually these will need to be flushed
601        * by calling stream_server_flush_enqueue_events () */
602       session_worker_t *wrk;
603
604       wrk = session_main_get_worker (s->thread_index);
605       if (!(s->flags & SESSION_F_RX_EVT))
606         {
607           s->flags |= SESSION_F_RX_EVT;
608           vec_add1 (wrk->session_to_enqueue[tc->proto], s->session_index);
609         }
610
611       session_fifo_tuning (s, s->rx_fifo, SESSION_FT_ACTION_ENQUEUED, 0);
612     }
613
614   return enqueued;
615 }
616
617 int
618 session_enqueue_dgram_connection (session_t * s,
619                                   session_dgram_hdr_t * hdr,
620                                   vlib_buffer_t * b, u8 proto, u8 queue_event)
621 {
622   int rv;
623
624   ASSERT (svm_fifo_max_enqueue_prod (s->rx_fifo)
625           >= b->current_length + sizeof (*hdr));
626
627   if (PREDICT_TRUE (!(b->flags & VLIB_BUFFER_NEXT_PRESENT)))
628     {
629       /* *INDENT-OFF* */
630       svm_fifo_seg_t segs[2] = {
631           { (u8 *) hdr, sizeof (*hdr) },
632           { vlib_buffer_get_current (b), b->current_length }
633       };
634       /* *INDENT-ON* */
635
636       rv = svm_fifo_enqueue_segments (s->rx_fifo, segs, 2,
637                                       0 /* allow_partial */ );
638     }
639   else
640     {
641       vlib_main_t *vm = vlib_get_main ();
642       svm_fifo_seg_t *segs = 0, *seg;
643       vlib_buffer_t *it = b;
644       u32 n_segs = 1;
645
646       vec_add2 (segs, seg, 1);
647       seg->data = (u8 *) hdr;
648       seg->len = sizeof (*hdr);
649       while (it)
650         {
651           vec_add2 (segs, seg, 1);
652           seg->data = vlib_buffer_get_current (it);
653           seg->len = it->current_length;
654           n_segs++;
655           if (!(it->flags & VLIB_BUFFER_NEXT_PRESENT))
656             break;
657           it = vlib_get_buffer (vm, it->next_buffer);
658         }
659       rv = svm_fifo_enqueue_segments (s->rx_fifo, segs, n_segs,
660                                       0 /* allow partial */ );
661       vec_free (segs);
662     }
663
664   if (queue_event && rv > 0)
665     {
666       /* Queue RX event on this fifo. Eventually these will need to be flushed
667        * by calling stream_server_flush_enqueue_events () */
668       session_worker_t *wrk;
669
670       wrk = session_main_get_worker (s->thread_index);
671       if (!(s->flags & SESSION_F_RX_EVT))
672         {
673           s->flags |= SESSION_F_RX_EVT;
674           vec_add1 (wrk->session_to_enqueue[proto], s->session_index);
675         }
676
677       session_fifo_tuning (s, s->rx_fifo, SESSION_FT_ACTION_ENQUEUED, 0);
678     }
679   return rv > 0 ? rv : 0;
680 }
681
682 int
683 session_tx_fifo_peek_bytes (transport_connection_t * tc, u8 * buffer,
684                             u32 offset, u32 max_bytes)
685 {
686   session_t *s = session_get (tc->s_index, tc->thread_index);
687   return svm_fifo_peek (s->tx_fifo, offset, max_bytes, buffer);
688 }
689
690 u32
691 session_tx_fifo_dequeue_drop (transport_connection_t * tc, u32 max_bytes)
692 {
693   session_t *s = session_get (tc->s_index, tc->thread_index);
694   u32 rv;
695
696   rv = svm_fifo_dequeue_drop (s->tx_fifo, max_bytes);
697   session_fifo_tuning (s, s->tx_fifo, SESSION_FT_ACTION_DEQUEUED, rv);
698
699   if (svm_fifo_needs_deq_ntf (s->tx_fifo, max_bytes))
700     session_dequeue_notify (s);
701
702   return rv;
703 }
704
705 static inline int
706 session_notify_subscribers (u32 app_index, session_t * s,
707                             svm_fifo_t * f, session_evt_type_t evt_type)
708 {
709   app_worker_t *app_wrk;
710   application_t *app;
711   int i;
712
713   app = application_get (app_index);
714   if (!app)
715     return -1;
716
717   for (i = 0; i < f->shr->n_subscribers; i++)
718     {
719       app_wrk = application_get_worker (app, f->shr->subscribers[i]);
720       if (!app_wrk)
721         continue;
722       if (app_worker_lock_and_send_event (app_wrk, s, evt_type))
723         return -1;
724     }
725
726   return 0;
727 }
728
729 /**
730  * Notify session peer that new data has been enqueued.
731  *
732  * @param s     Stream session for which the event is to be generated.
733  * @param lock  Flag to indicate if call should lock message queue.
734  *
735  * @return 0 on success or negative number if failed to send notification.
736  */
737 static inline int
738 session_enqueue_notify_inline (session_t * s)
739 {
740   app_worker_t *app_wrk;
741   u32 session_index;
742   u8 n_subscribers;
743
744   session_index = s->session_index;
745   n_subscribers = svm_fifo_n_subscribers (s->rx_fifo);
746
747   app_wrk = app_worker_get_if_valid (s->app_wrk_index);
748   if (PREDICT_FALSE (!app_wrk))
749     {
750       SESSION_DBG ("invalid s->app_index = %d", s->app_wrk_index);
751       return 0;
752     }
753
754   SESSION_EVT (SESSION_EVT_ENQ, s, svm_fifo_max_dequeue_prod (s->rx_fifo));
755
756   s->flags &= ~SESSION_F_RX_EVT;
757
758   /* Application didn't confirm accept yet */
759   if (PREDICT_FALSE (s->session_state == SESSION_STATE_ACCEPTING))
760     return 0;
761
762   if (PREDICT_FALSE (app_worker_lock_and_send_event (app_wrk, s,
763                                                      SESSION_IO_EVT_RX)))
764     return -1;
765
766   if (PREDICT_FALSE (n_subscribers))
767     {
768       s = session_get (session_index, vlib_get_thread_index ());
769       return session_notify_subscribers (app_wrk->app_index, s,
770                                          s->rx_fifo, SESSION_IO_EVT_RX);
771     }
772
773   return 0;
774 }
775
776 int
777 session_enqueue_notify (session_t * s)
778 {
779   return session_enqueue_notify_inline (s);
780 }
781
782 static void
783 session_enqueue_notify_rpc (void *arg)
784 {
785   u32 session_index = pointer_to_uword (arg);
786   session_t *s;
787
788   s = session_get_if_valid (session_index, vlib_get_thread_index ());
789   if (!s)
790     return;
791
792   session_enqueue_notify (s);
793 }
794
795 /**
796  * Like session_enqueue_notify, but can be called from a thread that does not
797  * own the session.
798  */
799 void
800 session_enqueue_notify_thread (session_handle_t sh)
801 {
802   u32 thread_index = session_thread_from_handle (sh);
803   u32 session_index = session_index_from_handle (sh);
804
805   /*
806    * Pass session index (u32) as opposed to handle (u64) in case pointers
807    * are not 64-bit.
808    */
809   session_send_rpc_evt_to_thread (thread_index,
810                                   session_enqueue_notify_rpc,
811                                   uword_to_pointer (session_index, void *));
812 }
813
814 int
815 session_dequeue_notify (session_t * s)
816 {
817   app_worker_t *app_wrk;
818
819   svm_fifo_clear_deq_ntf (s->tx_fifo);
820
821   app_wrk = app_worker_get_if_valid (s->app_wrk_index);
822   if (PREDICT_FALSE (!app_wrk))
823     return -1;
824
825   if (PREDICT_FALSE (app_worker_lock_and_send_event (app_wrk, s,
826                                                      SESSION_IO_EVT_TX)))
827     return -1;
828
829   if (PREDICT_FALSE (s->tx_fifo->shr->n_subscribers))
830     return session_notify_subscribers (app_wrk->app_index, s,
831                                        s->tx_fifo, SESSION_IO_EVT_TX);
832
833   return 0;
834 }
835
836 /**
837  * Flushes queue of sessions that are to be notified of new data
838  * enqueued events.
839  *
840  * @param thread_index Thread index for which the flush is to be performed.
841  * @return 0 on success or a positive number indicating the number of
842  *         failures due to API queue being full.
843  */
844 int
845 session_main_flush_enqueue_events (u8 transport_proto, u32 thread_index)
846 {
847   session_worker_t *wrk = session_main_get_worker (thread_index);
848   session_t *s;
849   int i, errors = 0;
850   u32 *indices;
851
852   indices = wrk->session_to_enqueue[transport_proto];
853
854   for (i = 0; i < vec_len (indices); i++)
855     {
856       s = session_get_if_valid (indices[i], thread_index);
857       if (PREDICT_FALSE (!s))
858         {
859           errors++;
860           continue;
861         }
862
863       session_fifo_tuning (s, s->rx_fifo, SESSION_FT_ACTION_ENQUEUED,
864                            0 /* TODO/not needed */ );
865
866       if (PREDICT_FALSE (session_enqueue_notify_inline (s)))
867         errors++;
868     }
869
870   vec_reset_length (indices);
871   wrk->session_to_enqueue[transport_proto] = indices;
872
873   return errors;
874 }
875
876 int
877 session_main_flush_all_enqueue_events (u8 transport_proto)
878 {
879   vlib_thread_main_t *vtm = vlib_get_thread_main ();
880   int i, errors = 0;
881   for (i = 0; i < 1 + vtm->n_threads; i++)
882     errors += session_main_flush_enqueue_events (transport_proto, i);
883   return errors;
884 }
885
886 int
887 session_stream_connect_notify (transport_connection_t * tc,
888                                session_error_t err)
889 {
890   u32 opaque = 0, new_ti, new_si;
891   app_worker_t *app_wrk;
892   session_t *s = 0, *ho;
893
894   /*
895    * Cleanup half-open table
896    */
897   session_lookup_del_half_open (tc);
898
899   ho = ho_session_get (tc->s_index);
900   opaque = ho->opaque;
901   app_wrk = app_worker_get_if_valid (ho->app_wrk_index);
902   if (!app_wrk)
903     return -1;
904
905   if (err)
906     return app_worker_connect_notify (app_wrk, s, err, opaque);
907
908   s = session_alloc_for_connection (tc);
909   s->session_state = SESSION_STATE_CONNECTING;
910   s->app_wrk_index = app_wrk->wrk_index;
911   new_si = s->session_index;
912   new_ti = s->thread_index;
913
914   if ((err = app_worker_init_connected (app_wrk, s)))
915     {
916       session_free (s);
917       app_worker_connect_notify (app_wrk, 0, err, opaque);
918       return -1;
919     }
920
921   s = session_get (new_si, new_ti);
922   s->session_state = SESSION_STATE_READY;
923   session_lookup_add_connection (tc, session_handle (s));
924
925   if (app_worker_connect_notify (app_wrk, s, SESSION_E_NONE, opaque))
926     {
927       session_lookup_del_connection (tc);
928       /* Avoid notifying app about rejected session cleanup */
929       s = session_get (new_si, new_ti);
930       segment_manager_dealloc_fifos (s->rx_fifo, s->tx_fifo);
931       session_free (s);
932       return -1;
933     }
934
935   return 0;
936 }
937
938 typedef union session_switch_pool_reply_args_
939 {
940   struct
941   {
942     u32 session_index;
943     u16 thread_index;
944     u8 is_closed;
945   };
946   u64 as_u64;
947 } session_switch_pool_reply_args_t;
948
949 STATIC_ASSERT (sizeof (session_switch_pool_reply_args_t) <= sizeof (uword),
950                "switch pool reply args size");
951
952 static void
953 session_switch_pool_reply (void *arg)
954 {
955   session_switch_pool_reply_args_t rargs;
956   session_t *s;
957
958   rargs.as_u64 = pointer_to_uword (arg);
959   s = session_get_if_valid (rargs.session_index, rargs.thread_index);
960   if (!s)
961     return;
962
963   /* Session closed during migration. Clean everything up */
964   if (rargs.is_closed)
965     {
966       transport_cleanup (session_get_transport_proto (s), s->connection_index,
967                          s->thread_index);
968       segment_manager_dealloc_fifos (s->rx_fifo, s->tx_fifo);
969       session_free (s);
970       return;
971     }
972
973   /* Notify app that it has data on the new session */
974   session_enqueue_notify (s);
975 }
976
977 typedef struct _session_switch_pool_args
978 {
979   u32 session_index;
980   u32 thread_index;
981   u32 new_thread_index;
982   u32 new_session_index;
983 } session_switch_pool_args_t;
984
985 /**
986  * Notify old thread of the session pool switch
987  */
988 static void
989 session_switch_pool (void *cb_args)
990 {
991   session_switch_pool_args_t *args = (session_switch_pool_args_t *) cb_args;
992   session_switch_pool_reply_args_t rargs;
993   session_handle_t new_sh;
994   segment_manager_t *sm;
995   app_worker_t *app_wrk;
996   session_t *s;
997
998   ASSERT (args->thread_index == vlib_get_thread_index ());
999   s = session_get (args->session_index, args->thread_index);
1000
1001   /* Check if session closed during migration */
1002   rargs.is_closed = s->session_state >= SESSION_STATE_TRANSPORT_CLOSING;
1003
1004   transport_cleanup (session_get_transport_proto (s), s->connection_index,
1005                      s->thread_index);
1006
1007   app_wrk = app_worker_get_if_valid (s->app_wrk_index);
1008   if (app_wrk)
1009     {
1010       /* Cleanup fifo segment slice state for fifos */
1011       sm = app_worker_get_connect_segment_manager (app_wrk);
1012       segment_manager_detach_fifo (sm, &s->rx_fifo);
1013       segment_manager_detach_fifo (sm, &s->tx_fifo);
1014
1015       /* Notify app, using old session, about the migration event */
1016       if (!rargs.is_closed)
1017         {
1018           new_sh = session_make_handle (args->new_session_index,
1019                                         args->new_thread_index);
1020           app_worker_migrate_notify (app_wrk, s, new_sh);
1021         }
1022     }
1023
1024   /* Trigger app read and fifo updates on the new thread */
1025   rargs.session_index = args->new_session_index;
1026   rargs.thread_index = args->new_thread_index;
1027   session_send_rpc_evt_to_thread (args->new_thread_index,
1028                                   session_switch_pool_reply,
1029                                   uword_to_pointer (rargs.as_u64, void *));
1030
1031   session_free (s);
1032   clib_mem_free (cb_args);
1033 }
1034
1035 /**
1036  * Move dgram session to the right thread
1037  */
1038 int
1039 session_dgram_connect_notify (transport_connection_t * tc,
1040                               u32 old_thread_index, session_t ** new_session)
1041 {
1042   session_t *new_s;
1043   session_switch_pool_args_t *rpc_args;
1044   segment_manager_t *sm;
1045   app_worker_t *app_wrk;
1046
1047   /*
1048    * Clone half-open session to the right thread.
1049    */
1050   new_s = session_clone_safe (tc->s_index, old_thread_index);
1051   new_s->connection_index = tc->c_index;
1052   new_s->session_state = SESSION_STATE_READY;
1053   new_s->flags |= SESSION_F_IS_MIGRATING;
1054
1055   if (!(tc->flags & TRANSPORT_CONNECTION_F_NO_LOOKUP))
1056     session_lookup_add_connection (tc, session_handle (new_s));
1057
1058   app_wrk = app_worker_get_if_valid (new_s->app_wrk_index);
1059   if (app_wrk)
1060     {
1061       /* New set of fifos attached to the same shared memory */
1062       sm = app_worker_get_connect_segment_manager (app_wrk);
1063       segment_manager_attach_fifo (sm, &new_s->rx_fifo, new_s);
1064       segment_manager_attach_fifo (sm, &new_s->tx_fifo, new_s);
1065     }
1066
1067   /*
1068    * Ask thread owning the old session to clean it up and make us the tx
1069    * fifo owner
1070    */
1071   rpc_args = clib_mem_alloc (sizeof (*rpc_args));
1072   rpc_args->new_session_index = new_s->session_index;
1073   rpc_args->new_thread_index = new_s->thread_index;
1074   rpc_args->session_index = tc->s_index;
1075   rpc_args->thread_index = old_thread_index;
1076   session_send_rpc_evt_to_thread (rpc_args->thread_index, session_switch_pool,
1077                                   rpc_args);
1078
1079   tc->s_index = new_s->session_index;
1080   new_s->connection_index = tc->c_index;
1081   *new_session = new_s;
1082   return 0;
1083 }
1084
1085 /**
1086  * Notification from transport that connection is being closed.
1087  *
1088  * A disconnect is sent to application but state is not removed. Once
1089  * disconnect is acknowledged by application, session disconnect is called.
1090  * Ultimately this leads to close being called on transport (passive close).
1091  */
1092 void
1093 session_transport_closing_notify (transport_connection_t * tc)
1094 {
1095   app_worker_t *app_wrk;
1096   session_t *s;
1097
1098   s = session_get (tc->s_index, tc->thread_index);
1099   if (s->session_state >= SESSION_STATE_TRANSPORT_CLOSING)
1100     return;
1101
1102   /* Wait for reply from app before sending notification as the
1103    * accept might be rejected */
1104   if (s->session_state == SESSION_STATE_ACCEPTING)
1105     {
1106       s->session_state = SESSION_STATE_TRANSPORT_CLOSING;
1107       return;
1108     }
1109
1110   s->session_state = SESSION_STATE_TRANSPORT_CLOSING;
1111   app_wrk = app_worker_get (s->app_wrk_index);
1112   app_worker_close_notify (app_wrk, s);
1113 }
1114
1115 /**
1116  * Notification from transport that connection is being deleted
1117  *
1118  * This removes the session if it is still valid. It should be called only on
1119  * previously fully established sessions. For instance failed connects should
1120  * call stream_session_connect_notify and indicate that the connect has
1121  * failed.
1122  */
1123 void
1124 session_transport_delete_notify (transport_connection_t * tc)
1125 {
1126   session_t *s;
1127
1128   /* App might've been removed already */
1129   if (!(s = session_get_if_valid (tc->s_index, tc->thread_index)))
1130     return;
1131
1132   switch (s->session_state)
1133     {
1134     case SESSION_STATE_CREATED:
1135       /* Session was created but accept notification was not yet sent to the
1136        * app. Cleanup everything. */
1137       session_lookup_del_session (s);
1138       segment_manager_dealloc_fifos (s->rx_fifo, s->tx_fifo);
1139       session_free (s);
1140       break;
1141     case SESSION_STATE_ACCEPTING:
1142     case SESSION_STATE_TRANSPORT_CLOSING:
1143     case SESSION_STATE_CLOSING:
1144     case SESSION_STATE_TRANSPORT_CLOSED:
1145       /* If transport finishes or times out before we get a reply
1146        * from the app, mark transport as closed and wait for reply
1147        * before removing the session. Cleanup session table in advance
1148        * because transport will soon be closed and closed sessions
1149        * are assumed to have been removed from the lookup table */
1150       session_lookup_del_session (s);
1151       s->session_state = SESSION_STATE_TRANSPORT_DELETED;
1152       session_cleanup_notify (s, SESSION_CLEANUP_TRANSPORT);
1153       svm_fifo_dequeue_drop_all (s->tx_fifo);
1154       break;
1155     case SESSION_STATE_APP_CLOSED:
1156       /* Cleanup lookup table as transport needs to still be valid.
1157        * Program transport close to ensure that all session events
1158        * have been cleaned up. Once transport close is called, the
1159        * session is just removed because both transport and app have
1160        * confirmed the close*/
1161       session_lookup_del_session (s);
1162       s->session_state = SESSION_STATE_TRANSPORT_DELETED;
1163       session_cleanup_notify (s, SESSION_CLEANUP_TRANSPORT);
1164       svm_fifo_dequeue_drop_all (s->tx_fifo);
1165       session_program_transport_ctrl_evt (s, SESSION_CTRL_EVT_CLOSE);
1166       break;
1167     case SESSION_STATE_TRANSPORT_DELETED:
1168       break;
1169     case SESSION_STATE_CLOSED:
1170       session_cleanup_notify (s, SESSION_CLEANUP_TRANSPORT);
1171       session_delete (s);
1172       break;
1173     default:
1174       clib_warning ("session state %u", s->session_state);
1175       session_cleanup_notify (s, SESSION_CLEANUP_TRANSPORT);
1176       session_delete (s);
1177       break;
1178     }
1179 }
1180
1181 /**
1182  * Notification from transport that it is closed
1183  *
1184  * Should be called by transport, prior to calling delete notify, once it
1185  * knows that no more data will be exchanged. This could serve as an
1186  * early acknowledgment of an active close especially if transport delete
1187  * can be delayed a long time, e.g., tcp time-wait.
1188  */
1189 void
1190 session_transport_closed_notify (transport_connection_t * tc)
1191 {
1192   app_worker_t *app_wrk;
1193   session_t *s;
1194
1195   if (!(s = session_get_if_valid (tc->s_index, tc->thread_index)))
1196     return;
1197
1198   /* Transport thinks that app requested close but it actually didn't.
1199    * Can happen for tcp:
1200    * 1)if fin and rst are received in close succession.
1201    * 2)if app shutdown the connection.  */
1202   if (s->session_state == SESSION_STATE_READY)
1203     {
1204       session_transport_closing_notify (tc);
1205       svm_fifo_dequeue_drop_all (s->tx_fifo);
1206       s->session_state = SESSION_STATE_TRANSPORT_CLOSED;
1207     }
1208   /* If app close has not been received or has not yet resulted in
1209    * a transport close, only mark the session transport as closed */
1210   else if (s->session_state <= SESSION_STATE_CLOSING)
1211     {
1212       s->session_state = SESSION_STATE_TRANSPORT_CLOSED;
1213     }
1214   /* If app also closed, switch to closed */
1215   else if (s->session_state == SESSION_STATE_APP_CLOSED)
1216     s->session_state = SESSION_STATE_CLOSED;
1217
1218   app_wrk = app_worker_get_if_valid (s->app_wrk_index);
1219   if (app_wrk)
1220     app_worker_transport_closed_notify (app_wrk, s);
1221 }
1222
1223 /**
1224  * Notify application that connection has been reset.
1225  */
1226 void
1227 session_transport_reset_notify (transport_connection_t * tc)
1228 {
1229   app_worker_t *app_wrk;
1230   session_t *s;
1231
1232   s = session_get (tc->s_index, tc->thread_index);
1233   svm_fifo_dequeue_drop_all (s->tx_fifo);
1234   if (s->session_state >= SESSION_STATE_TRANSPORT_CLOSING)
1235     return;
1236   if (s->session_state == SESSION_STATE_ACCEPTING)
1237     {
1238       s->session_state = SESSION_STATE_TRANSPORT_CLOSING;
1239       return;
1240     }
1241   s->session_state = SESSION_STATE_TRANSPORT_CLOSING;
1242   app_wrk = app_worker_get (s->app_wrk_index);
1243   app_worker_reset_notify (app_wrk, s);
1244 }
1245
1246 int
1247 session_stream_accept_notify (transport_connection_t * tc)
1248 {
1249   app_worker_t *app_wrk;
1250   session_t *s;
1251
1252   s = session_get (tc->s_index, tc->thread_index);
1253   app_wrk = app_worker_get_if_valid (s->app_wrk_index);
1254   if (!app_wrk)
1255     return -1;
1256   if (s->session_state != SESSION_STATE_CREATED)
1257     return 0;
1258   s->session_state = SESSION_STATE_ACCEPTING;
1259   if (app_worker_accept_notify (app_wrk, s))
1260     {
1261       /* On transport delete, no notifications should be sent. Unless, the
1262        * accept is retried and successful. */
1263       s->session_state = SESSION_STATE_CREATED;
1264       return -1;
1265     }
1266   return 0;
1267 }
1268
1269 /**
1270  * Accept a stream session. Optionally ping the server by callback.
1271  */
1272 int
1273 session_stream_accept (transport_connection_t * tc, u32 listener_index,
1274                        u32 thread_index, u8 notify)
1275 {
1276   session_t *s;
1277   int rv;
1278
1279   s = session_alloc_for_connection (tc);
1280   s->listener_handle = ((u64) thread_index << 32) | (u64) listener_index;
1281   s->session_state = SESSION_STATE_CREATED;
1282
1283   if ((rv = app_worker_init_accepted (s)))
1284     {
1285       session_free (s);
1286       return rv;
1287     }
1288
1289   session_lookup_add_connection (tc, session_handle (s));
1290
1291   /* Shoulder-tap the server */
1292   if (notify)
1293     {
1294       app_worker_t *app_wrk = app_worker_get (s->app_wrk_index);
1295       if ((rv = app_worker_accept_notify (app_wrk, s)))
1296         {
1297           session_lookup_del_session (s);
1298           segment_manager_dealloc_fifos (s->rx_fifo, s->tx_fifo);
1299           session_free (s);
1300           return rv;
1301         }
1302     }
1303
1304   return 0;
1305 }
1306
1307 int
1308 session_dgram_accept (transport_connection_t * tc, u32 listener_index,
1309                       u32 thread_index)
1310 {
1311   app_worker_t *app_wrk;
1312   session_t *s;
1313   int rv;
1314
1315   s = session_alloc_for_connection (tc);
1316   s->listener_handle = ((u64) thread_index << 32) | (u64) listener_index;
1317
1318   if ((rv = app_worker_init_accepted (s)))
1319     {
1320       session_free (s);
1321       return rv;
1322     }
1323
1324   session_lookup_add_connection (tc, session_handle (s));
1325
1326   app_wrk = app_worker_get (s->app_wrk_index);
1327   if ((rv = app_worker_accept_notify (app_wrk, s)))
1328     {
1329       session_lookup_del_session (s);
1330       segment_manager_dealloc_fifos (s->rx_fifo, s->tx_fifo);
1331       session_free (s);
1332       return rv;
1333     }
1334
1335   return 0;
1336 }
1337
1338 int
1339 session_open_cl (session_endpoint_cfg_t *rmt, session_handle_t *rsh)
1340 {
1341   transport_connection_t *tc;
1342   transport_endpoint_cfg_t *tep;
1343   app_worker_t *app_wrk;
1344   session_handle_t sh;
1345   session_t *s;
1346   int rv;
1347
1348   tep = session_endpoint_to_transport_cfg (rmt);
1349   rv = transport_connect (rmt->transport_proto, tep);
1350   if (rv < 0)
1351     {
1352       SESSION_DBG ("Transport failed to open connection.");
1353       return rv;
1354     }
1355
1356   tc = transport_get_half_open (rmt->transport_proto, (u32) rv);
1357
1358   /* For dgram type of service, allocate session and fifos now */
1359   app_wrk = app_worker_get (rmt->app_wrk_index);
1360   s = session_alloc_for_connection (tc);
1361   s->app_wrk_index = app_wrk->wrk_index;
1362   s->session_state = SESSION_STATE_OPENED;
1363   if (app_worker_init_connected (app_wrk, s))
1364     {
1365       session_free (s);
1366       return -1;
1367     }
1368
1369   sh = session_handle (s);
1370   *rsh = sh;
1371
1372   session_lookup_add_connection (tc, sh);
1373   return app_worker_connect_notify (app_wrk, s, SESSION_E_NONE, rmt->opaque);
1374 }
1375
1376 int
1377 session_open_vc (session_endpoint_cfg_t *rmt, session_handle_t *rsh)
1378 {
1379   transport_connection_t *tc;
1380   transport_endpoint_cfg_t *tep;
1381   app_worker_t *app_wrk;
1382   session_t *ho;
1383   int rv;
1384
1385   tep = session_endpoint_to_transport_cfg (rmt);
1386   rv = transport_connect (rmt->transport_proto, tep);
1387   if (rv < 0)
1388     {
1389       SESSION_DBG ("Transport failed to open connection.");
1390       return rv;
1391     }
1392
1393   tc = transport_get_half_open (rmt->transport_proto, (u32) rv);
1394
1395   app_wrk = app_worker_get (rmt->app_wrk_index);
1396
1397   /* If transport offers a vc service, only allocate established
1398    * session once the connection has been established.
1399    * In the meantime allocate half-open session for tracking purposes
1400    * associate half-open connection to it and add session to app-worker
1401    * half-open table. These are needed to allocate the established
1402    * session on transport notification, and to cleanup the half-open
1403    * session if the app detaches before connection establishment.
1404    */
1405   ho = session_alloc_for_half_open (tc);
1406   ho->app_wrk_index = app_wrk->wrk_index;
1407   ho->ho_index = app_worker_add_half_open (app_wrk, session_handle (ho));
1408   ho->opaque = rmt->opaque;
1409   *rsh = session_handle (ho);
1410
1411   if (!(tc->flags & TRANSPORT_CONNECTION_F_NO_LOOKUP))
1412     session_lookup_add_half_open (tc, tc->c_index);
1413
1414   return 0;
1415 }
1416
1417 int
1418 session_open_app (session_endpoint_cfg_t *rmt, session_handle_t *rsh)
1419 {
1420   transport_endpoint_cfg_t *tep_cfg = session_endpoint_to_transport_cfg (rmt);
1421
1422   /* Not supported for now */
1423   *rsh = SESSION_INVALID_HANDLE;
1424   return transport_connect (rmt->transport_proto, tep_cfg);
1425 }
1426
1427 typedef int (*session_open_service_fn) (session_endpoint_cfg_t *,
1428                                         session_handle_t *);
1429
1430 /* *INDENT-OFF* */
1431 static session_open_service_fn session_open_srv_fns[TRANSPORT_N_SERVICES] = {
1432   session_open_vc,
1433   session_open_cl,
1434   session_open_app,
1435 };
1436 /* *INDENT-ON* */
1437
1438 /**
1439  * Ask transport to open connection to remote transport endpoint.
1440  *
1441  * Stores handle for matching request with reply since the call can be
1442  * asynchronous. For instance, for TCP the 3-way handshake must complete
1443  * before reply comes. Session is only created once connection is established.
1444  *
1445  * @param app_index Index of the application requesting the connect
1446  * @param st Session type requested.
1447  * @param tep Remote transport endpoint
1448  * @param opaque Opaque data (typically, api_context) the application expects
1449  *               on open completion.
1450  */
1451 int
1452 session_open (session_endpoint_cfg_t *rmt, session_handle_t *rsh)
1453 {
1454   transport_service_type_t tst;
1455   tst = transport_protocol_service_type (rmt->transport_proto);
1456   return session_open_srv_fns[tst](rmt, rsh);
1457 }
1458
1459 /**
1460  * Ask transport to listen on session endpoint.
1461  *
1462  * @param s Session for which listen will be called. Note that unlike
1463  *          established sessions, listen sessions are not associated to a
1464  *          thread.
1465  * @param sep Local endpoint to be listened on.
1466  */
1467 int
1468 session_listen (session_t * ls, session_endpoint_cfg_t * sep)
1469 {
1470   transport_endpoint_cfg_t *tep;
1471   int tc_index;
1472   u32 s_index;
1473
1474   /* Transport bind/listen */
1475   tep = session_endpoint_to_transport_cfg (sep);
1476   s_index = ls->session_index;
1477   tc_index = transport_start_listen (session_get_transport_proto (ls),
1478                                      s_index, tep);
1479
1480   if (tc_index < 0)
1481     return tc_index;
1482
1483   /* Attach transport to session. Lookup tables are populated by the app
1484    * worker because local tables (for ct sessions) are not backed by a fib */
1485   ls = listen_session_get (s_index);
1486   ls->connection_index = tc_index;
1487
1488   return 0;
1489 }
1490
1491 /**
1492  * Ask transport to stop listening on local transport endpoint.
1493  *
1494  * @param s Session to stop listening on. It must be in state LISTENING.
1495  */
1496 int
1497 session_stop_listen (session_t * s)
1498 {
1499   transport_proto_t tp = session_get_transport_proto (s);
1500   transport_connection_t *tc;
1501
1502   if (s->session_state != SESSION_STATE_LISTENING)
1503     return SESSION_E_NOLISTEN;
1504
1505   tc = transport_get_listener (tp, s->connection_index);
1506
1507   /* If no transport, assume everything was cleaned up already */
1508   if (!tc)
1509     return SESSION_E_NONE;
1510
1511   if (!(tc->flags & TRANSPORT_CONNECTION_F_NO_LOOKUP))
1512     session_lookup_del_connection (tc);
1513
1514   transport_stop_listen (tp, s->connection_index);
1515   return 0;
1516 }
1517
1518 /**
1519  * Initialize session half-closing procedure.
1520  *
1521  * Note that half-closing will not change the state of the session.
1522  */
1523 void
1524 session_half_close (session_t *s)
1525 {
1526   if (!s)
1527     return;
1528
1529   session_program_transport_ctrl_evt (s, SESSION_CTRL_EVT_HALF_CLOSE);
1530 }
1531
1532 /**
1533  * Initialize session closing procedure.
1534  *
1535  * Request is always sent to session node to ensure that all outstanding
1536  * requests are served before transport is notified.
1537  */
1538 void
1539 session_close (session_t * s)
1540 {
1541   if (!s)
1542     return;
1543
1544   if (s->session_state >= SESSION_STATE_CLOSING)
1545     {
1546       /* Session will only be removed once both app and transport
1547        * acknowledge the close */
1548       if (s->session_state == SESSION_STATE_TRANSPORT_CLOSED
1549           || s->session_state == SESSION_STATE_TRANSPORT_DELETED)
1550         session_program_transport_ctrl_evt (s, SESSION_CTRL_EVT_CLOSE);
1551       return;
1552     }
1553
1554   /* App closed so stop propagating dequeue notifications */
1555   svm_fifo_clear_deq_ntf (s->tx_fifo);
1556   s->session_state = SESSION_STATE_CLOSING;
1557   session_program_transport_ctrl_evt (s, SESSION_CTRL_EVT_CLOSE);
1558 }
1559
1560 /**
1561  * Force a close without waiting for data to be flushed
1562  */
1563 void
1564 session_reset (session_t * s)
1565 {
1566   if (s->session_state >= SESSION_STATE_CLOSING)
1567     return;
1568   /* Drop all outstanding tx data */
1569   svm_fifo_dequeue_drop_all (s->tx_fifo);
1570   s->session_state = SESSION_STATE_CLOSING;
1571   session_program_transport_ctrl_evt (s, SESSION_CTRL_EVT_RESET);
1572 }
1573
1574 /**
1575  * Notify transport the session can be half-disconnected.
1576  *
1577  * Must be called from the session's thread.
1578  */
1579 void
1580 session_transport_half_close (session_t *s)
1581 {
1582   /* Only READY session can be half-closed */
1583   if (s->session_state != SESSION_STATE_READY)
1584     {
1585       return;
1586     }
1587
1588   transport_half_close (session_get_transport_proto (s), s->connection_index,
1589                         s->thread_index);
1590 }
1591
1592 /**
1593  * Notify transport the session can be disconnected. This should eventually
1594  * result in a delete notification that allows us to cleanup session state.
1595  * Called for both active/passive disconnects.
1596  *
1597  * Must be called from the session's thread.
1598  */
1599 void
1600 session_transport_close (session_t * s)
1601 {
1602   if (s->session_state >= SESSION_STATE_APP_CLOSED)
1603     {
1604       if (s->session_state == SESSION_STATE_TRANSPORT_CLOSED)
1605         s->session_state = SESSION_STATE_CLOSED;
1606       /* If transport is already deleted, just free the session */
1607       else if (s->session_state >= SESSION_STATE_TRANSPORT_DELETED)
1608         session_free_w_fifos (s);
1609       return;
1610     }
1611
1612   /* If the tx queue wasn't drained, the transport can continue to try
1613    * sending the outstanding data (in closed state it cannot). It MUST however
1614    * at one point, either after sending everything or after a timeout, call
1615    * delete notify. This will finally lead to the complete cleanup of the
1616    * session.
1617    */
1618   s->session_state = SESSION_STATE_APP_CLOSED;
1619
1620   transport_close (session_get_transport_proto (s), s->connection_index,
1621                    s->thread_index);
1622 }
1623
1624 /**
1625  * Force transport close
1626  */
1627 void
1628 session_transport_reset (session_t * s)
1629 {
1630   if (s->session_state >= SESSION_STATE_APP_CLOSED)
1631     {
1632       if (s->session_state == SESSION_STATE_TRANSPORT_CLOSED)
1633         s->session_state = SESSION_STATE_CLOSED;
1634       else if (s->session_state >= SESSION_STATE_TRANSPORT_DELETED)
1635         session_free_w_fifos (s);
1636       return;
1637     }
1638
1639   s->session_state = SESSION_STATE_APP_CLOSED;
1640   transport_reset (session_get_transport_proto (s), s->connection_index,
1641                    s->thread_index);
1642 }
1643
1644 /**
1645  * Cleanup transport and session state.
1646  *
1647  * Notify transport of the cleanup and free the session. This should
1648  * be called only if transport reported some error and is already
1649  * closed.
1650  */
1651 void
1652 session_transport_cleanup (session_t * s)
1653 {
1654   /* Delete from main lookup table before we axe the the transport */
1655   session_lookup_del_session (s);
1656   if (s->session_state != SESSION_STATE_TRANSPORT_DELETED)
1657     transport_cleanup (session_get_transport_proto (s), s->connection_index,
1658                        s->thread_index);
1659   /* Since we called cleanup, no delete notification will come. So, make
1660    * sure the session is properly freed. */
1661   segment_manager_dealloc_fifos (s->rx_fifo, s->tx_fifo);
1662   session_free (s);
1663 }
1664
1665 /**
1666  * Allocate worker mqs in share-able segment
1667  *
1668  * That can only be a newly created memfd segment, that must be mapped
1669  * by all apps/stack users unless private rx mqs are enabled.
1670  */
1671 void
1672 session_vpp_wrk_mqs_alloc (session_main_t *smm)
1673 {
1674   u32 mq_q_length = 2048, evt_size = sizeof (session_event_t);
1675   fifo_segment_t *mqs_seg = &smm->wrk_mqs_segment;
1676   svm_msg_q_cfg_t _cfg, *cfg = &_cfg;
1677   uword mqs_seg_size;
1678   int i;
1679
1680   mq_q_length = clib_max (mq_q_length, smm->configured_wrk_mq_length);
1681
1682   svm_msg_q_ring_cfg_t rc[SESSION_MQ_N_RINGS] = {
1683     { mq_q_length, evt_size, 0 }, { mq_q_length >> 1, 256, 0 }
1684   };
1685   cfg->consumer_pid = 0;
1686   cfg->n_rings = 2;
1687   cfg->q_nitems = mq_q_length;
1688   cfg->ring_cfgs = rc;
1689
1690   /*
1691    * Compute mqs segment size based on rings config and leave space
1692    * for passing extended configuration messages, i.e., data allocated
1693    * outside of the rings. If provided with a config value, accept it
1694    * if larger than minimum size.
1695    */
1696   mqs_seg_size = svm_msg_q_size_to_alloc (cfg) * vec_len (smm->wrk);
1697   mqs_seg_size = mqs_seg_size + (32 << 10);
1698   mqs_seg_size = clib_max (mqs_seg_size, smm->wrk_mqs_segment_size);
1699
1700   mqs_seg->ssvm.ssvm_size = mqs_seg_size;
1701   mqs_seg->ssvm.my_pid = getpid ();
1702   mqs_seg->ssvm.name = format (0, "%s%c", "session: wrk-mqs-segment", 0);
1703
1704   if (ssvm_server_init (&mqs_seg->ssvm, SSVM_SEGMENT_MEMFD))
1705     {
1706       clib_warning ("failed to initialize queue segment");
1707       return;
1708     }
1709
1710   fifo_segment_init (mqs_seg);
1711
1712   /* Special fifo segment that's filled only with mqs */
1713   mqs_seg->h->n_mqs = vec_len (smm->wrk);
1714
1715   for (i = 0; i < vec_len (smm->wrk); i++)
1716     smm->wrk[i].vpp_event_queue = fifo_segment_msg_q_alloc (mqs_seg, i, cfg);
1717 }
1718
1719 fifo_segment_t *
1720 session_main_get_wrk_mqs_segment (void)
1721 {
1722   return &session_main.wrk_mqs_segment;
1723 }
1724
1725 u64
1726 session_segment_handle (session_t * s)
1727 {
1728   svm_fifo_t *f;
1729
1730   if (!s->rx_fifo)
1731     return SESSION_INVALID_HANDLE;
1732
1733   f = s->rx_fifo;
1734   return segment_manager_make_segment_handle (f->segment_manager,
1735                                               f->segment_index);
1736 }
1737
1738 /* *INDENT-OFF* */
1739 static session_fifo_rx_fn *session_tx_fns[TRANSPORT_TX_N_FNS] = {
1740     session_tx_fifo_peek_and_snd,
1741     session_tx_fifo_dequeue_and_snd,
1742     session_tx_fifo_dequeue_internal,
1743     session_tx_fifo_dequeue_and_snd
1744 };
1745 /* *INDENT-ON* */
1746
1747 void
1748 session_register_transport (transport_proto_t transport_proto,
1749                             const transport_proto_vft_t * vft, u8 is_ip4,
1750                             u32 output_node)
1751 {
1752   session_main_t *smm = &session_main;
1753   session_type_t session_type;
1754   u32 next_index = ~0;
1755
1756   session_type = session_type_from_proto_and_ip (transport_proto, is_ip4);
1757
1758   vec_validate (smm->session_type_to_next, session_type);
1759   vec_validate (smm->session_tx_fns, session_type);
1760
1761   if (output_node != ~0)
1762     next_index = vlib_node_add_next (vlib_get_main (),
1763                                      session_queue_node.index, output_node);
1764
1765   smm->session_type_to_next[session_type] = next_index;
1766   smm->session_tx_fns[session_type] =
1767     session_tx_fns[vft->transport_options.tx_type];
1768 }
1769
1770 void
1771 session_register_update_time_fn (session_update_time_fn fn, u8 is_add)
1772 {
1773   session_main_t *smm = &session_main;
1774   session_update_time_fn *fi;
1775   u32 fi_pos = ~0;
1776   u8 found = 0;
1777
1778   vec_foreach (fi, smm->update_time_fns)
1779     {
1780       if (*fi == fn)
1781         {
1782           fi_pos = fi - smm->update_time_fns;
1783           found = 1;
1784           break;
1785         }
1786     }
1787
1788   if (is_add)
1789     {
1790       if (found)
1791         {
1792           clib_warning ("update time fn %p already registered", fn);
1793           return;
1794         }
1795       vec_add1 (smm->update_time_fns, fn);
1796     }
1797   else
1798     {
1799       vec_del1 (smm->update_time_fns, fi_pos);
1800     }
1801 }
1802
1803 transport_proto_t
1804 session_add_transport_proto (void)
1805 {
1806   session_main_t *smm = &session_main;
1807   session_worker_t *wrk;
1808   u32 thread;
1809
1810   smm->last_transport_proto_type += 1;
1811
1812   for (thread = 0; thread < vec_len (smm->wrk); thread++)
1813     {
1814       wrk = session_main_get_worker (thread);
1815       vec_validate (wrk->session_to_enqueue, smm->last_transport_proto_type);
1816     }
1817
1818   return smm->last_transport_proto_type;
1819 }
1820
1821 transport_connection_t *
1822 session_get_transport (session_t * s)
1823 {
1824   if (s->session_state != SESSION_STATE_LISTENING)
1825     return transport_get_connection (session_get_transport_proto (s),
1826                                      s->connection_index, s->thread_index);
1827   else
1828     return transport_get_listener (session_get_transport_proto (s),
1829                                    s->connection_index);
1830 }
1831
1832 void
1833 session_get_endpoint (session_t * s, transport_endpoint_t * tep, u8 is_lcl)
1834 {
1835   if (s->session_state != SESSION_STATE_LISTENING)
1836     return transport_get_endpoint (session_get_transport_proto (s),
1837                                    s->connection_index, s->thread_index, tep,
1838                                    is_lcl);
1839   else
1840     return transport_get_listener_endpoint (session_get_transport_proto (s),
1841                                             s->connection_index, tep, is_lcl);
1842 }
1843
1844 int
1845 session_transport_attribute (session_t *s, u8 is_get,
1846                              transport_endpt_attr_t *attr)
1847 {
1848   if (s->session_state < SESSION_STATE_READY)
1849     return -1;
1850
1851   return transport_connection_attribute (session_get_transport_proto (s),
1852                                          s->connection_index, s->thread_index,
1853                                          is_get, attr);
1854 }
1855
1856 transport_connection_t *
1857 listen_session_get_transport (session_t * s)
1858 {
1859   return transport_get_listener (session_get_transport_proto (s),
1860                                  s->connection_index);
1861 }
1862
1863 void
1864 session_queue_run_on_main_thread (vlib_main_t * vm)
1865 {
1866   ASSERT (vlib_get_thread_index () == 0);
1867   vlib_node_set_interrupt_pending (vm, session_queue_node.index);
1868 }
1869
1870 static clib_error_t *
1871 session_manager_main_enable (vlib_main_t * vm)
1872 {
1873   session_main_t *smm = &session_main;
1874   vlib_thread_main_t *vtm = vlib_get_thread_main ();
1875   u32 num_threads, preallocated_sessions_per_worker;
1876   session_worker_t *wrk;
1877   int i;
1878
1879   /* We only initialize once and do not de-initialized on disable */
1880   if (smm->is_initialized)
1881     goto done;
1882
1883   num_threads = 1 /* main thread */  + vtm->n_threads;
1884
1885   if (num_threads < 1)
1886     return clib_error_return (0, "n_thread_stacks not set");
1887
1888   /* Allocate cache line aligned worker contexts */
1889   vec_validate_aligned (smm->wrk, num_threads - 1, CLIB_CACHE_LINE_BYTES);
1890   clib_spinlock_init (&session_main.pool_realloc_lock);
1891
1892   for (i = 0; i < num_threads; i++)
1893     {
1894       wrk = &smm->wrk[i];
1895       wrk->ctrl_head = clib_llist_make_head (wrk->event_elts, evt_list);
1896       wrk->new_head = clib_llist_make_head (wrk->event_elts, evt_list);
1897       wrk->old_head = clib_llist_make_head (wrk->event_elts, evt_list);
1898       wrk->pending_connects = clib_llist_make_head (wrk->event_elts, evt_list);
1899       wrk->evts_pending_main =
1900         clib_llist_make_head (wrk->event_elts, evt_list);
1901       wrk->vm = vlib_get_main_by_index (i);
1902       wrk->last_vlib_time = vlib_time_now (vm);
1903       wrk->last_vlib_us_time = wrk->last_vlib_time * CLIB_US_TIME_FREQ;
1904       wrk->timerfd = -1;
1905       vec_validate (wrk->session_to_enqueue, smm->last_transport_proto_type);
1906
1907       if (num_threads > 1)
1908         clib_rwlock_init (&smm->wrk[i].peekers_rw_locks);
1909
1910       if (!smm->no_adaptive && smm->use_private_rx_mqs)
1911         session_wrk_enable_adaptive_mode (wrk);
1912     }
1913
1914   /* Allocate vpp event queues segment and queue */
1915   session_vpp_wrk_mqs_alloc (smm);
1916
1917   /* Initialize segment manager properties */
1918   segment_manager_main_init ();
1919
1920   /* Preallocate sessions */
1921   if (smm->preallocated_sessions)
1922     {
1923       if (num_threads == 1)
1924         {
1925           pool_init_fixed (smm->wrk[0].sessions, smm->preallocated_sessions);
1926         }
1927       else
1928         {
1929           int j;
1930           preallocated_sessions_per_worker =
1931             (1.1 * (f64) smm->preallocated_sessions /
1932              (f64) (num_threads - 1));
1933
1934           for (j = 1; j < num_threads; j++)
1935             {
1936               pool_init_fixed (smm->wrk[j].sessions,
1937                                preallocated_sessions_per_worker);
1938             }
1939         }
1940     }
1941
1942   session_lookup_init ();
1943   app_namespaces_init ();
1944   transport_init ();
1945   smm->is_initialized = 1;
1946
1947 done:
1948
1949   smm->is_enabled = 1;
1950
1951   /* Enable transports */
1952   transport_enable_disable (vm, 1);
1953   session_debug_init ();
1954
1955   return 0;
1956 }
1957
1958 static void
1959 session_manager_main_disable (vlib_main_t * vm)
1960 {
1961   transport_enable_disable (vm, 0 /* is_en */ );
1962 }
1963
1964 void
1965 session_node_enable_disable (u8 is_en)
1966 {
1967   u8 mstate = is_en ? VLIB_NODE_STATE_INTERRUPT : VLIB_NODE_STATE_DISABLED;
1968   u8 state = is_en ? VLIB_NODE_STATE_POLLING : VLIB_NODE_STATE_DISABLED;
1969   session_main_t *sm = &session_main;
1970   vlib_main_t *vm;
1971   vlib_node_t *n;
1972   int n_vlibs, i;
1973
1974   n_vlibs = vlib_get_n_threads ();
1975   for (i = 0; i < n_vlibs; i++)
1976     {
1977       vm = vlib_get_main_by_index (i);
1978       /* main thread with workers and not polling */
1979       if (i == 0 && n_vlibs > 1)
1980         {
1981           vlib_node_set_state (vm, session_queue_node.index, mstate);
1982           if (is_en)
1983             {
1984               session_main_get_worker (0)->state = SESSION_WRK_INTERRUPT;
1985               vlib_node_set_state (vm, session_queue_process_node.index,
1986                                    state);
1987               n = vlib_get_node (vm, session_queue_process_node.index);
1988               vlib_start_process (vm, n->runtime_index);
1989             }
1990           else
1991             {
1992               vlib_process_signal_event_mt (vm,
1993                                             session_queue_process_node.index,
1994                                             SESSION_Q_PROCESS_STOP, 0);
1995             }
1996           if (!sm->poll_main)
1997             continue;
1998         }
1999       vlib_node_set_state (vm, session_queue_node.index, state);
2000     }
2001
2002   if (sm->use_private_rx_mqs)
2003     application_enable_rx_mqs_nodes (is_en);
2004 }
2005
2006 clib_error_t *
2007 vnet_session_enable_disable (vlib_main_t * vm, u8 is_en)
2008 {
2009   clib_error_t *error = 0;
2010   if (is_en)
2011     {
2012       if (session_main.is_enabled)
2013         return 0;
2014
2015       error = session_manager_main_enable (vm);
2016       session_node_enable_disable (is_en);
2017     }
2018   else
2019     {
2020       session_main.is_enabled = 0;
2021       session_manager_main_disable (vm);
2022       session_node_enable_disable (is_en);
2023     }
2024
2025   return error;
2026 }
2027
2028 clib_error_t *
2029 session_main_init (vlib_main_t * vm)
2030 {
2031   session_main_t *smm = &session_main;
2032
2033   smm->is_enabled = 0;
2034   smm->session_enable_asap = 0;
2035   smm->poll_main = 0;
2036   smm->use_private_rx_mqs = 0;
2037   smm->no_adaptive = 0;
2038   smm->last_transport_proto_type = TRANSPORT_PROTO_HTTP;
2039
2040   return 0;
2041 }
2042
2043 static clib_error_t *
2044 session_main_loop_init (vlib_main_t * vm)
2045 {
2046   session_main_t *smm = &session_main;
2047   if (smm->session_enable_asap)
2048     {
2049       vlib_worker_thread_barrier_sync (vm);
2050       vnet_session_enable_disable (vm, 1 /* is_en */ );
2051       vlib_worker_thread_barrier_release (vm);
2052     }
2053   return 0;
2054 }
2055
2056 VLIB_INIT_FUNCTION (session_main_init);
2057 VLIB_MAIN_LOOP_ENTER_FUNCTION (session_main_loop_init);
2058
2059 static clib_error_t *
2060 session_config_fn (vlib_main_t * vm, unformat_input_t * input)
2061 {
2062   session_main_t *smm = &session_main;
2063   u32 nitems;
2064   uword tmp;
2065
2066   while (unformat_check_input (input) != UNFORMAT_END_OF_INPUT)
2067     {
2068       if (unformat (input, "wrk-mq-length %d", &nitems))
2069         {
2070           if (nitems >= 2048)
2071             smm->configured_wrk_mq_length = nitems;
2072           else
2073             clib_warning ("event queue length %d too small, ignored", nitems);
2074         }
2075       else if (unformat (input, "preallocated-sessions %d",
2076                          &smm->preallocated_sessions))
2077         ;
2078       else if (unformat (input, "v4-session-table-buckets %d",
2079                          &smm->configured_v4_session_table_buckets))
2080         ;
2081       else if (unformat (input, "v4-halfopen-table-buckets %d",
2082                          &smm->configured_v4_halfopen_table_buckets))
2083         ;
2084       else if (unformat (input, "v6-session-table-buckets %d",
2085                          &smm->configured_v6_session_table_buckets))
2086         ;
2087       else if (unformat (input, "v6-halfopen-table-buckets %d",
2088                          &smm->configured_v6_halfopen_table_buckets))
2089         ;
2090       else if (unformat (input, "v4-session-table-memory %U",
2091                          unformat_memory_size, &tmp))
2092         {
2093           if (tmp >= 0x100000000)
2094             return clib_error_return (0, "memory size %llx (%lld) too large",
2095                                       tmp, tmp);
2096           smm->configured_v4_session_table_memory = tmp;
2097         }
2098       else if (unformat (input, "v4-halfopen-table-memory %U",
2099                          unformat_memory_size, &tmp))
2100         {
2101           if (tmp >= 0x100000000)
2102             return clib_error_return (0, "memory size %llx (%lld) too large",
2103                                       tmp, tmp);
2104           smm->configured_v4_halfopen_table_memory = tmp;
2105         }
2106       else if (unformat (input, "v6-session-table-memory %U",
2107                          unformat_memory_size, &tmp))
2108         {
2109           if (tmp >= 0x100000000)
2110             return clib_error_return (0, "memory size %llx (%lld) too large",
2111                                       tmp, tmp);
2112           smm->configured_v6_session_table_memory = tmp;
2113         }
2114       else if (unformat (input, "v6-halfopen-table-memory %U",
2115                          unformat_memory_size, &tmp))
2116         {
2117           if (tmp >= 0x100000000)
2118             return clib_error_return (0, "memory size %llx (%lld) too large",
2119                                       tmp, tmp);
2120           smm->configured_v6_halfopen_table_memory = tmp;
2121         }
2122       else if (unformat (input, "local-endpoints-table-memory %U",
2123                          unformat_memory_size, &tmp))
2124         {
2125           if (tmp >= 0x100000000)
2126             return clib_error_return (0, "memory size %llx (%lld) too large",
2127                                       tmp, tmp);
2128           smm->local_endpoints_table_memory = tmp;
2129         }
2130       else if (unformat (input, "local-endpoints-table-buckets %d",
2131                          &smm->local_endpoints_table_buckets))
2132         ;
2133       else if (unformat (input, "enable"))
2134         smm->session_enable_asap = 1;
2135       else if (unformat (input, "use-app-socket-api"))
2136         (void) appns_sapi_enable_disable (1 /* is_enable */);
2137       else if (unformat (input, "poll-main"))
2138         smm->poll_main = 1;
2139       else if (unformat (input, "use-private-rx-mqs"))
2140         smm->use_private_rx_mqs = 1;
2141       else if (unformat (input, "no-adaptive"))
2142         smm->no_adaptive = 1;
2143       /*
2144        * Deprecated but maintained for compatibility
2145        */
2146       else if (unformat (input, "evt_qs_memfd_seg"))
2147         ;
2148       else if (unformat (input, "segment-baseva 0x%lx", &tmp))
2149         ;
2150       else if (unformat (input, "evt_qs_seg_size %U", unformat_memory_size,
2151                          &tmp))
2152         ;
2153       else if (unformat (input, "event-queue-length %d", &nitems))
2154         {
2155           if (nitems >= 2048)
2156             smm->configured_wrk_mq_length = nitems;
2157           else
2158             clib_warning ("event queue length %d too small, ignored", nitems);
2159         }
2160       else
2161         return clib_error_return (0, "unknown input `%U'",
2162                                   format_unformat_error, input);
2163     }
2164   return 0;
2165 }
2166
2167 VLIB_CONFIG_FUNCTION (session_config_fn, "session");
2168
2169 /*
2170  * fd.io coding-style-patch-verification: ON
2171  *
2172  * Local Variables:
2173  * eval: (c-set-style "gnu")
2174  * End:
2175  */