X-Git-Url: https://gerrit.fd.io/r/gitweb?a=blobdiff_plain;f=src%2Fvnet%2Fsession%2Fsession.c;h=606f71739bd91936853b211a2748b796e203b438;hb=a27a46eaebee7b1d84a6ce998d9c92048b0654b6;hp=4cf0f9e7e494fc14c3ae58c9293783528d178b49;hpb=d9818dd68c162079f3ddb5443a78d0d91d55d0fe;p=vpp.git diff --git a/src/vnet/session/session.c b/src/vnet/session/session.c index 4cf0f9e7e49..606f71739bd 100644 --- a/src/vnet/session/session.c +++ b/src/vnet/session/session.c @@ -1,5 +1,5 @@ /* - * Copyright (c) 2017 Cisco and/or its affiliates. + * Copyright (c) 2017-2019 Cisco and/or its affiliates. * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at: @@ -20,12 +20,10 @@ #include #include #include -#include #include #include session_manager_main_t session_manager_main; -extern transport_proto_vft_t *tp_vfts; static inline int session_send_evt_to_thread (void *data, void *args, u32 thread_index, @@ -72,7 +70,7 @@ session_send_evt_to_thread (void *data, void *args, u32 thread_index, break; case FIFO_EVENT_BUILTIN_TX: case FIFO_EVENT_DISCONNECT: - evt->session_handle = session_handle ((stream_session_t *) data); + evt->session_handle = session_handle ((session_t *) data); break; default: clib_warning ("evt unhandled!"); @@ -98,8 +96,7 @@ session_send_io_evt_to_thread_custom (void *data, u32 thread_index, } int -session_send_ctrl_evt_to_thread (stream_session_t * s, - session_evt_type_t evt_type) +session_send_ctrl_evt_to_thread (session_t * s, session_evt_type_t evt_type) { /* only event supported for now is disconnect */ ASSERT (evt_type == FIFO_EVENT_DISCONNECT); @@ -120,7 +117,7 @@ session_send_rpc_evt_to_thread (u32 thread_index, void *fp, void *rpc_args) } static void -session_program_transport_close (stream_session_t * s) +session_program_transport_close (session_t * s) { u32 thread_index = vlib_get_thread_index (); session_manager_worker_t *wrk; @@ -140,11 +137,11 @@ session_program_transport_close (stream_session_t * s) session_send_ctrl_evt_to_thread (s, FIFO_EVENT_DISCONNECT); } -stream_session_t * +session_t * session_alloc (u32 thread_index) { session_manager_worker_t *wrk = &session_manager_main.wrk[thread_index]; - stream_session_t *s; + session_t *s; u8 will_expand = 0; pool_get_aligned_will_expand (wrk->sessions, will_expand, CLIB_CACHE_LINE_BYTES); @@ -166,7 +163,7 @@ session_alloc (u32 thread_index) } void -session_free (stream_session_t * s) +session_free (session_t * s) { pool_put (session_manager_main.wrk[s->thread_index].sessions, s); if (CLIB_DEBUG) @@ -174,10 +171,10 @@ session_free (stream_session_t * s) } void -session_free_w_fifos (stream_session_t * s) +session_free_w_fifos (session_t * s) { - segment_manager_dealloc_fifos (s->svm_segment_index, s->server_rx_fifo, - s->server_tx_fifo); + segment_manager_dealloc_fifos (s->svm_segment_index, s->rx_fifo, + s->tx_fifo); session_free (s); } @@ -187,7 +184,7 @@ session_free_w_fifos (stream_session_t * s) * Transport connection must still be valid. */ static void -session_delete (stream_session_t * s) +session_delete (session_t * s) { int rv; @@ -198,34 +195,10 @@ session_delete (stream_session_t * s) session_free_w_fifos (s); } -int -session_alloc_fifos (segment_manager_t * sm, stream_session_t * s) -{ - svm_fifo_t *server_rx_fifo = 0, *server_tx_fifo = 0; - u32 fifo_segment_index; - int rv; - - if ((rv = segment_manager_alloc_session_fifos (sm, &server_rx_fifo, - &server_tx_fifo, - &fifo_segment_index))) - return rv; - /* Initialize backpointers */ - server_rx_fifo->master_session_index = s->session_index; - server_rx_fifo->master_thread_index = s->thread_index; - - server_tx_fifo->master_session_index = s->session_index; - server_tx_fifo->master_thread_index = s->thread_index; - - s->server_rx_fifo = server_rx_fifo; - s->server_tx_fifo = server_tx_fifo; - s->svm_segment_index = fifo_segment_index; - return 0; -} - -static stream_session_t * +static session_t * session_alloc_for_connection (transport_connection_t * tc) { - stream_session_t *s; + session_t *s; u32 thread_index = tc->thread_index; ASSERT (thread_index == vlib_get_thread_index () @@ -242,28 +215,6 @@ session_alloc_for_connection (transport_connection_t * tc) return s; } -static int -session_alloc_and_init (segment_manager_t * sm, transport_connection_t * tc, - u8 alloc_fifos, stream_session_t ** ret_s) -{ - stream_session_t *s; - int rv; - - s = session_alloc_for_connection (tc); - if (alloc_fifos && (rv = session_alloc_fifos (sm, s))) - { - session_free (s); - *ret_s = 0; - return rv; - } - - /* Add to the main lookup table */ - session_lookup_add_connection (tc, session_handle (s)); - - *ret_s = s; - return 0; -} - /** * Discards bytes from buffer chain * @@ -301,7 +252,7 @@ session_enqueue_discard_chain_bytes (vlib_main_t * vm, vlib_buffer_t * b, * Enqueue buffer chain tail */ always_inline int -session_enqueue_chain_tail (stream_session_t * s, vlib_buffer_t * b, +session_enqueue_chain_tail (session_t * s, vlib_buffer_t * b, u32 offset, u8 is_in_order) { vlib_buffer_t *chain_b; @@ -332,7 +283,7 @@ session_enqueue_chain_tail (stream_session_t * s, vlib_buffer_t * b, continue; if (is_in_order) { - rv = svm_fifo_enqueue_nowait (s->server_rx_fifo, len, data); + rv = svm_fifo_enqueue_nowait (s->rx_fifo, len, data); if (rv == len) { written += rv; @@ -355,8 +306,7 @@ session_enqueue_chain_tail (stream_session_t * s, vlib_buffer_t * b, } else { - rv = svm_fifo_enqueue_with_offset (s->server_rx_fifo, offset, len, - data); + rv = svm_fifo_enqueue_with_offset (s->rx_fifo, offset, len, data); if (rv) { clib_warning ("failed to enqueue multi-buffer seg"); @@ -393,14 +343,14 @@ session_enqueue_stream_connection (transport_connection_t * tc, vlib_buffer_t * b, u32 offset, u8 queue_event, u8 is_in_order) { - stream_session_t *s; + session_t *s; int enqueued = 0, rv, in_order_off; s = session_get (tc->s_index, tc->thread_index); if (is_in_order) { - enqueued = svm_fifo_enqueue_nowait (s->server_rx_fifo, + enqueued = svm_fifo_enqueue_nowait (s->rx_fifo, b->current_length, vlib_buffer_get_current (b)); if (PREDICT_FALSE ((b->flags & VLIB_BUFFER_NEXT_PRESENT) @@ -414,7 +364,7 @@ session_enqueue_stream_connection (transport_connection_t * tc, } else { - rv = svm_fifo_enqueue_with_offset (s->server_rx_fifo, offset, + rv = svm_fifo_enqueue_with_offset (s->rx_fifo, offset, b->current_length, vlib_buffer_get_current (b)); if (PREDICT_FALSE ((b->flags & VLIB_BUFFER_NEXT_PRESENT) && !rv)) @@ -442,18 +392,18 @@ session_enqueue_stream_connection (transport_connection_t * tc, } int -session_enqueue_dgram_connection (stream_session_t * s, +session_enqueue_dgram_connection (session_t * s, session_dgram_hdr_t * hdr, vlib_buffer_t * b, u8 proto, u8 queue_event) { int enqueued = 0, rv, in_order_off; - ASSERT (svm_fifo_max_enqueue (s->server_rx_fifo) + ASSERT (svm_fifo_max_enqueue (s->rx_fifo) >= b->current_length + sizeof (*hdr)); - svm_fifo_enqueue_nowait (s->server_rx_fifo, sizeof (session_dgram_hdr_t), + svm_fifo_enqueue_nowait (s->rx_fifo, sizeof (session_dgram_hdr_t), (u8 *) hdr); - enqueued = svm_fifo_enqueue_nowait (s->server_rx_fifo, b->current_length, + enqueued = svm_fifo_enqueue_nowait (s->rx_fifo, b->current_length, vlib_buffer_get_current (b)); if (PREDICT_FALSE ((b->flags & VLIB_BUFFER_NEXT_PRESENT) && enqueued >= 0)) { @@ -483,12 +433,12 @@ u8 stream_session_no_space (transport_connection_t * tc, u32 thread_index, u16 data_len) { - stream_session_t *s = session_get (tc->s_index, thread_index); + session_t *s = session_get (tc->s_index, thread_index); if (PREDICT_FALSE (s->session_state != SESSION_STATE_READY)) return 1; - if (data_len > svm_fifo_max_enqueue (s->server_rx_fifo)) + if (data_len > svm_fifo_max_enqueue (s->rx_fifo)) return 1; return 0; @@ -497,25 +447,49 @@ stream_session_no_space (transport_connection_t * tc, u32 thread_index, u32 session_tx_fifo_max_dequeue (transport_connection_t * tc) { - stream_session_t *s = session_get (tc->s_index, tc->thread_index); - if (!s->server_tx_fifo) + session_t *s = session_get (tc->s_index, tc->thread_index); + if (!s->tx_fifo) return 0; - return svm_fifo_max_dequeue (s->server_tx_fifo); + return svm_fifo_max_dequeue (s->tx_fifo); } int stream_session_peek_bytes (transport_connection_t * tc, u8 * buffer, u32 offset, u32 max_bytes) { - stream_session_t *s = session_get (tc->s_index, tc->thread_index); - return svm_fifo_peek (s->server_tx_fifo, offset, max_bytes, buffer); + session_t *s = session_get (tc->s_index, tc->thread_index); + return svm_fifo_peek (s->tx_fifo, offset, max_bytes, buffer); } u32 stream_session_dequeue_drop (transport_connection_t * tc, u32 max_bytes) { - stream_session_t *s = session_get (tc->s_index, tc->thread_index); - return svm_fifo_dequeue_drop (s->server_tx_fifo, max_bytes); + session_t *s = session_get (tc->s_index, tc->thread_index); + return svm_fifo_dequeue_drop (s->tx_fifo, max_bytes); +} + +static inline int +session_notify_subscribers (u32 app_index, session_t * s, + svm_fifo_t * f, session_evt_type_t evt_type) +{ + app_worker_t *app_wrk; + application_t *app; + int i; + + app = application_get (app_index); + if (!app) + return -1; + + for (i = 0; i < f->n_subscribers; i++) + { + app_wrk = application_get_worker (app, f->subscribers[i]); + if (!app_wrk) + continue; + if (app_worker_lock_and_send_event (app_wrk, s, evt_type)) + return -1; + } + + return 0; } /** @@ -527,12 +501,12 @@ stream_session_dequeue_drop (transport_connection_t * tc, u32 max_bytes) * @return 0 on success or negative number if failed to send notification. */ static inline int -session_enqueue_notify (stream_session_t * s) +session_enqueue_notify (session_t * s) { - app_worker_t *app; + app_worker_t *app_wrk; - app = app_worker_get_if_valid (s->app_wrk_index); - if (PREDICT_FALSE (!app)) + app_wrk = app_worker_get_if_valid (s->app_wrk_index); + if (PREDICT_FALSE (!app_wrk)) { SESSION_DBG ("invalid s->app_index = %d", s->app_wrk_index); return 0; @@ -541,23 +515,41 @@ session_enqueue_notify (stream_session_t * s) /* *INDENT-OFF* */ SESSION_EVT_DBG(SESSION_EVT_ENQ, s, ({ ed->data[0] = FIFO_EVENT_APP_RX; - ed->data[1] = svm_fifo_max_dequeue (s->server_rx_fifo); + ed->data[1] = svm_fifo_max_dequeue (s->rx_fifo); })); /* *INDENT-ON* */ - return app_worker_lock_and_send_event (app, s, FIFO_EVENT_APP_RX); + if (PREDICT_FALSE (app_worker_lock_and_send_event (app_wrk, s, + FIFO_EVENT_APP_RX))) + return -1; + + if (PREDICT_FALSE (svm_fifo_n_subscribers (s->rx_fifo))) + return session_notify_subscribers (app_wrk->app_index, s, + s->rx_fifo, FIFO_EVENT_APP_RX); + + return 0; } int -session_dequeue_notify (stream_session_t * s) +session_dequeue_notify (session_t * s) { - app_worker_t *app; + app_worker_t *app_wrk; + + app_wrk = app_worker_get_if_valid (s->app_wrk_index); + if (PREDICT_FALSE (!app_wrk)) + return -1; - app = app_worker_get_if_valid (s->app_wrk_index); - if (PREDICT_FALSE (!app)) + if (PREDICT_FALSE (app_worker_lock_and_send_event (app_wrk, s, + FIFO_EVENT_APP_TX))) return -1; - return app_worker_lock_and_send_event (app, s, FIFO_EVENT_APP_TX); + if (PREDICT_FALSE (s->tx_fifo->n_subscribers)) + return session_notify_subscribers (app_wrk->app_index, s, + s->tx_fifo, FIFO_EVENT_APP_TX); + + svm_fifo_clear_tx_ntf (s->tx_fifo); + + return 0; } /** @@ -572,7 +564,7 @@ int session_manager_flush_enqueue_events (u8 transport_proto, u32 thread_index) { session_manager_worker_t *wrk = session_manager_get_worker (thread_index); - stream_session_t *s; + session_t *s; int i, errors = 0; u32 *indices; @@ -616,29 +608,25 @@ void stream_session_init_fifos_pointers (transport_connection_t * tc, u32 rx_pointer, u32 tx_pointer) { - stream_session_t *s; + session_t *s; s = session_get (tc->s_index, tc->thread_index); - svm_fifo_init_pointers (s->server_rx_fifo, rx_pointer); - svm_fifo_init_pointers (s->server_tx_fifo, tx_pointer); + svm_fifo_init_pointers (s->rx_fifo, rx_pointer); + svm_fifo_init_pointers (s->tx_fifo, tx_pointer); } int session_stream_connect_notify (transport_connection_t * tc, u8 is_fail) { u32 opaque = 0, new_ti, new_si; - stream_session_t *new_s = 0; - segment_manager_t *sm; app_worker_t *app_wrk; - application_t *app; - u8 alloc_fifos; - int error = 0; - u64 handle; + session_t *s = 0; + u64 ho_handle; /* * Find connection handle and cleanup half-open table */ - handle = session_lookup_half_open_handle (tc); - if (handle == HALF_OPEN_LOOKUP_INVALID_VALUE) + ho_handle = session_lookup_half_open_handle (tc); + if (ho_handle == HALF_OPEN_LOOKUP_INVALID_VALUE) { SESSION_DBG ("half-open was removed!"); return -1; @@ -648,56 +636,40 @@ session_stream_connect_notify (transport_connection_t * tc, u8 is_fail) /* Get the app's index from the handle we stored when opening connection * and the opaque (api_context for external apps) from transport session * index */ - app_wrk = app_worker_get_if_valid (handle >> 32); + app_wrk = app_worker_get_if_valid (ho_handle >> 32); if (!app_wrk) return -1; + opaque = tc->s_index; - app = application_get (app_wrk->app_index); - /* - * Allocate new session with fifos (svm segments are allocated if needed) - */ - if (!is_fail) - { - sm = app_worker_get_connect_segment_manager (app_wrk); - alloc_fifos = !application_is_builtin_proxy (app); - if (session_alloc_and_init (sm, tc, alloc_fifos, &new_s)) - { - is_fail = 1; - error = -1; - } - else - { - new_s->session_state = SESSION_STATE_CONNECTING; - new_s->app_wrk_index = app_wrk->wrk_index; - new_si = new_s->session_index; - new_ti = new_s->thread_index; - } - } + if (is_fail) + return app_worker_connect_notify (app_wrk, s, opaque); - /* - * Notify client application - */ - if (app->cb_fns.session_connected_callback (app_wrk->wrk_index, opaque, - new_s, is_fail)) + s = session_alloc_for_connection (tc); + s->session_state = SESSION_STATE_CONNECTING; + s->app_wrk_index = app_wrk->wrk_index; + new_si = s->session_index; + new_ti = s->thread_index; + + if (app_worker_init_connected (app_wrk, s)) { - SESSION_DBG ("failed to notify app"); - if (!is_fail) - { - new_s = session_get (new_si, new_ti); - session_transport_close (new_s); - } + session_free (s); + app_worker_connect_notify (app_wrk, 0, opaque); + return -1; } - else + + if (app_worker_connect_notify (app_wrk, s, opaque)) { - if (!is_fail) - { - new_s = session_get (new_si, new_ti); - new_s->session_state = SESSION_STATE_READY; - } + s = session_get (new_si, new_ti); + session_free_w_fifos (s); + return -1; } - return error; + s = session_get (new_si, new_ti); + s->session_state = SESSION_STATE_READY; + session_lookup_add_connection (tc, session_handle (s)); + + return 0; } typedef struct _session_switch_pool_args @@ -712,14 +684,13 @@ static void session_switch_pool (void *cb_args) { session_switch_pool_args_t *args = (session_switch_pool_args_t *) cb_args; - transport_proto_t tp; - stream_session_t *s; + session_t *s; ASSERT (args->thread_index == vlib_get_thread_index ()); s = session_get (args->session_index, args->thread_index); - s->server_tx_fifo->master_session_index = args->new_session_index; - s->server_tx_fifo->master_thread_index = args->new_thread_index; - tp = session_get_transport_proto (s); - tp_vfts[tp].cleanup (s->connection_index, s->thread_index); + s->tx_fifo->master_session_index = args->new_session_index; + s->tx_fifo->master_thread_index = args->new_thread_index; + transport_cleanup (session_get_transport_proto (s), s->connection_index, + s->thread_index); session_free (s); clib_mem_free (cb_args); } @@ -729,10 +700,9 @@ session_switch_pool (void *cb_args) */ int session_dgram_connect_notify (transport_connection_t * tc, - u32 old_thread_index, - stream_session_t ** new_session) + u32 old_thread_index, session_t ** new_session) { - stream_session_t *new_s; + session_t *new_s; session_switch_pool_args_t *rpc_args; /* @@ -740,8 +710,8 @@ session_dgram_connect_notify (transport_connection_t * tc, */ new_s = session_clone_safe (tc->s_index, old_thread_index); new_s->connection_index = tc->c_index; - new_s->server_rx_fifo->master_session_index = new_s->session_index; - new_s->server_rx_fifo->master_thread_index = new_s->thread_index; + new_s->rx_fifo->master_session_index = new_s->session_index; + new_s->rx_fifo->master_thread_index = new_s->thread_index; new_s->session_state = SESSION_STATE_READY; session_lookup_add_connection (tc, session_handle (new_s)); @@ -763,22 +733,6 @@ session_dgram_connect_notify (transport_connection_t * tc, return 0; } -int -stream_session_accept_notify (transport_connection_t * tc) -{ - app_worker_t *app_wrk; - application_t *app; - stream_session_t *s; - - s = session_get (tc->s_index, tc->thread_index); - app_wrk = app_worker_get_if_valid (s->app_wrk_index); - if (!app_wrk) - return -1; - s->session_state = SESSION_STATE_ACCEPTING; - app = application_get (app_wrk->app_index); - return app->cb_fns.session_accept_callback (s); -} - /** * Notification from transport that connection is being closed. * @@ -791,7 +745,7 @@ session_transport_closing_notify (transport_connection_t * tc) { app_worker_t *app_wrk; application_t *app; - stream_session_t *s; + session_t *s; s = session_get (tc->s_index, tc->thread_index); if (s->session_state >= SESSION_STATE_TRANSPORT_CLOSING) @@ -815,17 +769,23 @@ session_transport_closing_notify (transport_connection_t * tc) void session_transport_delete_notify (transport_connection_t * tc) { - stream_session_t *s; + session_t *s; /* App might've been removed already */ if (!(s = session_get_if_valid (tc->s_index, tc->thread_index))) return; /* Make sure we don't try to send anything more */ - svm_fifo_dequeue_drop_all (s->server_tx_fifo); + svm_fifo_dequeue_drop_all (s->tx_fifo); switch (s->session_state) { + case SESSION_STATE_CREATED: + /* Session was created but accept notification was not yet sent to the + * app. Cleanup everything. */ + session_lookup_del_session (s); + session_free_w_fifos (s); + break; case SESSION_STATE_ACCEPTING: case SESSION_STATE_TRANSPORT_CLOSING: /* If transport finishes or times out before we get a reply @@ -870,7 +830,7 @@ session_transport_delete_notify (transport_connection_t * tc) void session_transport_closed_notify (transport_connection_t * tc) { - stream_session_t *s; + session_t *s; if (!(s = session_get_if_valid (tc->s_index, tc->thread_index))) return; @@ -892,11 +852,11 @@ session_transport_closed_notify (transport_connection_t * tc) void session_transport_reset_notify (transport_connection_t * tc) { - stream_session_t *s; + session_t *s; app_worker_t *app_wrk; application_t *app; s = session_get (tc->s_index, tc->thread_index); - svm_fifo_dequeue_drop_all (s->server_tx_fifo); + svm_fifo_dequeue_drop_all (s->tx_fifo); if (s->session_state >= SESSION_STATE_TRANSPORT_CLOSING) return; s->session_state = SESSION_STATE_TRANSPORT_CLOSING; @@ -905,34 +865,44 @@ session_transport_reset_notify (transport_connection_t * tc) app->cb_fns.session_reset_callback (s); } +int +session_stream_accept_notify (transport_connection_t * tc) +{ + app_worker_t *app_wrk; + session_t *s; + + s = session_get (tc->s_index, tc->thread_index); + app_wrk = app_worker_get_if_valid (s->app_wrk_index); + if (!app_wrk) + return -1; + s->session_state = SESSION_STATE_ACCEPTING; + return app_worker_accept_notify (app_wrk, s); +} + /** * Accept a stream session. Optionally ping the server by callback. */ int -stream_session_accept (transport_connection_t * tc, u32 listener_index, +session_stream_accept (transport_connection_t * tc, u32 listener_index, u8 notify) { - stream_session_t *s, *listener; - app_worker_t *app_wrk; - segment_manager_t *sm; + session_t *s; int rv; - /* Find the server */ - listener = listen_session_get (listener_index); - app_wrk = application_listener_select_worker (listener, 0); + s = session_alloc_for_connection (tc); + s->listener_index = listener_index; + s->session_state = SESSION_STATE_CREATED; - sm = app_worker_get_listen_segment_manager (app_wrk, listener); - if ((rv = session_alloc_and_init (sm, tc, 1, &s))) + if ((rv = app_worker_init_accepted (s))) return rv; - s->app_wrk_index = app_wrk->wrk_index; - s->listener_index = listener_index; + session_lookup_add_connection (tc, session_handle (s)); /* Shoulder-tap the server */ if (notify) { - application_t *app = application_get (app_wrk->app_index); - return app->cb_fns.session_accept_callback (s); + app_worker_t *app_wrk = app_worker_get (s->app_wrk_index); + return app_worker_accept_notify (app_wrk, s); } return 0; @@ -943,37 +913,32 @@ session_open_cl (u32 app_wrk_index, session_endpoint_t * rmt, u32 opaque) { transport_connection_t *tc; transport_endpoint_cfg_t *tep; - segment_manager_t *sm; app_worker_t *app_wrk; - stream_session_t *s; - application_t *app; + session_t *s; int rv; tep = session_endpoint_to_transport_cfg (rmt); - rv = tp_vfts[rmt->transport_proto].open (tep); + rv = transport_connect (rmt->transport_proto, tep); if (rv < 0) { SESSION_DBG ("Transport failed to open connection."); return VNET_API_ERROR_SESSION_CONNECT; } - tc = tp_vfts[rmt->transport_proto].get_half_open ((u32) rv); + tc = transport_get_half_open (rmt->transport_proto, (u32) rv); - /* For dgram type of service, allocate session and fifos now. - */ + /* For dgram type of service, allocate session and fifos now */ app_wrk = app_worker_get (app_wrk_index); - sm = app_worker_get_connect_segment_manager (app_wrk); - - if (session_alloc_and_init (sm, tc, 1, &s)) - return -1; + s = session_alloc_for_connection (tc); s->app_wrk_index = app_wrk->wrk_index; s->session_state = SESSION_STATE_OPENED; + if (app_worker_init_connected (app_wrk, s)) + { + session_free (s); + return -1; + } - /* Tell the app about the new event fifo for this session */ - app = application_get (app_wrk->app_index); - app->cb_fns.session_connected_callback (app_wrk->wrk_index, opaque, s, 0); - - return 0; + return app_worker_connect_notify (app_wrk, s, opaque); } int @@ -985,14 +950,14 @@ session_open_vc (u32 app_wrk_index, session_endpoint_t * rmt, u32 opaque) int rv; tep = session_endpoint_to_transport_cfg (rmt); - rv = tp_vfts[rmt->transport_proto].open (tep); + rv = transport_connect (rmt->transport_proto, tep); if (rv < 0) { SESSION_DBG ("Transport failed to open connection."); return VNET_API_ERROR_SESSION_CONNECT; } - tc = tp_vfts[rmt->transport_proto].get_half_open ((u32) rv); + tc = transport_get_half_open (rmt->transport_proto, (u32) rv); /* If transport offers a stream service, only allocate session once the * connection has been established. @@ -1020,7 +985,7 @@ session_open_app (u32 app_wrk_index, session_endpoint_t * rmt, u32 opaque) sep->app_wrk_index = app_wrk_index; sep->opaque = opaque; - return tp_vfts[rmt->transport_proto].open (tep_cfg); + return transport_connect (rmt->transport_proto, tep_cfg); } typedef int (*session_open_service_fn) (u32, session_endpoint_t *, u32); @@ -1049,7 +1014,8 @@ static session_open_service_fn session_open_srv_fns[TRANSPORT_N_SERVICES] = { int session_open (u32 app_wrk_index, session_endpoint_t * rmt, u32 opaque) { - transport_service_type_t tst = tp_vfts[rmt->transport_proto].service_type; + transport_service_type_t tst; + tst = transport_protocol_service_type (rmt->transport_proto); return session_open_srv_fns[tst] (app_wrk_index, rmt, opaque); } @@ -1062,7 +1028,7 @@ session_open (u32 app_wrk_index, session_endpoint_t * rmt, u32 opaque) * @param sep Local endpoint to be listened on. */ int -session_listen (stream_session_t * ls, session_endpoint_cfg_t * sep) +session_listen (session_t * ls, session_endpoint_cfg_t * sep) { transport_connection_t *tc; transport_endpoint_t *tep; @@ -1071,7 +1037,7 @@ session_listen (stream_session_t * ls, session_endpoint_cfg_t * sep) /* Transport bind/listen */ tep = session_endpoint_to_transport (sep); s_index = ls->session_index; - tc_index = tp_vfts[sep->transport_proto].bind (s_index, tep); + tc_index = transport_start_listen (sep->transport_proto, s_index, tep); if (tc_index == (u32) ~ 0) return -1; @@ -1081,8 +1047,8 @@ session_listen (stream_session_t * ls, session_endpoint_cfg_t * sep) ls->connection_index = tc_index; /* Add to the main lookup table after transport was initialized */ - tc = tp_vfts[sep->transport_proto].get_listener (tc_index); - session_lookup_add_connection (tc, s_index); + tc = transport_get_listener (sep->transport_proto, tc_index); + session_lookup_add_connection (tc, listen_session_get_handle (ls)); return 0; } @@ -1092,25 +1058,20 @@ session_listen (stream_session_t * ls, session_endpoint_cfg_t * sep) * @param s Session to stop listening on. It must be in state LISTENING. */ int -session_stop_listen (stream_session_t * s) +session_stop_listen (session_t * s) { transport_proto_t tp = session_get_transport_proto (s); transport_connection_t *tc; + if (s->session_state != SESSION_STATE_LISTENING) - { - clib_warning ("not a listening session"); - return -1; - } + return -1; - tc = tp_vfts[tp].get_listener (s->connection_index); + tc = transport_get_listener (tp, s->connection_index); if (!tc) - { - clib_warning ("no transport"); - return VNET_API_ERROR_ADDRESS_NOT_IN_USE; - } + return VNET_API_ERROR_ADDRESS_NOT_IN_USE; session_lookup_del_connection (tc); - tp_vfts[tp].unbind (s->connection_index); + transport_stop_listen (tp, s->connection_index); return 0; } @@ -1121,7 +1082,7 @@ session_stop_listen (stream_session_t * s) * requests are served before transport is notified. */ void -session_close (stream_session_t * s) +session_close (session_t * s) { if (!s) return; @@ -1135,7 +1096,7 @@ session_close (stream_session_t * s) /* Session already closed. Clear the tx fifo */ if (s->session_state == SESSION_STATE_CLOSED) - svm_fifo_dequeue_drop_all (s->server_tx_fifo); + svm_fifo_dequeue_drop_all (s->tx_fifo); return; } @@ -1151,7 +1112,7 @@ session_close (stream_session_t * s) * Must be called from the session's thread. */ void -session_transport_close (stream_session_t * s) +session_transport_close (session_t * s) { /* If transport is already closed, just free the session */ if (s->session_state >= SESSION_STATE_TRANSPORT_CLOSED) @@ -1166,13 +1127,13 @@ session_transport_close (stream_session_t * s) * point, either after sending everything or after a timeout, call delete * notify. This will finally lead to the complete cleanup of the session. */ - if (svm_fifo_max_dequeue (s->server_tx_fifo)) + if (svm_fifo_max_dequeue (s->tx_fifo)) s->session_state = SESSION_STATE_CLOSED_WAITING; else s->session_state = SESSION_STATE_CLOSED; - tp_vfts[session_get_transport_proto (s)].close (s->connection_index, - s->thread_index); + transport_close (session_get_transport_proto (s), s->connection_index, + s->thread_index); } /** @@ -1183,41 +1144,19 @@ session_transport_close (stream_session_t * s) * closed. */ void -session_transport_cleanup (stream_session_t * s) +session_transport_cleanup (session_t * s) { s->session_state = SESSION_STATE_CLOSED; /* Delete from main lookup table before we axe the the transport */ session_lookup_del_session (s); - tp_vfts[session_get_transport_proto (s)].cleanup (s->connection_index, - s->thread_index); + transport_cleanup (session_get_transport_proto (s), s->connection_index, + s->thread_index); /* Since we called cleanup, no delete notification will come. So, make * sure the session is properly freed. */ session_free_w_fifos (s); } -transport_service_type_t -session_transport_service_type (stream_session_t * s) -{ - transport_proto_t tp; - tp = session_get_transport_proto (s); - return transport_protocol_service_type (tp); -} - -transport_tx_fn_type_t -session_transport_tx_fn_type (stream_session_t * s) -{ - transport_proto_t tp; - tp = session_get_transport_proto (s); - return transport_protocol_tx_fn_type (tp); -} - -u8 -session_tx_is_dgram (stream_session_t * s) -{ - return (session_transport_tx_fn_type (s) == TRANSPORT_TX_DGRAM); -} - /** * Allocate event queues in the shared-memory segment * @@ -1233,7 +1172,7 @@ session_vpp_event_queues_allocate (session_manager_main_t * smm) u32 evt_q_length = 2048, evt_size = sizeof (session_event_t); ssvm_private_t *eqs = &smm->evt_qs_segment; api_main_t *am = &api_main; - u64 eqs_size = 64 << 20; + uword eqs_size = 64 << 20; pid_t vpp_pid = getpid (); void *oldheap; int i; @@ -1270,7 +1209,7 @@ session_vpp_event_queues_allocate (session_manager_main_t * smm) svm_msg_q_ring_cfg_t rc[SESSION_MQ_N_RINGS] = { {evt_q_length, evt_size, 0} , - {evt_q_length << 1, 256, 0} + {evt_q_length >> 1, 256, 0} }; cfg->consumer_pid = 0; cfg->n_rings = 2; @@ -1344,32 +1283,29 @@ session_register_transport (transport_proto_t transport_proto, } transport_connection_t * -session_get_transport (stream_session_t * s) +session_get_transport (session_t * s) { - transport_proto_t tp; if (s->session_state != SESSION_STATE_LISTENING) - { - tp = session_get_transport_proto (s); - return tp_vfts[tp].get_connection (s->connection_index, - s->thread_index); - } - return 0; + return transport_get_connection (session_get_transport_proto (s), + s->connection_index, s->thread_index); + else + return transport_get_listener (session_get_transport_proto (s), + s->connection_index); } transport_connection_t * -listen_session_get_transport (stream_session_t * s) +listen_session_get_transport (session_t * s) { - transport_proto_t tp = session_get_transport_proto (s); - return tp_vfts[tp].get_listener (s->connection_index); + return transport_get_listener (session_get_transport_proto (s), + s->connection_index); } int -listen_session_get_local_session_endpoint (stream_session_t * listener, +listen_session_get_local_session_endpoint (session_t * listener, session_endpoint_t * sep) { - transport_proto_t tp = session_get_transport_proto (listener); transport_connection_t *tc; - tc = tp_vfts[tp].get_listener (listener->connection_index); + tc = listen_session_get_transport (listener); if (!tc) { clib_warning ("no transport"); @@ -1629,6 +1565,9 @@ session_config_fn (vlib_main_t * vm, unformat_input_t * input) ; else if (unformat (input, "evt_qs_memfd_seg")) smm->evt_qs_use_memfd_seg = 1; + else if (unformat (input, "evt_qs_seg_size %U", unformat_memory_size, + &smm->evt_qs_segment_size)) + ; else return clib_error_return (0, "unknown input `%U'", format_unformat_error, input);