tls: switch dtls to vc and track half-opens
[vpp.git] / src / vnet / session / session.c
1 /*
2  * Copyright (c) 2017-2019 Cisco and/or its affiliates.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at:
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 /**
16  * @file
17  * @brief Session and session manager
18  */
19
20 #include <vnet/session/session.h>
21 #include <vnet/session/application.h>
22 #include <vnet/dpo/load_balance.h>
23 #include <vnet/fib/ip4_fib.h>
24
25 session_main_t session_main;
26
27 static inline int
28 session_send_evt_to_thread (void *data, void *args, u32 thread_index,
29                             session_evt_type_t evt_type)
30 {
31   session_worker_t *wrk = session_main_get_worker (thread_index);
32   session_event_t *evt;
33   svm_msg_q_msg_t msg;
34   svm_msg_q_t *mq;
35
36   mq = wrk->vpp_event_queue;
37   if (PREDICT_FALSE (svm_msg_q_lock (mq)))
38     return -1;
39   if (PREDICT_FALSE (svm_msg_q_is_full (mq)
40                      || svm_msg_q_ring_is_full (mq, SESSION_MQ_IO_EVT_RING)))
41     {
42       svm_msg_q_unlock (mq);
43       return -2;
44     }
45   switch (evt_type)
46     {
47     case SESSION_CTRL_EVT_RPC:
48       msg = svm_msg_q_alloc_msg_w_ring (mq, SESSION_MQ_IO_EVT_RING);
49       evt = (session_event_t *) svm_msg_q_msg_data (mq, &msg);
50       evt->rpc_args.fp = data;
51       evt->rpc_args.arg = args;
52       break;
53     case SESSION_IO_EVT_RX:
54     case SESSION_IO_EVT_TX:
55     case SESSION_IO_EVT_TX_FLUSH:
56     case SESSION_IO_EVT_BUILTIN_RX:
57       msg = svm_msg_q_alloc_msg_w_ring (mq, SESSION_MQ_IO_EVT_RING);
58       evt = (session_event_t *) svm_msg_q_msg_data (mq, &msg);
59       evt->session_index = *(u32 *) data;
60       break;
61     case SESSION_IO_EVT_BUILTIN_TX:
62     case SESSION_CTRL_EVT_CLOSE:
63     case SESSION_CTRL_EVT_RESET:
64       msg = svm_msg_q_alloc_msg_w_ring (mq, SESSION_MQ_IO_EVT_RING);
65       evt = (session_event_t *) svm_msg_q_msg_data (mq, &msg);
66       evt->session_handle = session_handle ((session_t *) data);
67       break;
68     default:
69       clib_warning ("evt unhandled!");
70       svm_msg_q_unlock (mq);
71       return -1;
72     }
73   evt->event_type = evt_type;
74
75   svm_msg_q_add_and_unlock (mq, &msg);
76
77   if (PREDICT_FALSE (wrk->state == SESSION_WRK_INTERRUPT))
78     vlib_node_set_interrupt_pending (wrk->vm, session_queue_node.index);
79
80   return 0;
81 }
82
83 int
84 session_send_io_evt_to_thread (svm_fifo_t * f, session_evt_type_t evt_type)
85 {
86   return session_send_evt_to_thread (&f->shr->master_session_index, 0,
87                                      f->master_thread_index, evt_type);
88 }
89
90 int
91 session_send_io_evt_to_thread_custom (void *data, u32 thread_index,
92                                       session_evt_type_t evt_type)
93 {
94   return session_send_evt_to_thread (data, 0, thread_index, evt_type);
95 }
96
97 int
98 session_send_ctrl_evt_to_thread (session_t * s, session_evt_type_t evt_type)
99 {
100   /* only events supported are disconnect, shutdown and reset */
101   ASSERT (evt_type == SESSION_CTRL_EVT_CLOSE ||
102           evt_type == SESSION_CTRL_EVT_HALF_CLOSE ||
103           evt_type == SESSION_CTRL_EVT_RESET);
104   return session_send_evt_to_thread (s, 0, s->thread_index, evt_type);
105 }
106
107 void
108 session_send_rpc_evt_to_thread_force (u32 thread_index, void *fp,
109                                       void *rpc_args)
110 {
111   session_send_evt_to_thread (fp, rpc_args, thread_index,
112                               SESSION_CTRL_EVT_RPC);
113 }
114
115 void
116 session_send_rpc_evt_to_thread (u32 thread_index, void *fp, void *rpc_args)
117 {
118   if (thread_index != vlib_get_thread_index ())
119     session_send_rpc_evt_to_thread_force (thread_index, fp, rpc_args);
120   else
121     {
122       void (*fnp) (void *) = fp;
123       fnp (rpc_args);
124     }
125 }
126
127 void
128 session_add_self_custom_tx_evt (transport_connection_t * tc, u8 has_prio)
129 {
130   session_t *s = session_get (tc->s_index, tc->thread_index);
131
132   ASSERT (s->thread_index == vlib_get_thread_index ());
133   ASSERT (s->session_state != SESSION_STATE_TRANSPORT_DELETED);
134
135   if (!(s->flags & SESSION_F_CUSTOM_TX))
136     {
137       s->flags |= SESSION_F_CUSTOM_TX;
138       if (svm_fifo_set_event (s->tx_fifo)
139           || transport_connection_is_descheduled (tc))
140         {
141           session_evt_elt_t *elt;
142           session_worker_t *wrk;
143
144           wrk = session_main_get_worker (tc->thread_index);
145           if (has_prio)
146             elt = session_evt_alloc_new (wrk);
147           else
148             elt = session_evt_alloc_old (wrk);
149           elt->evt.session_index = tc->s_index;
150           elt->evt.event_type = SESSION_IO_EVT_TX;
151           tc->flags &= ~TRANSPORT_CONNECTION_F_DESCHED;
152
153           if (PREDICT_FALSE (wrk->state == SESSION_WRK_INTERRUPT))
154             vlib_node_set_interrupt_pending (wrk->vm,
155                                              session_queue_node.index);
156         }
157     }
158 }
159
160 void
161 sesssion_reschedule_tx (transport_connection_t * tc)
162 {
163   session_worker_t *wrk = session_main_get_worker (tc->thread_index);
164   session_evt_elt_t *elt;
165
166   ASSERT (tc->thread_index == vlib_get_thread_index ());
167
168   elt = session_evt_alloc_new (wrk);
169   elt->evt.session_index = tc->s_index;
170   elt->evt.event_type = SESSION_IO_EVT_TX;
171
172   if (PREDICT_FALSE (wrk->state == SESSION_WRK_INTERRUPT))
173     vlib_node_set_interrupt_pending (wrk->vm, session_queue_node.index);
174 }
175
176 static void
177 session_program_transport_ctrl_evt (session_t * s, session_evt_type_t evt)
178 {
179   u32 thread_index = vlib_get_thread_index ();
180   session_evt_elt_t *elt;
181   session_worker_t *wrk;
182
183   /* If we are in the handler thread, or being called with the worker barrier
184    * held, just append a new event to pending disconnects vector. */
185   if (vlib_thread_is_main_w_barrier () || thread_index == s->thread_index)
186     {
187       wrk = session_main_get_worker (s->thread_index);
188       elt = session_evt_alloc_ctrl (wrk);
189       clib_memset (&elt->evt, 0, sizeof (session_event_t));
190       elt->evt.session_handle = session_handle (s);
191       elt->evt.event_type = evt;
192
193       if (PREDICT_FALSE (wrk->state == SESSION_WRK_INTERRUPT))
194         vlib_node_set_interrupt_pending (wrk->vm, session_queue_node.index);
195     }
196   else
197     session_send_ctrl_evt_to_thread (s, evt);
198 }
199
200 session_t *
201 session_alloc (u32 thread_index)
202 {
203   session_worker_t *wrk = &session_main.wrk[thread_index];
204   session_t *s;
205   u8 will_expand = 0;
206   pool_get_aligned_will_expand (wrk->sessions, will_expand,
207                                 CLIB_CACHE_LINE_BYTES);
208   /* If we have peekers, let them finish */
209   if (PREDICT_FALSE (will_expand && vlib_num_workers ()))
210     {
211       clib_rwlock_writer_lock (&wrk->peekers_rw_locks);
212       pool_get_aligned (wrk->sessions, s, CLIB_CACHE_LINE_BYTES);
213       clib_rwlock_writer_unlock (&wrk->peekers_rw_locks);
214     }
215   else
216     {
217       pool_get_aligned (wrk->sessions, s, CLIB_CACHE_LINE_BYTES);
218     }
219   clib_memset (s, 0, sizeof (*s));
220   s->session_index = s - wrk->sessions;
221   s->thread_index = thread_index;
222   s->app_index = APP_INVALID_INDEX;
223   return s;
224 }
225
226 void
227 session_free (session_t * s)
228 {
229   if (CLIB_DEBUG)
230     {
231       u8 thread_index = s->thread_index;
232       clib_memset (s, 0xFA, sizeof (*s));
233       pool_put (session_main.wrk[thread_index].sessions, s);
234       return;
235     }
236   SESSION_EVT (SESSION_EVT_FREE, s);
237   pool_put (session_main.wrk[s->thread_index].sessions, s);
238 }
239
240 u8
241 session_is_valid (u32 si, u8 thread_index)
242 {
243   session_t *s;
244   transport_connection_t *tc;
245
246   s = pool_elt_at_index (session_main.wrk[thread_index].sessions, si);
247
248   if (s->thread_index != thread_index || s->session_index != si)
249     return 0;
250
251   if (s->session_state == SESSION_STATE_TRANSPORT_DELETED
252       || s->session_state <= SESSION_STATE_LISTENING)
253     return 1;
254
255   if (s->session_state == SESSION_STATE_CONNECTING &&
256       (s->flags & SESSION_F_HALF_OPEN))
257     return 1;
258
259   tc = session_get_transport (s);
260   if (s->connection_index != tc->c_index
261       || s->thread_index != tc->thread_index || tc->s_index != si)
262     return 0;
263
264   return 1;
265 }
266
267 static void
268 session_cleanup_notify (session_t * s, session_cleanup_ntf_t ntf)
269 {
270   app_worker_t *app_wrk;
271
272   app_wrk = app_worker_get_if_valid (s->app_wrk_index);
273   if (!app_wrk)
274     return;
275   app_worker_cleanup_notify (app_wrk, s, ntf);
276 }
277
278 void
279 session_free_w_fifos (session_t * s)
280 {
281   session_cleanup_notify (s, SESSION_CLEANUP_SESSION);
282   segment_manager_dealloc_fifos (s->rx_fifo, s->tx_fifo);
283   session_free (s);
284 }
285
286 /**
287  * Cleans up session and lookup table.
288  *
289  * Transport connection must still be valid.
290  */
291 static void
292 session_delete (session_t * s)
293 {
294   int rv;
295
296   /* Delete from the main lookup table. */
297   if ((rv = session_lookup_del_session (s)))
298     clib_warning ("session %u hash delete rv %d", s->session_index, rv);
299
300   session_free_w_fifos (s);
301 }
302
303 void
304 session_cleanup_half_open (session_handle_t ho_handle)
305 {
306   session_t *ho = session_get_from_handle (ho_handle);
307
308   /* App transports can migrate their half-opens */
309   if (ho->flags & SESSION_F_IS_MIGRATING)
310     {
311       /* Session still migrating, move to closed state to signal that the
312        * session should be removed. */
313       if (ho->connection_index == ~0)
314         {
315           ho->session_state = SESSION_STATE_CLOSED;
316           return;
317         }
318       /* Migrated transports are no longer half-opens */
319       transport_cleanup (session_get_transport_proto (ho),
320                          ho->connection_index, ho->app_index /* overloaded */);
321       return;
322     }
323   transport_cleanup_half_open (session_get_transport_proto (ho),
324                                ho->connection_index);
325 }
326
327 static void
328 session_half_open_free (u32 ho_index)
329 {
330   app_worker_t *app_wrk;
331   session_t *ho;
332
333   ASSERT (vlib_get_thread_index () <= 1);
334   ho = ho_session_get (ho_index);
335   app_wrk = app_worker_get (ho->app_wrk_index);
336   app_worker_del_half_open (app_wrk, ho);
337   session_free (ho);
338 }
339
340 static void
341 session_half_open_free_rpc (void *args)
342 {
343   session_half_open_free (pointer_to_uword (args));
344 }
345
346 void
347 session_half_open_delete_notify (transport_connection_t *tc)
348 {
349   void *args = uword_to_pointer ((uword) tc->s_index, void *);
350   u32 ctrl_thread = vlib_num_workers () ? 1 : 0;
351   session_send_rpc_evt_to_thread (ctrl_thread, session_half_open_free_rpc,
352                                   args);
353 }
354
355 void
356 session_half_open_migrate_notify (transport_connection_t *tc)
357 {
358   session_t *ho;
359
360   ho = ho_session_get (tc->s_index);
361   ho->flags |= SESSION_F_IS_MIGRATING;
362   ho->connection_index = ~0;
363 }
364
365 int
366 session_half_open_migrated_notify (transport_connection_t *tc)
367 {
368   session_t *ho;
369
370   ho = ho_session_get (tc->s_index);
371
372   /* App probably detached so the half-open must be cleaned up */
373   if (ho->session_state == SESSION_STATE_CLOSED)
374     {
375       session_half_open_delete_notify (tc);
376       return -1;
377     }
378   ho->connection_index = tc->c_index;
379   /* Overload app index for half-open with new thread */
380   ho->app_index = tc->thread_index;
381   return 0;
382 }
383
384 session_t *
385 session_alloc_for_connection (transport_connection_t * tc)
386 {
387   session_t *s;
388   u32 thread_index = tc->thread_index;
389
390   ASSERT (thread_index == vlib_get_thread_index ()
391           || transport_protocol_is_cl (tc->proto));
392
393   s = session_alloc (thread_index);
394   s->session_type = session_type_from_proto_and_ip (tc->proto, tc->is_ip4);
395   s->session_state = SESSION_STATE_CLOSED;
396
397   /* Attach transport to session and vice versa */
398   s->connection_index = tc->c_index;
399   tc->s_index = s->session_index;
400   return s;
401 }
402
403 session_t *
404 session_alloc_for_half_open (transport_connection_t *tc)
405 {
406   session_t *s;
407
408   s = ho_session_alloc ();
409   s->session_type = session_type_from_proto_and_ip (tc->proto, tc->is_ip4);
410   s->connection_index = tc->c_index;
411   tc->s_index = s->session_index;
412   return s;
413 }
414
415 /**
416  * Discards bytes from buffer chain
417  *
418  * It discards n_bytes_to_drop starting at first buffer after chain_b
419  */
420 always_inline void
421 session_enqueue_discard_chain_bytes (vlib_main_t * vm, vlib_buffer_t * b,
422                                      vlib_buffer_t ** chain_b,
423                                      u32 n_bytes_to_drop)
424 {
425   vlib_buffer_t *next = *chain_b;
426   u32 to_drop = n_bytes_to_drop;
427   ASSERT (b->flags & VLIB_BUFFER_NEXT_PRESENT);
428   while (to_drop && (next->flags & VLIB_BUFFER_NEXT_PRESENT))
429     {
430       next = vlib_get_buffer (vm, next->next_buffer);
431       if (next->current_length > to_drop)
432         {
433           vlib_buffer_advance (next, to_drop);
434           to_drop = 0;
435         }
436       else
437         {
438           to_drop -= next->current_length;
439           next->current_length = 0;
440         }
441     }
442   *chain_b = next;
443
444   if (to_drop == 0)
445     b->total_length_not_including_first_buffer -= n_bytes_to_drop;
446 }
447
448 /**
449  * Enqueue buffer chain tail
450  */
451 always_inline int
452 session_enqueue_chain_tail (session_t * s, vlib_buffer_t * b,
453                             u32 offset, u8 is_in_order)
454 {
455   vlib_buffer_t *chain_b;
456   u32 chain_bi, len, diff;
457   vlib_main_t *vm = vlib_get_main ();
458   u8 *data;
459   u32 written = 0;
460   int rv = 0;
461
462   if (is_in_order && offset)
463     {
464       diff = offset - b->current_length;
465       if (diff > b->total_length_not_including_first_buffer)
466         return 0;
467       chain_b = b;
468       session_enqueue_discard_chain_bytes (vm, b, &chain_b, diff);
469       chain_bi = vlib_get_buffer_index (vm, chain_b);
470     }
471   else
472     chain_bi = b->next_buffer;
473
474   do
475     {
476       chain_b = vlib_get_buffer (vm, chain_bi);
477       data = vlib_buffer_get_current (chain_b);
478       len = chain_b->current_length;
479       if (!len)
480         continue;
481       if (is_in_order)
482         {
483           rv = svm_fifo_enqueue (s->rx_fifo, len, data);
484           if (rv == len)
485             {
486               written += rv;
487             }
488           else if (rv < len)
489             {
490               return (rv > 0) ? (written + rv) : written;
491             }
492           else if (rv > len)
493             {
494               written += rv;
495
496               /* written more than what was left in chain */
497               if (written > b->total_length_not_including_first_buffer)
498                 return written;
499
500               /* drop the bytes that have already been delivered */
501               session_enqueue_discard_chain_bytes (vm, b, &chain_b, rv - len);
502             }
503         }
504       else
505         {
506           rv = svm_fifo_enqueue_with_offset (s->rx_fifo, offset, len, data);
507           if (rv)
508             {
509               clib_warning ("failed to enqueue multi-buffer seg");
510               return -1;
511             }
512           offset += len;
513         }
514     }
515   while ((chain_bi = (chain_b->flags & VLIB_BUFFER_NEXT_PRESENT)
516           ? chain_b->next_buffer : 0));
517
518   if (is_in_order)
519     return written;
520
521   return 0;
522 }
523
524 void
525 session_fifo_tuning (session_t * s, svm_fifo_t * f,
526                      session_ft_action_t act, u32 len)
527 {
528   if (s->flags & SESSION_F_CUSTOM_FIFO_TUNING)
529     {
530       app_worker_t *app_wrk = app_worker_get (s->app_wrk_index);
531       app_worker_session_fifo_tuning (app_wrk, s, f, act, len);
532       if (CLIB_ASSERT_ENABLE)
533         {
534           segment_manager_t *sm;
535           sm = segment_manager_get (f->segment_manager);
536           ASSERT (f->shr->size >= 4096);
537           ASSERT (f->shr->size <= sm->max_fifo_size);
538         }
539     }
540 }
541
542 /*
543  * Enqueue data for delivery to session peer. Does not notify peer of enqueue
544  * event but on request can queue notification events for later delivery by
545  * calling stream_server_flush_enqueue_events().
546  *
547  * @param tc Transport connection which is to be enqueued data
548  * @param b Buffer to be enqueued
549  * @param offset Offset at which to start enqueueing if out-of-order
550  * @param queue_event Flag to indicate if peer is to be notified or if event
551  *                    is to be queued. The former is useful when more data is
552  *                    enqueued and only one event is to be generated.
553  * @param is_in_order Flag to indicate if data is in order
554  * @return Number of bytes enqueued or a negative value if enqueueing failed.
555  */
556 int
557 session_enqueue_stream_connection (transport_connection_t * tc,
558                                    vlib_buffer_t * b, u32 offset,
559                                    u8 queue_event, u8 is_in_order)
560 {
561   session_t *s;
562   int enqueued = 0, rv, in_order_off;
563
564   s = session_get (tc->s_index, tc->thread_index);
565
566   if (is_in_order)
567     {
568       enqueued = svm_fifo_enqueue (s->rx_fifo,
569                                    b->current_length,
570                                    vlib_buffer_get_current (b));
571       if (PREDICT_FALSE ((b->flags & VLIB_BUFFER_NEXT_PRESENT)
572                          && enqueued >= 0))
573         {
574           in_order_off = enqueued > b->current_length ? enqueued : 0;
575           rv = session_enqueue_chain_tail (s, b, in_order_off, 1);
576           if (rv > 0)
577             enqueued += rv;
578         }
579     }
580   else
581     {
582       rv = svm_fifo_enqueue_with_offset (s->rx_fifo, offset,
583                                          b->current_length,
584                                          vlib_buffer_get_current (b));
585       if (PREDICT_FALSE ((b->flags & VLIB_BUFFER_NEXT_PRESENT) && !rv))
586         session_enqueue_chain_tail (s, b, offset + b->current_length, 0);
587       /* if something was enqueued, report even this as success for ooo
588        * segment handling */
589       return rv;
590     }
591
592   if (queue_event)
593     {
594       /* Queue RX event on this fifo. Eventually these will need to be flushed
595        * by calling stream_server_flush_enqueue_events () */
596       session_worker_t *wrk;
597
598       wrk = session_main_get_worker (s->thread_index);
599       if (!(s->flags & SESSION_F_RX_EVT))
600         {
601           s->flags |= SESSION_F_RX_EVT;
602           vec_add1 (wrk->session_to_enqueue[tc->proto], s->session_index);
603         }
604
605       session_fifo_tuning (s, s->rx_fifo, SESSION_FT_ACTION_ENQUEUED, 0);
606     }
607
608   return enqueued;
609 }
610
611 int
612 session_enqueue_dgram_connection (session_t * s,
613                                   session_dgram_hdr_t * hdr,
614                                   vlib_buffer_t * b, u8 proto, u8 queue_event)
615 {
616   int rv;
617
618   ASSERT (svm_fifo_max_enqueue_prod (s->rx_fifo)
619           >= b->current_length + sizeof (*hdr));
620
621   if (PREDICT_TRUE (!(b->flags & VLIB_BUFFER_NEXT_PRESENT)))
622     {
623       /* *INDENT-OFF* */
624       svm_fifo_seg_t segs[2] = {
625           { (u8 *) hdr, sizeof (*hdr) },
626           { vlib_buffer_get_current (b), b->current_length }
627       };
628       /* *INDENT-ON* */
629
630       rv = svm_fifo_enqueue_segments (s->rx_fifo, segs, 2,
631                                       0 /* allow_partial */ );
632     }
633   else
634     {
635       vlib_main_t *vm = vlib_get_main ();
636       svm_fifo_seg_t *segs = 0, *seg;
637       vlib_buffer_t *it = b;
638       u32 n_segs = 1;
639
640       vec_add2 (segs, seg, 1);
641       seg->data = (u8 *) hdr;
642       seg->len = sizeof (*hdr);
643       while (it)
644         {
645           vec_add2 (segs, seg, 1);
646           seg->data = vlib_buffer_get_current (it);
647           seg->len = it->current_length;
648           n_segs++;
649           if (!(it->flags & VLIB_BUFFER_NEXT_PRESENT))
650             break;
651           it = vlib_get_buffer (vm, it->next_buffer);
652         }
653       rv = svm_fifo_enqueue_segments (s->rx_fifo, segs, n_segs,
654                                       0 /* allow partial */ );
655       vec_free (segs);
656     }
657
658   if (queue_event && rv > 0)
659     {
660       /* Queue RX event on this fifo. Eventually these will need to be flushed
661        * by calling stream_server_flush_enqueue_events () */
662       session_worker_t *wrk;
663
664       wrk = session_main_get_worker (s->thread_index);
665       if (!(s->flags & SESSION_F_RX_EVT))
666         {
667           s->flags |= SESSION_F_RX_EVT;
668           vec_add1 (wrk->session_to_enqueue[proto], s->session_index);
669         }
670
671       session_fifo_tuning (s, s->rx_fifo, SESSION_FT_ACTION_ENQUEUED, 0);
672     }
673   return rv > 0 ? rv : 0;
674 }
675
676 int
677 session_tx_fifo_peek_bytes (transport_connection_t * tc, u8 * buffer,
678                             u32 offset, u32 max_bytes)
679 {
680   session_t *s = session_get (tc->s_index, tc->thread_index);
681   return svm_fifo_peek (s->tx_fifo, offset, max_bytes, buffer);
682 }
683
684 u32
685 session_tx_fifo_dequeue_drop (transport_connection_t * tc, u32 max_bytes)
686 {
687   session_t *s = session_get (tc->s_index, tc->thread_index);
688   u32 rv;
689
690   rv = svm_fifo_dequeue_drop (s->tx_fifo, max_bytes);
691   session_fifo_tuning (s, s->tx_fifo, SESSION_FT_ACTION_DEQUEUED, rv);
692
693   if (svm_fifo_needs_deq_ntf (s->tx_fifo, max_bytes))
694     session_dequeue_notify (s);
695
696   return rv;
697 }
698
699 static inline int
700 session_notify_subscribers (u32 app_index, session_t * s,
701                             svm_fifo_t * f, session_evt_type_t evt_type)
702 {
703   app_worker_t *app_wrk;
704   application_t *app;
705   int i;
706
707   app = application_get (app_index);
708   if (!app)
709     return -1;
710
711   for (i = 0; i < f->shr->n_subscribers; i++)
712     {
713       app_wrk = application_get_worker (app, f->shr->subscribers[i]);
714       if (!app_wrk)
715         continue;
716       if (app_worker_lock_and_send_event (app_wrk, s, evt_type))
717         return -1;
718     }
719
720   return 0;
721 }
722
723 /**
724  * Notify session peer that new data has been enqueued.
725  *
726  * @param s     Stream session for which the event is to be generated.
727  * @param lock  Flag to indicate if call should lock message queue.
728  *
729  * @return 0 on success or negative number if failed to send notification.
730  */
731 static inline int
732 session_enqueue_notify_inline (session_t * s)
733 {
734   app_worker_t *app_wrk;
735   u32 session_index;
736   u8 n_subscribers;
737
738   session_index = s->session_index;
739   n_subscribers = svm_fifo_n_subscribers (s->rx_fifo);
740
741   app_wrk = app_worker_get_if_valid (s->app_wrk_index);
742   if (PREDICT_FALSE (!app_wrk))
743     {
744       SESSION_DBG ("invalid s->app_index = %d", s->app_wrk_index);
745       return 0;
746     }
747
748   SESSION_EVT (SESSION_EVT_ENQ, s, svm_fifo_max_dequeue_prod (s->rx_fifo));
749
750   s->flags &= ~SESSION_F_RX_EVT;
751
752   /* Application didn't confirm accept yet */
753   if (PREDICT_FALSE (s->session_state == SESSION_STATE_ACCEPTING))
754     return 0;
755
756   if (PREDICT_FALSE (app_worker_lock_and_send_event (app_wrk, s,
757                                                      SESSION_IO_EVT_RX)))
758     return -1;
759
760   if (PREDICT_FALSE (n_subscribers))
761     {
762       s = session_get (session_index, vlib_get_thread_index ());
763       return session_notify_subscribers (app_wrk->app_index, s,
764                                          s->rx_fifo, SESSION_IO_EVT_RX);
765     }
766
767   return 0;
768 }
769
770 int
771 session_enqueue_notify (session_t * s)
772 {
773   return session_enqueue_notify_inline (s);
774 }
775
776 static void
777 session_enqueue_notify_rpc (void *arg)
778 {
779   u32 session_index = pointer_to_uword (arg);
780   session_t *s;
781
782   s = session_get_if_valid (session_index, vlib_get_thread_index ());
783   if (!s)
784     return;
785
786   session_enqueue_notify (s);
787 }
788
789 /**
790  * Like session_enqueue_notify, but can be called from a thread that does not
791  * own the session.
792  */
793 void
794 session_enqueue_notify_thread (session_handle_t sh)
795 {
796   u32 thread_index = session_thread_from_handle (sh);
797   u32 session_index = session_index_from_handle (sh);
798
799   /*
800    * Pass session index (u32) as opposed to handle (u64) in case pointers
801    * are not 64-bit.
802    */
803   session_send_rpc_evt_to_thread (thread_index,
804                                   session_enqueue_notify_rpc,
805                                   uword_to_pointer (session_index, void *));
806 }
807
808 int
809 session_dequeue_notify (session_t * s)
810 {
811   app_worker_t *app_wrk;
812
813   svm_fifo_clear_deq_ntf (s->tx_fifo);
814
815   app_wrk = app_worker_get_if_valid (s->app_wrk_index);
816   if (PREDICT_FALSE (!app_wrk))
817     return -1;
818
819   if (PREDICT_FALSE (app_worker_lock_and_send_event (app_wrk, s,
820                                                      SESSION_IO_EVT_TX)))
821     return -1;
822
823   if (PREDICT_FALSE (s->tx_fifo->shr->n_subscribers))
824     return session_notify_subscribers (app_wrk->app_index, s,
825                                        s->tx_fifo, SESSION_IO_EVT_TX);
826
827   return 0;
828 }
829
830 /**
831  * Flushes queue of sessions that are to be notified of new data
832  * enqueued events.
833  *
834  * @param thread_index Thread index for which the flush is to be performed.
835  * @return 0 on success or a positive number indicating the number of
836  *         failures due to API queue being full.
837  */
838 int
839 session_main_flush_enqueue_events (u8 transport_proto, u32 thread_index)
840 {
841   session_worker_t *wrk = session_main_get_worker (thread_index);
842   session_t *s;
843   int i, errors = 0;
844   u32 *indices;
845
846   indices = wrk->session_to_enqueue[transport_proto];
847
848   for (i = 0; i < vec_len (indices); i++)
849     {
850       s = session_get_if_valid (indices[i], thread_index);
851       if (PREDICT_FALSE (!s))
852         {
853           errors++;
854           continue;
855         }
856
857       session_fifo_tuning (s, s->rx_fifo, SESSION_FT_ACTION_ENQUEUED,
858                            0 /* TODO/not needed */ );
859
860       if (PREDICT_FALSE (session_enqueue_notify_inline (s)))
861         errors++;
862     }
863
864   vec_reset_length (indices);
865   wrk->session_to_enqueue[transport_proto] = indices;
866
867   return errors;
868 }
869
870 int
871 session_main_flush_all_enqueue_events (u8 transport_proto)
872 {
873   vlib_thread_main_t *vtm = vlib_get_thread_main ();
874   int i, errors = 0;
875   for (i = 0; i < 1 + vtm->n_threads; i++)
876     errors += session_main_flush_enqueue_events (transport_proto, i);
877   return errors;
878 }
879
880 int
881 session_stream_connect_notify (transport_connection_t * tc,
882                                session_error_t err)
883 {
884   u32 opaque = 0, new_ti, new_si;
885   app_worker_t *app_wrk;
886   session_t *s = 0, *ho;
887
888   /*
889    * Cleanup half-open table
890    */
891   session_lookup_del_half_open (tc);
892
893   ho = ho_session_get (tc->s_index);
894   opaque = ho->opaque;
895   app_wrk = app_worker_get_if_valid (ho->app_wrk_index);
896   if (!app_wrk)
897     return -1;
898
899   if (err)
900     return app_worker_connect_notify (app_wrk, s, err, opaque);
901
902   s = session_alloc_for_connection (tc);
903   s->session_state = SESSION_STATE_CONNECTING;
904   s->app_wrk_index = app_wrk->wrk_index;
905   new_si = s->session_index;
906   new_ti = s->thread_index;
907
908   if ((err = app_worker_init_connected (app_wrk, s)))
909     {
910       session_free (s);
911       app_worker_connect_notify (app_wrk, 0, err, opaque);
912       return -1;
913     }
914
915   s = session_get (new_si, new_ti);
916   s->session_state = SESSION_STATE_READY;
917   session_lookup_add_connection (tc, session_handle (s));
918
919   if (app_worker_connect_notify (app_wrk, s, SESSION_E_NONE, opaque))
920     {
921       session_lookup_del_connection (tc);
922       /* Avoid notifying app about rejected session cleanup */
923       s = session_get (new_si, new_ti);
924       segment_manager_dealloc_fifos (s->rx_fifo, s->tx_fifo);
925       session_free (s);
926       return -1;
927     }
928
929   return 0;
930 }
931
932 static void
933 session_switch_pool_reply (void *arg)
934 {
935   u32 session_index = pointer_to_uword (arg);
936   session_t *s;
937
938   s = session_get_if_valid (session_index, vlib_get_thread_index ());
939   if (!s)
940     return;
941
942   /* Notify app that it has data on the new session */
943   session_enqueue_notify (s);
944 }
945
946 typedef struct _session_switch_pool_args
947 {
948   u32 session_index;
949   u32 thread_index;
950   u32 new_thread_index;
951   u32 new_session_index;
952 } session_switch_pool_args_t;
953
954 /**
955  * Notify old thread of the session pool switch
956  */
957 static void
958 session_switch_pool (void *cb_args)
959 {
960   session_switch_pool_args_t *args = (session_switch_pool_args_t *) cb_args;
961   session_handle_t new_sh;
962   segment_manager_t *sm;
963   app_worker_t *app_wrk;
964   session_t *s;
965   void *rargs;
966
967   ASSERT (args->thread_index == vlib_get_thread_index ());
968   s = session_get (args->session_index, args->thread_index);
969
970   transport_cleanup (session_get_transport_proto (s), s->connection_index,
971                      s->thread_index);
972
973   new_sh = session_make_handle (args->new_session_index,
974                                 args->new_thread_index);
975
976   app_wrk = app_worker_get_if_valid (s->app_wrk_index);
977   if (app_wrk)
978     {
979       /* Cleanup fifo segment slice state for fifos */
980       sm = app_worker_get_connect_segment_manager (app_wrk);
981       segment_manager_detach_fifo (sm, &s->rx_fifo);
982       segment_manager_detach_fifo (sm, &s->tx_fifo);
983
984       /* Notify app, using old session, about the migration event */
985       app_worker_migrate_notify (app_wrk, s, new_sh);
986     }
987
988   /* Trigger app read and fifo updates on the new thread */
989   rargs = uword_to_pointer (args->new_session_index, void *);
990   session_send_rpc_evt_to_thread (args->new_thread_index,
991                                   session_switch_pool_reply, rargs);
992
993   session_free (s);
994   clib_mem_free (cb_args);
995 }
996
997 /**
998  * Move dgram session to the right thread
999  */
1000 int
1001 session_dgram_connect_notify (transport_connection_t * tc,
1002                               u32 old_thread_index, session_t ** new_session)
1003 {
1004   session_t *new_s;
1005   session_switch_pool_args_t *rpc_args;
1006   segment_manager_t *sm;
1007   app_worker_t *app_wrk;
1008
1009   /*
1010    * Clone half-open session to the right thread.
1011    */
1012   new_s = session_clone_safe (tc->s_index, old_thread_index);
1013   new_s->connection_index = tc->c_index;
1014   new_s->session_state = SESSION_STATE_READY;
1015   new_s->flags |= SESSION_F_IS_MIGRATING;
1016
1017   if (!(tc->flags & TRANSPORT_CONNECTION_F_NO_LOOKUP))
1018     session_lookup_add_connection (tc, session_handle (new_s));
1019
1020   app_wrk = app_worker_get_if_valid (new_s->app_wrk_index);
1021   if (app_wrk)
1022     {
1023       /* New set of fifos attached to the same shared memory */
1024       sm = app_worker_get_connect_segment_manager (app_wrk);
1025       segment_manager_attach_fifo (sm, &new_s->rx_fifo, new_s);
1026       segment_manager_attach_fifo (sm, &new_s->tx_fifo, new_s);
1027     }
1028
1029   /*
1030    * Ask thread owning the old session to clean it up and make us the tx
1031    * fifo owner
1032    */
1033   rpc_args = clib_mem_alloc (sizeof (*rpc_args));
1034   rpc_args->new_session_index = new_s->session_index;
1035   rpc_args->new_thread_index = new_s->thread_index;
1036   rpc_args->session_index = tc->s_index;
1037   rpc_args->thread_index = old_thread_index;
1038   session_send_rpc_evt_to_thread (rpc_args->thread_index, session_switch_pool,
1039                                   rpc_args);
1040
1041   tc->s_index = new_s->session_index;
1042   new_s->connection_index = tc->c_index;
1043   *new_session = new_s;
1044   return 0;
1045 }
1046
1047 /**
1048  * Notification from transport that connection is being closed.
1049  *
1050  * A disconnect is sent to application but state is not removed. Once
1051  * disconnect is acknowledged by application, session disconnect is called.
1052  * Ultimately this leads to close being called on transport (passive close).
1053  */
1054 void
1055 session_transport_closing_notify (transport_connection_t * tc)
1056 {
1057   app_worker_t *app_wrk;
1058   session_t *s;
1059
1060   s = session_get (tc->s_index, tc->thread_index);
1061   if (s->session_state >= SESSION_STATE_TRANSPORT_CLOSING)
1062     return;
1063   s->session_state = SESSION_STATE_TRANSPORT_CLOSING;
1064   app_wrk = app_worker_get (s->app_wrk_index);
1065   app_worker_close_notify (app_wrk, s);
1066 }
1067
1068 /**
1069  * Notification from transport that connection is being deleted
1070  *
1071  * This removes the session if it is still valid. It should be called only on
1072  * previously fully established sessions. For instance failed connects should
1073  * call stream_session_connect_notify and indicate that the connect has
1074  * failed.
1075  */
1076 void
1077 session_transport_delete_notify (transport_connection_t * tc)
1078 {
1079   session_t *s;
1080
1081   /* App might've been removed already */
1082   if (!(s = session_get_if_valid (tc->s_index, tc->thread_index)))
1083     return;
1084
1085   switch (s->session_state)
1086     {
1087     case SESSION_STATE_CREATED:
1088       /* Session was created but accept notification was not yet sent to the
1089        * app. Cleanup everything. */
1090       session_lookup_del_session (s);
1091       segment_manager_dealloc_fifos (s->rx_fifo, s->tx_fifo);
1092       session_free (s);
1093       break;
1094     case SESSION_STATE_ACCEPTING:
1095     case SESSION_STATE_TRANSPORT_CLOSING:
1096     case SESSION_STATE_CLOSING:
1097     case SESSION_STATE_TRANSPORT_CLOSED:
1098       /* If transport finishes or times out before we get a reply
1099        * from the app, mark transport as closed and wait for reply
1100        * before removing the session. Cleanup session table in advance
1101        * because transport will soon be closed and closed sessions
1102        * are assumed to have been removed from the lookup table */
1103       session_lookup_del_session (s);
1104       s->session_state = SESSION_STATE_TRANSPORT_DELETED;
1105       session_cleanup_notify (s, SESSION_CLEANUP_TRANSPORT);
1106       svm_fifo_dequeue_drop_all (s->tx_fifo);
1107       break;
1108     case SESSION_STATE_APP_CLOSED:
1109       /* Cleanup lookup table as transport needs to still be valid.
1110        * Program transport close to ensure that all session events
1111        * have been cleaned up. Once transport close is called, the
1112        * session is just removed because both transport and app have
1113        * confirmed the close*/
1114       session_lookup_del_session (s);
1115       s->session_state = SESSION_STATE_TRANSPORT_DELETED;
1116       session_cleanup_notify (s, SESSION_CLEANUP_TRANSPORT);
1117       svm_fifo_dequeue_drop_all (s->tx_fifo);
1118       session_program_transport_ctrl_evt (s, SESSION_CTRL_EVT_CLOSE);
1119       break;
1120     case SESSION_STATE_TRANSPORT_DELETED:
1121       break;
1122     case SESSION_STATE_CLOSED:
1123       session_cleanup_notify (s, SESSION_CLEANUP_TRANSPORT);
1124       session_delete (s);
1125       break;
1126     default:
1127       clib_warning ("session state %u", s->session_state);
1128       session_cleanup_notify (s, SESSION_CLEANUP_TRANSPORT);
1129       session_delete (s);
1130       break;
1131     }
1132 }
1133
1134 /**
1135  * Notification from transport that it is closed
1136  *
1137  * Should be called by transport, prior to calling delete notify, once it
1138  * knows that no more data will be exchanged. This could serve as an
1139  * early acknowledgment of an active close especially if transport delete
1140  * can be delayed a long time, e.g., tcp time-wait.
1141  */
1142 void
1143 session_transport_closed_notify (transport_connection_t * tc)
1144 {
1145   app_worker_t *app_wrk;
1146   session_t *s;
1147
1148   if (!(s = session_get_if_valid (tc->s_index, tc->thread_index)))
1149     return;
1150
1151   /* Transport thinks that app requested close but it actually didn't.
1152    * Can happen for tcp:
1153    * 1)if fin and rst are received in close succession.
1154    * 2)if app shutdown the connection.  */
1155   if (s->session_state == SESSION_STATE_READY)
1156     {
1157       session_transport_closing_notify (tc);
1158       svm_fifo_dequeue_drop_all (s->tx_fifo);
1159       s->session_state = SESSION_STATE_TRANSPORT_CLOSED;
1160     }
1161   /* If app close has not been received or has not yet resulted in
1162    * a transport close, only mark the session transport as closed */
1163   else if (s->session_state <= SESSION_STATE_CLOSING)
1164     {
1165       s->session_state = SESSION_STATE_TRANSPORT_CLOSED;
1166     }
1167   /* If app also closed, switch to closed */
1168   else if (s->session_state == SESSION_STATE_APP_CLOSED)
1169     s->session_state = SESSION_STATE_CLOSED;
1170
1171   app_wrk = app_worker_get_if_valid (s->app_wrk_index);
1172   if (app_wrk)
1173     app_worker_transport_closed_notify (app_wrk, s);
1174 }
1175
1176 /**
1177  * Notify application that connection has been reset.
1178  */
1179 void
1180 session_transport_reset_notify (transport_connection_t * tc)
1181 {
1182   app_worker_t *app_wrk;
1183   session_t *s;
1184
1185   s = session_get (tc->s_index, tc->thread_index);
1186   svm_fifo_dequeue_drop_all (s->tx_fifo);
1187   if (s->session_state >= SESSION_STATE_TRANSPORT_CLOSING)
1188     return;
1189   s->session_state = SESSION_STATE_TRANSPORT_CLOSING;
1190   app_wrk = app_worker_get (s->app_wrk_index);
1191   app_worker_reset_notify (app_wrk, s);
1192 }
1193
1194 int
1195 session_stream_accept_notify (transport_connection_t * tc)
1196 {
1197   app_worker_t *app_wrk;
1198   session_t *s;
1199
1200   s = session_get (tc->s_index, tc->thread_index);
1201   app_wrk = app_worker_get_if_valid (s->app_wrk_index);
1202   if (!app_wrk)
1203     return -1;
1204   if (s->session_state != SESSION_STATE_CREATED)
1205     return 0;
1206   s->session_state = SESSION_STATE_ACCEPTING;
1207   if (app_worker_accept_notify (app_wrk, s))
1208     {
1209       /* On transport delete, no notifications should be sent. Unless, the
1210        * accept is retried and successful. */
1211       s->session_state = SESSION_STATE_CREATED;
1212       return -1;
1213     }
1214   return 0;
1215 }
1216
1217 /**
1218  * Accept a stream session. Optionally ping the server by callback.
1219  */
1220 int
1221 session_stream_accept (transport_connection_t * tc, u32 listener_index,
1222                        u32 thread_index, u8 notify)
1223 {
1224   session_t *s;
1225   int rv;
1226
1227   s = session_alloc_for_connection (tc);
1228   s->listener_handle = ((u64) thread_index << 32) | (u64) listener_index;
1229   s->session_state = SESSION_STATE_CREATED;
1230
1231   if ((rv = app_worker_init_accepted (s)))
1232     {
1233       session_free (s);
1234       return rv;
1235     }
1236
1237   session_lookup_add_connection (tc, session_handle (s));
1238
1239   /* Shoulder-tap the server */
1240   if (notify)
1241     {
1242       app_worker_t *app_wrk = app_worker_get (s->app_wrk_index);
1243       if ((rv = app_worker_accept_notify (app_wrk, s)))
1244         {
1245           session_lookup_del_session (s);
1246           segment_manager_dealloc_fifos (s->rx_fifo, s->tx_fifo);
1247           session_free (s);
1248           return rv;
1249         }
1250     }
1251
1252   return 0;
1253 }
1254
1255 int
1256 session_dgram_accept (transport_connection_t * tc, u32 listener_index,
1257                       u32 thread_index)
1258 {
1259   app_worker_t *app_wrk;
1260   session_t *s;
1261   int rv;
1262
1263   s = session_alloc_for_connection (tc);
1264   s->listener_handle = ((u64) thread_index << 32) | (u64) listener_index;
1265
1266   if ((rv = app_worker_init_accepted (s)))
1267     {
1268       session_free (s);
1269       return rv;
1270     }
1271
1272   session_lookup_add_connection (tc, session_handle (s));
1273
1274   app_wrk = app_worker_get (s->app_wrk_index);
1275   if ((rv = app_worker_accept_notify (app_wrk, s)))
1276     {
1277       session_lookup_del_session (s);
1278       segment_manager_dealloc_fifos (s->rx_fifo, s->tx_fifo);
1279       session_free (s);
1280       return rv;
1281     }
1282
1283   s->session_state = SESSION_STATE_READY;
1284
1285   return 0;
1286 }
1287
1288 int
1289 session_open_cl (session_endpoint_cfg_t *rmt, session_handle_t *rsh)
1290 {
1291   transport_connection_t *tc;
1292   transport_endpoint_cfg_t *tep;
1293   app_worker_t *app_wrk;
1294   session_handle_t sh;
1295   session_t *s;
1296   int rv;
1297
1298   tep = session_endpoint_to_transport_cfg (rmt);
1299   rv = transport_connect (rmt->transport_proto, tep);
1300   if (rv < 0)
1301     {
1302       SESSION_DBG ("Transport failed to open connection.");
1303       return rv;
1304     }
1305
1306   tc = transport_get_half_open (rmt->transport_proto, (u32) rv);
1307
1308   /* For dgram type of service, allocate session and fifos now */
1309   app_wrk = app_worker_get (rmt->app_wrk_index);
1310   s = session_alloc_for_connection (tc);
1311   s->app_wrk_index = app_wrk->wrk_index;
1312   s->session_state = SESSION_STATE_OPENED;
1313   if (app_worker_init_connected (app_wrk, s))
1314     {
1315       session_free (s);
1316       return -1;
1317     }
1318
1319   sh = session_handle (s);
1320   *rsh = sh;
1321
1322   session_lookup_add_connection (tc, sh);
1323   return app_worker_connect_notify (app_wrk, s, SESSION_E_NONE, rmt->opaque);
1324 }
1325
1326 int
1327 session_open_vc (session_endpoint_cfg_t *rmt, session_handle_t *rsh)
1328 {
1329   transport_connection_t *tc;
1330   transport_endpoint_cfg_t *tep;
1331   app_worker_t *app_wrk;
1332   session_t *ho;
1333   int rv;
1334
1335   tep = session_endpoint_to_transport_cfg (rmt);
1336   rv = transport_connect (rmt->transport_proto, tep);
1337   if (rv < 0)
1338     {
1339       SESSION_DBG ("Transport failed to open connection.");
1340       return rv;
1341     }
1342
1343   tc = transport_get_half_open (rmt->transport_proto, (u32) rv);
1344
1345   app_wrk = app_worker_get (rmt->app_wrk_index);
1346
1347   /* If transport offers a vc service, only allocate established
1348    * session once the connection has been established.
1349    * In the meantime allocate half-open session for tracking purposes
1350    * associate half-open connection to it and add session to app-worker
1351    * half-open table. These are needed to allocate the established
1352    * session on transport notification, and to cleanup the half-open
1353    * session if the app detaches before connection establishment.
1354    */
1355   ho = session_alloc_for_half_open (tc);
1356   ho->app_wrk_index = app_wrk->wrk_index;
1357   ho->ho_index = app_worker_add_half_open (app_wrk, session_handle (ho));
1358   ho->opaque = rmt->opaque;
1359   *rsh = session_handle (ho);
1360
1361   if (!(tc->flags & TRANSPORT_CONNECTION_F_NO_LOOKUP))
1362     session_lookup_add_half_open (tc, tc->c_index);
1363
1364   return 0;
1365 }
1366
1367 int
1368 session_open_app (session_endpoint_cfg_t *rmt, session_handle_t *rsh)
1369 {
1370   transport_endpoint_cfg_t *tep_cfg = session_endpoint_to_transport_cfg (rmt);
1371
1372   /* Not supported for now */
1373   *rsh = SESSION_INVALID_HANDLE;
1374   return transport_connect (rmt->transport_proto, tep_cfg);
1375 }
1376
1377 typedef int (*session_open_service_fn) (session_endpoint_cfg_t *,
1378                                         session_handle_t *);
1379
1380 /* *INDENT-OFF* */
1381 static session_open_service_fn session_open_srv_fns[TRANSPORT_N_SERVICES] = {
1382   session_open_vc,
1383   session_open_cl,
1384   session_open_app,
1385 };
1386 /* *INDENT-ON* */
1387
1388 /**
1389  * Ask transport to open connection to remote transport endpoint.
1390  *
1391  * Stores handle for matching request with reply since the call can be
1392  * asynchronous. For instance, for TCP the 3-way handshake must complete
1393  * before reply comes. Session is only created once connection is established.
1394  *
1395  * @param app_index Index of the application requesting the connect
1396  * @param st Session type requested.
1397  * @param tep Remote transport endpoint
1398  * @param opaque Opaque data (typically, api_context) the application expects
1399  *               on open completion.
1400  */
1401 int
1402 session_open (session_endpoint_cfg_t *rmt, session_handle_t *rsh)
1403 {
1404   transport_service_type_t tst;
1405   tst = transport_protocol_service_type (rmt->transport_proto);
1406   return session_open_srv_fns[tst](rmt, rsh);
1407 }
1408
1409 /**
1410  * Ask transport to listen on session endpoint.
1411  *
1412  * @param s Session for which listen will be called. Note that unlike
1413  *          established sessions, listen sessions are not associated to a
1414  *          thread.
1415  * @param sep Local endpoint to be listened on.
1416  */
1417 int
1418 session_listen (session_t * ls, session_endpoint_cfg_t * sep)
1419 {
1420   transport_endpoint_t *tep;
1421   int tc_index;
1422   u32 s_index;
1423
1424   /* Transport bind/listen */
1425   tep = session_endpoint_to_transport (sep);
1426   s_index = ls->session_index;
1427   tc_index = transport_start_listen (session_get_transport_proto (ls),
1428                                      s_index, tep);
1429
1430   if (tc_index < 0)
1431     return tc_index;
1432
1433   /* Attach transport to session. Lookup tables are populated by the app
1434    * worker because local tables (for ct sessions) are not backed by a fib */
1435   ls = listen_session_get (s_index);
1436   ls->connection_index = tc_index;
1437
1438   return 0;
1439 }
1440
1441 /**
1442  * Ask transport to stop listening on local transport endpoint.
1443  *
1444  * @param s Session to stop listening on. It must be in state LISTENING.
1445  */
1446 int
1447 session_stop_listen (session_t * s)
1448 {
1449   transport_proto_t tp = session_get_transport_proto (s);
1450   transport_connection_t *tc;
1451
1452   if (s->session_state != SESSION_STATE_LISTENING)
1453     return SESSION_E_NOLISTEN;
1454
1455   tc = transport_get_listener (tp, s->connection_index);
1456
1457   /* If no transport, assume everything was cleaned up already */
1458   if (!tc)
1459     return SESSION_E_NONE;
1460
1461   if (!(tc->flags & TRANSPORT_CONNECTION_F_NO_LOOKUP))
1462     session_lookup_del_connection (tc);
1463
1464   transport_stop_listen (tp, s->connection_index);
1465   return 0;
1466 }
1467
1468 /**
1469  * Initialize session half-closing procedure.
1470  *
1471  * Note that half-closing will not change the state of the session.
1472  */
1473 void
1474 session_half_close (session_t *s)
1475 {
1476   if (!s)
1477     return;
1478
1479   session_program_transport_ctrl_evt (s, SESSION_CTRL_EVT_HALF_CLOSE);
1480 }
1481
1482 /**
1483  * Initialize session closing procedure.
1484  *
1485  * Request is always sent to session node to ensure that all outstanding
1486  * requests are served before transport is notified.
1487  */
1488 void
1489 session_close (session_t * s)
1490 {
1491   if (!s)
1492     return;
1493
1494   if (s->session_state >= SESSION_STATE_CLOSING)
1495     {
1496       /* Session will only be removed once both app and transport
1497        * acknowledge the close */
1498       if (s->session_state == SESSION_STATE_TRANSPORT_CLOSED
1499           || s->session_state == SESSION_STATE_TRANSPORT_DELETED)
1500         session_program_transport_ctrl_evt (s, SESSION_CTRL_EVT_CLOSE);
1501       return;
1502     }
1503
1504   s->session_state = SESSION_STATE_CLOSING;
1505   session_program_transport_ctrl_evt (s, SESSION_CTRL_EVT_CLOSE);
1506 }
1507
1508 /**
1509  * Force a close without waiting for data to be flushed
1510  */
1511 void
1512 session_reset (session_t * s)
1513 {
1514   if (s->session_state >= SESSION_STATE_CLOSING)
1515     return;
1516   /* Drop all outstanding tx data */
1517   svm_fifo_dequeue_drop_all (s->tx_fifo);
1518   s->session_state = SESSION_STATE_CLOSING;
1519   session_program_transport_ctrl_evt (s, SESSION_CTRL_EVT_RESET);
1520 }
1521
1522 /**
1523  * Notify transport the session can be half-disconnected.
1524  *
1525  * Must be called from the session's thread.
1526  */
1527 void
1528 session_transport_half_close (session_t *s)
1529 {
1530   /* Only READY session can be half-closed */
1531   if (s->session_state != SESSION_STATE_READY)
1532     {
1533       return;
1534     }
1535
1536   transport_half_close (session_get_transport_proto (s), s->connection_index,
1537                         s->thread_index);
1538 }
1539
1540 /**
1541  * Notify transport the session can be disconnected. This should eventually
1542  * result in a delete notification that allows us to cleanup session state.
1543  * Called for both active/passive disconnects.
1544  *
1545  * Must be called from the session's thread.
1546  */
1547 void
1548 session_transport_close (session_t * s)
1549 {
1550   if (s->session_state >= SESSION_STATE_APP_CLOSED)
1551     {
1552       if (s->session_state == SESSION_STATE_TRANSPORT_CLOSED)
1553         s->session_state = SESSION_STATE_CLOSED;
1554       /* If transport is already deleted, just free the session */
1555       else if (s->session_state >= SESSION_STATE_TRANSPORT_DELETED)
1556         session_free_w_fifos (s);
1557       return;
1558     }
1559
1560   /* If the tx queue wasn't drained, the transport can continue to try
1561    * sending the outstanding data (in closed state it cannot). It MUST however
1562    * at one point, either after sending everything or after a timeout, call
1563    * delete notify. This will finally lead to the complete cleanup of the
1564    * session.
1565    */
1566   s->session_state = SESSION_STATE_APP_CLOSED;
1567
1568   transport_close (session_get_transport_proto (s), s->connection_index,
1569                    s->thread_index);
1570 }
1571
1572 /**
1573  * Force transport close
1574  */
1575 void
1576 session_transport_reset (session_t * s)
1577 {
1578   if (s->session_state >= SESSION_STATE_APP_CLOSED)
1579     {
1580       if (s->session_state == SESSION_STATE_TRANSPORT_CLOSED)
1581         s->session_state = SESSION_STATE_CLOSED;
1582       else if (s->session_state >= SESSION_STATE_TRANSPORT_DELETED)
1583         session_free_w_fifos (s);
1584       return;
1585     }
1586
1587   s->session_state = SESSION_STATE_APP_CLOSED;
1588   transport_reset (session_get_transport_proto (s), s->connection_index,
1589                    s->thread_index);
1590 }
1591
1592 /**
1593  * Cleanup transport and session state.
1594  *
1595  * Notify transport of the cleanup and free the session. This should
1596  * be called only if transport reported some error and is already
1597  * closed.
1598  */
1599 void
1600 session_transport_cleanup (session_t * s)
1601 {
1602   /* Delete from main lookup table before we axe the the transport */
1603   session_lookup_del_session (s);
1604   if (s->session_state != SESSION_STATE_TRANSPORT_DELETED)
1605     transport_cleanup (session_get_transport_proto (s), s->connection_index,
1606                        s->thread_index);
1607   /* Since we called cleanup, no delete notification will come. So, make
1608    * sure the session is properly freed. */
1609   segment_manager_dealloc_fifos (s->rx_fifo, s->tx_fifo);
1610   session_free (s);
1611 }
1612
1613 /**
1614  * Allocate event queues in the shared-memory segment
1615  *
1616  * That can only be a newly created memfd segment, that must be
1617  * mapped by all apps/stack users.
1618  */
1619 void
1620 session_vpp_event_queues_allocate (session_main_t * smm)
1621 {
1622   u32 evt_q_length = 2048, evt_size = sizeof (session_event_t);
1623   fifo_segment_t *eqs = &smm->evt_qs_segment;
1624   uword eqs_size = 64 << 20;
1625   pid_t vpp_pid = getpid ();
1626   int i;
1627
1628   if (smm->configured_event_queue_length)
1629     evt_q_length = smm->configured_event_queue_length;
1630
1631   if (smm->evt_qs_segment_size)
1632     eqs_size = smm->evt_qs_segment_size;
1633
1634   eqs->ssvm.ssvm_size = eqs_size;
1635   eqs->ssvm.my_pid = vpp_pid;
1636   eqs->ssvm.name = format (0, "%s%c", "session: evt-qs-segment", 0);
1637   /* clib_mem_vm_map_shared consumes first page before requested_va */
1638   eqs->ssvm.requested_va = smm->session_baseva + clib_mem_get_page_size ();
1639
1640   if (ssvm_server_init (&eqs->ssvm, SSVM_SEGMENT_MEMFD))
1641     {
1642       clib_warning ("failed to initialize queue segment");
1643       return;
1644     }
1645
1646   fifo_segment_init (eqs);
1647
1648   /* Special fifo segment that's filled only with mqs */
1649   eqs->h->n_mqs = vec_len (smm->wrk);
1650
1651   for (i = 0; i < vec_len (smm->wrk); i++)
1652     {
1653       svm_msg_q_cfg_t _cfg, *cfg = &_cfg;
1654       svm_msg_q_ring_cfg_t rc[SESSION_MQ_N_RINGS] = {
1655         {evt_q_length, evt_size, 0}
1656         ,
1657         {evt_q_length >> 1, 256, 0}
1658       };
1659       cfg->consumer_pid = 0;
1660       cfg->n_rings = 2;
1661       cfg->q_nitems = evt_q_length;
1662       cfg->ring_cfgs = rc;
1663
1664       smm->wrk[i].vpp_event_queue = fifo_segment_msg_q_alloc (eqs, i, cfg);
1665     }
1666 }
1667
1668 fifo_segment_t *
1669 session_main_get_evt_q_segment (void)
1670 {
1671   return &session_main.evt_qs_segment;
1672 }
1673
1674 u64
1675 session_segment_handle (session_t * s)
1676 {
1677   svm_fifo_t *f;
1678
1679   if (!s->rx_fifo)
1680     return SESSION_INVALID_HANDLE;
1681
1682   f = s->rx_fifo;
1683   return segment_manager_make_segment_handle (f->segment_manager,
1684                                               f->segment_index);
1685 }
1686
1687 /* *INDENT-OFF* */
1688 static session_fifo_rx_fn *session_tx_fns[TRANSPORT_TX_N_FNS] = {
1689     session_tx_fifo_peek_and_snd,
1690     session_tx_fifo_dequeue_and_snd,
1691     session_tx_fifo_dequeue_internal,
1692     session_tx_fifo_dequeue_and_snd
1693 };
1694 /* *INDENT-ON* */
1695
1696 void
1697 session_register_transport (transport_proto_t transport_proto,
1698                             const transport_proto_vft_t * vft, u8 is_ip4,
1699                             u32 output_node)
1700 {
1701   session_main_t *smm = &session_main;
1702   session_type_t session_type;
1703   u32 next_index = ~0;
1704
1705   session_type = session_type_from_proto_and_ip (transport_proto, is_ip4);
1706
1707   vec_validate (smm->session_type_to_next, session_type);
1708   vec_validate (smm->session_tx_fns, session_type);
1709
1710   if (output_node != ~0)
1711     next_index = vlib_node_add_next (vlib_get_main (),
1712                                      session_queue_node.index, output_node);
1713
1714   smm->session_type_to_next[session_type] = next_index;
1715   smm->session_tx_fns[session_type] =
1716     session_tx_fns[vft->transport_options.tx_type];
1717 }
1718
1719 transport_proto_t
1720 session_add_transport_proto (void)
1721 {
1722   session_main_t *smm = &session_main;
1723   session_worker_t *wrk;
1724   u32 thread;
1725
1726   smm->last_transport_proto_type += 1;
1727
1728   for (thread = 0; thread < vec_len (smm->wrk); thread++)
1729     {
1730       wrk = session_main_get_worker (thread);
1731       vec_validate (wrk->session_to_enqueue, smm->last_transport_proto_type);
1732     }
1733
1734   return smm->last_transport_proto_type;
1735 }
1736
1737 transport_connection_t *
1738 session_get_transport (session_t * s)
1739 {
1740   if (s->session_state != SESSION_STATE_LISTENING)
1741     return transport_get_connection (session_get_transport_proto (s),
1742                                      s->connection_index, s->thread_index);
1743   else
1744     return transport_get_listener (session_get_transport_proto (s),
1745                                    s->connection_index);
1746 }
1747
1748 void
1749 session_get_endpoint (session_t * s, transport_endpoint_t * tep, u8 is_lcl)
1750 {
1751   if (s->session_state != SESSION_STATE_LISTENING)
1752     return transport_get_endpoint (session_get_transport_proto (s),
1753                                    s->connection_index, s->thread_index, tep,
1754                                    is_lcl);
1755   else
1756     return transport_get_listener_endpoint (session_get_transport_proto (s),
1757                                             s->connection_index, tep, is_lcl);
1758 }
1759
1760 int
1761 session_transport_attribute (session_t *s, u8 is_get,
1762                              transport_endpt_attr_t *attr)
1763 {
1764   if (s->session_state < SESSION_STATE_READY)
1765     return -1;
1766
1767   return transport_connection_attribute (session_get_transport_proto (s),
1768                                          s->connection_index, s->thread_index,
1769                                          is_get, attr);
1770 }
1771
1772 transport_connection_t *
1773 listen_session_get_transport (session_t * s)
1774 {
1775   return transport_get_listener (session_get_transport_proto (s),
1776                                  s->connection_index);
1777 }
1778
1779 void
1780 session_queue_run_on_main_thread (vlib_main_t * vm)
1781 {
1782   ASSERT (vlib_get_thread_index () == 0);
1783   vlib_node_set_interrupt_pending (vm, session_queue_node.index);
1784 }
1785
1786 static clib_error_t *
1787 session_manager_main_enable (vlib_main_t * vm)
1788 {
1789   session_main_t *smm = &session_main;
1790   vlib_thread_main_t *vtm = vlib_get_thread_main ();
1791   u32 num_threads, preallocated_sessions_per_worker;
1792   session_worker_t *wrk;
1793   int i;
1794
1795   /* We only initialize once and do not de-initialized on disable */
1796   if (smm->is_initialized)
1797     goto done;
1798
1799   num_threads = 1 /* main thread */  + vtm->n_threads;
1800
1801   if (num_threads < 1)
1802     return clib_error_return (0, "n_thread_stacks not set");
1803
1804   /* Allocate cache line aligned worker contexts */
1805   vec_validate_aligned (smm->wrk, num_threads - 1, CLIB_CACHE_LINE_BYTES);
1806
1807   for (i = 0; i < num_threads; i++)
1808     {
1809       wrk = &smm->wrk[i];
1810       wrk->ctrl_head = clib_llist_make_head (wrk->event_elts, evt_list);
1811       wrk->new_head = clib_llist_make_head (wrk->event_elts, evt_list);
1812       wrk->old_head = clib_llist_make_head (wrk->event_elts, evt_list);
1813       wrk->vm = vlib_get_main_by_index (i);
1814       wrk->last_vlib_time = vlib_time_now (vm);
1815       wrk->last_vlib_us_time = wrk->last_vlib_time * CLIB_US_TIME_FREQ;
1816       vec_validate (wrk->session_to_enqueue, smm->last_transport_proto_type);
1817
1818       if (num_threads > 1)
1819         clib_rwlock_init (&smm->wrk[i].peekers_rw_locks);
1820
1821       if (!smm->no_adaptive && smm->use_private_rx_mqs)
1822         session_wrk_enable_adaptive_mode (wrk);
1823     }
1824
1825   /* Allocate vpp event queues segment and queue */
1826   session_vpp_event_queues_allocate (smm);
1827
1828   /* Initialize segment manager properties */
1829   segment_manager_main_init ();
1830
1831   /* Preallocate sessions */
1832   if (smm->preallocated_sessions)
1833     {
1834       if (num_threads == 1)
1835         {
1836           pool_init_fixed (smm->wrk[0].sessions, smm->preallocated_sessions);
1837         }
1838       else
1839         {
1840           int j;
1841           preallocated_sessions_per_worker =
1842             (1.1 * (f64) smm->preallocated_sessions /
1843              (f64) (num_threads - 1));
1844
1845           for (j = 1; j < num_threads; j++)
1846             {
1847               pool_init_fixed (smm->wrk[j].sessions,
1848                                preallocated_sessions_per_worker);
1849             }
1850         }
1851     }
1852
1853   session_lookup_init ();
1854   app_namespaces_init ();
1855   transport_init ();
1856   smm->is_initialized = 1;
1857
1858 done:
1859
1860   smm->is_enabled = 1;
1861
1862   /* Enable transports */
1863   transport_enable_disable (vm, 1);
1864   session_debug_init ();
1865
1866   return 0;
1867 }
1868
1869 static void
1870 session_manager_main_disable (vlib_main_t * vm)
1871 {
1872   transport_enable_disable (vm, 0 /* is_en */ );
1873 }
1874
1875 void
1876 session_node_enable_disable (u8 is_en)
1877 {
1878   u8 mstate = is_en ? VLIB_NODE_STATE_INTERRUPT : VLIB_NODE_STATE_DISABLED;
1879   u8 state = is_en ? VLIB_NODE_STATE_POLLING : VLIB_NODE_STATE_DISABLED;
1880   session_main_t *sm = &session_main;
1881   vlib_main_t *vm;
1882   vlib_node_t *n;
1883   int n_vlibs, i;
1884
1885   n_vlibs = vlib_get_n_threads ();
1886   for (i = 0; i < n_vlibs; i++)
1887     {
1888       vm = vlib_get_main_by_index (i);
1889       /* main thread with workers and not polling */
1890       if (i == 0 && n_vlibs > 1)
1891         {
1892           vlib_node_set_state (vm, session_queue_node.index, mstate);
1893           if (is_en)
1894             {
1895               vlib_node_set_state (vm, session_queue_process_node.index,
1896                                    state);
1897               n = vlib_get_node (vm, session_queue_process_node.index);
1898               vlib_start_process (vm, n->runtime_index);
1899             }
1900           else
1901             {
1902               vlib_process_signal_event_mt (vm,
1903                                             session_queue_process_node.index,
1904                                             SESSION_Q_PROCESS_STOP, 0);
1905             }
1906           if (!sm->poll_main)
1907             continue;
1908         }
1909       vlib_node_set_state (vm, session_queue_node.index, state);
1910     }
1911
1912   if (sm->use_private_rx_mqs)
1913     application_enable_rx_mqs_nodes (is_en);
1914 }
1915
1916 clib_error_t *
1917 vnet_session_enable_disable (vlib_main_t * vm, u8 is_en)
1918 {
1919   clib_error_t *error = 0;
1920   if (is_en)
1921     {
1922       if (session_main.is_enabled)
1923         return 0;
1924
1925       error = session_manager_main_enable (vm);
1926       session_node_enable_disable (is_en);
1927     }
1928   else
1929     {
1930       session_main.is_enabled = 0;
1931       session_manager_main_disable (vm);
1932       session_node_enable_disable (is_en);
1933     }
1934
1935   return error;
1936 }
1937
1938 clib_error_t *
1939 session_main_init (vlib_main_t * vm)
1940 {
1941   session_main_t *smm = &session_main;
1942
1943   smm->is_enabled = 0;
1944   smm->session_enable_asap = 0;
1945   smm->poll_main = 0;
1946   smm->use_private_rx_mqs = 0;
1947   smm->no_adaptive = 0;
1948   smm->session_baseva = HIGH_SEGMENT_BASEVA;
1949
1950 #if (HIGH_SEGMENT_BASEVA > (4ULL << 30))
1951   smm->session_va_space_size = 128ULL << 30;
1952   smm->evt_qs_segment_size = 64 << 20;
1953 #else
1954   smm->session_va_space_size = 128 << 20;
1955   smm->evt_qs_segment_size = 1 << 20;
1956 #endif
1957
1958   smm->last_transport_proto_type = TRANSPORT_PROTO_DTLS;
1959
1960   return 0;
1961 }
1962
1963 static clib_error_t *
1964 session_main_loop_init (vlib_main_t * vm)
1965 {
1966   session_main_t *smm = &session_main;
1967   if (smm->session_enable_asap)
1968     {
1969       vlib_worker_thread_barrier_sync (vm);
1970       vnet_session_enable_disable (vm, 1 /* is_en */ );
1971       vlib_worker_thread_barrier_release (vm);
1972     }
1973   return 0;
1974 }
1975
1976 VLIB_INIT_FUNCTION (session_main_init);
1977 VLIB_MAIN_LOOP_ENTER_FUNCTION (session_main_loop_init);
1978
1979 static clib_error_t *
1980 session_config_fn (vlib_main_t * vm, unformat_input_t * input)
1981 {
1982   session_main_t *smm = &session_main;
1983   u32 nitems;
1984   uword tmp;
1985
1986   while (unformat_check_input (input) != UNFORMAT_END_OF_INPUT)
1987     {
1988       if (unformat (input, "event-queue-length %d", &nitems))
1989         {
1990           if (nitems >= 2048)
1991             smm->configured_event_queue_length = nitems;
1992           else
1993             clib_warning ("event queue length %d too small, ignored", nitems);
1994         }
1995       else if (unformat (input, "preallocated-sessions %d",
1996                          &smm->preallocated_sessions))
1997         ;
1998       else if (unformat (input, "v4-session-table-buckets %d",
1999                          &smm->configured_v4_session_table_buckets))
2000         ;
2001       else if (unformat (input, "v4-halfopen-table-buckets %d",
2002                          &smm->configured_v4_halfopen_table_buckets))
2003         ;
2004       else if (unformat (input, "v6-session-table-buckets %d",
2005                          &smm->configured_v6_session_table_buckets))
2006         ;
2007       else if (unformat (input, "v6-halfopen-table-buckets %d",
2008                          &smm->configured_v6_halfopen_table_buckets))
2009         ;
2010       else if (unformat (input, "v4-session-table-memory %U",
2011                          unformat_memory_size, &tmp))
2012         {
2013           if (tmp >= 0x100000000)
2014             return clib_error_return (0, "memory size %llx (%lld) too large",
2015                                       tmp, tmp);
2016           smm->configured_v4_session_table_memory = tmp;
2017         }
2018       else if (unformat (input, "v4-halfopen-table-memory %U",
2019                          unformat_memory_size, &tmp))
2020         {
2021           if (tmp >= 0x100000000)
2022             return clib_error_return (0, "memory size %llx (%lld) too large",
2023                                       tmp, tmp);
2024           smm->configured_v4_halfopen_table_memory = tmp;
2025         }
2026       else if (unformat (input, "v6-session-table-memory %U",
2027                          unformat_memory_size, &tmp))
2028         {
2029           if (tmp >= 0x100000000)
2030             return clib_error_return (0, "memory size %llx (%lld) too large",
2031                                       tmp, tmp);
2032           smm->configured_v6_session_table_memory = tmp;
2033         }
2034       else if (unformat (input, "v6-halfopen-table-memory %U",
2035                          unformat_memory_size, &tmp))
2036         {
2037           if (tmp >= 0x100000000)
2038             return clib_error_return (0, "memory size %llx (%lld) too large",
2039                                       tmp, tmp);
2040           smm->configured_v6_halfopen_table_memory = tmp;
2041         }
2042       else if (unformat (input, "local-endpoints-table-memory %U",
2043                          unformat_memory_size, &tmp))
2044         {
2045           if (tmp >= 0x100000000)
2046             return clib_error_return (0, "memory size %llx (%lld) too large",
2047                                       tmp, tmp);
2048           smm->local_endpoints_table_memory = tmp;
2049         }
2050       else if (unformat (input, "local-endpoints-table-buckets %d",
2051                          &smm->local_endpoints_table_buckets))
2052         ;
2053       /* Deprecated but maintained for compatibility */
2054       else if (unformat (input, "evt_qs_memfd_seg"))
2055         ;
2056       else if (unformat (input, "evt_qs_seg_size %U", unformat_memory_size,
2057                          &smm->evt_qs_segment_size))
2058         ;
2059       else if (unformat (input, "enable"))
2060         smm->session_enable_asap = 1;
2061       else if (unformat (input, "segment-baseva 0x%lx", &smm->session_baseva))
2062         ;
2063       else if (unformat (input, "use-app-socket-api"))
2064         appns_sapi_enable ();
2065       else if (unformat (input, "poll-main"))
2066         smm->poll_main = 1;
2067       else if (unformat (input, "use-private-rx-mqs"))
2068         smm->use_private_rx_mqs = 1;
2069       else if (unformat (input, "no-adaptive"))
2070         smm->no_adaptive = 1;
2071       else
2072         return clib_error_return (0, "unknown input `%U'",
2073                                   format_unformat_error, input);
2074     }
2075   return 0;
2076 }
2077
2078 VLIB_CONFIG_FUNCTION (session_config_fn, "session");
2079
2080 /*
2081  * fd.io coding-style-patch-verification: ON
2082  *
2083  * Local Variables:
2084  * eval: (c-set-style "gnu")
2085  * End:
2086  */