X-Git-Url: https://gerrit.fd.io/r/gitweb?a=blobdiff_plain;f=src%2Fvnet%2Fsession%2Ftransport.c;h=0012ba288bfb428ce77d80263c8f1e82d28c4ae8;hb=e111bbd12;hp=8e5df3f84181f4d57ae900b3f0d20697fb937ae9;hpb=36d49391aadeb10b9f3626b62c5c019c4fddf5ed;p=vpp.git diff --git a/src/vnet/session/transport.c b/src/vnet/session/transport.c index 8e5df3f8418..0012ba288bf 100644 --- a/src/vnet/session/transport.c +++ b/src/vnet/session/transport.c @@ -17,36 +17,31 @@ #include #include -typedef struct local_endpoint_ -{ - transport_endpoint_t ep; - int refcnt; -} local_endpoint_t; - /** * Per-type vector of transport protocol virtual function tables */ transport_proto_vft_t *tp_vfts; -/* - * Port allocator seed - */ -static u32 port_allocator_seed; - -/* - * Local endpoints table - */ -static transport_endpoint_table_t local_endpoints_table; +typedef struct local_endpoint_ +{ + transport_endpoint_t ep; + transport_proto_t proto; + int refcnt; +} local_endpoint_t; -/* - * Pool of local endpoints - */ -static local_endpoint_t *local_endpoints; +typedef struct transport_main_ +{ + transport_endpoint_table_t local_endpoints_table; + local_endpoint_t *local_endpoints; + u32 *lcl_endpts_freelist; + u32 port_allocator_seed; + u16 port_allocator_min_src_port; + u16 port_allocator_max_src_port; + u8 lcl_endpts_cleanup_pending; + clib_spinlock_t local_endpoints_lock; +} transport_main_t; -/* - * Local endpoints pool lock - */ -static clib_spinlock_t local_endpoints_lock; +static transport_main_t tp_main; u8 * format_transport_proto (u8 * s, va_list * args) @@ -76,6 +71,35 @@ format_transport_proto_short (u8 * s, va_list * args) return s; } +const char *transport_flags_str[] = { +#define _(sym, str) str, + foreach_transport_connection_flag +#undef _ +}; + +u8 * +format_transport_flags (u8 *s, va_list *args) +{ + transport_connection_flags_t flags; + int i, last = -1; + + flags = va_arg (*args, transport_connection_flags_t); + + for (i = 0; i < TRANSPORT_CONNECTION_N_FLAGS; i++) + if (flags & (1 << i)) + last = i; + + for (i = 0; i < last; i++) + { + if (flags & (1 << i)) + s = format (s, "%s, ", transport_flags_str[i]); + } + if (last >= 0) + s = format (s, "%s", transport_flags_str[last]); + + return s; +} + u8 * format_transport_connection (u8 * s, va_list * args) { @@ -100,8 +124,8 @@ format_transport_connection (u8 * s, va_list * args) if (transport_connection_is_tx_paced (tc)) s = format (s, "%Upacer: %U\n", format_white_space, indent, format_transport_pacer, &tc->pacer, tc->thread_index); - s = format (s, "%Utransport: flags 0x%x\n", format_white_space, indent, - tc->flags); + s = format (s, "%Utransport: flags: %U\n", format_white_space, indent, + format_transport_flags, tc->flags); } return s; } @@ -124,14 +148,13 @@ u8 * format_transport_half_open_connection (u8 * s, va_list * args) { u32 transport_proto = va_arg (*args, u32); - u32 listen_index = va_arg (*args, u32); transport_proto_vft_t *tp_vft; tp_vft = transport_protocol_get_vft (transport_proto); if (!tp_vft) return s; - s = format (s, "%U", tp_vft->format_half_open, listen_index); + s = (tp_vft->format_half_open) (s, args); return s; } @@ -307,16 +330,25 @@ transport_cleanup (transport_proto_t tp, u32 conn_index, u8 thread_index) void transport_cleanup_half_open (transport_proto_t tp, u32 conn_index) { - if (tp_vfts[tp].cleanup) + if (tp_vfts[tp].cleanup_ho) tp_vfts[tp].cleanup_ho (conn_index); } int transport_connect (transport_proto_t tp, transport_endpoint_cfg_t * tep) { + if (PREDICT_FALSE (!tp_vfts[tp].connect)) + return SESSION_E_TRANSPORT_NO_REG; return tp_vfts[tp].connect (tep); } +void +transport_half_close (transport_proto_t tp, u32 conn_index, u8 thread_index) +{ + if (tp_vfts[tp].half_close) + tp_vfts[tp].half_close (conn_index, thread_index); +} + void transport_close (transport_proto_t tp, u32 conn_index, u8 thread_index) { @@ -334,8 +366,10 @@ transport_reset (transport_proto_t tp, u32 conn_index, u8 thread_index) u32 transport_start_listen (transport_proto_t tp, u32 session_index, - transport_endpoint_t * tep) + transport_endpoint_cfg_t *tep) { + if (PREDICT_FALSE (!tp_vfts[tp].start_listen)) + return SESSION_E_TRANSPORT_NO_REG; return tp_vfts[tp].start_listen (session_index, tep); } @@ -399,70 +433,160 @@ transport_get_listener_endpoint (transport_proto_t tp, u32 conn_index, } } +int +transport_connection_attribute (transport_proto_t tp, u32 conn_index, + u8 thread_index, u8 is_get, + transport_endpt_attr_t *attr) +{ + if (!tp_vfts[tp].attribute) + return -1; + + return tp_vfts[tp].attribute (conn_index, thread_index, is_get, attr); +} + #define PORT_MASK ((1 << 16)- 1) void -transport_endpoint_del (u32 tepi) +transport_endpoint_free (u32 tepi) { - clib_spinlock_lock_if_init (&local_endpoints_lock); - pool_put_index (local_endpoints, tepi); - clib_spinlock_unlock_if_init (&local_endpoints_lock); + transport_main_t *tm = &tp_main; + pool_put_index (tm->local_endpoints, tepi); } always_inline local_endpoint_t * -transport_endpoint_new (void) +transport_endpoint_alloc (void) { + transport_main_t *tm = &tp_main; local_endpoint_t *lep; - pool_get_zero (local_endpoints, lep); + + ASSERT (vlib_get_thread_index () <= transport_cl_thread ()); + + pool_get_aligned_safe (tm->local_endpoints, lep, 0); return lep; } +static void +transport_cleanup_freelist (void) +{ + transport_main_t *tm = &tp_main; + local_endpoint_t *lep; + u32 *lep_indexp; + + clib_spinlock_lock (&tm->local_endpoints_lock); + + vec_foreach (lep_indexp, tm->lcl_endpts_freelist) + { + lep = pool_elt_at_index (tm->local_endpoints, *lep_indexp); + + /* Port re-shared after attempt to cleanup */ + if (lep->refcnt > 0) + continue; + + transport_endpoint_table_del (&tm->local_endpoints_table, lep->proto, + &lep->ep); + transport_endpoint_free (*lep_indexp); + } + + vec_reset_length (tm->lcl_endpts_freelist); + + tm->lcl_endpts_cleanup_pending = 0; + + clib_spinlock_unlock (&tm->local_endpoints_lock); +} + void -transport_endpoint_cleanup (u8 proto, ip46_address_t * lcl_ip, u16 port) +transport_program_endpoint_cleanup (u32 lepi) { + transport_main_t *tm = &tp_main; + u8 flush_fl = 0; + + /* All workers can free connections. Synchronize access to freelist */ + clib_spinlock_lock (&tm->local_endpoints_lock); + + vec_add1 (tm->lcl_endpts_freelist, lepi); + + /* Avoid accumulating lots of endpoints for cleanup */ + if (!tm->lcl_endpts_cleanup_pending && + vec_len (tm->lcl_endpts_freelist) > 32) + { + tm->lcl_endpts_cleanup_pending = 1; + flush_fl = 1; + } + + clib_spinlock_unlock (&tm->local_endpoints_lock); + + if (flush_fl) + session_send_rpc_evt_to_thread_force (transport_cl_thread (), + transport_cleanup_freelist, 0); +} + +int +transport_release_local_endpoint (u8 proto, ip46_address_t *lcl_ip, u16 port) +{ + transport_main_t *tm = &tp_main; local_endpoint_t *lep; u32 lepi; - /* Cleanup local endpoint if this was an active connect */ - lepi = transport_endpoint_lookup (&local_endpoints_table, proto, lcl_ip, + lepi = transport_endpoint_lookup (&tm->local_endpoints_table, proto, lcl_ip, clib_net_to_host_u16 (port)); - if (lepi != ENDPOINT_INVALID_INDEX) + if (lepi == ENDPOINT_INVALID_INDEX) + return -1; + + lep = pool_elt_at_index (tm->local_endpoints, lepi); + + /* Local endpoint no longer in use, program cleanup */ + if (!clib_atomic_sub_fetch (&lep->refcnt, 1)) { - lep = pool_elt_at_index (local_endpoints, lepi); - if (!clib_atomic_sub_fetch (&lep->refcnt, 1)) - { - transport_endpoint_table_del (&local_endpoints_table, proto, - &lep->ep); - transport_endpoint_del (lepi); - } + transport_program_endpoint_cleanup (lepi); + return 0; } + + /* Not an error, just in idication that endpoint was not cleaned up */ + return -1; } -static void -transport_endpoint_mark_used (u8 proto, ip46_address_t * ip, u16 port) +static int +transport_endpoint_mark_used (u8 proto, ip46_address_t *ip, u16 port) { + transport_main_t *tm = &tp_main; local_endpoint_t *lep; - clib_spinlock_lock_if_init (&local_endpoints_lock); - lep = transport_endpoint_new (); + u32 tei; + + ASSERT (vlib_get_thread_index () <= transport_cl_thread ()); + + tei = + transport_endpoint_lookup (&tm->local_endpoints_table, proto, ip, port); + if (tei != ENDPOINT_INVALID_INDEX) + return SESSION_E_PORTINUSE; + + /* Pool reallocs with worker barrier */ + lep = transport_endpoint_alloc (); clib_memcpy_fast (&lep->ep.ip, ip, sizeof (*ip)); lep->ep.port = port; + lep->proto = proto; lep->refcnt = 1; - transport_endpoint_table_add (&local_endpoints_table, proto, &lep->ep, - lep - local_endpoints); - clib_spinlock_unlock_if_init (&local_endpoints_lock); + + transport_endpoint_table_add (&tm->local_endpoints_table, proto, &lep->ep, + lep - tm->local_endpoints); + + return 0; } void transport_share_local_endpoint (u8 proto, ip46_address_t * lcl_ip, u16 port) { + transport_main_t *tm = &tp_main; local_endpoint_t *lep; u32 lepi; - lepi = transport_endpoint_lookup (&local_endpoints_table, proto, lcl_ip, + /* Active opens should call this only from a control thread, which are also + * used to allocate and free ports. So, pool has only one writer and + * potentially many readers. Listeners are allocated with barrier */ + lepi = transport_endpoint_lookup (&tm->local_endpoints_table, proto, lcl_ip, clib_net_to_host_u16 (port)); if (lepi != ENDPOINT_INVALID_INDEX) { - lep = pool_elt_at_index (local_endpoints, lepi); + lep = pool_elt_at_index (tm->local_endpoints, lepi); clib_atomic_add_fetch (&lep->refcnt, 1); } } @@ -472,16 +596,18 @@ transport_share_local_endpoint (u8 proto, ip46_address_t * lcl_ip, u16 port) * table to mark the pair as used. */ int -transport_alloc_local_port (u8 proto, ip46_address_t * ip) +transport_alloc_local_port (u8 proto, ip46_address_t *lcl_addr, + transport_endpoint_cfg_t *rmt) { - u16 min = 1024, max = 65535; /* XXX configurable ? */ + transport_main_t *tm = &tp_main; + u16 min = tm->port_allocator_min_src_port; + u16 max = tm->port_allocator_max_src_port; int tries, limit; - u32 tei; limit = max - min; - /* Only support active opens from thread 0 */ - ASSERT (vlib_get_thread_index () == 0); + /* Only support active opens from one of ctrl threads */ + ASSERT (vlib_get_thread_index () <= transport_cl_thread ()); /* Search for first free slot */ for (tries = 0; tries < limit; tries++) @@ -491,19 +617,23 @@ transport_alloc_local_port (u8 proto, ip46_address_t * ip) /* Find a port in the specified range */ while (1) { - port = random_u32 (&port_allocator_seed) & PORT_MASK; + port = random_u32 (&tm->port_allocator_seed) & PORT_MASK; if (PREDICT_TRUE (port >= min && port < max)) break; } - /* Look it up. If not found, we're done */ - tei = transport_endpoint_lookup (&local_endpoints_table, proto, ip, - port); - if (tei == ENDPOINT_INVALID_INDEX) - { - transport_endpoint_mark_used (proto, ip, port); - return port; - } + if (!transport_endpoint_mark_used (proto, lcl_addr, port)) + return port; + + /* IP:port pair already in use, check if 6-tuple available */ + if (session_lookup_connection (rmt->fib_index, lcl_addr, &rmt->ip, port, + rmt->port, proto, rmt->is_ip4)) + continue; + + /* 6-tuple is available so increment lcl endpoint refcount */ + transport_share_local_endpoint (proto, lcl_addr, port); + + return port; } return -1; } @@ -531,14 +661,14 @@ transport_get_interface_ip (u32 sw_if_index, u8 is_ip4, ip46_address_t * addr) } static session_error_t -transport_find_local_ip_for_remote (u32 sw_if_index, - transport_endpoint_t * rmt, - ip46_address_t * lcl_addr) +transport_find_local_ip_for_remote (u32 *sw_if_index, + transport_endpoint_t *rmt, + ip46_address_t *lcl_addr) { fib_node_index_t fei; fib_prefix_t prefix; - if (sw_if_index == ENDPOINT_INVALID_INDEX) + if (*sw_if_index == ENDPOINT_INVALID_INDEX) { /* Find a FIB path to the destination */ clib_memcpy_fast (&prefix.fp_addr, &rmt->ip, sizeof (rmt->ip)); @@ -552,13 +682,13 @@ transport_find_local_ip_for_remote (u32 sw_if_index, if (fei == FIB_NODE_INDEX_INVALID) return SESSION_E_NOROUTE; - sw_if_index = fib_entry_get_resolving_interface (fei); - if (sw_if_index == ENDPOINT_INVALID_INDEX) + *sw_if_index = fib_entry_get_resolving_interface (fei); + if (*sw_if_index == ENDPOINT_INVALID_INDEX) return SESSION_E_NOINTF; } clib_memset (lcl_addr, 0, sizeof (*lcl_addr)); - return transport_get_interface_ip (sw_if_index, rmt->is_ip4, lcl_addr); + return transport_get_interface_ip (*sw_if_index, rmt->is_ip4, lcl_addr); } int @@ -566,16 +696,16 @@ transport_alloc_local_endpoint (u8 proto, transport_endpoint_cfg_t * rmt_cfg, ip46_address_t * lcl_addr, u16 * lcl_port) { transport_endpoint_t *rmt = (transport_endpoint_t *) rmt_cfg; + transport_main_t *tm = &tp_main; session_error_t error; int port; - u32 tei; /* * Find the local address */ if (ip_is_zero (&rmt_cfg->peer.ip, rmt_cfg->peer.is_ip4)) { - error = transport_find_local_ip_for_remote (rmt_cfg->peer.sw_if_index, + error = transport_find_local_ip_for_remote (&rmt_cfg->peer.sw_if_index, rmt, lcl_addr); if (error) return error; @@ -587,12 +717,16 @@ transport_alloc_local_endpoint (u8 proto, transport_endpoint_cfg_t * rmt_cfg, sizeof (rmt_cfg->peer.ip)); } + /* Cleanup freelist if need be */ + if (vec_len (tm->lcl_endpts_freelist)) + transport_cleanup_freelist (); + /* * Allocate source port */ if (rmt_cfg->peer.port == 0) { - port = transport_alloc_local_port (proto, lcl_addr); + port = transport_alloc_local_port (proto, lcl_addr, rmt_cfg); if (port < 1) return SESSION_E_NOPORT; *lcl_port = port; @@ -601,12 +735,19 @@ transport_alloc_local_endpoint (u8 proto, transport_endpoint_cfg_t * rmt_cfg, { port = clib_net_to_host_u16 (rmt_cfg->peer.port); *lcl_port = port; - tei = transport_endpoint_lookup (&local_endpoints_table, proto, - lcl_addr, port); - if (tei != ENDPOINT_INVALID_INDEX) + + if (!transport_endpoint_mark_used (proto, lcl_addr, port)) + return 0; + + /* IP:port pair already in use, check if 6-tuple available */ + if (session_lookup_connection (rmt->fib_index, lcl_addr, &rmt->ip, port, + rmt->port, proto, rmt->is_ip4)) return SESSION_E_PORTINUSE; - transport_endpoint_mark_used (proto, lcl_addr, port); + /* 6-tuple is available so increment lcl endpoint refcount */ + transport_share_local_endpoint (proto, lcl_addr, port); + + return 0; } return 0; @@ -632,9 +773,9 @@ format_transport_pacer (u8 * s, va_list * args) now = transport_us_time_now (thread_index); diff = now - pacer->last_update; - s = format (s, "rate %lu bucket %lu t/p %.3f last_update %U idle %u", + s = format (s, "rate %lu bucket %ld t/p %.3f last_update %U burst %u", pacer->bytes_per_sec, pacer->bucket, pacer->tokens_per_period, - format_clib_us_time, diff, pacer->idle_timeout_us); + format_clib_us_time, diff, pacer->max_burst); return s; } @@ -642,40 +783,48 @@ static inline u32 spacer_max_burst (spacer_t * pacer, clib_us_time_t time_now) { u64 n_periods = (time_now - pacer->last_update); - u64 inc; - - if (PREDICT_FALSE (n_periods > pacer->idle_timeout_us)) - { - pacer->last_update = time_now; - pacer->bucket = TRANSPORT_PACER_MIN_BURST; - return TRANSPORT_PACER_MIN_BURST; - } + i64 inc; if ((inc = (f32) n_periods * pacer->tokens_per_period) > 10) { pacer->last_update = time_now; - pacer->bucket = clib_min (pacer->bucket + inc, pacer->bytes_per_sec); + pacer->bucket = clib_min (pacer->bucket + inc, (i64) pacer->max_burst); } - return clib_min (pacer->bucket, TRANSPORT_PACER_MAX_BURST); + return pacer->bucket >= 0 ? pacer->max_burst : 0; } static inline void spacer_update_bucket (spacer_t * pacer, u32 bytes) { - ASSERT (pacer->bucket >= bytes); pacer->bucket -= bytes; } static inline void spacer_set_pace_rate (spacer_t * pacer, u64 rate_bytes_per_sec, - clib_us_time_t rtt) + clib_us_time_t rtt, clib_time_type_t sec_per_loop) { + clib_us_time_t max_time; + ASSERT (rate_bytes_per_sec != 0); pacer->bytes_per_sec = rate_bytes_per_sec; pacer->tokens_per_period = rate_bytes_per_sec * CLIB_US_TIME_PERIOD; - pacer->idle_timeout_us = clib_max (rtt * TRANSPORT_PACER_IDLE_FACTOR, - TRANSPORT_PACER_MIN_IDLE); + + /* Allow a min number of bursts per rtt, if their size is acceptable. Goal + * is to spread the sending of data over the rtt but to also allow for some + * coalescing that can potentially + * 1) reduce load on session layer by reducing scheduling frequency for a + * session and + * 2) optimize sending when tso if available + * + * Max "time-length" of a burst cannot be less than 1us or more than 1ms. + */ + max_time = clib_max (rtt / TRANSPORT_PACER_BURSTS_PER_RTT, + (clib_us_time_t) (sec_per_loop * CLIB_US_TIME_FREQ)); + max_time = clib_clamp (max_time, 1 /* 1us */ , 1000 /* 1ms */ ); + pacer->max_burst = (rate_bytes_per_sec * max_time) * CLIB_US_TIME_PERIOD; + pacer->max_burst = clib_clamp (pacer->max_burst, TRANSPORT_PACER_MIN_BURST, + TRANSPORT_PACER_MAX_BURST); } static inline u64 @@ -696,7 +845,8 @@ transport_connection_tx_pacer_reset (transport_connection_t * tc, u64 rate_bytes_per_sec, u32 start_bucket, clib_us_time_t rtt) { - spacer_set_pace_rate (&tc->pacer, rate_bytes_per_sec, rtt); + spacer_set_pace_rate (&tc->pacer, rate_bytes_per_sec, rtt, + transport_seconds_per_loop (tc->thread_index)); spacer_reset (&tc->pacer, transport_us_time_now (tc->thread_index), start_bucket); } @@ -722,7 +872,8 @@ void transport_connection_tx_pacer_update (transport_connection_t * tc, u64 bytes_per_sec, clib_us_time_t rtt) { - spacer_set_pace_rate (&tc->pacer, bytes_per_sec, rtt); + spacer_set_pace_rate (&tc->pacer, bytes_per_sec, rtt, + transport_seconds_per_loop (tc->thread_index)); } u32 @@ -752,11 +903,17 @@ transport_connection_tx_pacer_update_bytes (transport_connection_t * tc, spacer_update_bucket (&tc->pacer, bytes); } +void +transport_update_pacer_time (u32 thread_index, clib_time_type_t now) +{ + session_wrk_update_time (session_main_get_worker (thread_index), now); +} + void transport_connection_reschedule (transport_connection_t * tc) { tc->flags &= ~TRANSPORT_CONNECTION_F_DESCHED; - transport_connection_tx_pacer_reset_bucket (tc, TRANSPORT_PACER_MIN_BURST); + transport_connection_tx_pacer_reset_bucket (tc, 0 /* bucket */); if (transport_max_tx_dequeue (tc)) sesssion_reschedule_tx (tc); else @@ -769,6 +926,14 @@ transport_connection_reschedule (transport_connection_t * tc) } } +void +transport_fifos_init_ooo (transport_connection_t * tc) +{ + session_t *s = session_get (tc->s_index, tc->thread_index); + svm_fifo_init_ooo_lookup (s->rx_fifo, 0 /* ooo enq */ ); + svm_fifo_init_ooo_lookup (s->tx_fifo, 1 /* ooo deq */ ); +} + void transport_update_time (clib_time_type_t time_now, u8 thread_index) { @@ -788,6 +953,9 @@ transport_enable_disable (vlib_main_t * vm, u8 is_en) { if (vft->enable) (vft->enable) (vm, is_en); + + if (vft->update_time) + session_register_update_time_fn (vft->update_time, is_en); } } @@ -796,6 +964,7 @@ transport_init (void) { vlib_thread_main_t *vtm = vlib_get_thread_main (); session_main_t *smm = vnet_get_session_main (); + transport_main_t *tm = &tp_main; u32 num_threads; if (smm->local_endpoints_table_buckets == 0) @@ -804,14 +973,21 @@ transport_init (void) smm->local_endpoints_table_memory = 512 << 20; /* Initialize [port-allocator] random number seed */ - port_allocator_seed = (u32) clib_cpu_time_now (); + tm->port_allocator_seed = (u32) clib_cpu_time_now (); + tm->port_allocator_min_src_port = smm->port_allocator_min_src_port; + tm->port_allocator_max_src_port = smm->port_allocator_max_src_port; - clib_bihash_init_24_8 (&local_endpoints_table, "local endpoints table", + clib_bihash_init_24_8 (&tm->local_endpoints_table, "local endpoints table", smm->local_endpoints_table_buckets, smm->local_endpoints_table_memory); + clib_spinlock_init (&tm->local_endpoints_lock); + num_threads = 1 /* main thread */ + vtm->n_threads; if (num_threads > 1) - clib_spinlock_init (&local_endpoints_lock); + { + /* Main not polled if there are workers */ + smm->transport_cl_thread = 1; + } } /*