{
ec_main_t *ecm = &ec_main;
vnet_connect_args_t _a = {}, *a = &_a;
- vlib_main_t *vm = vlib_get_main ();
int rv, needs_crypto;
u32 n_clients, ci;
ci = ecm->connect_conn_index;
- vlib_worker_thread_barrier_sync (vm);
-
while (ci < n_clients)
{
/* Crude pacing for call setups */
ci += 1;
}
- vlib_worker_thread_barrier_release (vm);
-
if (ci < ecm->expected_connections && ecm->run_test != EC_EXITING)
ec_program_connects ();
void
ec_program_connects (void)
{
- session_send_rpc_evt_to_thread_force (0, ec_connect_rpc, 0);
+ session_send_rpc_evt_to_thread_force (transport_cl_thread (), ec_connect_rpc,
+ 0);
}
#define ec_cli(_fmt, _args...) \
SESSION_TEST ((tc->lcl_port == placeholder_client_port),
"ports should be equal");
- /* These sessions, because of the way they're established are pinned to
- * main thread, even when we have workers and we avoid polling main thread,
- * i.e., we can't cleanup pending disconnects, so force cleanup for both
- */
- session_transport_cleanup (s);
- s = session_get (accepted_session_index, accepted_session_thread);
- session_transport_cleanup (s);
-
vnet_app_detach_args_t detach_args = {
.app_index = server_index,
.api_client_index = ~0,
app_worker_t *client_wrk;
application_t *client;
- ASSERT (vlib_thread_is_main_w_barrier ());
+ ASSERT (session_vlib_thread_is_cl_thread ());
if (session_endpoint_is_zero (&a->sep))
return SESSION_E_INVALID_RMT_IP;
{
session_handle_t *shp;
- ASSERT (vlib_get_thread_index () == 0);
+ ASSERT (session_vlib_thread_is_cl_thread ());
pool_get (app_wrk->half_open_table, shp);
*shp = sh;
app_worker_del_half_open (app_worker_t *app_wrk, session_t *s)
{
application_t *app = application_get (app_wrk->app_index);
- ASSERT (vlib_get_thread_index () <= 1);
+ ASSERT (session_vlib_thread_is_cl_thread ());
pool_put_index (app_wrk->half_open_table, s->ho_index);
if (app->cb_fns.half_open_cleanup_callback)
app->cb_fns.half_open_cleanup_callback (s);
session_half_open_delete_notify (transport_connection_t *tc)
{
/* Notification from ctrl thread accepted without rpc */
- if (!tc->thread_index)
+ if (tc->thread_index == transport_cl_thread ())
{
session_half_open_free (ho_session_get (tc->s_index));
}
else
{
void *args = uword_to_pointer ((uword) tc->s_index, void *);
- session_send_rpc_evt_to_thread_force (0, session_half_open_free_rpc,
- args);
+ session_send_rpc_evt_to_thread_force (transport_cl_thread (),
+ session_half_open_free_rpc, args);
}
}
/** Flag that is set if main thread signaled to handle connects */
u32 n_pending_connects;
- /** Main thread loops in poll mode without a connect */
- u32 no_connect_loops;
-
/** List head for first worker evts pending handling on main */
clib_llist_index_t evts_pending_main;
* Trade memory for speed, for now */
u32 *session_type_to_next;
- /** Thread for cl and ho that rely on cl allocs */
+ /** Thread used for allocating active open connections, i.e., half-opens
+ * for transports like tcp, and sessions that will be migrated for cl
+ * transports like udp. If vpp has workers, this will be first worker. */
u32 transport_cl_thread;
transport_proto_t last_transport_proto_type;
return session_main.transport_cl_thread;
}
+always_inline u32
+session_vlib_thread_is_cl_thread (void)
+{
+ return (vlib_get_thread_index () == transport_cl_thread () ||
+ vlib_thread_is_main_w_barrier ());
+}
+
/*
* Listen sessions
*/
ho_session_alloc (void)
{
session_t *s;
- ASSERT (vlib_get_thread_index () == 0);
- s = session_alloc (0);
+ ASSERT (session_vlib_thread_is_cl_thread ());
+ s = session_alloc (transport_cl_thread ());
s->session_state = SESSION_STATE_CONNECTING;
s->flags |= SESSION_F_HALF_OPEN;
- /* Not ideal. Half-opens are only allocated from main with worker barrier
- * but can be cleaned up, i.e., session_half_open_free, from main without
- * a barrier. In debug images, the free_bitmap can grow while workers peek
- * the sessions pool, e.g., session_half_open_migrate_notify, and as a
- * result crash while validating the session. To avoid this, grow the bitmap
- * now. */
- if (CLIB_DEBUG)
- {
- session_t *sp = session_main.wrk[0].sessions;
- clib_bitmap_validate (pool_header (sp)->free_bitmap,
- s->session_index + 1);
- }
return s;
}
always_inline session_t *
ho_session_get (u32 ho_index)
{
- return session_get (ho_index, 0 /* half-open thread */);
+ return session_get (ho_index, transport_cl_thread ());
}
always_inline void
session_mq_handle_connects_rpc (void *arg)
{
u32 max_connects = 32, n_connects = 0;
- vlib_main_t *vm = vlib_get_main ();
session_evt_elt_t *he, *elt, *next;
- session_worker_t *fwrk, *wrk;
+ session_worker_t *fwrk;
- ASSERT (vlib_get_thread_index () == 0);
+ ASSERT (session_vlib_thread_is_cl_thread ());
/* Pending connects on linked list pertaining to first worker */
- fwrk = session_main_get_worker (1);
+ fwrk = session_main_get_worker (transport_cl_thread ());
if (!fwrk->n_pending_connects)
- goto update_state;
-
- vlib_worker_thread_barrier_sync (vm);
+ return;
he = clib_llist_elt (fwrk->event_elts, fwrk->pending_connects);
elt = clib_llist_next (fwrk->event_elts, evt_list, he);
- /* Avoid holding the barrier for too long */
+ /* Avoid holding the worker for too long */
while (n_connects < max_connects && elt != he)
{
next = clib_llist_next (fwrk->event_elts, evt_list, elt);
/* Decrement with worker barrier */
fwrk->n_pending_connects -= n_connects;
-
- vlib_worker_thread_barrier_release (vm);
-
-update_state:
-
- /* Switch worker to poll mode if it was in interrupt mode and had work or
- * back to interrupt if threshold of loops without a connect is passed.
- * While in poll mode, reprogram connects rpc */
- wrk = session_main_get_worker (0);
- if (wrk->state != SESSION_WRK_POLLING)
- {
- if (n_connects)
- {
- session_wrk_set_state (wrk, SESSION_WRK_POLLING);
- vlib_node_set_state (vm, session_queue_node.index,
- VLIB_NODE_STATE_POLLING);
- wrk->no_connect_loops = 0;
- }
- }
- else
- {
- if (!n_connects)
- {
- if (++wrk->no_connect_loops > 1e5)
- {
- session_wrk_set_state (wrk, SESSION_WRK_INTERRUPT);
- vlib_node_set_state (vm, session_queue_node.index,
- VLIB_NODE_STATE_INTERRUPT);
- }
- }
- else
- wrk->no_connect_loops = 0;
- }
-
- if (wrk->state == SESSION_WRK_POLLING)
+ if (fwrk->n_pending_connects > 0)
{
- elt = session_evt_alloc_ctrl (wrk);
- elt->evt.event_type = SESSION_CTRL_EVT_RPC;
- elt->evt.rpc_args.fp = session_mq_handle_connects_rpc;
+ session_send_rpc_evt_to_thread_force (fwrk->vm->thread_index,
+ session_mq_handle_connects_rpc, 0);
}
}
u32 thread_index = wrk - session_main.wrk;
session_evt_elt_t *he;
- /* No workers, so just deal with the connect now */
- if (PREDICT_FALSE (!thread_index))
+ if (PREDICT_FALSE (thread_index > transport_cl_thread ()))
{
- session_mq_connect_one (session_evt_ctrl_data (wrk, elt));
+ clib_warning ("Connect on wrong thread. Dropping");
return;
}
- if (PREDICT_FALSE (thread_index != 1))
+ /* If on worker, check if main has any pending messages. Avoids reordering
+ * with other control messages that need to be handled by main
+ */
+ if (thread_index)
{
- clib_warning ("Connect on wrong thread. Dropping");
- return;
+ he = clib_llist_elt (wrk->event_elts, wrk->evts_pending_main);
+
+ /* Events pending on main, postpone to avoid reordering */
+ if (!clib_llist_is_empty (wrk->event_elts, evt_list, he))
+ {
+ clib_llist_add_tail (wrk->event_elts, evt_list, elt, he);
+ return;
+ }
}
- /* Add to pending list to be handled by main thread */
+ /* Add to pending list to be handled by first worker */
he = clib_llist_elt (wrk->event_elts, wrk->pending_connects);
clib_llist_add_tail (wrk->event_elts, evt_list, elt, he);
wrk->n_pending_connects += 1;
if (wrk->n_pending_connects == 1)
{
- vlib_node_set_interrupt_pending (vlib_get_main_by_index (0),
- session_queue_node.index);
- session_send_rpc_evt_to_thread (0, session_mq_handle_connects_rpc, 0);
+ session_send_rpc_evt_to_thread_force (thread_index,
+ session_mq_handle_connects_rpc, 0);
}
}
case SESSION_CTRL_EVT_ACCEPTED_REPLY:
session_mq_accepted_reply_handler (fwrk, elt);
break;
+ case SESSION_CTRL_EVT_CONNECT:
+ session_mq_connect_handler (fwrk, elt);
+ break;
default:
clib_warning ("unhandled %u", elt->evt.event_type);
ALWAYS_ASSERT (0);
/* Regrab element in case pool moved */
elt = clib_llist_elt (fwrk->event_elts, ei);
- session_evt_ctrl_data_free (fwrk, elt);
- clib_llist_put (fwrk->event_elts, elt);
+ if (!clib_llist_elt_is_linked (elt, evt_list))
+ {
+ session_evt_ctrl_data_free (fwrk, elt);
+ clib_llist_put (fwrk->event_elts, elt);
+ }
ei = next_ei;
}
clib_spinlock_unlock (&tm->local_endpoints_lock);
if (flush_fl)
- session_send_rpc_evt_to_thread_force (0, transport_cleanup_freelist, 0);
+ session_send_rpc_evt_to_thread_force (transport_cl_thread (),
+ transport_cleanup_freelist, 0);
}
int
static tcp_connection_t *
tcp_half_open_connection_alloc (void)
{
- ASSERT (vlib_get_thread_index () == 0);
- return tcp_connection_alloc (0);
+ return tcp_connection_alloc (transport_cl_thread ());
}
/**
static void
tcp_half_open_connection_free (tcp_connection_t * tc)
{
- ASSERT (vlib_get_thread_index () == 0);
+ ASSERT (vlib_get_thread_index () == tc->c_thread_index ||
+ vlib_thread_is_main_w_barrier ());
return tcp_connection_free (tc);
}
always_inline tcp_connection_t *
tcp_half_open_connection_get (u32 conn_index)
{
- return tcp_connection_get (conn_index, 0);
+ return tcp_connection_get (conn_index, transport_cl_thread ());
}
/**
#include <vnet/tcp/tcp_types.h>
+static inline u8
+tcp_timer_thread_is_valid (tcp_connection_t *tc)
+{
+ return ((tc->c_thread_index == vlib_get_thread_index ()) ||
+ vlib_thread_is_main_w_barrier ());
+}
+
always_inline void
-tcp_timer_set (tcp_timer_wheel_t * tw, tcp_connection_t * tc, u8 timer_id,
+tcp_timer_set (tcp_timer_wheel_t *tw, tcp_connection_t *tc, u8 timer_id,
u32 interval)
{
- ASSERT (tc->c_thread_index == vlib_get_thread_index ());
+ ASSERT (tcp_timer_thread_is_valid (tc));
ASSERT (tc->timers[timer_id] == TCP_TIMER_HANDLE_INVALID);
tc->timers[timer_id] = tw_timer_start_tcp_twsl (tw, tc->c_c_index,
timer_id, interval);
always_inline void
tcp_timer_reset (tcp_timer_wheel_t * tw, tcp_connection_t * tc, u8 timer_id)
{
- ASSERT (tc->c_thread_index == vlib_get_thread_index ());
+ ASSERT (tcp_timer_thread_is_valid (tc));
tc->pending_timers &= ~(1 << timer_id);
if (tc->timers[timer_id] == TCP_TIMER_HANDLE_INVALID)
return;
tcp_timer_update (tcp_timer_wheel_t * tw, tcp_connection_t * tc, u8 timer_id,
u32 interval)
{
- ASSERT (tc->c_thread_index == vlib_get_thread_index ());
+ ASSERT (tcp_timer_thread_is_valid (tc));
if (tc->timers[timer_id] != TCP_TIMER_HANDLE_INVALID)
tw_timer_update_tcp_twsl (tw, tc->timers[timer_id], interval);
else
clib_memset (ctx, 0, sizeof (*ctx));
ctx->c_c_index = ctx - tm->half_open_ctx_pool;
+ ctx->c_thread_index = transport_cl_thread ();
return ctx->c_c_index;
}