From a44d6b133bfb7ee0fb11d6ae8d9f0f00e57f242c Mon Sep 17 00:00:00 2001 From: Florin Coras Date: Wed, 3 Oct 2018 14:29:10 -0700 Subject: [PATCH] udp_echo: fix cut-through server mode Change-Id: I20dd2071c936eb2c1870d45860553007c8e20587 Signed-off-by: Florin Coras --- src/tests/vnet/session/udp_echo.c | 423 +++++++++++++++++++------------------- src/vnet/session/application.c | 21 +- src/vnet/session/application.h | 24 +-- 3 files changed, 243 insertions(+), 225 deletions(-) diff --git a/src/tests/vnet/session/udp_echo.c b/src/tests/vnet/session/udp_echo.c index 5691ca4f042..9872eed27ba 100644 --- a/src/tests/vnet/session/udp_echo.c +++ b/src/tests/vnet/session/udp_echo.c @@ -90,6 +90,7 @@ typedef struct /* Our event queue */ svm_msg_q_t *our_event_queue; + svm_msg_q_t *ct_event_queue; /* $$$ single thread only for the moment */ svm_msg_q_t *vpp_event_queue; @@ -128,15 +129,11 @@ typedef struct uword *segments_table; u8 do_echo; u8 have_return; + u64 total_to_send; u64 bytes_to_send; + u64 bytes_sent; } udp_echo_main_t; -#if CLIB_DEBUG > 0 -#define NITER 10000 -#else -#define NITER 4000000 -#endif - udp_echo_main_t udp_echo_main; static void @@ -371,8 +368,8 @@ vl_api_application_attach_reply_t_handler (vl_api_application_attach_reply_t * return; } - utm->our_event_queue = - uword_to_pointer (mp->app_event_queue_address, svm_msg_q_t *); + utm->our_event_queue = uword_to_pointer (mp->app_event_queue_address, + svm_msg_q_t *); utm->state = STATE_ATTACHED; } @@ -416,6 +413,8 @@ wait_for_state_change (udp_echo_main_t * utm, connection_state_t state) { if (utm->state == state) return 0; + if (utm->state == STATE_FAILED) + return -1; } return -1; } @@ -431,7 +430,7 @@ cut_through_thread_fn (void *arg) u8 *my_copy_buffer = 0; udp_echo_main_t *utm = &udp_echo_main; i32 actual_transfer; - int rv; + int rv, do_dequeue = 0; u32 buffer_offset; while (utm->cut_through_session_index == ~0) @@ -444,31 +443,42 @@ cut_through_thread_fn (void *arg) vec_validate (my_copy_buffer, 64 * 1024 - 1); - while (true) + while (1) { - /* We read from the tx fifo and write to the rx fifo */ do { - actual_transfer = svm_fifo_dequeue_nowait (rx_fifo, - vec_len (my_copy_buffer), - my_copy_buffer); + /* We read from the tx fifo and write to the rx fifo */ + if (utm->have_return || do_dequeue) + actual_transfer = svm_fifo_dequeue_nowait (rx_fifo, + vec_len + (my_copy_buffer), + my_copy_buffer); + else + { + /* We don't do anything with the data, drop it */ + actual_transfer = svm_fifo_max_dequeue (rx_fifo); + svm_fifo_dequeue_drop (rx_fifo, actual_transfer); + } } while (actual_transfer <= 0); server_bytes_received += actual_transfer; - buffer_offset = 0; - while (actual_transfer > 0) + if (utm->have_return) { - rv = svm_fifo_enqueue_nowait (tx_fifo, actual_transfer, - my_copy_buffer + buffer_offset); - if (rv > 0) + buffer_offset = 0; + while (actual_transfer > 0) { - actual_transfer -= rv; - buffer_offset += rv; - server_bytes_sent += rv; - } + rv = svm_fifo_enqueue_nowait (tx_fifo, actual_transfer, + my_copy_buffer + buffer_offset); + if (rv > 0) + { + actual_transfer -= rv; + buffer_offset += rv; + server_bytes_sent += rv; + } + } } if (PREDICT_FALSE (utm->time_to_stop)) break; @@ -500,18 +510,21 @@ session_accepted_handler (session_accepted_msg_t * mp) pool_get (utm->sessions, session); memset (session, 0, sizeof (*session)); session_index = session - utm->sessions; + session->session_index = session_index; /* Cut-through case */ if (mp->server_event_queue_address) { clib_warning ("cut-through session"); - utm->our_event_queue = uword_to_pointer (mp->server_event_queue_address, - svm_msg_q_t *); + session->vpp_evt_q = uword_to_pointer (mp->client_event_queue_address, + svm_msg_q_t *); + sleep (1); rx_fifo->master_session_index = session_index; tx_fifo->master_session_index = session_index; utm->cut_through_session_index = session_index; session->rx_fifo = rx_fifo; session->tx_fifo = tx_fifo; + session->is_dgram = 0; rv = pthread_create (&utm->cut_through_thread_handle, NULL /*attr */ , cut_through_thread_fn, 0); @@ -520,7 +533,6 @@ session_accepted_handler (session_accepted_msg_t * mp) clib_warning ("pthread_create returned %d", rv); rv = VNET_API_ERROR_SYSCALL_ERROR_1; } - utm->do_echo = 1; } else { @@ -543,13 +555,13 @@ session_accepted_handler (session_accepted_msg_t * mp) (f64) pool_elts (utm->sessions) / (now - start_time)); } - app_alloc_ctrl_evt_to_vpp (session->vpp_evt_q, app_evt, + app_alloc_ctrl_evt_to_vpp (utm->vpp_event_queue, app_evt, SESSION_CTRL_EVT_ACCEPTED_REPLY); rmp = (session_accepted_reply_msg_t *) app_evt->evt->data; rmp->handle = mp->handle; rmp->context = mp->context; rmp->retval = rv; - app_send_ctrl_evt_to_vpp (session->vpp_evt_q, app_evt); + app_send_ctrl_evt_to_vpp (utm->vpp_event_queue, app_evt); CLIB_MEMORY_BARRIER (); utm->state = STATE_READY; @@ -571,6 +583,7 @@ session_disconnected_handler (session_disconnected_msg_t * mp) { session = pool_elt_at_index (utm->sessions, p[0]); hash_unset (utm->session_index_by_vpp_handles, mp->handle); + clib_warning ("disconnecting %u", session->session_index); pool_put (utm->sessions, session); } else @@ -607,27 +620,32 @@ session_connected_handler (session_connected_msg_t * mp) ASSERT (mp->server_rx_fifo && mp->server_tx_fifo); pool_get (utm->sessions, session); + session->session_index = session - utm->sessions; session->rx_fifo = uword_to_pointer (mp->server_rx_fifo, svm_fifo_t *); session->tx_fifo = uword_to_pointer (mp->server_tx_fifo, svm_fifo_t *); - session->vpp_evt_q = uword_to_pointer (mp->vpp_event_queue_address, - svm_msg_q_t *); + /* Cut-through case */ if (mp->client_event_queue_address) { clib_warning ("cut-through session"); - utm->cut_through_session_index = session - utm->sessions; - utm->vpp_event_queue = uword_to_pointer (mp->vpp_event_queue_address, - svm_msg_q_t *); - utm->our_event_queue = uword_to_pointer (mp->client_event_queue_address, - svm_msg_q_t *); - utm->do_echo = 1; + session->vpp_evt_q = uword_to_pointer (mp->server_event_queue_address, + svm_msg_q_t *); + utm->ct_event_queue = uword_to_pointer (mp->client_event_queue_address, + svm_msg_q_t *); + utm->cut_through_session_index = session->session_index; + session->is_dgram = 0; + sleep (1); + session->rx_fifo->client_session_index = session->session_index; + session->tx_fifo->client_session_index = session->session_index; } else { - utm->connected_session = session - utm->sessions; + utm->connected_session = session->session_index; utm->vpp_event_queue = uword_to_pointer (mp->vpp_event_queue_address, svm_msg_q_t *); + session->rx_fifo->client_session_index = session->session_index; + session->tx_fifo->client_session_index = session->session_index; clib_memcpy (&session->transport.lcl_ip, mp->lcl_ip, sizeof (ip46_address_t)); session->transport.is_ip4 = mp->is_ip4; @@ -650,11 +668,49 @@ session_connected_handler (session_connected_msg_t * mp) utm->state = STATE_READY; } +static void +session_bound_handler (session_bound_msg_t * mp) +{ + udp_echo_main_t *utm = &udp_echo_main; + svm_fifo_t *rx_fifo, *tx_fifo; + app_session_t *session; + u32 session_index; + + if (mp->retval) + { + clib_warning ("bind failed: %d", mp->retval); + utm->state = STATE_FAILED; + return; + } + + rx_fifo = uword_to_pointer (mp->rx_fifo, svm_fifo_t *); + tx_fifo = uword_to_pointer (mp->tx_fifo, svm_fifo_t *); + + pool_get (utm->sessions, session); + memset (session, 0, sizeof (*session)); + session_index = session - utm->sessions; + + rx_fifo->client_session_index = session_index; + tx_fifo->client_session_index = session_index; + session->rx_fifo = rx_fifo; + session->tx_fifo = tx_fifo; + clib_memcpy (&session->transport.lcl_ip, mp->lcl_ip, + sizeof (ip46_address_t)); + session->transport.is_ip4 = mp->lcl_is_ip4; + session->transport.lcl_port = mp->lcl_port; + session->vpp_evt_q = uword_to_pointer (mp->vpp_evt_q, svm_msg_q_t *); + + utm->state = utm->is_connected ? STATE_BOUND : STATE_READY; +} + static void handle_mq_event (session_event_t * e) { switch (e->event_type) { + case SESSION_CTRL_EVT_BOUND: + session_bound_handler ((session_bound_msg_t *) e->data); + break; case SESSION_CTRL_EVT_ACCEPTED: session_accepted_handler ((session_accepted_msg_t *) e->data); break; @@ -683,135 +739,37 @@ udp_client_send_connect (udp_echo_main_t * utm) vl_msg_api_send_shmem (utm->vl_input_queue, (u8 *) & cmp); } -static void -client_send_cut_through (udp_echo_main_t * utm, app_session_t * session) -{ - int i; - u8 *test_data = 0; - u64 bytes_received = 0, bytes_sent = 0; - i32 bytes_to_read; - int rv; - f64 before, after, delta, bytes_per_second; - svm_fifo_t *rx_fifo, *tx_fifo; - int buffer_offset, bytes_to_send = 0; - - /* - * Prepare test data - */ - vec_validate (test_data, 64 * 1024 - 1); - for (i = 0; i < vec_len (test_data); i++) - test_data[i] = i & 0xff; - - rx_fifo = session->rx_fifo; - tx_fifo = session->tx_fifo; - - before = clib_time_now (&utm->clib_time); - - vec_validate (utm->rx_buf, vec_len (test_data) - 1); - - for (i = 0; i < NITER; i++) - { - bytes_to_send = vec_len (test_data); - buffer_offset = 0; - while (bytes_to_send > 0) - { - rv = svm_fifo_enqueue_nowait (tx_fifo, bytes_to_send, - test_data + buffer_offset); - - if (rv > 0) - { - bytes_to_send -= rv; - buffer_offset += rv; - bytes_sent += rv; - } - } - - bytes_to_read = svm_fifo_max_dequeue (rx_fifo); - bytes_to_read = vec_len (utm->rx_buf) > bytes_to_read ? - bytes_to_read : vec_len (utm->rx_buf); - - buffer_offset = 0; - while (bytes_to_read > 0) - { - rv = svm_fifo_dequeue_nowait (rx_fifo, - bytes_to_read, - utm->rx_buf + buffer_offset); - if (rv > 0) - { - bytes_to_read -= rv; - buffer_offset += rv; - bytes_received += rv; - } - } - } - while (bytes_received < bytes_sent) - { - rv = - svm_fifo_dequeue_nowait (rx_fifo, vec_len (utm->rx_buf), utm->rx_buf); - if (rv > 0) - { -#if CLIB_DEBUG > 0 - int j; - for (j = 0; j < rv; j++) - { - if (utm->rx_buf[j] != ((bytes_received + j) & 0xff)) - { - clib_warning ("error at byte %lld, 0x%x not 0x%x", - bytes_received + j, - utm->rx_buf[j], - ((bytes_received + j) & 0xff)); - } - } -#endif - bytes_received += (u64) rv; - } - } - - after = clib_time_now (&utm->clib_time); - delta = after - before; - bytes_per_second = 0.0; - - if (delta > 0.0) - bytes_per_second = (f64) bytes_received / delta; - - fformat (stdout, - "Done: %lld recv bytes in %.2f seconds, %.2f bytes/sec...\n\n", - bytes_received, delta, bytes_per_second); - fformat (stdout, - "Done: %lld sent bytes in %.2f seconds, %.2f bytes/sec...\n\n", - bytes_sent, delta, bytes_per_second); - fformat (stdout, - "client -> server -> client round trip: %.2f Gbit/sec \n\n", - (bytes_per_second * 8.0) / 1e9); -} - static void send_test_chunk (udp_echo_main_t * utm, app_session_t * s, u32 bytes) { + u64 test_buf_len, bytes_this_chunk, test_buf_offset; + u8 *test_data = utm->connect_test_data; - int test_buf_offset = 0; - u64 bytes_sent = 0; u32 bytes_to_snd, enq_space, min_chunk; - int rv; - - min_chunk = clib_min (65536, s->tx_fifo->nitems); - bytes_to_snd = (bytes == 0) ? vec_len (test_data) : bytes; - if (bytes_to_snd > vec_len (test_data)) - bytes_to_snd = vec_len (test_data); + session_evt_type_t et = FIFO_EVENT_APP_TX; + int written; + + test_buf_len = vec_len (test_data); + test_buf_offset = utm->bytes_sent % test_buf_len; + bytes_this_chunk = clib_min (test_buf_len - test_buf_offset, + utm->bytes_to_send); + enq_space = svm_fifo_max_enqueue (s->tx_fifo); + bytes_this_chunk = clib_min (bytes_this_chunk, enq_space); + et += (s->session_index == utm->cut_through_session_index); + + if (s->is_dgram) + written = app_send_dgram_raw (s->tx_fifo, &s->transport, s->vpp_evt_q, + test_data + test_buf_offset, + bytes_this_chunk, et, SVM_Q_WAIT); + else + written = app_send_stream_raw (s->tx_fifo, s->vpp_evt_q, + test_data + test_buf_offset, + bytes_this_chunk, et, SVM_Q_WAIT); - while (bytes_to_snd > 0 && !utm->time_to_stop) + if (written > 0) { - enq_space = svm_fifo_max_enqueue (s->tx_fifo); - if (enq_space < clib_min (bytes_to_snd, min_chunk)) - continue; - - rv = app_send (s, test_data + test_buf_offset, bytes_to_snd, 0); - if (rv > 0) - { - bytes_to_snd -= rv; - test_buf_offset += rv; - bytes_sent += rv; - } + utm->bytes_to_send -= written; + utm->bytes_sent += written; } } @@ -822,30 +780,27 @@ recv_test_chunk (udp_echo_main_t * utm, app_session_t * s) } void -client_send_data (udp_echo_main_t * utm) +client_send_data (udp_echo_main_t * utm, u32 session_index) { f64 start_time, end_time, delta; app_session_t *session; char *transfer_type; - u32 n_iterations; u8 *test_data; int i; - vec_validate (utm->connect_test_data, 1024 * 1024 - 1); + vec_validate_aligned (utm->connect_test_data, 1024 * 1024 - 1, + CLIB_CACHE_LINE_BYTES); for (i = 0; i < vec_len (utm->connect_test_data); i++) utm->connect_test_data[i] = i & 0xff; test_data = utm->connect_test_data; - session = pool_elt_at_index (utm->sessions, utm->connected_session); + session = pool_elt_at_index (utm->sessions, session_index); ASSERT (vec_len (test_data) > 0); + utm->total_to_send = utm->bytes_to_send; vec_validate (utm->rx_buf, vec_len (test_data) - 1); - n_iterations = utm->bytes_to_send / vec_len (test_data); - if (!n_iterations) - n_iterations = 1; - start_time = clib_time_now (&utm->clib_time); - for (i = 0; i < n_iterations; i++) + while (!utm->time_to_stop && utm->bytes_to_send) { send_test_chunk (utm, session, 0); if (utm->have_return) @@ -865,12 +820,12 @@ client_send_data (udp_echo_main_t * utm) delta = end_time - start_time; transfer_type = utm->have_return ? "full-duplex" : "half-duplex"; clib_warning ("%lld bytes (%lld mbytes, %lld gbytes) in %.2f seconds", - utm->bytes_to_send, utm->bytes_to_send / (1ULL << 20), - utm->bytes_to_send / (1ULL << 30), delta); - clib_warning ("%.2f bytes/second %s", ((f64) utm->bytes_to_send) / (delta), + utm->total_to_send, utm->total_to_send / (1ULL << 20), + utm->total_to_send / (1ULL << 30), delta); + clib_warning ("%.2f bytes/second %s", ((f64) utm->total_to_send) / (delta), transfer_type); clib_warning ("%.4f gbit/second %s", - (((f64) utm->bytes_to_send * 8.0) / delta / 1e9), + (((f64) utm->total_to_send * 8.0) / delta / 1e9), transfer_type); } @@ -900,28 +855,21 @@ client_test (udp_echo_main_t * utm) udp_client_send_connect (utm); start_time = clib_time_now (&utm->clib_time); - while (pool_elts (utm->sessions) != 1 - && clib_time_now (&utm->clib_time) - start_time < timeout - && utm->state != STATE_FAILED) - + while (pool_elts (utm->sessions) != 1 && utm->state != STATE_FAILED) { svm_msg_q_sub (utm->our_event_queue, &msg, SVM_Q_WAIT, 0); e = svm_msg_q_msg_data (utm->our_event_queue, &msg); handle_mq_event (e); svm_msg_q_free_msg (utm->our_event_queue, &msg); + + if (clib_time_now (&utm->clib_time) - start_time >= timeout) + break; } if (utm->cut_through_session_index != ~0) - { - session = pool_elt_at_index (utm->sessions, - utm->cut_through_session_index); - client_send_cut_through (utm, session); - } + client_send_data (utm, utm->cut_through_session_index); else - { - session = pool_elt_at_index (utm->sessions, utm->connected_session); - client_send_data (utm); - } + client_send_data (utm, utm->connected_session); application_detach (utm); wait_for_state_change (utm, STATE_DETACHED); @@ -1024,13 +972,21 @@ vl_api_unbind_uri_reply_t_handler (vl_api_unbind_uri_reply_t * mp) utm->state = STATE_START; } -#define foreach_tcp_echo_msg \ -_(BIND_URI_REPLY, bind_uri_reply) \ -_(UNBIND_URI_REPLY, unbind_uri_reply) \ -_(MAP_ANOTHER_SEGMENT, map_another_segment) \ -_(UNMAP_SEGMENT, unmap_segment) \ -_(APPLICATION_ATTACH_REPLY, application_attach_reply) \ -_(APPLICATION_DETACH_REPLY, application_detach_reply) \ +static void + vl_api_app_cut_through_registration_add_t_handler + (vl_api_app_cut_through_registration_add_t * mp) +{ + +} + +#define foreach_tcp_echo_msg \ +_(BIND_URI_REPLY, bind_uri_reply) \ +_(UNBIND_URI_REPLY, unbind_uri_reply) \ +_(MAP_ANOTHER_SEGMENT, map_another_segment) \ +_(UNMAP_SEGMENT, unmap_segment) \ +_(APPLICATION_ATTACH_REPLY, application_attach_reply) \ +_(APPLICATION_DETACH_REPLY, application_detach_reply) \ +_(APP_CUT_THROUGH_REGISTRATION_ADD, app_cut_through_registration_add) \ void tcp_echo_api_hookup (udp_echo_main_t * utm) @@ -1081,52 +1037,107 @@ init_error_string_table (udp_echo_main_t * utm) } void -server_handle_fifo_event_rx (udp_echo_main_t * utm, session_event_t * e) +server_handle_fifo_event_rx (udp_echo_main_t * utm, u32 session_index) { - app_session_t *s; + svm_fifo_t *rx_fifo, *tx_fifo; + int n_read; + app_session_t *session; int rv; + u32 max_dequeue, offset, max_transfer, rx_buf_len; + session_evt_type_t et = FIFO_EVENT_APP_TX; - s = pool_elt_at_index (utm->sessions, e->fifo->client_session_index); - app_recv (s, utm->rx_buf, vec_len (utm->rx_buf)); + session = pool_elt_at_index (utm->sessions, session_index); + rx_buf_len = vec_len (utm->rx_buf); + rx_fifo = session->rx_fifo; + tx_fifo = session->tx_fifo; + + et += (session->session_index == utm->cut_through_session_index); + + max_dequeue = svm_fifo_max_dequeue (rx_fifo); + /* Allow enqueuing of a new event */ + svm_fifo_unset_event (rx_fifo); - if (utm->do_echo) + if (PREDICT_FALSE (!max_dequeue)) + return; + + /* Read the max_dequeue */ + do { - do + max_transfer = clib_min (rx_buf_len, max_dequeue); + if (session->is_dgram) + n_read = app_recv_dgram_raw (rx_fifo, utm->rx_buf, max_transfer, + &session->transport, 0, 0); + else + n_read = app_recv_stream_raw (rx_fifo, utm->rx_buf, max_transfer, 0, + 0); + + if (n_read > 0) + max_dequeue -= n_read; + + /* Reflect if a non-drop session */ + if (utm->have_return && n_read > 0) { - rv = app_send_stream (s, utm->rx_buf, vec_len (utm->rx_buf), 0); + offset = 0; + do + { + if (session->is_dgram) + rv = app_send_dgram_raw (tx_fifo, &session->transport, + session->vpp_evt_q, + &utm->rx_buf[offset], n_read, et, + SVM_Q_WAIT); + else + rv = app_send_stream_raw (tx_fifo, session->vpp_evt_q, + &utm->rx_buf[offset], n_read, et, + SVM_Q_WAIT); + if (rv > 0) + { + n_read -= rv; + offset += rv; + } + } + while ((rv <= 0 || n_read > 0) && !utm->time_to_stop); + + /* If event wasn't set, add one */ + if (svm_fifo_set_event (tx_fifo)) + app_send_io_evt_to_vpp (session->vpp_evt_q, tx_fifo, + et, SVM_Q_WAIT); } - while (rv == SVM_FIFO_FULL); } + while ((n_read < 0 || max_dequeue > 0) && !utm->time_to_stop); } -void +static void server_handle_event_queue (udp_echo_main_t * utm) { session_event_t *e; svm_msg_q_msg_t msg; + svm_msg_q_t *mq = utm->our_event_queue; + int i; while (utm->state != STATE_READY) sleep (5); while (1) { - if (svm_msg_q_sub (utm->our_event_queue, &msg, SVM_Q_WAIT, 0)) + if (svm_msg_q_sub (mq, &msg, SVM_Q_WAIT, 0)) { clib_warning ("svm msg q returned"); continue; } - e = svm_msg_q_msg_data (utm->our_event_queue, &msg); + e = svm_msg_q_msg_data (mq, &msg); switch (e->event_type) { case FIFO_EVENT_APP_RX: - server_handle_fifo_event_rx (utm, e); + server_handle_fifo_event_rx (utm, e->fifo->client_session_index); + break; + case SESSION_IO_EVT_CT_TX: break; default: handle_mq_event (e); break; } - svm_msg_q_free_msg (utm->our_event_queue, &msg); + svm_msg_q_free_msg (mq, &msg); if (PREDICT_FALSE (utm->time_to_stop == 1)) return; if (PREDICT_FALSE (utm->time_to_print_stats == 1)) @@ -1214,7 +1225,7 @@ main (int argc, char **argv) svm_fifo_segment_main_init (0x200000000ULL, 20); - vec_validate (utm->rx_buf, 8192); + vec_validate (utm->rx_buf, 128 << 10); utm->session_index_by_vpp_handles = hash_create (0, sizeof (uword)); utm->my_pid = getpid (); utm->configured_segment_size = 1 << 20; diff --git a/src/vnet/session/application.c b/src/vnet/session/application.c index 260ae588209..61909660ad6 100644 --- a/src/vnet/session/application.c +++ b/src/vnet/session/application.c @@ -872,10 +872,10 @@ app_worker_get_connect_segment_manager (app_worker_t * app) segment_manager_t * app_worker_get_listen_segment_manager (app_worker_t * app, - stream_session_t * s) + stream_session_t * listener) { uword *smp; - smp = hash_get (app->listeners_table, listen_session_get_handle (s)); + smp = hash_get (app->listeners_table, listen_session_get_handle (listener)); ASSERT (smp != 0); return segment_manager_get (*smp); } @@ -1721,21 +1721,28 @@ application_local_session_cleanup (app_worker_t * client_wrk, local_session_t * ls) { svm_fifo_segment_private_t *seg; + stream_session_t *listener; segment_manager_t *sm; uword client_key; u8 has_transport; - has_transport = session_has_transport ((stream_session_t *) ls); - client_key = application_client_local_connect_key (ls); + /* Retrieve listener transport type as it is the one that decides where + * the fifos are allocated */ + has_transport = application_local_session_listener_has_transport (ls); if (!has_transport) sm = application_get_local_segment_manager_w_session (server_wrk, ls); else - sm = app_worker_get_listen_segment_manager (server_wrk, - (stream_session_t *) ls); + { + listener = listen_session_get (ls->listener_index); + sm = app_worker_get_listen_segment_manager (server_wrk, listener); + } seg = segment_manager_get_segment (sm, ls->svm_segment_index); if (client_wrk) - hash_unset (client_wrk->local_connects, client_key); + { + client_key = application_client_local_connect_key (ls); + hash_unset (client_wrk->local_connects, client_key); + } if (!has_transport) { diff --git a/src/vnet/session/application.h b/src/vnet/session/application.h index e83b7a6ba6e..e2f2279d77f 100644 --- a/src/vnet/session/application.h +++ b/src/vnet/session/application.h @@ -231,7 +231,7 @@ segment_manager_t *app_worker_get_listen_segment_manager (app_worker_t *, stream_session_t *); segment_manager_t *app_worker_get_connect_segment_manager (app_worker_t *); int app_worker_alloc_connects_segment_manager (app_worker_t * app); -int app_worker_add_segment_notify (u32 app_wrk_index, ssvm_private_t * fs); +int app_worker_add_segment_notify (u32 app_or_wrk, ssvm_private_t * fs); u32 app_worker_n_listeners (app_worker_t * app); stream_session_t *app_worker_first_listener (app_worker_t * app, u8 fib_proto, @@ -245,7 +245,7 @@ clib_error_t *vnet_app_worker_add_del (vnet_app_worker_add_del_args_t * a); int application_start_listen (application_t * app, session_endpoint_extended_t * tep, session_handle_t * handle); -int application_stop_listen (u32 app_index, u32 app_wrk_index, +int application_stop_listen (u32 app_index, u32 app_or_wrk, session_handle_t handle); application_t *application_alloc (void); @@ -268,7 +268,7 @@ int application_is_builtin (application_t * app); int application_is_builtin_proxy (application_t * app); u32 application_session_table (application_t * app, u8 fib_proto); u32 application_local_session_table (application_t * app); -u8 *application_name_from_index (u32 app_wrk_index); +u8 *application_name_from_index (u32 app_or_wrk); u8 application_has_local_scope (application_t * app); u8 application_has_global_scope (application_t * app); u8 application_use_mq_for_ctrl (application_t * app); @@ -297,15 +297,15 @@ local_session_t int application_start_local_listen (application_t * server, session_endpoint_extended_t * sep, session_handle_t * handle); -int application_stop_local_listen (u32 app_index, u32 app_wrk_index, +int application_stop_local_listen (u32 app_index, u32 app_or_wrk, session_handle_t lh); int application_local_session_connect (app_worker_t * client, app_worker_t * server, local_session_t * ls, u32 opaque); int application_local_session_connect_notify (local_session_t * ls); -int application_local_session_disconnect (u32 app_wrk_index, +int application_local_session_disconnect (u32 app_or_wrk, local_session_t * ls); -int application_local_session_disconnect_w_index (u32 app_wrk_index, +int application_local_session_disconnect_w_index (u32 app_or_wrk, u32 ls_index); void app_worker_local_sessions_free (app_worker_t * app); @@ -330,20 +330,20 @@ local_session_id (local_session_t * ls) } always_inline void -local_session_parse_id (u32 ls_id, u32 * app_wrk_index, u32 * session_index) +local_session_parse_id (u32 ls_id, u32 * app_or_wrk, u32 * session_index) { - *app_wrk_index = ls_id >> 16; - *session_index = ls_id & 0xFFF; + *app_or_wrk = ls_id >> 16; + *session_index = ls_id & 0xFF; } always_inline void -local_session_parse_handle (session_handle_t handle, u32 * server_index, +local_session_parse_handle (session_handle_t handle, u32 * app_or_wrk_index, u32 * session_index) { u32 bottom; ASSERT ((handle >> 32) == SESSION_LOCAL_HANDLE_PREFIX); bottom = (handle & 0xFFFFFFFF); - local_session_parse_id (bottom, server_index, session_index); + local_session_parse_id (bottom, app_or_wrk_index, session_index); } always_inline session_handle_t @@ -377,7 +377,7 @@ application_local_session_listener_has_transport (local_session_t * ls) return (tp != TRANSPORT_PROTO_NONE); } -void mq_send_local_session_disconnected_cb (u32 app_wrk_index, +void mq_send_local_session_disconnected_cb (u32 app_or_wrk, local_session_t * ls); uword unformat_application_proto (unformat_input_t * input, va_list * args); -- 2.16.6