#include <vnet/session/session.h>
#include <vnet/fib/fib.h>
-typedef struct local_endpoint_
-{
- transport_endpoint_t ep;
- int refcnt;
-} local_endpoint_t;
-
/**
* Per-type vector of transport protocol virtual function tables
*/
transport_proto_vft_t *tp_vfts;
-/*
- * Port allocator seed
- */
-static u32 port_allocator_seed;
+typedef struct local_endpoint_
+{
+ transport_endpoint_t ep;
+ transport_proto_t proto;
+ int refcnt;
+} local_endpoint_t;
-/*
- * Local endpoints table
- */
-static transport_endpoint_table_t local_endpoints_table;
+typedef struct transport_main_
+{
+ transport_endpoint_table_t local_endpoints_table;
+ local_endpoint_t *local_endpoints;
+ u32 *lcl_endpts_freelist;
+ u32 port_allocator_seed;
+ u16 port_allocator_min_src_port;
+ u16 port_allocator_max_src_port;
+ u8 lcl_endpts_cleanup_pending;
+ clib_spinlock_t local_endpoints_lock;
+} transport_main_t;
-/*
- * Pool of local endpoints
- */
-static local_endpoint_t *local_endpoints;
-
-/*
- * Local endpoints pool lock
- */
-static clib_spinlock_t local_endpoints_lock;
+static transport_main_t tp_main;
u8 *
format_transport_proto (u8 * s, va_list * args)
return s;
}
+const char *transport_flags_str[] = {
+#define _(sym, str) str,
+ foreach_transport_connection_flag
+#undef _
+};
+
+u8 *
+format_transport_flags (u8 *s, va_list *args)
+{
+ transport_connection_flags_t flags;
+ int i, last = -1;
+
+ flags = va_arg (*args, transport_connection_flags_t);
+
+ for (i = 0; i < TRANSPORT_CONNECTION_N_FLAGS; i++)
+ if (flags & (1 << i))
+ last = i;
+
+ for (i = 0; i < last; i++)
+ {
+ if (flags & (1 << i))
+ s = format (s, "%s, ", transport_flags_str[i]);
+ }
+ if (last >= 0)
+ s = format (s, "%s", transport_flags_str[last]);
+
+ return s;
+}
+
u8 *
format_transport_connection (u8 * s, va_list * args)
{
if (transport_connection_is_tx_paced (tc))
s = format (s, "%Upacer: %U\n", format_white_space, indent,
format_transport_pacer, &tc->pacer, tc->thread_index);
- s = format (s, "%Utransport: flags 0x%x\n", format_white_space, indent,
- tc->flags);
+ s = format (s, "%Utransport: flags: %U\n", format_white_space, indent,
+ format_transport_flags, tc->flags);
}
return s;
}
format_transport_half_open_connection (u8 * s, va_list * args)
{
u32 transport_proto = va_arg (*args, u32);
- u32 ho_index = va_arg (*args, u32);
transport_proto_vft_t *tp_vft;
tp_vft = transport_protocol_get_vft (transport_proto);
if (!tp_vft)
return s;
- s = format (s, "%U", tp_vft->format_half_open, ho_index);
+ s = (tp_vft->format_half_open) (s, args);
return s;
}
int
transport_connect (transport_proto_t tp, transport_endpoint_cfg_t * tep)
{
+ if (PREDICT_FALSE (!tp_vfts[tp].connect))
+ return SESSION_E_TRANSPORT_NO_REG;
return tp_vfts[tp].connect (tep);
}
transport_start_listen (transport_proto_t tp, u32 session_index,
transport_endpoint_cfg_t *tep)
{
+ if (PREDICT_FALSE (!tp_vfts[tp].start_listen))
+ return SESSION_E_TRANSPORT_NO_REG;
return tp_vfts[tp].start_listen (session_index, tep);
}
void
transport_endpoint_free (u32 tepi)
{
- pool_put_index (local_endpoints, tepi);
-}
-
-static void
-transport_endpoint_pool_realloc_rpc (void *rpc_args)
-{
- pool_realloc_safe_aligned (local_endpoints, 0);
+ transport_main_t *tm = &tp_main;
+ pool_put_index (tm->local_endpoints, tepi);
}
always_inline local_endpoint_t *
transport_endpoint_alloc (void)
{
+ transport_main_t *tm = &tp_main;
local_endpoint_t *lep;
ASSERT (vlib_get_thread_index () <= transport_cl_thread ());
- pool_get_aligned_safe (local_endpoints, lep, transport_cl_thread (),
- transport_endpoint_pool_realloc_rpc, 0);
+
+ pool_get_aligned_safe (tm->local_endpoints, lep, 0);
return lep;
}
+static void
+transport_cleanup_freelist (void)
+{
+ transport_main_t *tm = &tp_main;
+ local_endpoint_t *lep;
+ u32 *lep_indexp;
+
+ clib_spinlock_lock (&tm->local_endpoints_lock);
+
+ vec_foreach (lep_indexp, tm->lcl_endpts_freelist)
+ {
+ lep = pool_elt_at_index (tm->local_endpoints, *lep_indexp);
+
+ /* Port re-shared after attempt to cleanup */
+ if (lep->refcnt > 0)
+ continue;
+
+ transport_endpoint_table_del (&tm->local_endpoints_table, lep->proto,
+ &lep->ep);
+ transport_endpoint_free (*lep_indexp);
+ }
+
+ vec_reset_length (tm->lcl_endpts_freelist);
+
+ tm->lcl_endpts_cleanup_pending = 0;
+
+ clib_spinlock_unlock (&tm->local_endpoints_lock);
+}
+
void
-transport_endpoint_cleanup (u8 proto, ip46_address_t * lcl_ip, u16 port)
+transport_program_endpoint_cleanup (u32 lepi)
{
+ transport_main_t *tm = &tp_main;
+ u8 flush_fl = 0;
+
+ /* All workers can free connections. Synchronize access to freelist */
+ clib_spinlock_lock (&tm->local_endpoints_lock);
+
+ vec_add1 (tm->lcl_endpts_freelist, lepi);
+
+ /* Avoid accumulating lots of endpoints for cleanup */
+ if (!tm->lcl_endpts_cleanup_pending &&
+ vec_len (tm->lcl_endpts_freelist) > 32)
+ {
+ tm->lcl_endpts_cleanup_pending = 1;
+ flush_fl = 1;
+ }
+
+ clib_spinlock_unlock (&tm->local_endpoints_lock);
+
+ if (flush_fl)
+ session_send_rpc_evt_to_thread_force (transport_cl_thread (),
+ transport_cleanup_freelist, 0);
+}
+
+int
+transport_release_local_endpoint (u8 proto, ip46_address_t *lcl_ip, u16 port)
+{
+ transport_main_t *tm = &tp_main;
local_endpoint_t *lep;
u32 lepi;
- /* Cleanup local endpoint if this was an active connect */
- lepi = transport_endpoint_lookup (&local_endpoints_table, proto, lcl_ip,
+ lepi = transport_endpoint_lookup (&tm->local_endpoints_table, proto, lcl_ip,
clib_net_to_host_u16 (port));
if (lepi == ENDPOINT_INVALID_INDEX)
- return;
+ return -1;
+
+ lep = pool_elt_at_index (tm->local_endpoints, lepi);
- lep = pool_elt_at_index (local_endpoints, lepi);
+ /* Local endpoint no longer in use, program cleanup */
if (!clib_atomic_sub_fetch (&lep->refcnt, 1))
{
- transport_endpoint_table_del (&local_endpoints_table, proto, &lep->ep);
-
- /* All workers can free connections. Synchronize access to pool */
- clib_spinlock_lock (&local_endpoints_lock);
- transport_endpoint_free (lepi);
- clib_spinlock_unlock (&local_endpoints_lock);
+ transport_program_endpoint_cleanup (lepi);
+ return 0;
}
+
+ /* Not an error, just in idication that endpoint was not cleaned up */
+ return -1;
}
static int
transport_endpoint_mark_used (u8 proto, ip46_address_t *ip, u16 port)
{
+ transport_main_t *tm = &tp_main;
local_endpoint_t *lep;
u32 tei;
ASSERT (vlib_get_thread_index () <= transport_cl_thread ());
- tei = transport_endpoint_lookup (&local_endpoints_table, proto, ip, port);
+ tei =
+ transport_endpoint_lookup (&tm->local_endpoints_table, proto, ip, port);
if (tei != ENDPOINT_INVALID_INDEX)
return SESSION_E_PORTINUSE;
lep = transport_endpoint_alloc ();
clib_memcpy_fast (&lep->ep.ip, ip, sizeof (*ip));
lep->ep.port = port;
+ lep->proto = proto;
lep->refcnt = 1;
- transport_endpoint_table_add (&local_endpoints_table, proto, &lep->ep,
- lep - local_endpoints);
+ transport_endpoint_table_add (&tm->local_endpoints_table, proto, &lep->ep,
+ lep - tm->local_endpoints);
return 0;
}
void
transport_share_local_endpoint (u8 proto, ip46_address_t * lcl_ip, u16 port)
{
+ transport_main_t *tm = &tp_main;
local_endpoint_t *lep;
u32 lepi;
- lepi = transport_endpoint_lookup (&local_endpoints_table, proto, lcl_ip,
+ /* Active opens should call this only from a control thread, which are also
+ * used to allocate and free ports. So, pool has only one writer and
+ * potentially many readers. Listeners are allocated with barrier */
+ lepi = transport_endpoint_lookup (&tm->local_endpoints_table, proto, lcl_ip,
clib_net_to_host_u16 (port));
if (lepi != ENDPOINT_INVALID_INDEX)
{
- lep = pool_elt_at_index (local_endpoints, lepi);
+ lep = pool_elt_at_index (tm->local_endpoints, lepi);
clib_atomic_add_fetch (&lep->refcnt, 1);
}
}
* table to mark the pair as used.
*/
int
-transport_alloc_local_port (u8 proto, ip46_address_t * ip)
+transport_alloc_local_port (u8 proto, ip46_address_t *lcl_addr,
+ transport_endpoint_cfg_t *rmt)
{
- u16 min = 1024, max = 65535; /* XXX configurable ? */
+ transport_main_t *tm = &tp_main;
+ u16 min = tm->port_allocator_min_src_port;
+ u16 max = tm->port_allocator_max_src_port;
int tries, limit;
limit = max - min;
/* Find a port in the specified range */
while (1)
{
- port = random_u32 (&port_allocator_seed) & PORT_MASK;
+ port = random_u32 (&tm->port_allocator_seed) & PORT_MASK;
if (PREDICT_TRUE (port >= min && port < max))
break;
}
- if (!transport_endpoint_mark_used (proto, ip, port))
+ if (!transport_endpoint_mark_used (proto, lcl_addr, port))
return port;
+
+ /* IP:port pair already in use, check if 6-tuple available */
+ if (session_lookup_connection (rmt->fib_index, lcl_addr, &rmt->ip, port,
+ rmt->port, proto, rmt->is_ip4))
+ continue;
+
+ /* 6-tuple is available so increment lcl endpoint refcount */
+ transport_share_local_endpoint (proto, lcl_addr, port);
+
+ return port;
}
return -1;
}
ip46_address_t * lcl_addr, u16 * lcl_port)
{
transport_endpoint_t *rmt = (transport_endpoint_t *) rmt_cfg;
+ transport_main_t *tm = &tp_main;
session_error_t error;
int port;
sizeof (rmt_cfg->peer.ip));
}
+ /* Cleanup freelist if need be */
+ if (vec_len (tm->lcl_endpts_freelist))
+ transport_cleanup_freelist ();
+
/*
* Allocate source port
*/
if (rmt_cfg->peer.port == 0)
{
- port = transport_alloc_local_port (proto, lcl_addr);
+ port = transport_alloc_local_port (proto, lcl_addr, rmt_cfg);
if (port < 1)
return SESSION_E_NOPORT;
*lcl_port = port;
port = clib_net_to_host_u16 (rmt_cfg->peer.port);
*lcl_port = port;
- return transport_endpoint_mark_used (proto, lcl_addr, port);
+ if (!transport_endpoint_mark_used (proto, lcl_addr, port))
+ return 0;
+
+ /* IP:port pair already in use, check if 6-tuple available */
+ if (session_lookup_connection (rmt->fib_index, lcl_addr, &rmt->ip, port,
+ rmt->port, proto, rmt->is_ip4))
+ return SESSION_E_PORTINUSE;
+
+ /* 6-tuple is available so increment lcl endpoint refcount */
+ transport_share_local_endpoint (proto, lcl_addr, port);
+
+ return 0;
}
return 0;
{
vlib_thread_main_t *vtm = vlib_get_thread_main ();
session_main_t *smm = vnet_get_session_main ();
+ transport_main_t *tm = &tp_main;
u32 num_threads;
if (smm->local_endpoints_table_buckets == 0)
smm->local_endpoints_table_memory = 512 << 20;
/* Initialize [port-allocator] random number seed */
- port_allocator_seed = (u32) clib_cpu_time_now ();
+ tm->port_allocator_seed = (u32) clib_cpu_time_now ();
+ tm->port_allocator_min_src_port = smm->port_allocator_min_src_port;
+ tm->port_allocator_max_src_port = smm->port_allocator_max_src_port;
- clib_bihash_init_24_8 (&local_endpoints_table, "local endpoints table",
+ clib_bihash_init_24_8 (&tm->local_endpoints_table, "local endpoints table",
smm->local_endpoints_table_buckets,
smm->local_endpoints_table_memory);
- clib_spinlock_init (&local_endpoints_lock);
+ clib_spinlock_init (&tm->local_endpoints_lock);
num_threads = 1 /* main thread */ + vtm->n_threads;
if (num_threads > 1)