X-Git-Url: https://gerrit.fd.io/r/gitweb?a=blobdiff_plain;f=src%2Fvnet%2Fsession%2Fsession.c;h=ee22ccbe0adeba5f6a672d578669ec0a3c90ff3c;hb=c87c91d8b0e85997debaf575f2e30cc2702edf25;hp=991bcd5a53f79ca45b1bab25b945dbde82bec43c;hpb=52851e6aa9304054fd1059c8dd284abf8e532bf2;p=vpp.git diff --git a/src/vnet/session/session.c b/src/vnet/session/session.c index 991bcd5a53f..ee22ccbe0ad 100644 --- a/src/vnet/session/session.c +++ b/src/vnet/session/session.c @@ -92,38 +92,104 @@ stream_session_create_i (segment_manager_t * sm, transport_connection_t * tc, return 0; } -/** Enqueue buffer chain tail */ +/** + * Discards bytes from buffer chain + * + * It discards n_bytes_to_drop starting at first buffer after chain_b + */ +always_inline void +session_enqueue_discard_chain_bytes (vlib_main_t * vm, vlib_buffer_t * b, + vlib_buffer_t ** chain_b, + u32 n_bytes_to_drop) +{ + vlib_buffer_t *next = *chain_b; + u32 to_drop = n_bytes_to_drop; + ASSERT (b->flags & VLIB_BUFFER_NEXT_PRESENT); + while (to_drop && (next->flags & VLIB_BUFFER_NEXT_PRESENT)) + { + next = vlib_get_buffer (vm, next->next_buffer); + if (next->current_length > to_drop) + { + vlib_buffer_advance (next, to_drop); + to_drop = 0; + } + else + { + to_drop -= next->current_length; + next->current_length = 0; + } + } + *chain_b = next; + + if (to_drop == 0) + b->total_length_not_including_first_buffer -= n_bytes_to_drop; +} + +/** + * Enqueue buffer chain tail + */ always_inline int session_enqueue_chain_tail (stream_session_t * s, vlib_buffer_t * b, u32 offset, u8 is_in_order) { vlib_buffer_t *chain_b; - u32 chain_bi = b->next_buffer; + u32 chain_bi, len, diff; vlib_main_t *vm = vlib_get_main (); - u8 *data, len; - u16 written = 0; + u8 *data; + u32 written = 0; int rv = 0; + if (is_in_order && offset) + { + diff = offset - b->current_length; + if (diff > b->total_length_not_including_first_buffer) + return 0; + chain_b = b; + session_enqueue_discard_chain_bytes (vm, b, &chain_b, diff); + chain_bi = vlib_get_buffer_index (vm, chain_b); + } + else + chain_bi = b->next_buffer; + do { chain_b = vlib_get_buffer (vm, chain_bi); data = vlib_buffer_get_current (chain_b); len = chain_b->current_length; + if (!len) + continue; if (is_in_order) { rv = svm_fifo_enqueue_nowait (s->server_rx_fifo, len, data); - if (rv < len) + if (rv == len) + { + written += rv; + } + else if (rv < len) { return (rv > 0) ? (written + rv) : written; } - written += rv; + else if (rv > len) + { + written += rv; + + /* written more than what was left in chain */ + if (written > b->total_length_not_including_first_buffer) + return written; + + /* drop the bytes that have already been delivered */ + session_enqueue_discard_chain_bytes (vm, b, &chain_b, rv - len); + } } else { rv = svm_fifo_enqueue_with_offset (s->server_rx_fifo, offset, len, data); if (rv) - return -1; + { + clib_warning ("failed to enqueue multi-buffer seg"); + return -1; + } offset += len; } } @@ -155,22 +221,22 @@ stream_session_enqueue_data (transport_connection_t * tc, vlib_buffer_t * b, u32 offset, u8 queue_event, u8 is_in_order) { stream_session_t *s; - int enqueued = 0, rv; + int enqueued = 0, rv, in_order_off; s = stream_session_get (tc->s_index, tc->thread_index); if (is_in_order) { - enqueued = - svm_fifo_enqueue_nowait (s->server_rx_fifo, b->current_length, - vlib_buffer_get_current (b)); - if (PREDICT_FALSE - ((b->flags & VLIB_BUFFER_NEXT_PRESENT) && enqueued > 0)) + enqueued = svm_fifo_enqueue_nowait (s->server_rx_fifo, + b->current_length, + vlib_buffer_get_current (b)); + if (PREDICT_FALSE ((b->flags & VLIB_BUFFER_NEXT_PRESENT) + && enqueued >= 0)) { - rv = session_enqueue_chain_tail (s, b, 0, 1); - if (rv <= 0) - return enqueued; - enqueued += rv; + in_order_off = enqueued > b->current_length ? enqueued : 0; + rv = session_enqueue_chain_tail (s, b, in_order_off, 1); + if (rv > 0) + enqueued += rv; } } else @@ -179,9 +245,10 @@ stream_session_enqueue_data (transport_connection_t * tc, vlib_buffer_t * b, b->current_length, vlib_buffer_get_current (b)); if (PREDICT_FALSE ((b->flags & VLIB_BUFFER_NEXT_PRESENT) && !rv)) - rv = session_enqueue_chain_tail (s, b, offset + b->current_length, 0); - if (rv) - return -1; + session_enqueue_chain_tail (s, b, offset + b->current_length, 0); + /* if something was enqueued, report even this as success for ooo + * segment handling */ + return rv; } if (queue_event) @@ -226,7 +293,7 @@ u32 stream_session_tx_fifo_max_dequeue (transport_connection_t * tc) { stream_session_t *s = stream_session_get (tc->s_index, tc->thread_index); - if (s->session_state != SESSION_STATE_READY) + if (!s->server_tx_fifo) return 0; return svm_fifo_max_dequeue (s->server_tx_fifo); } @@ -266,7 +333,13 @@ stream_session_enqueue_notify (stream_session_t * s, u8 block) return 0; /* Get session's server */ - app = application_get (s->app_index); + app = application_get_if_valid (s->app_index); + + if (PREDICT_FALSE (app == 0)) + { + clib_warning ("invalid s->app_index = %d", s->app_index); + return 0; + } /* Built-in server? Hand event to the callback... */ if (app->cb_fns.builtin_server_rx_callback) @@ -327,8 +400,9 @@ session_manager_flush_enqueue_events (u32 thread_index) stream_session_t *s0; /* Get session */ - s0 = stream_session_get (session_indices_to_enqueue[i], thread_index); - if (stream_session_enqueue_notify (s0, 0 /* don't block */ )) + s0 = stream_session_get_if_valid (session_indices_to_enqueue[i], + thread_index); + if (s0 == 0 || stream_session_enqueue_notify (s0, 0 /* don't block */ )) { errors++; } @@ -366,21 +440,28 @@ stream_session_connect_notify (transport_connection_t * tc, u8 is_fail) application_t *app; stream_session_t *new_s = 0; u64 handle; - u32 api_context = 0; + u32 opaque = 0; int error = 0; + u8 st; + st = session_type_from_proto_and_ip (tc->transport_proto, tc->is_ip4); handle = stream_session_half_open_lookup_handle (&tc->lcl_ip, &tc->rmt_ip, tc->lcl_port, tc->rmt_port, - tc->transport_proto); + st); if (handle == HALF_OPEN_LOOKUP_INVALID_VALUE) { clib_warning ("This can't be good!"); return -1; } - /* Get the app's index from the handle we stored when opening connection */ - app = application_get (handle >> 32); - api_context = tc->s_index; + /* Get the app's index from the handle we stored when opening connection + * and the opaque (api_context for external apps) from transport session + * index*/ + app = application_get_if_valid (handle >> 32); + if (!app) + return -1; + + opaque = tc->s_index; if (!is_fail) { @@ -399,7 +480,7 @@ stream_session_connect_notify (transport_connection_t * tc, u8 is_fail) } /* Notify client application */ - if (app->cb_fns.session_connected_callback (app->index, api_context, new_s, + if (app->cb_fns.session_connected_callback (app->index, opaque, new_s, is_fail)) { clib_warning ("failed to notify app"); @@ -484,9 +565,7 @@ stream_session_delete_notify (transport_connection_t * tc) /* App might've been removed already */ s = stream_session_get_if_valid (tc->s_index, tc->thread_index); if (!s) - { - return; - } + return; stream_session_delete (s); }