session: segment manager improvements
[vpp.git] / src / vnet / session / session.c
1 /*
2  * Copyright (c) 2017 Cisco and/or its affiliates.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at:
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 /**
16  * @file
17  * @brief Session and session manager
18  */
19
20 #include <vnet/session/session.h>
21 #include <vnet/session/session_debug.h>
22 #include <vnet/session/application.h>
23 #include <vlibmemory/api.h>
24 #include <vnet/dpo/load_balance.h>
25 #include <vnet/fib/ip4_fib.h>
26 #include <vnet/tcp/tcp.h>
27
28 session_manager_main_t session_manager_main;
29 extern transport_proto_vft_t *tp_vfts;
30
31 int
32 stream_session_create_i (segment_manager_t * sm, transport_connection_t * tc,
33                          u8 alloc_fifos, stream_session_t ** ret_s)
34 {
35   session_manager_main_t *smm = &session_manager_main;
36   svm_fifo_t *server_rx_fifo = 0, *server_tx_fifo = 0;
37   u32 fifo_segment_index;
38   u32 pool_index;
39   stream_session_t *s;
40   u64 value;
41   u32 thread_index = tc->thread_index;
42   int rv;
43
44   ASSERT (thread_index == vlib_get_thread_index ());
45
46   /* Create the session */
47   pool_get_aligned (smm->sessions[thread_index], s, CLIB_CACHE_LINE_BYTES);
48   memset (s, 0, sizeof (*s));
49   pool_index = s - smm->sessions[thread_index];
50
51   /* Allocate fifos */
52   if (alloc_fifos)
53     {
54       if ((rv = segment_manager_alloc_session_fifos (sm, &server_rx_fifo,
55                                                      &server_tx_fifo,
56                                                      &fifo_segment_index)))
57         {
58           pool_put (smm->sessions[thread_index], s);
59           return rv;
60         }
61       /* Initialize backpointers */
62       server_rx_fifo->master_session_index = pool_index;
63       server_rx_fifo->master_thread_index = thread_index;
64
65       server_tx_fifo->master_session_index = pool_index;
66       server_tx_fifo->master_thread_index = thread_index;
67
68       s->server_rx_fifo = server_rx_fifo;
69       s->server_tx_fifo = server_tx_fifo;
70       s->svm_segment_index = fifo_segment_index;
71     }
72
73   /* Initialize state machine, such as it is... */
74   s->session_type = session_type_from_proto_and_ip (tc->transport_proto,
75                                                     tc->is_ip4);
76   s->session_state = SESSION_STATE_CONNECTING;
77   s->thread_index = thread_index;
78   s->session_index = pool_index;
79
80   /* Attach transport to session */
81   s->connection_index = tc->c_index;
82
83   /* Attach session to transport */
84   tc->s_index = s->session_index;
85
86   /* Add to the main lookup table */
87   value = stream_session_handle (s);
88   stream_session_table_add_for_tc (tc, value);
89
90   *ret_s = s;
91
92   return 0;
93 }
94
95 /**
96  * Discards bytes from buffer chain
97  *
98  * It discards n_bytes_to_drop starting at first buffer after chain_b
99  */
100 always_inline void
101 session_enqueue_discard_chain_bytes (vlib_main_t * vm, vlib_buffer_t * b,
102                                      vlib_buffer_t ** chain_b,
103                                      u32 n_bytes_to_drop)
104 {
105   vlib_buffer_t *next = *chain_b;
106   u32 to_drop = n_bytes_to_drop;
107   ASSERT (b->flags & VLIB_BUFFER_NEXT_PRESENT);
108   while (to_drop && (next->flags & VLIB_BUFFER_NEXT_PRESENT))
109     {
110       next = vlib_get_buffer (vm, next->next_buffer);
111       if (next->current_length > to_drop)
112         {
113           vlib_buffer_advance (next, to_drop);
114           to_drop = 0;
115         }
116       else
117         {
118           to_drop -= next->current_length;
119           next->current_length = 0;
120         }
121     }
122   *chain_b = next;
123
124   if (to_drop == 0)
125     b->total_length_not_including_first_buffer -= n_bytes_to_drop;
126 }
127
128 /**
129  * Enqueue buffer chain tail
130  */
131 always_inline int
132 session_enqueue_chain_tail (stream_session_t * s, vlib_buffer_t * b,
133                             u32 offset, u8 is_in_order)
134 {
135   vlib_buffer_t *chain_b;
136   u32 chain_bi, len, diff;
137   vlib_main_t *vm = vlib_get_main ();
138   u8 *data;
139   u32 written = 0;
140   int rv = 0;
141
142   if (is_in_order && offset)
143     {
144       diff = offset - b->current_length;
145       if (diff > b->total_length_not_including_first_buffer)
146         return 0;
147       chain_b = b;
148       session_enqueue_discard_chain_bytes (vm, b, &chain_b, diff);
149       chain_bi = vlib_get_buffer_index (vm, chain_b);
150     }
151   else
152     chain_bi = b->next_buffer;
153
154   do
155     {
156       chain_b = vlib_get_buffer (vm, chain_bi);
157       data = vlib_buffer_get_current (chain_b);
158       len = chain_b->current_length;
159       if (!len)
160         continue;
161       if (is_in_order)
162         {
163           rv = svm_fifo_enqueue_nowait (s->server_rx_fifo, len, data);
164           if (rv == len)
165             {
166               written += rv;
167             }
168           else if (rv < len)
169             {
170               return (rv > 0) ? (written + rv) : written;
171             }
172           else if (rv > len)
173             {
174               written += rv;
175
176               /* written more than what was left in chain */
177               if (written > b->total_length_not_including_first_buffer)
178                 return written;
179
180               /* drop the bytes that have already been delivered */
181               session_enqueue_discard_chain_bytes (vm, b, &chain_b, rv - len);
182             }
183         }
184       else
185         {
186           rv = svm_fifo_enqueue_with_offset (s->server_rx_fifo, offset, len,
187                                              data);
188           if (rv)
189             {
190               clib_warning ("failed to enqueue multi-buffer seg");
191               return -1;
192             }
193           offset += len;
194         }
195     }
196   while ((chain_bi = (chain_b->flags & VLIB_BUFFER_NEXT_PRESENT)
197           ? chain_b->next_buffer : 0));
198
199   if (is_in_order)
200     return written;
201
202   return 0;
203 }
204
205 /*
206  * Enqueue data for delivery to session peer. Does not notify peer of enqueue
207  * event but on request can queue notification events for later delivery by
208  * calling stream_server_flush_enqueue_events().
209  *
210  * @param tc Transport connection which is to be enqueued data
211  * @param b Buffer to be enqueued
212  * @param offset Offset at which to start enqueueing if out-of-order
213  * @param queue_event Flag to indicate if peer is to be notified or if event
214  *                    is to be queued. The former is useful when more data is
215  *                    enqueued and only one event is to be generated.
216  * @param is_in_order Flag to indicate if data is in order
217  * @return Number of bytes enqueued or a negative value if enqueueing failed.
218  */
219 int
220 stream_session_enqueue_data (transport_connection_t * tc, vlib_buffer_t * b,
221                              u32 offset, u8 queue_event, u8 is_in_order)
222 {
223   stream_session_t *s;
224   int enqueued = 0, rv, in_order_off;
225
226   s = stream_session_get (tc->s_index, tc->thread_index);
227
228   if (is_in_order)
229     {
230       enqueued = svm_fifo_enqueue_nowait (s->server_rx_fifo,
231                                           b->current_length,
232                                           vlib_buffer_get_current (b));
233       if (PREDICT_FALSE ((b->flags & VLIB_BUFFER_NEXT_PRESENT)
234                          && enqueued >= 0))
235         {
236           in_order_off = enqueued > b->current_length ? enqueued : 0;
237           rv = session_enqueue_chain_tail (s, b, in_order_off, 1);
238           if (rv > 0)
239             enqueued += rv;
240         }
241     }
242   else
243     {
244       rv = svm_fifo_enqueue_with_offset (s->server_rx_fifo, offset,
245                                          b->current_length,
246                                          vlib_buffer_get_current (b));
247       if (PREDICT_FALSE ((b->flags & VLIB_BUFFER_NEXT_PRESENT) && !rv))
248         session_enqueue_chain_tail (s, b, offset + b->current_length, 0);
249       /* if something was enqueued, report even this as success for ooo
250        * segment handling */
251       return rv;
252     }
253
254   if (queue_event)
255     {
256       /* Queue RX event on this fifo. Eventually these will need to be flushed
257        * by calling stream_server_flush_enqueue_events () */
258       session_manager_main_t *smm = vnet_get_session_manager_main ();
259       u32 thread_index = s->thread_index;
260       u32 my_enqueue_epoch = smm->current_enqueue_epoch[thread_index];
261
262       if (s->enqueue_epoch != my_enqueue_epoch)
263         {
264           s->enqueue_epoch = my_enqueue_epoch;
265           vec_add1 (smm->session_indices_to_enqueue_by_thread[thread_index],
266                     s - smm->sessions[thread_index]);
267         }
268     }
269
270   if (is_in_order)
271     return enqueued;
272
273   return 0;
274 }
275
276 /** Check if we have space in rx fifo to push more bytes */
277 u8
278 stream_session_no_space (transport_connection_t * tc, u32 thread_index,
279                          u16 data_len)
280 {
281   stream_session_t *s = stream_session_get (tc->s_index, thread_index);
282
283   if (PREDICT_FALSE (s->session_state != SESSION_STATE_READY))
284     return 1;
285
286   if (data_len > svm_fifo_max_enqueue (s->server_rx_fifo))
287     return 1;
288
289   return 0;
290 }
291
292 u32
293 stream_session_tx_fifo_max_dequeue (transport_connection_t * tc)
294 {
295   stream_session_t *s = stream_session_get (tc->s_index, tc->thread_index);
296   if (!s->server_tx_fifo)
297     return 0;
298   return svm_fifo_max_dequeue (s->server_tx_fifo);
299 }
300
301 int
302 stream_session_peek_bytes (transport_connection_t * tc, u8 * buffer,
303                            u32 offset, u32 max_bytes)
304 {
305   stream_session_t *s = stream_session_get (tc->s_index, tc->thread_index);
306   return svm_fifo_peek (s->server_tx_fifo, offset, max_bytes, buffer);
307 }
308
309 u32
310 stream_session_dequeue_drop (transport_connection_t * tc, u32 max_bytes)
311 {
312   stream_session_t *s = stream_session_get (tc->s_index, tc->thread_index);
313   return svm_fifo_dequeue_drop (s->server_tx_fifo, max_bytes);
314 }
315
316 /**
317  * Notify session peer that new data has been enqueued.
318  *
319  * @param s Stream session for which the event is to be generated.
320  * @param block Flag to indicate if call should block if event queue is full.
321  *
322  * @return 0 on succes or negative number if failed to send notification.
323  */
324 static int
325 stream_session_enqueue_notify (stream_session_t * s, u8 block)
326 {
327   application_t *app;
328   session_fifo_event_t evt;
329   unix_shared_memory_queue_t *q;
330   static u32 serial_number;
331
332   if (PREDICT_FALSE (s->session_state == SESSION_STATE_CLOSED))
333     return 0;
334
335   /* Get session's server */
336   app = application_get_if_valid (s->app_index);
337
338   if (PREDICT_FALSE (app == 0))
339     {
340       clib_warning ("invalid s->app_index = %d", s->app_index);
341       return 0;
342     }
343
344   /* Built-in server? Hand event to the callback... */
345   if (app->cb_fns.builtin_server_rx_callback)
346     return app->cb_fns.builtin_server_rx_callback (s);
347
348   /* If no event, send one */
349   if (svm_fifo_set_event (s->server_rx_fifo))
350     {
351       /* Fabricate event */
352       evt.fifo = s->server_rx_fifo;
353       evt.event_type = FIFO_EVENT_APP_RX;
354       evt.event_id = serial_number++;
355
356       /* Add event to server's event queue */
357       q = app->event_queue;
358
359       /* Based on request block (or not) for lack of space */
360       if (block || PREDICT_TRUE (q->cursize < q->maxsize))
361         unix_shared_memory_queue_add (app->event_queue, (u8 *) & evt,
362                                       0 /* do wait for mutex */ );
363       else
364         {
365           clib_warning ("fifo full");
366           return -1;
367         }
368     }
369
370   /* *INDENT-OFF* */
371   SESSION_EVT_DBG(SESSION_EVT_ENQ, s, ({
372       ed->data[0] = evt.event_id;
373       ed->data[1] = svm_fifo_max_dequeue (s->server_rx_fifo);
374   }));
375   /* *INDENT-ON* */
376
377   return 0;
378 }
379
380 /**
381  * Flushes queue of sessions that are to be notified of new data
382  * enqueued events.
383  *
384  * @param thread_index Thread index for which the flush is to be performed.
385  * @return 0 on success or a positive number indicating the number of
386  *         failures due to API queue being full.
387  */
388 int
389 session_manager_flush_enqueue_events (u32 thread_index)
390 {
391   session_manager_main_t *smm = &session_manager_main;
392   u32 *session_indices_to_enqueue;
393   int i, errors = 0;
394
395   session_indices_to_enqueue =
396     smm->session_indices_to_enqueue_by_thread[thread_index];
397
398   for (i = 0; i < vec_len (session_indices_to_enqueue); i++)
399     {
400       stream_session_t *s0;
401
402       /* Get session */
403       s0 = stream_session_get_if_valid (session_indices_to_enqueue[i],
404                                         thread_index);
405       if (s0 == 0 || stream_session_enqueue_notify (s0, 0 /* don't block */ ))
406         {
407           errors++;
408         }
409     }
410
411   vec_reset_length (session_indices_to_enqueue);
412
413   smm->session_indices_to_enqueue_by_thread[thread_index] =
414     session_indices_to_enqueue;
415
416   /* Increment enqueue epoch for next round */
417   smm->current_enqueue_epoch[thread_index]++;
418
419   return errors;
420 }
421
422 /**
423  * Init fifo tail and head pointers
424  *
425  * Useful if transport uses absolute offsets for tracking ooo segments.
426  */
427 void
428 stream_session_init_fifos_pointers (transport_connection_t * tc,
429                                     u32 rx_pointer, u32 tx_pointer)
430 {
431   stream_session_t *s;
432   s = stream_session_get (tc->s_index, tc->thread_index);
433   svm_fifo_init_pointers (s->server_rx_fifo, rx_pointer);
434   svm_fifo_init_pointers (s->server_tx_fifo, tx_pointer);
435 }
436
437 int
438 stream_session_connect_notify (transport_connection_t * tc, u8 is_fail)
439 {
440   application_t *app;
441   stream_session_t *new_s = 0;
442   u64 handle;
443   u32 opaque = 0;
444   int error = 0;
445   u8 st;
446
447   st = session_type_from_proto_and_ip (tc->transport_proto, tc->is_ip4);
448   handle = stream_session_half_open_lookup_handle (&tc->lcl_ip, &tc->rmt_ip,
449                                                    tc->lcl_port, tc->rmt_port,
450                                                    st);
451   if (handle == HALF_OPEN_LOOKUP_INVALID_VALUE)
452     {
453       clib_warning ("This can't be good!");
454       return -1;
455     }
456
457   /* Get the app's index from the handle we stored when opening connection
458    * and the opaque (api_context for external apps) from transport session
459    * index*/
460   app = application_get_if_valid (handle >> 32);
461   if (!app)
462     return -1;
463
464   opaque = tc->s_index;
465
466   if (!is_fail)
467     {
468       segment_manager_t *sm;
469       u8 alloc_fifos;
470       sm = application_get_connect_segment_manager (app);
471       alloc_fifos = application_is_proxy (app);
472       /* Create new session (svm segments are allocated if needed) */
473       if (stream_session_create_i (sm, tc, alloc_fifos, &new_s))
474         {
475           is_fail = 1;
476           error = -1;
477         }
478       else
479         new_s->app_index = app->index;
480     }
481
482   /* Notify client application */
483   if (app->cb_fns.session_connected_callback (app->index, opaque, new_s,
484                                               is_fail))
485     {
486       clib_warning ("failed to notify app");
487       if (!is_fail)
488         stream_session_disconnect (new_s);
489     }
490   else
491     {
492       if (!is_fail)
493         new_s->session_state = SESSION_STATE_READY;
494     }
495
496   /* Cleanup session lookup */
497   stream_session_half_open_table_del (tc);
498
499   return error;
500 }
501
502 void
503 stream_session_accept_notify (transport_connection_t * tc)
504 {
505   application_t *server;
506   stream_session_t *s;
507
508   s = stream_session_get (tc->s_index, tc->thread_index);
509   server = application_get (s->app_index);
510   server->cb_fns.session_accept_callback (s);
511 }
512
513 /**
514  * Notification from transport that connection is being closed.
515  *
516  * A disconnect is sent to application but state is not removed. Once
517  * disconnect is acknowledged by application, session disconnect is called.
518  * Ultimately this leads to close being called on transport (passive close).
519  */
520 void
521 stream_session_disconnect_notify (transport_connection_t * tc)
522 {
523   application_t *server;
524   stream_session_t *s;
525
526   s = stream_session_get (tc->s_index, tc->thread_index);
527   server = application_get (s->app_index);
528   server->cb_fns.session_disconnect_callback (s);
529 }
530
531 /**
532  * Cleans up session and associated app if needed.
533  */
534 void
535 stream_session_delete (stream_session_t * s)
536 {
537   session_manager_main_t *smm = vnet_get_session_manager_main ();
538   int rv;
539
540   /* Delete from the main lookup table. */
541   if ((rv = stream_session_table_del (s)))
542     clib_warning ("hash delete error, rv %d", rv);
543
544   /* Cleanup fifo segments */
545   segment_manager_dealloc_fifos (s->svm_segment_index, s->server_rx_fifo,
546                                  s->server_tx_fifo);
547
548   pool_put (smm->sessions[s->thread_index], s);
549   if (CLIB_DEBUG)
550     memset (s, 0xFA, sizeof (*s));
551 }
552
553 /**
554  * Notification from transport that connection is being deleted
555  *
556  * This should be called only on previously fully established sessions. For
557  * instance failed connects should call stream_session_connect_notify and
558  * indicate that the connect has failed.
559  */
560 void
561 stream_session_delete_notify (transport_connection_t * tc)
562 {
563   stream_session_t *s;
564
565   /* App might've been removed already */
566   s = stream_session_get_if_valid (tc->s_index, tc->thread_index);
567   if (!s)
568     return;
569   stream_session_delete (s);
570 }
571
572 /**
573  * Notify application that connection has been reset.
574  */
575 void
576 stream_session_reset_notify (transport_connection_t * tc)
577 {
578   stream_session_t *s;
579   application_t *app;
580   s = stream_session_get (tc->s_index, tc->thread_index);
581
582   app = application_get (s->app_index);
583   app->cb_fns.session_reset_callback (s);
584 }
585
586 /**
587  * Accept a stream session. Optionally ping the server by callback.
588  */
589 int
590 stream_session_accept (transport_connection_t * tc, u32 listener_index,
591                        u8 sst, u8 notify)
592 {
593   application_t *server;
594   stream_session_t *s, *listener;
595   segment_manager_t *sm;
596
597   int rv;
598
599   /* Find the server */
600   listener = listen_session_get (sst, listener_index);
601   server = application_get (listener->app_index);
602
603   sm = application_get_listen_segment_manager (server, listener);
604   if ((rv = stream_session_create_i (sm, tc, 1, &s)))
605     return rv;
606
607   s->app_index = server->index;
608   s->listener_index = listener_index;
609   s->session_state = SESSION_STATE_ACCEPTING;
610
611   /* Shoulder-tap the server */
612   if (notify)
613     {
614       server->cb_fns.session_accept_callback (s);
615     }
616
617   return 0;
618 }
619
620 /**
621  * Ask transport to open connection to remote transport endpoint.
622  *
623  * Stores handle for matching request with reply since the call can be
624  * asynchronous. For instance, for TCP the 3-way handshake must complete
625  * before reply comes. Session is only created once connection is established.
626  *
627  * @param app_index Index of the application requesting the connect
628  * @param st Session type requested.
629  * @param tep Remote transport endpoint
630  * @param res Resulting transport connection .
631  */
632 int
633 stream_session_open (u32 app_index, session_type_t st,
634                      transport_endpoint_t * rmt,
635                      transport_connection_t ** res)
636 {
637   transport_connection_t *tc;
638   int rv;
639   u64 handle;
640
641   rv = tp_vfts[st].open (rmt);
642   if (rv < 0)
643     {
644       clib_warning ("Transport failed to open connection.");
645       return VNET_API_ERROR_SESSION_CONNECT_FAIL;
646     }
647
648   tc = tp_vfts[st].get_half_open ((u32) rv);
649
650   /* Save app and tc index. The latter is needed to help establish the
651    * connection while the former is needed when the connect notify comes
652    * and we have to notify the external app */
653   handle = (((u64) app_index) << 32) | (u64) tc->c_index;
654
655   /* Add to the half-open lookup table */
656   stream_session_half_open_table_add (tc, handle);
657
658   *res = tc;
659
660   return 0;
661 }
662
663 /**
664  * Ask transport to listen on local transport endpoint.
665  *
666  * @param s Session for which listen will be called. Note that unlike
667  *          established sessions, listen sessions are not associated to a
668  *          thread.
669  * @param tep Local endpoint to be listened on.
670  */
671 int
672 stream_session_listen (stream_session_t * s, transport_endpoint_t * tep)
673 {
674   transport_connection_t *tc;
675   u32 tci;
676
677   /* Transport bind/listen  */
678   tci = tp_vfts[s->session_type].bind (s->session_index, tep);
679
680   if (tci == (u32) ~ 0)
681     return -1;
682
683   /* Attach transport to session */
684   s->connection_index = tci;
685   tc = tp_vfts[s->session_type].get_listener (tci);
686
687   /* Weird but handle it ... */
688   if (tc == 0)
689     return -1;
690
691   /* Add to the main lookup table */
692   stream_session_table_add_for_tc (tc, s->session_index);
693
694   return 0;
695 }
696
697 /**
698  * Ask transport to stop listening on local transport endpoint.
699  *
700  * @param s Session to stop listening on. It must be in state LISTENING.
701  */
702 int
703 stream_session_stop_listen (stream_session_t * s)
704 {
705   transport_connection_t *tc;
706
707   if (s->session_state != SESSION_STATE_LISTENING)
708     {
709       clib_warning ("not a listening session");
710       return -1;
711     }
712
713   tc = tp_vfts[s->session_type].get_listener (s->connection_index);
714   if (!tc)
715     {
716       clib_warning ("no transport");
717       return VNET_API_ERROR_ADDRESS_NOT_IN_USE;
718     }
719
720   stream_session_table_del_for_tc (tc);
721   tp_vfts[s->session_type].unbind (s->connection_index);
722   return 0;
723 }
724
725 void
726 session_send_session_evt_to_thread (u64 session_handle,
727                                     fifo_event_type_t evt_type,
728                                     u32 thread_index)
729 {
730   static u16 serial_number = 0;
731   session_fifo_event_t evt;
732   unix_shared_memory_queue_t *q;
733
734   /* Fabricate event */
735   evt.session_handle = session_handle;
736   evt.event_type = evt_type;
737   evt.event_id = serial_number++;
738
739   q = session_manager_get_vpp_event_queue (thread_index);
740
741   /* Based on request block (or not) for lack of space */
742   if (PREDICT_TRUE (q->cursize < q->maxsize))
743     {
744       if (unix_shared_memory_queue_add (q, (u8 *) & evt,
745                                         1 /* do wait for mutex */ ))
746         {
747           clib_warning ("failed to enqueue evt");
748         }
749     }
750   else
751     {
752       clib_warning ("queue full");
753       return;
754     }
755 }
756
757 /**
758  * Disconnect session and propagate to transport. This should eventually
759  * result in a delete notification that allows us to cleanup session state.
760  * Called for both active/passive disconnects.
761  *
762  * Should be called from the session's thread.
763  */
764 void
765 stream_session_disconnect (stream_session_t * s)
766 {
767   s->session_state = SESSION_STATE_CLOSED;
768   tp_vfts[s->session_type].close (s->connection_index, s->thread_index);
769 }
770
771 /**
772  * Cleanup transport and session state.
773  *
774  * Notify transport of the cleanup, wait for a delete notify to actually
775  * remove the session state.
776  */
777 void
778 stream_session_cleanup (stream_session_t * s)
779 {
780   int rv;
781
782   s->session_state = SESSION_STATE_CLOSED;
783
784   /* Delete from the main lookup table to avoid more enqueues */
785   rv = stream_session_table_del (s);
786   if (rv)
787     clib_warning ("hash delete error, rv %d", rv);
788
789   tp_vfts[s->session_type].cleanup (s->connection_index, s->thread_index);
790 }
791
792 /**
793  * Allocate vpp event queue (once) per worker thread
794  */
795 void
796 session_vpp_event_queue_allocate (session_manager_main_t * smm,
797                                   u32 thread_index)
798 {
799   api_main_t *am = &api_main;
800   void *oldheap;
801   u32 event_queue_length = 2048;
802
803   if (smm->vpp_event_queues[thread_index] == 0)
804     {
805       /* Allocate event fifo in the /vpe-api shared-memory segment */
806       oldheap = svm_push_data_heap (am->vlib_rp);
807
808       if (smm->configured_event_queue_length)
809         event_queue_length = smm->configured_event_queue_length;
810
811       smm->vpp_event_queues[thread_index] =
812         unix_shared_memory_queue_init
813         (event_queue_length,
814          sizeof (session_fifo_event_t), 0 /* consumer pid */ ,
815          0 /* (do not) send signal when queue non-empty */ );
816
817       svm_pop_heap (oldheap);
818     }
819 }
820
821 session_type_t
822 session_type_from_proto_and_ip (transport_proto_t proto, u8 is_ip4)
823 {
824   if (proto == TRANSPORT_PROTO_TCP)
825     {
826       if (is_ip4)
827         return SESSION_TYPE_IP4_TCP;
828       else
829         return SESSION_TYPE_IP6_TCP;
830     }
831   else
832     {
833       if (is_ip4)
834         return SESSION_TYPE_IP4_UDP;
835       else
836         return SESSION_TYPE_IP6_UDP;
837     }
838
839   return SESSION_N_TYPES;
840 }
841
842 static clib_error_t *
843 session_manager_main_enable (vlib_main_t * vm)
844 {
845   session_manager_main_t *smm = &session_manager_main;
846   vlib_thread_main_t *vtm = vlib_get_thread_main ();
847   u32 num_threads;
848   u32 preallocated_sessions_per_worker;
849   int i;
850
851   num_threads = 1 /* main thread */  + vtm->n_threads;
852
853   if (num_threads < 1)
854     return clib_error_return (0, "n_thread_stacks not set");
855
856   /* $$$ config parameters */
857   svm_fifo_segment_init (0x200000000ULL /* first segment base VA */ ,
858                          20 /* timeout in seconds */ );
859
860   /* configure per-thread ** vectors */
861   vec_validate (smm->sessions, num_threads - 1);
862   vec_validate (smm->session_indices_to_enqueue_by_thread, num_threads - 1);
863   vec_validate (smm->tx_buffers, num_threads - 1);
864   vec_validate (smm->pending_event_vector, num_threads - 1);
865   vec_validate (smm->free_event_vector, num_threads - 1);
866   vec_validate (smm->current_enqueue_epoch, num_threads - 1);
867   vec_validate (smm->vpp_event_queues, num_threads - 1);
868
869   for (i = 0; i < num_threads; i++)
870     {
871       vec_validate (smm->free_event_vector[i], 0);
872       _vec_len (smm->free_event_vector[i]) = 0;
873       vec_validate (smm->pending_event_vector[i], 0);
874       _vec_len (smm->pending_event_vector[i]) = 0;
875     }
876
877 #if SESSION_DBG
878   vec_validate (smm->last_event_poll_by_thread, num_threads - 1);
879 #endif
880
881   /* Allocate vpp event queues */
882   for (i = 0; i < vec_len (smm->vpp_event_queues); i++)
883     session_vpp_event_queue_allocate (smm, i);
884
885   /* Preallocate sessions */
886   if (num_threads == 1)
887     {
888       for (i = 0; i < smm->preallocated_sessions; i++)
889         {
890           stream_session_t *ss __attribute__ ((unused));
891           pool_get_aligned (smm->sessions[0], ss, CLIB_CACHE_LINE_BYTES);
892         }
893
894       for (i = 0; i < smm->preallocated_sessions; i++)
895         pool_put_index (smm->sessions[0], i);
896     }
897   else
898     {
899       int j;
900       preallocated_sessions_per_worker = smm->preallocated_sessions /
901         (num_threads - 1);
902
903       for (j = 1; j < num_threads; j++)
904         {
905           for (i = 0; i < preallocated_sessions_per_worker; i++)
906             {
907               stream_session_t *ss __attribute__ ((unused));
908               pool_get_aligned (smm->sessions[j], ss, CLIB_CACHE_LINE_BYTES);
909             }
910           for (i = 0; i < preallocated_sessions_per_worker; i++)
911             pool_put_index (smm->sessions[j], i);
912         }
913     }
914
915   session_lookup_init ();
916
917   smm->is_enabled = 1;
918
919   /* Enable TCP transport */
920   vnet_tcp_enable_disable (vm, 1);
921
922   return 0;
923 }
924
925 void
926 session_node_enable_disable (u8 is_en)
927 {
928   u8 state = is_en ? VLIB_NODE_STATE_POLLING : VLIB_NODE_STATE_DISABLED;
929   /* *INDENT-OFF* */
930   foreach_vlib_main (({
931     vlib_node_set_state (this_vlib_main, session_queue_node.index,
932                          state);
933   }));
934   /* *INDENT-ON* */
935 }
936
937 clib_error_t *
938 vnet_session_enable_disable (vlib_main_t * vm, u8 is_en)
939 {
940   if (is_en)
941     {
942       if (session_manager_main.is_enabled)
943         return 0;
944
945       session_node_enable_disable (is_en);
946
947       return session_manager_main_enable (vm);
948     }
949   else
950     {
951       session_manager_main.is_enabled = 0;
952       session_node_enable_disable (is_en);
953     }
954
955   return 0;
956 }
957
958 clib_error_t *
959 session_manager_main_init (vlib_main_t * vm)
960 {
961   session_manager_main_t *smm = &session_manager_main;
962   smm->is_enabled = 0;
963   return 0;
964 }
965
966 VLIB_INIT_FUNCTION (session_manager_main_init);
967
968 static clib_error_t *
969 session_config_fn (vlib_main_t * vm, unformat_input_t * input)
970 {
971   session_manager_main_t *smm = &session_manager_main;
972   u32 nitems;
973   uword tmp;
974
975   while (unformat_check_input (input) != UNFORMAT_END_OF_INPUT)
976     {
977       if (unformat (input, "event-queue-length %d", &nitems))
978         {
979           if (nitems >= 2048)
980             smm->configured_event_queue_length = nitems;
981           else
982             clib_warning ("event queue length %d too small, ignored", nitems);
983         }
984       else if (unformat (input, "preallocated-sessions %d",
985                          &smm->preallocated_sessions))
986         ;
987       else if (unformat (input, "v4-session-table-buckets %d",
988                          &smm->configured_v4_session_table_buckets))
989         ;
990       else if (unformat (input, "v4-halfopen-table-buckets %d",
991                          &smm->configured_v4_halfopen_table_buckets))
992         ;
993       else if (unformat (input, "v6-session-table-buckets %d",
994                          &smm->configured_v6_session_table_buckets))
995         ;
996       else if (unformat (input, "v6-halfopen-table-buckets %d",
997                          &smm->configured_v6_halfopen_table_buckets))
998         ;
999       else if (unformat (input, "v4-session-table-memory %U",
1000                          unformat_memory_size, &tmp))
1001         {
1002           if (tmp >= 0x100000000)
1003             return clib_error_return (0, "memory size %llx (%lld) too large",
1004                                       tmp, tmp);
1005           smm->configured_v4_session_table_memory = tmp;
1006         }
1007       else if (unformat (input, "v4-halfopen-table-memory %U",
1008                          unformat_memory_size, &tmp))
1009         {
1010           if (tmp >= 0x100000000)
1011             return clib_error_return (0, "memory size %llx (%lld) too large",
1012                                       tmp, tmp);
1013           smm->configured_v4_halfopen_table_memory = tmp;
1014         }
1015       else if (unformat (input, "v6-session-table-memory %U",
1016                          unformat_memory_size, &tmp))
1017         {
1018           if (tmp >= 0x100000000)
1019             return clib_error_return (0, "memory size %llx (%lld) too large",
1020                                       tmp, tmp);
1021           smm->configured_v6_session_table_memory = tmp;
1022         }
1023       else if (unformat (input, "v6-halfopen-table-memory %U",
1024                          unformat_memory_size, &tmp))
1025         {
1026           if (tmp >= 0x100000000)
1027             return clib_error_return (0, "memory size %llx (%lld) too large",
1028                                       tmp, tmp);
1029           smm->configured_v6_halfopen_table_memory = tmp;
1030         }
1031       else
1032         return clib_error_return (0, "unknown input `%U'",
1033                                   format_unformat_error, input);
1034     }
1035   return 0;
1036 }
1037
1038 VLIB_CONFIG_FUNCTION (session_config_fn, "session");
1039
1040 /*
1041  * fd.io coding-style-patch-verification: ON
1042  *
1043  * Local Variables:
1044  * eval: (c-set-style "gnu")
1045  * End:
1046  */