tcp: re-enable persist timer if no data available to send
[vpp.git] / src / vnet / session / session.c
1 /*
2  * Copyright (c) 2017 Cisco and/or its affiliates.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at:
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 /**
16  * @file
17  * @brief Session and session manager
18  */
19
20 #include <vnet/session/session.h>
21 #include <vnet/session/session_debug.h>
22 #include <vnet/session/application.h>
23 #include <vlibmemory/api.h>
24 #include <vnet/dpo/load_balance.h>
25 #include <vnet/fib/ip4_fib.h>
26 #include <vnet/tcp/tcp.h>
27
28 session_manager_main_t session_manager_main;
29 extern transport_proto_vft_t *tp_vfts;
30
31 int
32 stream_session_create_i (segment_manager_t * sm, transport_connection_t * tc,
33                          u8 alloc_fifos, stream_session_t ** ret_s)
34 {
35   session_manager_main_t *smm = &session_manager_main;
36   svm_fifo_t *server_rx_fifo = 0, *server_tx_fifo = 0;
37   u32 fifo_segment_index;
38   u32 pool_index;
39   stream_session_t *s;
40   u64 value;
41   u32 thread_index = tc->thread_index;
42   int rv;
43
44   ASSERT (thread_index == vlib_get_thread_index ());
45
46   /* Create the session */
47   pool_get_aligned (smm->sessions[thread_index], s, CLIB_CACHE_LINE_BYTES);
48   memset (s, 0, sizeof (*s));
49   pool_index = s - smm->sessions[thread_index];
50
51   /* Allocate fifos */
52   if (alloc_fifos)
53     {
54       if ((rv = segment_manager_alloc_session_fifos (sm, &server_rx_fifo,
55                                                      &server_tx_fifo,
56                                                      &fifo_segment_index)))
57         {
58           pool_put (smm->sessions[thread_index], s);
59           return rv;
60         }
61       /* Initialize backpointers */
62       server_rx_fifo->master_session_index = pool_index;
63       server_rx_fifo->master_thread_index = thread_index;
64
65       server_tx_fifo->master_session_index = pool_index;
66       server_tx_fifo->master_thread_index = thread_index;
67
68       s->server_rx_fifo = server_rx_fifo;
69       s->server_tx_fifo = server_tx_fifo;
70       s->svm_segment_index = fifo_segment_index;
71     }
72
73   /* Initialize state machine, such as it is... */
74   s->session_type = session_type_from_proto_and_ip (tc->transport_proto,
75                                                     tc->is_ip4);
76   s->session_state = SESSION_STATE_CONNECTING;
77   s->thread_index = thread_index;
78   s->session_index = pool_index;
79
80   /* Attach transport to session */
81   s->connection_index = tc->c_index;
82
83   /* Attach session to transport */
84   tc->s_index = s->session_index;
85
86   /* Add to the main lookup table */
87   value = stream_session_handle (s);
88   stream_session_table_add_for_tc (tc, value);
89
90   *ret_s = s;
91
92   return 0;
93 }
94
95 /**
96  * Discards bytes from buffer chain
97  *
98  * It discards n_bytes_to_drop starting at first buffer after chain_b
99  */
100 always_inline void
101 session_enqueue_discard_chain_bytes (vlib_main_t * vm, vlib_buffer_t * b,
102                                      vlib_buffer_t ** chain_b,
103                                      u32 n_bytes_to_drop)
104 {
105   vlib_buffer_t *next = *chain_b;
106   u32 to_drop = n_bytes_to_drop;
107   ASSERT (b->flags & VLIB_BUFFER_NEXT_PRESENT);
108   while (to_drop && (next->flags & VLIB_BUFFER_NEXT_PRESENT))
109     {
110       next = vlib_get_buffer (vm, next->next_buffer);
111       if (next->current_length > to_drop)
112         {
113           vlib_buffer_advance (next, to_drop);
114           to_drop = 0;
115         }
116       else
117         {
118           to_drop -= next->current_length;
119           next->current_length = 0;
120         }
121     }
122   *chain_b = next;
123
124   if (to_drop == 0)
125     b->total_length_not_including_first_buffer -= n_bytes_to_drop;
126 }
127
128 /**
129  * Enqueue buffer chain tail
130  */
131 always_inline int
132 session_enqueue_chain_tail (stream_session_t * s, vlib_buffer_t * b,
133                             u32 offset, u8 is_in_order)
134 {
135   vlib_buffer_t *chain_b;
136   u32 chain_bi, len, diff;
137   vlib_main_t *vm = vlib_get_main ();
138   u8 *data;
139   u32 written = 0;
140   int rv = 0;
141
142   if (is_in_order && offset)
143     {
144       diff = offset - b->current_length;
145       if (diff > b->total_length_not_including_first_buffer)
146         return 0;
147       chain_b = b;
148       session_enqueue_discard_chain_bytes (vm, b, &chain_b, diff);
149       chain_bi = vlib_get_buffer_index (vm, chain_b);
150     }
151   else
152     chain_bi = b->next_buffer;
153
154   do
155     {
156       chain_b = vlib_get_buffer (vm, chain_bi);
157       data = vlib_buffer_get_current (chain_b);
158       len = chain_b->current_length;
159       if (!len)
160         continue;
161       if (is_in_order)
162         {
163           rv = svm_fifo_enqueue_nowait (s->server_rx_fifo, len, data);
164           if (rv == len)
165             {
166               written += rv;
167             }
168           else if (rv < len)
169             {
170               return (rv > 0) ? (written + rv) : written;
171             }
172           else if (rv > len)
173             {
174               written += rv;
175
176               /* written more than what was left in chain */
177               if (written > b->total_length_not_including_first_buffer)
178                 return written;
179
180               /* drop the bytes that have already been delivered */
181               session_enqueue_discard_chain_bytes (vm, b, &chain_b, rv - len);
182             }
183         }
184       else
185         {
186           rv = svm_fifo_enqueue_with_offset (s->server_rx_fifo, offset, len,
187                                              data);
188           if (rv)
189             {
190               clib_warning ("failed to enqueue multi-buffer seg");
191               return -1;
192             }
193           offset += len;
194         }
195     }
196   while ((chain_bi = (chain_b->flags & VLIB_BUFFER_NEXT_PRESENT)
197           ? chain_b->next_buffer : 0));
198
199   if (is_in_order)
200     return written;
201
202   return 0;
203 }
204
205 /*
206  * Enqueue data for delivery to session peer. Does not notify peer of enqueue
207  * event but on request can queue notification events for later delivery by
208  * calling stream_server_flush_enqueue_events().
209  *
210  * @param tc Transport connection which is to be enqueued data
211  * @param b Buffer to be enqueued
212  * @param offset Offset at which to start enqueueing if out-of-order
213  * @param queue_event Flag to indicate if peer is to be notified or if event
214  *                    is to be queued. The former is useful when more data is
215  *                    enqueued and only one event is to be generated.
216  * @param is_in_order Flag to indicate if data is in order
217  * @return Number of bytes enqueued or a negative value if enqueueing failed.
218  */
219 int
220 stream_session_enqueue_data (transport_connection_t * tc, vlib_buffer_t * b,
221                              u32 offset, u8 queue_event, u8 is_in_order)
222 {
223   stream_session_t *s;
224   int enqueued = 0, rv, in_order_off;
225
226   s = stream_session_get (tc->s_index, tc->thread_index);
227
228   if (is_in_order)
229     {
230       enqueued = svm_fifo_enqueue_nowait (s->server_rx_fifo,
231                                           b->current_length,
232                                           vlib_buffer_get_current (b));
233       if (PREDICT_FALSE ((b->flags & VLIB_BUFFER_NEXT_PRESENT)
234                          && enqueued >= 0))
235         {
236           in_order_off = enqueued > b->current_length ? enqueued : 0;
237           rv = session_enqueue_chain_tail (s, b, in_order_off, 1);
238           if (rv > 0)
239             enqueued += rv;
240         }
241     }
242   else
243     {
244       rv = svm_fifo_enqueue_with_offset (s->server_rx_fifo, offset,
245                                          b->current_length,
246                                          vlib_buffer_get_current (b));
247       if (PREDICT_FALSE ((b->flags & VLIB_BUFFER_NEXT_PRESENT) && !rv))
248         session_enqueue_chain_tail (s, b, offset + b->current_length, 0);
249       /* if something was enqueued, report even this as success for ooo
250        * segment handling */
251       return rv;
252     }
253
254   if (queue_event)
255     {
256       /* Queue RX event on this fifo. Eventually these will need to be flushed
257        * by calling stream_server_flush_enqueue_events () */
258       session_manager_main_t *smm = vnet_get_session_manager_main ();
259       u32 thread_index = s->thread_index;
260       u32 my_enqueue_epoch = smm->current_enqueue_epoch[thread_index];
261
262       if (s->enqueue_epoch != my_enqueue_epoch)
263         {
264           s->enqueue_epoch = my_enqueue_epoch;
265           vec_add1 (smm->session_indices_to_enqueue_by_thread[thread_index],
266                     s - smm->sessions[thread_index]);
267         }
268     }
269
270   if (is_in_order)
271     return enqueued;
272
273   return 0;
274 }
275
276 /** Check if we have space in rx fifo to push more bytes */
277 u8
278 stream_session_no_space (transport_connection_t * tc, u32 thread_index,
279                          u16 data_len)
280 {
281   stream_session_t *s = stream_session_get (tc->s_index, thread_index);
282
283   if (PREDICT_FALSE (s->session_state != SESSION_STATE_READY))
284     return 1;
285
286   if (data_len > svm_fifo_max_enqueue (s->server_rx_fifo))
287     return 1;
288
289   return 0;
290 }
291
292 u32
293 stream_session_tx_fifo_max_dequeue (transport_connection_t * tc)
294 {
295   stream_session_t *s = stream_session_get (tc->s_index, tc->thread_index);
296   if (!s->server_tx_fifo)
297     return 0;
298   return svm_fifo_max_dequeue (s->server_tx_fifo);
299 }
300
301 int
302 stream_session_peek_bytes (transport_connection_t * tc, u8 * buffer,
303                            u32 offset, u32 max_bytes)
304 {
305   stream_session_t *s = stream_session_get (tc->s_index, tc->thread_index);
306   return svm_fifo_peek (s->server_tx_fifo, offset, max_bytes, buffer);
307 }
308
309 u32
310 stream_session_dequeue_drop (transport_connection_t * tc, u32 max_bytes)
311 {
312   stream_session_t *s = stream_session_get (tc->s_index, tc->thread_index);
313   return svm_fifo_dequeue_drop (s->server_tx_fifo, max_bytes);
314 }
315
316 /**
317  * Notify session peer that new data has been enqueued.
318  *
319  * @param s Stream session for which the event is to be generated.
320  * @param block Flag to indicate if call should block if event queue is full.
321  *
322  * @return 0 on succes or negative number if failed to send notification.
323  */
324 static int
325 stream_session_enqueue_notify (stream_session_t * s, u8 block)
326 {
327   application_t *app;
328   session_fifo_event_t evt;
329   unix_shared_memory_queue_t *q;
330   static u32 serial_number;
331
332   if (PREDICT_FALSE (s->session_state == SESSION_STATE_CLOSED))
333     {
334       /* Session is closed so app will never clean up. Flush rx fifo */
335       u32 to_dequeue = svm_fifo_max_dequeue (s->server_rx_fifo);
336       if (to_dequeue)
337         svm_fifo_dequeue_drop (s->server_rx_fifo, to_dequeue);
338       return 0;
339     }
340
341   /* Get session's server */
342   app = application_get_if_valid (s->app_index);
343
344   if (PREDICT_FALSE (app == 0))
345     {
346       clib_warning ("invalid s->app_index = %d", s->app_index);
347       return 0;
348     }
349
350   /* Built-in server? Hand event to the callback... */
351   if (app->cb_fns.builtin_server_rx_callback)
352     return app->cb_fns.builtin_server_rx_callback (s);
353
354   /* If no event, send one */
355   if (svm_fifo_set_event (s->server_rx_fifo))
356     {
357       /* Fabricate event */
358       evt.fifo = s->server_rx_fifo;
359       evt.event_type = FIFO_EVENT_APP_RX;
360       evt.event_id = serial_number++;
361
362       /* Add event to server's event queue */
363       q = app->event_queue;
364
365       /* Based on request block (or not) for lack of space */
366       if (block || PREDICT_TRUE (q->cursize < q->maxsize))
367         unix_shared_memory_queue_add (app->event_queue, (u8 *) & evt,
368                                       0 /* do wait for mutex */ );
369       else
370         {
371           clib_warning ("fifo full");
372           return -1;
373         }
374     }
375
376   /* *INDENT-OFF* */
377   SESSION_EVT_DBG(SESSION_EVT_ENQ, s, ({
378       ed->data[0] = evt.event_id;
379       ed->data[1] = svm_fifo_max_dequeue (s->server_rx_fifo);
380   }));
381   /* *INDENT-ON* */
382
383   return 0;
384 }
385
386 /**
387  * Flushes queue of sessions that are to be notified of new data
388  * enqueued events.
389  *
390  * @param thread_index Thread index for which the flush is to be performed.
391  * @return 0 on success or a positive number indicating the number of
392  *         failures due to API queue being full.
393  */
394 int
395 session_manager_flush_enqueue_events (u32 thread_index)
396 {
397   session_manager_main_t *smm = &session_manager_main;
398   u32 *session_indices_to_enqueue;
399   int i, errors = 0;
400
401   session_indices_to_enqueue =
402     smm->session_indices_to_enqueue_by_thread[thread_index];
403
404   for (i = 0; i < vec_len (session_indices_to_enqueue); i++)
405     {
406       stream_session_t *s0;
407
408       /* Get session */
409       s0 = stream_session_get_if_valid (session_indices_to_enqueue[i],
410                                         thread_index);
411       if (s0 == 0 || stream_session_enqueue_notify (s0, 0 /* don't block */ ))
412         {
413           errors++;
414         }
415     }
416
417   vec_reset_length (session_indices_to_enqueue);
418
419   smm->session_indices_to_enqueue_by_thread[thread_index] =
420     session_indices_to_enqueue;
421
422   /* Increment enqueue epoch for next round */
423   smm->current_enqueue_epoch[thread_index]++;
424
425   return errors;
426 }
427
428 /**
429  * Init fifo tail and head pointers
430  *
431  * Useful if transport uses absolute offsets for tracking ooo segments.
432  */
433 void
434 stream_session_init_fifos_pointers (transport_connection_t * tc,
435                                     u32 rx_pointer, u32 tx_pointer)
436 {
437   stream_session_t *s;
438   s = stream_session_get (tc->s_index, tc->thread_index);
439   svm_fifo_init_pointers (s->server_rx_fifo, rx_pointer);
440   svm_fifo_init_pointers (s->server_tx_fifo, tx_pointer);
441 }
442
443 int
444 stream_session_connect_notify (transport_connection_t * tc, u8 is_fail)
445 {
446   application_t *app;
447   stream_session_t *new_s = 0;
448   u64 handle;
449   u32 opaque = 0;
450   int error = 0;
451   u8 st;
452
453   st = session_type_from_proto_and_ip (tc->transport_proto, tc->is_ip4);
454   handle = stream_session_half_open_lookup_handle (&tc->lcl_ip, &tc->rmt_ip,
455                                                    tc->lcl_port, tc->rmt_port,
456                                                    st);
457   if (handle == HALF_OPEN_LOOKUP_INVALID_VALUE)
458     {
459       clib_warning ("This can't be good!");
460       return -1;
461     }
462
463   /* Get the app's index from the handle we stored when opening connection
464    * and the opaque (api_context for external apps) from transport session
465    * index*/
466   app = application_get_if_valid (handle >> 32);
467   if (!app)
468     return -1;
469
470   opaque = tc->s_index;
471
472   if (!is_fail)
473     {
474       segment_manager_t *sm;
475       u8 alloc_fifos;
476       sm = application_get_connect_segment_manager (app);
477       alloc_fifos = application_is_proxy (app);
478       /* Create new session (svm segments are allocated if needed) */
479       if (stream_session_create_i (sm, tc, alloc_fifos, &new_s))
480         {
481           is_fail = 1;
482           error = -1;
483         }
484       else
485         new_s->app_index = app->index;
486     }
487
488   /* Notify client application */
489   if (app->cb_fns.session_connected_callback (app->index, opaque, new_s,
490                                               is_fail))
491     {
492       clib_warning ("failed to notify app");
493       if (!is_fail)
494         stream_session_disconnect (new_s);
495     }
496   else
497     {
498       if (!is_fail)
499         new_s->session_state = SESSION_STATE_READY;
500     }
501
502   /* Cleanup session lookup */
503   stream_session_half_open_table_del (tc);
504
505   return error;
506 }
507
508 void
509 stream_session_accept_notify (transport_connection_t * tc)
510 {
511   application_t *server;
512   stream_session_t *s;
513
514   s = stream_session_get (tc->s_index, tc->thread_index);
515   server = application_get (s->app_index);
516   server->cb_fns.session_accept_callback (s);
517 }
518
519 /**
520  * Notification from transport that connection is being closed.
521  *
522  * A disconnect is sent to application but state is not removed. Once
523  * disconnect is acknowledged by application, session disconnect is called.
524  * Ultimately this leads to close being called on transport (passive close).
525  */
526 void
527 stream_session_disconnect_notify (transport_connection_t * tc)
528 {
529   application_t *server;
530   stream_session_t *s;
531
532   s = stream_session_get (tc->s_index, tc->thread_index);
533   server = application_get (s->app_index);
534   server->cb_fns.session_disconnect_callback (s);
535 }
536
537 /**
538  * Cleans up session and associated app if needed.
539  */
540 void
541 stream_session_delete (stream_session_t * s)
542 {
543   session_manager_main_t *smm = vnet_get_session_manager_main ();
544   int rv;
545
546   /* Delete from the main lookup table. */
547   if ((rv = stream_session_table_del (s)))
548     clib_warning ("hash delete error, rv %d", rv);
549
550   /* Cleanup fifo segments */
551   segment_manager_dealloc_fifos (s->svm_segment_index, s->server_rx_fifo,
552                                  s->server_tx_fifo);
553
554   pool_put (smm->sessions[s->thread_index], s);
555   if (CLIB_DEBUG)
556     memset (s, 0xFA, sizeof (*s));
557 }
558
559 /**
560  * Notification from transport that connection is being deleted
561  *
562  * This should be called only on previously fully established sessions. For
563  * instance failed connects should call stream_session_connect_notify and
564  * indicate that the connect has failed.
565  */
566 void
567 stream_session_delete_notify (transport_connection_t * tc)
568 {
569   stream_session_t *s;
570
571   /* App might've been removed already */
572   s = stream_session_get_if_valid (tc->s_index, tc->thread_index);
573   if (!s)
574     return;
575   stream_session_delete (s);
576 }
577
578 /**
579  * Notify application that connection has been reset.
580  */
581 void
582 stream_session_reset_notify (transport_connection_t * tc)
583 {
584   stream_session_t *s;
585   application_t *app;
586   s = stream_session_get (tc->s_index, tc->thread_index);
587
588   app = application_get (s->app_index);
589   app->cb_fns.session_reset_callback (s);
590 }
591
592 /**
593  * Accept a stream session. Optionally ping the server by callback.
594  */
595 int
596 stream_session_accept (transport_connection_t * tc, u32 listener_index,
597                        u8 sst, u8 notify)
598 {
599   application_t *server;
600   stream_session_t *s, *listener;
601   segment_manager_t *sm;
602
603   int rv;
604
605   /* Find the server */
606   listener = listen_session_get (sst, listener_index);
607   server = application_get (listener->app_index);
608
609   sm = application_get_listen_segment_manager (server, listener);
610   if ((rv = stream_session_create_i (sm, tc, 1, &s)))
611     return rv;
612
613   s->app_index = server->index;
614   s->listener_index = listener_index;
615   s->session_state = SESSION_STATE_ACCEPTING;
616
617   /* Shoulder-tap the server */
618   if (notify)
619     {
620       server->cb_fns.session_accept_callback (s);
621     }
622
623   return 0;
624 }
625
626 /**
627  * Ask transport to open connection to remote transport endpoint.
628  *
629  * Stores handle for matching request with reply since the call can be
630  * asynchronous. For instance, for TCP the 3-way handshake must complete
631  * before reply comes. Session is only created once connection is established.
632  *
633  * @param app_index Index of the application requesting the connect
634  * @param st Session type requested.
635  * @param tep Remote transport endpoint
636  * @param res Resulting transport connection .
637  */
638 int
639 stream_session_open (u32 app_index, session_type_t st,
640                      transport_endpoint_t * rmt,
641                      transport_connection_t ** res)
642 {
643   transport_connection_t *tc;
644   int rv;
645   u64 handle;
646
647   rv = tp_vfts[st].open (rmt);
648   if (rv < 0)
649     {
650       clib_warning ("Transport failed to open connection.");
651       return VNET_API_ERROR_SESSION_CONNECT_FAIL;
652     }
653
654   tc = tp_vfts[st].get_half_open ((u32) rv);
655
656   /* Save app and tc index. The latter is needed to help establish the
657    * connection while the former is needed when the connect notify comes
658    * and we have to notify the external app */
659   handle = (((u64) app_index) << 32) | (u64) tc->c_index;
660
661   /* Add to the half-open lookup table */
662   stream_session_half_open_table_add (tc, handle);
663
664   *res = tc;
665
666   return 0;
667 }
668
669 /**
670  * Ask transport to listen on local transport endpoint.
671  *
672  * @param s Session for which listen will be called. Note that unlike
673  *          established sessions, listen sessions are not associated to a
674  *          thread.
675  * @param tep Local endpoint to be listened on.
676  */
677 int
678 stream_session_listen (stream_session_t * s, transport_endpoint_t * tep)
679 {
680   transport_connection_t *tc;
681   u32 tci;
682
683   /* Transport bind/listen  */
684   tci = tp_vfts[s->session_type].bind (s->session_index, tep);
685
686   if (tci == (u32) ~ 0)
687     return -1;
688
689   /* Attach transport to session */
690   s->connection_index = tci;
691   tc = tp_vfts[s->session_type].get_listener (tci);
692
693   /* Weird but handle it ... */
694   if (tc == 0)
695     return -1;
696
697   /* Add to the main lookup table */
698   stream_session_table_add_for_tc (tc, s->session_index);
699
700   return 0;
701 }
702
703 /**
704  * Ask transport to stop listening on local transport endpoint.
705  *
706  * @param s Session to stop listening on. It must be in state LISTENING.
707  */
708 int
709 stream_session_stop_listen (stream_session_t * s)
710 {
711   transport_connection_t *tc;
712
713   if (s->session_state != SESSION_STATE_LISTENING)
714     {
715       clib_warning ("not a listening session");
716       return -1;
717     }
718
719   tc = tp_vfts[s->session_type].get_listener (s->connection_index);
720   if (!tc)
721     {
722       clib_warning ("no transport");
723       return VNET_API_ERROR_ADDRESS_NOT_IN_USE;
724     }
725
726   stream_session_table_del_for_tc (tc);
727   tp_vfts[s->session_type].unbind (s->connection_index);
728   return 0;
729 }
730
731 void
732 session_send_session_evt_to_thread (u64 session_handle,
733                                     fifo_event_type_t evt_type,
734                                     u32 thread_index)
735 {
736   static u16 serial_number = 0;
737   session_fifo_event_t evt;
738   unix_shared_memory_queue_t *q;
739
740   /* Fabricate event */
741   evt.session_handle = session_handle;
742   evt.event_type = evt_type;
743   evt.event_id = serial_number++;
744
745   q = session_manager_get_vpp_event_queue (thread_index);
746
747   /* Based on request block (or not) for lack of space */
748   if (PREDICT_TRUE (q->cursize < q->maxsize))
749     {
750       if (unix_shared_memory_queue_add (q, (u8 *) & evt,
751                                         1 /* do wait for mutex */ ))
752         {
753           clib_warning ("failed to enqueue evt");
754         }
755     }
756   else
757     {
758       clib_warning ("queue full");
759       return;
760     }
761 }
762
763 /**
764  * Disconnect session and propagate to transport. This should eventually
765  * result in a delete notification that allows us to cleanup session state.
766  * Called for both active/passive disconnects.
767  *
768  * Should be called from the session's thread.
769  */
770 void
771 stream_session_disconnect (stream_session_t * s)
772 {
773   s->session_state = SESSION_STATE_CLOSED;
774   tp_vfts[s->session_type].close (s->connection_index, s->thread_index);
775 }
776
777 /**
778  * Cleanup transport and session state.
779  *
780  * Notify transport of the cleanup, wait for a delete notify to actually
781  * remove the session state.
782  */
783 void
784 stream_session_cleanup (stream_session_t * s)
785 {
786   int rv;
787
788   s->session_state = SESSION_STATE_CLOSED;
789
790   /* Delete from the main lookup table to avoid more enqueues */
791   rv = stream_session_table_del (s);
792   if (rv)
793     clib_warning ("hash delete error, rv %d", rv);
794
795   tp_vfts[s->session_type].cleanup (s->connection_index, s->thread_index);
796 }
797
798 /**
799  * Allocate vpp event queue (once) per worker thread
800  */
801 void
802 session_vpp_event_queue_allocate (session_manager_main_t * smm,
803                                   u32 thread_index)
804 {
805   api_main_t *am = &api_main;
806   void *oldheap;
807   u32 event_queue_length = 2048;
808
809   if (smm->vpp_event_queues[thread_index] == 0)
810     {
811       /* Allocate event fifo in the /vpe-api shared-memory segment */
812       oldheap = svm_push_data_heap (am->vlib_rp);
813
814       if (smm->configured_event_queue_length)
815         event_queue_length = smm->configured_event_queue_length;
816
817       smm->vpp_event_queues[thread_index] =
818         unix_shared_memory_queue_init
819         (event_queue_length,
820          sizeof (session_fifo_event_t), 0 /* consumer pid */ ,
821          0 /* (do not) send signal when queue non-empty */ );
822
823       svm_pop_heap (oldheap);
824     }
825 }
826
827 session_type_t
828 session_type_from_proto_and_ip (transport_proto_t proto, u8 is_ip4)
829 {
830   if (proto == TRANSPORT_PROTO_TCP)
831     {
832       if (is_ip4)
833         return SESSION_TYPE_IP4_TCP;
834       else
835         return SESSION_TYPE_IP6_TCP;
836     }
837   else
838     {
839       if (is_ip4)
840         return SESSION_TYPE_IP4_UDP;
841       else
842         return SESSION_TYPE_IP6_UDP;
843     }
844
845   return SESSION_N_TYPES;
846 }
847
848 static clib_error_t *
849 session_manager_main_enable (vlib_main_t * vm)
850 {
851   session_manager_main_t *smm = &session_manager_main;
852   vlib_thread_main_t *vtm = vlib_get_thread_main ();
853   u32 num_threads;
854   u32 preallocated_sessions_per_worker;
855   int i;
856
857   num_threads = 1 /* main thread */  + vtm->n_threads;
858
859   if (num_threads < 1)
860     return clib_error_return (0, "n_thread_stacks not set");
861
862   /* $$$ config parameters */
863   svm_fifo_segment_init (0x200000000ULL /* first segment base VA */ ,
864                          20 /* timeout in seconds */ );
865
866   /* configure per-thread ** vectors */
867   vec_validate (smm->sessions, num_threads - 1);
868   vec_validate (smm->session_indices_to_enqueue_by_thread, num_threads - 1);
869   vec_validate (smm->tx_buffers, num_threads - 1);
870   vec_validate (smm->pending_event_vector, num_threads - 1);
871   vec_validate (smm->free_event_vector, num_threads - 1);
872   vec_validate (smm->current_enqueue_epoch, num_threads - 1);
873   vec_validate (smm->vpp_event_queues, num_threads - 1);
874
875   for (i = 0; i < num_threads; i++)
876     {
877       vec_validate (smm->free_event_vector[i], 0);
878       _vec_len (smm->free_event_vector[i]) = 0;
879       vec_validate (smm->pending_event_vector[i], 0);
880       _vec_len (smm->pending_event_vector[i]) = 0;
881     }
882
883 #if SESSION_DBG
884   vec_validate (smm->last_event_poll_by_thread, num_threads - 1);
885 #endif
886
887   /* Allocate vpp event queues */
888   for (i = 0; i < vec_len (smm->vpp_event_queues); i++)
889     session_vpp_event_queue_allocate (smm, i);
890
891   /* Preallocate sessions */
892   if (num_threads == 1)
893     {
894       for (i = 0; i < smm->preallocated_sessions; i++)
895         {
896           stream_session_t *ss __attribute__ ((unused));
897           pool_get_aligned (smm->sessions[0], ss, CLIB_CACHE_LINE_BYTES);
898         }
899
900       for (i = 0; i < smm->preallocated_sessions; i++)
901         pool_put_index (smm->sessions[0], i);
902     }
903   else
904     {
905       int j;
906       preallocated_sessions_per_worker = smm->preallocated_sessions /
907         (num_threads - 1);
908
909       for (j = 1; j < num_threads; j++)
910         {
911           for (i = 0; i < preallocated_sessions_per_worker; i++)
912             {
913               stream_session_t *ss __attribute__ ((unused));
914               pool_get_aligned (smm->sessions[j], ss, CLIB_CACHE_LINE_BYTES);
915             }
916           for (i = 0; i < preallocated_sessions_per_worker; i++)
917             pool_put_index (smm->sessions[j], i);
918         }
919     }
920
921   session_lookup_init ();
922
923   smm->is_enabled = 1;
924
925   /* Enable TCP transport */
926   vnet_tcp_enable_disable (vm, 1);
927
928   return 0;
929 }
930
931 void
932 session_node_enable_disable (u8 is_en)
933 {
934   u8 state = is_en ? VLIB_NODE_STATE_POLLING : VLIB_NODE_STATE_DISABLED;
935   /* *INDENT-OFF* */
936   foreach_vlib_main (({
937     vlib_node_set_state (this_vlib_main, session_queue_node.index,
938                          state);
939   }));
940   /* *INDENT-ON* */
941 }
942
943 clib_error_t *
944 vnet_session_enable_disable (vlib_main_t * vm, u8 is_en)
945 {
946   if (is_en)
947     {
948       if (session_manager_main.is_enabled)
949         return 0;
950
951       session_node_enable_disable (is_en);
952
953       return session_manager_main_enable (vm);
954     }
955   else
956     {
957       session_manager_main.is_enabled = 0;
958       session_node_enable_disable (is_en);
959     }
960
961   return 0;
962 }
963
964 clib_error_t *
965 session_manager_main_init (vlib_main_t * vm)
966 {
967   session_manager_main_t *smm = &session_manager_main;
968   smm->is_enabled = 0;
969   return 0;
970 }
971
972 VLIB_INIT_FUNCTION (session_manager_main_init);
973
974 static clib_error_t *
975 session_config_fn (vlib_main_t * vm, unformat_input_t * input)
976 {
977   session_manager_main_t *smm = &session_manager_main;
978   u32 nitems;
979   uword tmp;
980
981   while (unformat_check_input (input) != UNFORMAT_END_OF_INPUT)
982     {
983       if (unformat (input, "event-queue-length %d", &nitems))
984         {
985           if (nitems >= 2048)
986             smm->configured_event_queue_length = nitems;
987           else
988             clib_warning ("event queue length %d too small, ignored", nitems);
989         }
990       else if (unformat (input, "preallocated-sessions %d",
991                          &smm->preallocated_sessions))
992         ;
993       else if (unformat (input, "v4-session-table-buckets %d",
994                          &smm->configured_v4_session_table_buckets))
995         ;
996       else if (unformat (input, "v4-halfopen-table-buckets %d",
997                          &smm->configured_v4_halfopen_table_buckets))
998         ;
999       else if (unformat (input, "v6-session-table-buckets %d",
1000                          &smm->configured_v6_session_table_buckets))
1001         ;
1002       else if (unformat (input, "v6-halfopen-table-buckets %d",
1003                          &smm->configured_v6_halfopen_table_buckets))
1004         ;
1005       else if (unformat (input, "v4-session-table-memory %U",
1006                          unformat_memory_size, &tmp))
1007         {
1008           if (tmp >= 0x100000000)
1009             return clib_error_return (0, "memory size %llx (%lld) too large",
1010                                       tmp, tmp);
1011           smm->configured_v4_session_table_memory = tmp;
1012         }
1013       else if (unformat (input, "v4-halfopen-table-memory %U",
1014                          unformat_memory_size, &tmp))
1015         {
1016           if (tmp >= 0x100000000)
1017             return clib_error_return (0, "memory size %llx (%lld) too large",
1018                                       tmp, tmp);
1019           smm->configured_v4_halfopen_table_memory = tmp;
1020         }
1021       else if (unformat (input, "v6-session-table-memory %U",
1022                          unformat_memory_size, &tmp))
1023         {
1024           if (tmp >= 0x100000000)
1025             return clib_error_return (0, "memory size %llx (%lld) too large",
1026                                       tmp, tmp);
1027           smm->configured_v6_session_table_memory = tmp;
1028         }
1029       else if (unformat (input, "v6-halfopen-table-memory %U",
1030                          unformat_memory_size, &tmp))
1031         {
1032           if (tmp >= 0x100000000)
1033             return clib_error_return (0, "memory size %llx (%lld) too large",
1034                                       tmp, tmp);
1035           smm->configured_v6_halfopen_table_memory = tmp;
1036         }
1037       else
1038         return clib_error_return (0, "unknown input `%U'",
1039                                   format_unformat_error, input);
1040     }
1041   return 0;
1042 }
1043
1044 VLIB_CONFIG_FUNCTION (session_config_fn, "session");
1045
1046 /*
1047  * fd.io coding-style-patch-verification: ON
1048  *
1049  * Local Variables:
1050  * eval: (c-set-style "gnu")
1051  * End:
1052  */