session_event_t *evt;
svm_msg_q_msg_t msg;
svm_msg_q_t *mq;
- u32 tries = 0, max_tries;
mq = session_main_get_vpp_event_queue (thread_index);
- while (svm_msg_q_try_lock (mq))
- {
- max_tries = vlib_get_current_process (vlib_get_main ())? 1e6 : 3;
- if (tries++ == max_tries)
- {
- SESSION_DBG ("failed to enqueue evt");
- return -1;
- }
- }
+ if (PREDICT_FALSE (svm_msg_q_lock (mq)))
+ return -1;
if (PREDICT_FALSE (svm_msg_q_ring_is_full (mq, SESSION_MQ_IO_EVT_RING)))
{
svm_msg_q_unlock (mq);
evt->rpc_args.fp = data;
evt->rpc_args.arg = args;
break;
+ case SESSION_IO_EVT_RX:
case SESSION_IO_EVT_TX:
case SESSION_IO_EVT_TX_FLUSH:
case SESSION_IO_EVT_BUILTIN_RX:
- evt->fifo = data;
+ evt->session_index = *(u32 *) data;
break;
case SESSION_IO_EVT_BUILTIN_TX:
case SESSION_CTRL_EVT_CLOSE:
+/**
+ * Enqueue an I/O event for fifo f's owner session on its owning thread.
+ * Now passes a pointer to the session index instead of the fifo pointer,
+ * matching the event-dispatch switch that reads *(u32 *) data into
+ * evt->session_index for SESSION_IO_EVT_RX/TX/TX_FLUSH/BUILTIN_RX.
+ */
int
session_send_io_evt_to_thread (svm_fifo_t * f, session_evt_type_t evt_type)
{
- return session_send_evt_to_thread (f, 0, f->master_thread_index, evt_type);
+ return session_send_evt_to_thread (&f->master_session_index, 0,
+ f->master_thread_index, evt_type);
}
int
SESSION_CTRL_EVT_CLOSE);
}
+/**
+ * Unconditionally queue an RPC event (fp called with rpc_args) onto
+ * thread_index's event queue — unlike session_send_rpc_evt_to_thread,
+ * no same-thread shortcut is taken.
+ * NOTE(review): return value of session_send_evt_to_thread is discarded,
+ * so a full queue is silently ignored here — confirm intended.
+ */
+void
+session_send_rpc_evt_to_thread_force (u32 thread_index, void *fp,
+ void *rpc_args)
+{
+ session_send_evt_to_thread (fp, rpc_args, thread_index,
+ SESSION_CTRL_EVT_RPC);
+}
+
void
session_send_rpc_evt_to_thread (u32 thread_index, void *fp, void *rpc_args)
{
if (thread_index != vlib_get_thread_index ())
- session_send_evt_to_thread (fp, rpc_args, thread_index,
- SESSION_CTRL_EVT_RPC);
+ session_send_rpc_evt_to_thread_force (thread_index, fp, rpc_args);
else
{
void (*fnp) (void *) = fp;
session_worker_t *wrk;
session_event_t *evt;
- if (!session_has_transport (s))
- {
- /* Polling may not be enabled on main thread so close now */
- session_transport_close (s);
- return;
- }
-
/* If we are in the handler thread, or being called with the worker barrier
* held, just append a new event to pending disconnects vector. */
if (vlib_thread_is_main_w_barrier () || thread_index == s->thread_index)
clib_memset (s, 0, sizeof (*s));
s->session_index = s - wrk->sessions;
s->thread_index = thread_index;
+ s->app_index = APP_INVALID_INDEX;
return s;
}
+/**
+ * Return session s to its per-thread pool.
+ * Debug builds poison the memory with 0xFA first; thread_index is copied
+ * out BEFORE the memset because the poison clobbers s->thread_index —
+ * the old code did pool_put using the already-freed/poisoned field.
+ */
void
session_free (session_t * s)
{
- pool_put (session_main.wrk[s->thread_index].sessions, s);
if (CLIB_DEBUG)
- clib_memset (s, 0xFA, sizeof (*s));
+ {
+ u8 thread_index = s->thread_index;
+ clib_memset (s, 0xFA, sizeof (*s));
+ pool_put (session_main.wrk[thread_index].sessions, s);
+ return;
+ }
+ SESSION_EVT_DBG (SESSION_EVT_FREE, s);
+ pool_put (session_main.wrk[s->thread_index].sessions, s);
}
void
s = session_alloc (thread_index);
s->session_type = session_type_from_proto_and_ip (tc->proto, tc->is_ip4);
- s->enqueue_epoch = (u64) ~ 0;
s->session_state = SESSION_STATE_CLOSED;
/* Attach transport to session and vice versa */
continue;
if (is_in_order)
{
- rv = svm_fifo_enqueue_nowait (s->rx_fifo, len, data);
+ rv = svm_fifo_enqueue (s->rx_fifo, len, data);
if (rv == len)
{
written += rv;
if (is_in_order)
{
- enqueued = svm_fifo_enqueue_nowait (s->rx_fifo,
- b->current_length,
- vlib_buffer_get_current (b));
+ enqueued = svm_fifo_enqueue (s->rx_fifo,
+ b->current_length,
+ vlib_buffer_get_current (b));
if (PREDICT_FALSE ((b->flags & VLIB_BUFFER_NEXT_PRESENT)
&& enqueued >= 0))
{
session_worker_t *wrk;
wrk = session_main_get_worker (s->thread_index);
- if (s->enqueue_epoch != wrk->current_enqueue_epoch[tc->proto])
+ if (!(s->flags & SESSION_F_RX_EVT))
{
- s->enqueue_epoch = wrk->current_enqueue_epoch[tc->proto];
+ s->flags |= SESSION_F_RX_EVT;
vec_add1 (wrk->session_to_enqueue[tc->proto], s->session_index);
}
}
{
int enqueued = 0, rv, in_order_off;
- ASSERT (svm_fifo_max_enqueue (s->rx_fifo)
+ ASSERT (svm_fifo_max_enqueue_prod (s->rx_fifo)
>= b->current_length + sizeof (*hdr));
- svm_fifo_enqueue_nowait (s->rx_fifo, sizeof (session_dgram_hdr_t),
- (u8 *) hdr);
- enqueued = svm_fifo_enqueue_nowait (s->rx_fifo, b->current_length,
- vlib_buffer_get_current (b));
+ svm_fifo_enqueue (s->rx_fifo, sizeof (session_dgram_hdr_t), (u8 *) hdr);
+ enqueued = svm_fifo_enqueue (s->rx_fifo, b->current_length,
+ vlib_buffer_get_current (b));
if (PREDICT_FALSE ((b->flags & VLIB_BUFFER_NEXT_PRESENT) && enqueued >= 0))
{
in_order_off = enqueued > b->current_length ? enqueued : 0;
session_worker_t *wrk;
wrk = session_main_get_worker (s->thread_index);
- if (s->enqueue_epoch != wrk->current_enqueue_epoch[proto])
+ if (!(s->flags & SESSION_F_RX_EVT))
{
- s->enqueue_epoch = wrk->current_enqueue_epoch[proto];
+ s->flags |= SESSION_F_RX_EVT;
vec_add1 (wrk->session_to_enqueue[proto], s->session_index);
}
}
+/* Drop up to max_bytes from the session's tx fifo and, if the consumer side
+ * requested a dequeue notification that this drop satisfies, deliver it —
+ * the previous code returned immediately and never notified. */
session_tx_fifo_dequeue_drop (transport_connection_t * tc, u32 max_bytes)
{
session_t *s = session_get (tc->s_index, tc->thread_index);
- return svm_fifo_dequeue_drop (s->tx_fifo, max_bytes);
+ u32 rv;
+
+ rv = svm_fifo_dequeue_drop (s->tx_fifo, max_bytes);
+
+ if (svm_fifo_needs_deq_ntf (s->tx_fifo, max_bytes))
+ session_dequeue_notify (s);
+
+ return rv;
}
static inline int
* @return 0 on success or negative number if failed to send notification.
*/
static inline int
-session_enqueue_notify (session_t * s)
+session_enqueue_notify_inline (session_t * s)
{
app_worker_t *app_wrk;
+ u32 session_index;
+ u8 n_subscribers;
+
+ session_index = s->session_index;
+ n_subscribers = svm_fifo_n_subscribers (s->rx_fifo);
app_wrk = app_worker_get_if_valid (s->app_wrk_index);
if (PREDICT_FALSE (!app_wrk))
/* *INDENT-OFF* */
SESSION_EVT_DBG(SESSION_EVT_ENQ, s, ({
ed->data[0] = SESSION_IO_EVT_RX;
- ed->data[1] = svm_fifo_max_dequeue (s->rx_fifo);
+ ed->data[1] = svm_fifo_max_dequeue_prod (s->rx_fifo);
}));
/* *INDENT-ON* */
+ s->flags &= ~SESSION_F_RX_EVT;
if (PREDICT_FALSE (app_worker_lock_and_send_event (app_wrk, s,
SESSION_IO_EVT_RX)))
return -1;
- if (PREDICT_FALSE (svm_fifo_n_subscribers (s->rx_fifo)))
- return session_notify_subscribers (app_wrk->app_index, s,
- s->rx_fifo, SESSION_IO_EVT_RX);
+ if (PREDICT_FALSE (n_subscribers))
+ {
+ s = session_get (session_index, vlib_get_thread_index ());
+ return session_notify_subscribers (app_wrk->app_index, s,
+ s->rx_fifo, SESSION_IO_EVT_RX);
+ }
return 0;
}
+/* External entry point for RX enqueue notification; thin wrapper so hot
+ * in-file callers can use the inline variant directly. */
+int
+session_enqueue_notify (session_t * s)
+{
+ return session_enqueue_notify_inline (s);
+}
+
int
session_dequeue_notify (session_t * s)
{
return session_notify_subscribers (app_wrk->app_index, s,
s->tx_fifo, SESSION_IO_EVT_TX);
- svm_fifo_clear_tx_ntf (s->tx_fifo);
+ svm_fifo_clear_deq_ntf (s->tx_fifo);
return 0;
}
errors++;
continue;
}
- if (PREDICT_FALSE (session_enqueue_notify (s)))
+
+ if (PREDICT_FALSE (session_enqueue_notify_inline (s)))
errors++;
}
vec_reset_length (indices);
wrk->session_to_enqueue[transport_proto] = indices;
- wrk->current_enqueue_epoch[transport_proto]++;
return errors;
}
return errors;
}
-int
-session_stream_connect_notify (transport_connection_t * tc, u8 is_fail)
+static inline int
+session_stream_connect_notify_inline (transport_connection_t * tc, u8 is_fail,
+ session_state_t opened_state)
{
u32 opaque = 0, new_ti, new_si;
app_worker_t *app_wrk;
return -1;
}
+ s = session_get (new_si, new_ti);
+ s->session_state = opened_state;
+ session_lookup_add_connection (tc, session_handle (s));
+
if (app_worker_connect_notify (app_wrk, s, opaque))
{
s = session_get (new_si, new_ti);
return -1;
}
- s = session_get (new_si, new_ti);
- s->session_state = SESSION_STATE_READY;
- session_lookup_add_connection (tc, session_handle (s));
-
return 0;
}
+/* Connect notification for regular streams: the new session transitions to
+ * SESSION_STATE_READY on success. */
+int
+session_stream_connect_notify (transport_connection_t * tc, u8 is_fail)
+{
+ return session_stream_connect_notify_inline (tc, is_fail,
+ SESSION_STATE_READY);
+}
+
+/* Connect notification for half-open transports (those that allocate fifos
+ * before full establishment — see the transport_half_open_has_fifos call
+ * site): session is marked OPENED rather than READY. */
+int
+session_ho_stream_connect_notify (transport_connection_t * tc, u8 is_fail)
+{
+ return session_stream_connect_notify_inline (tc, is_fail,
+ SESSION_STATE_OPENED);
+}
+
typedef struct _session_switch_pool_args
{
u32 session_index;
+/* Transport has initiated close: move the session to TRANSPORT_CLOSING
+ * (no-op if already at or past that state) and notify the app worker.
+ * Replaces the direct session_disconnect_callback invocation with the
+ * app_worker_close_notify abstraction. */
session_transport_closing_notify (transport_connection_t * tc)
{
app_worker_t *app_wrk;
- application_t *app;
session_t *s;
s = session_get (tc->s_index, tc->thread_index);
if (s->session_state >= SESSION_STATE_TRANSPORT_CLOSING)
return;
s->session_state = SESSION_STATE_TRANSPORT_CLOSING;
- app_wrk = app_worker_get_if_valid (s->app_wrk_index);
- if (!app_wrk)
- return;
- app = application_get (app_wrk->app_index);
- app->cb_fns.session_disconnect_callback (s);
+ app_wrk = app_worker_get (s->app_wrk_index);
+ app_worker_close_notify (app_wrk, s);
}
/**
if (!(s = session_get_if_valid (tc->s_index, tc->thread_index)))
return;
+ /* Transport thinks that app requested close but it actually didn't.
+ * Can happen for tcp if fin and rst are received in close succession. */
+ if (s->session_state == SESSION_STATE_READY)
+ {
+ session_transport_closing_notify (tc);
+ svm_fifo_dequeue_drop_all (s->tx_fifo);
+ }
/* If app close has not been received or has not yet resulted in
* a transport close, only mark the session transport as closed */
- if (s->session_state <= SESSION_STATE_CLOSING)
+ else if (s->session_state <= SESSION_STATE_CLOSING)
{
session_lookup_del_session (s);
s->session_state = SESSION_STATE_TRANSPORT_CLOSED;
+/* Transport was reset: drop all pending tx data, move to TRANSPORT_CLOSING
+ * (idempotent guard) and notify the app worker via the new
+ * app_worker_reset_notify abstraction instead of the raw callback.
+ * NOTE(review): unlike closing_notify, app_worker_get (not _if_valid) is
+ * used here too — assumes app_wrk_index is always valid at reset; confirm. */
void
session_transport_reset_notify (transport_connection_t * tc)
{
- session_t *s;
app_worker_t *app_wrk;
- application_t *app;
+ session_t *s;
+
s = session_get (tc->s_index, tc->thread_index);
svm_fifo_dequeue_drop_all (s->tx_fifo);
if (s->session_state >= SESSION_STATE_TRANSPORT_CLOSING)
return;
s->session_state = SESSION_STATE_TRANSPORT_CLOSING;
app_wrk = app_worker_get (s->app_wrk_index);
- app = application_get (app_wrk->app_index);
- app->cb_fns.session_reset_callback (s);
+ app_worker_reset_notify (app_wrk, s);
}
int
*/
int
session_stream_accept (transport_connection_t * tc, u32 listener_index,
- u8 notify)
+ u32 thread_index, u8 notify)
{
session_t *s;
int rv;
s = session_alloc_for_connection (tc);
- s->listener_index = listener_index;
+ s->listener_handle = ((u64) thread_index << 32) | (u64) listener_index;
s->session_state = SESSION_STATE_CREATED;
if ((rv = app_worker_init_accepted (s)))
transport_connection_t *tc;
transport_endpoint_cfg_t *tep;
app_worker_t *app_wrk;
+ session_handle_t sh;
session_t *s;
int rv;
return -1;
}
+ sh = session_handle (s);
+ session_lookup_add_connection (tc, sh);
return app_worker_connect_notify (app_wrk, s, opaque);
}
* thing but better than allocating a separate half-open pool.
*/
tc->s_index = opaque;
+ if (transport_half_open_has_fifos (rmt->transport_proto))
+ return session_ho_stream_connect_notify (tc, 0 /* is_fail */ );
return 0;
}
if (!tc)
return VNET_API_ERROR_ADDRESS_NOT_IN_USE;
- session_lookup_del_connection (tc);
+ if (!(tc->flags & TRANSPORT_CONNECTION_F_NO_LOOKUP))
+ session_lookup_del_connection (tc);
transport_stop_listen (tp, s->connection_index);
return 0;
}
* point, either after sending everything or after a timeout, call delete
* notify. This will finally lead to the complete cleanup of the session.
*/
- if (svm_fifo_max_dequeue (s->tx_fifo))
+ if (svm_fifo_max_dequeue_cons (s->tx_fifo))
s->session_state = SESSION_STATE_CLOSED_WAITING;
else
s->session_state = SESSION_STATE_CLOSED;
{
svm_fifo_t *f;
- if (s->session_state == SESSION_STATE_LISTENING)
+ if (!s->rx_fifo)
return SESSION_INVALID_HANDLE;
f = s->rx_fifo;
/* *INDENT-ON* */
smm->session_type_to_next[session_type] = next_index;
- smm->session_tx_fns[session_type] = session_tx_fns[vft->tx_type];
+ smm->session_tx_fns[session_type] =
+ session_tx_fns[vft->transport_options.tx_type];
}
transport_connection_t *
s->connection_index);
}
+/* Fill tep with session s's local (is_lcl) or peer endpoint, dispatching
+ * to the listener-specific lookup when s is in LISTENING state.
+ * NOTE(review): function is void yet uses `return <call>;` — valid only
+ * if transport_get_endpoint/transport_get_listener_endpoint are themselves
+ * void (GNU C tolerates this; ISO C frowns on it) — confirm prototypes. */
+void
+session_get_endpoint (session_t * s, transport_endpoint_t * tep, u8 is_lcl)
+{
+ if (s->session_state != SESSION_STATE_LISTENING)
+ return transport_get_endpoint (session_get_transport_proto (s),
+ s->connection_index, s->thread_index, tep,
+ is_lcl);
+ else
+ return transport_get_listener_endpoint (session_get_transport_proto (s),
+ s->connection_index, tep, is_lcl);
+}
+
transport_connection_t *
listen_session_get_transport (session_t * s)
{
vlib_thread_main_t *vtm = vlib_get_thread_main ();
u32 num_threads, preallocated_sessions_per_worker;
session_worker_t *wrk;
- int i, j;
+ int i;
num_threads = 1 /* main thread */ + vtm->n_threads;
/* Allocate cache line aligned worker contexts */
vec_validate_aligned (smm->wrk, num_threads - 1, CLIB_CACHE_LINE_BYTES);
- for (i = 0; i < TRANSPORT_N_PROTO; i++)
- {
- for (j = 0; j < num_threads; j++)
- smm->wrk[j].current_enqueue_epoch[i] = 1;
- }
-
for (i = 0; i < num_threads; i++)
{
wrk = &smm->wrk[i];