tcp: horizontal scaling improvments
[vpp.git] / src / vnet / session / session.c
1 /*
2  * Copyright (c) 2017 Cisco and/or its affiliates.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at:
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 /**
16  * @file
17  * @brief Session and session manager
18  */
19
20 #include <vnet/session/session.h>
21 #include <vnet/session/session_debug.h>
22 #include <vnet/session/application.h>
23 #include <vlibmemory/api.h>
24 #include <vnet/dpo/load_balance.h>
25 #include <vnet/fib/ip4_fib.h>
26 #include <vnet/tcp/tcp.h>
27
28 session_manager_main_t session_manager_main;
29 extern transport_proto_vft_t *tp_vfts;
30
31 int
32 stream_session_create_i (segment_manager_t * sm, transport_connection_t * tc,
33                          u8 alloc_fifos, stream_session_t ** ret_s)
34 {
35   session_manager_main_t *smm = &session_manager_main;
36   svm_fifo_t *server_rx_fifo = 0, *server_tx_fifo = 0;
37   u32 fifo_segment_index;
38   u32 pool_index;
39   stream_session_t *s;
40   u64 value;
41   u32 thread_index = tc->thread_index;
42   int rv;
43
44   ASSERT (thread_index == vlib_get_thread_index ());
45
46   /* Create the session */
47   pool_get_aligned (smm->sessions[thread_index], s, CLIB_CACHE_LINE_BYTES);
48   memset (s, 0, sizeof (*s));
49   pool_index = s - smm->sessions[thread_index];
50
51   /* Allocate fifos */
52   if (alloc_fifos)
53     {
54       if ((rv = segment_manager_alloc_session_fifos (sm, &server_rx_fifo,
55                                                      &server_tx_fifo,
56                                                      &fifo_segment_index)))
57         {
58           pool_put (smm->sessions[thread_index], s);
59           return rv;
60         }
61       /* Initialize backpointers */
62       server_rx_fifo->master_session_index = pool_index;
63       server_rx_fifo->master_thread_index = thread_index;
64
65       server_tx_fifo->master_session_index = pool_index;
66       server_tx_fifo->master_thread_index = thread_index;
67
68       s->server_rx_fifo = server_rx_fifo;
69       s->server_tx_fifo = server_tx_fifo;
70       s->svm_segment_index = fifo_segment_index;
71     }
72
73   /* Initialize state machine, such as it is... */
74   s->session_type = session_type_from_proto_and_ip (tc->transport_proto,
75                                                     tc->is_ip4);
76   s->session_state = SESSION_STATE_CONNECTING;
77   s->thread_index = thread_index;
78   s->session_index = pool_index;
79
80   /* Attach transport to session */
81   s->connection_index = tc->c_index;
82
83   /* Attach session to transport */
84   tc->s_index = s->session_index;
85
86   /* Add to the main lookup table */
87   value = stream_session_handle (s);
88   stream_session_table_add_for_tc (tc, value);
89
90   *ret_s = s;
91
92   return 0;
93 }
94
95 /**
96  * Discards bytes from buffer chain
97  *
98  * It discards n_bytes_to_drop starting at first buffer after chain_b
99  */
100 always_inline void
101 session_enqueue_discard_chain_bytes (vlib_main_t * vm, vlib_buffer_t * b,
102                                      vlib_buffer_t ** chain_b,
103                                      u32 n_bytes_to_drop)
104 {
105   vlib_buffer_t *next = *chain_b;
106   u32 to_drop = n_bytes_to_drop;
107   ASSERT (b->flags & VLIB_BUFFER_NEXT_PRESENT);
108   while (to_drop && (next->flags & VLIB_BUFFER_NEXT_PRESENT))
109     {
110       next = vlib_get_buffer (vm, next->next_buffer);
111       if (next->current_length > to_drop)
112         {
113           vlib_buffer_advance (next, to_drop);
114           to_drop = 0;
115         }
116       else
117         {
118           to_drop -= next->current_length;
119           next->current_length = 0;
120         }
121     }
122   *chain_b = next;
123
124   if (to_drop == 0)
125     b->total_length_not_including_first_buffer -= n_bytes_to_drop;
126 }
127
128 /**
129  * Enqueue buffer chain tail
130  */
131 always_inline int
132 session_enqueue_chain_tail (stream_session_t * s, vlib_buffer_t * b,
133                             u32 offset, u8 is_in_order)
134 {
135   vlib_buffer_t *chain_b;
136   u32 chain_bi, len, diff;
137   vlib_main_t *vm = vlib_get_main ();
138   u8 *data;
139   u32 written = 0;
140   int rv = 0;
141
142   if (is_in_order && offset)
143     {
144       diff = offset - b->current_length;
145       if (diff > b->total_length_not_including_first_buffer)
146         return 0;
147       chain_b = b;
148       session_enqueue_discard_chain_bytes (vm, b, &chain_b, diff);
149       chain_bi = vlib_get_buffer_index (vm, chain_b);
150     }
151   else
152     chain_bi = b->next_buffer;
153
154   do
155     {
156       chain_b = vlib_get_buffer (vm, chain_bi);
157       data = vlib_buffer_get_current (chain_b);
158       len = chain_b->current_length;
159       if (!len)
160         continue;
161       if (is_in_order)
162         {
163           rv = svm_fifo_enqueue_nowait (s->server_rx_fifo, len, data);
164           if (rv == len)
165             {
166               written += rv;
167             }
168           else if (rv < len)
169             {
170               return (rv > 0) ? (written + rv) : written;
171             }
172           else if (rv > len)
173             {
174               written += rv;
175
176               /* written more than what was left in chain */
177               if (written > b->total_length_not_including_first_buffer)
178                 return written;
179
180               /* drop the bytes that have already been delivered */
181               session_enqueue_discard_chain_bytes (vm, b, &chain_b, rv - len);
182             }
183         }
184       else
185         {
186           rv = svm_fifo_enqueue_with_offset (s->server_rx_fifo, offset, len,
187                                              data);
188           if (rv)
189             {
190               clib_warning ("failed to enqueue multi-buffer seg");
191               return -1;
192             }
193           offset += len;
194         }
195     }
196   while ((chain_bi = (chain_b->flags & VLIB_BUFFER_NEXT_PRESENT)
197           ? chain_b->next_buffer : 0));
198
199   if (is_in_order)
200     return written;
201
202   return 0;
203 }
204
205 /*
206  * Enqueue data for delivery to session peer. Does not notify peer of enqueue
207  * event but on request can queue notification events for later delivery by
208  * calling stream_server_flush_enqueue_events().
209  *
210  * @param tc Transport connection which is to be enqueued data
211  * @param b Buffer to be enqueued
212  * @param offset Offset at which to start enqueueing if out-of-order
213  * @param queue_event Flag to indicate if peer is to be notified or if event
214  *                    is to be queued. The former is useful when more data is
215  *                    enqueued and only one event is to be generated.
216  * @param is_in_order Flag to indicate if data is in order
217  * @return Number of bytes enqueued or a negative value if enqueueing failed.
218  */
219 int
220 stream_session_enqueue_data (transport_connection_t * tc, vlib_buffer_t * b,
221                              u32 offset, u8 queue_event, u8 is_in_order)
222 {
223   stream_session_t *s;
224   int enqueued = 0, rv, in_order_off;
225
226   s = stream_session_get (tc->s_index, tc->thread_index);
227
228   if (is_in_order)
229     {
230       enqueued = svm_fifo_enqueue_nowait (s->server_rx_fifo,
231                                           b->current_length,
232                                           vlib_buffer_get_current (b));
233       if (PREDICT_FALSE ((b->flags & VLIB_BUFFER_NEXT_PRESENT)
234                          && enqueued >= 0))
235         {
236           in_order_off = enqueued > b->current_length ? enqueued : 0;
237           rv = session_enqueue_chain_tail (s, b, in_order_off, 1);
238           if (rv > 0)
239             enqueued += rv;
240         }
241     }
242   else
243     {
244       rv = svm_fifo_enqueue_with_offset (s->server_rx_fifo, offset,
245                                          b->current_length,
246                                          vlib_buffer_get_current (b));
247       if (PREDICT_FALSE ((b->flags & VLIB_BUFFER_NEXT_PRESENT) && !rv))
248         session_enqueue_chain_tail (s, b, offset + b->current_length, 0);
249       /* if something was enqueued, report even this as success for ooo
250        * segment handling */
251       return rv;
252     }
253
254   if (queue_event)
255     {
256       /* Queue RX event on this fifo. Eventually these will need to be flushed
257        * by calling stream_server_flush_enqueue_events () */
258       session_manager_main_t *smm = vnet_get_session_manager_main ();
259       u32 thread_index = s->thread_index;
260       u32 my_enqueue_epoch = smm->current_enqueue_epoch[thread_index];
261
262       if (s->enqueue_epoch != my_enqueue_epoch)
263         {
264           s->enqueue_epoch = my_enqueue_epoch;
265           vec_add1 (smm->session_indices_to_enqueue_by_thread[thread_index],
266                     s - smm->sessions[thread_index]);
267         }
268     }
269
270   if (is_in_order)
271     return enqueued;
272
273   return 0;
274 }
275
276 /** Check if we have space in rx fifo to push more bytes */
277 u8
278 stream_session_no_space (transport_connection_t * tc, u32 thread_index,
279                          u16 data_len)
280 {
281   stream_session_t *s = stream_session_get (tc->s_index, thread_index);
282
283   if (PREDICT_FALSE (s->session_state != SESSION_STATE_READY))
284     return 1;
285
286   if (data_len > svm_fifo_max_enqueue (s->server_rx_fifo))
287     return 1;
288
289   return 0;
290 }
291
292 u32
293 stream_session_tx_fifo_max_dequeue (transport_connection_t * tc)
294 {
295   stream_session_t *s = stream_session_get (tc->s_index, tc->thread_index);
296   if (!s->server_tx_fifo)
297     return 0;
298   return svm_fifo_max_dequeue (s->server_tx_fifo);
299 }
300
301 int
302 stream_session_peek_bytes (transport_connection_t * tc, u8 * buffer,
303                            u32 offset, u32 max_bytes)
304 {
305   stream_session_t *s = stream_session_get (tc->s_index, tc->thread_index);
306   return svm_fifo_peek (s->server_tx_fifo, offset, max_bytes, buffer);
307 }
308
309 u32
310 stream_session_dequeue_drop (transport_connection_t * tc, u32 max_bytes)
311 {
312   stream_session_t *s = stream_session_get (tc->s_index, tc->thread_index);
313   return svm_fifo_dequeue_drop (s->server_tx_fifo, max_bytes);
314 }
315
316 /**
317  * Notify session peer that new data has been enqueued.
318  *
319  * @param s Stream session for which the event is to be generated.
320  * @param block Flag to indicate if call should block if event queue is full.
321  *
322  * @return 0 on succes or negative number if failed to send notification.
323  */
324 static int
325 stream_session_enqueue_notify (stream_session_t * s, u8 block)
326 {
327   application_t *app;
328   session_fifo_event_t evt;
329   unix_shared_memory_queue_t *q;
330   static u32 serial_number;
331
332   if (PREDICT_FALSE (s->session_state == SESSION_STATE_CLOSED))
333     {
334       /* Session is closed so app will never clean up. Flush rx fifo */
335       u32 to_dequeue = svm_fifo_max_dequeue (s->server_rx_fifo);
336       if (to_dequeue)
337         svm_fifo_dequeue_drop (s->server_rx_fifo, to_dequeue);
338       return 0;
339     }
340
341   /* Get session's server */
342   app = application_get_if_valid (s->app_index);
343
344   if (PREDICT_FALSE (app == 0))
345     {
346       clib_warning ("invalid s->app_index = %d", s->app_index);
347       return 0;
348     }
349
350   /* Built-in server? Hand event to the callback... */
351   if (app->cb_fns.builtin_server_rx_callback)
352     return app->cb_fns.builtin_server_rx_callback (s);
353
354   /* If no event, send one */
355   if (svm_fifo_set_event (s->server_rx_fifo))
356     {
357       /* Fabricate event */
358       evt.fifo = s->server_rx_fifo;
359       evt.event_type = FIFO_EVENT_APP_RX;
360       evt.event_id = serial_number++;
361
362       /* Add event to server's event queue */
363       q = app->event_queue;
364
365       /* Based on request block (or not) for lack of space */
366       if (block || PREDICT_TRUE (q->cursize < q->maxsize))
367         unix_shared_memory_queue_add (app->event_queue, (u8 *) & evt,
368                                       0 /* do wait for mutex */ );
369       else
370         {
371           clib_warning ("fifo full");
372           return -1;
373         }
374     }
375
376   /* *INDENT-OFF* */
377   SESSION_EVT_DBG(SESSION_EVT_ENQ, s, ({
378       ed->data[0] = evt.event_id;
379       ed->data[1] = svm_fifo_max_dequeue (s->server_rx_fifo);
380   }));
381   /* *INDENT-ON* */
382
383   return 0;
384 }
385
386 /**
387  * Flushes queue of sessions that are to be notified of new data
388  * enqueued events.
389  *
390  * @param thread_index Thread index for which the flush is to be performed.
391  * @return 0 on success or a positive number indicating the number of
392  *         failures due to API queue being full.
393  */
394 int
395 session_manager_flush_enqueue_events (u32 thread_index)
396 {
397   session_manager_main_t *smm = &session_manager_main;
398   u32 *session_indices_to_enqueue;
399   int i, errors = 0;
400
401   session_indices_to_enqueue =
402     smm->session_indices_to_enqueue_by_thread[thread_index];
403
404   for (i = 0; i < vec_len (session_indices_to_enqueue); i++)
405     {
406       stream_session_t *s0;
407
408       /* Get session */
409       s0 = stream_session_get_if_valid (session_indices_to_enqueue[i],
410                                         thread_index);
411       if (s0 == 0 || stream_session_enqueue_notify (s0, 0 /* don't block */ ))
412         {
413           errors++;
414         }
415     }
416
417   vec_reset_length (session_indices_to_enqueue);
418
419   smm->session_indices_to_enqueue_by_thread[thread_index] =
420     session_indices_to_enqueue;
421
422   /* Increment enqueue epoch for next round */
423   smm->current_enqueue_epoch[thread_index]++;
424
425   return errors;
426 }
427
428 /**
429  * Init fifo tail and head pointers
430  *
431  * Useful if transport uses absolute offsets for tracking ooo segments.
432  */
433 void
434 stream_session_init_fifos_pointers (transport_connection_t * tc,
435                                     u32 rx_pointer, u32 tx_pointer)
436 {
437   stream_session_t *s;
438   s = stream_session_get (tc->s_index, tc->thread_index);
439   svm_fifo_init_pointers (s->server_rx_fifo, rx_pointer);
440   svm_fifo_init_pointers (s->server_tx_fifo, tx_pointer);
441 }
442
443 int
444 stream_session_connect_notify (transport_connection_t * tc, u8 is_fail)
445 {
446   application_t *app;
447   stream_session_t *new_s = 0;
448   u64 handle;
449   u32 opaque = 0;
450   int error = 0;
451   u8 st;
452
453   st = session_type_from_proto_and_ip (tc->transport_proto, tc->is_ip4);
454   handle = stream_session_half_open_lookup_handle (&tc->lcl_ip, &tc->rmt_ip,
455                                                    tc->lcl_port, tc->rmt_port,
456                                                    st);
457   if (handle == HALF_OPEN_LOOKUP_INVALID_VALUE)
458     {
459       clib_warning ("half-open was removed!");
460       return -1;
461     }
462
463   /* Cleanup half-open table */
464   stream_session_half_open_table_del (tc);
465
466   /* Get the app's index from the handle we stored when opening connection
467    * and the opaque (api_context for external apps) from transport session
468    * index */
469   app = application_get_if_valid (handle >> 32);
470   if (!app)
471     return -1;
472
473   opaque = tc->s_index;
474
475   if (!is_fail)
476     {
477       segment_manager_t *sm;
478       u8 alloc_fifos;
479       sm = application_get_connect_segment_manager (app);
480       alloc_fifos = application_is_proxy (app);
481       /* Create new session (svm segments are allocated if needed) */
482       if (stream_session_create_i (sm, tc, alloc_fifos, &new_s))
483         {
484           is_fail = 1;
485           error = -1;
486         }
487       else
488         new_s->app_index = app->index;
489     }
490
491   /* Notify client application */
492   if (app->cb_fns.session_connected_callback (app->index, opaque, new_s,
493                                               is_fail))
494     {
495       clib_warning ("failed to notify app");
496       if (!is_fail)
497         stream_session_disconnect (new_s);
498     }
499   else
500     {
501       if (!is_fail)
502         new_s->session_state = SESSION_STATE_READY;
503     }
504
505   return error;
506 }
507
508 void
509 stream_session_accept_notify (transport_connection_t * tc)
510 {
511   application_t *server;
512   stream_session_t *s;
513
514   s = stream_session_get (tc->s_index, tc->thread_index);
515   server = application_get (s->app_index);
516   server->cb_fns.session_accept_callback (s);
517 }
518
519 /**
520  * Notification from transport that connection is being closed.
521  *
522  * A disconnect is sent to application but state is not removed. Once
523  * disconnect is acknowledged by application, session disconnect is called.
524  * Ultimately this leads to close being called on transport (passive close).
525  */
526 void
527 stream_session_disconnect_notify (transport_connection_t * tc)
528 {
529   application_t *server;
530   stream_session_t *s;
531
532   s = stream_session_get (tc->s_index, tc->thread_index);
533   server = application_get (s->app_index);
534   server->cb_fns.session_disconnect_callback (s);
535 }
536
537 /**
538  * Cleans up session and lookup table.
539  */
540 void
541 stream_session_delete (stream_session_t * s)
542 {
543   session_manager_main_t *smm = vnet_get_session_manager_main ();
544   int rv;
545
546   /* Delete from the main lookup table. */
547   if ((rv = stream_session_table_del (s)))
548     clib_warning ("hash delete error, rv %d", rv);
549
550   /* Cleanup fifo segments */
551   segment_manager_dealloc_fifos (s->svm_segment_index, s->server_rx_fifo,
552                                  s->server_tx_fifo);
553
554   pool_put (smm->sessions[s->thread_index], s);
555   if (CLIB_DEBUG)
556     memset (s, 0xFA, sizeof (*s));
557 }
558
559 /**
560  * Notification from transport that connection is being deleted
561  *
562  * This removes the session if it is still valid. It should be called only on
563  * previously fully established sessions. For instance failed connects should
564  * call stream_session_connect_notify and indicate that the connect has
565  * failed.
566  */
567 void
568 stream_session_delete_notify (transport_connection_t * tc)
569 {
570   stream_session_t *s;
571
572   /* App might've been removed already */
573   s = stream_session_get_if_valid (tc->s_index, tc->thread_index);
574   if (!s)
575     return;
576   stream_session_delete (s);
577 }
578
579 /**
580  * Notify application that connection has been reset.
581  */
582 void
583 stream_session_reset_notify (transport_connection_t * tc)
584 {
585   stream_session_t *s;
586   application_t *app;
587   s = stream_session_get (tc->s_index, tc->thread_index);
588
589   app = application_get (s->app_index);
590   app->cb_fns.session_reset_callback (s);
591 }
592
593 /**
594  * Accept a stream session. Optionally ping the server by callback.
595  */
596 int
597 stream_session_accept (transport_connection_t * tc, u32 listener_index,
598                        u8 sst, u8 notify)
599 {
600   application_t *server;
601   stream_session_t *s, *listener;
602   segment_manager_t *sm;
603
604   int rv;
605
606   /* Find the server */
607   listener = listen_session_get (sst, listener_index);
608   server = application_get (listener->app_index);
609
610   sm = application_get_listen_segment_manager (server, listener);
611   if ((rv = stream_session_create_i (sm, tc, 1, &s)))
612     return rv;
613
614   s->app_index = server->index;
615   s->listener_index = listener_index;
616   s->session_state = SESSION_STATE_ACCEPTING;
617
618   /* Shoulder-tap the server */
619   if (notify)
620     {
621       server->cb_fns.session_accept_callback (s);
622     }
623
624   return 0;
625 }
626
627 /**
628  * Ask transport to open connection to remote transport endpoint.
629  *
630  * Stores handle for matching request with reply since the call can be
631  * asynchronous. For instance, for TCP the 3-way handshake must complete
632  * before reply comes. Session is only created once connection is established.
633  *
634  * @param app_index Index of the application requesting the connect
635  * @param st Session type requested.
636  * @param tep Remote transport endpoint
637  * @param res Resulting transport connection .
638  */
639 int
640 stream_session_open (u32 app_index, session_type_t st,
641                      transport_endpoint_t * rmt,
642                      transport_connection_t ** res)
643 {
644   transport_connection_t *tc;
645   int rv;
646   u64 handle;
647
648   rv = tp_vfts[st].open (rmt);
649   if (rv < 0)
650     {
651       clib_warning ("Transport failed to open connection.");
652       return VNET_API_ERROR_SESSION_CONNECT_FAIL;
653     }
654
655   tc = tp_vfts[st].get_half_open ((u32) rv);
656
657   /* Save app and tc index. The latter is needed to help establish the
658    * connection while the former is needed when the connect notify comes
659    * and we have to notify the external app */
660   handle = (((u64) app_index) << 32) | (u64) tc->c_index;
661
662   /* Add to the half-open lookup table */
663   stream_session_half_open_table_add (tc, handle);
664
665   *res = tc;
666
667   return 0;
668 }
669
670 /**
671  * Ask transport to listen on local transport endpoint.
672  *
673  * @param s Session for which listen will be called. Note that unlike
674  *          established sessions, listen sessions are not associated to a
675  *          thread.
676  * @param tep Local endpoint to be listened on.
677  */
678 int
679 stream_session_listen (stream_session_t * s, transport_endpoint_t * tep)
680 {
681   transport_connection_t *tc;
682   u32 tci;
683
684   /* Transport bind/listen  */
685   tci = tp_vfts[s->session_type].bind (s->session_index, tep);
686
687   if (tci == (u32) ~ 0)
688     return -1;
689
690   /* Attach transport to session */
691   s->connection_index = tci;
692   tc = tp_vfts[s->session_type].get_listener (tci);
693
694   /* Weird but handle it ... */
695   if (tc == 0)
696     return -1;
697
698   /* Add to the main lookup table */
699   stream_session_table_add_for_tc (tc, s->session_index);
700
701   return 0;
702 }
703
704 /**
705  * Ask transport to stop listening on local transport endpoint.
706  *
707  * @param s Session to stop listening on. It must be in state LISTENING.
708  */
709 int
710 stream_session_stop_listen (stream_session_t * s)
711 {
712   transport_connection_t *tc;
713
714   if (s->session_state != SESSION_STATE_LISTENING)
715     {
716       clib_warning ("not a listening session");
717       return -1;
718     }
719
720   tc = tp_vfts[s->session_type].get_listener (s->connection_index);
721   if (!tc)
722     {
723       clib_warning ("no transport");
724       return VNET_API_ERROR_ADDRESS_NOT_IN_USE;
725     }
726
727   stream_session_table_del_for_tc (tc);
728   tp_vfts[s->session_type].unbind (s->connection_index);
729   return 0;
730 }
731
732 void
733 session_send_session_evt_to_thread (u64 session_handle,
734                                     fifo_event_type_t evt_type,
735                                     u32 thread_index)
736 {
737   static u16 serial_number = 0;
738   session_fifo_event_t evt;
739   unix_shared_memory_queue_t *q;
740
741   /* Fabricate event */
742   evt.session_handle = session_handle;
743   evt.event_type = evt_type;
744   evt.event_id = serial_number++;
745
746   q = session_manager_get_vpp_event_queue (thread_index);
747
748   /* Based on request block (or not) for lack of space */
749   if (PREDICT_TRUE (q->cursize < q->maxsize))
750     {
751       if (unix_shared_memory_queue_add (q, (u8 *) & evt,
752                                         0 /* do wait for mutex */ ))
753         {
754           clib_warning ("failed to enqueue evt");
755         }
756     }
757   else
758     {
759       clib_warning ("queue full");
760       return;
761     }
762 }
763
764 /**
765  * Disconnect session and propagate to transport. This should eventually
766  * result in a delete notification that allows us to cleanup session state.
767  * Called for both active/passive disconnects.
768  *
769  * Should be called from the session's thread.
770  */
771 void
772 stream_session_disconnect (stream_session_t * s)
773 {
774   s->session_state = SESSION_STATE_CLOSED;
775   tp_vfts[s->session_type].close (s->connection_index, s->thread_index);
776 }
777
778 /**
779  * Cleanup transport and session state.
780  *
781  * Notify transport of the cleanup, wait for a delete notify to actually
782  * remove the session state.
783  */
784 void
785 stream_session_cleanup (stream_session_t * s)
786 {
787   int rv;
788
789   s->session_state = SESSION_STATE_CLOSED;
790
791   /* Delete from the main lookup table to avoid more enqueues */
792   rv = stream_session_table_del (s);
793   if (rv)
794     clib_warning ("hash delete error, rv %d", rv);
795
796   tp_vfts[s->session_type].cleanup (s->connection_index, s->thread_index);
797 }
798
799 /**
800  * Allocate vpp event queue (once) per worker thread
801  */
802 void
803 session_vpp_event_queue_allocate (session_manager_main_t * smm,
804                                   u32 thread_index)
805 {
806   api_main_t *am = &api_main;
807   void *oldheap;
808   u32 event_queue_length = 2048;
809
810   if (smm->vpp_event_queues[thread_index] == 0)
811     {
812       /* Allocate event fifo in the /vpe-api shared-memory segment */
813       oldheap = svm_push_data_heap (am->vlib_rp);
814
815       if (smm->configured_event_queue_length)
816         event_queue_length = smm->configured_event_queue_length;
817
818       smm->vpp_event_queues[thread_index] =
819         unix_shared_memory_queue_init
820         (event_queue_length,
821          sizeof (session_fifo_event_t), 0 /* consumer pid */ ,
822          0 /* (do not) send signal when queue non-empty */ );
823
824       svm_pop_heap (oldheap);
825     }
826 }
827
828 session_type_t
829 session_type_from_proto_and_ip (transport_proto_t proto, u8 is_ip4)
830 {
831   if (proto == TRANSPORT_PROTO_TCP)
832     {
833       if (is_ip4)
834         return SESSION_TYPE_IP4_TCP;
835       else
836         return SESSION_TYPE_IP6_TCP;
837     }
838   else
839     {
840       if (is_ip4)
841         return SESSION_TYPE_IP4_UDP;
842       else
843         return SESSION_TYPE_IP6_UDP;
844     }
845
846   return SESSION_N_TYPES;
847 }
848
849 static clib_error_t *
850 session_manager_main_enable (vlib_main_t * vm)
851 {
852   session_manager_main_t *smm = &session_manager_main;
853   vlib_thread_main_t *vtm = vlib_get_thread_main ();
854   u32 num_threads;
855   u32 preallocated_sessions_per_worker;
856   int i;
857
858   num_threads = 1 /* main thread */  + vtm->n_threads;
859
860   if (num_threads < 1)
861     return clib_error_return (0, "n_thread_stacks not set");
862
863   /* $$$ config parameters */
864   svm_fifo_segment_init (0x200000000ULL /* first segment base VA */ ,
865                          20 /* timeout in seconds */ );
866
867   /* configure per-thread ** vectors */
868   vec_validate (smm->sessions, num_threads - 1);
869   vec_validate (smm->session_indices_to_enqueue_by_thread, num_threads - 1);
870   vec_validate (smm->tx_buffers, num_threads - 1);
871   vec_validate (smm->pending_event_vector, num_threads - 1);
872   vec_validate (smm->free_event_vector, num_threads - 1);
873   vec_validate (smm->current_enqueue_epoch, num_threads - 1);
874   vec_validate (smm->vpp_event_queues, num_threads - 1);
875
876   for (i = 0; i < num_threads; i++)
877     {
878       vec_validate (smm->free_event_vector[i], 0);
879       _vec_len (smm->free_event_vector[i]) = 0;
880       vec_validate (smm->pending_event_vector[i], 0);
881       _vec_len (smm->pending_event_vector[i]) = 0;
882     }
883
884 #if SESSION_DBG
885   vec_validate (smm->last_event_poll_by_thread, num_threads - 1);
886 #endif
887
888   /* Allocate vpp event queues */
889   for (i = 0; i < vec_len (smm->vpp_event_queues); i++)
890     session_vpp_event_queue_allocate (smm, i);
891
892   /* Preallocate sessions */
893   if (smm->preallocated_sessions)
894     {
895       if (num_threads == 1)
896         {
897           pool_init_fixed (smm->sessions[0], smm->preallocated_sessions);
898         }
899       else
900         {
901           int j;
902           preallocated_sessions_per_worker =
903             (1.1 * (f64) smm->preallocated_sessions /
904              (f64) (num_threads - 1));
905
906           for (j = 1; j < num_threads; j++)
907             {
908               pool_init_fixed (smm->sessions[j],
909                                preallocated_sessions_per_worker);
910             }
911         }
912     }
913
914   session_lookup_init ();
915
916   smm->is_enabled = 1;
917
918   /* Enable TCP transport */
919   vnet_tcp_enable_disable (vm, 1);
920
921   return 0;
922 }
923
924 void
925 session_node_enable_disable (u8 is_en)
926 {
927   u8 state = is_en ? VLIB_NODE_STATE_POLLING : VLIB_NODE_STATE_DISABLED;
928   /* *INDENT-OFF* */
929   foreach_vlib_main (({
930     vlib_node_set_state (this_vlib_main, session_queue_node.index,
931                          state);
932   }));
933   /* *INDENT-ON* */
934 }
935
936 clib_error_t *
937 vnet_session_enable_disable (vlib_main_t * vm, u8 is_en)
938 {
939   if (is_en)
940     {
941       if (session_manager_main.is_enabled)
942         return 0;
943
944       session_node_enable_disable (is_en);
945
946       return session_manager_main_enable (vm);
947     }
948   else
949     {
950       session_manager_main.is_enabled = 0;
951       session_node_enable_disable (is_en);
952     }
953
954   return 0;
955 }
956
957 clib_error_t *
958 session_manager_main_init (vlib_main_t * vm)
959 {
960   session_manager_main_t *smm = &session_manager_main;
961   smm->is_enabled = 0;
962   return 0;
963 }
964
965 VLIB_INIT_FUNCTION (session_manager_main_init);
966
967 static clib_error_t *
968 session_config_fn (vlib_main_t * vm, unformat_input_t * input)
969 {
970   session_manager_main_t *smm = &session_manager_main;
971   u32 nitems;
972   uword tmp;
973
974   while (unformat_check_input (input) != UNFORMAT_END_OF_INPUT)
975     {
976       if (unformat (input, "event-queue-length %d", &nitems))
977         {
978           if (nitems >= 2048)
979             smm->configured_event_queue_length = nitems;
980           else
981             clib_warning ("event queue length %d too small, ignored", nitems);
982         }
983       else if (unformat (input, "preallocated-sessions %d",
984                          &smm->preallocated_sessions))
985         ;
986       else if (unformat (input, "v4-session-table-buckets %d",
987                          &smm->configured_v4_session_table_buckets))
988         ;
989       else if (unformat (input, "v4-halfopen-table-buckets %d",
990                          &smm->configured_v4_halfopen_table_buckets))
991         ;
992       else if (unformat (input, "v6-session-table-buckets %d",
993                          &smm->configured_v6_session_table_buckets))
994         ;
995       else if (unformat (input, "v6-halfopen-table-buckets %d",
996                          &smm->configured_v6_halfopen_table_buckets))
997         ;
998       else if (unformat (input, "v4-session-table-memory %U",
999                          unformat_memory_size, &tmp))
1000         {
1001           if (tmp >= 0x100000000)
1002             return clib_error_return (0, "memory size %llx (%lld) too large",
1003                                       tmp, tmp);
1004           smm->configured_v4_session_table_memory = tmp;
1005         }
1006       else if (unformat (input, "v4-halfopen-table-memory %U",
1007                          unformat_memory_size, &tmp))
1008         {
1009           if (tmp >= 0x100000000)
1010             return clib_error_return (0, "memory size %llx (%lld) too large",
1011                                       tmp, tmp);
1012           smm->configured_v4_halfopen_table_memory = tmp;
1013         }
1014       else if (unformat (input, "v6-session-table-memory %U",
1015                          unformat_memory_size, &tmp))
1016         {
1017           if (tmp >= 0x100000000)
1018             return clib_error_return (0, "memory size %llx (%lld) too large",
1019                                       tmp, tmp);
1020           smm->configured_v6_session_table_memory = tmp;
1021         }
1022       else if (unformat (input, "v6-halfopen-table-memory %U",
1023                          unformat_memory_size, &tmp))
1024         {
1025           if (tmp >= 0x100000000)
1026             return clib_error_return (0, "memory size %llx (%lld) too large",
1027                                       tmp, tmp);
1028           smm->configured_v6_halfopen_table_memory = tmp;
1029         }
1030       else
1031         return clib_error_return (0, "unknown input `%U'",
1032                                   format_unformat_error, input);
1033     }
1034   return 0;
1035 }
1036
1037 VLIB_CONFIG_FUNCTION (session_config_fn, "session");
1038
1039 /*
1040  * fd.io coding-style-patch-verification: ON
1041  *
1042  * Local Variables:
1043  * eval: (c-set-style "gnu")
1044  * End:
1045  */