session_event_t *evt;
svm_msg_q_msg_t msg;
svm_msg_q_t *mq;
- u32 tries = 0, max_tries;
mq = session_main_get_vpp_event_queue (thread_index);
- while (svm_msg_q_try_lock (mq))
- {
- max_tries = vlib_get_current_process (vlib_get_main ())? 1e6 : 3;
- if (tries++ == max_tries)
- {
- SESSION_DBG ("failed to enqueue evt");
- return -1;
- }
- }
+ if (PREDICT_FALSE (svm_msg_q_lock (mq)))
+ return -1;
if (PREDICT_FALSE (svm_msg_q_ring_is_full (mq, SESSION_MQ_IO_EVT_RING)))
{
svm_msg_q_unlock (mq);
evt->rpc_args.fp = data;
evt->rpc_args.arg = args;
break;
+ case SESSION_IO_EVT_RX:
case SESSION_IO_EVT_TX:
case SESSION_IO_EVT_TX_FLUSH:
case SESSION_IO_EVT_BUILTIN_RX:
SESSION_CTRL_EVT_CLOSE);
}
+void
+session_send_rpc_evt_to_thread_force (u32 thread_index, void *fp,
+ void *rpc_args)
+{
+ session_send_evt_to_thread (fp, rpc_args, thread_index,
+ SESSION_CTRL_EVT_RPC);
+}
+
void
session_send_rpc_evt_to_thread (u32 thread_index, void *fp, void *rpc_args)
{
if (thread_index != vlib_get_thread_index ())
- session_send_evt_to_thread (fp, rpc_args, thread_index,
- SESSION_CTRL_EVT_RPC);
+ session_send_rpc_evt_to_thread_force (thread_index, fp, rpc_args);
else
{
void (*fnp) (void *) = fp;
clib_memset (s, 0, sizeof (*s));
s->session_index = s - wrk->sessions;
s->thread_index = thread_index;
+ s->app_index = APP_INVALID_INDEX;
return s;
}
void
session_free (session_t * s)
{
- pool_put (session_main.wrk[s->thread_index].sessions, s);
if (CLIB_DEBUG)
- clib_memset (s, 0xFA, sizeof (*s));
+ {
+ u8 thread_index = s->thread_index;
+ clib_memset (s, 0xFA, sizeof (*s));
+ pool_put (session_main.wrk[thread_index].sessions, s);
+ return;
+ }
+ SESSION_EVT_DBG (SESSION_EVT_FREE, s);
+ pool_put (session_main.wrk[s->thread_index].sessions, s);
}
void
continue;
if (is_in_order)
{
- rv = svm_fifo_enqueue_nowait (s->rx_fifo, len, data);
+ rv = svm_fifo_enqueue (s->rx_fifo, len, data);
if (rv == len)
{
written += rv;
if (is_in_order)
{
- enqueued = svm_fifo_enqueue_nowait (s->rx_fifo,
- b->current_length,
- vlib_buffer_get_current (b));
+ enqueued = svm_fifo_enqueue (s->rx_fifo,
+ b->current_length,
+ vlib_buffer_get_current (b));
if (PREDICT_FALSE ((b->flags & VLIB_BUFFER_NEXT_PRESENT)
&& enqueued >= 0))
{
{
int enqueued = 0, rv, in_order_off;
- ASSERT (svm_fifo_max_enqueue (s->rx_fifo)
+ ASSERT (svm_fifo_max_enqueue_prod (s->rx_fifo)
>= b->current_length + sizeof (*hdr));
- svm_fifo_enqueue_nowait (s->rx_fifo, sizeof (session_dgram_hdr_t),
- (u8 *) hdr);
- enqueued = svm_fifo_enqueue_nowait (s->rx_fifo, b->current_length,
- vlib_buffer_get_current (b));
+ svm_fifo_enqueue (s->rx_fifo, sizeof (session_dgram_hdr_t), (u8 *) hdr);
+ enqueued = svm_fifo_enqueue (s->rx_fifo, b->current_length,
+ vlib_buffer_get_current (b));
if (PREDICT_FALSE ((b->flags & VLIB_BUFFER_NEXT_PRESENT) && enqueued >= 0))
{
in_order_off = enqueued > b->current_length ? enqueued : 0;
session_tx_fifo_dequeue_drop (transport_connection_t * tc, u32 max_bytes)
{
session_t *s = session_get (tc->s_index, tc->thread_index);
- return svm_fifo_dequeue_drop (s->tx_fifo, max_bytes);
+ u32 rv;
+
+ rv = svm_fifo_dequeue_drop (s->tx_fifo, max_bytes);
+
+ if (svm_fifo_needs_deq_ntf (s->tx_fifo, max_bytes))
+ session_dequeue_notify (s);
+
+ return rv;
}
static inline int
session_enqueue_notify_inline (session_t * s)
{
app_worker_t *app_wrk;
+ u32 session_index;
+ u8 n_subscribers;
+
+ session_index = s->session_index;
+ n_subscribers = svm_fifo_n_subscribers (s->rx_fifo);
app_wrk = app_worker_get_if_valid (s->app_wrk_index);
if (PREDICT_FALSE (!app_wrk))
/* *INDENT-OFF* */
SESSION_EVT_DBG(SESSION_EVT_ENQ, s, ({
ed->data[0] = SESSION_IO_EVT_RX;
- ed->data[1] = svm_fifo_max_dequeue (s->rx_fifo);
+ ed->data[1] = svm_fifo_max_dequeue_prod (s->rx_fifo);
}));
/* *INDENT-ON* */
SESSION_IO_EVT_RX)))
return -1;
- if (PREDICT_FALSE (svm_fifo_n_subscribers (s->rx_fifo)))
- return session_notify_subscribers (app_wrk->app_index, s,
- s->rx_fifo, SESSION_IO_EVT_RX);
+ if (PREDICT_FALSE (n_subscribers))
+ {
+ s = session_get (session_index, vlib_get_thread_index ());
+ return session_notify_subscribers (app_wrk->app_index, s,
+ s->rx_fifo, SESSION_IO_EVT_RX);
+ }
return 0;
}
return session_notify_subscribers (app_wrk->app_index, s,
s->tx_fifo, SESSION_IO_EVT_TX);
- svm_fifo_clear_tx_ntf (s->tx_fifo);
+ svm_fifo_clear_deq_ntf (s->tx_fifo);
return 0;
}
continue;
}
- if (svm_fifo_has_event (s->rx_fifo) || svm_fifo_is_empty (s->rx_fifo))
- continue;
-
if (PREDICT_FALSE (session_enqueue_notify_inline (s)))
errors++;
}
return errors;
}
-int
-session_stream_connect_notify (transport_connection_t * tc, u8 is_fail)
+static inline int
+session_stream_connect_notify_inline (transport_connection_t * tc, u8 is_fail,
+ session_state_t opened_state)
{
u32 opaque = 0, new_ti, new_si;
app_worker_t *app_wrk;
return -1;
}
+ s = session_get (new_si, new_ti);
+ s->session_state = opened_state;
+ session_lookup_add_connection (tc, session_handle (s));
+
if (app_worker_connect_notify (app_wrk, s, opaque))
{
s = session_get (new_si, new_ti);
return -1;
}
- s = session_get (new_si, new_ti);
- s->session_state = SESSION_STATE_READY;
- session_lookup_add_connection (tc, session_handle (s));
-
return 0;
}
+int
+session_stream_connect_notify (transport_connection_t * tc, u8 is_fail)
+{
+ return session_stream_connect_notify_inline (tc, is_fail,
+ SESSION_STATE_READY);
+}
+
+int
+session_ho_stream_connect_notify (transport_connection_t * tc, u8 is_fail)
+{
+ return session_stream_connect_notify_inline (tc, is_fail,
+ SESSION_STATE_OPENED);
+}
+
typedef struct _session_switch_pool_args
{
u32 session_index;
if (!(s = session_get_if_valid (tc->s_index, tc->thread_index)))
return;
+ /* Transport thinks that app requested close but it actually didn't.
+ * Can happen for tcp if fin and rst are received in close succession. */
+ if (s->session_state == SESSION_STATE_READY)
+ {
+ session_transport_closing_notify (tc);
+ svm_fifo_dequeue_drop_all (s->tx_fifo);
+ }
/* If app close has not been received or has not yet resulted in
* a transport close, only mark the session transport as closed */
- if (s->session_state <= SESSION_STATE_CLOSING)
+ else if (s->session_state <= SESSION_STATE_CLOSING)
{
session_lookup_del_session (s);
s->session_state = SESSION_STATE_TRANSPORT_CLOSED;
void
session_transport_reset_notify (transport_connection_t * tc)
{
- session_t *s;
app_worker_t *app_wrk;
- application_t *app;
+ session_t *s;
+
s = session_get (tc->s_index, tc->thread_index);
svm_fifo_dequeue_drop_all (s->tx_fifo);
if (s->session_state >= SESSION_STATE_TRANSPORT_CLOSING)
return;
s->session_state = SESSION_STATE_TRANSPORT_CLOSING;
app_wrk = app_worker_get (s->app_wrk_index);
- app = application_get (app_wrk->app_index);
- app->cb_fns.session_reset_callback (s);
+ app_worker_reset_notify (app_wrk, s);
}
int
*/
int
session_stream_accept (transport_connection_t * tc, u32 listener_index,
- u8 notify)
+ u32 thread_index, u8 notify)
{
session_t *s;
int rv;
s = session_alloc_for_connection (tc);
- s->listener_index = listener_index;
+ s->listener_handle = ((u64) thread_index << 32) | (u64) listener_index;
s->session_state = SESSION_STATE_CREATED;
if ((rv = app_worker_init_accepted (s)))
transport_connection_t *tc;
transport_endpoint_cfg_t *tep;
app_worker_t *app_wrk;
+ session_handle_t sh;
session_t *s;
int rv;
return -1;
}
+ sh = session_handle (s);
+ session_lookup_add_connection (tc, sh);
return app_worker_connect_notify (app_wrk, s, opaque);
}
* thing but better than allocating a separate half-open pool.
*/
tc->s_index = opaque;
+ if (transport_half_open_has_fifos (rmt->transport_proto))
+ return session_ho_stream_connect_notify (tc, 0 /* is_fail */ );
return 0;
}
if (!tc)
return VNET_API_ERROR_ADDRESS_NOT_IN_USE;
- session_lookup_del_connection (tc);
+ if (!(tc->flags & TRANSPORT_CONNECTION_F_NO_LOOKUP))
+ session_lookup_del_connection (tc);
transport_stop_listen (tp, s->connection_index);
return 0;
}
* point, either after sending everything or after a timeout, call delete
* notify. This will finally lead to the complete cleanup of the session.
*/
- if (svm_fifo_max_dequeue (s->tx_fifo))
+ if (svm_fifo_max_dequeue_cons (s->tx_fifo))
s->session_state = SESSION_STATE_CLOSED_WAITING;
else
s->session_state = SESSION_STATE_CLOSED;
{
svm_fifo_t *f;
- if (s->session_state == SESSION_STATE_LISTENING)
+ if (!s->rx_fifo)
return SESSION_INVALID_HANDLE;
f = s->rx_fifo;
/* *INDENT-ON* */
smm->session_type_to_next[session_type] = next_index;
- smm->session_tx_fns[session_type] = session_tx_fns[vft->tx_type];
+ smm->session_tx_fns[session_type] =
+ session_tx_fns[vft->transport_options.tx_type];
}
transport_connection_t *
s->connection_index);
}
+void
+session_get_endpoint (session_t * s, transport_endpoint_t * tep, u8 is_lcl)
+{
+ if (s->session_state != SESSION_STATE_LISTENING)
+ return transport_get_endpoint (session_get_transport_proto (s),
+ s->connection_index, s->thread_index, tep,
+ is_lcl);
+ else
+ return transport_get_listener_endpoint (session_get_transport_proto (s),
+ s->connection_index, tep, is_lcl);
+}
+
transport_connection_t *
listen_session_get_transport (session_t * s)
{