#include <vnet/session/transport.h>
#include <vnet/session/session.h>
#include <vnet/session/application.h>
+#include <vnet/session/application_interface.h>
#include <vnet/session/session_debug.h>
#include <svm/queue.h>
+static void
+session_mq_accepted_reply_handler (void *data)
+{
+ session_accepted_reply_msg_t *mp = (session_accepted_reply_msg_t *) data;
+ vnet_disconnect_args_t _a = { 0 }, *a = &_a;
+ local_session_t *ls;
+ stream_session_t *s;
+
+ /* Server isn't interested, kill the session */
+ if (mp->retval)
+ {
+ a->app_index = mp->context;
+ a->handle = mp->handle;
+ vnet_disconnect_session (a);
+ return;
+ }
+
+ if (session_handle_is_local (mp->handle))
+ {
+ ls = application_get_local_session_from_handle (mp->handle);
+ if (!ls || ls->app_wrk_index != mp->context)
+ {
+ clib_warning ("server %u doesn't own local handle %llu",
+ mp->context, mp->handle);
+ return;
+ }
+ if (application_local_session_connect_notify (ls))
+ return;
+ ls->session_state = SESSION_STATE_READY;
+ }
+ else
+ {
+ s = session_get_from_handle_if_valid (mp->handle);
+ if (!s)
+ {
+ clib_warning ("session doesn't exist");
+ return;
+ }
+ if (s->app_wrk_index != mp->context)
+ {
+ clib_warning ("app doesn't own session");
+ return;
+ }
+ s->session_state = SESSION_STATE_READY;
+ if (!svm_fifo_is_empty (s->server_rx_fifo))
+ {
+ app_worker_t *app;
+ app = app_worker_get (s->app_wrk_index);
+ application_send_event (app, s, FIFO_EVENT_APP_RX);
+ }
+ }
+}
+
+static void
+session_mq_reset_reply_handler (void *data)
+{
+ session_reset_reply_msg_t *mp;
+ app_worker_t *app_wrk;
+ stream_session_t *s;
+ application_t *app;
+ u32 index, thread_index;
+
+ mp = (session_reset_reply_msg_t *) data;
+ app = application_lookup (mp->client_index);
+ if (!app)
+ return;
+
+ session_parse_handle (mp->handle, &index, &thread_index);
+ s = session_get_if_valid (index, thread_index);
+ if (!s)
+ {
+ clib_warning ("Invalid session!");
+ return;
+ }
+ app_wrk = app_worker_get (s->app_wrk_index);
+ if (!app_wrk || app_wrk->app_index != app->app_index)
+ {
+ clib_warning ("App % does not own handle 0x%lx!", app->app_index,
+ mp->handle);
+ return;
+ }
+
+ /* Client objected to resetting the session, log and continue */
+ if (mp->retval)
+ {
+ clib_warning ("client retval %d", mp->retval);
+ return;
+ }
+
+ /* This comes as a response to a reset, transport only waiting for
+ * confirmation to remove connection state, no need to disconnect */
+ stream_session_cleanup (s);
+}
+
+static void
+session_mq_disconnected_handler (void *data)
+{
+ session_disconnected_reply_msg_t *rmp;
+ vnet_disconnect_args_t _a, *a = &_a;
+ svm_msg_q_msg_t _msg, *msg = &_msg;
+ session_disconnected_msg_t *mp;
+ app_worker_t *app_wrk;
+ session_event_t *evt;
+ stream_session_t *s;
+ application_t *app;
+ int rv = 0;
+
+ mp = (session_disconnected_msg_t *) data;
+ s = session_get_from_handle_if_valid (mp->handle);
+ app_wrk = app_worker_get (s->app_wrk_index);
+ app = application_lookup (mp->client_index);
+ if (!(app_wrk && s && app->app_index == app_wrk->app_index))
+ {
+ clib_warning ("could not disconnect session: %llu app_wrk: %u",
+ mp->handle, mp->client_index);
+ return;
+ }
+
+ a->handle = mp->handle;
+ a->app_index = app_wrk->wrk_index;
+ rv = vnet_disconnect_session (a);
+
+ svm_msg_q_lock_and_alloc_msg_w_ring (app_wrk->event_queue,
+ SESSION_MQ_CTRL_EVT_RING,
+ SVM_Q_WAIT, msg);
+ svm_msg_q_unlock (app_wrk->event_queue);
+ evt = svm_msg_q_msg_data (app_wrk->event_queue, msg);
+ memset (evt, 0, sizeof (*evt));
+ evt->event_type = SESSION_CTRL_EVT_DISCONNECTED;
+ rmp = (session_disconnected_reply_msg_t *) evt->data;
+ rmp->handle = mp->handle;
+ rmp->context = mp->context;
+ rmp->retval = rv;
+ svm_msg_q_add (app_wrk->event_queue, msg, SVM_Q_WAIT);
+}
+
+static void
+session_mq_disconnected_reply_handler (void *data)
+{
+ session_disconnected_reply_msg_t *mp;
+ vnet_disconnect_args_t _a, *a = &_a;
+ application_t *app;
+
+ mp = (session_disconnected_reply_msg_t *) data;
+
+ /* Client objected to disconnecting the session, log and continue */
+ if (mp->retval)
+ {
+ clib_warning ("client retval %d", mp->retval);
+ return;
+ }
+
+ /* Disconnect has been confirmed. Confirm close to transport */
+ app = application_lookup (mp->context);
+ if (app)
+ {
+ a->handle = mp->handle;
+ a->app_index = app->app_index;
+ vnet_disconnect_session (a);
+ }
+}
+
vlib_node_registration_t session_queue_node;
typedef struct
return s;
}
-vlib_node_registration_t session_queue_node;
-
#define foreach_session_queue_error \
_(TX, "Packets transmitted") \
_(TIMER, "Timer events") \
SESSION_TX_OK
};
-
static void
session_tx_trace_frame (vlib_main_t * vm, vlib_node_runtime_t * node,
u32 next_index, u32 * to_next, u16 n_segs,
always_inline int
session_tx_fifo_read_and_snd_i (vlib_main_t * vm, vlib_node_runtime_t * node,
- session_fifo_event_t * e,
+ session_event_t * e,
stream_session_t * s, int *n_tx_packets,
u8 peek_data)
{
int
session_tx_fifo_peek_and_snd (vlib_main_t * vm, vlib_node_runtime_t * node,
- session_fifo_event_t * e,
+ session_event_t * e,
stream_session_t * s, int *n_tx_pkts)
{
return session_tx_fifo_read_and_snd_i (vm, node, e, s, n_tx_pkts, 1);
int
session_tx_fifo_dequeue_and_snd (vlib_main_t * vm, vlib_node_runtime_t * node,
- session_fifo_event_t * e,
+ session_event_t * e,
stream_session_t * s, int *n_tx_pkts)
{
return session_tx_fifo_read_and_snd_i (vm, node, e, s, n_tx_pkts, 0);
int
session_tx_fifo_dequeue_internal (vlib_main_t * vm,
vlib_node_runtime_t * node,
- session_fifo_event_t * e,
+ session_event_t * e,
stream_session_t * s, int *n_tx_pkts)
{
application_t *app;
}
always_inline stream_session_t *
-session_event_get_session (session_fifo_event_t * e, u8 thread_index)
+session_event_get_session (session_event_t * e, u8 thread_index)
{
return session_get_if_valid (e->fifo->master_session_index, thread_index);
}
vlib_frame_t * frame)
{
session_manager_main_t *smm = vnet_get_session_manager_main ();
- session_fifo_event_t *pending_events, *e;
- session_fifo_event_t *fifo_events;
- u32 n_to_dequeue, n_events;
- svm_queue_t *q;
- application_t *app;
- int n_tx_packets = 0;
- u32 thread_index = vm->thread_index;
- int i, rv;
+ u32 thread_index = vm->thread_index, n_to_dequeue, n_events;
+ session_event_t *pending_events, *e;
+ session_event_t *fifo_events;
+ svm_msg_q_msg_t _msg, *msg = &_msg;
f64 now = vlib_time_now (vm);
+ int n_tx_packets = 0, i, rv;
+ app_worker_t *app_wrk;
+ application_t *app;
+ svm_msg_q_t *mq;
void (*fp) (void *);
SESSION_EVT_DBG (SESSION_EVT_POLL_GAP_TRACK, smm, thread_index);
transport_update_time (now, thread_index);
/*
- * Get vpp queue events
+ * Get vpp queue events that we can dequeue without blocking
*/
- q = smm->vpp_event_queues[thread_index];
- if (PREDICT_FALSE (q == 0))
- return 0;
-
+ mq = smm->vpp_event_queues[thread_index];
fifo_events = smm->free_event_vector[thread_index];
-
- /* min number of events we can dequeue without blocking */
- n_to_dequeue = q->cursize;
+ n_to_dequeue = svm_msg_q_size (mq);
pending_events = smm->pending_event_vector[thread_index];
if (!n_to_dequeue && !vec_len (pending_events)
}
/* See you in the next life, don't be late
- * XXX: we may need priorities here
- */
- if (pthread_mutex_trylock (&q->mutex))
+ * XXX: we may need priorities here */
+ if (svm_msg_q_try_lock (mq))
return 0;
for (i = 0; i < n_to_dequeue; i++)
{
vec_add2 (fifo_events, e, 1);
- svm_queue_sub_raw (q, (u8 *) e);
+ svm_msg_q_sub_w_lock (mq, msg);
+ clib_memcpy (e, svm_msg_q_msg_data (mq, msg), sizeof (*e));
+ svm_msg_q_free_msg (mq, msg);
}
- /* The other side of the connection is not polling */
- if (q->cursize < (q->maxsize / 8))
- (void) pthread_cond_broadcast (&q->condvar);
- pthread_mutex_unlock (&q->mutex);
+ svm_msg_q_unlock (mq);
vec_append (fifo_events, pending_events);
vec_append (fifo_events, smm->pending_disconnects[thread_index]);
for (i = 0; i < n_events; i++)
{
stream_session_t *s; /* $$$ prefetch 1 ahead maybe */
- session_fifo_event_t *e;
+ session_event_t *e;
+ u8 is_full;
e = &fifo_events[i];
switch (e->event_type)
clib_warning ("It's dead, Jim!");
continue;
}
+ is_full = svm_fifo_is_full (s->server_tx_fifo);
/* Spray packets in per session type frames, since they go to
* different nodes */
&n_tx_packets);
if (PREDICT_TRUE (rv == SESSION_TX_OK))
{
- session_dequeue_notify (s);
+ /* Notify app there's tx space if not polling */
+ if (PREDICT_FALSE (is_full
+ && !svm_fifo_has_event (s->server_tx_fifo)))
+ session_dequeue_notify (s);
}
else if (PREDICT_FALSE (rv == SESSION_TX_NO_BUFFERS))
{
if (PREDICT_FALSE (!s))
continue;
svm_fifo_unset_event (s->server_rx_fifo);
- app = application_get (s->app_index);
+ app_wrk = app_worker_get (s->app_wrk_index);
+ app = application_get (app_wrk->app_index);
app->cb_fns.builtin_app_rx_callback (s);
break;
case FIFO_EVENT_RPC:
fp = e->rpc_args.fp;
(*fp) (e->rpc_args.arg);
break;
-
+ case SESSION_CTRL_EVT_DISCONNECTED:
+ session_mq_disconnected_handler (e->data);
+ break;
+ case SESSION_CTRL_EVT_ACCEPTED_REPLY:
+ session_mq_accepted_reply_handler (e->data);
+ break;
+ case SESSION_CTRL_EVT_CONNECTED_REPLY:
+ break;
+ case SESSION_CTRL_EVT_DISCONNECTED_REPLY:
+ session_mq_disconnected_reply_handler (e->data);
+ break;
+ case SESSION_CTRL_EVT_RESET_REPLY:
+ session_mq_reset_reply_handler (e->data);
+ break;
default:
clib_warning ("unhandled event type %d", e->event_type);
}
session_manager_main_t *smm = vnet_get_session_manager_main ();
vlib_main_t *vm = &vlib_global_main;
u32 my_thread_index = vm->thread_index;
- session_fifo_event_t _e, *e = &_e;
+ session_event_t _e, *e = &_e;
+ svm_msg_q_ring_t *ring;
stream_session_t *s0;
+ svm_msg_q_msg_t *msg;
+ svm_msg_q_t *mq;
int i, index;
- i8 *headp;
-
- svm_queue_t *q;
- q = smm->vpp_event_queues[my_thread_index];
- index = q->head;
+ mq = smm->vpp_event_queues[my_thread_index];
+ index = mq->q->head;
- for (i = 0; i < q->cursize; i++)
+ for (i = 0; i < mq->q->cursize; i++)
{
- headp = (i8 *) (&q->data[0] + q->elsize * index);
- clib_memcpy (e, headp, q->elsize);
+ msg = (svm_msg_q_msg_t *) (&mq->q->data[0] + mq->q->elsize * index);
+ ring = svm_msg_q_ring (mq, msg->ring_index);
+ clib_memcpy (e, svm_msg_q_msg_data (mq, msg), ring->elsize);
switch (e->event_type)
{
index++;
- if (index == q->maxsize)
+ if (index == mq->q->maxsize)
index = 0;
}
}
static u8
-session_node_cmp_event (session_fifo_event_t * e, svm_fifo_t * f)
+session_node_cmp_event (session_event_t * e, svm_fifo_t * f)
{
stream_session_t *s;
switch (e->event_type)
}
u8
-session_node_lookup_fifo_event (svm_fifo_t * f, session_fifo_event_t * e)
+session_node_lookup_fifo_event (svm_fifo_t * f, session_event_t * e)
{
session_manager_main_t *smm = vnet_get_session_manager_main ();
- svm_queue_t *q;
- session_fifo_event_t *pending_event_vector, *evt;
+ svm_msg_q_t *mq;
+ session_event_t *pending_event_vector, *evt;
int i, index, found = 0;
- i8 *headp;
+ svm_msg_q_msg_t *msg;
+ svm_msg_q_ring_t *ring;
u8 thread_index;
ASSERT (e);
/*
* Search evt queue
*/
- q = smm->vpp_event_queues[thread_index];
- index = q->head;
- for (i = 0; i < q->cursize; i++)
+ mq = smm->vpp_event_queues[thread_index];
+ index = mq->q->head;
+ for (i = 0; i < mq->q->cursize; i++)
{
- headp = (i8 *) (&q->data[0] + q->elsize * index);
- clib_memcpy (e, headp, q->elsize);
+ msg = (svm_msg_q_msg_t *) (&mq->q->data[0] + mq->q->elsize * index);
+ ring = svm_msg_q_ring (mq, msg->ring_index);
+ clib_memcpy (e, svm_msg_q_msg_data (mq, msg), ring->elsize);
found = session_node_cmp_event (e, f);
if (found)
return 1;
- if (++index == q->maxsize)
+ if (++index == mq->q->maxsize)
index = 0;
}
/*