From ab2f6dbf9f7b7164a9810f4c80c8abf8463e42ad Mon Sep 17 00:00:00 2001 From: Florin Coras Date: Fri, 31 Aug 2018 14:31:41 -0700 Subject: [PATCH] session: support multiple worker binds Allows app workers to listen on the same session endpoint. Incoming connects are spread across the workers in a round-robin fashion Change-Id: Ib5f5817230d9abc6127a85cdbdcad70d980c0f7f Signed-off-by: Florin Coras --- src/vcl/vcl_bapi.c | 43 ++- src/vcl/vcl_debug.h | 2 +- src/vcl/vcl_private.h | 2 +- src/vcl/vcl_test_server.c | 6 +- src/vcl/vppcom.c | 39 ++- src/vnet/session/application.c | 537 +++++++++++++++++++++++-------- src/vnet/session/application.h | 95 ++++-- src/vnet/session/application_interface.c | 81 +++-- src/vnet/session/application_interface.h | 2 +- src/vnet/session/segment_manager.c | 3 +- src/vnet/session/session.api | 2 + src/vnet/session/session.c | 116 ++----- src/vnet/session/session.h | 6 +- src/vnet/session/session_api.c | 19 +- src/vnet/session/session_node.c | 17 +- src/vnet/session/session_test.c | 9 +- src/vnet/session/stream_session.h | 31 +- src/vnet/tls/tls.c | 16 +- test/scripts/socket_test.sh | 4 +- test/test_vcl.py | 116 +++---- 20 files changed, 721 insertions(+), 425 deletions(-) diff --git a/src/vcl/vcl_bapi.c b/src/vcl/vcl_bapi.c index 44e6d9d1d04..0c46d8220fd 100644 --- a/src/vcl/vcl_bapi.c +++ b/src/vcl/vcl_bapi.c @@ -138,9 +138,9 @@ static void vl_api_app_worker_add_del_reply_t_handler (vl_api_app_worker_add_del_reply_t * mp) { + int n_fds = 0, *fds = 0; vcl_worker_t *wrk; - int n_fds = 0; - int *fds = 0; + u32 wrk_index; if (mp->retval) { @@ -148,14 +148,14 @@ vl_api_app_worker_add_del_reply_t_handler (vl_api_app_worker_add_del_reply_t * format_api_error, ntohl (mp->retval)); goto failed; } - ASSERT (mp->context == mp->wrk_index); - if (mp->context != mp->wrk_index) + wrk_index = clib_net_to_host_u32 (mp->wrk_index); + if (mp->context != wrk_index) { clib_warning ("VCL<%d>: wrk numbering doesn't match ours: %u, vpp: %u", - getpid (), mp->context, mp->wrk_index); + getpid (), mp->context, wrk_index); goto failed; } - wrk = vcl_worker_get (mp->context); + wrk = vcl_worker_get (wrk_index); wrk->app_event_queue = uword_to_pointer (mp->app_event_queue_address, svm_msg_q_t *); @@ -301,10 +301,31 @@ vl_api_unbind_sock_reply_t_handler (vl_api_unbind_sock_reply_t * mp) VDBG (1, "VCL<%d>: sid %u: unbind succeeded!", getpid (), mp->context); } +static void +vl_api_disconnect_session_reply_t_handler (vl_api_disconnect_session_reply_t * + mp) +{ + if (mp->retval) + clib_warning ("VCL<%d>: ERROR: sid %u: disconnect failed: %U", + getpid (), mp->context, format_api_error, + ntohl (mp->retval)); +} + +static void +vl_api_connect_sock_reply_t_handler (vl_api_connect_sock_reply_t * mp) +{ + if (mp->retval) + clib_warning ("VCL<%d>: ERROR: sid %u: connect failed: %U", + getpid (), mp->context, format_api_error, + ntohl (mp->retval)); +} + #define foreach_sock_msg \ _(SESSION_ENABLE_DISABLE_REPLY, session_enable_disable_reply) \ _(BIND_SOCK_REPLY, bind_sock_reply) \ _(UNBIND_SOCK_REPLY, unbind_sock_reply) \ +_(CONNECT_SOCK_REPLY, connect_sock_reply) \ +_(DISCONNECT_SESSION_REPLY, disconnect_session_reply) \ _(APPLICATION_ATTACH_REPLY, application_attach_reply) \ _(APPLICATION_DETACH_REPLY, application_detach_reply) \ _(MAP_ANOTHER_SEGMENT, map_another_segment) \ @@ -421,13 +442,12 @@ vppcom_send_connect_sock (vcl_session_t * session) { vl_api_connect_sock_t *cmp; - /* Assumes caller as acquired the spinlock: vcm->sessions_lockp */ cmp = vl_msg_api_alloc (sizeof (*cmp)); memset (cmp, 0, sizeof (*cmp)); cmp->_vl_msg_id = ntohs (VL_API_CONNECT_SOCK); cmp->client_index = vcm->my_client_index; cmp->context = session->session_index; - + cmp->wrk_index = vcl_get_worker_index (); cmp->is_ip4 = session->transport.is_ip4; clib_memcpy (cmp->ip, &session->transport.rmt_ip, sizeof (cmp->ip)); cmp->port = session->transport.rmt_port; @@ -437,13 +457,10 @@ vppcom_send_connect_sock (vcl_session_t * session) } void -vppcom_send_disconnect_session (u64 vpp_handle, u32 session_index) +vppcom_send_disconnect_session (u64 vpp_handle) { vl_api_disconnect_session_t *dmp; - VDBG (1, "VCL<%d>: vpp handle 0x%llx, sid %u: sending disconnect msg", - getpid (), vpp_handle, session_index); - dmp = vl_msg_api_alloc (sizeof (*dmp)); memset (dmp, 0, sizeof (*dmp)); dmp->_vl_msg_id = ntohs (VL_API_DISCONNECT_SESSION); @@ -467,6 +484,7 @@ vppcom_send_bind_sock (vcl_session_t * session) bmp->_vl_msg_id = ntohs (VL_API_BIND_SOCK); bmp->client_index = vcm->my_client_index; bmp->context = session->session_index; + bmp->wrk_index = vcl_get_worker_index (); bmp->is_ip4 = session->transport.is_ip4; clib_memcpy (bmp->ip, &session->transport.lcl_ip, sizeof (bmp->ip)); bmp->port = session->transport.lcl_port; @@ -485,6 +503,7 @@ vppcom_send_unbind_sock (u64 vpp_handle) ump->_vl_msg_id = ntohs (VL_API_UNBIND_SOCK); ump->client_index = vcm->my_client_index; + ump->wrk_index = vcl_get_worker_index (); ump->handle = vpp_handle; vl_msg_api_send_shmem (vcm->vl_input_queue, (u8 *) & ump); } diff --git a/src/vcl/vcl_debug.h b/src/vcl/vcl_debug.h index 13e6726c259..3faa45822f8 100644 --- a/src/vcl/vcl_debug.h +++ b/src/vcl/vcl_debug.h @@ -22,7 +22,7 @@ #define VDBG(_lvl, _fmt, _args...) \ if (vcm->debug > _lvl) \ - clib_warning (_fmt, ##_args) + clib_warning ("vcl: " _fmt, __vcl_worker_index, ##_args) #define foreach_vcl_dbg_evt \ _(INIT, "vcl init track") \ diff --git a/src/vcl/vcl_private.h b/src/vcl/vcl_private.h index 1a9bb412fd7..ba9094cd96b 100644 --- a/src/vcl/vcl_private.h +++ b/src/vcl/vcl_private.h @@ -485,7 +485,7 @@ void vppcom_send_session_enable_disable (u8 is_enable); void vppcom_app_send_attach (void); void vppcom_app_send_detach (void); void vppcom_send_connect_sock (vcl_session_t * session); -void vppcom_send_disconnect_session (u64 vpp_handle, u32 session_index); +void vppcom_send_disconnect_session (u64 vpp_handle); void vppcom_send_bind_sock (vcl_session_t * session); void vppcom_send_unbind_sock (u64 vpp_handle); void vppcom_api_hookup (void); diff --git a/src/vcl/vcl_test_server.c b/src/vcl/vcl_test_server.c index 8b8b77fab2c..d1f2db72872 100644 --- a/src/vcl/vcl_test_server.c +++ b/src/vcl/vcl_test_server.c @@ -472,10 +472,9 @@ vcl_test_server_handle_cfg (vcl_test_server_worker_t * wrk, break; case SOCK_TEST_TYPE_EXIT: - vtinf ("Have a great day conn %d!", conn->fd); + vtinf ("Have a great day conn %d (closing)!", conn->fd); vppcom_session_close (conn->fd); conn_pool_free (conn); - vtinf ("Closed client fd %d", conn->fd); wrk->nfds--; break; @@ -644,6 +643,7 @@ main (int argc, char **argv) clib_mem_init_thread_safe (0, 64 << 20); ssm->cfg.port = SOCK_TEST_SERVER_PORT; ssm->cfg.workers = 1; + ssm->active_workers = 1; vcl_test_server_process_opts (ssm, argc, argv); rv = vppcom_app_create ("vcl_test_server"); @@ -661,7 +661,7 @@ main (int argc, char **argv) } vcl_test_server_worker_loop (&ssm->workers[0]); - while (ssm->active_workers) + while (ssm->active_workers > 0) ; vppcom_app_destroy (); diff --git a/src/vcl/vppcom.c b/src/vcl/vppcom.c index 87f29e35311..60d5eb3539a 100644 --- a/src/vcl/vppcom.c +++ b/src/vcl/vppcom.c @@ -614,13 +614,13 @@ vppcom_app_attach (void) } static int -vppcom_session_unbind (u32 session_index) +vppcom_session_unbind (u32 session_handle) { vcl_worker_t *wrk = vcl_worker_get_current (); vcl_session_t *session = 0; u64 vpp_handle; - session = vcl_session_get (wrk, session_index); + session = vcl_session_get_w_handle (wrk, session_handle); if (!session) return VPPCOM_EBADFD; @@ -630,7 +630,7 @@ vppcom_session_unbind (u32 session_index) session->session_state = STATE_DISCONNECT; VDBG (1, "VCL<%d>: vpp handle 0x%llx, sid %u: sending unbind msg! new state" - " 0x%x (%s)", getpid (), vpp_handle, session_index, STATE_DISCONNECT, + " 0x%x (%s)", getpid (), vpp_handle, session_handle, STATE_DISCONNECT, vppcom_session_state_str (STATE_DISCONNECT)); vcl_evt (VCL_EVT_UNBIND, session); vppcom_send_unbind_sock (vpp_handle); @@ -639,7 +639,7 @@ vppcom_session_unbind (u32 session_index) } static int -vppcom_session_disconnect (u32 session_index) +vppcom_session_disconnect (u32 session_handle) { vcl_worker_t *wrk = vcl_worker_get_current (); svm_msg_q_t *vpp_evt_q; @@ -647,18 +647,18 @@ vppcom_session_disconnect (u32 session_index) session_state_t state; u64 vpp_handle; - session = vcl_session_get (wrk, session_index); + session = vcl_session_get_w_handle (wrk, session_handle); vpp_handle = session->vpp_handle; state = session->session_state; VDBG (1, "VCL<%d>: vpp handle 0x%llx, sid %u state 0x%x (%s)", getpid (), - vpp_handle, session_index, state, vppcom_session_state_str (state)); + vpp_handle, session_handle, state, vppcom_session_state_str (state)); if (PREDICT_FALSE (state & STATE_LISTEN)) { clib_warning ("VCL<%d>: ERROR: vpp handle 0x%llx, sid %u: " "Cannot disconnect a listen socket!", - getpid (), vpp_handle, session_index); + getpid (), vpp_handle, session_handle); return VPPCOM_EBADFD; } @@ -668,13 +668,13 @@ vppcom_session_disconnect (u32 session_index) vcl_send_session_disconnected_reply (vpp_evt_q, vcm->my_client_index, vpp_handle, 0); VDBG (1, "VCL<%d>: vpp handle 0x%llx, sid %u: sending disconnect " - "REPLY...", getpid (), vpp_handle, session_index); + "REPLY...", getpid (), vpp_handle, session_handle); } else { VDBG (1, "VCL<%d>: vpp handle 0x%llx, sid %u: sending disconnect...", - getpid (), vpp_handle, session_index); - vppcom_send_disconnect_session (vpp_handle, session_index); + getpid (), vpp_handle, session_handle); + vppcom_send_disconnect_session (vpp_handle); } return VPPCOM_OK; @@ -1142,7 +1142,7 @@ handle: vcl_evt (VCL_EVT_ACCEPT, client_session, listen_session, client_session_index); - return client_session_index; + return vcl_session_handle (client_session); } int @@ -1653,6 +1653,12 @@ vcl_select_handle_mq (vcl_worker_t * wrk, svm_msg_q_t * mq, } break; case SESSION_IO_EVT_CT_TX: + if (svm_fifo_is_empty (e->fifo)) + { + svm_fifo_unset_event (e->fifo); + if (svm_fifo_is_empty (e->fifo)) + break; + } session = vcl_ct_session_get_from_fifo (wrk, e->fifo, 0); sid = session->session_index; if (sid < n_bits && read_map) @@ -1977,7 +1983,7 @@ vppcom_epoll_create (void) VDBG (0, "VCL<%d>: Created vep_idx %u / sid %u!", getpid (), vep_session->session_index, vep_session->session_index); - return (vep_session->session_index); + return vcl_session_handle (vep_session); } int @@ -2037,7 +2043,8 @@ vppcom_epoll_ctl (uint32_t vep_handle, int op, uint32_t session_handle, if (vep_session->vep.next_sh != ~0) { vcl_session_t *next_session; - next_session = vcl_session_get (wrk, vep_session->vep.next_sh); + next_session = vcl_session_get_w_handle (wrk, + vep_session->vep.next_sh); if (PREDICT_FALSE (!next_session)) { clib_warning ("VCL<%d>: ERROR: EPOLL_CTL_ADD: Invalid " @@ -2121,7 +2128,7 @@ vppcom_epoll_ctl (uint32_t vep_handle, int op, uint32_t session_handle, else { vcl_session_t *prev_session; - prev_session = vcl_session_get (wrk, session->vep.prev_sh); + prev_session = vcl_session_get_w_handle (wrk, session->vep.prev_sh); if (PREDICT_FALSE (!prev_session)) { clib_warning ("VCL<%d>: ERROR: EPOLL_CTL_DEL: Invalid " @@ -2135,7 +2142,7 @@ vppcom_epoll_ctl (uint32_t vep_handle, int op, uint32_t session_handle, if (session->vep.next_sh != ~0) { vcl_session_t *next_session; - next_session = vcl_session_get (wrk, session->vep.next_sh); + next_session = vcl_session_get_w_handle (wrk, session->vep.next_sh); if (PREDICT_FALSE (!next_session)) { clib_warning ("VCL<%d>: ERROR: EPOLL_CTL_DEL: Invalid " @@ -2314,7 +2321,7 @@ vcl_epoll_wait_handle_mq (vcl_worker_t * wrk, svm_msg_q_t * mq, session_events = session->vep.ev.events; break; default: - clib_warning ("unhandled: %u", e->event_type); + VDBG (0, "unhandled: %u", e->event_type); svm_msg_q_free_msg (mq, msg); continue; } diff --git a/src/vnet/session/application.c b/src/vnet/session/application.c index b7498d00881..8cf32896631 100644 --- a/src/vnet/session/application.c +++ b/src/vnet/session/application.c @@ -20,6 +20,56 @@ static app_main_t app_main; +static app_listener_t * +app_listener_alloc (application_t * app) +{ + app_listener_t *app_listener; + pool_get (app->listeners, app_listener); + memset (app_listener, 0, sizeof (*app_listener)); + app_listener->al_index = app_listener - app->listeners; + return app_listener; +} + +static app_listener_t * +app_listener_get (application_t * app, u32 app_listener_index) +{ + return pool_elt_at_index (app->listeners, app_listener_index); +} + +static void +app_listener_free (application_t * app, app_listener_t * app_listener) +{ + clib_bitmap_free (app_listener->workers); + pool_put (app->listeners, app_listener); + if (CLIB_DEBUG) + memset (app_listener, 0xfa, sizeof (*app_listener)); +} + +static app_listener_t * +app_local_listener_alloc (application_t * app) +{ + app_listener_t *app_listener; + pool_get (app->local_listeners, app_listener); + memset (app_listener, 0, sizeof (*app_listener)); + app_listener->al_index = app_listener - app->local_listeners; + return app_listener; +} + +static app_listener_t * +app_local_listener_get (application_t * app, u32 app_listener_index) +{ + return pool_elt_at_index (app->local_listeners, app_listener_index); +} + +static void +app_local_listener_free (application_t * app, app_listener_t * app_listener) +{ + clib_bitmap_free (app_listener->workers); + pool_put (app->local_listeners, app_listener); + if (CLIB_DEBUG) + memset (app_listener, 0xfa, sizeof (*app_listener)); +} + static app_worker_map_t * app_worker_map_alloc (application_t * app) { @@ -95,6 +145,16 @@ application_local_session_table (application_t * app) return app_ns->local_table_index; } +static void +application_local_listener_session_endpoint (local_session_t * ll, + session_endpoint_t * sep) +{ + sep->transport_proto = + session_type_transport_proto (ll->listener_session_type); + sep->port = ll->port; + sep->is_ip4 = ll->listener_session_type & 1; +} + int application_api_queue_is_full (application_t * app) { @@ -329,6 +389,9 @@ application_free (application_t * app) { app_worker_map_t *wrk_map; app_worker_t *app_wrk; + u32 table_index; + local_session_t *ll; + session_endpoint_t sep; /* * The app event queue allocated in first segment is cleared with @@ -340,6 +403,10 @@ application_free (application_t * app) if (application_is_proxy (app)) application_remove_proxy (app); + /* + * Free workers + */ + /* *INDENT-OFF* */ pool_flush (wrk_map, app->worker_maps, ({ app_wrk = app_worker_get (wrk_map->wrk_index); @@ -348,6 +415,24 @@ application_free (application_t * app) /* *INDENT-ON* */ pool_free (app->worker_maps); + /* + * Free local listeners. Global table unbinds stop local listeners + * as well, but if we have only local binds, these won't be cleaned up. + * Don't bother with local accepted sessions, we clean them when + * cleaning up the worker. + */ + table_index = application_local_session_table (app); + /* *INDENT-OFF* */ + pool_foreach (ll, app->local_listen_sessions, ({ + application_local_listener_session_endpoint (ll, &sep); + session_lookup_del_session_endpoint (table_index, &sep); + })); + /* *INDENT-ON* */ + pool_free (app->local_listen_sessions); + + /* + * Cleanup remaining state + */ application_table_del (app); vec_free (app->name); vec_free (app->tls_cert); @@ -371,6 +456,29 @@ application_get_default_worker (application_t * app) return application_get_worker (app, 0); } +app_worker_t * +application_listener_select_worker (stream_session_t * ls, u8 is_local) +{ + app_listener_t *app_listener; + application_t *app; + u32 wrk_index; + + app = application_get (ls->app_index); + if (!is_local) + app_listener = app_listener_get (app, ls->listener_db_index); + else + app_listener = app_local_listener_get (app, ls->listener_db_index); + + wrk_index = clib_bitmap_next_set (app_listener->workers, + app_listener->accept_rotor + 1); + if (wrk_index == ~0) + wrk_index = clib_bitmap_first_set (app_listener->workers); + + ASSERT (wrk_index != ~0); + app_listener->accept_rotor = wrk_index; + return application_get_worker (app, wrk_index); +} + app_worker_t * app_worker_alloc (application_t * app) { @@ -427,7 +535,7 @@ app_worker_free (app_worker_t * app_wrk) for (i = 0; i < vec_len (handles); i++) { a->app_index = app->app_index; - a->app_wrk_index = app_wrk->wrk_map_index; + a->wrk_map_index = app_wrk->wrk_map_index; a->handle = handles[i]; /* seg manager is removed when unbind completes */ vnet_unbind (a); @@ -461,7 +569,7 @@ app_worker_free (app_worker_t * app_wrk) /* * Local sessions */ - application_local_sessions_free (app_wrk); + app_worker_local_sessions_free (app_wrk); pool_put (app_main.workers, app_wrk); if (CLIB_DEBUG) @@ -516,8 +624,18 @@ app_worker_alloc_and_init (application_t * app, app_worker_t ** wrk) return 0; } +application_t * +app_worker_get_app (u32 wrk_index) +{ + app_worker_t *app_wrk; + app_wrk = app_worker_get_if_valid (wrk_index); + if (!app_wrk) + return 0; + return application_get_if_valid (app_wrk->app_index); +} + static segment_manager_t * -application_alloc_segment_manager (app_worker_t * app_wrk) +app_worker_alloc_segment_manager (app_worker_t * app_wrk) { segment_manager_t *sm = 0; @@ -536,6 +654,59 @@ application_alloc_segment_manager (app_worker_t * app_wrk) return sm; } +int +app_worker_start_listen (app_worker_t * app_wrk, stream_session_t * ls) +{ + segment_manager_t *sm; + + /* Allocate segment manager. All sessions derived out of a listen session + * have fifos allocated by the same segment manager. */ + if (!(sm = app_worker_alloc_segment_manager (app_wrk))) + return -1; + + /* Add to app's listener table. Useful to find all child listeners + * when app goes down, although, just for unbinding this is not needed */ + hash_set (app_wrk->listeners_table, listen_session_get_handle (ls), + segment_manager_index (sm)); + + if (!ls->server_rx_fifo + && session_transport_service_type (ls) == TRANSPORT_SERVICE_CL) + { + if (session_alloc_fifos (sm, ls)) + return -1; + } + return 0; +} + +int +app_worker_stop_listen (app_worker_t * app_wrk, session_handle_t handle) +{ + segment_manager_t *sm; + uword *sm_indexp; + + sm_indexp = hash_get (app_wrk->listeners_table, handle); + if (PREDICT_FALSE (!sm_indexp)) + { + clib_warning ("listener handle was removed %llu!", handle); + return -1; + } + + sm = segment_manager_get (*sm_indexp); + if (app_wrk->first_segment_manager == *sm_indexp) + { + /* Delete sessions but don't remove segment manager */ + app_wrk->first_segment_manager_in_use = 0; + segment_manager_del_sessions (sm); + } + else + { + segment_manager_init_del (sm); + } + hash_unset (app_wrk->listeners_table, handle); + + return 0; +} + /** * Start listening local transport endpoint for requested transport. * @@ -544,40 +715,75 @@ application_alloc_segment_manager (app_worker_t * app_wrk) * it's own specific listening connection. */ int -app_worker_start_listen (app_worker_t * app_wrk, session_endpoint_t * sep, - session_handle_t * res) +application_start_listen (application_t * app, + session_endpoint_extended_t * sep_ext, + session_handle_t * res) { - segment_manager_t *sm; - stream_session_t *s; - session_handle_t handle; + app_listener_t *app_listener; + u32 table_index, fib_proto; + session_endpoint_t *sep; + app_worker_t *app_wrk; + stream_session_t *ls; + session_handle_t lh; session_type_t sst; - sst = session_type_from_proto_and_ip (sep->transport_proto, sep->is_ip4); - s = listen_session_new (0, sst); - s->app_wrk_index = app_wrk->wrk_index; + /* + * Check if sep is already listened on + */ + sep = (session_endpoint_t *) sep_ext; + fib_proto = session_endpoint_fib_proto (sep); + table_index = application_session_table (app, fib_proto); + lh = session_lookup_endpoint_listener (table_index, sep, 1); + if (lh != SESSION_INVALID_HANDLE) + { + ls = listen_session_get_from_handle (lh); + if (ls->app_index != app->app_index) + return VNET_API_ERROR_ADDRESS_IN_USE; - /* Allocate segment manager. All sessions derived out of a listen session - * have fifos allocated by the same segment manager. */ - if (!(sm = application_alloc_segment_manager (app_wrk))) - goto err; + app_wrk = app_worker_get (sep_ext->app_wrk_index); + if (ls->app_wrk_index == app_wrk->wrk_index) + return VNET_API_ERROR_ADDRESS_IN_USE; - /* Add to app's listener table. Useful to find all child listeners - * when app goes down, although, just for unbinding this is not needed */ - handle = listen_session_get_handle (s); - hash_set (app_wrk->listeners_table, handle, segment_manager_index (sm)); + if (app_worker_start_listen (app_wrk, ls)) + return -1; - if (stream_session_listen (s, sep)) - { - segment_manager_del (sm); - hash_unset (app_wrk->listeners_table, handle); - goto err; + app_listener = app_listener_get (app, ls->listener_db_index); + app_listener->workers = clib_bitmap_set (app_listener->workers, + app_wrk->wrk_map_index, 1); + + *res = listen_session_get_handle (ls); + return 0; } - *res = handle; + /* + * Allocate new listener for application + */ + sst = session_type_from_proto_and_ip (sep_ext->transport_proto, + sep_ext->is_ip4); + ls = listen_session_new (0, sst); + ls->app_index = app->app_index; + + if (session_listen (ls, sep_ext)) + goto err; + + app_listener = app_listener_alloc (app); + ls->listener_db_index = app_listener->al_index; + + /* + * Setup app worker as a listener + */ + app_wrk = app_worker_get (sep_ext->app_wrk_index); + ls->app_wrk_index = app_wrk->wrk_index; + if (app_worker_start_listen (app_wrk, ls)) + goto err; + app_listener->workers = clib_bitmap_set (app_listener->workers, + app_wrk->wrk_map_index, 1); + + *res = listen_session_get_handle (ls); return 0; err: - listen_session_del (s); + listen_session_del (ls); return -1; } @@ -585,48 +791,43 @@ err: * Stop listening on session associated to handle * * @param handle listener handle - * @param app_index index of the app owning the handle. This is used - * only for validating ownership + * @param app_index index of the app owning the handle. + * @param app_wrk_index index of the worker requesting the stop */ int -app_worker_stop_listen (session_handle_t handle, u32 app_index) +application_stop_listen (u32 app_index, u32 app_wrk_index, + session_handle_t handle) { + app_listener_t *app_listener; stream_session_t *listener; - segment_manager_t *sm; app_worker_t *app_wrk; - uword *indexp; + application_t *app; listener = listen_session_get_from_handle (handle); - app_wrk = app_worker_get (listener->app_wrk_index); - if (PREDICT_FALSE (!app_wrk || app_wrk->app_index != app_index)) + app = application_get (app_index); + if (PREDICT_FALSE (!app || app->app_index != listener->app_index)) { clib_warning ("app doesn't own handle %llu!", handle); return -1; } - if (PREDICT_FALSE (hash_get (app_wrk->listeners_table, handle) == 0)) + + app_listener = app_listener_get (app, listener->listener_db_index); + if (!clib_bitmap_get (app_listener->workers, app_wrk_index)) { - clib_warning ("listener handle was removed %llu!", handle); - return -1; + clib_warning ("worker not listening on handle %lu", handle); + return 0; } - stream_session_stop_listen (listener); - - indexp = hash_get (app_wrk->listeners_table, handle); - ASSERT (indexp); + app_wrk = application_get_worker (app, app_wrk_index); + app_worker_stop_listen (app_wrk, handle); + clib_bitmap_set_no_check (app_listener->workers, app_wrk_index, 0); - sm = segment_manager_get (*indexp); - if (app_wrk->first_segment_manager == *indexp) - { - /* Delete sessions but don't remove segment manager */ - app_wrk->first_segment_manager_in_use = 0; - segment_manager_del_sessions (sm); - } - else + if (clib_bitmap_is_zero (app_listener->workers)) { - segment_manager_init_del (sm); + session_stop_listen (listener); + app_listener_free (app, app_listener); + listen_session_del (listener); } - hash_unset (app_wrk->listeners_table, handle); - listen_session_del (listener); return 0; } @@ -653,7 +854,7 @@ app_worker_alloc_connects_segment_manager (app_worker_t * app_wrk) if (app_wrk->connects_seg_manager == APP_INVALID_SEGMENT_MANAGER_INDEX) { - sm = application_alloc_segment_manager (app_wrk); + sm = app_worker_alloc_segment_manager (app_wrk); if (sm == 0) return -1; app_wrk->connects_seg_manager = segment_manager_index (sm); @@ -702,6 +903,7 @@ vnet_app_worker_add_del (vnet_app_worker_add_del_args_t * a) a->segment = &fs->ssvm; segment_manager_segment_reader_unlock (sm); a->evt_q = app_wrk->event_queue; + a->wrk_index = app_wrk->wrk_map_index; } else { @@ -808,7 +1010,7 @@ app_worker_first_listener (app_worker_t * app, u8 fib_proto, hash_foreach (handle, sm_index, app->listeners_table, ({ listener = listen_session_get_from_handle (handle); if (listener->session_type == sst - && listener->listener_index != SESSION_PROXY_LISTENER_INDEX) + && listener->enqueue_epoch != SESSION_PROXY_LISTENER_INDEX) return listener; })); /* *INDENT-ON* */ @@ -838,7 +1040,7 @@ application_proxy_listener (app_worker_t * app, u8 fib_proto, hash_foreach (handle, sm_index, app->listeners_table, ({ listener = listen_session_get_from_handle (handle); if (listener->session_type == sst - && listener->listener_index == SESSION_PROXY_LISTENER_INDEX) + && listener->enqueue_epoch == SESSION_PROXY_LISTENER_INDEX) return listener; })); /* *INDENT-ON* */ @@ -852,7 +1054,7 @@ application_start_stop_proxy_fib_proto (application_t * app, u8 fib_proto, { app_namespace_t *app_ns = app_namespace_get (app->ns_index); u8 is_ip4 = (fib_proto == FIB_PROTOCOL_IP4); - session_endpoint_t sep = SESSION_ENDPOINT_NULL; + session_endpoint_extended_t sep = SESSION_ENDPOINT_EXT_NULL; transport_connection_t *tc; app_worker_t *app_wrk; stream_session_t *s; @@ -869,9 +1071,10 @@ application_start_stop_proxy_fib_proto (application_t * app, u8 fib_proto, sep.fib_index = app_namespace_get_fib_index (app_ns, fib_proto); sep.sw_if_index = app_ns->sw_if_index; sep.transport_proto = transport_proto; - app_worker_start_listen (app_wrk, &sep, &handle); + sep.app_wrk_index = app_wrk->wrk_index; /* only default */ + application_start_listen (app, &sep, &handle); s = listen_session_get_from_handle (handle); - s->listener_index = SESSION_PROXY_LISTENER_INDEX; + s->enqueue_epoch = SESSION_PROXY_LISTENER_INDEX; } } else @@ -891,9 +1094,12 @@ application_start_stop_proxy_fib_proto (application_t * app, u8 fib_proto, sep.port = 0; sti = session_lookup_get_index_for_fib (fib_proto, sep.fib_index); if (is_start) - session_lookup_add_session_endpoint (sti, &sep, s->session_index); + session_lookup_add_session_endpoint (sti, + (session_endpoint_t *) & sep, + s->session_index); else - session_lookup_del_session_endpoint (sti, &sep); + session_lookup_del_session_endpoint (sti, + (session_endpoint_t *) & sep); } return 0; @@ -1116,33 +1322,33 @@ static app_send_evt_handler_fn * const app_send_evt_handler_fns[3] = { * flag is set, we do wait for queue mutex. */ int -application_send_event (app_worker_t * app, stream_session_t * s, u8 evt_type) +app_worker_send_event (app_worker_t * app, stream_session_t * s, u8 evt_type) { ASSERT (app && evt_type <= FIFO_EVENT_APP_TX); return app_send_evt_handler_fns[evt_type] (app, s, 0 /* lock */ ); } int -application_lock_and_send_event (app_worker_t * app, stream_session_t * s, - u8 evt_type) +app_worker_lock_and_send_event (app_worker_t * app, stream_session_t * s, + u8 evt_type) { return app_send_evt_handler_fns[evt_type] (app, s, 1 /* lock */ ); } local_session_t * -application_alloc_local_session (app_worker_t * app) +application_local_session_alloc (app_worker_t * app_wrk) { local_session_t *s; - pool_get (app->local_sessions, s); + pool_get (app_wrk->local_sessions, s); memset (s, 0, sizeof (*s)); - s->app_wrk_index = app->app_index; - s->session_index = s - app->local_sessions; + s->app_wrk_index = app_wrk->wrk_index; + s->session_index = s - app_wrk->local_sessions; s->session_type = session_type_from_proto_and_ip (TRANSPORT_PROTO_NONE, 0); return s; } void -application_free_local_session (app_worker_t * app, local_session_t * s) +application_local_session_free (app_worker_t * app, local_session_t * s) { pool_put (app->local_sessions, s); if (CLIB_DEBUG) @@ -1170,54 +1376,79 @@ application_get_local_session_from_handle (session_handle_t handle) } local_session_t * -application_get_local_listen_session_from_handle (session_handle_t lh) +application_local_listen_session_alloc (application_t * app) { - u32 ll_index, server_wrk_index; - app_worker_t *server_wrk; + local_session_t *ll; + pool_get (app->local_listen_sessions, ll); + memset (ll, 0, sizeof (*ll)); + return ll; +} - local_session_parse_handle (lh, &server_wrk_index, &ll_index); - server_wrk = app_worker_get (server_wrk_index); - return application_get_local_listen_session (server_wrk, ll_index); +u32 +application_local_listener_index (application_t * app, local_session_t * ll) +{ + return (ll - app->local_listen_sessions); } -always_inline void -application_local_listener_session_endpoint (local_session_t * ll, - session_endpoint_t * sep) +void +application_local_listen_session_free (application_t * app, + local_session_t * ll) { - sep->transport_proto = - session_type_transport_proto (ll->listener_session_type); - sep->port = ll->port; - sep->is_ip4 = ll->listener_session_type & 1; + pool_put (app->local_listen_sessions, ll); + if (CLIB_DEBUG) + memset (ll, 0xfb, sizeof (*ll)); } int -application_start_local_listen (app_worker_t * app_wrk, - session_endpoint_t * sep, +application_start_local_listen (application_t * app, + session_endpoint_extended_t * sep_ext, session_handle_t * handle) { + app_listener_t *app_listener; + session_endpoint_t *sep; + app_worker_t *app_wrk; session_handle_t lh; local_session_t *ll; - application_t *app; u32 table_index; - app = application_get (app_wrk->app_index); + sep = (session_endpoint_t *) sep_ext; table_index = application_local_session_table (app); + app_wrk = app_worker_get (sep_ext->app_wrk_index); /* An exact sep match, as opposed to session_lookup_local_listener */ lh = session_lookup_endpoint_listener (table_index, sep, 1); if (lh != SESSION_INVALID_HANDLE) - return VNET_API_ERROR_ADDRESS_IN_USE; + { + ll = application_get_local_listener_w_handle (lh); + if (ll->app_index != app->app_index) + return VNET_API_ERROR_ADDRESS_IN_USE; - pool_get (app_wrk->local_listen_sessions, ll); - memset (ll, 0, sizeof (*ll)); + if (ll->app_wrk_index == app_wrk->wrk_index) + return VNET_API_ERROR_ADDRESS_IN_USE; + + app_listener = app_local_listener_get (app, ll->listener_db_index); + app_listener->workers = clib_bitmap_set (app_listener->workers, + app_wrk->wrk_map_index, 1); + *handle = application_local_session_handle (ll); + return 0; + } + + ll = application_local_listen_session_alloc (app); ll->session_type = session_type_from_proto_and_ip (TRANSPORT_PROTO_NONE, 0); ll->app_wrk_index = app_wrk->app_index; - ll->session_index = ll - app_wrk->local_listen_sessions; - ll->port = sep->port; + ll->session_index = application_local_listener_index (app, ll); + ll->port = sep_ext->port; /* Store the original session type for the unbind */ ll->listener_session_type = - session_type_from_proto_and_ip (sep->transport_proto, sep->is_ip4); + session_type_from_proto_and_ip (sep_ext->transport_proto, + sep_ext->is_ip4); ll->transport_listener_index = ~0; + ll->app_index = app->app_index; + + app_listener = app_local_listener_alloc (app); + ll->listener_db_index = app_listener->al_index; + app_listener->workers = clib_bitmap_set (app_listener->workers, + app_wrk->wrk_map_index, 1); *handle = application_local_session_handle (ll); session_lookup_add_session_endpoint (table_index, sep, *handle); @@ -1231,10 +1462,12 @@ application_start_local_listen (app_worker_t * app_wrk, * so parse it. */ int -application_stop_local_listen (session_handle_t lh, u32 app_index) +application_stop_local_listen (u32 app_index, u32 wrk_map_index, + session_handle_t lh) { session_endpoint_t sep = SESSION_ENDPOINT_NULL; - u32 table_index, ll_index, server_wrk_index; + u32 table_index, ll_index, server_index; + app_listener_t *app_listener; app_worker_t *server_wrk; stream_session_t *sl = 0; local_session_t *ll, *ls; @@ -1259,28 +1492,44 @@ application_stop_local_listen (session_handle_t lh, u32 app_index) return -1; } - local_session_parse_handle (lh, &server_wrk_index, &ll_index); - server_wrk = app_worker_get (server_wrk_index); - if (PREDICT_FALSE (server_wrk->app_index != app_index)) + local_session_parse_handle (lh, &server_index, &ll_index); + if (PREDICT_FALSE (server_index != app_index)) { clib_warning ("app %u does not own local handle 0x%lx", app_index, lh); + return -1; } - ll = application_get_local_listen_session (server_wrk, ll_index); + + ll = application_get_local_listen_session (server, ll_index); if (PREDICT_FALSE (!ll)) { clib_warning ("no local listener"); return -1; } - application_local_listener_session_endpoint (ll, &sep); - session_lookup_del_session_endpoint (table_index, &sep); + app_listener = app_local_listener_get (server, ll->listener_db_index); + if (!clib_bitmap_get (app_listener->workers, wrk_map_index)) + { + clib_warning ("app wrk %u not listening on handle %lu", wrk_map_index, + lh); + return -1; + } + + server_wrk = application_get_worker (server, wrk_map_index); /* *INDENT-OFF* */ pool_foreach (ls, server_wrk->local_sessions, ({ if (ls->listener_index == ll->session_index) application_local_session_disconnect (server_wrk->app_index, ls); })); /* *INDENT-ON* */ - pool_put_index (server_wrk->local_listen_sessions, ll->session_index); + + clib_bitmap_set_no_check (app_listener->workers, wrk_map_index, 0); + if (clib_bitmap_is_zero (app_listener->workers)) + { + app_local_listener_free (server, app_listener); + application_local_listener_session_endpoint (ll, &sep); + session_lookup_del_session_endpoint (table_index, &sep); + application_local_listen_session_free (server, ll); + } return 0; } @@ -1317,8 +1566,7 @@ application_local_session_connect (app_worker_t * client_wrk, local_session_t *ls; svm_msg_q_t *sq, *cq; - ls = application_alloc_local_session (server_wrk); - + ls = application_local_session_alloc (server_wrk); server = application_get (server_wrk->app_index); client = application_get (client_wrk->app_index); @@ -1380,6 +1628,7 @@ application_local_session_connect (app_worker_t * client_wrk, ls->client_wrk_index = client_wrk->wrk_index; ls->client_opaque = opaque; ls->listener_session_type = ll->session_type; + ls->session_state = SESSION_STATE_READY; if ((rv = server->cb_fns.add_segment_callback (server->api_client_index, &seg->ssvm))) @@ -1495,24 +1744,41 @@ application_local_session_cleanup (app_worker_t * client_wrk, segment_manager_del_segment (sm, seg); } - application_free_local_session (server_wrk, ls); + application_local_session_free (server_wrk, ls); return 0; } int -application_local_session_disconnect (u32 app_wrk_index, local_session_t * ls) +application_local_session_disconnect (u32 app_index, local_session_t * ls) { app_worker_t *client_wrk, *server_wrk; + u8 is_server = 0, is_client = 0; + application_t *app; + + app = application_get_if_valid (app_index); + if (!app) + return 0; client_wrk = app_worker_get_if_valid (ls->client_wrk_index); server_wrk = app_worker_get (ls->app_wrk_index); + if (server_wrk->app_index == app_index) + is_server = 1; + else if (client_wrk && client_wrk->app_index == app_index) + is_client = 1; + + if (!is_server && !is_client) + { + clib_warning ("app %u is neither client nor server for session 0x%lx", + app_index, application_local_session_handle (ls)); + return VNET_API_ERROR_INVALID_VALUE; + } if (ls->session_state == SESSION_STATE_CLOSED) return application_local_session_cleanup (client_wrk, server_wrk, ls); - if (app_wrk_index == ls->client_wrk_index) + if (app_index == ls->client_wrk_index) { mq_send_local_session_disconnected_cb (ls->app_wrk_index, ls); } @@ -1556,29 +1822,15 @@ application_local_session_disconnect_w_index (u32 app_wrk_index, u32 ls_index) } void -application_local_sessions_free (app_worker_t * app_wrk) +app_worker_local_sessions_free (app_worker_t * app_wrk) { - u32 index, server_wrk_index, session_index, table_index; - segment_manager_t *sm; + u32 index, server_wrk_index, session_index; u64 handle, *handles = 0; - local_session_t *ls, *ll; app_worker_t *server_wrk; - session_endpoint_t sep; - application_t *app; + segment_manager_t *sm; + local_session_t *ls; int i; - /* - * Local listens. Don't bother with local sessions, we clean them lower - */ - app = application_get (app_wrk->app_index); - table_index = application_local_session_table (app); - /* *INDENT-OFF* */ - pool_foreach (ll, app_wrk->local_listen_sessions, ({ - application_local_listener_session_endpoint (ll, &sep); - session_lookup_del_session_endpoint (table_index, &sep); - })); - /* *INDENT-ON* */ - /* * Local sessions */ @@ -1657,10 +1909,10 @@ format_app_worker_listener (u8 * s, va_list * args) if (!app_wrk) { if (verbose) - s = format (s, "%-40s%-25s%-15s%-15s%-10s", "Connection", "App", - "API Client", "ListenerID", "SegManager"); + s = format (s, "%-40s%-25s%=10s%-15s%-15s%-10s", "Connection", "App", + "Wrk", "API Client", "ListenerID", "SegManager"); else - s = format (s, "%-40s%-25s", "Connection", "App"); + s = format (s, "%-40s%-25s%=10s", "Connection", "App", "Wrk"); return s; } @@ -1672,11 +1924,12 @@ format_app_worker_listener (u8 * s, va_list * args) if (verbose) { - s = format (s, "%-40s%-25s%-15u%-15u%-10u", str, app_name, - app->api_client_index, handle, sm_index); + s = format (s, "%-40s%-25s%=10u%-15u%-15u%-10u", str, app_name, + app_wrk->wrk_map_index, app->api_client_index, handle, + sm_index); } else - s = format (s, "%-40s%-25s", str, app_name); + s = format (s, "%-40s%-25s%=10u", str, app_name, app_wrk->wrk_map_index); vec_free (app_name); return s; @@ -1817,13 +2070,6 @@ app_worker_format_local_sessions (app_worker_t * app_wrk, int verbose) return; /* *INDENT-OFF* */ - pool_foreach (ls, app_wrk->local_listen_sessions, ({ - tp = session_type_transport_proto(ls->listener_session_type); - conn = format (0, "[L][%U] *:%u", format_transport_proto_short, tp, - ls->port); - vlib_cli_output (vm, "%-40v%-15u%-20s", conn, ls->app_wrk_index, "*"); - vec_reset_length (conn); - })); pool_foreach (ls, app_wrk->local_sessions, ({ tp = session_type_transport_proto(ls->listener_session_type); conn = format (0, "[L][%U] *:%u", format_transport_proto_short, tp, @@ -1840,8 +2086,12 @@ app_worker_format_local_sessions (app_worker_t * app_wrk, int verbose) static void application_format_local_sessions (application_t * app, int verbose) { + vlib_main_t *vm = vlib_get_main (); app_worker_map_t *wrk_map; app_worker_t *app_wrk; + transport_proto_t tp; + local_session_t *ls; + u8 *conn = 0; if (!app) { @@ -1849,6 +2099,23 @@ application_format_local_sessions (application_t * app, int verbose) return; } + /* + * Format local listeners + */ + + /* *INDENT-OFF* */ + pool_foreach (ls, app->local_listen_sessions, ({ + tp = session_type_transport_proto (ls->listener_session_type); + conn = format (0, "[L][%U] *:%u", format_transport_proto_short, tp, + ls->port); + vlib_cli_output (vm, "%-40v%-15u%-20s", conn, ls->app_wrk_index, "*"); + vec_reset_length (conn); + })); + /* *INDENT-ON* */ + + /* + * Format local accepted/connected sessions + */ /* *INDENT-OFF* */ pool_foreach (wrk_map, app->worker_maps, ({ app_wrk = app_worker_get (wrk_map->wrk_index); diff --git a/src/vnet/session/application.h b/src/vnet/session/application.h index de609d2d7a0..e83b7a6ba6e 100644 --- a/src/vnet/session/application.h +++ b/src/vnet/session/application.h @@ -60,6 +60,8 @@ typedef struct _stream_session_cb_vft typedef struct app_worker_ { + CLIB_CACHE_LINE_ALIGN_MARK (cacheline0); + /** Worker index in global worker pool*/ u32 wrk_index; @@ -93,9 +95,6 @@ typedef struct app_worker_ /** Segment manager used for incoming "cut through" connects */ u32 local_segment_manager; - /** Pool of local listen sessions */ - local_session_t *local_listen_sessions; - /** Pool of local sessions the app owns (as a server) */ local_session_t *local_sessions; @@ -110,6 +109,13 @@ typedef struct app_worker_map_ u32 wrk_index; } app_worker_map_t; +typedef struct app_listener_ +{ + clib_bitmap_t *workers; /**< workers accepting connections */ + u32 accept_rotor; /**< last worker to accept a connection */ + u32 al_index; +} app_listener_t; + typedef struct application_ { /** App index in app pool */ @@ -138,6 +144,15 @@ typedef struct application_ u16 proxied_transports; + /** Pool of listeners for the app */ + app_listener_t *listeners; + + /** Pool of local listeners for app */ + app_listener_t *local_listeners; + + /** Pool of local listen sessions */ + local_session_t *local_listen_sessions; + /* * TLS Specific */ @@ -150,6 +165,7 @@ typedef struct application_ /** Preferred tls engine */ u8 tls_engine; + } application_t; typedef struct app_main_ @@ -207,11 +223,8 @@ app_worker_t *app_worker_alloc (application_t * app); int app_worker_alloc_and_init (application_t * app, app_worker_t ** wrk); app_worker_t *app_worker_get (u32 wrk_index); app_worker_t *app_worker_get_if_valid (u32 wrk_index); +application_t *app_worker_get_app (u32 wrk_index); void app_worker_free (app_worker_t * app_wrk); -int app_worker_start_listen (app_worker_t * app, - session_endpoint_t * tep, - session_handle_t * handle); -int app_worker_stop_listen (session_handle_t handle, u32 app_wrk_index); int app_worker_open_session (app_worker_t * app, session_endpoint_t * tep, u32 api_context); segment_manager_t *app_worker_get_listen_segment_manager (app_worker_t *, @@ -224,8 +237,17 @@ stream_session_t *app_worker_first_listener (app_worker_t * app, u8 fib_proto, u8 transport_proto); u8 app_worker_application_is_builtin (app_worker_t * app_wrk); +int app_worker_send_event (app_worker_t * app, stream_session_t * s, u8 evt); +int app_worker_lock_and_send_event (app_worker_t * app, stream_session_t * s, + u8 evt_type); clib_error_t *vnet_app_worker_add_del (vnet_app_worker_add_del_args_t * a); +int application_start_listen (application_t * app, + session_endpoint_extended_t * tep, + session_handle_t * handle); +int application_stop_listen (u32 app_index, u32 app_wrk_index, + session_handle_t handle); + application_t *application_alloc (void); int application_alloc_and_init (app_init_args_t * args); void application_free (application_t * app); @@ -236,8 +258,10 @@ application_t *application_lookup_name (const u8 * name); u32 application_index (application_t * app); app_worker_t *application_get_worker (application_t * app, u32 wrk_index); app_worker_t *application_get_default_worker (application_t * app); -int application_api_queue_is_full (application_t * app); +app_worker_t *application_listener_select_worker (stream_session_t * ls, + u8 is_local); +int application_api_queue_is_full (application_t * app); int application_is_proxy (application_t * app); int application_is_builtin (application_t * app); @@ -252,12 +276,17 @@ void application_setup_proxy (application_t * app); void application_remove_proxy (application_t * app); segment_manager_properties_t *application_get_segment_manager_properties (u32 - app_wrk_index); + app_index); + segment_manager_properties_t * application_segment_manager_properties (application_t * app); -local_session_t *application_alloc_local_session (app_worker_t * app); -void application_free_local_session (app_worker_t * app, +/* + * Local session + */ + +local_session_t *application_local_session_alloc (app_worker_t * app); +void application_local_session_free (app_worker_t * app, local_session_t * ls); local_session_t *application_get_local_session (app_worker_t * app, u32 session_index); @@ -265,29 +294,39 @@ local_session_t *application_get_local_session_from_handle (session_handle_t handle); local_session_t * application_get_local_listen_session_from_handle (session_handle_t lh); -int application_start_local_listen (app_worker_t * server, - session_endpoint_t * sep, +int application_start_local_listen (application_t * server, + session_endpoint_extended_t * sep, session_handle_t * handle); -int application_stop_local_listen (session_handle_t lh, u32 app_wrk_index); +int application_stop_local_listen (u32 app_index, u32 app_wrk_index, + session_handle_t lh); int application_local_session_connect (app_worker_t * client, app_worker_t * server, - local_session_t * ll, u32 opaque); + local_session_t * ls, u32 opaque); int application_local_session_connect_notify (local_session_t * ls); int application_local_session_disconnect (u32 app_wrk_index, local_session_t * ls); int application_local_session_disconnect_w_index (u32 app_wrk_index, u32 ls_index); -void application_local_sessions_free (app_worker_t * app); - -int application_send_event (app_worker_t * app, stream_session_t * s, u8 evt); -int application_lock_and_send_event (app_worker_t * app, - stream_session_t * s, u8 evt_type); +void app_worker_local_sessions_free (app_worker_t * app); always_inline u32 -local_session_id (local_session_t * ll) +local_session_id (local_session_t * ls) { - ASSERT (ll->app_wrk_index < (2 << 16) && ll->session_index < (2 << 16)); - return ((u32) ll->app_wrk_index << 16 | (u32) ll->session_index); + ASSERT (ls->session_index < (2 << 16)); + u32 app_or_wrk_index; + + if (ls->session_state == SESSION_STATE_LISTENING) + { + ASSERT (ls->app_index < (2 << 16)); + app_or_wrk_index = ls->app_index; + } + else + { + ASSERT (ls->app_wrk_index < (2 << 16)); + app_or_wrk_index = ls->app_wrk_index; + } + + return ((u32) app_or_wrk_index << 16 | (u32) ls->session_index); } always_inline void @@ -315,7 +354,7 @@ application_local_session_handle (local_session_t * ls) } always_inline local_session_t * -application_get_local_listen_session (app_worker_t * app, u32 session_index) +application_get_local_listen_session (application_t * app, u32 session_index) { return pool_elt_at_index (app->local_listen_sessions, session_index); } @@ -323,10 +362,10 @@ application_get_local_listen_session (app_worker_t * app, u32 session_index) always_inline local_session_t * application_get_local_listener_w_handle (session_handle_t handle) { - u32 server_wrk_index, session_index; - app_worker_t *app; - local_session_parse_handle (handle, &server_wrk_index, &session_index); - app = app_worker_get (server_wrk_index); + u32 server_index, session_index; + application_t *app; + local_session_parse_handle (handle, &server_index, &session_index); + app = application_get (server_index); return application_get_local_listen_session (app, session_index); } diff --git a/src/vnet/session/application_interface.c b/src/vnet/session/application_interface.c index 2e631363016..ab87def4062 100644 --- a/src/vnet/session/application_interface.c +++ b/src/vnet/session/application_interface.c @@ -147,11 +147,10 @@ session_endpoint_update_for_app (session_endpoint_t * sep, } } -static int -vnet_bind_i (vnet_bind_args_t * a) +static inline int +vnet_bind_inline (vnet_bind_args_t * a) { - u64 lh, ll_handle = SESSION_INVALID_HANDLE; - u32 table_index, fib_proto; + u64 ll_handle = SESSION_INVALID_HANDLE; app_worker_t *app_wrk; application_t *app; int rv; @@ -163,17 +162,12 @@ vnet_bind_i (vnet_bind_args_t * a) return VNET_API_ERROR_APPLICATION_NOT_ATTACHED; } app_wrk = application_get_worker (app, a->wrk_map_index); + a->sep_ext.app_wrk_index = app_wrk->wrk_index; session_endpoint_update_for_app (&a->sep, app); if (!session_endpoint_in_ns (&a->sep)) return VNET_API_ERROR_INVALID_VALUE_2; - fib_proto = session_endpoint_fib_proto (&a->sep); - table_index = application_session_table (app, fib_proto); - lh = session_lookup_endpoint_listener (table_index, &a->sep, 1); - if (lh != SESSION_INVALID_HANDLE) - return VNET_API_ERROR_ADDRESS_IN_USE; - /* * Add session endpoint to local session table. Only binds to "inaddr_any" * (i.e., zero address) are added to local scope table. @@ -181,8 +175,8 @@ vnet_bind_i (vnet_bind_args_t * a) if (application_has_local_scope (app) && session_endpoint_is_local (&a->sep)) { - if ((rv = - application_start_local_listen (app_wrk, &a->sep, &a->handle))) + if ((rv = application_start_local_listen (app, &a->sep_ext, + &a->handle))) return rv; ll_handle = a->handle; } @@ -195,13 +189,17 @@ vnet_bind_i (vnet_bind_args_t * a) */ /* Setup listen path down to transport */ - rv = app_worker_start_listen (app_wrk, &a->sep, &a->handle); + rv = application_start_listen (app, &a->sep_ext, &a->handle); if (rv && ll_handle != SESSION_INVALID_HANDLE) - session_lookup_del_session_endpoint (table_index, &a->sep); + { + application_stop_local_listen (a->app_index, a->wrk_map_index, + ll_handle); + return rv; + } /* * Store in local table listener the index of the transport layer - * listener. We'll need local listeners are hit and we need to + * listener. We'll need if if local listeners are hit and we need to * return global handle */ if (ll_handle != SESSION_INVALID_HANDLE) @@ -210,18 +208,19 @@ vnet_bind_i (vnet_bind_args_t * a) stream_session_t *tl; ll = application_get_local_listener_w_handle (ll_handle); tl = listen_session_get_from_handle (a->handle); - ll->transport_listener_index = tl->session_index; + if (ll->transport_listener_index == ~0) + ll->transport_listener_index = tl->session_index; } return rv; } -int -vnet_unbind_i (u32 app_index, session_handle_t handle) +static inline int +vnet_unbind_inline (vnet_unbind_args_t * a) { application_t *app; int rv; - if (!(app = application_get_if_valid (app_index))) + if (!(app = application_get_if_valid (a->app_index))) { SESSION_DBG ("app (%d) not attached", wrk_map_index); return VNET_API_ERROR_APPLICATION_NOT_ATTACHED; @@ -229,7 +228,8 @@ vnet_unbind_i (u32 app_index, session_handle_t handle) if (application_has_local_scope (app)) { - if ((rv = application_stop_local_listen (handle, app_index))) + if ((rv = application_stop_local_listen (a->app_index, + a->wrk_map_index, a->handle))) return rv; } @@ -237,7 +237,8 @@ vnet_unbind_i (u32 app_index, session_handle_t handle) * Clear the global scope table of the listener */ if (application_has_global_scope (app)) - return app_worker_stop_listen (handle, app_index); + return application_stop_listen (a->app_index, a->wrk_map_index, + a->handle); return 0; } @@ -247,7 +248,7 @@ application_connect (vnet_connect_args_t * a) app_worker_t *server_wrk, *client_wrk; u32 table_index, server_index, li; stream_session_t *listener; - application_t *client; + application_t *client, *server; local_session_t *ll; u8 fib_proto; u64 lh; @@ -283,8 +284,11 @@ application_connect (vnet_connect_args_t * a) */ if (server_index != a->app_index) { - server_wrk = app_worker_get (server_index); - ll = application_get_local_listen_session (server_wrk, li); + server = application_get (server_index); + ll = application_get_local_listen_session (server, li); + listener = (stream_session_t *) ll; + server_wrk = application_listener_select_worker (listener, + 1 /* is_local */ ); return application_local_session_connect (client_wrk, server_wrk, ll, a->api_context); @@ -308,13 +312,11 @@ global_scope: listener = session_lookup_listener (table_index, &a->sep); if (listener) { - server_wrk = app_worker_get (listener->app_wrk_index); - if (server_wrk) - { - ll = (local_session_t *) listener; - return application_local_session_connect (client_wrk, server_wrk, - ll, a->api_context); - } + server_wrk = application_listener_select_worker (listener, + 0 /* is_local */ ); + ll = (local_session_t *) listener; + return application_local_session_connect (client_wrk, server_wrk, ll, + a->api_context); } /* @@ -531,7 +533,7 @@ vnet_bind_uri (vnet_bind_args_t * a) if (rv) return rv; clib_memcpy (&a->sep_ext, &sep, sizeof (sep)); - return vnet_bind_i (a); + return vnet_bind_inline (a); } int @@ -553,8 +555,8 @@ vnet_unbind_uri (vnet_unbind_args_t * a) (session_endpoint_t *) & sep); if (!listener) return VNET_API_ERROR_ADDRESS_NOT_IN_USE; - - return vnet_unbind_i (a->app_index, listen_session_get_handle (listener)); + a->handle = listen_session_get_handle (listener); + return vnet_unbind_inline (a); } clib_error_t * @@ -592,13 +594,6 @@ vnet_disconnect_session (vnet_disconnect_args_t * a) if (!(ls = application_get_local_session_from_handle (a->handle))) return 0; - if (ls->app_wrk_index != a->app_index - && ls->client_wrk_index != a->app_index) - { - clib_warning ("app %u is neither client nor server for session %u", - a->app_index, a->app_index); - return VNET_API_ERROR_INVALID_VALUE; - } return application_local_session_disconnect (a->app_index, ls); } else @@ -620,7 +615,7 @@ clib_error_t * vnet_bind (vnet_bind_args_t * a) { int rv; - if ((rv = vnet_bind_i (a))) + if ((rv = vnet_bind_inline (a))) return clib_error_return_code (0, rv, 0, "bind failed: %d", rv); return 0; } @@ -629,7 +624,7 @@ clib_error_t * vnet_unbind (vnet_unbind_args_t * a) { int rv; - if ((rv = vnet_unbind_i (a->app_index, a->handle))) + if ((rv = vnet_unbind_inline (a))) return clib_error_return_code (0, rv, 0, "unbind failed: %d", rv); return 0; } diff --git a/src/vnet/session/application_interface.h b/src/vnet/session/application_interface.h index 1f481dcf85e..1eefb0c3860 100644 --- a/src/vnet/session/application_interface.h +++ b/src/vnet/session/application_interface.h @@ -64,7 +64,7 @@ typedef struct _vnet_unbind_args_t u64 handle; /**< Session handle */ }; u32 app_index; /**< Owning application index */ - u32 app_wrk_index; /**< App's local pool worker index */ + u32 wrk_map_index; /**< App's local pool worker index */ } vnet_unbind_args_t; typedef struct _vnet_connect_args diff --git a/src/vnet/session/segment_manager.c b/src/vnet/session/segment_manager.c index 158a8e8abf3..83d196358f3 100644 --- a/src/vnet/session/segment_manager.c +++ b/src/vnet/session/segment_manager.c @@ -34,7 +34,8 @@ static u32 default_app_evt_queue_size = 128; segment_manager_properties_t * segment_manager_properties_get (segment_manager_t * sm) { - return application_get_segment_manager_properties (sm->app_wrk_index); + app_worker_t *app_wrk = app_worker_get (sm->app_wrk_index); + return application_get_segment_manager_properties (app_wrk->app_index); } segment_manager_properties_t * diff --git a/src/vnet/session/session.api b/src/vnet/session/session.api index 24ebfba5dba..aa73212da85 100644 --- a/src/vnet/session/session.api +++ b/src/vnet/session/session.api @@ -294,11 +294,13 @@ define bind_sock { /** \brief Unbind @param client_index - opaque cookie to identify the sender @param context - sender context, to match reply w/ request + @param wrk_index - index of worker requesting the bind @param handle - bind handle obtained from bind reply */ autoreply define unbind_sock { u32 client_index; u32 context; + u32 wrk_index; u64 handle; }; diff --git a/src/vnet/session/session.c b/src/vnet/session/session.c index 57ac384a519..952a5a90141 100644 --- a/src/vnet/session/session.c +++ b/src/vnet/session/session.c @@ -505,9 +505,9 @@ session_enqueue_notify (stream_session_t * s, u8 lock) /* *INDENT-ON* */ if (lock) - return application_lock_and_send_event (app, s, FIFO_EVENT_APP_RX); + return app_worker_lock_and_send_event (app, s, FIFO_EVENT_APP_RX); - return application_send_event (app, s, FIFO_EVENT_APP_RX); + return app_worker_send_event (app, s, FIFO_EVENT_APP_RX); } int @@ -520,9 +520,9 @@ session_dequeue_notify (stream_session_t * s) return -1; if (session_transport_service_type (s) == TRANSPORT_SERVICE_CL) - return application_lock_and_send_event (app, s, FIFO_EVENT_APP_RX); + return app_worker_lock_and_send_event (app, s, FIFO_EVENT_APP_RX); - return application_send_event (app, s, FIFO_EVENT_APP_TX); + return app_worker_send_event (app, s, FIFO_EVENT_APP_TX); } /** @@ -971,112 +971,44 @@ session_open (u32 app_wrk_index, session_endpoint_t * rmt, u32 opaque) return session_open_srv_fns[tst] (app_wrk_index, rmt, opaque); } +/** + * Ask transport to listen on session endpoint. + * + * @param s Session for which listen will be called. Note that unlike + * established sessions, listen sessions are not associated to a + * thread. + * @param sep Local endpoint to be listened on. + */ int -session_listen_vc (stream_session_t * s, session_endpoint_t * sep) -{ - transport_connection_t *tc; - u32 tci; - - /* Transport bind/listen */ - tci = tp_vfts[sep->transport_proto].bind (s->session_index, - session_endpoint_to_transport - (sep)); - - if (tci == (u32) ~ 0) - return -1; - - /* Attach transport to session */ - s->connection_index = tci; - tc = tp_vfts[sep->transport_proto].get_listener (tci); - - /* Weird but handle it ... */ - if (tc == 0) - return -1; - - /* Add to the main lookup table */ - session_lookup_add_connection (tc, s->session_index); - return 0; -} - -int -session_listen_cl (stream_session_t * s, session_endpoint_t * sep) +session_listen (stream_session_t * ls, session_endpoint_extended_t * sep) { transport_connection_t *tc; - app_worker_t *server; - segment_manager_t *sm; - u32 tci; + transport_endpoint_t *tep; + u32 tc_index; - /* Transport bind/listen */ - tci = tp_vfts[sep->transport_proto].bind (s->session_index, - session_endpoint_to_transport - (sep)); + /* Transport bind/listen */ + tep = session_endpoint_to_transport (sep); + tc_index = tp_vfts[sep->transport_proto].bind (ls->session_index, tep); - if (tci == (u32) ~ 0) + if (tc_index == (u32) ~ 0) return -1; /* Attach transport to session */ - s->connection_index = tci; - tc = tp_vfts[sep->transport_proto].get_listener (tci); - - /* Weird but handle it ... */ - if (tc == 0) - return -1; - - server = app_worker_get (s->app_wrk_index); - sm = app_worker_get_listen_segment_manager (server, s); - if (session_alloc_fifos (sm, s)) - return -1; + ls->connection_index = tc_index; - /* Add to the main lookup table */ - session_lookup_add_connection (tc, s->session_index); + /* Add to the main lookup table after transport was initialized */ + tc = tp_vfts[sep->transport_proto].get_listener (tc_index); + session_lookup_add_connection (tc, ls->session_index); return 0; } -int -session_listen_app (stream_session_t * s, session_endpoint_t * sep) -{ - session_endpoint_extended_t esep; - clib_memcpy (&esep, sep, sizeof (*sep)); - esep.app_wrk_index = s->app_wrk_index; - - return tp_vfts[sep->transport_proto].bind (s->session_index, - (transport_endpoint_t *) & esep); -} - -typedef int (*session_listen_service_fn) (stream_session_t *, - session_endpoint_t *); - -/* *INDENT-OFF* */ -static session_listen_service_fn -session_listen_srv_fns[TRANSPORT_N_SERVICES] = { - session_listen_vc, - session_listen_cl, - session_listen_app, -}; -/* *INDENT-ON* */ - -/** - * Ask transport to listen on local transport endpoint. - * - * @param s Session for which listen will be called. Note that unlike - * established sessions, listen sessions are not associated to a - * thread. - * @param tep Local endpoint to be listened on. - */ -int -stream_session_listen (stream_session_t * s, session_endpoint_t * sep) -{ - transport_service_type_t tst = tp_vfts[sep->transport_proto].service_type; - return session_listen_srv_fns[tst] (s, sep); -} - /** * Ask transport to stop listening on local transport endpoint. * * @param s Session to stop listening on. It must be in state LISTENING. */ int -stream_session_stop_listen (stream_session_t * s) +session_stop_listen (stream_session_t * s) { transport_proto_t tp = session_get_transport_proto (s); transport_connection_t *tc; diff --git a/src/vnet/session/session.h b/src/vnet/session/session.h index 184fa997736..19609da89e5 100644 --- a/src/vnet/session/session.h +++ b/src/vnet/session/session.h @@ -24,7 +24,7 @@ #define HALF_OPEN_LOOKUP_INVALID_VALUE ((u64)~0) #define INVALID_INDEX ((u32)~0) -#define SESSION_PROXY_LISTENER_INDEX ((u32)~0 - 1) +#define SESSION_PROXY_LISTENER_INDEX ((u8)~0 - 1) #define SESSION_LOCAL_HANDLE_PREFIX 0x7FFFFFFF /* TODO decide how much since we have pre-data as well */ @@ -550,8 +550,8 @@ void stream_session_reset_notify (transport_connection_t * tc); int stream_session_accept (transport_connection_t * tc, u32 listener_index, u8 notify); int session_open (u32 app_index, session_endpoint_t * tep, u32 opaque); -int stream_session_listen (stream_session_t * s, session_endpoint_t * tep); -int stream_session_stop_listen (stream_session_t * s); +int session_listen (stream_session_t * s, session_endpoint_extended_t * sep); +int session_stop_listen (stream_session_t * s); void stream_session_disconnect (stream_session_t * s); void stream_session_disconnect_transport (stream_session_t * s); void stream_session_cleanup (stream_session_t * s); diff --git a/src/vnet/session/session_api.c b/src/vnet/session/session_api.c index 78c05c34431..aa29090383a 100755 --- a/src/vnet/session/session_api.c +++ b/src/vnet/session/session_api.c @@ -255,7 +255,7 @@ send_session_accept_callback (stream_session_t * s) } else { - ll = application_get_local_listen_session (server_wrk, + ll = application_get_local_listen_session (server, ls->listener_index); if (ll->transport_listener_index != ~0) { @@ -445,7 +445,7 @@ mq_send_session_accepted_cb (stream_session_t * s) memset (evt, 0, sizeof (*evt)); evt->event_type = SESSION_CTRL_EVT_ACCEPTED; mp = (session_accepted_msg_t *) evt->data; - mp->context = app_wrk->wrk_index; + mp->context = app->app_index; mp->server_rx_fifo = pointer_to_uword (s->server_rx_fifo); mp->server_tx_fifo = pointer_to_uword (s->server_tx_fifo); @@ -489,9 +489,7 @@ mq_send_session_accepted_cb (stream_session_t * s) } else { - ll = - application_get_local_listen_session (app_wrk, - ls->listener_index); + ll = application_get_local_listen_session (app, ls->listener_index); if (ll->transport_listener_index != ~0) { listener = listen_session_get (ll->transport_listener_index); @@ -687,7 +685,7 @@ mq_send_session_bound_cb (u32 app_wrk_index, u32 api_context, else { local_session_t *local; - local = application_get_local_listen_session_from_handle (handle); + local = application_get_local_listener_w_handle (handle); mp->lcl_port = local->port; mp->lcl_is_ip4 = session_type_is_ip4 (local->session_type); } @@ -1239,6 +1237,7 @@ vl_api_unbind_sock_t_handler (vl_api_unbind_sock_t * mp) { a->app_index = app->app_index; a->handle = mp->handle; + a->wrk_map_index = mp->wrk_index; if ((error = vnet_unbind (a))) { rv = clib_error_get_code (error); @@ -1255,7 +1254,7 @@ vl_api_connect_sock_t_handler (vl_api_connect_sock_t * mp) { vl_api_connect_session_reply_t *rmp; vnet_connect_args_t _a, *a = &_a; - application_t *app; + application_t *app = 0; clib_error_t *error = 0; int rv = 0; @@ -1307,6 +1306,12 @@ vl_api_connect_sock_t_handler (vl_api_connect_sock_t * mp) done: REPLY_MACRO (VL_API_CONNECT_SESSION_REPLY); + + if (app && application_use_mq_for_ctrl (app)) + { + app_worker_t *app_wrk = application_get_worker (app, mp->wrk_index); + mq_send_session_connected_cb (app_wrk->wrk_index, mp->context, 0, 1); + } } static void diff --git a/src/vnet/session/session_node.c b/src/vnet/session/session_node.c index 2c425abeaf3..119cdd812ad 100644 --- a/src/vnet/session/session_node.c +++ b/src/vnet/session/session_node.c @@ -43,10 +43,19 @@ session_mq_accepted_reply_handler (void *data) if (session_handle_is_local (mp->handle)) { + app_worker_t *app_wrk; + application_t *app; ls = application_get_local_session_from_handle (mp->handle); - if (!ls || ls->app_wrk_index != mp->context) + if (!ls) { - clib_warning ("server %u doesn't own local handle %llu", + clib_warning ("unknown local handle 0x%lx", mp->handle); + return; + } + app_wrk = app_worker_get (ls->app_wrk_index); + app = application_get (app_wrk->app_index); + if (app->app_index != mp->context) + { + clib_warning ("server %u doesn't own local handle 0x%lx", mp->context, mp->handle); return; } @@ -72,7 +81,7 @@ session_mq_accepted_reply_handler (void *data) { app_worker_t *app; app = app_worker_get (s->app_wrk_index); - application_send_event (app, s, FIFO_EVENT_APP_RX); + app_worker_send_event (app, s, FIFO_EVENT_APP_RX); } } } @@ -726,7 +735,7 @@ session_tx_fifo_dequeue_internal (vlib_main_t * vm, stream_session_t * s, int *n_tx_pkts) { application_t *app; - app = application_get (s->opaque); + app = application_get (s->t_app_index); svm_fifo_unset_event (s->server_tx_fifo); return app->cb_fns.builtin_app_tx_callback (s); } diff --git a/src/vnet/session/session_test.c b/src/vnet/session/session_test.c index 6f162a81509..058644d91d4 100644 --- a/src/vnet/session/session_test.c +++ b/src/vnet/session/session_test.c @@ -262,7 +262,7 @@ session_test_namespace (vlib_main_t * vm, unformat_input_t * input) }; ip4_address_t intf_addr = { - .as_u32 = clib_host_to_net_u32 (0x06000105), + .as_u32 = clib_host_to_net_u32 (0x07000105), }; intf_sep.ip.ip4 = intf_addr; @@ -1376,7 +1376,7 @@ session_test_proxy (vlib_main_t * vm, unformat_input_t * input) char *show_listeners = "sh session listeners tcp verbose"; char *show_local_listeners = "sh app ns table default"; unformat_input_t tmp_input; - u32 server_index, app_index, server_wrk_index; + u32 server_index, app_index; u32 dummy_server_api_index = ~0, sw_if_index = 0; clib_error_t *error = 0; u8 is_filtered = 0; @@ -1384,7 +1384,6 @@ session_test_proxy (vlib_main_t * vm, unformat_input_t * input) transport_connection_t *tc; u16 lcl_port = 1234, rmt_port = 4321; app_namespace_t *app_ns; - application_t *server; int verbose = 0; while (unformat_check_input (input) != UNFORMAT_END_OF_INPUT) @@ -1444,8 +1443,6 @@ session_test_proxy (vlib_main_t * vm, unformat_input_t * input) error = vnet_application_attach (&attach_args); SESSION_TEST ((error == 0), "server attachment should work"); server_index = attach_args.app_index; - server = application_get (server_index); - server_wrk_index = application_get_default_worker (server)->wrk_index; if (verbose) { @@ -1462,7 +1459,7 @@ session_test_proxy (vlib_main_t * vm, unformat_input_t * input) SESSION_TEST ((tc != 0), "lookup 1.2.3.4 1234 5.6.7.8 4321 should be " "successful"); s = listen_session_get (tc->s_index); - SESSION_TEST ((s->app_wrk_index == server_wrk_index), "lookup should return" + SESSION_TEST ((s->app_index == server_index), "lookup should return" " the server"); tc = session_lookup_connection_wt4 (0, &rmt_ip, &rmt_ip, lcl_port, rmt_port, diff --git a/src/vnet/session/stream_session.h b/src/vnet/session/stream_session.h index 567962a35db..b08f9592510 100644 --- a/src/vnet/session/stream_session.h +++ b/src/vnet/session/stream_session.h @@ -60,7 +60,7 @@ typedef struct _stream_session_t /** Session index in per_thread pool */ u32 session_index; - /** app worker pool index */ + /** App worker pool index */ u32 app_wrk_index; u8 thread_index; @@ -78,6 +78,19 @@ typedef struct _stream_session_t { /** Parent listener session if the result of an accept */ u32 listener_index; + + /** Application index if a listener */ + u32 app_index; + }; + + union + { + /** Transport app index for apps acting as transports */ + u32 t_app_index; + + /** Index in listener app's listener db */ + u32 listener_db_index; + /** Opaque, for general use */ u32 opaque; }; @@ -103,17 +116,25 @@ typedef struct local_session_ /** Server index */ u32 app_wrk_index; + /** Port for connection. Overlaps thread_index/enqueue_epoch */ + u16 port; + /** Segment index where fifos were allocated */ u32 svm_segment_index; - u32 listener_index; + /** Transport listener index. Overlaps connection index */ + u32 transport_listener_index; - /** Port for connection */ - u16 port; + union + { + u32 listener_index; + u32 app_index; + }; + + u32 listener_db_index; /** Has transport embedded when listener not purely local */ session_type_t listener_session_type; - u32 transport_listener_index; /** * Client data diff --git a/src/vnet/tls/tls.c b/src/vnet/tls/tls.c index 4fb0dfb3c91..ae426fba594 100644 --- a/src/vnet/tls/tls.c +++ b/src/vnet/tls/tls.c @@ -49,7 +49,7 @@ tls_add_vpp_q_evt (svm_fifo_t * f, u8 evt_type) static inline int tls_add_app_q_evt (app_worker_t * app, stream_session_t * app_session) { - return application_send_event (app, app_session, FIFO_EVENT_APP_RX); + return app_worker_send_event (app, app_session, FIFO_EVENT_APP_RX); } u32 @@ -168,7 +168,7 @@ tls_notify_app_accept (tls_ctx_t * ctx) app_session->session_type = app_listener->session_type; app_session->listener_index = app_listener->session_index; sm = app_worker_get_listen_segment_manager (app_wrk, app_listener); - app_session->opaque = tls_main.app_index; + app_session->t_app_index = tls_main.app_index; if ((rv = session_alloc_fifos (sm, app_session))) { @@ -202,7 +202,7 @@ tls_notify_app_connected (tls_ctx_t * ctx, u8 is_failed) app_session->connection_index = ctx->tls_ctx_handle; app_session->session_type = session_type_from_proto_and_ip (TRANSPORT_PROTO_TLS, ctx->tcp_is_ip4); - app_session->opaque = tls_main.app_index; + app_session->t_app_index = tls_main.app_index; if (session_alloc_fifos (sm, app_session)) goto failed; @@ -522,7 +522,7 @@ tls_disconnect (u32 ctx_handle, u32 thread_index) u32 tls_start_listen (u32 app_listener_index, transport_endpoint_t * tep) { - app_worker_t *tls_app_wrk, *app_wrk; + app_worker_t *app_wrk; tls_main_t *tm = &tls_main; session_handle_t tls_handle; session_endpoint_extended_t *sep; @@ -547,10 +547,9 @@ tls_start_listen (u32 app_listener_index, transport_endpoint_t * tep) /* TODO hide this by calling vnet_bind() */ tls_app = application_get (tm->app_index); - tls_app_wrk = application_get_default_worker (tls_app); +// tls_app_wrk = application_get_default_worker (tls_app); sep->transport_proto = TRANSPORT_PROTO_TCP; - if (app_worker_start_listen (tls_app_wrk, (session_endpoint_t *) sep, - &tls_handle)) + if (application_start_listen (tls_app, sep, &tls_handle)) return ~0; tls_listener = listen_session_get_from_handle (tls_handle); @@ -580,7 +579,8 @@ tls_stop_listen (u32 lctx_index) tls_engine_type_t engine_type; lctx = tls_listener_ctx_get (lctx_index); - app_worker_stop_listen (lctx->tls_session_handle, tm->app_index); + /* TODO use unbind */ + application_stop_listen (tm->app_index, 0, lctx->tls_session_handle); engine_type = lctx->tls_ctx_engine; tls_vfts[engine_type].ctx_stop_listen (lctx); diff --git a/test/scripts/socket_test.sh b/test/scripts/socket_test.sh index 1dc9ab24d70..5fd8b5f9db6 100755 --- a/test/scripts/socket_test.sh +++ b/test/scripts/socket_test.sh @@ -17,8 +17,8 @@ docker_os="ubuntu" vcl_ldpreload_lib="libvcl_ldpreload.so.0.0.0" user_gid="$(id -g)" vpp_app="vpp" -sock_srvr_app="bin/sock_test_server" -sock_clnt_app="bin/sock_test_client" +sock_srvr_app="sock_test_server" +sock_clnt_app="sock_test_client" sock_srvr_addr="127.0.0.1" sock_srvr_port="22000" iperf_srvr_app="iperf3 -V4d -s" diff --git a/test/test_vcl.py b/test/test_vcl.py index 32db155a0d0..8a7faad6f0c 100644 --- a/test/test_vcl.py +++ b/test/test_vcl.py @@ -19,12 +19,12 @@ class VCLAppWorker(Worker): app = appname env.update({'LD_PRELOAD': "%s/libvcl_ldpreload.so" % vcl_lib_dir}) + elif "sock" in appname: + app = "%s/vpp/bin/%s" % (build_dir, appname) + env.update({'LD_PRELOAD': + "%s/libvcl_ldpreload.so" % vcl_lib_dir}) else: - app = "%s/%s" % (vcl_lib_dir, appname) - if not os.path.isfile(app): - app = "%s/vpp/%s" % (build_dir, appname) - env.update({'LD_PRELOAD': - "%s/libvcl_ldpreload.so" % vcl_lib_dir}) + app = "%s/vpp/bin/%s" % (build_dir, appname) self.args = [app] + args super(VCLAppWorker, self).__init__(self.args, logger, env) @@ -222,7 +222,7 @@ class VCLCutThruTestCase(VCLTestCase): self.client_echo_test_args = ["-E", self.echo_phrase, "-X", self.server_addr, self.server_port] self.client_iperf3_timeout = 20 - self.client_iperf3_args = ["-V4d", "-c", self.server_addr] + self.client_iperf3_args = ["-V4d", "-t 5", "-c", self.server_addr] self.server_iperf3_args = ["-V4d", "-s"] self.client_uni_dir_nsock_timeout = 60 self.client_uni_dir_nsock_test_args = ["-I", "5", "-U", "-X", @@ -241,8 +241,8 @@ class VCLCutThruTestCase(VCLTestCase): def test_ldp_cut_thru_echo(self): """ run LDP cut thru echo test """ - self.cut_thru_test("bin/sock_test_server", self.server_args, - "bin/sock_test_client", self.client_echo_test_args) + self.cut_thru_test("sock_test_server", self.server_args, + "sock_test_client", self.client_echo_test_args) def test_ldp_cut_thru_iperf3(self): """ run LDP cut thru iperf3 test """ @@ -263,8 +263,8 @@ class VCLCutThruTestCase(VCLTestCase): """ run LDP cut thru uni-directional (multiple sockets) test """ self.timeout = self.client_uni_dir_nsock_timeout - self.cut_thru_test("bin/sock_test_server", self.server_args, - "bin/sock_test_client", + self.cut_thru_test("sock_test_server", self.server_args, + "sock_test_client", self.client_uni_dir_nsock_test_args) @unittest.skipUnless(running_extended_tests(), "part of extended tests") @@ -272,23 +272,23 @@ class VCLCutThruTestCase(VCLTestCase): """ run LDP cut thru bi-directional (multiple sockets) test """ self.timeout = self.client_bi_dir_nsock_timeout - self.cut_thru_test("bin/sock_test_server", self.server_args, - "bin/sock_test_client", + self.cut_thru_test("sock_test_server", self.server_args, + "sock_test_client", self.client_bi_dir_nsock_test_args) def test_vcl_cut_thru_echo(self): """ run VCL cut thru echo test """ - self.cut_thru_test("bin/vcl_test_server", self.server_args, - "bin/vcl_test_client", self.client_echo_test_args) + self.cut_thru_test("vcl_test_server", self.server_args, + "vcl_test_client", self.client_echo_test_args) @unittest.skipUnless(running_extended_tests(), "part of extended tests") def test_vcl_cut_thru_uni_dir_nsock(self): """ run VCL cut thru uni-directional (multiple sockets) test """ self.timeout = self.client_uni_dir_nsock_timeout - self.cut_thru_test("bin/vcl_test_server", self.server_args, - "bin/vcl_test_client", + self.cut_thru_test("vcl_test_server", self.server_args, + "vcl_test_client", self.client_uni_dir_nsock_test_args) @unittest.skipUnless(running_extended_tests(), "part of extended tests") @@ -296,8 +296,8 @@ class VCLCutThruTestCase(VCLTestCase): """ run VCL cut thru bi-directional (multiple sockets) test """ self.timeout = self.client_bi_dir_nsock_timeout - self.cut_thru_test("bin/vcl_test_server", self.server_args, - "bin/vcl_test_client", + self.cut_thru_test("vcl_test_server", self.server_args, + "vcl_test_client", self.client_bi_dir_nsock_test_args) @@ -320,20 +320,20 @@ class VCLThruHostStackTestCase(VCLTestCase): def test_ldp_thru_host_stack_echo(self): """ run LDP thru host stack echo test """ - self.thru_host_stack_test("bin/sock_test_server", self.server_args, - "bin/sock_test_client", + self.thru_host_stack_test("sock_test_server", self.server_args, + "sock_test_client", self.client_echo_test_args) # TBD: Remove these when VPP thru host teardown config bug is fixed. - self.thru_host_stack_test("bin/vcl_test_server", self.server_args, - "bin/vcl_test_client", + self.thru_host_stack_test("vcl_test_server", self.server_args, + "vcl_test_client", self.client_echo_test_args) def test_vcl_thru_host_stack_echo(self): """ run VCL thru host stack echo test """ # TBD: Enable this when VPP thru host teardown config bug is fixed. - # self.thru_host_stack_test("bin/vcl_test_server", self.server_args, - # "bin/vcl_test_client", + # self.thru_host_stack_test("vcl_test_server", self.server_args, + # "vcl_test_client", # self.client_echo_test_args) # TBD: Remove VCLThruHostStackExtended*TestCase classes and move @@ -369,8 +369,8 @@ class VCLThruHostStackExtendedATestCase(VCLTestCase): """ run VCL thru host stack bi-directional (multiple sockets) test """ self.timeout = self.client_bi_dir_nsock_timeout - self.thru_host_stack_test("bin/vcl_test_server", self.server_args, - "bin/vcl_test_client", + self.thru_host_stack_test("vcl_test_server", self.server_args, + "vcl_test_client", self.client_bi_dir_nsock_test_args) @@ -470,8 +470,8 @@ class VCLThruHostStackExtendedDTestCase(VCLTestCase): """ run VCL thru host stack uni-directional (multiple sockets) test """ self.timeout = self.client_uni_dir_nsock_timeout - self.thru_host_stack_test("bin/vcl_test_server", self.server_args, - "bin/vcl_test_client", + self.thru_host_stack_test("vcl_test_server", self.server_args, + "vcl_test_client", self.client_uni_dir_nsock_test_args) @@ -483,7 +483,7 @@ class VCLThruHostStackIperfTestCase(VCLTestCase): self.thru_host_stack_setup() self.client_iperf3_timeout = 20 - self.client_iperf3_args = ["-V4d", "-c", self.loop0.local_ip4] + self.client_iperf3_args = ["-V4d", "-t 5", "-c", self.loop0.local_ip4] self.server_iperf3_args = ["-V4d", "-s"] def tearDown(self): @@ -520,7 +520,8 @@ class VCLIpv6CutThruTestCase(VCLTestCase): self.client_ipv6_echo_test_args = ["-6", "-E", self.echo_phrase, "-X", self.server_ipv6_addr, self.server_port] - self.client_ipv6_iperf3_args = ["-V6d", "-c", self.server_ipv6_addr] + self.client_ipv6_iperf3_args = ["-V6d", "-t 5", "-c", + self.server_ipv6_addr] self.server_ipv6_iperf3_args = ["-V6d", "-s"] self.client_ipv6_uni_dir_nsock_test_args = ["-6", "-I", "5", "-U", "-X", @@ -539,9 +540,9 @@ class VCLIpv6CutThruTestCase(VCLTestCase): def test_ldp_ipv6_cut_thru_echo(self): """ run LDP IPv6 cut thru echo test """ - self.cut_thru_test("bin/sock_test_server", + self.cut_thru_test("sock_test_server", self.server_ipv6_args, - "bin/sock_test_client", + "sock_test_client", self.client_ipv6_echo_test_args) def test_ldp_ipv6_cut_thru_iperf3(self): @@ -564,8 +565,8 @@ class VCLIpv6CutThruTestCase(VCLTestCase): """ run LDP IPv6 cut thru uni-directional (multiple sockets) test """ self.timeout = self.client_uni_dir_nsock_timeout - self.cut_thru_test("bin/sock_test_server", self.server_ipv6_args, - "bin/sock_test_client", + self.cut_thru_test("sock_test_server", self.server_ipv6_args, + "sock_test_client", self.client_ipv6_uni_dir_nsock_test_args) @unittest.skipUnless(running_extended_tests(), "part of extended tests") @@ -573,16 +574,16 @@ class VCLIpv6CutThruTestCase(VCLTestCase): """ run LDP IPv6 cut thru bi-directional (multiple sockets) test """ self.timeout = self.client_bi_dir_nsock_timeout - self.cut_thru_test("bin/sock_test_server", self.server_ipv6_args, - "bin/sock_test_client", + self.cut_thru_test("sock_test_server", self.server_ipv6_args, + "sock_test_client", self.client_ipv6_bi_dir_nsock_test_args) def test_vcl_ipv6_cut_thru_echo(self): """ run VCL IPv6 cut thru echo test """ - self.cut_thru_test("bin/vcl_test_server", + self.cut_thru_test("vcl_test_server", self.server_ipv6_args, - "bin/vcl_test_client", + "vcl_test_client", self.client_ipv6_echo_test_args) @unittest.skipUnless(running_extended_tests(), "part of extended tests") @@ -590,8 +591,8 @@ class VCLIpv6CutThruTestCase(VCLTestCase): """ run VCL IPv6 cut thru uni-directional (multiple sockets) test """ self.timeout = self.client_uni_dir_nsock_timeout - self.cut_thru_test("bin/vcl_test_server", self.server_ipv6_args, - "bin/vcl_test_client", + self.cut_thru_test("vcl_test_server", self.server_ipv6_args, + "vcl_test_client", self.client_ipv6_uni_dir_nsock_test_args) @unittest.skipUnless(running_extended_tests(), "part of extended tests") @@ -599,8 +600,8 @@ class VCLIpv6CutThruTestCase(VCLTestCase): """ run VCL IPv6 cut thru bi-directional (multiple sockets) test """ self.timeout = self.client_bi_dir_nsock_timeout - self.cut_thru_test("bin/vcl_test_server", self.server_ipv6_args, - "bin/vcl_test_client", + self.cut_thru_test("vcl_test_server", self.server_ipv6_args, + "vcl_test_client", self.client_ipv6_bi_dir_nsock_test_args) @@ -623,22 +624,22 @@ class VCLIpv6ThruHostStackTestCase(VCLTestCase): def test_ldp_ipv6_thru_host_stack_echo(self): """ run LDP IPv6 thru host stack echo test """ - self.thru_host_stack_test("bin/sock_test_server", + self.thru_host_stack_test("sock_test_server", self.server_ipv6_args, - "bin/sock_test_client", + "sock_test_client", self.client_ipv6_echo_test_args) # TBD: Remove these when VPP thru host teardown config bug is fixed. - self.thru_host_stack_test("bin/vcl_test_server", + self.thru_host_stack_test("vcl_test_server", self.server_ipv6_args, - "bin/vcl_test_client", + "vcl_test_client", self.client_ipv6_echo_test_args) def test_vcl_ipv6_thru_host_stack_echo(self): """ run VCL IPv6 thru host stack echo test """ -# self.thru_host_stack_test("bin/vcl_test_server", +# self.thru_host_stack_test("vcl_test_server", # self.server_ipv6_args, -# "bin/vcl_test_client", +# "vcl_test_client", # self.client_ipv6_echo_test_args) # TBD: Remove VCLIpv6ThruHostStackExtended*TestCase classes and move @@ -675,8 +676,8 @@ class VCLIpv6ThruHostStackExtendedATestCase(VCLTestCase): """ run VCL thru host stack bi-directional (multiple sockets) test """ self.timeout = self.client_bi_dir_nsock_timeout - self.thru_host_stack_test("bin/vcl_test_server", self.server_ipv6_args, - "bin/vcl_test_client", + self.thru_host_stack_test("vcl_test_server", self.server_ipv6_args, + "vcl_test_client", self.client_ipv6_bi_dir_nsock_test_args) @@ -709,9 +710,9 @@ class VCLIpv6ThruHostStackExtendedBTestCase(VCLTestCase): """ run LDP thru host stack bi-directional (multiple sockets) test """ self.timeout = self.client_bi_dir_nsock_timeout - self.thru_host_stack_test("bin/sock_test_server", + self.thru_host_stack_test("sock_test_server", self.server_ipv6_args, - "bin/sock_test_client", + "sock_test_client", self.client_ipv6_bi_dir_nsock_test_args) @@ -745,9 +746,9 @@ class VCLIpv6ThruHostStackExtendedCTestCase(VCLTestCase): """ run LDP thru host stack uni-directional (multiple sockets) test """ self.timeout = self.client_uni_dir_nsock_timeout - self.thru_host_stack_test("bin/sock_test_server", + self.thru_host_stack_test("sock_test_server", self.server_ipv6_args, - "bin/sock_test_client", + "sock_test_client", self.client_ipv6_uni_dir_nsock_test_args) @@ -781,8 +782,8 @@ class VCLIpv6ThruHostStackExtendedDTestCase(VCLTestCase): """ run VCL thru host stack uni-directional (multiple sockets) test """ self.timeout = self.client_uni_dir_nsock_timeout - self.thru_host_stack_test("bin/vcl_test_server", self.server_ipv6_args, - "bin/vcl_test_client", + self.thru_host_stack_test("vcl_test_server", self.server_ipv6_args, + "vcl_test_client", self.client_ipv6_uni_dir_nsock_test_args) @@ -794,7 +795,8 @@ class VCLIpv6ThruHostStackIperfTestCase(VCLTestCase): self.thru_host_stack_ipv6_setup() self.client_iperf3_timeout = 20 - self.client_ipv6_iperf3_args = ["-V6d", "-c", self.loop0.local_ip6] + self.client_ipv6_iperf3_args = ["-V6d", "-t 5", "-c", + self.loop0.local_ip6] self.server_ipv6_iperf3_args = ["-V6d", "-s"] def tearDown(self): -- 2.16.6