X-Git-Url: https://gerrit.fd.io/r/gitweb?a=blobdiff_plain;f=src%2Fvnet%2Fsession%2Fsession_node.c;h=281622bcc9b047301bedad0e729f49c1e44f42ec;hb=326b81e30e63a8296df51d85e6514356cd737225;hp=bf0c3959471bc0a6566a1b0046f09d2419a0b17a;hpb=52207f1b7b60cb0784d5241f0a4d40eef531c67e;p=vpp.git diff --git a/src/vnet/session/session_node.c b/src/vnet/session/session_node.c index bf0c3959471..281622bcc9b 100644 --- a/src/vnet/session/session_node.c +++ b/src/vnet/session/session_node.c @@ -29,6 +29,7 @@ session_mq_accepted_reply_handler (void *data) { session_accepted_reply_msg_t *mp = (session_accepted_reply_msg_t *) data; vnet_disconnect_args_t _a = { 0 }, *a = &_a; + app_worker_t *app_wrk; local_session_t *ls; stream_session_t *s; @@ -44,9 +45,15 @@ session_mq_accepted_reply_handler (void *data) if (session_handle_is_local (mp->handle)) { ls = application_get_local_session_from_handle (mp->handle); - if (!ls || ls->app_index != mp->context) + if (!ls) { - clib_warning ("server %u doesn't own local handle %llu", + clib_warning ("unknown local handle 0x%lx", mp->handle); + return; + } + app_wrk = app_worker_get (ls->app_wrk_index); + if (app_wrk->app_index != mp->context) + { + clib_warning ("server %u doesn't own local handle 0x%lx", mp->context, mp->handle); return; } @@ -62,12 +69,15 @@ session_mq_accepted_reply_handler (void *data) clib_warning ("session doesn't exist"); return; } - if (s->app_index != mp->context) + app_wrk = app_worker_get (s->app_wrk_index); + if (app_wrk->app_index != mp->context) { clib_warning ("app doesn't own session"); return; } s->session_state = SESSION_STATE_READY; + if (!svm_fifo_is_empty (s->server_rx_fifo)) + app_worker_lock_and_send_event (app_wrk, s, FIFO_EVENT_APP_RX); } } @@ -75,8 +85,9 @@ static void session_mq_reset_reply_handler (void *data) { session_reset_reply_msg_t *mp; - application_t *app; + app_worker_t *app_wrk; stream_session_t *s; + application_t *app; u32 index, thread_index; mp = (session_reset_reply_msg_t *) data; @@ -86,11 +97,18 @@ session_mq_reset_reply_handler (void *data) session_parse_handle (mp->handle, &index, &thread_index); s = session_get_if_valid (index, thread_index); - if (s == 0 || app->index != s->app_index) + if (!s) { clib_warning ("Invalid session!"); return; } + app_wrk = app_worker_get (s->app_wrk_index); + if (!app_wrk || app_wrk->app_index != app->app_index) + { + clib_warning ("App % does not own handle 0x%lx!", app->app_index, + mp->handle); + return; + } /* Client objected to resetting the session, log and continue */ if (mp->retval) @@ -111,35 +129,43 @@ session_mq_disconnected_handler (void *data) vnet_disconnect_args_t _a, *a = &_a; svm_msg_q_msg_t _msg, *msg = &_msg; session_disconnected_msg_t *mp; + app_worker_t *app_wrk; session_event_t *evt; + stream_session_t *s; application_t *app; int rv = 0; mp = (session_disconnected_msg_t *) data; - app = application_lookup (mp->client_index); - if (app) + if (!(s = session_get_from_handle_if_valid (mp->handle))) { - a->handle = mp->handle; - a->app_index = app->index; - rv = vnet_disconnect_session (a); + clib_warning ("could not disconnect handle %llu", mp->handle); + return; } - else + app_wrk = app_worker_get (s->app_wrk_index); + app = application_lookup (mp->client_index); + if (!(app_wrk && app && app->app_index == app_wrk->app_index)) { - rv = VNET_API_ERROR_APPLICATION_NOT_ATTACHED; + clib_warning ("could not disconnect session: %llu app: %u", + mp->handle, mp->client_index); + return; } - svm_msg_q_lock_and_alloc_msg_w_ring (app->event_queue, + a->handle = mp->handle; + a->app_index = app_wrk->wrk_index; + rv = vnet_disconnect_session (a); + + svm_msg_q_lock_and_alloc_msg_w_ring (app_wrk->event_queue, SESSION_MQ_CTRL_EVT_RING, SVM_Q_WAIT, msg); - svm_msg_q_unlock (app->event_queue); - evt = svm_msg_q_msg_data (app->event_queue, msg); + svm_msg_q_unlock (app_wrk->event_queue); + evt = svm_msg_q_msg_data (app_wrk->event_queue, msg); memset (evt, 0, sizeof (*evt)); evt->event_type = SESSION_CTRL_EVT_DISCONNECTED; rmp = (session_disconnected_reply_msg_t *) evt->data; rmp->handle = mp->handle; rmp->context = mp->context; rmp->retval = rv; - svm_msg_q_add (app->event_queue, msg, SVM_Q_WAIT); + svm_msg_q_add (app_wrk->event_queue, msg, SVM_Q_WAIT); } static void @@ -163,7 +189,7 @@ session_mq_disconnected_reply_handler (void *data) if (app) { a->handle = mp->handle; - a->app_index = app->index; + a->app_index = app->app_index; vnet_disconnect_session (a); } } @@ -285,7 +311,10 @@ session_tx_fifo_chain_tail (vlib_main_t * vm, session_tx_context_t * ctx, hdr->data_offset += n_bytes_read; if (hdr->data_offset == hdr->data_length) - svm_fifo_dequeue_drop (f, hdr->data_length); + { + u32 offset = hdr->data_length + SESSION_CONN_HDR_LEN; + svm_fifo_dequeue_drop (f, offset); + } } else n_bytes_read = svm_fifo_dequeue_nowait (ctx->s->server_tx_fifo, @@ -701,7 +730,9 @@ session_tx_fifo_dequeue_internal (vlib_main_t * vm, stream_session_t * s, int *n_tx_pkts) { application_t *app; - app = application_get (s->opaque); + if (PREDICT_FALSE (s->session_state == SESSION_STATE_CLOSED)) + return 0; + app = application_get (s->t_app_index); svm_fifo_unset_event (s->server_tx_fifo); return app->cb_fns.builtin_app_tx_callback (s); } @@ -723,6 +754,7 @@ session_queue_node_fn (vlib_main_t * vm, vlib_node_runtime_t * node, svm_msg_q_msg_t _msg, *msg = &_msg; f64 now = vlib_time_now (vm); int n_tx_packets = 0, i, rv; + app_worker_t *app_wrk; application_t *app; svm_msg_q_t *mq; void (*fp) (void *); @@ -787,7 +819,7 @@ skip_dequeue: { stream_session_t *s; /* $$$ prefetch 1 ahead maybe */ session_event_t *e; - u32 to_dequeue; + u8 want_tx_evt; e = &fifo_events[i]; switch (e->event_type) @@ -803,21 +835,22 @@ skip_dequeue: s = session_event_get_session (e, thread_index); if (PREDICT_FALSE (!s)) { - clib_warning ("It's dead, Jim!"); + clib_warning ("session was freed!"); continue; } - to_dequeue = svm_fifo_max_dequeue (s->server_tx_fifo); + want_tx_evt = svm_fifo_want_tx_evt (s->server_tx_fifo); /* Spray packets in per session type frames, since they go to * different nodes */ rv = (smm->session_tx_fns[s->session_type]) (vm, node, e, s, &n_tx_packets); if (PREDICT_TRUE (rv == SESSION_TX_OK)) { - /* Notify app there's tx space if not polling */ - if (PREDICT_FALSE (to_dequeue == s->server_tx_fifo->nitems - && !svm_fifo_has_event (s->server_tx_fifo))) - session_dequeue_notify (s); + if (PREDICT_FALSE (want_tx_evt)) + { + svm_fifo_set_want_tx_evt (s->server_tx_fifo, 0); + session_dequeue_notify (s); + } } else if (PREDICT_FALSE (rv == SESSION_TX_NO_BUFFERS)) { @@ -847,12 +880,18 @@ skip_dequeue: break; case FIFO_EVENT_BUILTIN_RX: s = session_event_get_session (e, thread_index); - if (PREDICT_FALSE (!s)) + if (PREDICT_FALSE (!s || s->session_state >= SESSION_STATE_CLOSING)) continue; svm_fifo_unset_event (s->server_rx_fifo); - app = application_get (s->app_index); + app_wrk = app_worker_get (s->app_wrk_index); + app = application_get (app_wrk->app_index); app->cb_fns.builtin_app_rx_callback (s); break; + case FIFO_EVENT_BUILTIN_TX: + s = session_get_from_handle_if_valid (e->session_handle); + if (PREDICT_TRUE (s != 0)) + session_tx_fifo_dequeue_internal (vm, node, e, s, &n_tx_packets); + break; case FIFO_EVENT_RPC: fp = e->rpc_args.fp; (*fp) (e->rpc_args.arg);