X-Git-Url: https://gerrit.fd.io/r/gitweb?a=blobdiff_plain;ds=sidebyside;f=src%2Fvnet%2Fsession%2Ftransport.c;h=1c2a9261d3caf370eb1bc6c76529c2ae1245cd68;hb=7e9e2bd8f2240dc286c96440fca43ada3c3056d6;hp=93e02247b4e1aa65d88f420037dd8e94f263ad4f;hpb=05ead78945831fbaad6e973d21dfbd9a2c9e2ba4;p=vpp.git diff --git a/src/vnet/session/transport.c b/src/vnet/session/transport.c index 93e02247b4e..1c2a9261d3c 100644 --- a/src/vnet/session/transport.c +++ b/src/vnet/session/transport.c @@ -17,36 +17,31 @@ #include #include -typedef struct local_endpoint_ -{ - transport_endpoint_t ep; - int refcnt; -} local_endpoint_t; - /** * Per-type vector of transport protocol virtual function tables */ transport_proto_vft_t *tp_vfts; -/* - * Port allocator seed - */ -static u32 port_allocator_seed; - -/* - * Local endpoints table - */ -static transport_endpoint_table_t local_endpoints_table; +typedef struct local_endpoint_ +{ + transport_endpoint_t ep; + transport_proto_t proto; + int refcnt; +} local_endpoint_t; -/* - * Pool of local endpoints - */ -static local_endpoint_t *local_endpoints; +typedef struct transport_main_ +{ + transport_endpoint_table_t local_endpoints_table; + local_endpoint_t *local_endpoints; + u32 *lcl_endpts_freelist; + u32 port_allocator_seed; + u16 port_allocator_min_src_port; + u16 port_allocator_max_src_port; + u8 lcl_endpts_cleanup_pending; + clib_spinlock_t local_endpoints_lock; +} transport_main_t; -/* - * Local endpoints pool lock - */ -static clib_spinlock_t local_endpoints_lock; +static transport_main_t tp_main; u8 * format_transport_proto (u8 * s, va_list * args) @@ -76,6 +71,35 @@ format_transport_proto_short (u8 * s, va_list * args) return s; } +const char *transport_flags_str[] = { +#define _(sym, str) str, + foreach_transport_connection_flag +#undef _ +}; + +u8 * +format_transport_flags (u8 *s, va_list *args) +{ + transport_connection_flags_t flags; + int i, last = -1; + + flags = va_arg (*args, transport_connection_flags_t); + + for (i = 0; i < TRANSPORT_CONNECTION_N_FLAGS; i++) + if (flags & (1 << i)) + last = i; + + for (i = 0; i < last; i++) + { + if (flags & (1 << i)) + s = format (s, "%s, ", transport_flags_str[i]); + } + if (last >= 0) + s = format (s, "%s", transport_flags_str[last]); + + return s; +} + u8 * format_transport_connection (u8 * s, va_list * args) { @@ -100,8 +124,8 @@ format_transport_connection (u8 * s, va_list * args) if (transport_connection_is_tx_paced (tc)) s = format (s, "%Upacer: %U\n", format_white_space, indent, format_transport_pacer, &tc->pacer, tc->thread_index); - s = format (s, "%Utransport: flags 0x%x\n", format_white_space, indent, - tc->flags); + s = format (s, "%Utransport: flags: %U\n", format_white_space, indent, + format_transport_flags, tc->flags); } return s; } @@ -124,14 +148,13 @@ u8 * format_transport_half_open_connection (u8 * s, va_list * args) { u32 transport_proto = va_arg (*args, u32); - u32 ho_index = va_arg (*args, u32); transport_proto_vft_t *tp_vft; tp_vft = transport_protocol_get_vft (transport_proto); if (!tp_vft) return s; - s = format (s, "%U", tp_vft->format_half_open, ho_index); + s = (tp_vft->format_half_open) (s, args); return s; } @@ -314,6 +337,8 @@ transport_cleanup_half_open (transport_proto_t tp, u32 conn_index) int transport_connect (transport_proto_t tp, transport_endpoint_cfg_t * tep) { + if (PREDICT_FALSE (!tp_vfts[tp].connect)) + return SESSION_E_TRANSPORT_NO_REG; return tp_vfts[tp].connect (tep); } @@ -343,6 +368,8 @@ u32 transport_start_listen (transport_proto_t tp, u32 session_index, transport_endpoint_cfg_t *tep) { + if (PREDICT_FALSE (!tp_vfts[tp].start_listen)) + return SESSION_E_TRANSPORT_NO_REG; return tp_vfts[tp].start_listen (session_index, tep); } @@ -422,78 +449,146 @@ transport_connection_attribute (transport_proto_t tp, u32 conn_index, void transport_endpoint_free (u32 tepi) { - /* All workers can free connections. Synchronize access to pool */ - clib_spinlock_lock (&local_endpoints_lock); - pool_put_index (local_endpoints, tepi); - clib_spinlock_unlock (&local_endpoints_lock); -} - -static void -transport_endpoint_pool_realloc_rpc (void *rpc_args) -{ - pool_realloc_safe_aligned (local_endpoints, 0); + transport_main_t *tm = &tp_main; + pool_put_index (tm->local_endpoints, tepi); } always_inline local_endpoint_t * transport_endpoint_alloc (void) { + transport_main_t *tm = &tp_main; local_endpoint_t *lep; ASSERT (vlib_get_thread_index () <= transport_cl_thread ()); - pool_get_aligned_safe (local_endpoints, lep, transport_cl_thread (), - transport_endpoint_pool_realloc_rpc, 0); - pool_get_zero (local_endpoints, lep); + + pool_get_aligned_safe (tm->local_endpoints, lep, 0); return lep; } +static void +transport_cleanup_freelist (void) +{ + transport_main_t *tm = &tp_main; + local_endpoint_t *lep; + u32 *lep_indexp; + + clib_spinlock_lock (&tm->local_endpoints_lock); + + vec_foreach (lep_indexp, tm->lcl_endpts_freelist) + { + lep = pool_elt_at_index (tm->local_endpoints, *lep_indexp); + + /* Port re-shared after attempt to cleanup */ + if (lep->refcnt > 0) + continue; + + transport_endpoint_table_del (&tm->local_endpoints_table, lep->proto, + &lep->ep); + transport_endpoint_free (*lep_indexp); + } + + vec_reset_length (tm->lcl_endpts_freelist); + + tm->lcl_endpts_cleanup_pending = 0; + + clib_spinlock_unlock (&tm->local_endpoints_lock); +} + void -transport_endpoint_cleanup (u8 proto, ip46_address_t * lcl_ip, u16 port) +transport_program_endpoint_cleanup (u32 lepi) +{ + transport_main_t *tm = &tp_main; + u8 flush_fl = 0; + + /* All workers can free connections. Synchronize access to freelist */ + clib_spinlock_lock (&tm->local_endpoints_lock); + + vec_add1 (tm->lcl_endpts_freelist, lepi); + + /* Avoid accumulating lots of endpoints for cleanup */ + if (!tm->lcl_endpts_cleanup_pending && + vec_len (tm->lcl_endpts_freelist) > 32) + { + tm->lcl_endpts_cleanup_pending = 1; + flush_fl = 1; + } + + clib_spinlock_unlock (&tm->local_endpoints_lock); + + if (flush_fl) + session_send_rpc_evt_to_thread_force (transport_cl_thread (), + transport_cleanup_freelist, 0); +} + +int +transport_release_local_endpoint (u8 proto, ip46_address_t *lcl_ip, u16 port) { + transport_main_t *tm = &tp_main; local_endpoint_t *lep; u32 lepi; - /* Cleanup local endpoint if this was an active connect */ - lepi = transport_endpoint_lookup (&local_endpoints_table, proto, lcl_ip, - clib_net_to_host_u16 (port)); - if (lepi != ENDPOINT_INVALID_INDEX) + lepi = transport_endpoint_lookup (&tm->local_endpoints_table, proto, lcl_ip, + port); + if (lepi == ENDPOINT_INVALID_INDEX) + return -1; + + /* First worker may be cleaning up ports so avoid touching free bitmap */ + lep = &tm->local_endpoints[lepi]; + ASSERT (lep->refcnt >= 1); + + /* Local endpoint no longer in use, program cleanup */ + if (!clib_atomic_sub_fetch (&lep->refcnt, 1)) { - lep = pool_elt_at_index (local_endpoints, lepi); - if (!clib_atomic_sub_fetch (&lep->refcnt, 1)) - { - transport_endpoint_table_del (&local_endpoints_table, proto, - &lep->ep); - transport_endpoint_free (lepi); - } + transport_program_endpoint_cleanup (lepi); + return 0; } + + /* Not an error, just in idication that endpoint was not cleaned up */ + return -1; } -static void -transport_endpoint_mark_used (u8 proto, ip46_address_t * ip, u16 port) +static int +transport_endpoint_mark_used (u8 proto, ip46_address_t *ip, u16 port) { + transport_main_t *tm = &tp_main; local_endpoint_t *lep; + u32 tei; ASSERT (vlib_get_thread_index () <= transport_cl_thread ()); + tei = + transport_endpoint_lookup (&tm->local_endpoints_table, proto, ip, port); + if (tei != ENDPOINT_INVALID_INDEX) + return SESSION_E_PORTINUSE; + /* Pool reallocs with worker barrier */ lep = transport_endpoint_alloc (); clib_memcpy_fast (&lep->ep.ip, ip, sizeof (*ip)); lep->ep.port = port; + lep->proto = proto; lep->refcnt = 1; - transport_endpoint_table_add (&local_endpoints_table, proto, &lep->ep, - lep - local_endpoints); + + transport_endpoint_table_add (&tm->local_endpoints_table, proto, &lep->ep, + lep - tm->local_endpoints); + + return 0; } void transport_share_local_endpoint (u8 proto, ip46_address_t * lcl_ip, u16 port) { + transport_main_t *tm = &tp_main; local_endpoint_t *lep; u32 lepi; - lepi = transport_endpoint_lookup (&local_endpoints_table, proto, lcl_ip, - clib_net_to_host_u16 (port)); + /* Active opens should call this only from a control thread, which are also + * used to allocate and free ports. So, pool has only one writer and + * potentially many readers. Listeners are allocated with barrier */ + lepi = transport_endpoint_lookup (&tm->local_endpoints_table, proto, lcl_ip, + port); if (lepi != ENDPOINT_INVALID_INDEX) { - lep = pool_elt_at_index (local_endpoints, lepi); + lep = pool_elt_at_index (tm->local_endpoints, lepi); clib_atomic_add_fetch (&lep->refcnt, 1); } } @@ -501,13 +596,17 @@ transport_share_local_endpoint (u8 proto, ip46_address_t * lcl_ip, u16 port) /** * Allocate local port and add if successful add entry to local endpoint * table to mark the pair as used. + * + * @return port in net order or -1 if port cannot be allocated */ int -transport_alloc_local_port (u8 proto, ip46_address_t * ip) +transport_alloc_local_port (u8 proto, ip46_address_t *lcl_addr, + transport_endpoint_cfg_t *rmt) { - u16 min = 1024, max = 65535; /* XXX configurable ? */ + transport_main_t *tm = &tp_main; + u16 min = tm->port_allocator_min_src_port; + u16 max = tm->port_allocator_max_src_port; int tries, limit; - u32 tei; limit = max - min; @@ -522,19 +621,26 @@ transport_alloc_local_port (u8 proto, ip46_address_t * ip) /* Find a port in the specified range */ while (1) { - port = random_u32 (&port_allocator_seed) & PORT_MASK; + port = random_u32 (&tm->port_allocator_seed) & PORT_MASK; if (PREDICT_TRUE (port >= min && port < max)) - break; + { + port = clib_host_to_net_u16 (port); + break; + } } - /* Look it up. If not found, we're done */ - tei = transport_endpoint_lookup (&local_endpoints_table, proto, ip, - port); - if (tei == ENDPOINT_INVALID_INDEX) - { - transport_endpoint_mark_used (proto, ip, port); - return port; - } + if (!transport_endpoint_mark_used (proto, lcl_addr, port)) + return port; + + /* IP:port pair already in use, check if 6-tuple available */ + if (session_lookup_connection (rmt->fib_index, lcl_addr, &rmt->ip, port, + rmt->port, proto, rmt->is_ip4)) + continue; + + /* 6-tuple is available so increment lcl endpoint refcount */ + transport_share_local_endpoint (proto, lcl_addr, port); + + return port; } return -1; } @@ -597,9 +703,9 @@ transport_alloc_local_endpoint (u8 proto, transport_endpoint_cfg_t * rmt_cfg, ip46_address_t * lcl_addr, u16 * lcl_port) { transport_endpoint_t *rmt = (transport_endpoint_t *) rmt_cfg; + transport_main_t *tm = &tp_main; session_error_t error; int port; - u32 tei; /* * Find the local address @@ -618,26 +724,37 @@ transport_alloc_local_endpoint (u8 proto, transport_endpoint_cfg_t * rmt_cfg, sizeof (rmt_cfg->peer.ip)); } + /* Cleanup freelist if need be */ + if (vec_len (tm->lcl_endpts_freelist)) + transport_cleanup_freelist (); + /* * Allocate source port */ if (rmt_cfg->peer.port == 0) { - port = transport_alloc_local_port (proto, lcl_addr); + port = transport_alloc_local_port (proto, lcl_addr, rmt_cfg); if (port < 1) return SESSION_E_NOPORT; *lcl_port = port; } else { - port = clib_net_to_host_u16 (rmt_cfg->peer.port); - *lcl_port = port; - tei = transport_endpoint_lookup (&local_endpoints_table, proto, - lcl_addr, port); - if (tei != ENDPOINT_INVALID_INDEX) + *lcl_port = rmt_cfg->peer.port; + + if (!transport_endpoint_mark_used (proto, lcl_addr, rmt_cfg->peer.port)) + return 0; + + /* IP:port pair already in use, check if 6-tuple available */ + if (session_lookup_connection (rmt->fib_index, lcl_addr, &rmt->ip, + rmt_cfg->peer.port, rmt->port, proto, + rmt->is_ip4)) return SESSION_E_PORTINUSE; - transport_endpoint_mark_used (proto, lcl_addr, port); + /* 6-tuple is available so increment lcl endpoint refcount */ + transport_share_local_endpoint (proto, lcl_addr, rmt_cfg->peer.port); + + return 0; } return 0; @@ -854,6 +971,7 @@ transport_init (void) { vlib_thread_main_t *vtm = vlib_get_thread_main (); session_main_t *smm = vnet_get_session_main (); + transport_main_t *tm = &tp_main; u32 num_threads; if (smm->local_endpoints_table_buckets == 0) @@ -862,12 +980,14 @@ transport_init (void) smm->local_endpoints_table_memory = 512 << 20; /* Initialize [port-allocator] random number seed */ - port_allocator_seed = (u32) clib_cpu_time_now (); + tm->port_allocator_seed = (u32) clib_cpu_time_now (); + tm->port_allocator_min_src_port = smm->port_allocator_min_src_port; + tm->port_allocator_max_src_port = smm->port_allocator_max_src_port; - clib_bihash_init_24_8 (&local_endpoints_table, "local endpoints table", + clib_bihash_init_24_8 (&tm->local_endpoints_table, "local endpoints table", smm->local_endpoints_table_buckets, smm->local_endpoints_table_memory); - clib_spinlock_init (&local_endpoints_lock); + clib_spinlock_init (&tm->local_endpoints_lock); num_threads = 1 /* main thread */ + vtm->n_threads; if (num_threads > 1)