From: Florin Coras Date: Mon, 4 Mar 2019 18:56:23 +0000 (-0800) Subject: session: use vpp to switch io events for ct sessions X-Git-Tag: v19.04-rc1~304 X-Git-Url: https://gerrit.fd.io/r/gitweb?p=vpp.git;a=commitdiff_plain;h=653e43f06a974121343b2c1f0e4533926020877b session: use vpp to switch io events for ct sessions Instead of allocating pairs of message queues per cut-thru session and having the applications map them, this uses vpp as an io event message switch. Change-Id: I51db1c7564df479a7d1a3288342394251fd188bb Signed-off-by: Florin Coras --- diff --git a/src/tests/vnet/session/udp_echo.c b/src/tests/vnet/session/udp_echo.c index 462e113dbbd..9fda73d1307 100644 --- a/src/tests/vnet/session/udp_echo.c +++ b/src/tests/vnet/session/udp_echo.c @@ -510,39 +510,14 @@ session_accepted_handler (session_accepted_msg_t * mp) session_index = session - utm->sessions; session->session_index = session_index; - /* Cut-through case */ - if (mp->server_event_queue_address) - { - clib_warning ("cut-through session"); - session->vpp_evt_q = uword_to_pointer (mp->client_event_queue_address, - svm_msg_q_t *); - sleep (1); - rx_fifo->master_session_index = session_index; - tx_fifo->master_session_index = session_index; - utm->cut_through_session_index = session_index; - session->rx_fifo = rx_fifo; - session->tx_fifo = tx_fifo; - session->is_dgram = 0; - - rv = pthread_create (&utm->cut_through_thread_handle, - NULL /*attr */ , cut_through_thread_fn, 0); - if (rv) - { - clib_warning ("pthread_create returned %d", rv); - rv = VNET_API_ERROR_SYSCALL_ERROR_1; - } - } - else - { - rx_fifo->client_session_index = session_index; - tx_fifo->client_session_index = session_index; - session->rx_fifo = rx_fifo; - session->tx_fifo = tx_fifo; - clib_memcpy_fast (&session->transport.rmt_ip, mp->ip, - sizeof (ip46_address_t)); - session->transport.is_ip4 = mp->is_ip4; - session->transport.rmt_port = mp->port; - } + rx_fifo->client_session_index = session_index; + tx_fifo->client_session_index = session_index; + session->rx_fifo = rx_fifo; + session->tx_fifo = tx_fifo; + clib_memcpy_fast (&session->transport.rmt_ip, mp->ip, + sizeof (ip46_address_t)); + session->transport.is_ip4 = mp->is_ip4; + session->transport.rmt_port = mp->port; hash_set (utm->session_index_by_vpp_handles, mp->handle, session_index); if (pool_elts (utm->sessions) && (pool_elts (utm->sessions) % 20000) == 0) @@ -623,18 +598,17 @@ session_connected_handler (session_connected_msg_t * mp) session->tx_fifo = uword_to_pointer (mp->server_tx_fifo, svm_fifo_t *); /* Cut-through case */ - if (mp->client_event_queue_address) + if (mp->ct_rx_fifo) { clib_warning ("cut-through session"); - session->vpp_evt_q = uword_to_pointer (mp->server_event_queue_address, + session->vpp_evt_q = uword_to_pointer (mp->vpp_event_queue_address, svm_msg_q_t *); - utm->ct_event_queue = uword_to_pointer (mp->client_event_queue_address, - svm_msg_q_t *); utm->cut_through_session_index = session->session_index; session->is_dgram = 0; sleep (1); session->rx_fifo->client_session_index = session->session_index; session->tx_fifo->client_session_index = session->session_index; + /* TODO use ct fifos */ } else { @@ -744,7 +718,6 @@ send_test_chunk (udp_echo_main_t * utm, app_session_t * s, u32 bytes) u8 *test_data = utm->connect_test_data; u32 bytes_to_snd, enq_space, min_chunk; - session_evt_type_t et = FIFO_EVENT_APP_TX; int written; test_buf_len = vec_len (test_data); @@ -753,17 +726,9 @@ send_test_chunk (udp_echo_main_t * utm, app_session_t * s, u32 bytes) utm->bytes_to_send); enq_space = svm_fifo_max_enqueue (s->tx_fifo); bytes_this_chunk = clib_min (bytes_this_chunk, enq_space); - et += (s->session_index == utm->cut_through_session_index); - - if (s->is_dgram) - written = app_send_dgram_raw (s->tx_fifo, &s->transport, s->vpp_evt_q, - test_data + test_buf_offset, - bytes_this_chunk, et, SVM_Q_WAIT); - else - written = app_send_stream_raw (s->tx_fifo, s->vpp_evt_q, - test_data + test_buf_offset, - bytes_this_chunk, et, SVM_Q_WAIT); + written = app_send (s, test_data + test_buf_offset, bytes_this_chunk, + SVM_Q_WAIT); if (written > 0) { utm->bytes_to_send -= written; @@ -1004,15 +969,12 @@ server_handle_fifo_event_rx (udp_echo_main_t * utm, u32 session_index) app_session_t *session; int rv; u32 max_dequeue, offset, max_transfer, rx_buf_len; - session_evt_type_t et = FIFO_EVENT_APP_TX; session = pool_elt_at_index (utm->sessions, session_index); rx_buf_len = vec_len (utm->rx_buf); rx_fifo = session->rx_fifo; tx_fifo = session->tx_fifo; - et += (session->session_index == utm->cut_through_session_index); - max_dequeue = svm_fifo_max_dequeue (rx_fifo); /* Allow enqueuing of a new event */ svm_fifo_unset_event (rx_fifo); @@ -1040,15 +1002,8 @@ server_handle_fifo_event_rx (udp_echo_main_t * utm, u32 session_index) offset = 0; do { - if (session->is_dgram) - rv = app_send_dgram_raw (tx_fifo, &session->transport, - session->vpp_evt_q, - &utm->rx_buf[offset], n_read, et, - SVM_Q_WAIT); - else - rv = app_send_stream_raw (tx_fifo, session->vpp_evt_q, - &utm->rx_buf[offset], n_read, et, - SVM_Q_WAIT); + rv = app_send (session, &utm->rx_buf[offset], n_read, + SVM_Q_WAIT); if (rv > 0) { n_read -= rv; @@ -1060,7 +1015,7 @@ server_handle_fifo_event_rx (udp_echo_main_t * utm, u32 session_index) /* If event wasn't set, add one */ if (svm_fifo_set_event (tx_fifo)) app_send_io_evt_to_vpp (session->vpp_evt_q, tx_fifo, - et, SVM_Q_WAIT); + SESSION_IO_EVT_TX, SVM_Q_WAIT); } } while ((n_read < 0 || max_dequeue > 0) && !utm->time_to_stop); @@ -1087,11 +1042,9 @@ server_handle_event_queue (udp_echo_main_t * utm) e = svm_msg_q_msg_data (mq, &msg); switch (e->event_type) { - case FIFO_EVENT_APP_RX: + case SESSION_IO_EVT_RX: server_handle_fifo_event_rx (utm, e->fifo->client_session_index); break; - case SESSION_IO_EVT_CT_TX: - break; default: handle_mq_event (e); diff --git a/src/vcl/vcl_bapi.c b/src/vcl/vcl_bapi.c index 6baa7c1f927..d86e7738bdd 100644 --- a/src/vcl/vcl_bapi.c +++ b/src/vcl/vcl_bapi.c @@ -293,41 +293,6 @@ vl_api_unmap_segment_t_handler (vl_api_unmap_segment_t * mp) VDBG (1, "Unmapped segment: %d", segment_handle); } -static void - vl_api_app_cut_through_registration_add_t_handler - (vl_api_app_cut_through_registration_add_t * mp) -{ - vcl_cut_through_registration_t *ctr; - u32 mqc_index = ~0; - vcl_worker_t *wrk; - int *fds = 0; - - if (mp->n_fds) - { - ASSERT (mp->n_fds == 2); - vec_validate (fds, mp->n_fds); - vl_socket_client_recv_fd_msg (fds, mp->n_fds, 5); - } - - wrk = vcl_worker_get (mp->wrk_index); - ctr = vcl_ct_registration_lock_and_alloc (wrk); - ctr->mq = uword_to_pointer (mp->evt_q_address, svm_msg_q_t *); - ctr->peer_mq = uword_to_pointer (mp->peer_evt_q_address, svm_msg_q_t *); - VDBG (0, "Adding ct registration %u", vcl_ct_registration_index (wrk, ctr)); - - if (mp->n_fds && (mp->fd_flags & SESSION_FD_F_MQ_EVENTFD)) - { - svm_msg_q_set_consumer_eventfd (ctr->mq, fds[0]); - svm_msg_q_set_producer_eventfd (ctr->peer_mq, fds[1]); - mqc_index = vcl_mq_epoll_add_evfd (wrk, ctr->mq); - ctr->epoll_evt_conn_index = mqc_index; - vec_free (fds); - } - vcl_ct_registration_lookup_add (wrk, mp->evt_q_address, - vcl_ct_registration_index (wrk, ctr)); - vcl_ct_registration_unlock (wrk); -} - static void vl_api_bind_sock_reply_t_handler (vl_api_bind_sock_reply_t * mp) { @@ -400,7 +365,6 @@ _(APPLICATION_TLS_CERT_ADD_REPLY, application_tls_cert_add_reply) \ _(APPLICATION_TLS_KEY_ADD_REPLY, application_tls_key_add_reply) \ _(MAP_ANOTHER_SEGMENT, map_another_segment) \ _(UNMAP_SEGMENT, unmap_segment) \ -_(APP_CUT_THROUGH_REGISTRATION_ADD, app_cut_through_registration_add) \ _(APP_WORKER_ADD_DEL_REPLY, app_worker_add_del_reply) \ void diff --git a/src/vcl/vcl_private.c b/src/vcl/vcl_private.c index 6c364e376f7..e38b663507c 100644 --- a/src/vcl/vcl_private.c +++ b/src/vcl/vcl_private.c @@ -68,75 +68,6 @@ vcl_wait_for_app_state_change (app_state_t app_state) return VPPCOM_ETIMEDOUT; } -vcl_cut_through_registration_t * -vcl_ct_registration_lock_and_alloc (vcl_worker_t * wrk) -{ - vcl_cut_through_registration_t *cr; - clib_spinlock_lock (&wrk->ct_registration_lock); - pool_get (wrk->cut_through_registrations, cr); - memset (cr, 0, sizeof (*cr)); - cr->epoll_evt_conn_index = -1; - return cr; -} - -u32 -vcl_ct_registration_index (vcl_worker_t * wrk, - vcl_cut_through_registration_t * ctr) -{ - return (ctr - wrk->cut_through_registrations); -} - -void -vcl_ct_registration_lock (vcl_worker_t * wrk) -{ - clib_spinlock_lock (&wrk->ct_registration_lock); -} - -void -vcl_ct_registration_unlock (vcl_worker_t * wrk) -{ - clib_spinlock_unlock (&wrk->ct_registration_lock); -} - -vcl_cut_through_registration_t * -vcl_ct_registration_get (vcl_worker_t * wrk, u32 ctr_index) -{ - if (pool_is_free_index (wrk->cut_through_registrations, ctr_index)) - return 0; - return pool_elt_at_index (wrk->cut_through_registrations, ctr_index); -} - -vcl_cut_through_registration_t * -vcl_ct_registration_lock_and_lookup (vcl_worker_t * wrk, uword mq_addr) -{ - uword *p; - clib_spinlock_lock (&wrk->ct_registration_lock); - p = hash_get (wrk->ct_registration_by_mq, mq_addr); - if (!p) - return 0; - return vcl_ct_registration_get (wrk, p[0]); -} - -void -vcl_ct_registration_lookup_add (vcl_worker_t * wrk, uword mq_addr, - u32 ctr_index) -{ - hash_set (wrk->ct_registration_by_mq, mq_addr, ctr_index); -} - -void -vcl_ct_registration_lookup_del (vcl_worker_t * wrk, uword mq_addr) -{ - hash_unset (wrk->ct_registration_by_mq, mq_addr); -} - -void -vcl_ct_registration_del (vcl_worker_t * wrk, - vcl_cut_through_registration_t * ctr) -{ - pool_put (wrk->cut_through_registrations, ctr); -} - vcl_mq_evt_conn_t * vcl_mq_evt_conn_alloc (vcl_worker_t * wrk) { @@ -235,8 +166,6 @@ vcl_worker_cleanup (vcl_worker_t * wrk, u8 notify_vpp) if (wrk->mqs_epfd > 0) close (wrk->mqs_epfd); hash_free (wrk->session_index_by_vpp_handles); - hash_free (wrk->ct_registration_by_mq); - clib_spinlock_free (&wrk->ct_registration_lock); vec_free (wrk->mq_events); vec_free (wrk->mq_msg_vector); vcl_worker_free (wrk); @@ -286,8 +215,6 @@ vcl_worker_alloc_and_init () } wrk->session_index_by_vpp_handles = hash_create (0, sizeof (uword)); - wrk->ct_registration_by_mq = hash_create (0, sizeof (uword)); - clib_spinlock_init (&wrk->ct_registration_lock); clib_time_init (&wrk->clib_time); vec_validate (wrk->mq_events, 64); vec_validate (wrk->mq_msg_vector, 128); @@ -418,6 +345,9 @@ vcl_session_read_ready (vcl_session_t * session) if (session->session_state & STATE_LISTEN) return clib_fifo_elts (session->accept_evts_fifo); + if (vcl_session_is_ct (session)) + return svm_fifo_max_dequeue (session->ct_rx_fifo); + return svm_fifo_max_dequeue (session->rx_fifo); } @@ -452,6 +382,9 @@ vcl_session_write_ready (vcl_session_t * session) return rv; } + if (vcl_session_is_ct (session)) + return svm_fifo_max_enqueue (session->ct_tx_fifo); + return svm_fifo_max_enqueue (session->tx_fifo); } diff --git a/src/vcl/vcl_private.h b/src/vcl/vcl_private.h index 2187499282f..dfe8b160edc 100644 --- a/src/vcl/vcl_private.h +++ b/src/vcl/vcl_private.h @@ -162,6 +162,9 @@ typedef struct u64 vpp_handle; u32 vpp_thread_index; + svm_fifo_t *ct_rx_fifo; + svm_fifo_t *ct_tx_fifo; + /* Socket configuration state */ u8 is_vep; u8 is_vep_session; @@ -270,15 +273,6 @@ typedef struct vcl_worker_ /** For deadman timers */ clib_time_t clib_time; - /** Pool of cut through registrations */ - vcl_cut_through_registration_t *cut_through_registrations; - - /** Lock for accessing ct registration pool */ - clib_spinlock_t ct_registration_lock; - - /** Cut-through registration by mq address hash table */ - uword *ct_registration_by_mq; - /** Vector acting as buffer for mq messages */ svm_msg_q_msg_t *mq_msg_vector; @@ -481,7 +475,7 @@ const char *vppcom_session_state_str (vcl_session_state_t state); static inline u8 vcl_session_is_ct (vcl_session_t * s) { - return (s->our_evt_q != 0); + return (s->ct_tx_fifo != 0); } static inline u8 @@ -516,19 +510,6 @@ vcl_session_closed_error (vcl_session_t * s) * Helpers */ int vcl_wait_for_app_state_change (app_state_t app_state); -vcl_cut_through_registration_t - * vcl_ct_registration_lock_and_alloc (vcl_worker_t * wrk); -void vcl_ct_registration_del (vcl_worker_t * wrk, - vcl_cut_through_registration_t * ctr); -u32 vcl_ct_registration_index (vcl_worker_t * wrk, - vcl_cut_through_registration_t * ctr); -void vcl_ct_registration_lock (vcl_worker_t * wrk); -void vcl_ct_registration_unlock (vcl_worker_t * wrk); -vcl_cut_through_registration_t - * vcl_ct_registration_lock_and_lookup (vcl_worker_t * wrk, uword mq_addr); -void vcl_ct_registration_lookup_add (vcl_worker_t * wrk, uword mq_addr, - u32 ctr_index); -void vcl_ct_registration_lookup_del (vcl_worker_t * wrk, uword mq_addr); vcl_mq_evt_conn_t *vcl_mq_evt_conn_alloc (vcl_worker_t * wrk); u32 vcl_mq_evt_conn_index (vcl_worker_t * wrk, vcl_mq_evt_conn_t * mqc); vcl_mq_evt_conn_t *vcl_mq_evt_conn_get (vcl_worker_t * wrk, u32 mq_conn_idx); @@ -581,10 +562,7 @@ vcl_n_workers (void) static inline svm_msg_q_t * vcl_session_vpp_evt_q (vcl_worker_t * wrk, vcl_session_t * s) { - if (vcl_session_is_ct (s)) - return wrk->vpp_event_queues[0]; - else - return wrk->vpp_event_queues[s->vpp_thread_index]; + return wrk->vpp_event_queues[s->vpp_thread_index]; } void vcl_send_session_worker_update (vcl_worker_t * wrk, vcl_session_t * s, diff --git a/src/vcl/vppcom.c b/src/vcl/vppcom.c index 7c076d3b315..3abde98288a 100644 --- a/src/vcl/vppcom.c +++ b/src/vcl/vppcom.c @@ -273,7 +273,6 @@ vcl_session_accepted_handler (vcl_worker_t * wrk, session_accepted_msg_t * mp) mp->listener_handle); if (!listen_session) { - svm_msg_q_t *evt_q; evt_q = uword_to_pointer (mp->vpp_event_queue_address, svm_msg_q_t *); clib_warning ("VCL<%d>: ERROR: couldn't find listen session: " "unknown vpp listener handle %llx", @@ -287,39 +286,23 @@ vcl_session_accepted_handler (vcl_worker_t * wrk, session_accepted_msg_t * mp) rx_fifo = uword_to_pointer (mp->server_rx_fifo, svm_fifo_t *); tx_fifo = uword_to_pointer (mp->server_tx_fifo, svm_fifo_t *); - if (mp->server_event_queue_address) - { - session->vpp_evt_q = uword_to_pointer (mp->client_event_queue_address, - svm_msg_q_t *); - session->our_evt_q = uword_to_pointer (mp->server_event_queue_address, - svm_msg_q_t *); - if (vcl_wait_for_segment (mp->segment_handle)) - { - clib_warning ("segment for session %u couldn't be mounted!", - session->session_index); - return VCL_INVALID_SESSION_INDEX; - } - rx_fifo->master_session_index = session->session_index; - tx_fifo->master_session_index = session->session_index; - rx_fifo->master_thread_index = vcl_get_worker_index (); - tx_fifo->master_thread_index = vcl_get_worker_index (); - vec_validate (wrk->vpp_event_queues, 0); - evt_q = uword_to_pointer (mp->vpp_event_queue_address, svm_msg_q_t *); - wrk->vpp_event_queues[0] = evt_q; - } - else + if (vcl_wait_for_segment (mp->segment_handle)) { - session->vpp_evt_q = uword_to_pointer (mp->vpp_event_queue_address, - svm_msg_q_t *); - rx_fifo->client_session_index = session->session_index; - tx_fifo->client_session_index = session->session_index; - rx_fifo->client_thread_index = vcl_get_worker_index (); - tx_fifo->client_thread_index = vcl_get_worker_index (); - vpp_wrk_index = tx_fifo->master_thread_index; - vec_validate (wrk->vpp_event_queues, vpp_wrk_index); - wrk->vpp_event_queues[vpp_wrk_index] = session->vpp_evt_q; + clib_warning ("segment for session %u couldn't be mounted!", + session->session_index); + return VCL_INVALID_SESSION_INDEX; } + session->vpp_evt_q = uword_to_pointer (mp->vpp_event_queue_address, + svm_msg_q_t *); + rx_fifo->client_session_index = session->session_index; + tx_fifo->client_session_index = session->session_index; + rx_fifo->client_thread_index = vcl_get_worker_index (); + tx_fifo->client_thread_index = vcl_get_worker_index (); + vpp_wrk_index = tx_fifo->master_thread_index; + vec_validate (wrk->vpp_event_queues, vpp_wrk_index); + wrk->vpp_event_queues[vpp_wrk_index] = session->vpp_evt_q; + session->vpp_handle = mp->handle; session->vpp_thread_index = rx_fifo->master_thread_index; session->client_context = mp->context; @@ -356,7 +339,6 @@ vcl_session_connected_handler (vcl_worker_t * wrk, u32 session_index, vpp_wrk_index; svm_fifo_t *rx_fifo, *tx_fifo; vcl_session_t *session = 0; - svm_msg_q_t *evt_q; session_index = mp->context; session = vcl_session_get (wrk, session_index); @@ -380,8 +362,8 @@ vcl_session_connected_handler (vcl_worker_t * wrk, tx_fifo = uword_to_pointer (mp->server_tx_fifo, svm_fifo_t *); if (vcl_wait_for_segment (mp->segment_handle)) { - clib_warning ("segment for session %u couldn't be mounted!", - session->session_index); + VDBG (0, "segment for session %u couldn't be mounted!", + session->session_index); return VCL_INVALID_SESSION_INDEX; } @@ -390,24 +372,22 @@ vcl_session_connected_handler (vcl_worker_t * wrk, rx_fifo->client_thread_index = vcl_get_worker_index (); tx_fifo->client_thread_index = vcl_get_worker_index (); - if (mp->client_event_queue_address) - { - session->vpp_evt_q = uword_to_pointer (mp->server_event_queue_address, - svm_msg_q_t *); - session->our_evt_q = uword_to_pointer (mp->client_event_queue_address, - svm_msg_q_t *); + session->vpp_evt_q = uword_to_pointer (mp->vpp_event_queue_address, + svm_msg_q_t *); + vpp_wrk_index = tx_fifo->master_thread_index; + vec_validate (wrk->vpp_event_queues, vpp_wrk_index); + wrk->vpp_event_queues[vpp_wrk_index] = session->vpp_evt_q; - vec_validate (wrk->vpp_event_queues, 0); - evt_q = uword_to_pointer (mp->vpp_event_queue_address, svm_msg_q_t *); - wrk->vpp_event_queues[0] = evt_q; - } - else + if (mp->ct_rx_fifo) { - session->vpp_evt_q = uword_to_pointer (mp->vpp_event_queue_address, - svm_msg_q_t *); - vpp_wrk_index = tx_fifo->master_thread_index; - vec_validate (wrk->vpp_event_queues, vpp_wrk_index); - wrk->vpp_event_queues[vpp_wrk_index] = session->vpp_evt_q; + session->ct_rx_fifo = uword_to_pointer (mp->ct_rx_fifo, svm_fifo_t *); + session->ct_tx_fifo = uword_to_pointer (mp->ct_tx_fifo, svm_fifo_t *); + if (vcl_wait_for_segment (mp->ct_segment_handle)) + { + VDBG (0, "ct segment for session %u couldn't be mounted!", + session->session_index); + return VCL_INVALID_SESSION_INDEX; + } } session->rx_fifo = rx_fifo; @@ -667,10 +647,11 @@ vcl_handle_mq_event (vcl_worker_t * wrk, session_event_t * e) switch (e->event_type) { - case FIFO_EVENT_APP_RX: - case FIFO_EVENT_APP_TX: - case SESSION_IO_EVT_CT_RX: - case SESSION_IO_EVT_CT_TX: + case SESSION_IO_EVT_RX: + case SESSION_IO_EVT_TX: + session = vcl_session_get (wrk, e->fifo->client_session_index); + if (!session || !(session->session_state & STATE_OPEN)) + break; vec_add1 (wrk->unhandled_evts_vector, *e); break; case SESSION_CTRL_EVT_ACCEPTED: @@ -1123,23 +1104,6 @@ vcl_session_cleanup (vcl_worker_t * wrk, vcl_session_t * session, } } - if (vcl_session_is_ct (session)) - { - vcl_cut_through_registration_t *ctr; - uword mq_addr; - - mq_addr = pointer_to_uword (session->our_evt_q); - ctr = vcl_ct_registration_lock_and_lookup (wrk, mq_addr); - ASSERT (ctr); - if (ctr->epoll_evt_conn_index != ~0) - vcl_mq_epoll_del_evfd (wrk, ctr->epoll_evt_conn_index); - VDBG (0, "Removing ct registration %u", - vcl_ct_registration_index (wrk, ctr)); - vcl_ct_registration_del (wrk, ctr); - vcl_ct_registration_lookup_del (wrk, mq_addr); - vcl_ct_registration_unlock (wrk); - } - VDBG (0, "session %u [0x%llx] removed", session->session_index, vpp_handle); cleanup: @@ -1336,7 +1300,6 @@ vppcom_session_accept (uint32_t listen_session_handle, vppcom_endpt_t * ep, session_accepted_msg_t accepted_msg; vcl_session_t *listen_session = 0; vcl_session_t *client_session = 0; - svm_msg_q_t *vpp_evt_q; vcl_session_msg_t *evt; u64 listen_vpp_handle; svm_msg_q_msg_t msg; @@ -1411,13 +1374,8 @@ handle: sizeof (ip6_address_t)); } - if (accepted_msg.server_event_queue_address) - vpp_evt_q = uword_to_pointer (accepted_msg.vpp_event_queue_address, - svm_msg_q_t *); - else - vpp_evt_q = client_session->vpp_evt_q; - - vcl_send_session_accepted_reply (vpp_evt_q, client_session->client_context, + vcl_send_session_accepted_reply (client_session->vpp_evt_q, + client_session->client_context, client_session->vpp_handle, 0); VDBG (0, "listener %u [0x%llx] accepted %u [0x%llx] peer: %U:%u " @@ -1533,11 +1491,8 @@ vppcom_session_connect (uint32_t session_handle, vppcom_endpt_t * server_ep) static u8 vcl_is_rx_evt_for_session (session_event_t * e, u32 sid, u8 is_ct) { - if (!is_ct) - return (e->event_type == FIFO_EVENT_APP_RX - && e->fifo->client_session_index == sid); - else - return (e->event_type == SESSION_IO_EVT_CT_TX); + return (e->event_type == SESSION_IO_EVT_RX + && e->fifo->client_session_index == sid); } static inline int @@ -1570,17 +1525,18 @@ vppcom_session_read_internal (uint32_t session_handle, void *buf, int n, is_nonblocking = VCL_SESS_ATTR_TEST (s->attr, VCL_SESS_ATTR_NONBLOCK); is_ct = vcl_session_is_ct (s); - mq = is_ct ? s->our_evt_q : wrk->app_event_queue; - rx_fifo = s->rx_fifo; + mq = wrk->app_event_queue; + rx_fifo = is_ct ? s->ct_rx_fifo : s->rx_fifo; s->has_rx_evt = 0; + if (is_ct) + svm_fifo_unset_event (s->rx_fifo); + if (svm_fifo_is_empty (rx_fifo)) { + svm_fifo_unset_event (rx_fifo); if (is_nonblocking) - { - svm_fifo_unset_event (rx_fifo); - return VPPCOM_EWOULDBLOCK; - } + return VPPCOM_EWOULDBLOCK; while (svm_fifo_is_empty (rx_fifo)) { if (vcl_session_is_closing (s)) @@ -1595,7 +1551,10 @@ vppcom_session_read_internal (uint32_t session_handle, void *buf, int n, e = svm_msg_q_msg_data (mq, &msg); svm_msg_q_unlock (mq); if (!vcl_is_rx_evt_for_session (e, s->session_index, is_ct)) - vcl_handle_mq_event (wrk, e); + { + clib_warning ("THIS ONE type %u", e->event_type); + vcl_handle_mq_event (wrk, e); + } svm_msg_q_free_msg (mq, &msg); } } @@ -1608,13 +1567,6 @@ vppcom_session_read_internal (uint32_t session_handle, void *buf, int n, if (svm_fifo_is_empty (rx_fifo)) svm_fifo_unset_event (rx_fifo); - if (is_ct && svm_fifo_needs_tx_ntf (rx_fifo, n_read)) - { - svm_fifo_clear_tx_ntf (s->rx_fifo); - app_send_io_evt_to_vpp (s->vpp_evt_q, s->rx_fifo, SESSION_IO_EVT_CT_RX, - SVM_Q_WAIT); - } - VDBG (2, "vpp handle 0x%llx, sid %u: read %d bytes from (%p)", s->vpp_handle, session_handle, n_read, rx_fifo); @@ -1659,6 +1611,9 @@ vppcom_session_read_segments (uint32_t session_handle, rx_fifo = s->rx_fifo; s->has_rx_evt = 0; + if (is_ct) + svm_fifo_unset_event (s->rx_fifo); + if (svm_fifo_is_empty (rx_fifo)) { if (is_nonblocking) @@ -1688,14 +1643,6 @@ vppcom_session_read_segments (uint32_t session_handle, n_read = svm_fifo_segments (rx_fifo, (svm_fifo_segment_t *) ds); svm_fifo_unset_event (rx_fifo); - if (is_ct && n_read + svm_fifo_max_dequeue (rx_fifo) == rx_fifo->nitems) - { - /* If the peer is not polling send notification */ - if (!svm_fifo_has_event (s->rx_fifo)) - app_send_io_evt_to_vpp (s->vpp_evt_q, s->rx_fifo, - SESSION_IO_EVT_CT_RX, SVM_Q_WAIT); - } - return n_read; } @@ -1729,11 +1676,8 @@ vppcom_data_segment_copy (void *buf, vppcom_data_segments_t ds, u32 max_bytes) static u8 vcl_is_tx_evt_for_session (session_event_t * e, u32 sid, u8 is_ct) { - if (!is_ct) - return (e->event_type == FIFO_EVENT_APP_TX - && e->fifo->client_session_index == sid); - else - return (e->event_type == SESSION_IO_EVT_CT_RX); + return (e->event_type == SESSION_IO_EVT_TX + && e->fifo->client_session_index == sid); } static inline int @@ -1772,10 +1716,10 @@ vppcom_session_write_inline (uint32_t session_handle, void *buf, size_t n, return vcl_session_closed_error (s);; } - tx_fifo = s->tx_fifo; is_ct = vcl_session_is_ct (s); + tx_fifo = is_ct ? s->ct_tx_fifo : s->tx_fifo; is_nonblocking = VCL_SESS_ATTR_TEST (s->attr, VCL_SESS_ATTR_NONBLOCK); - mq = is_ct ? s->our_evt_q : wrk->app_event_queue; + mq = wrk->app_event_queue; if (svm_fifo_is_full (tx_fifo)) { if (is_nonblocking) @@ -1801,17 +1745,20 @@ vppcom_session_write_inline (uint32_t session_handle, void *buf, size_t n, } } - ASSERT (FIFO_EVENT_APP_TX + 1 == SESSION_IO_EVT_CT_TX); - et = FIFO_EVENT_APP_TX + vcl_session_is_ct (s); - if (is_flush && !vcl_session_is_ct (s)) + et = SESSION_IO_EVT_TX; + if (is_flush && !is_ct) et = SESSION_IO_EVT_TX_FLUSH; if (s->is_dgram) n_write = app_send_dgram_raw (tx_fifo, &s->transport, - s->vpp_evt_q, buf, n, et, SVM_Q_WAIT); + s->vpp_evt_q, buf, n, et, + !is_ct /* do_evt */ , SVM_Q_WAIT); else n_write = app_send_stream_raw (tx_fifo, s->vpp_evt_q, buf, n, et, - SVM_Q_WAIT); + !is_ct /* do_evt */ , SVM_Q_WAIT); + + if (is_ct && svm_fifo_set_event (s->tx_fifo)) + app_send_io_evt_to_vpp (s->vpp_evt_q, s->tx_fifo, et, SVM_Q_WAIT); ASSERT (n_write > 0); @@ -1835,32 +1782,6 @@ vppcom_session_write_msg (uint32_t session_handle, void *buf, size_t n) 1 /* is_flush */ ); } - -static vcl_session_t * -vcl_ct_session_get_from_fifo (vcl_worker_t * wrk, svm_fifo_t * f, u8 type) -{ - vcl_session_t *s; - s = vcl_session_get (wrk, f->client_session_index); - if (s) - { - /* rx fifo */ - if (type == 0 && s->rx_fifo == f) - return s; - /* tx fifo */ - if (type == 1 && s->tx_fifo == f) - return s; - } - s = vcl_session_get (wrk, f->master_session_index); - if (s) - { - if (type == 0 && s->rx_fifo == f) - return s; - if (type == 1 && s->tx_fifo == f) - return s; - } - return 0; -} - #define vcl_fifo_rx_evt_valid_or_break(_fifo) \ if (PREDICT_FALSE (svm_fifo_is_empty (_fifo))) \ { \ @@ -1905,29 +1826,6 @@ vcl_select_handle_mq_event (vcl_worker_t * wrk, session_event_t * e, *bits_set += 1; } break; - case SESSION_IO_EVT_CT_TX: - vcl_fifo_rx_evt_valid_or_break (e->fifo); - session = vcl_ct_session_get_from_fifo (wrk, e->fifo, 0); - if (!session) - break; - sid = session->session_index; - if (sid < n_bits && read_map) - { - clib_bitmap_set_no_check ((uword *) read_map, sid, 1); - *bits_set += 1; - } - break; - case SESSION_IO_EVT_CT_RX: - session = vcl_ct_session_get_from_fifo (wrk, e->fifo, 1); - if (!session) - break; - sid = session->session_index; - if (sid < n_bits && write_map) - { - clib_bitmap_set_no_check ((uword *) write_map, sid, 1); - *bits_set += 1; - } - break; case SESSION_CTRL_EVT_ACCEPTED: session = vcl_session_accepted (wrk, (session_accepted_msg_t *) e->data); @@ -2038,31 +1936,9 @@ vppcom_select_condvar (vcl_worker_t * wrk, int n_bits, vcl_si_set * except_map, double time_to_wait, u32 * bits_set) { - double total_wait = 0, wait_slice; - vcl_cut_through_registration_t *cr; - time_to_wait = (time_to_wait == -1) ? 1e6 : time_to_wait; - wait_slice = wrk->cut_through_registrations ? 10e-6 : time_to_wait; - do - { - vcl_ct_registration_lock (wrk); - /* *INDENT-OFF* */ - pool_foreach (cr, wrk->cut_through_registrations, ({ - vcl_select_handle_mq (wrk, cr->mq, n_bits, read_map, write_map, except_map, - 0, bits_set); - })); - /* *INDENT-ON* */ - vcl_ct_registration_unlock (wrk); - - vcl_select_handle_mq (wrk, wrk->app_event_queue, n_bits, read_map, - write_map, except_map, wait_slice, bits_set); - total_wait += wait_slice; - if (*bits_set) - return *bits_set; - } - while (total_wait < time_to_wait); - - return 0; + return vcl_select_handle_mq (wrk, wrk->app_event_queue, n_bits, read_map, + write_map, except_map, time_to_wait, bits_set); } static int @@ -2477,7 +2353,7 @@ vcl_epoll_wait_handle_mq_event (vcl_worker_t * wrk, session_event_t * e, switch (e->event_type) { - case FIFO_EVENT_APP_RX: + case SESSION_IO_EVT_RX: ASSERT (e->fifo->client_thread_index == vcl_get_worker_index ()); vcl_fifo_rx_evt_valid_or_break (e->fifo); sid = e->fifo->client_session_index; @@ -2491,7 +2367,7 @@ vcl_epoll_wait_handle_mq_event (vcl_worker_t * wrk, session_event_t * e, session_evt_data = session->vep.ev.data.u64; session->has_rx_evt = 1; break; - case FIFO_EVENT_APP_TX: + case SESSION_IO_EVT_TX: sid = e->fifo->client_session_index; if (!(session = vcl_session_get (wrk, sid))) break; @@ -2503,33 +2379,6 @@ vcl_epoll_wait_handle_mq_event (vcl_worker_t * wrk, session_event_t * e, session_evt_data = session->vep.ev.data.u64; svm_fifo_reset_tx_ntf (session->tx_fifo); break; - case SESSION_IO_EVT_CT_TX: - vcl_fifo_rx_evt_valid_or_break (e->fifo); - session = vcl_ct_session_get_from_fifo (wrk, e->fifo, 0); - if (PREDICT_FALSE (!session)) - break; - sid = session->session_index; - session_events = session->vep.ev.events; - if (!(EPOLLIN & session->vep.ev.events) || session->has_rx_evt) - break; - add_event = 1; - events[*num_ev].events |= EPOLLIN; - session_evt_data = session->vep.ev.data.u64; - session->has_rx_evt = 1; - break; - case SESSION_IO_EVT_CT_RX: - session = vcl_ct_session_get_from_fifo (wrk, e->fifo, 1); - if (PREDICT_FALSE (!session)) - break; - sid = session->session_index; - session_events = session->vep.ev.events; - if (!(EPOLLOUT & session_events)) - break; - add_event = 1; - events[*num_ev].events |= EPOLLOUT; - session_evt_data = session->vep.ev.data.u64; - svm_fifo_reset_tx_ntf (session->tx_fifo); - break; case SESSION_CTRL_EVT_ACCEPTED: session = vcl_session_accepted (wrk, (session_accepted_msg_t *) e->data); @@ -2663,33 +2512,9 @@ static int vppcom_epoll_wait_condvar (vcl_worker_t * wrk, struct epoll_event *events, int maxevents, u32 n_evts, double wait_for_time) { - vcl_cut_through_registration_t *cr; - double total_wait = 0, wait_slice; - int rv; - wait_for_time = (wait_for_time == -1) ? (double) 1e6 : wait_for_time; - wait_slice = wrk->cut_through_registrations ? 10e-6 : wait_for_time; - - do - { - vcl_ct_registration_lock (wrk); - /* *INDENT-OFF* */ - pool_foreach (cr, wrk->cut_through_registrations, ({ - vcl_epoll_wait_handle_mq (wrk, cr->mq, events, maxevents, 0, &n_evts); - })); - /* *INDENT-ON* */ - vcl_ct_registration_unlock (wrk); - - rv = vcl_epoll_wait_handle_mq (wrk, wrk->app_event_queue, events, - maxevents, n_evts ? 0 : wait_slice, - &n_evts); - if (rv) - total_wait += wait_slice; - if (n_evts) - return n_evts; - } - while (total_wait < wait_for_time); - return n_evts; + return vcl_epoll_wait_handle_mq (wrk, wrk->app_event_queue, events, + maxevents, wait_for_time, &n_evts); } static int diff --git a/src/vnet/session-apps/echo_server.c b/src/vnet/session-apps/echo_server.c index 2762347d441..941c16d0e08 100644 --- a/src/vnet/session-apps/echo_server.c +++ b/src/vnet/session-apps/echo_server.c @@ -240,14 +240,16 @@ echo_server_rx_callback (session_t * s) n_written = app_send_stream_raw (tx_fifo, esm->vpp_queue[thread_index], esm->rx_buf[thread_index], - actual_transfer, SESSION_IO_EVT_TX, 0); + actual_transfer, SESSION_IO_EVT_TX, + 1 /* do_evt */ , 0); } else { n_written = app_send_dgram_raw (tx_fifo, &at, esm->vpp_queue[s->thread_index], esm->rx_buf[thread_index], - actual_transfer, SESSION_IO_EVT_TX, 0); + actual_transfer, SESSION_IO_EVT_TX, + 1 /* do_evt */ , 0); } if (n_written != max_transfer) diff --git a/src/vnet/session/application_interface.h b/src/vnet/session/application_interface.h index 935a352a436..d4dfeec54dc 100644 --- a/src/vnet/session/application_interface.h +++ b/src/vnet/session/application_interface.h @@ -283,8 +283,6 @@ typedef struct session_accepted_msg_ uword server_tx_fifo; u64 segment_handle; uword vpp_event_queue_address; - uword server_event_queue_address; - uword client_event_queue_address; u16 port; u8 is_ip4; u8 ip[16]; @@ -309,9 +307,10 @@ typedef struct session_connected_msg_ uword server_rx_fifo; uword server_tx_fifo; u64 segment_handle; + uword ct_rx_fifo; + uword ct_tx_fifo; + u64 ct_segment_handle; uword vpp_event_queue_address; - uword client_event_queue_address; - uword server_event_queue_address; u32 segment_size; u8 segment_name_length; u8 segment_name[64]; @@ -454,7 +453,7 @@ app_send_io_evt_to_vpp (svm_msg_q_t * mq, svm_fifo_t * f, u8 evt_type, always_inline int app_send_dgram_raw (svm_fifo_t * f, app_session_transport_t * at, svm_msg_q_t * vpp_evt_q, u8 * data, u32 len, u8 evt_type, - u8 noblock) + u8 do_evt, u8 noblock) { u32 max_enqueue, actual_write; session_dgram_hdr_t hdr; @@ -478,7 +477,7 @@ app_send_dgram_raw (svm_fifo_t * f, app_session_transport_t * at, if ((rv = svm_fifo_enqueue_nowait (f, actual_write, data)) > 0) { - if (svm_fifo_set_event (f)) + if (do_evt && svm_fifo_set_event (f)) app_send_io_evt_to_vpp (vpp_evt_q, f, evt_type, noblock); } ASSERT (rv); @@ -489,18 +488,19 @@ always_inline int app_send_dgram (app_session_t * s, u8 * data, u32 len, u8 noblock) { return app_send_dgram_raw (s->tx_fifo, &s->transport, s->vpp_evt_q, data, - len, SESSION_IO_EVT_TX, noblock); + len, SESSION_IO_EVT_TX, 1 /* do_evt */ , + noblock); } always_inline int app_send_stream_raw (svm_fifo_t * f, svm_msg_q_t * vpp_evt_q, u8 * data, - u32 len, u8 evt_type, u8 noblock) + u32 len, u8 evt_type, u8 do_evt, u8 noblock) { int rv; if ((rv = svm_fifo_enqueue_nowait (f, len, data)) > 0) { - if (svm_fifo_set_event (f)) + if (do_evt && svm_fifo_set_event (f)) app_send_io_evt_to_vpp (vpp_evt_q, f, evt_type, noblock); } return rv; @@ -510,7 +510,7 @@ always_inline int app_send_stream (app_session_t * s, u8 * data, u32 len, u8 noblock) { return app_send_stream_raw (s->tx_fifo, s->vpp_evt_q, data, len, - SESSION_IO_EVT_TX, noblock); + SESSION_IO_EVT_TX, 1 /* do_evt */ , noblock); } always_inline int diff --git a/src/vnet/session/application_local.c b/src/vnet/session/application_local.c index 9378e5e6380..745b202f580 100644 --- a/src/vnet/session/application_local.c +++ b/src/vnet/session/application_local.c @@ -16,9 +16,28 @@ #include #include -ct_connection_t *connections; +static ct_connection_t *connections; -ct_connection_t * +static void +ct_enable_disable_main_pre_input_node (u8 is_add) +{ + u32 n_conns; + + n_conns = pool_elts (connections); + if (n_conns > 2) + return; + + if (n_conns > 0 && is_add) + vlib_node_set_state (vlib_get_main (), + session_queue_pre_input_node.index, + VLIB_NODE_STATE_POLLING); + else if (n_conns == 0) + vlib_node_set_state (vlib_get_main (), + session_queue_pre_input_node.index, + VLIB_NODE_STATE_DISABLED); +} + +static ct_connection_t * ct_connection_alloc (void) { ct_connection_t *ct; @@ -31,7 +50,7 @@ ct_connection_alloc (void) return ct; } -ct_connection_t * +static ct_connection_t * ct_connection_get (u32 ct_index) { if (pool_is_free_index (connections, ct_index)) @@ -39,7 +58,7 @@ ct_connection_get (u32 ct_index) return pool_elt_at_index (connections, ct_index); } -void +static void ct_connection_free (ct_connection_t * ct) { if (CLIB_DEBUG) @@ -110,8 +129,14 @@ ct_session_connect_notify (session_t * ss) cs->session_state = SESSION_STATE_CONNECTING; cs->app_wrk_index = client_wrk->wrk_index; cs->connection_index = cct->c_c_index; + cs->t_app_index = client_wrk->app_index; cct->c_s_index = cs->session_index; + cct->client_rx_fifo = ss->tx_fifo; + cct->client_tx_fifo = ss->rx_fifo; + + cct->client_rx_fifo->refcnt++; + cct->client_tx_fifo->refcnt++; /* This will allocate fifos for the session. They won't be used for * exchanging data but they will be used to close the connection if @@ -135,47 +160,25 @@ ct_session_connect_notify (session_t * ss) return 0; } -static void -ct_session_fix_eventds (svm_msg_q_t * sq, svm_msg_q_t * cq) -{ - int fd; - - /* - * segment manager initializes only the producer eventds, since vpp is - * typically the producer. But for local sessions, we also pass to the - * apps the mqs they listen on for events from peer apps, so they are also - * consumer fds. - */ - fd = svm_msg_q_get_producer_eventfd (sq); - svm_msg_q_set_consumer_eventfd (sq, fd); - fd = svm_msg_q_get_producer_eventfd (cq); - svm_msg_q_set_consumer_eventfd (cq, fd); -} - -int +static int ct_init_local_session (app_worker_t * client_wrk, app_worker_t * server_wrk, ct_connection_t * ct, session_t * ls, session_t * ll) { - u32 seg_size, evt_q_sz, evt_q_elts, margin = 16 << 10; - u32 round_rx_fifo_sz, round_tx_fifo_sz, sm_index; - segment_manager_properties_t *props, *cprops; + u32 round_rx_fifo_sz, round_tx_fifo_sz, sm_index, seg_size; + segment_manager_properties_t *props; svm_fifo_segment_private_t *seg; - application_t *server, *client; + application_t *server; segment_manager_t *sm; - svm_msg_q_t *sq, *cq; + u32 margin = 16 << 10; u64 segment_handle; int seg_index, rv; server = application_get (server_wrk->app_index); - client = application_get (client_wrk->app_index); props = application_segment_manager_properties (server); - cprops = application_segment_manager_properties (client); - evt_q_elts = props->evt_q_size + cprops->evt_q_size; - evt_q_sz = segment_manager_evt_q_expected_size (evt_q_elts); round_rx_fifo_sz = 1 << max_log2 (props->rx_fifo_size); round_tx_fifo_sz = 1 << max_log2 (props->tx_fifo_size); - seg_size = round_rx_fifo_sz + round_tx_fifo_sz + evt_q_sz + margin; + seg_size = round_rx_fifo_sz + round_tx_fifo_sz + margin; sm = app_worker_get_listen_segment_manager (server_wrk, ll); seg_index = segment_manager_add_segment (sm, seg_size); @@ -185,14 +188,7 @@ ct_init_local_session (app_worker_t * client_wrk, app_worker_t * server_wrk, return seg_index; } seg = segment_manager_get_segment_w_lock (sm, seg_index); - sq = segment_manager_alloc_queue (seg, props); - cq = segment_manager_alloc_queue (seg, cprops); - - if (props->use_mq_eventfd) - ct_session_fix_eventds (sq, cq); - ct->server_evt_q = pointer_to_uword (sq); - ct->client_evt_q = pointer_to_uword (cq); rv = segment_manager_try_alloc_fifos (seg, props->rx_fifo_size, props->tx_fifo_size, &ls->rx_fifo, &ls->tx_fifo); @@ -204,8 +200,8 @@ ct_init_local_session (app_worker_t * client_wrk, app_worker_t * server_wrk, } sm_index = segment_manager_index (sm); - ls->rx_fifo->ct_session_index = ls->session_index; - ls->tx_fifo->ct_session_index = ls->session_index; + ls->rx_fifo->master_session_index = ls->session_index; + ls->tx_fifo->master_session_index = ls->session_index; ls->rx_fifo->segment_manager = sm_index; ls->tx_fifo->segment_manager = sm_index; ls->rx_fifo->segment_index = seg_index; @@ -228,7 +224,7 @@ failed: return rv; } -int +static int ct_connect (app_worker_t * client_wrk, session_t * ll, session_endpoint_cfg_t * sep) { @@ -255,6 +251,7 @@ ct_connect (app_worker_t * client_wrk, session_t * ll, cct->c_is_ip4 = sep->is_ip4; clib_memcpy (&cct->c_rmt_ip, &sep->ip, sizeof (sep->ip)); cct->actual_tp = ll_ct->actual_tp; + cct->is_client = 1; /* * Init server transport @@ -285,6 +282,7 @@ ct_connect (app_worker_t * client_wrk, session_t * ll, server_wrk = application_listener_select_worker (ll); ss->app_wrk_index = server_wrk->wrk_index; + ss->t_app_index = server_wrk->app_index; sct->c_s_index = ss->session_index; sct->server_wrk = ss->app_wrk_index; @@ -306,14 +304,12 @@ ct_connect (app_worker_t * client_wrk, session_t * ll, return -1; } - cct->client_evt_q = sct->client_evt_q; - cct->server_evt_q = sct->server_evt_q; cct->segment_handle = sct->segment_handle; - + ct_enable_disable_main_pre_input_node (1 /* is_add */ ); return 0; } -u32 +static u32 ct_start_listen (u32 app_listener_index, transport_endpoint_t * tep) { session_endpoint_cfg_t *sep; @@ -326,25 +322,27 @@ ct_start_listen (u32 app_listener_index, transport_endpoint_t * tep) clib_memcpy (&ct->c_lcl_ip, &sep->ip, sizeof (sep->ip)); ct->c_lcl_port = sep->port; ct->actual_tp = sep->transport_proto; + ct_enable_disable_main_pre_input_node (1 /* is_add */ ); return ct->c_c_index; } -u32 +static u32 ct_stop_listen (u32 ct_index) { ct_connection_t *ct; ct = ct_connection_get (ct_index); ct_connection_free (ct); + ct_enable_disable_main_pre_input_node (0 /* is_add */ ); return 0; } -transport_connection_t * +static transport_connection_t * ct_listener_get (u32 ct_index) { return (transport_connection_t *) ct_connection_get (ct_index); } -int +static int ct_session_connect (transport_endpoint_cfg_t * tep) { session_endpoint_cfg_t *sep_ext; @@ -407,10 +405,11 @@ global_scope: return 1; } -void +static void ct_session_close (u32 ct_index, u32 thread_index) { ct_connection_t *ct, *peer_ct; + app_worker_t *app_wrk; session_t *s; ct = ct_connection_get (ct_index); @@ -422,13 +421,18 @@ ct_session_close (u32 ct_index, u32 thread_index) } s = session_get (ct->c_s_index, 0); - app_worker_del_segment_notify (app_worker_get (s->app_wrk_index), - ct->segment_handle); + app_wrk = app_worker_get_if_valid (s->app_wrk_index); + if (app_wrk) + app_worker_del_segment_notify (app_wrk, ct->segment_handle); session_free_w_fifos (s); + if (ct->is_client) + segment_manager_dealloc_fifos (ct->client_rx_fifo, ct->client_tx_fifo); + ct_connection_free (ct); + ct_enable_disable_main_pre_input_node (0 /* is_add */ ); } -transport_connection_t * +static transport_connection_t * ct_session_get (u32 ct_index, u32 thread_index) { return (transport_connection_t *) ct_connection_get (ct_index); @@ -460,7 +464,7 @@ format_ct_connection_id (u8 * s, va_list * args) return s; } -u8 * +static u8 * format_ct_listener (u8 * s, va_list * args) { u32 tc_index = va_arg (*args, u32); @@ -472,7 +476,7 @@ format_ct_listener (u8 * s, va_list * args) return s; } -u8 * +static u8 * format_ct_connection (u8 * s, va_list * args) { ct_connection_t *ct = va_arg (*args, ct_connection_t *); @@ -492,7 +496,7 @@ format_ct_connection (u8 * s, va_list * args) return s; } -u8 * +static u8 * format_ct_session (u8 * s, va_list * args) { u32 ct_index = va_arg (*args, u32); @@ -526,11 +530,29 @@ const static transport_proto_vft_t cut_thru_proto = { }; /* *INDENT-ON* */ +int +ct_session_tx (session_t * s) +{ + ct_connection_t *ct, *peer_ct; + session_t *peer_s; + + ct = (ct_connection_t *) session_get_transport (s); + peer_ct = ct_connection_get (ct->peer_index); + if (!peer_ct) + return -1; + peer_s = session_get (peer_ct->c_s_index, 0); + if (peer_s->session_state >= SESSION_STATE_TRANSPORT_CLOSING) + return 0; + return session_enqueue_notify (peer_s); +} + static clib_error_t * ct_transport_init (vlib_main_t * vm) { transport_register_protocol (TRANSPORT_PROTO_NONE, &cut_thru_proto, FIB_PROTOCOL_IP4, ~0); + transport_register_protocol (TRANSPORT_PROTO_NONE, &cut_thru_proto, + FIB_PROTOCOL_IP6, ~0); return 0; } diff --git a/src/vnet/session/application_local.h b/src/vnet/session/application_local.h index 5d6e6c1ec7b..7b937d32f0b 100644 --- a/src/vnet/session/application_local.h +++ b/src/vnet/session/application_local.h @@ -27,16 +27,18 @@ typedef struct ct_connection_ u32 server_wrk; u32 transport_listener_index; transport_proto_t actual_tp; - u64 server_evt_q; - u64 client_evt_q; u32 client_opaque; u32 peer_index; u64 segment_handle; + svm_fifo_t *client_rx_fifo; + svm_fifo_t *client_tx_fifo; + u8 is_client; } ct_connection_t; session_t *ct_session_get_peer (session_t * s); void ct_session_endpoint (session_t * ll, session_endpoint_t * sep); int ct_session_connect_notify (session_t * ls); +int ct_session_tx (session_t * s); #endif /* SRC_VNET_SESSION_APPLICATION_LOCAL_H_ */ diff --git a/src/vnet/session/application_worker.c b/src/vnet/session/application_worker.c index 9dfa3aa0243..7c888882093 100644 --- a/src/vnet/session/application_worker.c +++ b/src/vnet/session/application_worker.c @@ -543,7 +543,7 @@ app_send_io_evt_rx (app_worker_t * app_wrk, session_t * s, u8 lock) return app->cb_fns.builtin_app_rx_callback (s); } - if (svm_fifo_has_event (s->rx_fifo) || svm_fifo_is_empty (s->rx_fifo)) + if (svm_fifo_has_event (s->rx_fifo)) return 0; mq = app_wrk->event_queue; @@ -608,9 +608,8 @@ app_send_io_evt_tx (app_worker_t * app_wrk, session_t * s, u8 lock) typedef int (app_send_evt_handler_fn) (app_worker_t *app, session_t *s, u8 lock); -static app_send_evt_handler_fn * const app_send_evt_handler_fns[3] = { +static app_send_evt_handler_fn * const app_send_evt_handler_fns[2] = { app_send_io_evt_rx, - 0, app_send_io_evt_tx, }; /* *INDENT-ON* */ diff --git a/src/vnet/session/segment_manager.c b/src/vnet/session/segment_manager.c index b7467bbbd43..25b641de167 100644 --- a/src/vnet/session/segment_manager.c +++ b/src/vnet/session/segment_manager.c @@ -408,12 +408,10 @@ segment_manager_del_sessions (segment_manager_t * sm) */ while (fifo) { - if (fifo->ct_session_index != SVM_FIFO_INVALID_SESSION_INDEX) - session = session_get (fifo->ct_session_index, 0); - else - session = session_get (fifo->master_session_index, - fifo->master_thread_index); - vec_add1 (handles, session_handle (session)); + session = session_get_if_valid (fifo->master_session_index, + fifo->master_thread_index); + if (session) + vec_add1 (handles, session_handle (session)); fifo = fifo->next; } diff --git a/src/vnet/session/session.c b/src/vnet/session/session.c index 57567926a1c..6e24d562f98 100644 --- a/src/vnet/session/session.c +++ b/src/vnet/session/session.c @@ -124,13 +124,6 @@ session_program_transport_close (session_t * s) session_worker_t *wrk; session_event_t *evt; - if (!session_has_transport (s)) - { - /* Polling may not be enabled on main thread so close now */ - session_transport_close (s); - return; - } - /* If we are in the handler thread, or being called with the worker barrier * held, just append a new event to pending disconnects vector. */ if (vlib_thread_is_main_w_barrier () || thread_index == s->thread_index) @@ -483,7 +476,7 @@ session_notify_subscribers (u32 app_index, session_t * s, * @return 0 on success or negative number if failed to send notification. */ static inline int -session_enqueue_notify (session_t * s) +session_enqueue_notify_inline (session_t * s) { app_worker_t *app_wrk; @@ -512,6 +505,12 @@ session_enqueue_notify (session_t * s) return 0; } +int +session_enqueue_notify (session_t * s) +{ + return session_enqueue_notify_inline (s); +} + int session_dequeue_notify (session_t * s) { @@ -560,7 +559,11 @@ session_main_flush_enqueue_events (u8 transport_proto, u32 thread_index) errors++; continue; } - if (PREDICT_FALSE (session_enqueue_notify (s))) + + if (svm_fifo_is_empty (s->rx_fifo)) + continue; + + if (PREDICT_FALSE (session_enqueue_notify_inline (s))) errors++; } diff --git a/src/vnet/session/session.h b/src/vnet/session/session.h index a7c91949eb3..cea1b375108 100644 --- a/src/vnet/session/session.h +++ b/src/vnet/session/session.h @@ -187,6 +187,7 @@ typedef struct session_main_ extern session_main_t session_main; extern vlib_node_registration_t session_queue_node; extern vlib_node_registration_t session_queue_process_node; +extern vlib_node_registration_t session_queue_pre_input_node; #define SESSION_Q_PROCESS_FLUSH_FRAMES 1 #define SESSION_Q_PROCESS_STOP 2 @@ -196,6 +197,9 @@ session_is_valid (u32 si, u8 thread_index) { session_t *s; s = pool_elt_at_index (session_main.wrk[thread_index].sessions, si); + if (s->session_state == SESSION_STATE_CLOSED) + return 1; + if (s->thread_index != thread_index || s->session_index != si) return 0; return 1; @@ -331,6 +335,7 @@ void session_transport_close (session_t * s); void session_transport_cleanup (session_t * s); int session_send_io_evt_to_thread (svm_fifo_t * f, session_evt_type_t evt_type); +int session_enqueue_notify (session_t * s); int session_dequeue_notify (session_t * s); int session_send_io_evt_to_thread_custom (void *data, u32 thread_index, session_evt_type_t evt_type); diff --git a/src/vnet/session/session_api.c b/src/vnet/session/session_api.c index 8ee25a9a2ce..525f63799be 100755 --- a/src/vnet/session/session_api.c +++ b/src/vnet/session/session_api.c @@ -149,54 +149,6 @@ send_del_segment_callback (u32 api_client_index, u64 segment_handle) return 0; } -static int -send_app_cut_through_registration_add (u32 api_client_index, - u32 wrk_map_index, u64 mq_addr, - u64 peer_mq_addr) -{ - vl_api_app_cut_through_registration_add_t *mp; - vl_api_registration_t *reg; - svm_msg_q_t *mq, *peer_mq; - int fds[2]; - - reg = vl_mem_api_client_index_to_registration (api_client_index); - if (!reg) - { - clib_warning ("no registration: %u", api_client_index); - return -1; - } - - mp = vl_mem_api_alloc_as_if_client_w_reg (reg, sizeof (*mp)); - clib_memset (mp, 0, sizeof (*mp)); - mp->_vl_msg_id = - clib_host_to_net_u16 (VL_API_APP_CUT_THROUGH_REGISTRATION_ADD); - - mp->evt_q_address = mq_addr; - mp->peer_evt_q_address = peer_mq_addr; - mp->wrk_index = wrk_map_index; - - mq = uword_to_pointer (mq_addr, svm_msg_q_t *); - peer_mq = uword_to_pointer (peer_mq_addr, svm_msg_q_t *); - - if (svm_msg_q_get_producer_eventfd (mq) != -1) - { - mp->fd_flags |= SESSION_FD_F_MQ_EVENTFD; - mp->n_fds = 2; - /* app will overwrite exactly the fds we pass here. So - * when we swap mq with peer_mq (accept vs connect) the - * fds will still be valid */ - fds[0] = svm_msg_q_get_consumer_eventfd (mq); - fds[1] = svm_msg_q_get_producer_eventfd (peer_mq); - } - - vl_msg_api_send_shmem (reg->vl_input_queue, (u8 *) & mp); - - if (mp->n_fds != 0) - session_send_fds (reg, fds, mp->n_fds); - - return 0; -} - static int mq_try_lock_and_alloc_msg (svm_msg_q_t * app_mq, svm_msg_q_msg_t * msg) { @@ -268,25 +220,17 @@ mq_send_session_accepted_cb (session_t * s) } else { - u8 main_thread = vlib_num_workers ()? 1 : 0; ct_connection_t *ct; ct = (ct_connection_t *) session_get_transport (s); - send_app_cut_through_registration_add (app_wrk->api_client_index, - app_wrk->wrk_map_index, - ct->server_evt_q, - ct->client_evt_q); - listener = listen_session_get (s->listener_index); al = app_listener_get (app, listener->al_index); mp->listener_handle = app_listener_handle (al); mp->is_ip4 = session_type_is_ip4 (listener->session_type); mp->handle = session_handle (s); mp->port = ct->c_rmt_port; - vpp_queue = session_main_get_vpp_event_queue (main_thread); + vpp_queue = session_main_get_vpp_event_queue (0); mp->vpp_event_queue_address = pointer_to_uword (vpp_queue); - mp->client_event_queue_address = ct->client_evt_q; - mp->server_event_queue_address = ct->server_evt_q; } svm_msg_q_add_and_unlock (app_mq, msg); @@ -415,26 +359,22 @@ mq_send_session_connected_cb (u32 app_wrk_index, u32 api_context, } else { - u8 main_thread = vlib_num_workers ()? 1 : 0; ct_connection_t *cct; session_t *ss; cct = (ct_connection_t *) session_get_transport (s); - send_app_cut_through_registration_add (app_wrk->api_client_index, - app_wrk->wrk_map_index, - cct->client_evt_q, - cct->server_evt_q); - mp->handle = session_handle (s); mp->lcl_port = cct->c_lcl_port; - vpp_mq = session_main_get_vpp_event_queue (main_thread); + mp->is_ip4 = cct->c_is_ip4; + vpp_mq = session_main_get_vpp_event_queue (0); mp->vpp_event_queue_address = pointer_to_uword (vpp_mq); - mp->client_event_queue_address = cct->client_evt_q; - mp->server_event_queue_address = cct->server_evt_q; + mp->server_rx_fifo = pointer_to_uword (s->rx_fifo); + mp->server_tx_fifo = pointer_to_uword (s->tx_fifo); + mp->segment_handle = session_segment_handle (s); ss = ct_session_get_peer (s); - mp->server_rx_fifo = pointer_to_uword (ss->tx_fifo); - mp->server_tx_fifo = pointer_to_uword (ss->rx_fifo); - mp->segment_handle = session_segment_handle (ss); + mp->ct_rx_fifo = pointer_to_uword (ss->tx_fifo); + mp->ct_tx_fifo = pointer_to_uword (ss->rx_fifo); + mp->ct_segment_handle = session_segment_handle (ss); } done: @@ -505,11 +445,20 @@ done: return 0; } +static int +mq_app_tx_callback (session_t * s) +{ + if (session_has_transport (s)) + return 0; + return ct_session_tx (s); +} + static session_cb_vft_t session_mq_cb_vft = { .session_accept_callback = mq_send_session_accepted_cb, .session_disconnect_callback = mq_send_session_disconnected_cb, .session_connected_callback = mq_send_session_connected_cb, .session_reset_callback = mq_send_session_reset_cb, + .builtin_app_tx_callback = mq_app_tx_callback, .add_segment_callback = send_add_segment_callback, .del_segment_callback = send_del_segment_callback, }; diff --git a/src/vnet/session/session_node.c b/src/vnet/session/session_node.c index d0936c7e13d..db5123b8b2d 100644 --- a/src/vnet/session/session_node.c +++ b/src/vnet/session/session_node.c @@ -74,9 +74,9 @@ session_mq_accepted_reply_handler (void *data) if (!session_has_transport (s)) { + s->session_state = SESSION_STATE_READY; if (ct_session_connect_notify (s)) return; - s->session_state = SESSION_STATE_READY; } else { @@ -1234,6 +1234,26 @@ VLIB_REGISTER_NODE (session_queue_process_node) = }; /* *INDENT-ON* */ +static_always_inline uword +session_queue_pre_input_inline (vlib_main_t * vm, vlib_node_runtime_t * node, + vlib_frame_t * frame) +{ + session_main_t *sm = &session_main; + if (!sm->wrk[0].vpp_event_queue) + return 0; + return session_queue_node_fn (vm, node, frame); +} + +/* *INDENT-OFF* */ +VLIB_REGISTER_NODE (session_queue_pre_input_node) = +{ + .function = session_queue_pre_input_inline, + .type = VLIB_NODE_TYPE_PRE_INPUT, + .name = "session-queue-main", + .state = VLIB_NODE_STATE_DISABLED, +}; +/* *INDENT-ON* */ + /* * fd.io coding-style-patch-verification: ON * diff --git a/src/vnet/session/session_types.h b/src/vnet/session/session_types.h index c4240ab420a..9e51d69db42 100644 --- a/src/vnet/session/session_types.h +++ b/src/vnet/session/session_types.h @@ -278,9 +278,7 @@ session_parse_handle (session_handle_t handle, u32 * index, typedef enum { SESSION_IO_EVT_RX, - SESSION_IO_EVT_CT_RX, SESSION_IO_EVT_TX, - SESSION_IO_EVT_CT_TX, SESSION_IO_EVT_TX_FLUSH, SESSION_IO_EVT_BUILTIN_RX, SESSION_IO_EVT_BUILTIN_TX, diff --git a/test/test_vcl.py b/test/test_vcl.py index 71ad902d208..9a8662d2275 100644 --- a/test/test_vcl.py +++ b/test/test_vcl.py @@ -259,8 +259,8 @@ class LDPCutThruTestCase(VCLTestCase): self.server_port] def tearDown(self): + self.logger.debug(self.vapi.cli("show session verbose 2")) self.cut_thru_tear_down() - super(LDPCutThruTestCase, self).tearDown() @unittest.skipUnless(running_extended_tests, "part of extended tests")