X-Git-Url: https://gerrit.fd.io/r/gitweb?a=blobdiff_plain;f=src%2Fvnet%2Fsession%2Fsession.c;h=88b38f15a61351bd21122835b947931926c0af2f;hb=refs%2Fchanges%2F11%2F8611%2F22;hp=843d474fa198d3ddcdfb8aa99b4b10ada0ed727a;hpb=ab0289a85c45699878d203b4a0d2e5b38c36cc55;p=vpp.git diff --git a/src/vnet/session/session.c b/src/vnet/session/session.c index 843d474fa19..88b38f15a61 100644 --- a/src/vnet/session/session.c +++ b/src/vnet/session/session.c @@ -85,45 +85,111 @@ stream_session_create_i (segment_manager_t * sm, transport_connection_t * tc, /* Add to the main lookup table */ value = stream_session_handle (s); - stream_session_table_add_for_tc (tc, value); + session_lookup_add_connection (tc, value); *ret_s = s; return 0; } -/** Enqueue buffer chain tail */ +/** + * Discards bytes from buffer chain + * + * It discards n_bytes_to_drop starting at first buffer after chain_b + */ +always_inline void +session_enqueue_discard_chain_bytes (vlib_main_t * vm, vlib_buffer_t * b, + vlib_buffer_t ** chain_b, + u32 n_bytes_to_drop) +{ + vlib_buffer_t *next = *chain_b; + u32 to_drop = n_bytes_to_drop; + ASSERT (b->flags & VLIB_BUFFER_NEXT_PRESENT); + while (to_drop && (next->flags & VLIB_BUFFER_NEXT_PRESENT)) + { + next = vlib_get_buffer (vm, next->next_buffer); + if (next->current_length > to_drop) + { + vlib_buffer_advance (next, to_drop); + to_drop = 0; + } + else + { + to_drop -= next->current_length; + next->current_length = 0; + } + } + *chain_b = next; + + if (to_drop == 0) + b->total_length_not_including_first_buffer -= n_bytes_to_drop; +} + +/** + * Enqueue buffer chain tail + */ always_inline int session_enqueue_chain_tail (stream_session_t * s, vlib_buffer_t * b, u32 offset, u8 is_in_order) { vlib_buffer_t *chain_b; - u32 chain_bi = b->next_buffer, len; + u32 chain_bi, len, diff; vlib_main_t *vm = vlib_get_main (); u8 *data; - u16 written = 0; + u32 written = 0; int rv = 0; + if (is_in_order && offset) + { + diff = offset - b->current_length; + if (diff > b->total_length_not_including_first_buffer) + return 0; + chain_b = b; + session_enqueue_discard_chain_bytes (vm, b, &chain_b, diff); + chain_bi = vlib_get_buffer_index (vm, chain_b); + } + else + chain_bi = b->next_buffer; + do { chain_b = vlib_get_buffer (vm, chain_bi); data = vlib_buffer_get_current (chain_b); len = chain_b->current_length; + if (!len) + continue; if (is_in_order) { rv = svm_fifo_enqueue_nowait (s->server_rx_fifo, len, data); - if (rv < len) + if (rv == len) + { + written += rv; + } + else if (rv < len) { return (rv > 0) ? (written + rv) : written; } - written += rv; + else if (rv > len) + { + written += rv; + + /* written more than what was left in chain */ + if (written > b->total_length_not_including_first_buffer) + return written; + + /* drop the bytes that have already been delivered */ + session_enqueue_discard_chain_bytes (vm, b, &chain_b, rv - len); + } } else { rv = svm_fifo_enqueue_with_offset (s->server_rx_fifo, offset, len, data); if (rv) - return -1; + { + clib_warning ("failed to enqueue multi-buffer seg"); + return -1; + } offset += len; } } @@ -155,22 +221,22 @@ stream_session_enqueue_data (transport_connection_t * tc, vlib_buffer_t * b, u32 offset, u8 queue_event, u8 is_in_order) { stream_session_t *s; - int enqueued = 0, rv; + int enqueued = 0, rv, in_order_off; - s = stream_session_get (tc->s_index, tc->thread_index); + s = session_get (tc->s_index, tc->thread_index); if (is_in_order) { - enqueued = - svm_fifo_enqueue_nowait (s->server_rx_fifo, b->current_length, - vlib_buffer_get_current (b)); - if (PREDICT_FALSE - ((b->flags & VLIB_BUFFER_NEXT_PRESENT) && enqueued > 0)) + enqueued = svm_fifo_enqueue_nowait (s->server_rx_fifo, + b->current_length, + vlib_buffer_get_current (b)); + if (PREDICT_FALSE ((b->flags & VLIB_BUFFER_NEXT_PRESENT) + && enqueued >= 0)) { - rv = session_enqueue_chain_tail (s, b, 0, 1); - if (rv <= 0) - return enqueued; - enqueued += rv; + in_order_off = enqueued > b->current_length ? enqueued : 0; + rv = session_enqueue_chain_tail (s, b, in_order_off, 1); + if (rv > 0) + enqueued += rv; } } else @@ -179,9 +245,10 @@ stream_session_enqueue_data (transport_connection_t * tc, vlib_buffer_t * b, b->current_length, vlib_buffer_get_current (b)); if (PREDICT_FALSE ((b->flags & VLIB_BUFFER_NEXT_PRESENT) && !rv)) - rv = session_enqueue_chain_tail (s, b, offset + b->current_length, 0); - if (rv) - return -1; + session_enqueue_chain_tail (s, b, offset + b->current_length, 0); + /* if something was enqueued, report even this as success for ooo + * segment handling */ + return rv; } if (queue_event) @@ -200,10 +267,7 @@ stream_session_enqueue_data (transport_connection_t * tc, vlib_buffer_t * b, } } - if (is_in_order) - return enqueued; - - return 0; + return enqueued; } /** Check if we have space in rx fifo to push more bytes */ @@ -211,7 +275,7 @@ u8 stream_session_no_space (transport_connection_t * tc, u32 thread_index, u16 data_len) { - stream_session_t *s = stream_session_get (tc->s_index, thread_index); + stream_session_t *s = session_get (tc->s_index, thread_index); if (PREDICT_FALSE (s->session_state != SESSION_STATE_READY)) return 1; @@ -225,7 +289,7 @@ stream_session_no_space (transport_connection_t * tc, u32 thread_index, u32 stream_session_tx_fifo_max_dequeue (transport_connection_t * tc) { - stream_session_t *s = stream_session_get (tc->s_index, tc->thread_index); + stream_session_t *s = session_get (tc->s_index, tc->thread_index); if (!s->server_tx_fifo) return 0; return svm_fifo_max_dequeue (s->server_tx_fifo); @@ -235,14 +299,14 @@ int stream_session_peek_bytes (transport_connection_t * tc, u8 * buffer, u32 offset, u32 max_bytes) { - stream_session_t *s = stream_session_get (tc->s_index, tc->thread_index); + stream_session_t *s = session_get (tc->s_index, tc->thread_index); return svm_fifo_peek (s->server_tx_fifo, offset, max_bytes, buffer); } u32 stream_session_dequeue_drop (transport_connection_t * tc, u32 max_bytes) { - stream_session_t *s = stream_session_get (tc->s_index, tc->thread_index); + stream_session_t *s = session_get (tc->s_index, tc->thread_index); return svm_fifo_dequeue_drop (s->server_tx_fifo, max_bytes); } @@ -263,7 +327,13 @@ stream_session_enqueue_notify (stream_session_t * s, u8 block) static u32 serial_number; if (PREDICT_FALSE (s->session_state == SESSION_STATE_CLOSED)) - return 0; + { + /* Session is closed so app will never clean up. Flush rx fifo */ + u32 to_dequeue = svm_fifo_max_dequeue (s->server_rx_fifo); + if (to_dequeue) + svm_fifo_dequeue_drop (s->server_rx_fifo, to_dequeue); + return 0; + } /* Get session's server */ app = application_get_if_valid (s->app_index); @@ -362,7 +432,7 @@ stream_session_init_fifos_pointers (transport_connection_t * tc, u32 rx_pointer, u32 tx_pointer) { stream_session_t *s; - s = stream_session_get (tc->s_index, tc->thread_index); + s = session_get (tc->s_index, tc->thread_index); svm_fifo_init_pointers (s->server_rx_fifo, rx_pointer); svm_fifo_init_pointers (s->server_tx_fifo, tx_pointer); } @@ -376,19 +446,23 @@ stream_session_connect_notify (transport_connection_t * tc, u8 is_fail) u32 opaque = 0; int error = 0; - handle = stream_session_half_open_lookup_handle (&tc->lcl_ip, &tc->rmt_ip, - tc->lcl_port, tc->rmt_port, - tc->transport_proto); + handle = session_lookup_half_open_handle (tc); if (handle == HALF_OPEN_LOOKUP_INVALID_VALUE) { - clib_warning ("This can't be good!"); + SESSION_DBG ("half-open was removed!"); return -1; } + /* Cleanup half-open table */ + session_lookup_del_half_open (tc); + /* Get the app's index from the handle we stored when opening connection * and the opaque (api_context for external apps) from transport session - * index*/ - app = application_get (handle >> 32); + * index */ + app = application_get_if_valid (handle >> 32); + if (!app) + return -1; + opaque = tc->s_index; if (!is_fail) @@ -411,7 +485,7 @@ stream_session_connect_notify (transport_connection_t * tc, u8 is_fail) if (app->cb_fns.session_connected_callback (app->index, opaque, new_s, is_fail)) { - clib_warning ("failed to notify app"); + SESSION_DBG ("failed to notify app"); if (!is_fail) stream_session_disconnect (new_s); } @@ -421,9 +495,6 @@ stream_session_connect_notify (transport_connection_t * tc, u8 is_fail) new_s->session_state = SESSION_STATE_READY; } - /* Cleanup session lookup */ - stream_session_half_open_table_del (tc); - return error; } @@ -433,7 +504,7 @@ stream_session_accept_notify (transport_connection_t * tc) application_t *server; stream_session_t *s; - s = stream_session_get (tc->s_index, tc->thread_index); + s = session_get (tc->s_index, tc->thread_index); server = application_get (s->app_index); server->cb_fns.session_accept_callback (s); } @@ -451,13 +522,13 @@ stream_session_disconnect_notify (transport_connection_t * tc) application_t *server; stream_session_t *s; - s = stream_session_get (tc->s_index, tc->thread_index); + s = session_get (tc->s_index, tc->thread_index); server = application_get (s->app_index); server->cb_fns.session_disconnect_callback (s); } /** - * Cleans up session and associated app if needed. + * Cleans up session and lookup table. */ void stream_session_delete (stream_session_t * s) @@ -466,7 +537,7 @@ stream_session_delete (stream_session_t * s) int rv; /* Delete from the main lookup table. */ - if ((rv = stream_session_table_del (s))) + if ((rv = session_lookup_del_session (s))) clib_warning ("hash delete error, rv %d", rv); /* Cleanup fifo segments */ @@ -481,9 +552,10 @@ stream_session_delete (stream_session_t * s) /** * Notification from transport that connection is being deleted * - * This should be called only on previously fully established sessions. For - * instance failed connects should call stream_session_connect_notify and - * indicate that the connect has failed. + * This removes the session if it is still valid. It should be called only on + * previously fully established sessions. For instance failed connects should + * call stream_session_connect_notify and indicate that the connect has + * failed. */ void stream_session_delete_notify (transport_connection_t * tc) @@ -493,9 +565,7 @@ stream_session_delete_notify (transport_connection_t * tc) /* App might've been removed already */ s = stream_session_get_if_valid (tc->s_index, tc->thread_index); if (!s) - { - return; - } + return; stream_session_delete (s); } @@ -507,7 +577,7 @@ stream_session_reset_notify (transport_connection_t * tc) { stream_session_t *s; application_t *app; - s = stream_session_get (tc->s_index, tc->thread_index); + s = session_get (tc->s_index, tc->thread_index); app = application_get (s->app_index); app->cb_fns.session_reset_callback (s); @@ -518,14 +588,16 @@ stream_session_reset_notify (transport_connection_t * tc) */ int stream_session_accept (transport_connection_t * tc, u32 listener_index, - u8 sst, u8 notify) + u8 notify) { application_t *server; stream_session_t *s, *listener; segment_manager_t *sm; - + session_type_t sst; int rv; + sst = session_type_from_proto_and_ip (tc->transport_proto, tc->is_ip4); + /* Find the server */ listener = listen_session_get (sst, listener_index); server = application_get (listener->app_index); @@ -560,22 +632,23 @@ stream_session_accept (transport_connection_t * tc, u32 listener_index, * @param res Resulting transport connection . */ int -stream_session_open (u32 app_index, session_type_t st, - transport_endpoint_t * rmt, +stream_session_open (u32 app_index, session_endpoint_t * rmt, transport_connection_t ** res) { transport_connection_t *tc; + session_type_t sst; int rv; u64 handle; - rv = tp_vfts[st].open (rmt); + sst = session_type_from_proto_and_ip (rmt->transport_proto, rmt->is_ip4); + rv = tp_vfts[sst].open (session_endpoint_to_transport (rmt)); if (rv < 0) { clib_warning ("Transport failed to open connection."); - return VNET_API_ERROR_SESSION_CONNECT_FAIL; + return VNET_API_ERROR_SESSION_CONNECT; } - tc = tp_vfts[st].get_half_open ((u32) rv); + tc = tp_vfts[sst].get_half_open ((u32) rv); /* Save app and tc index. The latter is needed to help establish the * connection while the former is needed when the connect notify comes @@ -583,7 +656,7 @@ stream_session_open (u32 app_index, session_type_t st, handle = (((u64) app_index) << 32) | (u64) tc->c_index; /* Add to the half-open lookup table */ - stream_session_half_open_table_add (tc, handle); + session_lookup_add_half_open (tc, handle); *res = tc; @@ -599,13 +672,14 @@ stream_session_open (u32 app_index, session_type_t st, * @param tep Local endpoint to be listened on. */ int -stream_session_listen (stream_session_t * s, transport_endpoint_t * tep) +stream_session_listen (stream_session_t * s, session_endpoint_t * tep) { transport_connection_t *tc; u32 tci; /* Transport bind/listen */ - tci = tp_vfts[s->session_type].bind (s->session_index, tep); + tci = tp_vfts[s->session_type].bind (s->session_index, + session_endpoint_to_transport (tep)); if (tci == (u32) ~ 0) return -1; @@ -619,7 +693,7 @@ stream_session_listen (stream_session_t * s, transport_endpoint_t * tep) return -1; /* Add to the main lookup table */ - stream_session_table_add_for_tc (tc, s->session_index); + session_lookup_add_connection (tc, s->session_index); return 0; } @@ -647,7 +721,7 @@ stream_session_stop_listen (stream_session_t * s) return VNET_API_ERROR_ADDRESS_NOT_IN_USE; } - stream_session_table_del_for_tc (tc); + session_lookup_del_connection (tc); tp_vfts[s->session_type].unbind (s->connection_index); return 0; } @@ -658,6 +732,7 @@ session_send_session_evt_to_thread (u64 session_handle, u32 thread_index) { static u16 serial_number = 0; + u32 tries = 0; session_fifo_event_t evt; unix_shared_memory_queue_t *q; @@ -667,21 +742,14 @@ session_send_session_evt_to_thread (u64 session_handle, evt.event_id = serial_number++; q = session_manager_get_vpp_event_queue (thread_index); - - /* Based on request block (or not) for lack of space */ - if (PREDICT_TRUE (q->cursize < q->maxsize)) + while (unix_shared_memory_queue_add (q, (u8 *) & evt, 1)) { - if (unix_shared_memory_queue_add (q, (u8 *) & evt, - 1 /* do wait for mutex */ )) + if (tries++ == 3) { - clib_warning ("failed to enqueue evt"); + TCP_DBG ("failed to enqueue evt"); + break; } } - else - { - clib_warning ("queue full"); - return; - } } /** @@ -712,7 +780,7 @@ stream_session_cleanup (stream_session_t * s) s->session_state = SESSION_STATE_CLOSED; /* Delete from the main lookup table to avoid more enqueues */ - rv = stream_session_table_del (s); + rv = session_lookup_del_session (s); if (rv) clib_warning ("hash delete error, rv %d", rv); @@ -769,6 +837,26 @@ session_type_from_proto_and_ip (transport_proto_t proto, u8 is_ip4) return SESSION_N_TYPES; } +int +listen_session_get_local_session_endpoint (stream_session_t * listener, + session_endpoint_t * sep) +{ + transport_connection_t *tc; + tc = + tp_vfts[listener->session_type].get_listener (listener->connection_index); + if (!tc) + { + clib_warning ("no transport"); + return -1; + } + + /* N.B. The ip should not be copied because this is the local endpoint */ + sep->port = tc->lcl_port; + sep->transport_proto = tc->transport_proto; + sep->is_ip4 = tc->is_ip4; + return 0; +} + static clib_error_t * session_manager_main_enable (vlib_main_t * vm) { @@ -813,36 +901,29 @@ session_manager_main_enable (vlib_main_t * vm) session_vpp_event_queue_allocate (smm, i); /* Preallocate sessions */ - if (num_threads == 1) + if (smm->preallocated_sessions) { - for (i = 0; i < smm->preallocated_sessions; i++) + if (num_threads == 1) { - stream_session_t *ss __attribute__ ((unused)); - pool_get_aligned (smm->sessions[0], ss, CLIB_CACHE_LINE_BYTES); + pool_init_fixed (smm->sessions[0], smm->preallocated_sessions); } - - for (i = 0; i < smm->preallocated_sessions; i++) - pool_put_index (smm->sessions[0], i); - } - else - { - int j; - preallocated_sessions_per_worker = smm->preallocated_sessions / - (num_threads - 1); - - for (j = 1; j < num_threads; j++) + else { - for (i = 0; i < preallocated_sessions_per_worker; i++) + int j; + preallocated_sessions_per_worker = + (1.1 * (f64) smm->preallocated_sessions / + (f64) (num_threads - 1)); + + for (j = 1; j < num_threads; j++) { - stream_session_t *ss __attribute__ ((unused)); - pool_get_aligned (smm->sessions[j], ss, CLIB_CACHE_LINE_BYTES); + pool_init_fixed (smm->sessions[j], + preallocated_sessions_per_worker); } - for (i = 0; i < preallocated_sessions_per_worker; i++) - pool_put_index (smm->sessions[j], i); } } session_lookup_init (); + app_namespaces_init (); smm->is_enabled = 1; @@ -867,14 +948,14 @@ session_node_enable_disable (u8 is_en) clib_error_t * vnet_session_enable_disable (vlib_main_t * vm, u8 is_en) { + clib_error_t *error = 0; if (is_en) { if (session_manager_main.is_enabled) return 0; session_node_enable_disable (is_en); - - return session_manager_main_enable (vm); + error = session_manager_main_enable (vm); } else { @@ -882,7 +963,7 @@ vnet_session_enable_disable (vlib_main_t * vm, u8 is_en) session_node_enable_disable (is_en); } - return 0; + return error; } clib_error_t *