dc930ce87d3f5b273f4c75d0f1f80ff8682d3dab
[vpp.git] / src / vnet / session / session.c
1 /*
2  * Copyright (c) 2017 Cisco and/or its affiliates.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at:
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 /**
16  * @file
17  * @brief Session and session manager
18  */
19
20 #include <vnet/session/session.h>
21 #include <vnet/session/session_debug.h>
22 #include <vnet/session/application.h>
23 #include <vlibmemory/api.h>
24 #include <vnet/dpo/load_balance.h>
25 #include <vnet/fib/ip4_fib.h>
26 #include <vnet/tcp/tcp.h>
27
28 session_manager_main_t session_manager_main;
29 extern transport_proto_vft_t *tp_vfts;
30
31 int
32 stream_session_create_i (segment_manager_t * sm, transport_connection_t * tc,
33                          u8 alloc_fifos, stream_session_t ** ret_s)
34 {
35   session_manager_main_t *smm = &session_manager_main;
36   svm_fifo_t *server_rx_fifo = 0, *server_tx_fifo = 0;
37   u32 fifo_segment_index;
38   u32 pool_index;
39   stream_session_t *s;
40   u64 value;
41   u32 thread_index = tc->thread_index;
42   int rv;
43
44   ASSERT (thread_index == vlib_get_thread_index ());
45
46   /* Create the session */
47   pool_get_aligned (smm->sessions[thread_index], s, CLIB_CACHE_LINE_BYTES);
48   memset (s, 0, sizeof (*s));
49   pool_index = s - smm->sessions[thread_index];
50
51   /* Allocate fifos */
52   if (alloc_fifos)
53     {
54       if ((rv = segment_manager_alloc_session_fifos (sm, &server_rx_fifo,
55                                                      &server_tx_fifo,
56                                                      &fifo_segment_index)))
57         {
58           pool_put (smm->sessions[thread_index], s);
59           return rv;
60         }
61       /* Initialize backpointers */
62       server_rx_fifo->master_session_index = pool_index;
63       server_rx_fifo->master_thread_index = thread_index;
64
65       server_tx_fifo->master_session_index = pool_index;
66       server_tx_fifo->master_thread_index = thread_index;
67
68       s->server_rx_fifo = server_rx_fifo;
69       s->server_tx_fifo = server_tx_fifo;
70       s->svm_segment_index = fifo_segment_index;
71     }
72
73   /* Initialize state machine, such as it is... */
74   s->session_type = session_type_from_proto_and_ip (tc->transport_proto,
75                                                     tc->is_ip4);
76   s->session_state = SESSION_STATE_CONNECTING;
77   s->thread_index = thread_index;
78   s->session_index = pool_index;
79
80   /* Attach transport to session */
81   s->connection_index = tc->c_index;
82
83   /* Attach session to transport */
84   tc->s_index = s->session_index;
85
86   /* Add to the main lookup table */
87   value = stream_session_handle (s);
88   stream_session_table_add_for_tc (tc, value);
89
90   *ret_s = s;
91
92   return 0;
93 }
94
95 /**
96  * Discards bytes from buffer chain
97  *
98  * It discards n_bytes_to_drop starting at first buffer after chain_b
99  */
100 always_inline void
101 session_enqueue_discard_chain_bytes (vlib_main_t * vm, vlib_buffer_t * b,
102                                      vlib_buffer_t ** chain_b,
103                                      u32 n_bytes_to_drop)
104 {
105   vlib_buffer_t *next = *chain_b;
106   u32 to_drop = n_bytes_to_drop;
107   ASSERT (b->flags & VLIB_BUFFER_NEXT_PRESENT);
108   while (to_drop && (next->flags & VLIB_BUFFER_NEXT_PRESENT))
109     {
110       next = vlib_get_buffer (vm, next->next_buffer);
111       if (next->current_length > to_drop)
112         {
113           vlib_buffer_advance (next, to_drop);
114           to_drop = 0;
115         }
116       else
117         {
118           to_drop -= next->current_length;
119           next->current_length = 0;
120         }
121     }
122   *chain_b = next;
123
124   if (to_drop == 0)
125     b->total_length_not_including_first_buffer -= n_bytes_to_drop;
126 }
127
128 /**
129  * Enqueue buffer chain tail
130  */
131 always_inline int
132 session_enqueue_chain_tail (stream_session_t * s, vlib_buffer_t * b,
133                             u32 offset, u8 is_in_order)
134 {
135   vlib_buffer_t *chain_b;
136   u32 chain_bi, len, diff;
137   vlib_main_t *vm = vlib_get_main ();
138   u8 *data;
139   u32 written = 0;
140   int rv = 0;
141
142   if (is_in_order && offset)
143     {
144       diff = offset - b->current_length;
145       if (diff > b->total_length_not_including_first_buffer)
146         return 0;
147       chain_b = b;
148       session_enqueue_discard_chain_bytes (vm, b, &chain_b, diff);
149       chain_bi = vlib_get_buffer_index (vm, chain_b);
150     }
151   else
152     chain_bi = b->next_buffer;
153
154   do
155     {
156       chain_b = vlib_get_buffer (vm, chain_bi);
157       data = vlib_buffer_get_current (chain_b);
158       len = chain_b->current_length;
159       if (!len)
160         continue;
161       if (is_in_order)
162         {
163           rv = svm_fifo_enqueue_nowait (s->server_rx_fifo, len, data);
164           if (rv == len)
165             {
166               written += rv;
167             }
168           else if (rv < len)
169             {
170               return (rv > 0) ? (written + rv) : written;
171             }
172           else if (rv > len)
173             {
174               written += rv;
175
176               /* written more than what was left in chain */
177               if (written > b->total_length_not_including_first_buffer)
178                 return written;
179
180               /* drop the bytes that have already been delivered */
181               session_enqueue_discard_chain_bytes (vm, b, &chain_b, rv - len);
182             }
183         }
184       else
185         {
186           rv = svm_fifo_enqueue_with_offset (s->server_rx_fifo, offset, len,
187                                              data);
188           if (rv)
189             {
190               clib_warning ("failed to enqueue multi-buffer seg");
191               return -1;
192             }
193           offset += len;
194         }
195     }
196   while ((chain_bi = (chain_b->flags & VLIB_BUFFER_NEXT_PRESENT)
197           ? chain_b->next_buffer : 0));
198
199   if (is_in_order)
200     return written;
201
202   return 0;
203 }
204
205 /*
206  * Enqueue data for delivery to session peer. Does not notify peer of enqueue
207  * event but on request can queue notification events for later delivery by
208  * calling stream_server_flush_enqueue_events().
209  *
210  * @param tc Transport connection which is to be enqueued data
211  * @param b Buffer to be enqueued
212  * @param offset Offset at which to start enqueueing if out-of-order
213  * @param queue_event Flag to indicate if peer is to be notified or if event
214  *                    is to be queued. The former is useful when more data is
215  *                    enqueued and only one event is to be generated.
216  * @param is_in_order Flag to indicate if data is in order
217  * @return Number of bytes enqueued or a negative value if enqueueing failed.
218  */
219 int
220 stream_session_enqueue_data (transport_connection_t * tc, vlib_buffer_t * b,
221                              u32 offset, u8 queue_event, u8 is_in_order)
222 {
223   stream_session_t *s;
224   int enqueued = 0, rv, in_order_off;
225
226   s = stream_session_get (tc->s_index, tc->thread_index);
227
228   if (is_in_order)
229     {
230       enqueued = svm_fifo_enqueue_nowait (s->server_rx_fifo,
231                                           b->current_length,
232                                           vlib_buffer_get_current (b));
233       if (PREDICT_FALSE ((b->flags & VLIB_BUFFER_NEXT_PRESENT)
234                          && enqueued >= 0))
235         {
236           in_order_off = enqueued > b->current_length ? enqueued : 0;
237           rv = session_enqueue_chain_tail (s, b, in_order_off, 1);
238           if (rv > 0)
239             enqueued += rv;
240         }
241     }
242   else
243     {
244       rv = svm_fifo_enqueue_with_offset (s->server_rx_fifo, offset,
245                                          b->current_length,
246                                          vlib_buffer_get_current (b));
247       if (PREDICT_FALSE ((b->flags & VLIB_BUFFER_NEXT_PRESENT) && !rv))
248         session_enqueue_chain_tail (s, b, offset + b->current_length, 0);
249       /* if something was enqueued, report even this as success for ooo
250        * segment handling */
251       return rv;
252     }
253
254   if (queue_event)
255     {
256       /* Queue RX event on this fifo. Eventually these will need to be flushed
257        * by calling stream_server_flush_enqueue_events () */
258       session_manager_main_t *smm = vnet_get_session_manager_main ();
259       u32 thread_index = s->thread_index;
260       u32 my_enqueue_epoch = smm->current_enqueue_epoch[thread_index];
261
262       if (s->enqueue_epoch != my_enqueue_epoch)
263         {
264           s->enqueue_epoch = my_enqueue_epoch;
265           vec_add1 (smm->session_indices_to_enqueue_by_thread[thread_index],
266                     s - smm->sessions[thread_index]);
267         }
268     }
269
270   return enqueued;
271 }
272
273 /** Check if we have space in rx fifo to push more bytes */
274 u8
275 stream_session_no_space (transport_connection_t * tc, u32 thread_index,
276                          u16 data_len)
277 {
278   stream_session_t *s = stream_session_get (tc->s_index, thread_index);
279
280   if (PREDICT_FALSE (s->session_state != SESSION_STATE_READY))
281     return 1;
282
283   if (data_len > svm_fifo_max_enqueue (s->server_rx_fifo))
284     return 1;
285
286   return 0;
287 }
288
289 u32
290 stream_session_tx_fifo_max_dequeue (transport_connection_t * tc)
291 {
292   stream_session_t *s = stream_session_get (tc->s_index, tc->thread_index);
293   if (!s->server_tx_fifo)
294     return 0;
295   return svm_fifo_max_dequeue (s->server_tx_fifo);
296 }
297
298 int
299 stream_session_peek_bytes (transport_connection_t * tc, u8 * buffer,
300                            u32 offset, u32 max_bytes)
301 {
302   stream_session_t *s = stream_session_get (tc->s_index, tc->thread_index);
303   return svm_fifo_peek (s->server_tx_fifo, offset, max_bytes, buffer);
304 }
305
306 u32
307 stream_session_dequeue_drop (transport_connection_t * tc, u32 max_bytes)
308 {
309   stream_session_t *s = stream_session_get (tc->s_index, tc->thread_index);
310   return svm_fifo_dequeue_drop (s->server_tx_fifo, max_bytes);
311 }
312
313 /**
314  * Notify session peer that new data has been enqueued.
315  *
316  * @param s Stream session for which the event is to be generated.
317  * @param block Flag to indicate if call should block if event queue is full.
318  *
319  * @return 0 on succes or negative number if failed to send notification.
320  */
321 static int
322 stream_session_enqueue_notify (stream_session_t * s, u8 block)
323 {
324   application_t *app;
325   session_fifo_event_t evt;
326   unix_shared_memory_queue_t *q;
327   static u32 serial_number;
328
329   if (PREDICT_FALSE (s->session_state == SESSION_STATE_CLOSED))
330     {
331       /* Session is closed so app will never clean up. Flush rx fifo */
332       u32 to_dequeue = svm_fifo_max_dequeue (s->server_rx_fifo);
333       if (to_dequeue)
334         svm_fifo_dequeue_drop (s->server_rx_fifo, to_dequeue);
335       return 0;
336     }
337
338   /* Get session's server */
339   app = application_get_if_valid (s->app_index);
340
341   if (PREDICT_FALSE (app == 0))
342     {
343       clib_warning ("invalid s->app_index = %d", s->app_index);
344       return 0;
345     }
346
347   /* Built-in server? Hand event to the callback... */
348   if (app->cb_fns.builtin_server_rx_callback)
349     return app->cb_fns.builtin_server_rx_callback (s);
350
351   /* If no event, send one */
352   if (svm_fifo_set_event (s->server_rx_fifo))
353     {
354       /* Fabricate event */
355       evt.fifo = s->server_rx_fifo;
356       evt.event_type = FIFO_EVENT_APP_RX;
357       evt.event_id = serial_number++;
358
359       /* Add event to server's event queue */
360       q = app->event_queue;
361
362       /* Based on request block (or not) for lack of space */
363       if (block || PREDICT_TRUE (q->cursize < q->maxsize))
364         unix_shared_memory_queue_add (app->event_queue, (u8 *) & evt,
365                                       0 /* do wait for mutex */ );
366       else
367         {
368           clib_warning ("fifo full");
369           return -1;
370         }
371     }
372
373   /* *INDENT-OFF* */
374   SESSION_EVT_DBG(SESSION_EVT_ENQ, s, ({
375       ed->data[0] = evt.event_id;
376       ed->data[1] = svm_fifo_max_dequeue (s->server_rx_fifo);
377   }));
378   /* *INDENT-ON* */
379
380   return 0;
381 }
382
383 /**
384  * Flushes queue of sessions that are to be notified of new data
385  * enqueued events.
386  *
387  * @param thread_index Thread index for which the flush is to be performed.
388  * @return 0 on success or a positive number indicating the number of
389  *         failures due to API queue being full.
390  */
391 int
392 session_manager_flush_enqueue_events (u32 thread_index)
393 {
394   session_manager_main_t *smm = &session_manager_main;
395   u32 *session_indices_to_enqueue;
396   int i, errors = 0;
397
398   session_indices_to_enqueue =
399     smm->session_indices_to_enqueue_by_thread[thread_index];
400
401   for (i = 0; i < vec_len (session_indices_to_enqueue); i++)
402     {
403       stream_session_t *s0;
404
405       /* Get session */
406       s0 = stream_session_get_if_valid (session_indices_to_enqueue[i],
407                                         thread_index);
408       if (s0 == 0 || stream_session_enqueue_notify (s0, 0 /* don't block */ ))
409         {
410           errors++;
411         }
412     }
413
414   vec_reset_length (session_indices_to_enqueue);
415
416   smm->session_indices_to_enqueue_by_thread[thread_index] =
417     session_indices_to_enqueue;
418
419   /* Increment enqueue epoch for next round */
420   smm->current_enqueue_epoch[thread_index]++;
421
422   return errors;
423 }
424
425 /**
426  * Init fifo tail and head pointers
427  *
428  * Useful if transport uses absolute offsets for tracking ooo segments.
429  */
430 void
431 stream_session_init_fifos_pointers (transport_connection_t * tc,
432                                     u32 rx_pointer, u32 tx_pointer)
433 {
434   stream_session_t *s;
435   s = stream_session_get (tc->s_index, tc->thread_index);
436   svm_fifo_init_pointers (s->server_rx_fifo, rx_pointer);
437   svm_fifo_init_pointers (s->server_tx_fifo, tx_pointer);
438 }
439
440 int
441 stream_session_connect_notify (transport_connection_t * tc, u8 is_fail)
442 {
443   application_t *app;
444   stream_session_t *new_s = 0;
445   u64 handle;
446   u32 opaque = 0;
447   int error = 0;
448   u8 st;
449
450   st = session_type_from_proto_and_ip (tc->transport_proto, tc->is_ip4);
451   handle = stream_session_half_open_lookup_handle (&tc->lcl_ip, &tc->rmt_ip,
452                                                    tc->lcl_port, tc->rmt_port,
453                                                    st);
454   if (handle == HALF_OPEN_LOOKUP_INVALID_VALUE)
455     {
456       TCP_DBG ("half-open was removed!");
457       return -1;
458     }
459
460   /* Cleanup half-open table */
461   stream_session_half_open_table_del (tc);
462
463   /* Get the app's index from the handle we stored when opening connection
464    * and the opaque (api_context for external apps) from transport session
465    * index */
466   app = application_get_if_valid (handle >> 32);
467   if (!app)
468     return -1;
469
470   opaque = tc->s_index;
471
472   if (!is_fail)
473     {
474       segment_manager_t *sm;
475       u8 alloc_fifos;
476       sm = application_get_connect_segment_manager (app);
477       alloc_fifos = application_is_proxy (app);
478       /* Create new session (svm segments are allocated if needed) */
479       if (stream_session_create_i (sm, tc, alloc_fifos, &new_s))
480         {
481           is_fail = 1;
482           error = -1;
483         }
484       else
485         new_s->app_index = app->index;
486     }
487
488   /* Notify client application */
489   if (app->cb_fns.session_connected_callback (app->index, opaque, new_s,
490                                               is_fail))
491     {
492       clib_warning ("failed to notify app");
493       if (!is_fail)
494         stream_session_disconnect (new_s);
495     }
496   else
497     {
498       if (!is_fail)
499         new_s->session_state = SESSION_STATE_READY;
500     }
501
502   return error;
503 }
504
505 void
506 stream_session_accept_notify (transport_connection_t * tc)
507 {
508   application_t *server;
509   stream_session_t *s;
510
511   s = stream_session_get (tc->s_index, tc->thread_index);
512   server = application_get (s->app_index);
513   server->cb_fns.session_accept_callback (s);
514 }
515
516 /**
517  * Notification from transport that connection is being closed.
518  *
519  * A disconnect is sent to application but state is not removed. Once
520  * disconnect is acknowledged by application, session disconnect is called.
521  * Ultimately this leads to close being called on transport (passive close).
522  */
523 void
524 stream_session_disconnect_notify (transport_connection_t * tc)
525 {
526   application_t *server;
527   stream_session_t *s;
528
529   s = stream_session_get (tc->s_index, tc->thread_index);
530   server = application_get (s->app_index);
531   server->cb_fns.session_disconnect_callback (s);
532 }
533
534 /**
535  * Cleans up session and lookup table.
536  */
537 void
538 stream_session_delete (stream_session_t * s)
539 {
540   session_manager_main_t *smm = vnet_get_session_manager_main ();
541   int rv;
542
543   /* Delete from the main lookup table. */
544   if ((rv = stream_session_table_del (s)))
545     clib_warning ("hash delete error, rv %d", rv);
546
547   /* Cleanup fifo segments */
548   segment_manager_dealloc_fifos (s->svm_segment_index, s->server_rx_fifo,
549                                  s->server_tx_fifo);
550
551   pool_put (smm->sessions[s->thread_index], s);
552   if (CLIB_DEBUG)
553     memset (s, 0xFA, sizeof (*s));
554 }
555
556 /**
557  * Notification from transport that connection is being deleted
558  *
559  * This removes the session if it is still valid. It should be called only on
560  * previously fully established sessions. For instance failed connects should
561  * call stream_session_connect_notify and indicate that the connect has
562  * failed.
563  */
564 void
565 stream_session_delete_notify (transport_connection_t * tc)
566 {
567   stream_session_t *s;
568
569   /* App might've been removed already */
570   s = stream_session_get_if_valid (tc->s_index, tc->thread_index);
571   if (!s)
572     return;
573   stream_session_delete (s);
574 }
575
576 /**
577  * Notify application that connection has been reset.
578  */
579 void
580 stream_session_reset_notify (transport_connection_t * tc)
581 {
582   stream_session_t *s;
583   application_t *app;
584   s = stream_session_get (tc->s_index, tc->thread_index);
585
586   app = application_get (s->app_index);
587   app->cb_fns.session_reset_callback (s);
588 }
589
590 /**
591  * Accept a stream session. Optionally ping the server by callback.
592  */
593 int
594 stream_session_accept (transport_connection_t * tc, u32 listener_index,
595                        u8 sst, u8 notify)
596 {
597   application_t *server;
598   stream_session_t *s, *listener;
599   segment_manager_t *sm;
600
601   int rv;
602
603   /* Find the server */
604   listener = listen_session_get (sst, listener_index);
605   server = application_get (listener->app_index);
606
607   sm = application_get_listen_segment_manager (server, listener);
608   if ((rv = stream_session_create_i (sm, tc, 1, &s)))
609     return rv;
610
611   s->app_index = server->index;
612   s->listener_index = listener_index;
613   s->session_state = SESSION_STATE_ACCEPTING;
614
615   /* Shoulder-tap the server */
616   if (notify)
617     {
618       server->cb_fns.session_accept_callback (s);
619     }
620
621   return 0;
622 }
623
624 /**
625  * Ask transport to open connection to remote transport endpoint.
626  *
627  * Stores handle for matching request with reply since the call can be
628  * asynchronous. For instance, for TCP the 3-way handshake must complete
629  * before reply comes. Session is only created once connection is established.
630  *
631  * @param app_index Index of the application requesting the connect
632  * @param st Session type requested.
633  * @param tep Remote transport endpoint
634  * @param res Resulting transport connection .
635  */
636 int
637 stream_session_open (u32 app_index, session_type_t st,
638                      transport_endpoint_t * rmt,
639                      transport_connection_t ** res)
640 {
641   transport_connection_t *tc;
642   int rv;
643   u64 handle;
644
645   rv = tp_vfts[st].open (rmt);
646   if (rv < 0)
647     {
648       clib_warning ("Transport failed to open connection.");
649       return VNET_API_ERROR_SESSION_CONNECT_FAIL;
650     }
651
652   tc = tp_vfts[st].get_half_open ((u32) rv);
653
654   /* Save app and tc index. The latter is needed to help establish the
655    * connection while the former is needed when the connect notify comes
656    * and we have to notify the external app */
657   handle = (((u64) app_index) << 32) | (u64) tc->c_index;
658
659   /* Add to the half-open lookup table */
660   stream_session_half_open_table_add (tc, handle);
661
662   *res = tc;
663
664   return 0;
665 }
666
667 /**
668  * Ask transport to listen on local transport endpoint.
669  *
670  * @param s Session for which listen will be called. Note that unlike
671  *          established sessions, listen sessions are not associated to a
672  *          thread.
673  * @param tep Local endpoint to be listened on.
674  */
675 int
676 stream_session_listen (stream_session_t * s, transport_endpoint_t * tep)
677 {
678   transport_connection_t *tc;
679   u32 tci;
680
681   /* Transport bind/listen  */
682   tci = tp_vfts[s->session_type].bind (s->session_index, tep);
683
684   if (tci == (u32) ~ 0)
685     return -1;
686
687   /* Attach transport to session */
688   s->connection_index = tci;
689   tc = tp_vfts[s->session_type].get_listener (tci);
690
691   /* Weird but handle it ... */
692   if (tc == 0)
693     return -1;
694
695   /* Add to the main lookup table */
696   stream_session_table_add_for_tc (tc, s->session_index);
697
698   return 0;
699 }
700
701 /**
702  * Ask transport to stop listening on local transport endpoint.
703  *
704  * @param s Session to stop listening on. It must be in state LISTENING.
705  */
706 int
707 stream_session_stop_listen (stream_session_t * s)
708 {
709   transport_connection_t *tc;
710
711   if (s->session_state != SESSION_STATE_LISTENING)
712     {
713       clib_warning ("not a listening session");
714       return -1;
715     }
716
717   tc = tp_vfts[s->session_type].get_listener (s->connection_index);
718   if (!tc)
719     {
720       clib_warning ("no transport");
721       return VNET_API_ERROR_ADDRESS_NOT_IN_USE;
722     }
723
724   stream_session_table_del_for_tc (tc);
725   tp_vfts[s->session_type].unbind (s->connection_index);
726   return 0;
727 }
728
729 void
730 session_send_session_evt_to_thread (u64 session_handle,
731                                     fifo_event_type_t evt_type,
732                                     u32 thread_index)
733 {
734   static u16 serial_number = 0;
735   u32 tries = 0;
736   session_fifo_event_t evt;
737   unix_shared_memory_queue_t *q;
738
739   /* Fabricate event */
740   evt.session_handle = session_handle;
741   evt.event_type = evt_type;
742   evt.event_id = serial_number++;
743
744   q = session_manager_get_vpp_event_queue (thread_index);
745   while (unix_shared_memory_queue_add (q, (u8 *) & evt, 1))
746     {
747       if (tries++ == 3)
748         {
749           TCP_DBG ("failed to enqueue evt");
750           break;
751         }
752     }
753 }
754
755 /**
756  * Disconnect session and propagate to transport. This should eventually
757  * result in a delete notification that allows us to cleanup session state.
758  * Called for both active/passive disconnects.
759  *
760  * Should be called from the session's thread.
761  */
762 void
763 stream_session_disconnect (stream_session_t * s)
764 {
765   s->session_state = SESSION_STATE_CLOSED;
766   tp_vfts[s->session_type].close (s->connection_index, s->thread_index);
767 }
768
769 /**
770  * Cleanup transport and session state.
771  *
772  * Notify transport of the cleanup, wait for a delete notify to actually
773  * remove the session state.
774  */
775 void
776 stream_session_cleanup (stream_session_t * s)
777 {
778   int rv;
779
780   s->session_state = SESSION_STATE_CLOSED;
781
782   /* Delete from the main lookup table to avoid more enqueues */
783   rv = stream_session_table_del (s);
784   if (rv)
785     clib_warning ("hash delete error, rv %d", rv);
786
787   tp_vfts[s->session_type].cleanup (s->connection_index, s->thread_index);
788 }
789
790 /**
791  * Allocate vpp event queue (once) per worker thread
792  */
793 void
794 session_vpp_event_queue_allocate (session_manager_main_t * smm,
795                                   u32 thread_index)
796 {
797   api_main_t *am = &api_main;
798   void *oldheap;
799   u32 event_queue_length = 2048;
800
801   if (smm->vpp_event_queues[thread_index] == 0)
802     {
803       /* Allocate event fifo in the /vpe-api shared-memory segment */
804       oldheap = svm_push_data_heap (am->vlib_rp);
805
806       if (smm->configured_event_queue_length)
807         event_queue_length = smm->configured_event_queue_length;
808
809       smm->vpp_event_queues[thread_index] =
810         unix_shared_memory_queue_init
811         (event_queue_length,
812          sizeof (session_fifo_event_t), 0 /* consumer pid */ ,
813          0 /* (do not) send signal when queue non-empty */ );
814
815       svm_pop_heap (oldheap);
816     }
817 }
818
819 session_type_t
820 session_type_from_proto_and_ip (transport_proto_t proto, u8 is_ip4)
821 {
822   if (proto == TRANSPORT_PROTO_TCP)
823     {
824       if (is_ip4)
825         return SESSION_TYPE_IP4_TCP;
826       else
827         return SESSION_TYPE_IP6_TCP;
828     }
829   else
830     {
831       if (is_ip4)
832         return SESSION_TYPE_IP4_UDP;
833       else
834         return SESSION_TYPE_IP6_UDP;
835     }
836
837   return SESSION_N_TYPES;
838 }
839
840 static clib_error_t *
841 session_manager_main_enable (vlib_main_t * vm)
842 {
843   session_manager_main_t *smm = &session_manager_main;
844   vlib_thread_main_t *vtm = vlib_get_thread_main ();
845   u32 num_threads;
846   u32 preallocated_sessions_per_worker;
847   int i;
848
849   num_threads = 1 /* main thread */  + vtm->n_threads;
850
851   if (num_threads < 1)
852     return clib_error_return (0, "n_thread_stacks not set");
853
854   /* $$$ config parameters */
855   svm_fifo_segment_init (0x200000000ULL /* first segment base VA */ ,
856                          20 /* timeout in seconds */ );
857
858   /* configure per-thread ** vectors */
859   vec_validate (smm->sessions, num_threads - 1);
860   vec_validate (smm->session_indices_to_enqueue_by_thread, num_threads - 1);
861   vec_validate (smm->tx_buffers, num_threads - 1);
862   vec_validate (smm->pending_event_vector, num_threads - 1);
863   vec_validate (smm->free_event_vector, num_threads - 1);
864   vec_validate (smm->current_enqueue_epoch, num_threads - 1);
865   vec_validate (smm->vpp_event_queues, num_threads - 1);
866
867   for (i = 0; i < num_threads; i++)
868     {
869       vec_validate (smm->free_event_vector[i], 0);
870       _vec_len (smm->free_event_vector[i]) = 0;
871       vec_validate (smm->pending_event_vector[i], 0);
872       _vec_len (smm->pending_event_vector[i]) = 0;
873     }
874
875 #if SESSION_DBG
876   vec_validate (smm->last_event_poll_by_thread, num_threads - 1);
877 #endif
878
879   /* Allocate vpp event queues */
880   for (i = 0; i < vec_len (smm->vpp_event_queues); i++)
881     session_vpp_event_queue_allocate (smm, i);
882
883   /* Preallocate sessions */
884   if (smm->preallocated_sessions)
885     {
886       if (num_threads == 1)
887         {
888           pool_init_fixed (smm->sessions[0], smm->preallocated_sessions);
889         }
890       else
891         {
892           int j;
893           preallocated_sessions_per_worker =
894             (1.1 * (f64) smm->preallocated_sessions /
895              (f64) (num_threads - 1));
896
897           for (j = 1; j < num_threads; j++)
898             {
899               pool_init_fixed (smm->sessions[j],
900                                preallocated_sessions_per_worker);
901             }
902         }
903     }
904
905   session_lookup_init ();
906
907   smm->is_enabled = 1;
908
909   /* Enable TCP transport */
910   vnet_tcp_enable_disable (vm, 1);
911
912   return 0;
913 }
914
915 void
916 session_node_enable_disable (u8 is_en)
917 {
918   u8 state = is_en ? VLIB_NODE_STATE_POLLING : VLIB_NODE_STATE_DISABLED;
919   /* *INDENT-OFF* */
920   foreach_vlib_main (({
921     vlib_node_set_state (this_vlib_main, session_queue_node.index,
922                          state);
923   }));
924   /* *INDENT-ON* */
925 }
926
927 clib_error_t *
928 vnet_session_enable_disable (vlib_main_t * vm, u8 is_en)
929 {
930   if (is_en)
931     {
932       if (session_manager_main.is_enabled)
933         return 0;
934
935       session_node_enable_disable (is_en);
936
937       return session_manager_main_enable (vm);
938     }
939   else
940     {
941       session_manager_main.is_enabled = 0;
942       session_node_enable_disable (is_en);
943     }
944
945   return 0;
946 }
947
948 clib_error_t *
949 session_manager_main_init (vlib_main_t * vm)
950 {
951   session_manager_main_t *smm = &session_manager_main;
952   smm->is_enabled = 0;
953   return 0;
954 }
955
956 VLIB_INIT_FUNCTION (session_manager_main_init);
957
958 static clib_error_t *
959 session_config_fn (vlib_main_t * vm, unformat_input_t * input)
960 {
961   session_manager_main_t *smm = &session_manager_main;
962   u32 nitems;
963   uword tmp;
964
965   while (unformat_check_input (input) != UNFORMAT_END_OF_INPUT)
966     {
967       if (unformat (input, "event-queue-length %d", &nitems))
968         {
969           if (nitems >= 2048)
970             smm->configured_event_queue_length = nitems;
971           else
972             clib_warning ("event queue length %d too small, ignored", nitems);
973         }
974       else if (unformat (input, "preallocated-sessions %d",
975                          &smm->preallocated_sessions))
976         ;
977       else if (unformat (input, "v4-session-table-buckets %d",
978                          &smm->configured_v4_session_table_buckets))
979         ;
980       else if (unformat (input, "v4-halfopen-table-buckets %d",
981                          &smm->configured_v4_halfopen_table_buckets))
982         ;
983       else if (unformat (input, "v6-session-table-buckets %d",
984                          &smm->configured_v6_session_table_buckets))
985         ;
986       else if (unformat (input, "v6-halfopen-table-buckets %d",
987                          &smm->configured_v6_halfopen_table_buckets))
988         ;
989       else if (unformat (input, "v4-session-table-memory %U",
990                          unformat_memory_size, &tmp))
991         {
992           if (tmp >= 0x100000000)
993             return clib_error_return (0, "memory size %llx (%lld) too large",
994                                       tmp, tmp);
995           smm->configured_v4_session_table_memory = tmp;
996         }
997       else if (unformat (input, "v4-halfopen-table-memory %U",
998                          unformat_memory_size, &tmp))
999         {
1000           if (tmp >= 0x100000000)
1001             return clib_error_return (0, "memory size %llx (%lld) too large",
1002                                       tmp, tmp);
1003           smm->configured_v4_halfopen_table_memory = tmp;
1004         }
1005       else if (unformat (input, "v6-session-table-memory %U",
1006                          unformat_memory_size, &tmp))
1007         {
1008           if (tmp >= 0x100000000)
1009             return clib_error_return (0, "memory size %llx (%lld) too large",
1010                                       tmp, tmp);
1011           smm->configured_v6_session_table_memory = tmp;
1012         }
1013       else if (unformat (input, "v6-halfopen-table-memory %U",
1014                          unformat_memory_size, &tmp))
1015         {
1016           if (tmp >= 0x100000000)
1017             return clib_error_return (0, "memory size %llx (%lld) too large",
1018                                       tmp, tmp);
1019           smm->configured_v6_halfopen_table_memory = tmp;
1020         }
1021       else
1022         return clib_error_return (0, "unknown input `%U'",
1023                                   format_unformat_error, input);
1024     }
1025   return 0;
1026 }
1027
1028 VLIB_CONFIG_FUNCTION (session_config_fn, "session");
1029
1030 /*
1031  * fd.io coding-style-patch-verification: ON
1032  *
1033  * Local Variables:
1034  * eval: (c-set-style "gnu")
1035  * End:
1036  */