616bfc9f1508a2f13d07185694345bd0ddeaad45
[vpp.git] / src / vnet / session / session.c
1 /*
2  * Copyright (c) 2017 Cisco and/or its affiliates.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at:
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 /**
16  * @file
17  * @brief Session and session manager
18  */
19
20 #include <vnet/session/session.h>
21 #include <vnet/session/session_debug.h>
22 #include <vnet/session/application.h>
23 #include <vlibmemory/api.h>
24 #include <vnet/dpo/load_balance.h>
25 #include <vnet/fib/ip4_fib.h>
26 #include <vnet/tcp/tcp.h>
27
28 session_manager_main_t session_manager_main;
29 extern transport_proto_vft_t *tp_vfts;
30
31 static void
32 session_send_evt_to_thread (u64 session_handle, fifo_event_type_t evt_type,
33                             u32 thread_index, void *fp, void *rpc_args)
34 {
35   u32 tries = 0;
36   session_fifo_event_t evt = { {0}, };
37   unix_shared_memory_queue_t *q;
38
39   evt.event_type = evt_type;
40   if (evt_type == FIFO_EVENT_RPC)
41     {
42       evt.rpc_args.fp = fp;
43       evt.rpc_args.arg = rpc_args;
44     }
45   else
46     evt.session_handle = session_handle;
47
48   q = session_manager_get_vpp_event_queue (thread_index);
49   while (unix_shared_memory_queue_add (q, (u8 *) & evt, 1))
50     {
51       if (tries++ == 3)
52         {
53           SESSION_DBG ("failed to enqueue evt");
54           break;
55         }
56     }
57 }
58
59 void
60 session_send_session_evt_to_thread (u64 session_handle,
61                                     fifo_event_type_t evt_type,
62                                     u32 thread_index)
63 {
64   session_send_evt_to_thread (session_handle, evt_type, thread_index, 0, 0);
65 }
66
67 void
68 session_send_rpc_evt_to_thread (u32 thread_index, void *fp, void *rpc_args)
69 {
70   if (thread_index != vlib_get_thread_index ())
71     session_send_evt_to_thread (0, FIFO_EVENT_RPC, thread_index, fp,
72                                 rpc_args);
73   else
74     {
75       void (*fnp) (void *) = fp;
76       fnp (rpc_args);
77     }
78 }
79
80 stream_session_t *
81 session_alloc (u32 thread_index)
82 {
83   session_manager_main_t *smm = &session_manager_main;
84   stream_session_t *s;
85   u8 will_expand = 0;
86   pool_get_aligned_will_expand (smm->sessions[thread_index], will_expand,
87                                 CLIB_CACHE_LINE_BYTES);
88   /* If we have peekers, let them finish */
89   if (PREDICT_FALSE (will_expand))
90     {
91       clib_spinlock_lock_if_init (&smm->peekers_write_locks[thread_index]);
92       pool_get_aligned (session_manager_main.sessions[thread_index], s,
93                         CLIB_CACHE_LINE_BYTES);
94       clib_spinlock_unlock_if_init (&smm->peekers_write_locks[thread_index]);
95     }
96   else
97     {
98       pool_get_aligned (session_manager_main.sessions[thread_index], s,
99                         CLIB_CACHE_LINE_BYTES);
100     }
101   memset (s, 0, sizeof (*s));
102   s->session_index = s - session_manager_main.sessions[thread_index];
103   s->thread_index = thread_index;
104   return s;
105 }
106
107 static void
108 session_free (stream_session_t * s)
109 {
110   pool_put (session_manager_main.sessions[s->thread_index], s);
111   if (CLIB_DEBUG)
112     memset (s, 0xFA, sizeof (*s));
113 }
114
115 static int
116 session_alloc_fifos (segment_manager_t * sm, stream_session_t * s)
117 {
118   svm_fifo_t *server_rx_fifo = 0, *server_tx_fifo = 0;
119   u32 fifo_segment_index;
120   int rv;
121
122   if ((rv = segment_manager_alloc_session_fifos (sm, &server_rx_fifo,
123                                                  &server_tx_fifo,
124                                                  &fifo_segment_index)))
125     return rv;
126   /* Initialize backpointers */
127   server_rx_fifo->master_session_index = s->session_index;
128   server_rx_fifo->master_thread_index = s->thread_index;
129
130   server_tx_fifo->master_session_index = s->session_index;
131   server_tx_fifo->master_thread_index = s->thread_index;
132
133   s->server_rx_fifo = server_rx_fifo;
134   s->server_tx_fifo = server_tx_fifo;
135   s->svm_segment_index = fifo_segment_index;
136   return 0;
137 }
138
139 static stream_session_t *
140 session_alloc_for_connection (transport_connection_t * tc)
141 {
142   stream_session_t *s;
143   u32 thread_index = tc->thread_index;
144
145   ASSERT (thread_index == vlib_get_thread_index ());
146
147   s = session_alloc (thread_index);
148   s->session_type = session_type_from_proto_and_ip (tc->proto, tc->is_ip4);
149   s->session_state = SESSION_STATE_CONNECTING;
150   s->enqueue_epoch = ~0;
151
152   /* Attach transport to session and vice versa */
153   s->connection_index = tc->c_index;
154   tc->s_index = s->session_index;
155   return s;
156 }
157
158 static int
159 session_alloc_and_init (segment_manager_t * sm, transport_connection_t * tc,
160                         u8 alloc_fifos, stream_session_t ** ret_s)
161 {
162   stream_session_t *s;
163   int rv;
164
165   s = session_alloc_for_connection (tc);
166   if (alloc_fifos && (rv = session_alloc_fifos (sm, s)))
167     {
168       session_free (s);
169       *ret_s = 0;
170       return rv;
171     }
172
173   /* Add to the main lookup table */
174   session_lookup_add_connection (tc, session_handle (s));
175
176   *ret_s = s;
177   return 0;
178 }
179
180 /**
181  * Discards bytes from buffer chain
182  *
183  * It discards n_bytes_to_drop starting at first buffer after chain_b
184  */
185 always_inline void
186 session_enqueue_discard_chain_bytes (vlib_main_t * vm, vlib_buffer_t * b,
187                                      vlib_buffer_t ** chain_b,
188                                      u32 n_bytes_to_drop)
189 {
190   vlib_buffer_t *next = *chain_b;
191   u32 to_drop = n_bytes_to_drop;
192   ASSERT (b->flags & VLIB_BUFFER_NEXT_PRESENT);
193   while (to_drop && (next->flags & VLIB_BUFFER_NEXT_PRESENT))
194     {
195       next = vlib_get_buffer (vm, next->next_buffer);
196       if (next->current_length > to_drop)
197         {
198           vlib_buffer_advance (next, to_drop);
199           to_drop = 0;
200         }
201       else
202         {
203           to_drop -= next->current_length;
204           next->current_length = 0;
205         }
206     }
207   *chain_b = next;
208
209   if (to_drop == 0)
210     b->total_length_not_including_first_buffer -= n_bytes_to_drop;
211 }
212
213 /**
214  * Enqueue buffer chain tail
215  */
216 always_inline int
217 session_enqueue_chain_tail (stream_session_t * s, vlib_buffer_t * b,
218                             u32 offset, u8 is_in_order)
219 {
220   vlib_buffer_t *chain_b;
221   u32 chain_bi, len, diff;
222   vlib_main_t *vm = vlib_get_main ();
223   u8 *data;
224   u32 written = 0;
225   int rv = 0;
226
227   if (is_in_order && offset)
228     {
229       diff = offset - b->current_length;
230       if (diff > b->total_length_not_including_first_buffer)
231         return 0;
232       chain_b = b;
233       session_enqueue_discard_chain_bytes (vm, b, &chain_b, diff);
234       chain_bi = vlib_get_buffer_index (vm, chain_b);
235     }
236   else
237     chain_bi = b->next_buffer;
238
239   do
240     {
241       chain_b = vlib_get_buffer (vm, chain_bi);
242       data = vlib_buffer_get_current (chain_b);
243       len = chain_b->current_length;
244       if (!len)
245         continue;
246       if (is_in_order)
247         {
248           rv = svm_fifo_enqueue_nowait (s->server_rx_fifo, len, data);
249           if (rv == len)
250             {
251               written += rv;
252             }
253           else if (rv < len)
254             {
255               return (rv > 0) ? (written + rv) : written;
256             }
257           else if (rv > len)
258             {
259               written += rv;
260
261               /* written more than what was left in chain */
262               if (written > b->total_length_not_including_first_buffer)
263                 return written;
264
265               /* drop the bytes that have already been delivered */
266               session_enqueue_discard_chain_bytes (vm, b, &chain_b, rv - len);
267             }
268         }
269       else
270         {
271           rv = svm_fifo_enqueue_with_offset (s->server_rx_fifo, offset, len,
272                                              data);
273           if (rv)
274             {
275               clib_warning ("failed to enqueue multi-buffer seg");
276               return -1;
277             }
278           offset += len;
279         }
280     }
281   while ((chain_bi = (chain_b->flags & VLIB_BUFFER_NEXT_PRESENT)
282           ? chain_b->next_buffer : 0));
283
284   if (is_in_order)
285     return written;
286
287   return 0;
288 }
289
290 /*
291  * Enqueue data for delivery to session peer. Does not notify peer of enqueue
292  * event but on request can queue notification events for later delivery by
293  * calling stream_server_flush_enqueue_events().
294  *
295  * @param tc Transport connection which is to be enqueued data
296  * @param b Buffer to be enqueued
297  * @param offset Offset at which to start enqueueing if out-of-order
298  * @param queue_event Flag to indicate if peer is to be notified or if event
299  *                    is to be queued. The former is useful when more data is
300  *                    enqueued and only one event is to be generated.
301  * @param is_in_order Flag to indicate if data is in order
302  * @return Number of bytes enqueued or a negative value if enqueueing failed.
303  */
304 int
305 session_enqueue_stream_connection (transport_connection_t * tc,
306                                    vlib_buffer_t * b, u32 offset,
307                                    u8 queue_event, u8 is_in_order)
308 {
309   stream_session_t *s;
310   int enqueued = 0, rv, in_order_off;
311
312   s = session_get (tc->s_index, tc->thread_index);
313
314   if (is_in_order)
315     {
316       enqueued = svm_fifo_enqueue_nowait (s->server_rx_fifo,
317                                           b->current_length,
318                                           vlib_buffer_get_current (b));
319       if (PREDICT_FALSE ((b->flags & VLIB_BUFFER_NEXT_PRESENT)
320                          && enqueued >= 0))
321         {
322           in_order_off = enqueued > b->current_length ? enqueued : 0;
323           rv = session_enqueue_chain_tail (s, b, in_order_off, 1);
324           if (rv > 0)
325             enqueued += rv;
326         }
327     }
328   else
329     {
330       rv = svm_fifo_enqueue_with_offset (s->server_rx_fifo, offset,
331                                          b->current_length,
332                                          vlib_buffer_get_current (b));
333       if (PREDICT_FALSE ((b->flags & VLIB_BUFFER_NEXT_PRESENT) && !rv))
334         session_enqueue_chain_tail (s, b, offset + b->current_length, 0);
335       /* if something was enqueued, report even this as success for ooo
336        * segment handling */
337       return rv;
338     }
339
340   if (queue_event)
341     {
342       /* Queue RX event on this fifo. Eventually these will need to be flushed
343        * by calling stream_server_flush_enqueue_events () */
344       session_manager_main_t *smm = vnet_get_session_manager_main ();
345       u32 thread_index = s->thread_index;
346       u32 enqueue_epoch = smm->current_enqueue_epoch[tc->proto][thread_index];
347
348       if (s->enqueue_epoch != enqueue_epoch)
349         {
350           s->enqueue_epoch = enqueue_epoch;
351           vec_add1 (smm->session_to_enqueue[tc->proto][thread_index],
352                     s - smm->sessions[thread_index]);
353         }
354     }
355
356   return enqueued;
357 }
358
359 int
360 session_enqueue_dgram_connection (stream_session_t * s, vlib_buffer_t * b,
361                                   u8 proto, u8 queue_event)
362 {
363   int enqueued = 0, rv, in_order_off;
364
365   if (svm_fifo_max_enqueue (s->server_rx_fifo) < b->current_length)
366     return -1;
367   enqueued = svm_fifo_enqueue_nowait (s->server_rx_fifo, b->current_length,
368                                       vlib_buffer_get_current (b));
369   if (PREDICT_FALSE ((b->flags & VLIB_BUFFER_NEXT_PRESENT) && enqueued >= 0))
370     {
371       in_order_off = enqueued > b->current_length ? enqueued : 0;
372       rv = session_enqueue_chain_tail (s, b, in_order_off, 1);
373       if (rv > 0)
374         enqueued += rv;
375     }
376   if (queue_event)
377     {
378       /* Queue RX event on this fifo. Eventually these will need to be flushed
379        * by calling stream_server_flush_enqueue_events () */
380       session_manager_main_t *smm = vnet_get_session_manager_main ();
381       u32 thread_index = s->thread_index;
382       u32 enqueue_epoch = smm->current_enqueue_epoch[proto][thread_index];
383
384       if (s->enqueue_epoch != enqueue_epoch)
385         {
386           s->enqueue_epoch = enqueue_epoch;
387           vec_add1 (smm->session_to_enqueue[proto][thread_index],
388                     s - smm->sessions[thread_index]);
389         }
390     }
391   return enqueued;
392 }
393
394 /** Check if we have space in rx fifo to push more bytes */
395 u8
396 stream_session_no_space (transport_connection_t * tc, u32 thread_index,
397                          u16 data_len)
398 {
399   stream_session_t *s = session_get (tc->s_index, thread_index);
400
401   if (PREDICT_FALSE (s->session_state != SESSION_STATE_READY))
402     return 1;
403
404   if (data_len > svm_fifo_max_enqueue (s->server_rx_fifo))
405     return 1;
406
407   return 0;
408 }
409
410 u32
411 stream_session_tx_fifo_max_dequeue (transport_connection_t * tc)
412 {
413   stream_session_t *s = session_get (tc->s_index, tc->thread_index);
414   if (!s->server_tx_fifo)
415     return 0;
416   return svm_fifo_max_dequeue (s->server_tx_fifo);
417 }
418
419 int
420 stream_session_peek_bytes (transport_connection_t * tc, u8 * buffer,
421                            u32 offset, u32 max_bytes)
422 {
423   stream_session_t *s = session_get (tc->s_index, tc->thread_index);
424   return svm_fifo_peek (s->server_tx_fifo, offset, max_bytes, buffer);
425 }
426
427 u32
428 stream_session_dequeue_drop (transport_connection_t * tc, u32 max_bytes)
429 {
430   stream_session_t *s = session_get (tc->s_index, tc->thread_index);
431   return svm_fifo_dequeue_drop (s->server_tx_fifo, max_bytes);
432 }
433
434 /**
435  * Notify session peer that new data has been enqueued.
436  *
437  * @param s Stream session for which the event is to be generated.
438  * @param block Flag to indicate if call should block if event queue is full.
439  *
440  * @return 0 on succes or negative number if failed to send notification.
441  */
442 static int
443 session_enqueue_notify (stream_session_t * s, u8 block)
444 {
445   application_t *app;
446   session_fifo_event_t evt;
447   unix_shared_memory_queue_t *q;
448
449   if (PREDICT_FALSE (s->session_state == SESSION_STATE_CLOSED))
450     {
451       /* Session is closed so app will never clean up. Flush rx fifo */
452       u32 to_dequeue = svm_fifo_max_dequeue (s->server_rx_fifo);
453       if (to_dequeue)
454         svm_fifo_dequeue_drop (s->server_rx_fifo, to_dequeue);
455       return 0;
456     }
457
458   /* Get session's server */
459   app = application_get_if_valid (s->app_index);
460
461   if (PREDICT_FALSE (app == 0))
462     {
463       clib_warning ("invalid s->app_index = %d", s->app_index);
464       return 0;
465     }
466
467   /* Built-in server? Hand event to the callback... */
468   if (app->cb_fns.builtin_server_rx_callback)
469     return app->cb_fns.builtin_server_rx_callback (s);
470
471   /* If no event, send one */
472   if (svm_fifo_set_event (s->server_rx_fifo))
473     {
474       /* Fabricate event */
475       evt.fifo = s->server_rx_fifo;
476       evt.event_type = FIFO_EVENT_APP_RX;
477
478       /* Add event to server's event queue */
479       q = app->event_queue;
480
481       /* Based on request block (or not) for lack of space */
482       if (block || PREDICT_TRUE (q->cursize < q->maxsize))
483         unix_shared_memory_queue_add (app->event_queue, (u8 *) & evt,
484                                       0 /* do wait for mutex */ );
485       else
486         {
487           clib_warning ("fifo full");
488           return -1;
489         }
490     }
491
492   /* *INDENT-OFF* */
493   SESSION_EVT_DBG(SESSION_EVT_ENQ, s, ({
494       ed->data[0] = evt.event_type;
495       ed->data[1] = svm_fifo_max_dequeue (s->server_rx_fifo);
496   }));
497   /* *INDENT-ON* */
498
499   return 0;
500 }
501
502 /**
503  * Flushes queue of sessions that are to be notified of new data
504  * enqueued events.
505  *
506  * @param thread_index Thread index for which the flush is to be performed.
507  * @return 0 on success or a positive number indicating the number of
508  *         failures due to API queue being full.
509  */
510 int
511 session_manager_flush_enqueue_events (u8 transport_proto, u32 thread_index)
512 {
513   session_manager_main_t *smm = &session_manager_main;
514   u32 *indices;
515   stream_session_t *s;
516   int i, errors = 0;
517
518   indices = smm->session_to_enqueue[transport_proto][thread_index];
519
520   for (i = 0; i < vec_len (indices); i++)
521     {
522       s = session_get_if_valid (indices[i], thread_index);
523       if (s == 0 || session_enqueue_notify (s, 0 /* don't block */ ))
524         errors++;
525     }
526
527   vec_reset_length (indices);
528   smm->session_to_enqueue[transport_proto][thread_index] = indices;
529   smm->current_enqueue_epoch[transport_proto][thread_index]++;
530
531   return errors;
532 }
533
534 /**
535  * Init fifo tail and head pointers
536  *
537  * Useful if transport uses absolute offsets for tracking ooo segments.
538  */
539 void
540 stream_session_init_fifos_pointers (transport_connection_t * tc,
541                                     u32 rx_pointer, u32 tx_pointer)
542 {
543   stream_session_t *s;
544   s = session_get (tc->s_index, tc->thread_index);
545   svm_fifo_init_pointers (s->server_rx_fifo, rx_pointer);
546   svm_fifo_init_pointers (s->server_tx_fifo, tx_pointer);
547 }
548
549 int
550 session_stream_connect_notify (transport_connection_t * tc, u8 is_fail)
551 {
552   application_t *app;
553   stream_session_t *new_s = 0;
554   u64 handle;
555   u32 opaque = 0;
556   int error = 0;
557   segment_manager_t *sm;
558   u8 alloc_fifos;
559
560   /*
561    * Find connection handle and cleanup half-open table
562    */
563   handle = session_lookup_half_open_handle (tc);
564   if (handle == HALF_OPEN_LOOKUP_INVALID_VALUE)
565     {
566       SESSION_DBG ("half-open was removed!");
567       return -1;
568     }
569   session_lookup_del_half_open (tc);
570
571   /* Get the app's index from the handle we stored when opening connection
572    * and the opaque (api_context for external apps) from transport session
573    * index */
574   app = application_get_if_valid (handle >> 32);
575   if (!app)
576     return -1;
577   opaque = tc->s_index;
578
579   /*
580    * Allocate new session with fifos (svm segments are allocated if needed)
581    */
582   if (!is_fail)
583     {
584       sm = application_get_connect_segment_manager (app);
585       alloc_fifos = !application_is_builtin_proxy (app);
586       if (session_alloc_and_init (sm, tc, alloc_fifos, &new_s))
587         {
588           is_fail = 1;
589           error = -1;
590         }
591       else
592         new_s->app_index = app->index;
593     }
594
595   /*
596    * Notify client application
597    */
598   if (app->cb_fns.session_connected_callback (app->index, opaque, new_s,
599                                               is_fail))
600     {
601       SESSION_DBG ("failed to notify app");
602       if (!is_fail)
603         stream_session_disconnect (new_s);
604     }
605   else
606     {
607       if (!is_fail)
608         new_s->session_state = SESSION_STATE_READY;
609     }
610
611   return error;
612 }
613
614 typedef struct _session_switch_pool_args
615 {
616   u32 session_index;
617   u32 thread_index;
618   u32 new_thread_index;
619   u32 new_session_index;
620 } session_switch_pool_args_t;
621
622 static void
623 session_switch_pool (void *cb_args)
624 {
625   session_switch_pool_args_t *args = (session_switch_pool_args_t *) cb_args;
626   stream_session_t *s;
627   ASSERT (args->thread_index == vlib_get_thread_index ());
628   s = session_get (args->session_index, args->thread_index);
629   s->server_tx_fifo->master_session_index = args->new_session_index;
630   s->server_tx_fifo->master_thread_index = args->new_thread_index;
631   tp_vfts[s->session_type].cleanup (s->connection_index, s->thread_index);
632   session_free (s);
633   clib_mem_free (cb_args);
634 }
635
636 /**
637  * Move dgram session to the right thread
638  */
639 int
640 session_dgram_connect_notify (transport_connection_t * tc,
641                               u32 old_thread_index,
642                               stream_session_t ** new_session)
643 {
644   stream_session_t *new_s;
645   session_switch_pool_args_t *rpc_args;
646
647   /*
648    * Clone half-open session to the right thread.
649    */
650   new_s = session_clone_safe (tc->s_index, old_thread_index);
651   new_s->connection_index = tc->c_index;
652   new_s->server_rx_fifo->master_session_index = new_s->session_index;
653   new_s->server_rx_fifo->master_thread_index = new_s->thread_index;
654   new_s->session_state = SESSION_STATE_READY;
655   session_lookup_add_connection (tc, session_handle (new_s));
656
657   /*
658    * Ask thread owning the old session to clean it up and make us the tx
659    * fifo owner
660    */
661   rpc_args = clib_mem_alloc (sizeof (*rpc_args));
662   rpc_args->new_session_index = new_s->session_index;
663   rpc_args->new_thread_index = new_s->thread_index;
664   rpc_args->session_index = tc->s_index;
665   rpc_args->thread_index = old_thread_index;
666   session_send_rpc_evt_to_thread (rpc_args->thread_index, session_switch_pool,
667                                   rpc_args);
668
669   tc->s_index = new_s->session_index;
670   new_s->connection_index = tc->c_index;
671   *new_session = new_s;
672   return 0;
673 }
674
675 void
676 stream_session_accept_notify (transport_connection_t * tc)
677 {
678   application_t *server;
679   stream_session_t *s;
680
681   s = session_get (tc->s_index, tc->thread_index);
682   server = application_get (s->app_index);
683   server->cb_fns.session_accept_callback (s);
684 }
685
686 /**
687  * Notification from transport that connection is being closed.
688  *
689  * A disconnect is sent to application but state is not removed. Once
690  * disconnect is acknowledged by application, session disconnect is called.
691  * Ultimately this leads to close being called on transport (passive close).
692  */
693 void
694 stream_session_disconnect_notify (transport_connection_t * tc)
695 {
696   application_t *server;
697   stream_session_t *s;
698
699   s = session_get (tc->s_index, tc->thread_index);
700   server = application_get (s->app_index);
701   server->cb_fns.session_disconnect_callback (s);
702 }
703
704 /**
705  * Cleans up session and lookup table.
706  */
707 void
708 stream_session_delete (stream_session_t * s)
709 {
710   int rv;
711
712   /* Delete from the main lookup table. */
713   if ((rv = session_lookup_del_session (s)))
714     clib_warning ("hash delete error, rv %d", rv);
715
716   /* Cleanup fifo segments */
717   segment_manager_dealloc_fifos (s->svm_segment_index, s->server_rx_fifo,
718                                  s->server_tx_fifo);
719   session_free (s);
720 }
721
722 /**
723  * Notification from transport that connection is being deleted
724  *
725  * This removes the session if it is still valid. It should be called only on
726  * previously fully established sessions. For instance failed connects should
727  * call stream_session_connect_notify and indicate that the connect has
728  * failed.
729  */
730 void
731 stream_session_delete_notify (transport_connection_t * tc)
732 {
733   stream_session_t *s;
734
735   /* App might've been removed already */
736   s = session_get_if_valid (tc->s_index, tc->thread_index);
737   if (!s)
738     return;
739   stream_session_delete (s);
740 }
741
742 /**
743  * Notify application that connection has been reset.
744  */
745 void
746 stream_session_reset_notify (transport_connection_t * tc)
747 {
748   stream_session_t *s;
749   application_t *app;
750   s = session_get (tc->s_index, tc->thread_index);
751
752   app = application_get (s->app_index);
753   app->cb_fns.session_reset_callback (s);
754 }
755
756 /**
757  * Accept a stream session. Optionally ping the server by callback.
758  */
759 int
760 stream_session_accept (transport_connection_t * tc, u32 listener_index,
761                        u8 notify)
762 {
763   application_t *server;
764   stream_session_t *s, *listener;
765   segment_manager_t *sm;
766   session_type_t sst;
767   int rv;
768
769   sst = session_type_from_proto_and_ip (tc->proto, tc->is_ip4);
770
771   /* Find the server */
772   listener = listen_session_get (sst, listener_index);
773   server = application_get (listener->app_index);
774
775   sm = application_get_listen_segment_manager (server, listener);
776   if ((rv = session_alloc_and_init (sm, tc, 1, &s)))
777     return rv;
778
779   s->app_index = server->index;
780   s->listener_index = listener_index;
781   s->session_state = SESSION_STATE_ACCEPTING;
782
783   /* Shoulder-tap the server */
784   if (notify)
785     {
786       server->cb_fns.session_accept_callback (s);
787     }
788
789   return 0;
790 }
791
792 /**
793  * Ask transport to open connection to remote transport endpoint.
794  *
795  * Stores handle for matching request with reply since the call can be
796  * asynchronous. For instance, for TCP the 3-way handshake must complete
797  * before reply comes. Session is only created once connection is established.
798  *
799  * @param app_index Index of the application requesting the connect
800  * @param st Session type requested.
801  * @param tep Remote transport endpoint
802  * @param opaque Opaque data (typically, api_context) the application expects
803  *               on open completion.
804  */
805 int
806 session_open (u32 app_index, session_endpoint_t * rmt, u32 opaque)
807 {
808   transport_connection_t *tc;
809   session_type_t sst;
810   segment_manager_t *sm;
811   stream_session_t *s;
812   application_t *app;
813   int rv;
814   u64 handle;
815
816   sst = session_type_from_proto_and_ip (rmt->transport_proto, rmt->is_ip4);
817   rv = tp_vfts[sst].open (session_endpoint_to_transport (rmt));
818   if (rv < 0)
819     {
820       SESSION_DBG ("Transport failed to open connection.");
821       return VNET_API_ERROR_SESSION_CONNECT;
822     }
823
824   tc = tp_vfts[sst].get_half_open ((u32) rv);
825
826   /* If transport offers a stream service, only allocate session once the
827    * connection has been established.
828    */
829   if (transport_is_stream (rmt->transport_proto))
830     {
831       /* Add connection to half-open table and save app and tc index. The
832        * latter is needed to help establish the connection while the former
833        * is needed when the connect notify comes and we have to notify the
834        * external app
835        */
836       handle = (((u64) app_index) << 32) | (u64) tc->c_index;
837       session_lookup_add_half_open (tc, handle);
838
839       /* Store api_context (opaque) for when the reply comes. Not the nicest
840        * thing but better than allocating a separate half-open pool.
841        */
842       tc->s_index = opaque;
843     }
844   /* For dgram type of service, allocate session and fifos now.
845    */
846   else
847     {
848       app = application_get (app_index);
849       sm = application_get_connect_segment_manager (app);
850
851       if (session_alloc_and_init (sm, tc, 1, &s))
852         return -1;
853       s->app_index = app->index;
854       s->session_state = SESSION_STATE_CONNECTING_READY;
855
856       /* Tell the app about the new event fifo for this session */
857       app->cb_fns.session_connected_callback (app->index, opaque, s, 0);
858     }
859   return 0;
860 }
861
862 /**
863  * Ask transport to listen on local transport endpoint.
864  *
865  * @param s Session for which listen will be called. Note that unlike
866  *          established sessions, listen sessions are not associated to a
867  *          thread.
868  * @param tep Local endpoint to be listened on.
869  */
870 int
871 stream_session_listen (stream_session_t * s, session_endpoint_t * sep)
872 {
873   transport_connection_t *tc;
874   u32 tci;
875
876   /* Transport bind/listen  */
877   tci = tp_vfts[s->session_type].bind (s->session_index,
878                                        session_endpoint_to_transport (sep));
879
880   if (tci == (u32) ~ 0)
881     return -1;
882
883   /* Attach transport to session */
884   s->connection_index = tci;
885   tc = tp_vfts[s->session_type].get_listener (tci);
886
887   /* Weird but handle it ... */
888   if (tc == 0)
889     return -1;
890
891   /* Add to the main lookup table */
892   session_lookup_add_connection (tc, s->session_index);
893   return 0;
894 }
895
896 /**
897  * Ask transport to stop listening on local transport endpoint.
898  *
899  * @param s Session to stop listening on. It must be in state LISTENING.
900  */
901 int
902 stream_session_stop_listen (stream_session_t * s)
903 {
904   transport_connection_t *tc;
905
906   if (s->session_state != SESSION_STATE_LISTENING)
907     {
908       clib_warning ("not a listening session");
909       return -1;
910     }
911
912   tc = tp_vfts[s->session_type].get_listener (s->connection_index);
913   if (!tc)
914     {
915       clib_warning ("no transport");
916       return VNET_API_ERROR_ADDRESS_NOT_IN_USE;
917     }
918
919   session_lookup_del_connection (tc);
920   tp_vfts[s->session_type].unbind (s->connection_index);
921   return 0;
922 }
923
924 /**
925  * Disconnect session and propagate to transport. This should eventually
926  * result in a delete notification that allows us to cleanup session state.
927  * Called for both active/passive disconnects.
928  *
929  * Should be called from the session's thread.
930  */
931 void
932 stream_session_disconnect (stream_session_t * s)
933 {
934   s->session_state = SESSION_STATE_CLOSED;
935   tp_vfts[s->session_type].close (s->connection_index, s->thread_index);
936 }
937
938 /**
939  * Cleanup transport and session state.
940  *
941  * Notify transport of the cleanup, wait for a delete notify to actually
942  * remove the session state.
943  */
944 void
945 stream_session_cleanup (stream_session_t * s)
946 {
947   int rv;
948
949   s->session_state = SESSION_STATE_CLOSED;
950
951   /* Delete from the main lookup table to avoid more enqueues */
952   rv = session_lookup_del_session (s);
953   if (rv)
954     clib_warning ("hash delete error, rv %d", rv);
955
956   tp_vfts[s->session_type].cleanup (s->connection_index, s->thread_index);
957 }
958
959 /**
960  * Allocate vpp event queue (once) per worker thread
961  */
962 void
963 session_vpp_event_queue_allocate (session_manager_main_t * smm,
964                                   u32 thread_index)
965 {
966   api_main_t *am = &api_main;
967   void *oldheap;
968   u32 event_queue_length = 2048;
969
970   if (smm->vpp_event_queues[thread_index] == 0)
971     {
972       /* Allocate event fifo in the /vpe-api shared-memory segment */
973       oldheap = svm_push_data_heap (am->vlib_rp);
974
975       if (smm->configured_event_queue_length)
976         event_queue_length = smm->configured_event_queue_length;
977
978       smm->vpp_event_queues[thread_index] =
979         unix_shared_memory_queue_init
980         (event_queue_length,
981          sizeof (session_fifo_event_t), 0 /* consumer pid */ ,
982          0 /* (do not) send signal when queue non-empty */ );
983
984       svm_pop_heap (oldheap);
985     }
986 }
987
988 session_type_t
989 session_type_from_proto_and_ip (transport_proto_t proto, u8 is_ip4)
990 {
991   if (proto == TRANSPORT_PROTO_TCP)
992     {
993       if (is_ip4)
994         return SESSION_TYPE_IP4_TCP;
995       else
996         return SESSION_TYPE_IP6_TCP;
997     }
998   else
999     {
1000       if (is_ip4)
1001         return SESSION_TYPE_IP4_UDP;
1002       else
1003         return SESSION_TYPE_IP6_UDP;
1004     }
1005
1006   return SESSION_N_TYPES;
1007 }
1008
1009 transport_connection_t *
1010 session_get_transport (stream_session_t * s)
1011 {
1012   if (s->session_state != SESSION_STATE_LISTENING)
1013     return tp_vfts[s->session_type].get_connection (s->connection_index,
1014                                                     s->thread_index);
1015   return 0;
1016 }
1017
1018 transport_connection_t *
1019 listen_session_get_transport (stream_session_t * s)
1020 {
1021   return tp_vfts[s->session_type].get_listener (s->connection_index);
1022 }
1023
1024 int
1025 listen_session_get_local_session_endpoint (stream_session_t * listener,
1026                                            session_endpoint_t * sep)
1027 {
1028   transport_connection_t *tc;
1029   tc =
1030     tp_vfts[listener->session_type].get_listener (listener->connection_index);
1031   if (!tc)
1032     {
1033       clib_warning ("no transport");
1034       return -1;
1035     }
1036
1037   /* N.B. The ip should not be copied because this is the local endpoint */
1038   sep->port = tc->lcl_port;
1039   sep->transport_proto = tc->proto;
1040   sep->is_ip4 = tc->is_ip4;
1041   return 0;
1042 }
1043
1044 static clib_error_t *
1045 session_manager_main_enable (vlib_main_t * vm)
1046 {
1047   session_manager_main_t *smm = &session_manager_main;
1048   vlib_thread_main_t *vtm = vlib_get_thread_main ();
1049   u32 num_threads;
1050   u32 preallocated_sessions_per_worker;
1051   int i, j;
1052
1053   num_threads = 1 /* main thread */  + vtm->n_threads;
1054
1055   if (num_threads < 1)
1056     return clib_error_return (0, "n_thread_stacks not set");
1057
1058   /* $$$ config parameters */
1059   svm_fifo_segment_init (0x200000000ULL /* first segment base VA */ ,
1060                          20 /* timeout in seconds */ );
1061
1062   /* configure per-thread ** vectors */
1063   vec_validate (smm->sessions, num_threads - 1);
1064   vec_validate (smm->tx_buffers, num_threads - 1);
1065   vec_validate (smm->pending_event_vector, num_threads - 1);
1066   vec_validate (smm->pending_disconnects, num_threads - 1);
1067   vec_validate (smm->free_event_vector, num_threads - 1);
1068   vec_validate (smm->vpp_event_queues, num_threads - 1);
1069   vec_validate (smm->session_peekers, num_threads - 1);
1070   vec_validate (smm->peekers_readers_locks, num_threads - 1);
1071   vec_validate (smm->peekers_write_locks, num_threads - 1);
1072
1073   for (i = 0; i < TRANSPORT_N_PROTO; i++)
1074     for (j = 0; j < num_threads; j++)
1075       {
1076         vec_validate (smm->session_to_enqueue[i], num_threads - 1);
1077         vec_validate (smm->current_enqueue_epoch[i], num_threads - 1);
1078       }
1079
1080   for (i = 0; i < num_threads; i++)
1081     {
1082       vec_validate (smm->free_event_vector[i], 0);
1083       _vec_len (smm->free_event_vector[i]) = 0;
1084       vec_validate (smm->pending_event_vector[i], 0);
1085       _vec_len (smm->pending_event_vector[i]) = 0;
1086       vec_validate (smm->pending_disconnects[i], 0);
1087       _vec_len (smm->pending_disconnects[i]) = 0;
1088       if (num_threads > 1)
1089         {
1090           clib_spinlock_init (&smm->peekers_readers_locks[i]);
1091           clib_spinlock_init (&smm->peekers_write_locks[i]);
1092         }
1093     }
1094
1095 #if SESSION_DBG
1096   vec_validate (smm->last_event_poll_by_thread, num_threads - 1);
1097 #endif
1098
1099   /* Allocate vpp event queues */
1100   for (i = 0; i < vec_len (smm->vpp_event_queues); i++)
1101     session_vpp_event_queue_allocate (smm, i);
1102
1103   /* Preallocate sessions */
1104   if (smm->preallocated_sessions)
1105     {
1106       if (num_threads == 1)
1107         {
1108           pool_init_fixed (smm->sessions[0], smm->preallocated_sessions);
1109         }
1110       else
1111         {
1112           int j;
1113           preallocated_sessions_per_worker =
1114             (1.1 * (f64) smm->preallocated_sessions /
1115              (f64) (num_threads - 1));
1116
1117           for (j = 1; j < num_threads; j++)
1118             {
1119               pool_init_fixed (smm->sessions[j],
1120                                preallocated_sessions_per_worker);
1121             }
1122         }
1123     }
1124
1125   session_lookup_init ();
1126   app_namespaces_init ();
1127   transport_init ();
1128
1129   smm->is_enabled = 1;
1130
1131   /* Enable TCP transport */
1132   vnet_tcp_enable_disable (vm, 1);
1133
1134   return 0;
1135 }
1136
1137 void
1138 session_node_enable_disable (u8 is_en)
1139 {
1140   u8 state = is_en ? VLIB_NODE_STATE_POLLING : VLIB_NODE_STATE_DISABLED;
1141   /* *INDENT-OFF* */
1142   foreach_vlib_main (({
1143     vlib_node_set_state (this_vlib_main, session_queue_node.index,
1144                          state);
1145   }));
1146   /* *INDENT-ON* */
1147 }
1148
1149 clib_error_t *
1150 vnet_session_enable_disable (vlib_main_t * vm, u8 is_en)
1151 {
1152   clib_error_t *error = 0;
1153   if (is_en)
1154     {
1155       if (session_manager_main.is_enabled)
1156         return 0;
1157
1158       session_node_enable_disable (is_en);
1159       error = session_manager_main_enable (vm);
1160     }
1161   else
1162     {
1163       session_manager_main.is_enabled = 0;
1164       session_node_enable_disable (is_en);
1165     }
1166
1167   return error;
1168 }
1169
1170 clib_error_t *
1171 session_manager_main_init (vlib_main_t * vm)
1172 {
1173   session_manager_main_t *smm = &session_manager_main;
1174   smm->is_enabled = 0;
1175   return 0;
1176 }
1177
1178 VLIB_INIT_FUNCTION (session_manager_main_init);
1179
1180 static clib_error_t *
1181 session_config_fn (vlib_main_t * vm, unformat_input_t * input)
1182 {
1183   session_manager_main_t *smm = &session_manager_main;
1184   u32 nitems;
1185   uword tmp;
1186
1187   while (unformat_check_input (input) != UNFORMAT_END_OF_INPUT)
1188     {
1189       if (unformat (input, "event-queue-length %d", &nitems))
1190         {
1191           if (nitems >= 2048)
1192             smm->configured_event_queue_length = nitems;
1193           else
1194             clib_warning ("event queue length %d too small, ignored", nitems);
1195         }
1196       else if (unformat (input, "preallocated-sessions %d",
1197                          &smm->preallocated_sessions))
1198         ;
1199       else if (unformat (input, "v4-session-table-buckets %d",
1200                          &smm->configured_v4_session_table_buckets))
1201         ;
1202       else if (unformat (input, "v4-halfopen-table-buckets %d",
1203                          &smm->configured_v4_halfopen_table_buckets))
1204         ;
1205       else if (unformat (input, "v6-session-table-buckets %d",
1206                          &smm->configured_v6_session_table_buckets))
1207         ;
1208       else if (unformat (input, "v6-halfopen-table-buckets %d",
1209                          &smm->configured_v6_halfopen_table_buckets))
1210         ;
1211       else if (unformat (input, "v4-session-table-memory %U",
1212                          unformat_memory_size, &tmp))
1213         {
1214           if (tmp >= 0x100000000)
1215             return clib_error_return (0, "memory size %llx (%lld) too large",
1216                                       tmp, tmp);
1217           smm->configured_v4_session_table_memory = tmp;
1218         }
1219       else if (unformat (input, "v4-halfopen-table-memory %U",
1220                          unformat_memory_size, &tmp))
1221         {
1222           if (tmp >= 0x100000000)
1223             return clib_error_return (0, "memory size %llx (%lld) too large",
1224                                       tmp, tmp);
1225           smm->configured_v4_halfopen_table_memory = tmp;
1226         }
1227       else if (unformat (input, "v6-session-table-memory %U",
1228                          unformat_memory_size, &tmp))
1229         {
1230           if (tmp >= 0x100000000)
1231             return clib_error_return (0, "memory size %llx (%lld) too large",
1232                                       tmp, tmp);
1233           smm->configured_v6_session_table_memory = tmp;
1234         }
1235       else if (unformat (input, "v6-halfopen-table-memory %U",
1236                          unformat_memory_size, &tmp))
1237         {
1238           if (tmp >= 0x100000000)
1239             return clib_error_return (0, "memory size %llx (%lld) too large",
1240                                       tmp, tmp);
1241           smm->configured_v6_halfopen_table_memory = tmp;
1242         }
1243       else if (unformat (input, "local-endpoints-table-memory %U",
1244                          unformat_memory_size, &tmp))
1245         {
1246           if (tmp >= 0x100000000)
1247             return clib_error_return (0, "memory size %llx (%lld) too large",
1248                                       tmp, tmp);
1249           smm->local_endpoints_table_memory = tmp;
1250         }
1251       else if (unformat (input, "local-endpoints-table-buckets %d",
1252                          &smm->local_endpoints_table_buckets))
1253         ;
1254       else
1255         return clib_error_return (0, "unknown input `%U'",
1256                                   format_unformat_error, input);
1257     }
1258   return 0;
1259 }
1260
1261 VLIB_CONFIG_FUNCTION (session_config_fn, "session");
1262
1263 /*
1264  * fd.io coding-style-patch-verification: ON
1265  *
1266  * Local Variables:
1267  * eval: (c-set-style "gnu")
1268  * End:
1269  */