X-Git-Url: https://gerrit.fd.io/r/gitweb?a=blobdiff_plain;f=src%2Fvnet%2Fsession%2Fnode.c;h=8681105c284405875ebf1bb6343459faa1e08324;hb=6792ec059696a358b6c98d8d86e9740b34c01e24;hp=e467f4e99e20cdc80927fde375985610c0d4020d;hpb=68b0fb0c620c7451ef1a6380c43c39de6614db51;p=vpp.git diff --git a/src/vnet/session/node.c b/src/vnet/session/node.c index e467f4e99e2..8681105c284 100644 --- a/src/vnet/session/node.c +++ b/src/vnet/session/node.c @@ -13,22 +13,15 @@ * limitations under the License. */ +#include #include #include -#include -#include - #include - -#include -#include #include +#include +#include #include -#include -#include -#include - vlib_node_registration_t session_queue_node; typedef struct @@ -52,8 +45,8 @@ format_session_queue_trace (u8 * s, va_list * args) vlib_node_registration_t session_queue_node; -#define foreach_session_queue_error \ -_(TX, "Packets transmitted") \ +#define foreach_session_queue_error \ +_(TX, "Packets transmitted") \ _(TIMER, "Timer events") typedef enum @@ -78,10 +71,11 @@ static u32 session_type_to_next[] = { }; always_inline int -session_fifo_rx_i (vlib_main_t * vm, vlib_node_runtime_t * node, - session_manager_main_t * smm, session_fifo_event_t * e0, - stream_session_t * s0, u32 thread_index, int *n_tx_packets, - u8 peek_data) +session_tx_fifo_read_and_snd_i (vlib_main_t * vm, vlib_node_runtime_t * node, + session_manager_main_t * smm, + session_fifo_event_t * e0, + stream_session_t * s0, u32 thread_index, + int *n_tx_packets, u8 peek_data) { u32 n_trace = vlib_get_trace_count (vm, node); u32 left_to_snd0, max_len_to_snd0, len_to_deq0, n_bufs, snd_space0; @@ -90,10 +84,10 @@ session_fifo_rx_i (vlib_main_t * vm, vlib_node_runtime_t * node, transport_proto_vft_t *transport_vft; u32 next_index, next0, *to_next, n_left_to_next, bi0; vlib_buffer_t *b0; - u32 rx_offset; + u32 rx_offset = 0, max_dequeue0; u16 snd_mss0; u8 *data0; - int i; + int i, n_bytes_read; next_index = next0 = session_type_to_next[s0->session_type]; @@ -104,21 +98,34 @@ session_fifo_rx_i (vlib_main_t * vm, vlib_node_runtime_t * node, snd_space0 = transport_vft->send_space (tc0); snd_mss0 = transport_vft->send_mss (tc0); - if (snd_space0 == 0 || svm_fifo_max_dequeue (s0->server_tx_fifo) == 0 - || snd_mss0 == 0) - return 0; - - ASSERT (e0->enqueue_length > 0); - - /* Ensure we're not writing more than transport window allows */ - max_len_to_snd0 = clib_min (e0->enqueue_length, snd_space0); + /* Can't make any progress */ + if (snd_space0 == 0 || snd_mss0 == 0) + { + vec_add1 (smm->evts_partially_read[thread_index], *e0); + return 0; + } if (peek_data) { /* Offset in rx fifo from where to peek data */ - rx_offset = transport_vft->rx_fifo_offset (tc0); + rx_offset = transport_vft->tx_fifo_offset (tc0); + } + + /* Check how much we can pull. If buffering, subtract the offset */ + max_dequeue0 = svm_fifo_max_dequeue (s0->server_tx_fifo) - rx_offset; + + /* Allow enqueuing of a new event */ + svm_fifo_unset_event (s0->server_tx_fifo); + + /* Nothing to read return */ + if (max_dequeue0 == 0) + { + return 0; } + /* Ensure we're not writing more than transport window allows */ + max_len_to_snd0 = clib_min (max_dequeue0, snd_space0); + /* TODO check if transport is willing to send len_to_snd0 * bytes (Nagle) */ @@ -142,8 +149,10 @@ session_fifo_rx_i (vlib_main_t * vm, vlib_node_runtime_t * node, * XXX 0.9 because when debugging we might not get a full frame */ if (PREDICT_FALSE (n_bufs < 0.9 * VLIB_FRAME_SIZE)) { - /* Keep track of how much we've dequeued and exit */ - e0->enqueue_length -= max_len_to_snd0 - left_to_snd0; + if (svm_fifo_set_event (s0->server_tx_fifo)) + { + vec_add1 (smm->evts_partially_read[thread_index], *e0); + } return -1; } @@ -185,23 +194,17 @@ session_fifo_rx_i (vlib_main_t * vm, vlib_node_runtime_t * node, t0->server_thread_index = s0->thread_index; } - if (1) - { - ELOG_TYPE_DECLARE (e) = - { - .format = "evt-dequeue: id %d length %d",.format_args = - "i4i4",}; - struct - { - u32 data[2]; - } *ed; - ed = ELOG_DATA (&vm->elog_main, e); - ed->data[0] = e0->event_id; - ed->data[1] = e0->enqueue_length; - } - len_to_deq0 = (left_to_snd0 < snd_mss0) ? left_to_snd0 : snd_mss0; + /* *INDENT-OFF* */ + SESSION_EVT_DBG(SESSION_EVT_DEQ, s0, ({ + ed->data[0] = e0->event_id; + ed->data[1] = max_dequeue0; + ed->data[2] = len_to_deq0; + ed->data[3] = left_to_snd0; + })); + /* *INDENT-ON* */ + /* Make room for headers */ data0 = vlib_buffer_make_headroom (b0, MAX_HDRS_LEN); @@ -210,29 +213,30 @@ session_fifo_rx_i (vlib_main_t * vm, vlib_node_runtime_t * node, * 2) buffer chains */ if (peek_data) { - int n_bytes_read; n_bytes_read = svm_fifo_peek (s0->server_tx_fifo, s0->pid, rx_offset, len_to_deq0, data0); - if (n_bytes_read < 0) + if (n_bytes_read <= 0) goto dequeue_fail; /* Keep track of progress locally, transport is also supposed to - * increment it independently when pushing header */ + * increment it independently when pushing the header */ rx_offset += n_bytes_read; } else { - if (svm_fifo_dequeue_nowait (s0->server_tx_fifo, s0->pid, - len_to_deq0, data0) < 0) + n_bytes_read = svm_fifo_dequeue_nowait (s0->server_tx_fifo, + s0->pid, len_to_deq0, + data0); + if (n_bytes_read <= 0) goto dequeue_fail; } - b0->current_length = len_to_deq0; + b0->current_length = n_bytes_read; /* Ask transport to push header */ transport_vft->push_header (tc0, b0); - left_to_snd0 -= len_to_deq0; + left_to_snd0 -= n_bytes_read; *n_tx_packets = *n_tx_packets + 1; vlib_validate_buffer_enqueue_x1 (vm, node, next_index, @@ -242,45 +246,54 @@ session_fifo_rx_i (vlib_main_t * vm, vlib_node_runtime_t * node, vlib_put_next_frame (vm, node, next_index, n_left_to_next); } - /* If we couldn't dequeue all bytes store progress */ - if (max_len_to_snd0 < e0->enqueue_length) + /* If we couldn't dequeue all bytes mark as partially read */ + if (max_len_to_snd0 < max_dequeue0) { - e0->enqueue_length -= max_len_to_snd0; - vec_add1 (smm->evts_partially_read[thread_index], *e0); + /* If we don't already have new event */ + if (svm_fifo_set_event (s0->server_tx_fifo)) + { + vec_add1 (smm->evts_partially_read[thread_index], *e0); + } } return 0; dequeue_fail: - /* Can't read from fifo. Store event rx progress, save as partially read, - * return buff to free list and return */ - e0->enqueue_length -= max_len_to_snd0 - left_to_snd0; - vec_add1 (smm->evts_partially_read[thread_index], *e0); + /* + * Can't read from fifo. If we don't already have an event, save as partially + * read, return buff to free list and return + */ + clib_warning ("dequeue fail"); - to_next -= 1; - n_left_to_next += 1; + if (svm_fifo_set_event (s0->server_tx_fifo)) + { + vec_add1 (smm->evts_partially_read[thread_index], *e0); + } + vlib_put_next_frame (vm, node, next_index, n_left_to_next + 1); _vec_len (smm->tx_buffers[thread_index]) += 1; - clib_warning ("dequeue fail"); return 0; } int -session_fifo_rx_peek (vlib_main_t * vm, vlib_node_runtime_t * node, - session_manager_main_t * smm, session_fifo_event_t * e0, - stream_session_t * s0, u32 thread_index, int *n_tx_pkts) +session_tx_fifo_peek_and_snd (vlib_main_t * vm, vlib_node_runtime_t * node, + session_manager_main_t * smm, + session_fifo_event_t * e0, + stream_session_t * s0, u32 thread_index, + int *n_tx_pkts) { - return session_fifo_rx_i (vm, node, smm, e0, s0, thread_index, n_tx_pkts, - 1); + return session_tx_fifo_read_and_snd_i (vm, node, smm, e0, s0, thread_index, + n_tx_pkts, 1); } int -session_fifo_rx_dequeue (vlib_main_t * vm, vlib_node_runtime_t * node, - session_manager_main_t * smm, - session_fifo_event_t * e0, stream_session_t * s0, - u32 thread_index, int *n_tx_pkts) +session_tx_fifo_dequeue_and_snd (vlib_main_t * vm, vlib_node_runtime_t * node, + session_manager_main_t * smm, + session_fifo_event_t * e0, + stream_session_t * s0, u32 thread_index, + int *n_tx_pkts) { - return session_fifo_rx_i (vm, node, smm, e0, s0, thread_index, n_tx_pkts, - 0); + return session_tx_fifo_read_and_snd_i (vm, node, smm, e0, s0, thread_index, + n_tx_pkts, 0); } static uword @@ -289,8 +302,9 @@ session_queue_node_fn (vlib_main_t * vm, vlib_node_runtime_t * node, { session_manager_main_t *smm = vnet_get_session_manager_main (); session_fifo_event_t *my_fifo_events, *e; - u32 n_to_dequeue; + u32 n_to_dequeue, n_events; unix_shared_memory_queue_t *q; + application_t *app; int n_tx_packets = 0; u32 my_thread_index = vm->cpu_index; int i, rv; @@ -309,16 +323,23 @@ session_queue_node_fn (vlib_main_t * vm, vlib_node_runtime_t * node, /* min number of events we can dequeue without blocking */ n_to_dequeue = q->cursize; - if (n_to_dequeue == 0) + my_fifo_events = smm->fifo_events[my_thread_index]; + + if (n_to_dequeue == 0 && vec_len (my_fifo_events) == 0) return 0; - my_fifo_events = smm->fifo_events[my_thread_index]; + SESSION_EVT_DBG (SESSION_EVT_DEQ_NODE, 0); - /* If we didn't manage to process previous events try going + /* + * If we didn't manage to process previous events try going * over them again without dequeuing new ones. - * XXX: Block senders to sessions that can't keep up */ + */ + /* XXX: Block senders to sessions that can't keep up */ if (vec_len (my_fifo_events) >= 100) - goto skip_dequeue; + { + clib_warning ("too many fifo events unsolved"); + goto skip_dequeue; + } /* See you in the next life, don't be late */ if (pthread_mutex_trylock (&q->mutex)) @@ -338,24 +359,31 @@ session_queue_node_fn (vlib_main_t * vm, vlib_node_runtime_t * node, smm->fifo_events[my_thread_index] = my_fifo_events; skip_dequeue: - - for (i = 0; i < n_to_dequeue; i++) + n_events = vec_len (my_fifo_events); + for (i = 0; i < n_events; i++) { svm_fifo_t *f0; /* $$$ prefetch 1 ahead maybe */ stream_session_t *s0; - u32 server_session_index0, server_thread_index0; + u32 session_index0; session_fifo_event_t *e0; e0 = &my_fifo_events[i]; f0 = e0->fifo; - server_session_index0 = f0->server_session_index; - server_thread_index0 = f0->server_thread_index; + session_index0 = f0->server_session_index; /* $$$ add multiple event queues, per vpp worker thread */ - ASSERT (server_thread_index0 == my_thread_index); + ASSERT (f0->server_thread_index == my_thread_index); - s0 = pool_elt_at_index (smm->sessions[my_thread_index], - server_session_index0); + s0 = stream_session_get_if_valid (session_index0, my_thread_index); + + if (CLIB_DEBUG && !s0) + { + clib_warning ("It's dead, Jim!"); + continue; + } + + if (PREDICT_FALSE (s0->session_state == SESSION_STATE_CLOSED)) + continue; ASSERT (s0->thread_index == my_thread_index); @@ -364,14 +392,23 @@ skip_dequeue: case FIFO_EVENT_SERVER_TX: /* Spray packets in per session type frames, since they go to * different nodes */ - rv = (smm->session_rx_fns[s0->session_type]) (vm, node, smm, e0, s0, + rv = (smm->session_tx_fns[s0->session_type]) (vm, node, smm, e0, s0, my_thread_index, &n_tx_packets); + /* Out of buffers */ if (rv < 0) goto done; break; - + case FIFO_EVENT_SERVER_EXIT: + stream_session_disconnect (s0); + break; + case FIFO_EVENT_BUILTIN_RX: + svm_fifo_unset_event (s0->server_rx_fifo); + /* Get session's server */ + app = application_get (s0->app_index); + app->cb_fns.builtin_server_rx_callback (s0); + break; default: clib_warning ("unhandled event type %d", e0->event_type); } @@ -380,11 +417,11 @@ skip_dequeue: done: /* Couldn't process all events. Probably out of buffers */ - if (PREDICT_FALSE (i < n_to_dequeue)) + if (PREDICT_FALSE (i < n_events)) { session_fifo_event_t *partially_read = smm->evts_partially_read[my_thread_index]; - vec_add (partially_read, &my_fifo_events[i], n_to_dequeue - i); + vec_add (partially_read, &my_fifo_events[i], n_events - i); vec_free (my_fifo_events); smm->fifo_events[my_thread_index] = partially_read; smm->evts_partially_read[my_thread_index] = 0; @@ -400,6 +437,8 @@ done: vlib_node_increment_counter (vm, session_queue_node.index, SESSION_QUEUE_ERROR_TX, n_tx_packets); + SESSION_EVT_DBG (SESSION_EVT_DEQ_NODE, 1); + return n_tx_packets; } @@ -413,8 +452,7 @@ VLIB_REGISTER_NODE (session_queue_node) = .n_errors = ARRAY_LEN (session_queue_error_strings), .error_strings = session_queue_error_strings, .n_next_nodes = SESSION_QUEUE_N_NEXT, - /* .state = VLIB_NODE_STATE_DISABLED, enable on-demand? */ - /* edit / add dispositions here */ + .state = VLIB_NODE_STATE_DISABLED, .next_nodes = { [SESSION_QUEUE_NEXT_DROP] = "error-drop",