if (session_handle_is_local (mp->handle))
{
+ app_worker_t *app_wrk;
+ application_t *app;
ls = application_get_local_session_from_handle (mp->handle);
- if (!ls || ls->app_index != mp->context)
+ if (!ls)
{
- clib_warning ("server %u doesn't own local handle %llu",
+ clib_warning ("unknown local handle 0x%lx", mp->handle);
+ return;
+ }
+ app_wrk = app_worker_get (ls->app_wrk_index);
+ app = application_get (app_wrk->app_index);
+ if (app->app_index != mp->context)
+ {
+ clib_warning ("server %u doesn't own local handle 0x%lx",
mp->context, mp->handle);
return;
}
clib_warning ("session doesn't exist");
return;
}
- if (s->app_index != mp->context)
+ if (s->app_wrk_index != mp->context)
{
clib_warning ("app doesn't own session");
return;
}
s->session_state = SESSION_STATE_READY;
+ if (!svm_fifo_is_empty (s->server_rx_fifo))
+ {
+ app_worker_t *app;
+ app = app_worker_get (s->app_wrk_index);
+ app_worker_send_event (app, s, FIFO_EVENT_APP_RX);
+ }
}
}
session_mq_reset_reply_handler (void *data)
{
session_reset_reply_msg_t *mp;
- application_t *app;
+ app_worker_t *app_wrk;
stream_session_t *s;
+ application_t *app;
u32 index, thread_index;
mp = (session_reset_reply_msg_t *) data;
session_parse_handle (mp->handle, &index, &thread_index);
s = session_get_if_valid (index, thread_index);
- if (s == 0 || app->index != s->app_index)
+ if (!s)
{
clib_warning ("Invalid session!");
return;
}
+ app_wrk = app_worker_get (s->app_wrk_index);
+ if (!app_wrk || app_wrk->app_index != app->app_index)
+ {
+ clib_warning ("App % does not own handle 0x%lx!", app->app_index,
+ mp->handle);
+ return;
+ }
/* Client objected to resetting the session, log and continue */
if (mp->retval)
vnet_disconnect_args_t _a, *a = &_a;
svm_msg_q_msg_t _msg, *msg = &_msg;
session_disconnected_msg_t *mp;
+ app_worker_t *app_wrk;
session_event_t *evt;
stream_session_t *s;
application_t *app;
int rv = 0;
mp = (session_disconnected_msg_t *) data;
+ if (!(s = session_get_from_handle_if_valid (mp->handle)))
+ {
+ clib_warning ("could not disconnect handle %llu", mp->handle);
+ return;
+ }
+ app_wrk = app_worker_get (s->app_wrk_index);
app = application_lookup (mp->client_index);
- s = session_get_from_handle_if_valid (mp->handle);
- if (!(app && s && s->app_index == app->index))
+ if (!(app_wrk && app && app->app_index == app_wrk->app_index))
{
- clib_warning ("could not disconnect session: %llu app: %u", mp->handle,
- mp->client_index);
+ clib_warning ("could not disconnect session: %llu app: %u",
+ mp->handle, mp->client_index);
return;
}
a->handle = mp->handle;
- a->app_index = app->index;
+ a->app_index = app_wrk->wrk_index;
rv = vnet_disconnect_session (a);
- svm_msg_q_lock_and_alloc_msg_w_ring (app->event_queue,
+ svm_msg_q_lock_and_alloc_msg_w_ring (app_wrk->event_queue,
SESSION_MQ_CTRL_EVT_RING,
SVM_Q_WAIT, msg);
- svm_msg_q_unlock (app->event_queue);
- evt = svm_msg_q_msg_data (app->event_queue, msg);
+ svm_msg_q_unlock (app_wrk->event_queue);
+ evt = svm_msg_q_msg_data (app_wrk->event_queue, msg);
memset (evt, 0, sizeof (*evt));
evt->event_type = SESSION_CTRL_EVT_DISCONNECTED;
rmp = (session_disconnected_reply_msg_t *) evt->data;
rmp->handle = mp->handle;
rmp->context = mp->context;
rmp->retval = rv;
- svm_msg_q_add (app->event_queue, msg, SVM_Q_WAIT);
+ svm_msg_q_add (app_wrk->event_queue, msg, SVM_Q_WAIT);
}
static void
if (app)
{
a->handle = mp->handle;
- a->app_index = app->index;
+ a->app_index = app->app_index;
vnet_disconnect_session (a);
}
}
hdr->data_offset += n_bytes_read;
if (hdr->data_offset == hdr->data_length)
- svm_fifo_dequeue_drop (f, hdr->data_length);
+ {
+ u32 offset = hdr->data_length + SESSION_CONN_HDR_LEN;
+ svm_fifo_dequeue_drop (f, offset);
+ }
}
else
n_bytes_read = svm_fifo_dequeue_nowait (ctx->s->server_tx_fifo,
stream_session_t * s, int *n_tx_pkts)
{
application_t *app;
- app = application_get (s->opaque);
+ app = application_get (s->t_app_index);
svm_fifo_unset_event (s->server_tx_fifo);
return app->cb_fns.builtin_app_tx_callback (s);
}
svm_msg_q_msg_t _msg, *msg = &_msg;
f64 now = vlib_time_now (vm);
int n_tx_packets = 0, i, rv;
+ app_worker_t *app_wrk;
application_t *app;
svm_msg_q_t *mq;
void (*fp) (void *);
{
stream_session_t *s; /* $$$ prefetch 1 ahead maybe */
session_event_t *e;
- u32 to_dequeue;
+ u8 is_full;
e = &fifo_events[i];
switch (e->event_type)
clib_warning ("It's dead, Jim!");
continue;
}
- to_dequeue = svm_fifo_max_dequeue (s->server_tx_fifo);
+ is_full = svm_fifo_is_full (s->server_tx_fifo);
/* Spray packets in per session type frames, since they go to
* different nodes */
if (PREDICT_TRUE (rv == SESSION_TX_OK))
{
/* Notify app there's tx space if not polling */
- if (PREDICT_FALSE (to_dequeue == s->server_tx_fifo->nitems
+ if (PREDICT_FALSE (is_full
&& !svm_fifo_has_event (s->server_tx_fifo)))
session_dequeue_notify (s);
}
if (PREDICT_FALSE (!s))
continue;
svm_fifo_unset_event (s->server_rx_fifo);
- app = application_get (s->app_index);
+ app_wrk = app_worker_get (s->app_wrk_index);
+ app = application_get (app_wrk->app_index);
app->cb_fns.builtin_app_rx_callback (s);
break;
case FIFO_EVENT_RPC: