X-Git-Url: https://gerrit.fd.io/r/gitweb?a=blobdiff_plain;f=src%2Fvnet%2Fsession%2Fsession_api.c;h=05b3bb89f0c156ddaeebd225b6ffc701596c3d5b;hb=3348a4cf070b90a9c23bbc0b3752fa2801f832a9;hp=c12354fdeaee488a55c1b05eaf819febcad45ed2;hpb=c3638fece1a3b96349b7df11261e6661b101ccbe;p=vpp.git diff --git a/src/vnet/session/session_api.c b/src/vnet/session/session_api.c index c12354fdeae..05b3bb89f0c 100755 --- a/src/vnet/session/session_api.c +++ b/src/vnet/session/session_api.c @@ -147,7 +147,8 @@ send_del_segment_callback (u32 api_client_index, const ssvm_private_t * fs) } static int -send_app_cut_through_registration_add (u32 api_client_index, u64 mq_addr, +send_app_cut_through_registration_add (u32 api_client_index, + u32 wrk_map_index, u64 mq_addr, u64 peer_mq_addr) { vl_api_app_cut_through_registration_add_t *mp; @@ -169,6 +170,7 @@ send_app_cut_through_registration_add (u32 api_client_index, u64 mq_addr, mp->evt_q_address = mq_addr; mp->peer_evt_q_address = peer_mq_addr; + mp->wrk_index = wrk_map_index; mq = uword_to_pointer (mq_addr, svm_msg_q_t *); peer_mq = uword_to_pointer (peer_mq_addr, svm_msg_q_t *); @@ -253,7 +255,7 @@ send_session_accept_callback (stream_session_t * s) } else { - ll = application_get_local_listen_session (server_wrk, + ll = application_get_local_listen_session (server, ls->listener_index); if (ll->transport_listener_index != ~0) { @@ -443,7 +445,7 @@ mq_send_session_accepted_cb (stream_session_t * s) memset (evt, 0, sizeof (*evt)); evt->event_type = SESSION_CTRL_EVT_ACCEPTED; mp = (session_accepted_msg_t *) evt->data; - mp->context = app_wrk->wrk_index; + mp->context = app->app_index; mp->server_rx_fifo = pointer_to_uword (s->server_rx_fifo); mp->server_tx_fifo = pointer_to_uword (s->server_tx_fifo); @@ -475,6 +477,7 @@ mq_send_session_accepted_cb (stream_session_t * s) u8 main_thread = vlib_num_workers ()? 1 : 0; send_app_cut_through_registration_add (app->api_client_index, + app_wrk->wrk_map_index, ls->server_evt_q, ls->client_evt_q); @@ -486,9 +489,7 @@ mq_send_session_accepted_cb (stream_session_t * s) } else { - ll = - application_get_local_listen_session (app_wrk, - ls->listener_index); + ll = application_get_local_listen_session (app, ls->listener_index); if (ll->transport_listener_index != ~0) { listener = listen_session_get (ll->transport_listener_index); @@ -615,6 +616,7 @@ mq_send_session_connected_cb (u32 app_wrk_index, u32 api_context, u8 main_thread = vlib_num_workers ()? 1 : 0; send_app_cut_through_registration_add (app->api_client_index, + app_wrk->wrk_map_index, ls->client_evt_q, ls->server_evt_q); @@ -636,6 +638,72 @@ done: return 0; } +static int +mq_send_session_bound_cb (u32 app_wrk_index, u32 api_context, + session_handle_t handle, int rv) +{ + svm_msg_q_msg_t _msg, *msg = &_msg; + svm_msg_q_t *app_mq, *vpp_evt_q; + transport_connection_t *tc; + stream_session_t *ls = 0; + session_bound_msg_t *mp; + app_worker_t *app_wrk; + session_event_t *evt; + application_t *app; + + app_wrk = app_worker_get (app_wrk_index); + app = application_get (app_wrk->app_index); + app_mq = app_wrk->event_queue; + if (!app_mq) + { + clib_warning ("app %u with api index: %u not attached", app->app_index, + app->api_client_index); + return -1; + } + + svm_msg_q_lock_and_alloc_msg_w_ring (app_mq, SESSION_MQ_CTRL_EVT_RING, + SVM_Q_WAIT, msg); + svm_msg_q_unlock (app_mq); + evt = svm_msg_q_msg_data (app_mq, msg); + memset (evt, 0, sizeof (*evt)); + evt->event_type = SESSION_CTRL_EVT_BOUND; + mp = (session_bound_msg_t *) evt->data; + mp->context = api_context; + + if (rv) + goto done; + + mp->handle = handle; + if (application_has_global_scope (app)) + { + ls = listen_session_get_from_handle (handle); + tc = listen_session_get_transport (ls); + mp->lcl_port = tc->lcl_port; + mp->lcl_is_ip4 = tc->is_ip4; + clib_memcpy (mp->lcl_ip, &tc->lcl_ip, sizeof (tc->lcl_ip)); + } + else + { + local_session_t *local; + local = application_get_local_listener_w_handle (handle); + mp->lcl_port = local->port; + mp->lcl_is_ip4 = session_type_is_ip4 (local->session_type); + } + + if (ls && session_transport_service_type (ls) == TRANSPORT_SERVICE_CL) + { + mp->rx_fifo = pointer_to_uword (ls->server_rx_fifo); + mp->tx_fifo = pointer_to_uword (ls->server_tx_fifo); + vpp_evt_q = session_manager_get_vpp_event_queue (0); + mp->vpp_evt_q = pointer_to_uword (vpp_evt_q); + } + +done: + mp->retval = rv; + svm_msg_q_add (app_mq, msg, SVM_Q_WAIT); + return 0; +} + static session_cb_vft_t session_mq_cb_vft = { .session_accept_callback = mq_send_session_accepted_cb, .session_disconnect_callback = mq_send_session_disconnected_cb, @@ -791,6 +859,7 @@ vl_api_bind_uri_t_handler (vl_api_bind_uri_t * mp) stream_session_t *s; application_t *app = 0; svm_msg_q_t *vpp_evt_q; + app_worker_t *app_wrk; int rv; if (session_manager_is_enabled () == 0) @@ -837,6 +906,14 @@ done: } })); /* *INDENT-ON* */ + + /* If app uses mq for control messages, send an mq message as well */ + if (app && application_use_mq_for_ctrl (app)) + { + app_wrk = application_get_worker (app, 0); + mq_send_session_bound_cb (app_wrk->wrk_index, mp->context, a->handle, + rv); + } } static void @@ -1070,6 +1147,7 @@ vl_api_bind_sock_t_handler (vl_api_bind_sock_t * mp) int rv = 0; clib_error_t *error; application_t *app = 0; + app_worker_t *app_wrk; stream_session_t *s; transport_connection_t *tc = 0; ip46_address_t *ip46; @@ -1129,6 +1207,14 @@ done: } })); /* *INDENT-ON* */ + + /* If app uses mq for control messages, send an mq message as well */ + if (app && application_use_mq_for_ctrl (app)) + { + app_wrk = application_get_worker (app, mp->wrk_index); + mq_send_session_bound_cb (app_wrk->wrk_index, mp->context, a->handle, + rv); + } } static void @@ -1151,6 +1237,7 @@ vl_api_unbind_sock_t_handler (vl_api_unbind_sock_t * mp) { a->app_index = app->app_index; a->handle = mp->handle; + a->wrk_map_index = mp->wrk_index; if ((error = vnet_unbind (a))) { rv = clib_error_get_code (error); @@ -1167,7 +1254,7 @@ vl_api_connect_sock_t_handler (vl_api_connect_sock_t * mp) { vl_api_connect_session_reply_t *rmp; vnet_connect_args_t _a, *a = &_a; - application_t *app; + application_t *app = 0; clib_error_t *error = 0; int rv = 0; @@ -1219,6 +1306,12 @@ vl_api_connect_sock_t_handler (vl_api_connect_sock_t * mp) done: REPLY_MACRO (VL_API_CONNECT_SESSION_REPLY); + + if (app && application_use_mq_for_ctrl (app)) + { + app_worker_t *app_wrk = application_get_worker (app, mp->wrk_index); + mq_send_session_connected_cb (app_wrk->wrk_index, mp->context, 0, 1); + } } static void @@ -1241,7 +1334,7 @@ vl_api_app_worker_add_del_t_handler (vl_api_app_worker_add_del_t * mp) if (!reg) return; - app = application_lookup (mp->app_api_index); + app = application_lookup (clib_net_to_host_u32 (mp->app_api_index)); if (!app) { rv = VNET_API_ERROR_INVALID_VALUE; @@ -1261,8 +1354,8 @@ vl_api_app_worker_add_del_t_handler (vl_api_app_worker_add_del_t * mp) goto done; } - /* Make coverity happy */ - ASSERT (args.evt_q && args.segment); + if (!mp->is_add) + goto done; /* Send fifo segment fd if needed */ if (ssvm_type (args.segment) == SSVM_SEGMENT_MEMFD) @@ -1282,9 +1375,9 @@ vl_api_app_worker_add_del_t_handler (vl_api_app_worker_add_del_t * mp) done: REPLY_MACRO2 (VL_API_APP_WORKER_ADD_DEL_REPLY, ({ rmp->is_add = mp->is_add; - if (!rv) + rmp->wrk_index = clib_host_to_net_u32 (args.wrk_index); + if (!rv && mp->is_add) { - rmp->wrk_index = clib_host_to_net_u32 (args.wrk_index); if (vec_len (args.segment->name)) { memcpy (rmp->segment_name, args.segment->name,