case SESSION_IO_EVT_TX:
case SESSION_IO_EVT_TX_FLUSH:
case SESSION_IO_EVT_BUILTIN_RX:
- evt->fifo = data;
+ evt->session_index = *(u32 *) data;
break;
case SESSION_IO_EVT_BUILTIN_TX:
case SESSION_CTRL_EVT_CLOSE:
int
session_send_io_evt_to_thread (svm_fifo_t * f, session_evt_type_t evt_type)
{
- return session_send_evt_to_thread (f, 0, f->master_thread_index, evt_type);
+ return session_send_evt_to_thread (&f->master_session_index, 0,
+ f->master_thread_index, evt_type);
}
int
SESSION_CTRL_EVT_CLOSE);
}
+void
+session_send_rpc_evt_to_thread_force (u32 thread_index, void *fp,
+ void *rpc_args)
+{
+ session_send_evt_to_thread (fp, rpc_args, thread_index,
+ SESSION_CTRL_EVT_RPC);
+}
+
void
session_send_rpc_evt_to_thread (u32 thread_index, void *fp, void *rpc_args)
{
if (thread_index != vlib_get_thread_index ())
- session_send_evt_to_thread (fp, rpc_args, thread_index,
- SESSION_CTRL_EVT_RPC);
+ session_send_rpc_evt_to_thread_force (thread_index, fp, rpc_args);
else
{
void (*fnp) (void *) = fp;
s = session_alloc (thread_index);
s->session_type = session_type_from_proto_and_ip (tc->proto, tc->is_ip4);
- s->enqueue_epoch = (u64) ~ 0;
s->session_state = SESSION_STATE_CLOSED;
/* Attach transport to session and vice versa */
session_worker_t *wrk;
wrk = session_main_get_worker (s->thread_index);
- if (s->enqueue_epoch != wrk->current_enqueue_epoch[tc->proto])
+ if (!(s->flags & SESSION_F_RX_EVT))
{
- s->enqueue_epoch = wrk->current_enqueue_epoch[tc->proto];
+ s->flags |= SESSION_F_RX_EVT;
vec_add1 (wrk->session_to_enqueue[tc->proto], s->session_index);
}
}
session_worker_t *wrk;
wrk = session_main_get_worker (s->thread_index);
- if (s->enqueue_epoch != wrk->current_enqueue_epoch[proto])
+ if (!(s->flags & SESSION_F_RX_EVT))
{
- s->enqueue_epoch = wrk->current_enqueue_epoch[proto];
+ s->flags |= SESSION_F_RX_EVT;
vec_add1 (wrk->session_to_enqueue[proto], s->session_index);
}
}
}));
/* *INDENT-ON* */
+ s->flags &= ~SESSION_F_RX_EVT;
if (PREDICT_FALSE (app_worker_lock_and_send_event (app_wrk, s,
SESSION_IO_EVT_RX)))
return -1;
continue;
}
- if (svm_fifo_is_empty (s->rx_fifo))
+ if (svm_fifo_has_event (s->rx_fifo) || svm_fifo_is_empty (s->rx_fifo))
continue;
if (PREDICT_FALSE (session_enqueue_notify_inline (s)))
vec_reset_length (indices);
wrk->session_to_enqueue[transport_proto] = indices;
- wrk->current_enqueue_epoch[transport_proto]++;
return errors;
}
session_transport_closing_notify (transport_connection_t * tc)
{
app_worker_t *app_wrk;
- application_t *app;
session_t *s;
s = session_get (tc->s_index, tc->thread_index);
if (s->session_state >= SESSION_STATE_TRANSPORT_CLOSING)
return;
s->session_state = SESSION_STATE_TRANSPORT_CLOSING;
- app_wrk = app_worker_get_if_valid (s->app_wrk_index);
- if (!app_wrk)
- return;
- app = application_get (app_wrk->app_index);
- app->cb_fns.session_disconnect_callback (s);
+ app_wrk = app_worker_get (s->app_wrk_index);
+ app_worker_close_notify (app_wrk, s);
}
/**
transport_connection_t *tc;
transport_endpoint_cfg_t *tep;
app_worker_t *app_wrk;
+ session_handle_t sh;
session_t *s;
int rv;
return -1;
}
+ sh = session_handle (s);
+ session_lookup_add_connection (tc, sh);
+
return app_worker_connect_notify (app_wrk, s, opaque);
}
vlib_thread_main_t *vtm = vlib_get_thread_main ();
u32 num_threads, preallocated_sessions_per_worker;
session_worker_t *wrk;
- int i, j;
+ int i;
num_threads = 1 /* main thread */ + vtm->n_threads;
/* Allocate cache line aligned worker contexts */
vec_validate_aligned (smm->wrk, num_threads - 1, CLIB_CACHE_LINE_BYTES);
- for (i = 0; i < TRANSPORT_N_PROTO; i++)
- {
- for (j = 0; j < num_threads; j++)
- smm->wrk[j].current_enqueue_epoch[i] = 1;
- }
-
for (i = 0; i < num_threads; i++)
{
wrk = &smm->wrk[i];