session: fix formatting of half open sessions
[vpp.git] / src / vnet / session / transport.c
1 /*
2  * Copyright (c) 2017-2019 Cisco and/or its affiliates.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at:
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15
16 #include <vnet/session/transport.h>
17 #include <vnet/session/session.h>
18 #include <vnet/fib/fib.h>
19
20 /**
21  * Per-type vector of transport protocol virtual function tables
22  */
23 transport_proto_vft_t *tp_vfts;
24
25 typedef struct local_endpoint_
26 {
27   transport_endpoint_t ep;
28   transport_proto_t proto;
29   int refcnt;
30 } local_endpoint_t;
31
32 typedef struct transport_main_
33 {
34   transport_endpoint_table_t local_endpoints_table;
35   local_endpoint_t *local_endpoints;
36   u32 *lcl_endpts_freelist;
37   u32 port_allocator_seed;
38   u8 lcl_endpts_cleanup_pending;
39   clib_spinlock_t local_endpoints_lock;
40 } transport_main_t;
41
42 static transport_main_t tp_main;
43
44 u8 *
45 format_transport_proto (u8 * s, va_list * args)
46 {
47   u32 transport_proto = va_arg (*args, u32);
48
49   if (tp_vfts[transport_proto].transport_options.name)
50     s = format (s, "%s", tp_vfts[transport_proto].transport_options.name);
51   else
52     s = format (s, "n/a");
53
54   return s;
55 }
56
57 u8 *
58 format_transport_proto_short (u8 * s, va_list * args)
59 {
60   u32 transport_proto = va_arg (*args, u32);
61   char *short_name;
62
63   short_name = tp_vfts[transport_proto].transport_options.short_name;
64   if (short_name)
65     s = format (s, "%s", short_name);
66   else
67     s = format (s, "NA");
68
69   return s;
70 }
71
72 const char *transport_flags_str[] = {
73 #define _(sym, str) str,
74   foreach_transport_connection_flag
75 #undef _
76 };
77
78 u8 *
79 format_transport_flags (u8 *s, va_list *args)
80 {
81   transport_connection_flags_t flags;
82   int i, last = -1;
83
84   flags = va_arg (*args, transport_connection_flags_t);
85
86   for (i = 0; i < TRANSPORT_CONNECTION_N_FLAGS; i++)
87     if (flags & (1 << i))
88       last = i;
89
90   for (i = 0; i < last; i++)
91     {
92       if (flags & (1 << i))
93         s = format (s, "%s, ", transport_flags_str[i]);
94     }
95   if (last >= 0)
96     s = format (s, "%s", transport_flags_str[last]);
97
98   return s;
99 }
100
101 u8 *
102 format_transport_connection (u8 * s, va_list * args)
103 {
104   u32 transport_proto = va_arg (*args, u32);
105   u32 conn_index = va_arg (*args, u32);
106   u32 thread_index = va_arg (*args, u32);
107   u32 verbose = va_arg (*args, u32);
108   transport_proto_vft_t *tp_vft;
109   transport_connection_t *tc;
110   u32 indent;
111
112   tp_vft = transport_protocol_get_vft (transport_proto);
113   if (!tp_vft)
114     return s;
115
116   s = format (s, "%U", tp_vft->format_connection, conn_index, thread_index,
117               verbose);
118   tc = tp_vft->get_connection (conn_index, thread_index);
119   if (tc && verbose > 1)
120     {
121       indent = format_get_indent (s) + 1;
122       if (transport_connection_is_tx_paced (tc))
123         s = format (s, "%Upacer: %U\n", format_white_space, indent,
124                     format_transport_pacer, &tc->pacer, tc->thread_index);
125       s = format (s, "%Utransport: flags: %U\n", format_white_space, indent,
126                   format_transport_flags, tc->flags);
127     }
128   return s;
129 }
130
131 u8 *
132 format_transport_listen_connection (u8 * s, va_list * args)
133 {
134   u32 transport_proto = va_arg (*args, u32);
135   transport_proto_vft_t *tp_vft;
136
137   tp_vft = transport_protocol_get_vft (transport_proto);
138   if (!tp_vft)
139     return s;
140
141   s = (tp_vft->format_listener) (s, args);
142   return s;
143 }
144
145 u8 *
146 format_transport_half_open_connection (u8 * s, va_list * args)
147 {
148   u32 transport_proto = va_arg (*args, u32);
149   transport_proto_vft_t *tp_vft;
150
151   tp_vft = transport_protocol_get_vft (transport_proto);
152   if (!tp_vft)
153     return s;
154
155   s = (tp_vft->format_half_open) (s, args);
156   return s;
157 }
158
159 static u8
160 unformat_transport_str_match (unformat_input_t * input, const char *str)
161 {
162   int i;
163
164   if (strlen (str) > vec_len (input->buffer) - input->index)
165     return 0;
166
167   for (i = 0; i < strlen (str); i++)
168     {
169       if (input->buffer[i + input->index] != str[i])
170         return 0;
171     }
172   return 1;
173 }
174
175 uword
176 unformat_transport_proto (unformat_input_t * input, va_list * args)
177 {
178   u32 *proto = va_arg (*args, u32 *);
179   transport_proto_vft_t *tp_vft;
180   u8 longest_match = 0, match;
181   char *str, *str_match = 0;
182   transport_proto_t tp;
183
184   for (tp = 0; tp < vec_len (tp_vfts); tp++)
185     {
186       tp_vft = &tp_vfts[tp];
187       str = tp_vft->transport_options.name;
188       if (!str)
189         continue;
190       if (unformat_transport_str_match (input, str))
191         {
192           match = strlen (str);
193           if (match > longest_match)
194             {
195               *proto = tp;
196               longest_match = match;
197               str_match = str;
198             }
199         }
200     }
201   if (longest_match)
202     {
203       (void) unformat (input, str_match);
204       return 1;
205     }
206
207   return 0;
208 }
209
210 u8 *
211 format_transport_protos (u8 * s, va_list * args)
212 {
213   transport_proto_vft_t *tp_vft;
214
215   vec_foreach (tp_vft, tp_vfts)
216     s = format (s, "%s\n", tp_vft->transport_options.name);
217
218   return s;
219 }
220
221 u32
222 transport_endpoint_lookup (transport_endpoint_table_t * ht, u8 proto,
223                            ip46_address_t * ip, u16 port)
224 {
225   clib_bihash_kv_24_8_t kv;
226   int rv;
227
228   kv.key[0] = ip->as_u64[0];
229   kv.key[1] = ip->as_u64[1];
230   kv.key[2] = (u64) port << 8 | (u64) proto;
231
232   rv = clib_bihash_search_inline_24_8 (ht, &kv);
233   if (rv == 0)
234     return kv.value;
235
236   return ENDPOINT_INVALID_INDEX;
237 }
238
239 void
240 transport_endpoint_table_add (transport_endpoint_table_t * ht, u8 proto,
241                               transport_endpoint_t * te, u32 value)
242 {
243   clib_bihash_kv_24_8_t kv;
244
245   kv.key[0] = te->ip.as_u64[0];
246   kv.key[1] = te->ip.as_u64[1];
247   kv.key[2] = (u64) te->port << 8 | (u64) proto;
248   kv.value = value;
249
250   clib_bihash_add_del_24_8 (ht, &kv, 1);
251 }
252
253 void
254 transport_endpoint_table_del (transport_endpoint_table_t * ht, u8 proto,
255                               transport_endpoint_t * te)
256 {
257   clib_bihash_kv_24_8_t kv;
258
259   kv.key[0] = te->ip.as_u64[0];
260   kv.key[1] = te->ip.as_u64[1];
261   kv.key[2] = (u64) te->port << 8 | (u64) proto;
262
263   clib_bihash_add_del_24_8 (ht, &kv, 0);
264 }
265
266 void
267 transport_register_protocol (transport_proto_t transport_proto,
268                              const transport_proto_vft_t * vft,
269                              fib_protocol_t fib_proto, u32 output_node)
270 {
271   u8 is_ip4 = fib_proto == FIB_PROTOCOL_IP4;
272
273   vec_validate (tp_vfts, transport_proto);
274   tp_vfts[transport_proto] = *vft;
275
276   session_register_transport (transport_proto, vft, is_ip4, output_node);
277 }
278
279 transport_proto_t
280 transport_register_new_protocol (const transport_proto_vft_t * vft,
281                                  fib_protocol_t fib_proto, u32 output_node)
282 {
283   transport_proto_t transport_proto;
284   u8 is_ip4;
285
286   transport_proto = session_add_transport_proto ();
287   is_ip4 = fib_proto == FIB_PROTOCOL_IP4;
288
289   vec_validate (tp_vfts, transport_proto);
290   tp_vfts[transport_proto] = *vft;
291
292   session_register_transport (transport_proto, vft, is_ip4, output_node);
293
294   return transport_proto;
295 }
296
297 /**
298  * Get transport virtual function table
299  *
300  * @param type - session type (not protocol type)
301  */
302 transport_proto_vft_t *
303 transport_protocol_get_vft (transport_proto_t transport_proto)
304 {
305   if (transport_proto >= vec_len (tp_vfts))
306     return 0;
307   return &tp_vfts[transport_proto];
308 }
309
310 transport_service_type_t
311 transport_protocol_service_type (transport_proto_t tp)
312 {
313   return tp_vfts[tp].transport_options.service_type;
314 }
315
316 transport_tx_fn_type_t
317 transport_protocol_tx_fn_type (transport_proto_t tp)
318 {
319   return tp_vfts[tp].transport_options.tx_type;
320 }
321
322 void
323 transport_cleanup (transport_proto_t tp, u32 conn_index, u8 thread_index)
324 {
325   tp_vfts[tp].cleanup (conn_index, thread_index);
326 }
327
328 void
329 transport_cleanup_half_open (transport_proto_t tp, u32 conn_index)
330 {
331   if (tp_vfts[tp].cleanup_ho)
332     tp_vfts[tp].cleanup_ho (conn_index);
333 }
334
335 int
336 transport_connect (transport_proto_t tp, transport_endpoint_cfg_t * tep)
337 {
338   if (PREDICT_FALSE (!tp_vfts[tp].connect))
339     return SESSION_E_TRANSPORT_NO_REG;
340   return tp_vfts[tp].connect (tep);
341 }
342
343 void
344 transport_half_close (transport_proto_t tp, u32 conn_index, u8 thread_index)
345 {
346   if (tp_vfts[tp].half_close)
347     tp_vfts[tp].half_close (conn_index, thread_index);
348 }
349
350 void
351 transport_close (transport_proto_t tp, u32 conn_index, u8 thread_index)
352 {
353   tp_vfts[tp].close (conn_index, thread_index);
354 }
355
356 void
357 transport_reset (transport_proto_t tp, u32 conn_index, u8 thread_index)
358 {
359   if (tp_vfts[tp].reset)
360     tp_vfts[tp].reset (conn_index, thread_index);
361   else
362     tp_vfts[tp].close (conn_index, thread_index);
363 }
364
365 u32
366 transport_start_listen (transport_proto_t tp, u32 session_index,
367                         transport_endpoint_cfg_t *tep)
368 {
369   if (PREDICT_FALSE (!tp_vfts[tp].start_listen))
370     return SESSION_E_TRANSPORT_NO_REG;
371   return tp_vfts[tp].start_listen (session_index, tep);
372 }
373
374 u32
375 transport_stop_listen (transport_proto_t tp, u32 conn_index)
376 {
377   return tp_vfts[tp].stop_listen (conn_index);
378 }
379
380 u8
381 transport_protocol_is_cl (transport_proto_t tp)
382 {
383   return (tp_vfts[tp].transport_options.service_type == TRANSPORT_SERVICE_CL);
384 }
385
386 always_inline void
387 default_get_transport_endpoint (transport_connection_t * tc,
388                                 transport_endpoint_t * tep, u8 is_lcl)
389 {
390   if (is_lcl)
391     {
392       tep->port = tc->lcl_port;
393       tep->is_ip4 = tc->is_ip4;
394       clib_memcpy_fast (&tep->ip, &tc->lcl_ip, sizeof (tc->lcl_ip));
395     }
396   else
397     {
398       tep->port = tc->rmt_port;
399       tep->is_ip4 = tc->is_ip4;
400       clib_memcpy_fast (&tep->ip, &tc->rmt_ip, sizeof (tc->rmt_ip));
401     }
402 }
403
404 void
405 transport_get_endpoint (transport_proto_t tp, u32 conn_index,
406                         u32 thread_index, transport_endpoint_t * tep,
407                         u8 is_lcl)
408 {
409   if (tp_vfts[tp].get_transport_endpoint)
410     tp_vfts[tp].get_transport_endpoint (conn_index, thread_index, tep,
411                                         is_lcl);
412   else
413     {
414       transport_connection_t *tc;
415       tc = transport_get_connection (tp, conn_index, thread_index);
416       default_get_transport_endpoint (tc, tep, is_lcl);
417     }
418 }
419
420 void
421 transport_get_listener_endpoint (transport_proto_t tp, u32 conn_index,
422                                  transport_endpoint_t * tep, u8 is_lcl)
423 {
424   if (tp_vfts[tp].get_transport_listener_endpoint)
425     tp_vfts[tp].get_transport_listener_endpoint (conn_index, tep, is_lcl);
426   else
427     {
428       transport_connection_t *tc;
429       tc = transport_get_listener (tp, conn_index);
430       default_get_transport_endpoint (tc, tep, is_lcl);
431     }
432 }
433
434 int
435 transport_connection_attribute (transport_proto_t tp, u32 conn_index,
436                                 u8 thread_index, u8 is_get,
437                                 transport_endpt_attr_t *attr)
438 {
439   if (!tp_vfts[tp].attribute)
440     return -1;
441
442   return tp_vfts[tp].attribute (conn_index, thread_index, is_get, attr);
443 }
444
445 #define PORT_MASK ((1 << 16)- 1)
446
447 void
448 transport_endpoint_free (u32 tepi)
449 {
450   transport_main_t *tm = &tp_main;
451   pool_put_index (tm->local_endpoints, tepi);
452 }
453
454 always_inline local_endpoint_t *
455 transport_endpoint_alloc (void)
456 {
457   transport_main_t *tm = &tp_main;
458   local_endpoint_t *lep;
459
460   ASSERT (vlib_get_thread_index () <= transport_cl_thread ());
461
462   pool_get_aligned_safe (tm->local_endpoints, lep, 0);
463   return lep;
464 }
465
466 static void
467 transport_cleanup_freelist (void)
468 {
469   transport_main_t *tm = &tp_main;
470   local_endpoint_t *lep;
471   u32 *lep_indexp;
472
473   clib_spinlock_lock (&tm->local_endpoints_lock);
474
475   vec_foreach (lep_indexp, tm->lcl_endpts_freelist)
476     {
477       lep = pool_elt_at_index (tm->local_endpoints, *lep_indexp);
478
479       /* Port re-shared after attempt to cleanup */
480       if (lep->refcnt > 0)
481         continue;
482
483       transport_endpoint_table_del (&tm->local_endpoints_table, lep->proto,
484                                     &lep->ep);
485       transport_endpoint_free (*lep_indexp);
486     }
487
488   vec_reset_length (tm->lcl_endpts_freelist);
489
490   tm->lcl_endpts_cleanup_pending = 0;
491
492   clib_spinlock_unlock (&tm->local_endpoints_lock);
493 }
494
495 void
496 transport_program_endpoint_cleanup (u32 lepi)
497 {
498   transport_main_t *tm = &tp_main;
499   u8 flush_fl = 0;
500
501   /* All workers can free connections. Synchronize access to freelist */
502   clib_spinlock_lock (&tm->local_endpoints_lock);
503
504   vec_add1 (tm->lcl_endpts_freelist, lepi);
505
506   /* Avoid accumulating lots of endpoints for cleanup */
507   if (!tm->lcl_endpts_cleanup_pending &&
508       vec_len (tm->lcl_endpts_freelist) > 32)
509     {
510       tm->lcl_endpts_cleanup_pending = 1;
511       flush_fl = 1;
512     }
513
514   clib_spinlock_unlock (&tm->local_endpoints_lock);
515
516   if (flush_fl)
517     session_send_rpc_evt_to_thread_force (transport_cl_thread (),
518                                           transport_cleanup_freelist, 0);
519 }
520
521 int
522 transport_release_local_endpoint (u8 proto, ip46_address_t *lcl_ip, u16 port)
523 {
524   transport_main_t *tm = &tp_main;
525   local_endpoint_t *lep;
526   u32 lepi;
527
528   lepi = transport_endpoint_lookup (&tm->local_endpoints_table, proto, lcl_ip,
529                                     clib_net_to_host_u16 (port));
530   if (lepi == ENDPOINT_INVALID_INDEX)
531     return -1;
532
533   lep = pool_elt_at_index (tm->local_endpoints, lepi);
534
535   /* Local endpoint no longer in use, program cleanup */
536   if (!clib_atomic_sub_fetch (&lep->refcnt, 1))
537     {
538       transport_program_endpoint_cleanup (lepi);
539       return 0;
540     }
541
542   /* Not an error, just in idication that endpoint was not cleaned up */
543   return -1;
544 }
545
546 static int
547 transport_endpoint_mark_used (u8 proto, ip46_address_t *ip, u16 port)
548 {
549   transport_main_t *tm = &tp_main;
550   local_endpoint_t *lep;
551   u32 tei;
552
553   ASSERT (vlib_get_thread_index () <= transport_cl_thread ());
554
555   tei =
556     transport_endpoint_lookup (&tm->local_endpoints_table, proto, ip, port);
557   if (tei != ENDPOINT_INVALID_INDEX)
558     return SESSION_E_PORTINUSE;
559
560   /* Pool reallocs with worker barrier */
561   lep = transport_endpoint_alloc ();
562   clib_memcpy_fast (&lep->ep.ip, ip, sizeof (*ip));
563   lep->ep.port = port;
564   lep->proto = proto;
565   lep->refcnt = 1;
566
567   transport_endpoint_table_add (&tm->local_endpoints_table, proto, &lep->ep,
568                                 lep - tm->local_endpoints);
569
570   return 0;
571 }
572
573 void
574 transport_share_local_endpoint (u8 proto, ip46_address_t * lcl_ip, u16 port)
575 {
576   transport_main_t *tm = &tp_main;
577   local_endpoint_t *lep;
578   u32 lepi;
579
580   /* Active opens should call this only from a control thread, which are also
581    * used to allocate and free ports. So, pool has only one writer and
582    * potentially many readers. Listeners are allocated with barrier */
583   lepi = transport_endpoint_lookup (&tm->local_endpoints_table, proto, lcl_ip,
584                                     clib_net_to_host_u16 (port));
585   if (lepi != ENDPOINT_INVALID_INDEX)
586     {
587       lep = pool_elt_at_index (tm->local_endpoints, lepi);
588       clib_atomic_add_fetch (&lep->refcnt, 1);
589     }
590 }
591
592 /**
593  * Allocate local port and add if successful add entry to local endpoint
594  * table to mark the pair as used.
595  */
596 int
597 transport_alloc_local_port (u8 proto, ip46_address_t *lcl_addr,
598                             transport_endpoint_cfg_t *rmt)
599 {
600   u16 min = 1024, max = 65535;  /* XXX configurable ? */
601   transport_main_t *tm = &tp_main;
602   int tries, limit;
603
604   limit = max - min;
605
606   /* Only support active opens from one of ctrl threads */
607   ASSERT (vlib_get_thread_index () <= transport_cl_thread ());
608
609   /* Search for first free slot */
610   for (tries = 0; tries < limit; tries++)
611     {
612       u16 port = 0;
613
614       /* Find a port in the specified range */
615       while (1)
616         {
617           port = random_u32 (&tm->port_allocator_seed) & PORT_MASK;
618           if (PREDICT_TRUE (port >= min && port < max))
619             break;
620         }
621
622       if (!transport_endpoint_mark_used (proto, lcl_addr, port))
623         return port;
624
625       /* IP:port pair already in use, check if 6-tuple available */
626       if (session_lookup_connection (rmt->fib_index, lcl_addr, &rmt->ip, port,
627                                      rmt->port, proto, rmt->is_ip4))
628         continue;
629
630       /* 6-tuple is available so increment lcl endpoint refcount */
631       transport_share_local_endpoint (proto, lcl_addr, port);
632
633       return port;
634     }
635   return -1;
636 }
637
638 static session_error_t
639 transport_get_interface_ip (u32 sw_if_index, u8 is_ip4, ip46_address_t * addr)
640 {
641   if (is_ip4)
642     {
643       ip4_address_t *ip4;
644       ip4 = ip_interface_get_first_ip (sw_if_index, 1);
645       if (!ip4)
646         return SESSION_E_NOIP;
647       addr->ip4.as_u32 = ip4->as_u32;
648     }
649   else
650     {
651       ip6_address_t *ip6;
652       ip6 = ip_interface_get_first_ip (sw_if_index, 0);
653       if (ip6 == 0)
654         return SESSION_E_NOIP;
655       clib_memcpy_fast (&addr->ip6, ip6, sizeof (*ip6));
656     }
657   return 0;
658 }
659
660 static session_error_t
661 transport_find_local_ip_for_remote (u32 *sw_if_index,
662                                     transport_endpoint_t *rmt,
663                                     ip46_address_t *lcl_addr)
664 {
665   fib_node_index_t fei;
666   fib_prefix_t prefix;
667
668   if (*sw_if_index == ENDPOINT_INVALID_INDEX)
669     {
670       /* Find a FIB path to the destination */
671       clib_memcpy_fast (&prefix.fp_addr, &rmt->ip, sizeof (rmt->ip));
672       prefix.fp_proto = rmt->is_ip4 ? FIB_PROTOCOL_IP4 : FIB_PROTOCOL_IP6;
673       prefix.fp_len = rmt->is_ip4 ? 32 : 128;
674
675       ASSERT (rmt->fib_index != ENDPOINT_INVALID_INDEX);
676       fei = fib_table_lookup (rmt->fib_index, &prefix);
677
678       /* Couldn't find route to destination. Bail out. */
679       if (fei == FIB_NODE_INDEX_INVALID)
680         return SESSION_E_NOROUTE;
681
682       *sw_if_index = fib_entry_get_resolving_interface (fei);
683       if (*sw_if_index == ENDPOINT_INVALID_INDEX)
684         return SESSION_E_NOINTF;
685     }
686
687   clib_memset (lcl_addr, 0, sizeof (*lcl_addr));
688   return transport_get_interface_ip (*sw_if_index, rmt->is_ip4, lcl_addr);
689 }
690
691 int
692 transport_alloc_local_endpoint (u8 proto, transport_endpoint_cfg_t * rmt_cfg,
693                                 ip46_address_t * lcl_addr, u16 * lcl_port)
694 {
695   transport_endpoint_t *rmt = (transport_endpoint_t *) rmt_cfg;
696   transport_main_t *tm = &tp_main;
697   session_error_t error;
698   int port;
699
700   /*
701    * Find the local address
702    */
703   if (ip_is_zero (&rmt_cfg->peer.ip, rmt_cfg->peer.is_ip4))
704     {
705       error = transport_find_local_ip_for_remote (&rmt_cfg->peer.sw_if_index,
706                                                   rmt, lcl_addr);
707       if (error)
708         return error;
709     }
710   else
711     {
712       /* Assume session layer vetted this address */
713       clib_memcpy_fast (lcl_addr, &rmt_cfg->peer.ip,
714                         sizeof (rmt_cfg->peer.ip));
715     }
716
717   /* Cleanup freelist if need be */
718   if (vec_len (tm->lcl_endpts_freelist))
719     transport_cleanup_freelist ();
720
721   /*
722    * Allocate source port
723    */
724   if (rmt_cfg->peer.port == 0)
725     {
726       port = transport_alloc_local_port (proto, lcl_addr, rmt_cfg);
727       if (port < 1)
728         return SESSION_E_NOPORT;
729       *lcl_port = port;
730     }
731   else
732     {
733       port = clib_net_to_host_u16 (rmt_cfg->peer.port);
734       *lcl_port = port;
735
736       if (!transport_endpoint_mark_used (proto, lcl_addr, port))
737         return 0;
738
739       /* IP:port pair already in use, check if 6-tuple available */
740       if (session_lookup_connection (rmt->fib_index, lcl_addr, &rmt->ip, port,
741                                      rmt->port, proto, rmt->is_ip4))
742         return SESSION_E_PORTINUSE;
743
744       /* 6-tuple is available so increment lcl endpoint refcount */
745       transport_share_local_endpoint (proto, lcl_addr, port);
746
747       return 0;
748     }
749
750   return 0;
751 }
752
753 u8 *
754 format_clib_us_time (u8 * s, va_list * args)
755 {
756   clib_us_time_t t = va_arg (*args, clib_us_time_t);
757   if (t < 1e3)
758     s = format (s, "%u us", t);
759   else
760     s = format (s, "%.3f s", (f64) t * CLIB_US_TIME_PERIOD);
761   return s;
762 }
763
764 u8 *
765 format_transport_pacer (u8 * s, va_list * args)
766 {
767   spacer_t *pacer = va_arg (*args, spacer_t *);
768   u32 thread_index = va_arg (*args, int);
769   clib_us_time_t now, diff;
770
771   now = transport_us_time_now (thread_index);
772   diff = now - pacer->last_update;
773   s = format (s, "rate %lu bucket %ld t/p %.3f last_update %U burst %u",
774               pacer->bytes_per_sec, pacer->bucket, pacer->tokens_per_period,
775               format_clib_us_time, diff, pacer->max_burst);
776   return s;
777 }
778
779 static inline u32
780 spacer_max_burst (spacer_t * pacer, clib_us_time_t time_now)
781 {
782   u64 n_periods = (time_now - pacer->last_update);
783   i64 inc;
784
785   if ((inc = (f32) n_periods * pacer->tokens_per_period) > 10)
786     {
787       pacer->last_update = time_now;
788       pacer->bucket = clib_min (pacer->bucket + inc, (i64) pacer->max_burst);
789     }
790
791   return pacer->bucket >= 0 ? pacer->max_burst : 0;
792 }
793
794 static inline void
795 spacer_update_bucket (spacer_t * pacer, u32 bytes)
796 {
797   pacer->bucket -= bytes;
798 }
799
800 static inline void
801 spacer_set_pace_rate (spacer_t * pacer, u64 rate_bytes_per_sec,
802                       clib_us_time_t rtt, clib_time_type_t sec_per_loop)
803 {
804   clib_us_time_t max_time;
805
806   ASSERT (rate_bytes_per_sec != 0);
807   pacer->bytes_per_sec = rate_bytes_per_sec;
808   pacer->tokens_per_period = rate_bytes_per_sec * CLIB_US_TIME_PERIOD;
809
810   /* Allow a min number of bursts per rtt, if their size is acceptable. Goal
811    * is to spread the sending of data over the rtt but to also allow for some
812    * coalescing that can potentially
813    * 1) reduce load on session layer by reducing scheduling frequency for a
814    *    session and
815    * 2) optimize sending when tso if available
816    *
817    * Max "time-length" of a burst cannot be less than 1us or more than 1ms.
818    */
819   max_time = clib_max (rtt / TRANSPORT_PACER_BURSTS_PER_RTT,
820                        (clib_us_time_t) (sec_per_loop * CLIB_US_TIME_FREQ));
821   max_time = clib_clamp (max_time, 1 /* 1us */ , 1000 /* 1ms */ );
822   pacer->max_burst = (rate_bytes_per_sec * max_time) * CLIB_US_TIME_PERIOD;
823   pacer->max_burst = clib_clamp (pacer->max_burst, TRANSPORT_PACER_MIN_BURST,
824                                  TRANSPORT_PACER_MAX_BURST);
825 }
826
827 static inline u64
828 spacer_pace_rate (spacer_t * pacer)
829 {
830   return pacer->bytes_per_sec;
831 }
832
833 static inline void
834 spacer_reset (spacer_t * pacer, clib_us_time_t time_now, u64 bucket)
835 {
836   pacer->last_update = time_now;
837   pacer->bucket = bucket;
838 }
839
840 void
841 transport_connection_tx_pacer_reset (transport_connection_t * tc,
842                                      u64 rate_bytes_per_sec, u32 start_bucket,
843                                      clib_us_time_t rtt)
844 {
845   spacer_set_pace_rate (&tc->pacer, rate_bytes_per_sec, rtt,
846                         transport_seconds_per_loop (tc->thread_index));
847   spacer_reset (&tc->pacer, transport_us_time_now (tc->thread_index),
848                 start_bucket);
849 }
850
851 void
852 transport_connection_tx_pacer_reset_bucket (transport_connection_t * tc,
853                                             u32 bucket)
854 {
855   spacer_reset (&tc->pacer, transport_us_time_now (tc->thread_index), bucket);
856 }
857
858 void
859 transport_connection_tx_pacer_init (transport_connection_t * tc,
860                                     u64 rate_bytes_per_sec,
861                                     u32 initial_bucket)
862 {
863   tc->flags |= TRANSPORT_CONNECTION_F_IS_TX_PACED;
864   transport_connection_tx_pacer_reset (tc, rate_bytes_per_sec,
865                                        initial_bucket, 1e6);
866 }
867
868 void
869 transport_connection_tx_pacer_update (transport_connection_t * tc,
870                                       u64 bytes_per_sec, clib_us_time_t rtt)
871 {
872   spacer_set_pace_rate (&tc->pacer, bytes_per_sec, rtt,
873                         transport_seconds_per_loop (tc->thread_index));
874 }
875
876 u32
877 transport_connection_tx_pacer_burst (transport_connection_t * tc)
878 {
879   return spacer_max_burst (&tc->pacer,
880                            transport_us_time_now (tc->thread_index));
881 }
882
883 u64
884 transport_connection_tx_pacer_rate (transport_connection_t * tc)
885 {
886   return spacer_pace_rate (&tc->pacer);
887 }
888
889 void
890 transport_connection_update_tx_bytes (transport_connection_t * tc, u32 bytes)
891 {
892   if (transport_connection_is_tx_paced (tc))
893     spacer_update_bucket (&tc->pacer, bytes);
894 }
895
896 void
897 transport_connection_tx_pacer_update_bytes (transport_connection_t * tc,
898                                             u32 bytes)
899 {
900   spacer_update_bucket (&tc->pacer, bytes);
901 }
902
903 void
904 transport_update_pacer_time (u32 thread_index, clib_time_type_t now)
905 {
906   session_wrk_update_time (session_main_get_worker (thread_index), now);
907 }
908
909 void
910 transport_connection_reschedule (transport_connection_t * tc)
911 {
912   tc->flags &= ~TRANSPORT_CONNECTION_F_DESCHED;
913   transport_connection_tx_pacer_reset_bucket (tc, 0 /* bucket */);
914   if (transport_max_tx_dequeue (tc))
915     sesssion_reschedule_tx (tc);
916   else
917     {
918       session_t *s = session_get (tc->s_index, tc->thread_index);
919       svm_fifo_unset_event (s->tx_fifo);
920       if (svm_fifo_max_dequeue_cons (s->tx_fifo))
921         if (svm_fifo_set_event (s->tx_fifo))
922           sesssion_reschedule_tx (tc);
923     }
924 }
925
926 void
927 transport_fifos_init_ooo (transport_connection_t * tc)
928 {
929   session_t *s = session_get (tc->s_index, tc->thread_index);
930   svm_fifo_init_ooo_lookup (s->rx_fifo, 0 /* ooo enq */ );
931   svm_fifo_init_ooo_lookup (s->tx_fifo, 1 /* ooo deq */ );
932 }
933
934 void
935 transport_update_time (clib_time_type_t time_now, u8 thread_index)
936 {
937   transport_proto_vft_t *vft;
938   vec_foreach (vft, tp_vfts)
939   {
940     if (vft->update_time)
941       (vft->update_time) (time_now, thread_index);
942   }
943 }
944
945 void
946 transport_enable_disable (vlib_main_t * vm, u8 is_en)
947 {
948   transport_proto_vft_t *vft;
949   vec_foreach (vft, tp_vfts)
950   {
951     if (vft->enable)
952       (vft->enable) (vm, is_en);
953
954     if (vft->update_time)
955       session_register_update_time_fn (vft->update_time, is_en);
956   }
957 }
958
959 void
960 transport_init (void)
961 {
962   vlib_thread_main_t *vtm = vlib_get_thread_main ();
963   session_main_t *smm = vnet_get_session_main ();
964   transport_main_t *tm = &tp_main;
965   u32 num_threads;
966
967   if (smm->local_endpoints_table_buckets == 0)
968     smm->local_endpoints_table_buckets = 250000;
969   if (smm->local_endpoints_table_memory == 0)
970     smm->local_endpoints_table_memory = 512 << 20;
971
972   /* Initialize [port-allocator] random number seed */
973   tm->port_allocator_seed = (u32) clib_cpu_time_now ();
974
975   clib_bihash_init_24_8 (&tm->local_endpoints_table, "local endpoints table",
976                          smm->local_endpoints_table_buckets,
977                          smm->local_endpoints_table_memory);
978   clib_spinlock_init (&tm->local_endpoints_lock);
979
980   num_threads = 1 /* main thread */  + vtm->n_threads;
981   if (num_threads > 1)
982     {
983       /* Main not polled if there are workers */
984       smm->transport_cl_thread = 1;
985     }
986 }
987
988 /*
989  * fd.io coding-style-patch-verification: ON
990  *
991  * Local Variables:
992  * eval: (c-set-style "gnu")
993  * End:
994  */