7e40164a9f87d88fa083c9ec5735d514d2d9864a
[vpp.git] / src / vnet / session / transport.c
1 /*
2  * Copyright (c) 2017-2019 Cisco and/or its affiliates.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at:
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15
16 #include <vnet/session/transport.h>
17 #include <vnet/session/session.h>
18 #include <vnet/fib/fib.h>
19
20 /**
21  * Per-type vector of transport protocol virtual function tables
22  */
23 transport_proto_vft_t *tp_vfts;
24
25 typedef struct local_endpoint_
26 {
27   transport_endpoint_t ep;
28   transport_proto_t proto;
29   int refcnt;
30 } local_endpoint_t;
31
32 typedef struct transport_main_
33 {
34   transport_endpoint_table_t local_endpoints_table;
35   local_endpoint_t *local_endpoints;
36   u32 *lcl_endpts_freelist;
37   u32 port_allocator_seed;
38   u8 lcl_endpts_cleanup_pending;
39   clib_spinlock_t local_endpoints_lock;
40 } transport_main_t;
41
42 static transport_main_t tp_main;
43
44 u8 *
45 format_transport_proto (u8 * s, va_list * args)
46 {
47   u32 transport_proto = va_arg (*args, u32);
48
49   if (tp_vfts[transport_proto].transport_options.name)
50     s = format (s, "%s", tp_vfts[transport_proto].transport_options.name);
51   else
52     s = format (s, "n/a");
53
54   return s;
55 }
56
57 u8 *
58 format_transport_proto_short (u8 * s, va_list * args)
59 {
60   u32 transport_proto = va_arg (*args, u32);
61   char *short_name;
62
63   short_name = tp_vfts[transport_proto].transport_options.short_name;
64   if (short_name)
65     s = format (s, "%s", short_name);
66   else
67     s = format (s, "NA");
68
69   return s;
70 }
71
72 const char *transport_flags_str[] = {
73 #define _(sym, str) str,
74   foreach_transport_connection_flag
75 #undef _
76 };
77
78 u8 *
79 format_transport_flags (u8 *s, va_list *args)
80 {
81   transport_connection_flags_t flags;
82   int i, last = -1;
83
84   flags = va_arg (*args, transport_connection_flags_t);
85
86   for (i = 0; i < TRANSPORT_CONNECTION_N_FLAGS; i++)
87     if (flags & (1 << i))
88       last = i;
89
90   for (i = 0; i < last; i++)
91     {
92       if (flags & (1 << i))
93         s = format (s, "%s, ", transport_flags_str[i]);
94     }
95   if (last >= 0)
96     s = format (s, "%s", transport_flags_str[last]);
97
98   return s;
99 }
100
101 u8 *
102 format_transport_connection (u8 * s, va_list * args)
103 {
104   u32 transport_proto = va_arg (*args, u32);
105   u32 conn_index = va_arg (*args, u32);
106   u32 thread_index = va_arg (*args, u32);
107   u32 verbose = va_arg (*args, u32);
108   transport_proto_vft_t *tp_vft;
109   transport_connection_t *tc;
110   u32 indent;
111
112   tp_vft = transport_protocol_get_vft (transport_proto);
113   if (!tp_vft)
114     return s;
115
116   s = format (s, "%U", tp_vft->format_connection, conn_index, thread_index,
117               verbose);
118   tc = tp_vft->get_connection (conn_index, thread_index);
119   if (tc && verbose > 1)
120     {
121       indent = format_get_indent (s) + 1;
122       if (transport_connection_is_tx_paced (tc))
123         s = format (s, "%Upacer: %U\n", format_white_space, indent,
124                     format_transport_pacer, &tc->pacer, tc->thread_index);
125       s = format (s, "%Utransport: flags: %U\n", format_white_space, indent,
126                   format_transport_flags, tc->flags);
127     }
128   return s;
129 }
130
131 u8 *
132 format_transport_listen_connection (u8 * s, va_list * args)
133 {
134   u32 transport_proto = va_arg (*args, u32);
135   transport_proto_vft_t *tp_vft;
136
137   tp_vft = transport_protocol_get_vft (transport_proto);
138   if (!tp_vft)
139     return s;
140
141   s = (tp_vft->format_listener) (s, args);
142   return s;
143 }
144
145 u8 *
146 format_transport_half_open_connection (u8 * s, va_list * args)
147 {
148   u32 transport_proto = va_arg (*args, u32);
149   u32 ho_index = va_arg (*args, u32);
150   transport_proto_vft_t *tp_vft;
151
152   tp_vft = transport_protocol_get_vft (transport_proto);
153   if (!tp_vft)
154     return s;
155
156   s = format (s, "%U", tp_vft->format_half_open, ho_index);
157   return s;
158 }
159
160 static u8
161 unformat_transport_str_match (unformat_input_t * input, const char *str)
162 {
163   int i;
164
165   if (strlen (str) > vec_len (input->buffer) - input->index)
166     return 0;
167
168   for (i = 0; i < strlen (str); i++)
169     {
170       if (input->buffer[i + input->index] != str[i])
171         return 0;
172     }
173   return 1;
174 }
175
176 uword
177 unformat_transport_proto (unformat_input_t * input, va_list * args)
178 {
179   u32 *proto = va_arg (*args, u32 *);
180   transport_proto_vft_t *tp_vft;
181   u8 longest_match = 0, match;
182   char *str, *str_match = 0;
183   transport_proto_t tp;
184
185   for (tp = 0; tp < vec_len (tp_vfts); tp++)
186     {
187       tp_vft = &tp_vfts[tp];
188       str = tp_vft->transport_options.name;
189       if (!str)
190         continue;
191       if (unformat_transport_str_match (input, str))
192         {
193           match = strlen (str);
194           if (match > longest_match)
195             {
196               *proto = tp;
197               longest_match = match;
198               str_match = str;
199             }
200         }
201     }
202   if (longest_match)
203     {
204       (void) unformat (input, str_match);
205       return 1;
206     }
207
208   return 0;
209 }
210
211 u8 *
212 format_transport_protos (u8 * s, va_list * args)
213 {
214   transport_proto_vft_t *tp_vft;
215
216   vec_foreach (tp_vft, tp_vfts)
217     s = format (s, "%s\n", tp_vft->transport_options.name);
218
219   return s;
220 }
221
222 u32
223 transport_endpoint_lookup (transport_endpoint_table_t * ht, u8 proto,
224                            ip46_address_t * ip, u16 port)
225 {
226   clib_bihash_kv_24_8_t kv;
227   int rv;
228
229   kv.key[0] = ip->as_u64[0];
230   kv.key[1] = ip->as_u64[1];
231   kv.key[2] = (u64) port << 8 | (u64) proto;
232
233   rv = clib_bihash_search_inline_24_8 (ht, &kv);
234   if (rv == 0)
235     return kv.value;
236
237   return ENDPOINT_INVALID_INDEX;
238 }
239
240 void
241 transport_endpoint_table_add (transport_endpoint_table_t * ht, u8 proto,
242                               transport_endpoint_t * te, u32 value)
243 {
244   clib_bihash_kv_24_8_t kv;
245
246   kv.key[0] = te->ip.as_u64[0];
247   kv.key[1] = te->ip.as_u64[1];
248   kv.key[2] = (u64) te->port << 8 | (u64) proto;
249   kv.value = value;
250
251   clib_bihash_add_del_24_8 (ht, &kv, 1);
252 }
253
254 void
255 transport_endpoint_table_del (transport_endpoint_table_t * ht, u8 proto,
256                               transport_endpoint_t * te)
257 {
258   clib_bihash_kv_24_8_t kv;
259
260   kv.key[0] = te->ip.as_u64[0];
261   kv.key[1] = te->ip.as_u64[1];
262   kv.key[2] = (u64) te->port << 8 | (u64) proto;
263
264   clib_bihash_add_del_24_8 (ht, &kv, 0);
265 }
266
267 void
268 transport_register_protocol (transport_proto_t transport_proto,
269                              const transport_proto_vft_t * vft,
270                              fib_protocol_t fib_proto, u32 output_node)
271 {
272   u8 is_ip4 = fib_proto == FIB_PROTOCOL_IP4;
273
274   vec_validate (tp_vfts, transport_proto);
275   tp_vfts[transport_proto] = *vft;
276
277   session_register_transport (transport_proto, vft, is_ip4, output_node);
278 }
279
280 transport_proto_t
281 transport_register_new_protocol (const transport_proto_vft_t * vft,
282                                  fib_protocol_t fib_proto, u32 output_node)
283 {
284   transport_proto_t transport_proto;
285   u8 is_ip4;
286
287   transport_proto = session_add_transport_proto ();
288   is_ip4 = fib_proto == FIB_PROTOCOL_IP4;
289
290   vec_validate (tp_vfts, transport_proto);
291   tp_vfts[transport_proto] = *vft;
292
293   session_register_transport (transport_proto, vft, is_ip4, output_node);
294
295   return transport_proto;
296 }
297
298 /**
299  * Get transport virtual function table
300  *
301  * @param type - session type (not protocol type)
302  */
303 transport_proto_vft_t *
304 transport_protocol_get_vft (transport_proto_t transport_proto)
305 {
306   if (transport_proto >= vec_len (tp_vfts))
307     return 0;
308   return &tp_vfts[transport_proto];
309 }
310
311 transport_service_type_t
312 transport_protocol_service_type (transport_proto_t tp)
313 {
314   return tp_vfts[tp].transport_options.service_type;
315 }
316
317 transport_tx_fn_type_t
318 transport_protocol_tx_fn_type (transport_proto_t tp)
319 {
320   return tp_vfts[tp].transport_options.tx_type;
321 }
322
323 void
324 transport_cleanup (transport_proto_t tp, u32 conn_index, u8 thread_index)
325 {
326   tp_vfts[tp].cleanup (conn_index, thread_index);
327 }
328
329 void
330 transport_cleanup_half_open (transport_proto_t tp, u32 conn_index)
331 {
332   if (tp_vfts[tp].cleanup_ho)
333     tp_vfts[tp].cleanup_ho (conn_index);
334 }
335
336 int
337 transport_connect (transport_proto_t tp, transport_endpoint_cfg_t * tep)
338 {
339   if (PREDICT_FALSE (!tp_vfts[tp].connect))
340     return SESSION_E_TRANSPORT_NO_REG;
341   return tp_vfts[tp].connect (tep);
342 }
343
344 void
345 transport_half_close (transport_proto_t tp, u32 conn_index, u8 thread_index)
346 {
347   if (tp_vfts[tp].half_close)
348     tp_vfts[tp].half_close (conn_index, thread_index);
349 }
350
351 void
352 transport_close (transport_proto_t tp, u32 conn_index, u8 thread_index)
353 {
354   tp_vfts[tp].close (conn_index, thread_index);
355 }
356
357 void
358 transport_reset (transport_proto_t tp, u32 conn_index, u8 thread_index)
359 {
360   if (tp_vfts[tp].reset)
361     tp_vfts[tp].reset (conn_index, thread_index);
362   else
363     tp_vfts[tp].close (conn_index, thread_index);
364 }
365
366 u32
367 transport_start_listen (transport_proto_t tp, u32 session_index,
368                         transport_endpoint_cfg_t *tep)
369 {
370   if (PREDICT_FALSE (!tp_vfts[tp].start_listen))
371     return SESSION_E_TRANSPORT_NO_REG;
372   return tp_vfts[tp].start_listen (session_index, tep);
373 }
374
375 u32
376 transport_stop_listen (transport_proto_t tp, u32 conn_index)
377 {
378   return tp_vfts[tp].stop_listen (conn_index);
379 }
380
381 u8
382 transport_protocol_is_cl (transport_proto_t tp)
383 {
384   return (tp_vfts[tp].transport_options.service_type == TRANSPORT_SERVICE_CL);
385 }
386
387 always_inline void
388 default_get_transport_endpoint (transport_connection_t * tc,
389                                 transport_endpoint_t * tep, u8 is_lcl)
390 {
391   if (is_lcl)
392     {
393       tep->port = tc->lcl_port;
394       tep->is_ip4 = tc->is_ip4;
395       clib_memcpy_fast (&tep->ip, &tc->lcl_ip, sizeof (tc->lcl_ip));
396     }
397   else
398     {
399       tep->port = tc->rmt_port;
400       tep->is_ip4 = tc->is_ip4;
401       clib_memcpy_fast (&tep->ip, &tc->rmt_ip, sizeof (tc->rmt_ip));
402     }
403 }
404
405 void
406 transport_get_endpoint (transport_proto_t tp, u32 conn_index,
407                         u32 thread_index, transport_endpoint_t * tep,
408                         u8 is_lcl)
409 {
410   if (tp_vfts[tp].get_transport_endpoint)
411     tp_vfts[tp].get_transport_endpoint (conn_index, thread_index, tep,
412                                         is_lcl);
413   else
414     {
415       transport_connection_t *tc;
416       tc = transport_get_connection (tp, conn_index, thread_index);
417       default_get_transport_endpoint (tc, tep, is_lcl);
418     }
419 }
420
421 void
422 transport_get_listener_endpoint (transport_proto_t tp, u32 conn_index,
423                                  transport_endpoint_t * tep, u8 is_lcl)
424 {
425   if (tp_vfts[tp].get_transport_listener_endpoint)
426     tp_vfts[tp].get_transport_listener_endpoint (conn_index, tep, is_lcl);
427   else
428     {
429       transport_connection_t *tc;
430       tc = transport_get_listener (tp, conn_index);
431       default_get_transport_endpoint (tc, tep, is_lcl);
432     }
433 }
434
435 int
436 transport_connection_attribute (transport_proto_t tp, u32 conn_index,
437                                 u8 thread_index, u8 is_get,
438                                 transport_endpt_attr_t *attr)
439 {
440   if (!tp_vfts[tp].attribute)
441     return -1;
442
443   return tp_vfts[tp].attribute (conn_index, thread_index, is_get, attr);
444 }
445
446 #define PORT_MASK ((1 << 16)- 1)
447
448 void
449 transport_endpoint_free (u32 tepi)
450 {
451   transport_main_t *tm = &tp_main;
452   pool_put_index (tm->local_endpoints, tepi);
453 }
454
455 always_inline local_endpoint_t *
456 transport_endpoint_alloc (void)
457 {
458   transport_main_t *tm = &tp_main;
459   local_endpoint_t *lep;
460
461   ASSERT (vlib_get_thread_index () <= transport_cl_thread ());
462
463   pool_get_aligned_safe (tm->local_endpoints, lep, 0);
464   return lep;
465 }
466
467 static void
468 transport_cleanup_freelist (void)
469 {
470   transport_main_t *tm = &tp_main;
471   local_endpoint_t *lep;
472   u32 *lep_indexp;
473
474   clib_spinlock_lock (&tm->local_endpoints_lock);
475
476   vec_foreach (lep_indexp, tm->lcl_endpts_freelist)
477     {
478       lep = pool_elt_at_index (tm->local_endpoints, *lep_indexp);
479
480       /* Port re-shared after attempt to cleanup */
481       if (lep->refcnt > 0)
482         continue;
483
484       transport_endpoint_table_del (&tm->local_endpoints_table, lep->proto,
485                                     &lep->ep);
486       transport_endpoint_free (*lep_indexp);
487     }
488
489   vec_reset_length (tm->lcl_endpts_freelist);
490
491   tm->lcl_endpts_cleanup_pending = 0;
492
493   clib_spinlock_unlock (&tm->local_endpoints_lock);
494 }
495
496 void
497 transport_program_endpoint_cleanup (u32 lepi)
498 {
499   transport_main_t *tm = &tp_main;
500   u8 flush_fl = 0;
501
502   /* All workers can free connections. Synchronize access to freelist */
503   clib_spinlock_lock (&tm->local_endpoints_lock);
504
505   vec_add1 (tm->lcl_endpts_freelist, lepi);
506
507   /* Avoid accumulating lots of endpoints for cleanup */
508   if (!tm->lcl_endpts_cleanup_pending &&
509       vec_len (tm->lcl_endpts_freelist) > 32)
510     {
511       tm->lcl_endpts_cleanup_pending = 1;
512       flush_fl = 1;
513     }
514
515   clib_spinlock_unlock (&tm->local_endpoints_lock);
516
517   if (flush_fl)
518     session_send_rpc_evt_to_thread_force (transport_cl_thread (),
519                                           transport_cleanup_freelist, 0);
520 }
521
522 int
523 transport_release_local_endpoint (u8 proto, ip46_address_t *lcl_ip, u16 port)
524 {
525   transport_main_t *tm = &tp_main;
526   local_endpoint_t *lep;
527   u32 lepi;
528
529   lepi = transport_endpoint_lookup (&tm->local_endpoints_table, proto, lcl_ip,
530                                     clib_net_to_host_u16 (port));
531   if (lepi == ENDPOINT_INVALID_INDEX)
532     return -1;
533
534   lep = pool_elt_at_index (tm->local_endpoints, lepi);
535
536   /* Local endpoint no longer in use, program cleanup */
537   if (!clib_atomic_sub_fetch (&lep->refcnt, 1))
538     {
539       transport_program_endpoint_cleanup (lepi);
540       return 0;
541     }
542
543   /* Not an error, just in idication that endpoint was not cleaned up */
544   return -1;
545 }
546
547 static int
548 transport_endpoint_mark_used (u8 proto, ip46_address_t *ip, u16 port)
549 {
550   transport_main_t *tm = &tp_main;
551   local_endpoint_t *lep;
552   u32 tei;
553
554   ASSERT (vlib_get_thread_index () <= transport_cl_thread ());
555
556   tei =
557     transport_endpoint_lookup (&tm->local_endpoints_table, proto, ip, port);
558   if (tei != ENDPOINT_INVALID_INDEX)
559     return SESSION_E_PORTINUSE;
560
561   /* Pool reallocs with worker barrier */
562   lep = transport_endpoint_alloc ();
563   clib_memcpy_fast (&lep->ep.ip, ip, sizeof (*ip));
564   lep->ep.port = port;
565   lep->proto = proto;
566   lep->refcnt = 1;
567
568   transport_endpoint_table_add (&tm->local_endpoints_table, proto, &lep->ep,
569                                 lep - tm->local_endpoints);
570
571   return 0;
572 }
573
574 void
575 transport_share_local_endpoint (u8 proto, ip46_address_t * lcl_ip, u16 port)
576 {
577   transport_main_t *tm = &tp_main;
578   local_endpoint_t *lep;
579   u32 lepi;
580
581   /* Active opens should call this only from a control thread, which are also
582    * used to allocate and free ports. So, pool has only one writer and
583    * potentially many readers. Listeners are allocated with barrier */
584   lepi = transport_endpoint_lookup (&tm->local_endpoints_table, proto, lcl_ip,
585                                     clib_net_to_host_u16 (port));
586   if (lepi != ENDPOINT_INVALID_INDEX)
587     {
588       lep = pool_elt_at_index (tm->local_endpoints, lepi);
589       clib_atomic_add_fetch (&lep->refcnt, 1);
590     }
591 }
592
593 /**
594  * Allocate local port and add if successful add entry to local endpoint
595  * table to mark the pair as used.
596  */
597 int
598 transport_alloc_local_port (u8 proto, ip46_address_t *lcl_addr,
599                             transport_endpoint_cfg_t *rmt)
600 {
601   u16 min = 1024, max = 65535;  /* XXX configurable ? */
602   transport_main_t *tm = &tp_main;
603   int tries, limit;
604
605   limit = max - min;
606
607   /* Only support active opens from one of ctrl threads */
608   ASSERT (vlib_get_thread_index () <= transport_cl_thread ());
609
610   /* Search for first free slot */
611   for (tries = 0; tries < limit; tries++)
612     {
613       u16 port = 0;
614
615       /* Find a port in the specified range */
616       while (1)
617         {
618           port = random_u32 (&tm->port_allocator_seed) & PORT_MASK;
619           if (PREDICT_TRUE (port >= min && port < max))
620             break;
621         }
622
623       if (!transport_endpoint_mark_used (proto, lcl_addr, port))
624         return port;
625
626       /* IP:port pair already in use, check if 6-tuple available */
627       if (session_lookup_connection (rmt->fib_index, lcl_addr, &rmt->ip, port,
628                                      rmt->port, proto, rmt->is_ip4))
629         continue;
630
631       /* 6-tuple is available so increment lcl endpoint refcount */
632       transport_share_local_endpoint (proto, lcl_addr, port);
633
634       return port;
635     }
636   return -1;
637 }
638
639 static session_error_t
640 transport_get_interface_ip (u32 sw_if_index, u8 is_ip4, ip46_address_t * addr)
641 {
642   if (is_ip4)
643     {
644       ip4_address_t *ip4;
645       ip4 = ip_interface_get_first_ip (sw_if_index, 1);
646       if (!ip4)
647         return SESSION_E_NOIP;
648       addr->ip4.as_u32 = ip4->as_u32;
649     }
650   else
651     {
652       ip6_address_t *ip6;
653       ip6 = ip_interface_get_first_ip (sw_if_index, 0);
654       if (ip6 == 0)
655         return SESSION_E_NOIP;
656       clib_memcpy_fast (&addr->ip6, ip6, sizeof (*ip6));
657     }
658   return 0;
659 }
660
661 static session_error_t
662 transport_find_local_ip_for_remote (u32 *sw_if_index,
663                                     transport_endpoint_t *rmt,
664                                     ip46_address_t *lcl_addr)
665 {
666   fib_node_index_t fei;
667   fib_prefix_t prefix;
668
669   if (*sw_if_index == ENDPOINT_INVALID_INDEX)
670     {
671       /* Find a FIB path to the destination */
672       clib_memcpy_fast (&prefix.fp_addr, &rmt->ip, sizeof (rmt->ip));
673       prefix.fp_proto = rmt->is_ip4 ? FIB_PROTOCOL_IP4 : FIB_PROTOCOL_IP6;
674       prefix.fp_len = rmt->is_ip4 ? 32 : 128;
675
676       ASSERT (rmt->fib_index != ENDPOINT_INVALID_INDEX);
677       fei = fib_table_lookup (rmt->fib_index, &prefix);
678
679       /* Couldn't find route to destination. Bail out. */
680       if (fei == FIB_NODE_INDEX_INVALID)
681         return SESSION_E_NOROUTE;
682
683       *sw_if_index = fib_entry_get_resolving_interface (fei);
684       if (*sw_if_index == ENDPOINT_INVALID_INDEX)
685         return SESSION_E_NOINTF;
686     }
687
688   clib_memset (lcl_addr, 0, sizeof (*lcl_addr));
689   return transport_get_interface_ip (*sw_if_index, rmt->is_ip4, lcl_addr);
690 }
691
692 int
693 transport_alloc_local_endpoint (u8 proto, transport_endpoint_cfg_t * rmt_cfg,
694                                 ip46_address_t * lcl_addr, u16 * lcl_port)
695 {
696   transport_endpoint_t *rmt = (transport_endpoint_t *) rmt_cfg;
697   transport_main_t *tm = &tp_main;
698   session_error_t error;
699   int port;
700
701   /*
702    * Find the local address
703    */
704   if (ip_is_zero (&rmt_cfg->peer.ip, rmt_cfg->peer.is_ip4))
705     {
706       error = transport_find_local_ip_for_remote (&rmt_cfg->peer.sw_if_index,
707                                                   rmt, lcl_addr);
708       if (error)
709         return error;
710     }
711   else
712     {
713       /* Assume session layer vetted this address */
714       clib_memcpy_fast (lcl_addr, &rmt_cfg->peer.ip,
715                         sizeof (rmt_cfg->peer.ip));
716     }
717
718   /* Cleanup freelist if need be */
719   if (vec_len (tm->lcl_endpts_freelist))
720     transport_cleanup_freelist ();
721
722   /*
723    * Allocate source port
724    */
725   if (rmt_cfg->peer.port == 0)
726     {
727       port = transport_alloc_local_port (proto, lcl_addr, rmt_cfg);
728       if (port < 1)
729         return SESSION_E_NOPORT;
730       *lcl_port = port;
731     }
732   else
733     {
734       port = clib_net_to_host_u16 (rmt_cfg->peer.port);
735       *lcl_port = port;
736
737       if (!transport_endpoint_mark_used (proto, lcl_addr, port))
738         return 0;
739
740       /* IP:port pair already in use, check if 6-tuple available */
741       if (session_lookup_connection (rmt->fib_index, lcl_addr, &rmt->ip, port,
742                                      rmt->port, proto, rmt->is_ip4))
743         return SESSION_E_PORTINUSE;
744
745       /* 6-tuple is available so increment lcl endpoint refcount */
746       transport_share_local_endpoint (proto, lcl_addr, port);
747
748       return 0;
749     }
750
751   return 0;
752 }
753
754 u8 *
755 format_clib_us_time (u8 * s, va_list * args)
756 {
757   clib_us_time_t t = va_arg (*args, clib_us_time_t);
758   if (t < 1e3)
759     s = format (s, "%u us", t);
760   else
761     s = format (s, "%.3f s", (f64) t * CLIB_US_TIME_PERIOD);
762   return s;
763 }
764
765 u8 *
766 format_transport_pacer (u8 * s, va_list * args)
767 {
768   spacer_t *pacer = va_arg (*args, spacer_t *);
769   u32 thread_index = va_arg (*args, int);
770   clib_us_time_t now, diff;
771
772   now = transport_us_time_now (thread_index);
773   diff = now - pacer->last_update;
774   s = format (s, "rate %lu bucket %ld t/p %.3f last_update %U burst %u",
775               pacer->bytes_per_sec, pacer->bucket, pacer->tokens_per_period,
776               format_clib_us_time, diff, pacer->max_burst);
777   return s;
778 }
779
780 static inline u32
781 spacer_max_burst (spacer_t * pacer, clib_us_time_t time_now)
782 {
783   u64 n_periods = (time_now - pacer->last_update);
784   i64 inc;
785
786   if ((inc = (f32) n_periods * pacer->tokens_per_period) > 10)
787     {
788       pacer->last_update = time_now;
789       pacer->bucket = clib_min (pacer->bucket + inc, (i64) pacer->max_burst);
790     }
791
792   return pacer->bucket >= 0 ? pacer->max_burst : 0;
793 }
794
795 static inline void
796 spacer_update_bucket (spacer_t * pacer, u32 bytes)
797 {
798   pacer->bucket -= bytes;
799 }
800
801 static inline void
802 spacer_set_pace_rate (spacer_t * pacer, u64 rate_bytes_per_sec,
803                       clib_us_time_t rtt, clib_time_type_t sec_per_loop)
804 {
805   clib_us_time_t max_time;
806
807   ASSERT (rate_bytes_per_sec != 0);
808   pacer->bytes_per_sec = rate_bytes_per_sec;
809   pacer->tokens_per_period = rate_bytes_per_sec * CLIB_US_TIME_PERIOD;
810
811   /* Allow a min number of bursts per rtt, if their size is acceptable. Goal
812    * is to spread the sending of data over the rtt but to also allow for some
813    * coalescing that can potentially
814    * 1) reduce load on session layer by reducing scheduling frequency for a
815    *    session and
816    * 2) optimize sending when tso if available
817    *
818    * Max "time-length" of a burst cannot be less than 1us or more than 1ms.
819    */
820   max_time = clib_max (rtt / TRANSPORT_PACER_BURSTS_PER_RTT,
821                        (clib_us_time_t) (sec_per_loop * CLIB_US_TIME_FREQ));
822   max_time = clib_clamp (max_time, 1 /* 1us */ , 1000 /* 1ms */ );
823   pacer->max_burst = (rate_bytes_per_sec * max_time) * CLIB_US_TIME_PERIOD;
824   pacer->max_burst = clib_clamp (pacer->max_burst, TRANSPORT_PACER_MIN_BURST,
825                                  TRANSPORT_PACER_MAX_BURST);
826 }
827
828 static inline u64
829 spacer_pace_rate (spacer_t * pacer)
830 {
831   return pacer->bytes_per_sec;
832 }
833
834 static inline void
835 spacer_reset (spacer_t * pacer, clib_us_time_t time_now, u64 bucket)
836 {
837   pacer->last_update = time_now;
838   pacer->bucket = bucket;
839 }
840
841 void
842 transport_connection_tx_pacer_reset (transport_connection_t * tc,
843                                      u64 rate_bytes_per_sec, u32 start_bucket,
844                                      clib_us_time_t rtt)
845 {
846   spacer_set_pace_rate (&tc->pacer, rate_bytes_per_sec, rtt,
847                         transport_seconds_per_loop (tc->thread_index));
848   spacer_reset (&tc->pacer, transport_us_time_now (tc->thread_index),
849                 start_bucket);
850 }
851
852 void
853 transport_connection_tx_pacer_reset_bucket (transport_connection_t * tc,
854                                             u32 bucket)
855 {
856   spacer_reset (&tc->pacer, transport_us_time_now (tc->thread_index), bucket);
857 }
858
859 void
860 transport_connection_tx_pacer_init (transport_connection_t * tc,
861                                     u64 rate_bytes_per_sec,
862                                     u32 initial_bucket)
863 {
864   tc->flags |= TRANSPORT_CONNECTION_F_IS_TX_PACED;
865   transport_connection_tx_pacer_reset (tc, rate_bytes_per_sec,
866                                        initial_bucket, 1e6);
867 }
868
869 void
870 transport_connection_tx_pacer_update (transport_connection_t * tc,
871                                       u64 bytes_per_sec, clib_us_time_t rtt)
872 {
873   spacer_set_pace_rate (&tc->pacer, bytes_per_sec, rtt,
874                         transport_seconds_per_loop (tc->thread_index));
875 }
876
877 u32
878 transport_connection_tx_pacer_burst (transport_connection_t * tc)
879 {
880   return spacer_max_burst (&tc->pacer,
881                            transport_us_time_now (tc->thread_index));
882 }
883
884 u64
885 transport_connection_tx_pacer_rate (transport_connection_t * tc)
886 {
887   return spacer_pace_rate (&tc->pacer);
888 }
889
890 void
891 transport_connection_update_tx_bytes (transport_connection_t * tc, u32 bytes)
892 {
893   if (transport_connection_is_tx_paced (tc))
894     spacer_update_bucket (&tc->pacer, bytes);
895 }
896
897 void
898 transport_connection_tx_pacer_update_bytes (transport_connection_t * tc,
899                                             u32 bytes)
900 {
901   spacer_update_bucket (&tc->pacer, bytes);
902 }
903
904 void
905 transport_update_pacer_time (u32 thread_index, clib_time_type_t now)
906 {
907   session_wrk_update_time (session_main_get_worker (thread_index), now);
908 }
909
910 void
911 transport_connection_reschedule (transport_connection_t * tc)
912 {
913   tc->flags &= ~TRANSPORT_CONNECTION_F_DESCHED;
914   transport_connection_tx_pacer_reset_bucket (tc, 0 /* bucket */);
915   if (transport_max_tx_dequeue (tc))
916     sesssion_reschedule_tx (tc);
917   else
918     {
919       session_t *s = session_get (tc->s_index, tc->thread_index);
920       svm_fifo_unset_event (s->tx_fifo);
921       if (svm_fifo_max_dequeue_cons (s->tx_fifo))
922         if (svm_fifo_set_event (s->tx_fifo))
923           sesssion_reschedule_tx (tc);
924     }
925 }
926
927 void
928 transport_fifos_init_ooo (transport_connection_t * tc)
929 {
930   session_t *s = session_get (tc->s_index, tc->thread_index);
931   svm_fifo_init_ooo_lookup (s->rx_fifo, 0 /* ooo enq */ );
932   svm_fifo_init_ooo_lookup (s->tx_fifo, 1 /* ooo deq */ );
933 }
934
935 void
936 transport_update_time (clib_time_type_t time_now, u8 thread_index)
937 {
938   transport_proto_vft_t *vft;
939   vec_foreach (vft, tp_vfts)
940   {
941     if (vft->update_time)
942       (vft->update_time) (time_now, thread_index);
943   }
944 }
945
946 void
947 transport_enable_disable (vlib_main_t * vm, u8 is_en)
948 {
949   transport_proto_vft_t *vft;
950   vec_foreach (vft, tp_vfts)
951   {
952     if (vft->enable)
953       (vft->enable) (vm, is_en);
954
955     if (vft->update_time)
956       session_register_update_time_fn (vft->update_time, is_en);
957   }
958 }
959
960 void
961 transport_init (void)
962 {
963   vlib_thread_main_t *vtm = vlib_get_thread_main ();
964   session_main_t *smm = vnet_get_session_main ();
965   transport_main_t *tm = &tp_main;
966   u32 num_threads;
967
968   if (smm->local_endpoints_table_buckets == 0)
969     smm->local_endpoints_table_buckets = 250000;
970   if (smm->local_endpoints_table_memory == 0)
971     smm->local_endpoints_table_memory = 512 << 20;
972
973   /* Initialize [port-allocator] random number seed */
974   tm->port_allocator_seed = (u32) clib_cpu_time_now ();
975
976   clib_bihash_init_24_8 (&tm->local_endpoints_table, "local endpoints table",
977                          smm->local_endpoints_table_buckets,
978                          smm->local_endpoints_table_memory);
979   clib_spinlock_init (&tm->local_endpoints_lock);
980
981   num_threads = 1 /* main thread */  + vtm->n_threads;
982   if (num_threads > 1)
983     {
984       /* Main not polled if there are workers */
985       smm->transport_cl_thread = 1;
986     }
987 }
988
989 /*
990  * fd.io coding-style-patch-verification: ON
991  *
992  * Local Variables:
993  * eval: (c-set-style "gnu")
994  * End:
995  */