X-Git-Url: https://gerrit.fd.io/r/gitweb?a=blobdiff_plain;f=src%2Fvnet%2Fsession%2Ftransport.c;h=7e40164a9f87d88fa083c9ec5735d514d2d9864a;hb=e541d6f0edd39a093bcbee91282969647c1e6ebf;hp=5d03a4f8453a30ef0223cdecd7644a1e5d4f1ced;hpb=2b4f74ff9a48bfb1c764a4d23e00982ab347f1c3;p=vpp.git diff --git a/src/vnet/session/transport.c b/src/vnet/session/transport.c index 5d03a4f8453..7e40164a9f8 100644 --- a/src/vnet/session/transport.c +++ b/src/vnet/session/transport.c @@ -17,36 +17,29 @@ #include #include -typedef struct local_endpoint_ -{ - transport_endpoint_t ep; - int refcnt; -} local_endpoint_t; - /** * Per-type vector of transport protocol virtual function tables */ transport_proto_vft_t *tp_vfts; -/* - * Port allocator seed - */ -static u32 port_allocator_seed; - -/* - * Local endpoints table - */ -static transport_endpoint_table_t local_endpoints_table; +typedef struct local_endpoint_ +{ + transport_endpoint_t ep; + transport_proto_t proto; + int refcnt; +} local_endpoint_t; -/* - * Pool of local endpoints - */ -static local_endpoint_t *local_endpoints; +typedef struct transport_main_ +{ + transport_endpoint_table_t local_endpoints_table; + local_endpoint_t *local_endpoints; + u32 *lcl_endpts_freelist; + u32 port_allocator_seed; + u8 lcl_endpts_cleanup_pending; + clib_spinlock_t local_endpoints_lock; +} transport_main_t; -/* - * Local endpoints pool lock - */ -static clib_spinlock_t local_endpoints_lock; +static transport_main_t tp_main; u8 * format_transport_proto (u8 * s, va_list * args) @@ -76,6 +69,35 @@ format_transport_proto_short (u8 * s, va_list * args) return s; } +const char *transport_flags_str[] = { +#define _(sym, str) str, + foreach_transport_connection_flag +#undef _ +}; + +u8 * +format_transport_flags (u8 *s, va_list *args) +{ + transport_connection_flags_t flags; + int i, last = -1; + + flags = va_arg (*args, transport_connection_flags_t); + + for (i = 0; i < TRANSPORT_CONNECTION_N_FLAGS; i++) + if (flags & (1 << i)) + last = i; + + for (i = 0; i < last; i++) + { + if (flags & (1 << i)) + s = format (s, "%s, ", transport_flags_str[i]); + } + if (last >= 0) + s = format (s, "%s", transport_flags_str[last]); + + return s; +} + u8 * format_transport_connection (u8 * s, va_list * args) { @@ -100,8 +122,8 @@ format_transport_connection (u8 * s, va_list * args) if (transport_connection_is_tx_paced (tc)) s = format (s, "%Upacer: %U\n", format_white_space, indent, format_transport_pacer, &tc->pacer, tc->thread_index); - s = format (s, "%Utransport: flags 0x%x\n", format_white_space, indent, - tc->flags); + s = format (s, "%Utransport: flags: %U\n", format_white_space, indent, + format_transport_flags, tc->flags); } return s; } @@ -314,6 +336,8 @@ transport_cleanup_half_open (transport_proto_t tp, u32 conn_index) int transport_connect (transport_proto_t tp, transport_endpoint_cfg_t * tep) { + if (PREDICT_FALSE (!tp_vfts[tp].connect)) + return SESSION_E_TRANSPORT_NO_REG; return tp_vfts[tp].connect (tep); } @@ -341,8 +365,10 @@ transport_reset (transport_proto_t tp, u32 conn_index, u8 thread_index) u32 transport_start_listen (transport_proto_t tp, u32 session_index, - transport_endpoint_t * tep) + transport_endpoint_cfg_t *tep) { + if (PREDICT_FALSE (!tp_vfts[tp].start_listen)) + return SESSION_E_TRANSPORT_NO_REG; return tp_vfts[tp].start_listen (session_index, tep); } @@ -420,67 +446,146 @@ transport_connection_attribute (transport_proto_t tp, u32 conn_index, #define PORT_MASK ((1 << 16)- 1) void -transport_endpoint_del (u32 tepi) +transport_endpoint_free (u32 tepi) { - clib_spinlock_lock_if_init (&local_endpoints_lock); - pool_put_index (local_endpoints, tepi); - clib_spinlock_unlock_if_init (&local_endpoints_lock); + transport_main_t *tm = &tp_main; + pool_put_index (tm->local_endpoints, tepi); } always_inline local_endpoint_t * -transport_endpoint_new (void) +transport_endpoint_alloc (void) { + transport_main_t *tm = &tp_main; local_endpoint_t *lep; - pool_get_zero (local_endpoints, lep); + + ASSERT (vlib_get_thread_index () <= transport_cl_thread ()); + + pool_get_aligned_safe (tm->local_endpoints, lep, 0); return lep; } +static void +transport_cleanup_freelist (void) +{ + transport_main_t *tm = &tp_main; + local_endpoint_t *lep; + u32 *lep_indexp; + + clib_spinlock_lock (&tm->local_endpoints_lock); + + vec_foreach (lep_indexp, tm->lcl_endpts_freelist) + { + lep = pool_elt_at_index (tm->local_endpoints, *lep_indexp); + + /* Port re-shared after attempt to cleanup */ + if (lep->refcnt > 0) + continue; + + transport_endpoint_table_del (&tm->local_endpoints_table, lep->proto, + &lep->ep); + transport_endpoint_free (*lep_indexp); + } + + vec_reset_length (tm->lcl_endpts_freelist); + + tm->lcl_endpts_cleanup_pending = 0; + + clib_spinlock_unlock (&tm->local_endpoints_lock); +} + void -transport_endpoint_cleanup (u8 proto, ip46_address_t * lcl_ip, u16 port) +transport_program_endpoint_cleanup (u32 lepi) { + transport_main_t *tm = &tp_main; + u8 flush_fl = 0; + + /* All workers can free connections. Synchronize access to freelist */ + clib_spinlock_lock (&tm->local_endpoints_lock); + + vec_add1 (tm->lcl_endpts_freelist, lepi); + + /* Avoid accumulating lots of endpoints for cleanup */ + if (!tm->lcl_endpts_cleanup_pending && + vec_len (tm->lcl_endpts_freelist) > 32) + { + tm->lcl_endpts_cleanup_pending = 1; + flush_fl = 1; + } + + clib_spinlock_unlock (&tm->local_endpoints_lock); + + if (flush_fl) + session_send_rpc_evt_to_thread_force (transport_cl_thread (), + transport_cleanup_freelist, 0); +} + +int +transport_release_local_endpoint (u8 proto, ip46_address_t *lcl_ip, u16 port) +{ + transport_main_t *tm = &tp_main; local_endpoint_t *lep; u32 lepi; - /* Cleanup local endpoint if this was an active connect */ - lepi = transport_endpoint_lookup (&local_endpoints_table, proto, lcl_ip, + lepi = transport_endpoint_lookup (&tm->local_endpoints_table, proto, lcl_ip, clib_net_to_host_u16 (port)); - if (lepi != ENDPOINT_INVALID_INDEX) + if (lepi == ENDPOINT_INVALID_INDEX) + return -1; + + lep = pool_elt_at_index (tm->local_endpoints, lepi); + + /* Local endpoint no longer in use, program cleanup */ + if (!clib_atomic_sub_fetch (&lep->refcnt, 1)) { - lep = pool_elt_at_index (local_endpoints, lepi); - if (!clib_atomic_sub_fetch (&lep->refcnt, 1)) - { - transport_endpoint_table_del (&local_endpoints_table, proto, - &lep->ep); - transport_endpoint_del (lepi); - } + transport_program_endpoint_cleanup (lepi); + return 0; } + + /* Not an error, just in idication that endpoint was not cleaned up */ + return -1; } -static void -transport_endpoint_mark_used (u8 proto, ip46_address_t * ip, u16 port) +static int +transport_endpoint_mark_used (u8 proto, ip46_address_t *ip, u16 port) { + transport_main_t *tm = &tp_main; local_endpoint_t *lep; - clib_spinlock_lock_if_init (&local_endpoints_lock); - lep = transport_endpoint_new (); + u32 tei; + + ASSERT (vlib_get_thread_index () <= transport_cl_thread ()); + + tei = + transport_endpoint_lookup (&tm->local_endpoints_table, proto, ip, port); + if (tei != ENDPOINT_INVALID_INDEX) + return SESSION_E_PORTINUSE; + + /* Pool reallocs with worker barrier */ + lep = transport_endpoint_alloc (); clib_memcpy_fast (&lep->ep.ip, ip, sizeof (*ip)); lep->ep.port = port; + lep->proto = proto; lep->refcnt = 1; - transport_endpoint_table_add (&local_endpoints_table, proto, &lep->ep, - lep - local_endpoints); - clib_spinlock_unlock_if_init (&local_endpoints_lock); + + transport_endpoint_table_add (&tm->local_endpoints_table, proto, &lep->ep, + lep - tm->local_endpoints); + + return 0; } void transport_share_local_endpoint (u8 proto, ip46_address_t * lcl_ip, u16 port) { + transport_main_t *tm = &tp_main; local_endpoint_t *lep; u32 lepi; - lepi = transport_endpoint_lookup (&local_endpoints_table, proto, lcl_ip, + /* Active opens should call this only from a control thread, which are also + * used to allocate and free ports. So, pool has only one writer and + * potentially many readers. Listeners are allocated with barrier */ + lepi = transport_endpoint_lookup (&tm->local_endpoints_table, proto, lcl_ip, clib_net_to_host_u16 (port)); if (lepi != ENDPOINT_INVALID_INDEX) { - lep = pool_elt_at_index (local_endpoints, lepi); + lep = pool_elt_at_index (tm->local_endpoints, lepi); clib_atomic_add_fetch (&lep->refcnt, 1); } } @@ -490,16 +595,17 @@ transport_share_local_endpoint (u8 proto, ip46_address_t * lcl_ip, u16 port) * table to mark the pair as used. */ int -transport_alloc_local_port (u8 proto, ip46_address_t * ip) +transport_alloc_local_port (u8 proto, ip46_address_t *lcl_addr, + transport_endpoint_cfg_t *rmt) { u16 min = 1024, max = 65535; /* XXX configurable ? */ + transport_main_t *tm = &tp_main; int tries, limit; - u32 tei; limit = max - min; - /* Only support active opens from thread 0 */ - ASSERT (vlib_get_thread_index () == 0); + /* Only support active opens from one of ctrl threads */ + ASSERT (vlib_get_thread_index () <= transport_cl_thread ()); /* Search for first free slot */ for (tries = 0; tries < limit; tries++) @@ -509,19 +615,23 @@ transport_alloc_local_port (u8 proto, ip46_address_t * ip) /* Find a port in the specified range */ while (1) { - port = random_u32 (&port_allocator_seed) & PORT_MASK; + port = random_u32 (&tm->port_allocator_seed) & PORT_MASK; if (PREDICT_TRUE (port >= min && port < max)) break; } - /* Look it up. If not found, we're done */ - tei = transport_endpoint_lookup (&local_endpoints_table, proto, ip, - port); - if (tei == ENDPOINT_INVALID_INDEX) - { - transport_endpoint_mark_used (proto, ip, port); - return port; - } + if (!transport_endpoint_mark_used (proto, lcl_addr, port)) + return port; + + /* IP:port pair already in use, check if 6-tuple available */ + if (session_lookup_connection (rmt->fib_index, lcl_addr, &rmt->ip, port, + rmt->port, proto, rmt->is_ip4)) + continue; + + /* 6-tuple is available so increment lcl endpoint refcount */ + transport_share_local_endpoint (proto, lcl_addr, port); + + return port; } return -1; } @@ -584,9 +694,9 @@ transport_alloc_local_endpoint (u8 proto, transport_endpoint_cfg_t * rmt_cfg, ip46_address_t * lcl_addr, u16 * lcl_port) { transport_endpoint_t *rmt = (transport_endpoint_t *) rmt_cfg; + transport_main_t *tm = &tp_main; session_error_t error; int port; - u32 tei; /* * Find the local address @@ -605,12 +715,16 @@ transport_alloc_local_endpoint (u8 proto, transport_endpoint_cfg_t * rmt_cfg, sizeof (rmt_cfg->peer.ip)); } + /* Cleanup freelist if need be */ + if (vec_len (tm->lcl_endpts_freelist)) + transport_cleanup_freelist (); + /* * Allocate source port */ if (rmt_cfg->peer.port == 0) { - port = transport_alloc_local_port (proto, lcl_addr); + port = transport_alloc_local_port (proto, lcl_addr, rmt_cfg); if (port < 1) return SESSION_E_NOPORT; *lcl_port = port; @@ -619,12 +733,19 @@ transport_alloc_local_endpoint (u8 proto, transport_endpoint_cfg_t * rmt_cfg, { port = clib_net_to_host_u16 (rmt_cfg->peer.port); *lcl_port = port; - tei = transport_endpoint_lookup (&local_endpoints_table, proto, - lcl_addr, port); - if (tei != ENDPOINT_INVALID_INDEX) + + if (!transport_endpoint_mark_used (proto, lcl_addr, port)) + return 0; + + /* IP:port pair already in use, check if 6-tuple available */ + if (session_lookup_connection (rmt->fib_index, lcl_addr, &rmt->ip, port, + rmt->port, proto, rmt->is_ip4)) return SESSION_E_PORTINUSE; - transport_endpoint_mark_used (proto, lcl_addr, port); + /* 6-tuple is available so increment lcl endpoint refcount */ + transport_share_local_endpoint (proto, lcl_addr, port); + + return 0; } return 0; @@ -830,6 +951,9 @@ transport_enable_disable (vlib_main_t * vm, u8 is_en) { if (vft->enable) (vft->enable) (vm, is_en); + + if (vft->update_time) + session_register_update_time_fn (vft->update_time, is_en); } } @@ -838,6 +962,7 @@ transport_init (void) { vlib_thread_main_t *vtm = vlib_get_thread_main (); session_main_t *smm = vnet_get_session_main (); + transport_main_t *tm = &tp_main; u32 num_threads; if (smm->local_endpoints_table_buckets == 0) @@ -846,15 +971,16 @@ transport_init (void) smm->local_endpoints_table_memory = 512 << 20; /* Initialize [port-allocator] random number seed */ - port_allocator_seed = (u32) clib_cpu_time_now (); + tm->port_allocator_seed = (u32) clib_cpu_time_now (); - clib_bihash_init_24_8 (&local_endpoints_table, "local endpoints table", + clib_bihash_init_24_8 (&tm->local_endpoints_table, "local endpoints table", smm->local_endpoints_table_buckets, smm->local_endpoints_table_memory); + clib_spinlock_init (&tm->local_endpoints_lock); + num_threads = 1 /* main thread */ + vtm->n_threads; if (num_threads > 1) { - clib_spinlock_init (&local_endpoints_lock); /* Main not polled if there are workers */ smm->transport_cl_thread = 1; }