X-Git-Url: https://gerrit.fd.io/r/gitweb?a=blobdiff_plain;f=src%2Fvnet%2Fsession%2Fsession_node.c;h=4fd8d0e0299ecdbe7de85d034b5d1eecb008f107;hb=b26743d093141a2aef19bdf8a7fe06dcaa81329a;hp=269e2fb591ebd32c616f51130774c587ef250f9e;hpb=8e43d04ca4f4496aaefc4f5e2b6e1c0951624099;p=vpp.git diff --git a/src/vnet/session/session_node.c b/src/vnet/session/session_node.c index 269e2fb591e..4fd8d0e0299 100644 --- a/src/vnet/session/session_node.c +++ b/src/vnet/session/session_node.c @@ -65,17 +65,24 @@ static char *session_queue_error_strings[] = { #undef _ }; -always_inline void -session_tx_trace_buffer (vlib_main_t * vm, vlib_node_runtime_t * node, - u32 next_index, vlib_buffer_t * b, - stream_session_t * s, u32 * n_trace) +static void +session_tx_trace_frame (vlib_main_t * vm, vlib_node_runtime_t * node, + u32 next_index, u32 * to_next, u16 n_segs, + stream_session_t * s, u32 n_trace) { session_queue_trace_t *t; - vlib_trace_buffer (vm, node, next_index, b, 1 /* follow_chain */ ); - vlib_set_trace_count (vm, node, --*n_trace); - t = vlib_add_trace (vm, node, b, sizeof (*t)); - t->session_index = s->session_index; - t->server_thread_index = s->thread_index; + vlib_buffer_t *b; + int i; + + for (i = 0; i < clib_min (n_trace, n_segs); i++) + { + b = vlib_get_buffer (vm, to_next[i - n_segs]); + vlib_trace_buffer (vm, node, next_index, b, 1 /* follow_chain */ ); + t = vlib_add_trace (vm, node, b, sizeof (*t)); + t->session_index = s->session_index; + t->server_thread_index = s->thread_index; + } + vlib_set_trace_count (vm, node, n_trace - i); } always_inline void @@ -159,23 +166,14 @@ session_output_try_get_buffers (vlib_main_t * vm, session_manager_main_t * smm, u32 thread_index, u16 * n_bufs, u32 wanted) { - u32 bufs_alloc = 0, bufs_now; - vec_validate_aligned (smm->tx_buffers[thread_index], *n_bufs + wanted - 1, + u32 n_alloc; + vec_validate_aligned (smm->tx_buffers[thread_index], wanted - 1, CLIB_CACHE_LINE_BYTES); - do - { - bufs_now = - vlib_buffer_alloc (vm, - &smm->tx_buffers[thread_index][*n_bufs + - bufs_alloc], - wanted - bufs_alloc); - bufs_alloc += bufs_now; - } - while (bufs_now > 0 && ((bufs_alloc + *n_bufs < wanted))); - - *n_bufs += bufs_alloc; + n_alloc = vlib_buffer_alloc (vm, &smm->tx_buffers[thread_index][*n_bufs], + wanted - *n_bufs); + *n_bufs += n_alloc; _vec_len (smm->tx_buffers[thread_index]) = *n_bufs; - return bufs_alloc; + return n_alloc; } always_inline void @@ -192,7 +190,6 @@ session_tx_fill_buffer (vlib_main_t * vm, session_tx_context_t * ctx, b->error = 0; b->flags = VNET_BUFFER_F_LOCALLY_ORIGINATED; b->current_data = 0; - b->total_length_not_including_first_buffer = 0; data0 = vlib_buffer_make_headroom (b, MAX_HDRS_LEN); len_to_deq = clib_min (ctx->left_to_snd, ctx->deq_per_first_buf); @@ -251,11 +248,11 @@ session_tx_fill_buffer (vlib_main_t * vm, session_tx_context_t * ctx, session_tx_fifo_chain_tail (vm, ctx, b, n_bufs, peek_data); /* *INDENT-OFF* */ - SESSION_EVT_DBG(SESSION_EVT_DEQ, s, ({ - ed->data[0] = e->event_type; - ed->data[1] = max_dequeue; + SESSION_EVT_DBG(SESSION_EVT_DEQ, ctx->s, ({ + ed->data[0] = FIFO_EVENT_APP_TX; + ed->data[1] = ctx->max_dequeue; ed->data[2] = len_to_deq; - ed->data[3] = left_to_snd; + ed->data[3] = ctx->left_to_snd; })); /* *INDENT-ON* */ } @@ -269,6 +266,8 @@ session_tx_not_ready (stream_session_t * s, u8 peek_data) * session is not ready or closed */ if (s->session_state < SESSION_STATE_READY) return 1; + if (s->session_state == SESSION_STATE_CLOSED) + return 2; } return 0; } @@ -295,7 +294,7 @@ session_tx_get_transport (session_tx_context_t * ctx, u8 peek_data) always_inline void session_tx_set_dequeue_params (vlib_main_t * vm, session_tx_context_t * ctx, - u8 peek_data) + u32 max_segs, u8 peek_data) { u32 n_bytes_per_buf, n_bytes_per_seg; ctx->max_dequeue = svm_fifo_max_dequeue (ctx->s->server_tx_fifo); @@ -342,11 +341,19 @@ session_tx_set_dequeue_params (vlib_main_t * vm, session_tx_context_t * ctx, ctx->max_len_to_snd = ctx->snd_space; } + /* Check if we're tx constrained by the node */ + ctx->n_segs_per_evt = ceil ((f64) ctx->max_len_to_snd / ctx->snd_mss); + if (ctx->n_segs_per_evt > max_segs) + { + ctx->n_segs_per_evt = max_segs; + ctx->max_len_to_snd = max_segs * ctx->snd_mss; + } + n_bytes_per_buf = vlib_buffer_free_list_buffer_size (vm, VLIB_BUFFER_DEFAULT_FREE_LIST_INDEX); ASSERT (n_bytes_per_buf > MAX_HDRS_LEN); n_bytes_per_seg = MAX_HDRS_LEN + ctx->snd_mss; - ctx->n_bufs_per_seg = ceil ((double) n_bytes_per_seg / n_bytes_per_buf); + ctx->n_bufs_per_seg = ceil ((f64) n_bytes_per_seg / n_bytes_per_buf); ctx->deq_per_buf = clib_min (ctx->snd_mss, n_bytes_per_buf); ctx->deq_per_first_buf = clib_min (ctx->snd_mss, n_bytes_per_buf - MAX_HDRS_LEN); @@ -358,23 +365,24 @@ session_tx_fifo_read_and_snd_i (vlib_main_t * vm, vlib_node_runtime_t * node, stream_session_t * s, int *n_tx_packets, u8 peek_data) { - u32 next_index, next0, next1, next2, next3, *to_next, n_left_to_next; - u32 n_trace = vlib_get_trace_count (vm, node), n_packets = 0, pbi; - u32 n_bufs_per_frame, thread_index = s->thread_index; + u32 next_index, next0, next1, *to_next, n_left_to_next; + u32 n_trace = vlib_get_trace_count (vm, node), n_bufs_needed = 0; + u32 thread_index = s->thread_index, n_left, pbi; session_manager_main_t *smm = &session_manager_main; session_tx_context_t *ctx = &smm->ctx[thread_index]; transport_proto_t tp; vlib_buffer_t *pb; - u16 n_bufs; + u16 n_bufs, rv; - if (PREDICT_FALSE (session_tx_not_ready (s, peek_data))) + if (PREDICT_FALSE ((rv = session_tx_not_ready (s, peek_data)))) { - vec_add1 (smm->pending_event_vector[thread_index], *e); + if (rv < 2) + vec_add1 (smm->pending_event_vector[thread_index], *e); return 0; } next_index = smm->session_type_to_next[s->session_type]; - next0 = next1 = next2 = next3 = next_index; + next0 = next1 = next_index; tp = session_get_transport_proto (s); ctx->s = s; @@ -392,22 +400,23 @@ session_tx_fifo_read_and_snd_i (vlib_main_t * vm, vlib_node_runtime_t * node, svm_fifo_unset_event (s->server_tx_fifo); /* Check how much we can pull. */ - session_tx_set_dequeue_params (vm, ctx, peek_data); + session_tx_set_dequeue_params (vm, ctx, VLIB_FRAME_SIZE - *n_tx_packets, + peek_data); + if (PREDICT_FALSE (!ctx->max_len_to_snd)) return 0; n_bufs = vec_len (smm->tx_buffers[thread_index]); - ctx->left_to_snd = ctx->max_len_to_snd; + n_bufs_needed = ctx->n_segs_per_evt * ctx->n_bufs_per_seg; /* * Make sure we have at least one full frame of buffers ready */ - n_bufs_per_frame = ctx->n_bufs_per_seg * VLIB_FRAME_SIZE; - if (n_bufs < n_bufs_per_frame) + if (n_bufs < n_bufs_needed) { session_output_try_get_buffers (vm, smm, thread_index, &n_bufs, - n_bufs_per_frame); - if (PREDICT_FALSE (n_bufs < n_bufs_per_frame)) + ctx->n_bufs_per_seg * VLIB_FRAME_SIZE); + if (PREDICT_FALSE (n_bufs < n_bufs_needed)) { vec_add1 (smm->pending_event_vector[thread_index], *e); return -1; @@ -418,82 +427,83 @@ session_tx_fifo_read_and_snd_i (vlib_main_t * vm, vlib_node_runtime_t * node, * Write until we fill up a frame */ vlib_get_next_frame (vm, node, next_index, to_next, n_left_to_next); - while (ctx->left_to_snd && n_left_to_next) + if (PREDICT_FALSE (ctx->n_segs_per_evt > n_left_to_next)) { - while (ctx->left_to_snd > 3 * ctx->snd_mss && n_left_to_next >= 4) - { - vlib_buffer_t *b0, *b1; - u32 bi0, bi1; + ctx->n_segs_per_evt = n_left_to_next; + ctx->max_len_to_snd = ctx->snd_mss * n_left_to_next; + } + ctx->left_to_snd = ctx->max_len_to_snd; + n_left = ctx->n_segs_per_evt; - pbi = smm->tx_buffers[thread_index][n_bufs - 3]; - pb = vlib_get_buffer (vm, pbi); - vlib_prefetch_buffer_header (pb, STORE); - pbi = smm->tx_buffers[thread_index][n_bufs - 4]; - pb = vlib_get_buffer (vm, pbi); - vlib_prefetch_buffer_header (pb, STORE); + while (n_left >= 4) + { + vlib_buffer_t *b0, *b1; + u32 bi0, bi1; - to_next[0] = bi0 = smm->tx_buffers[thread_index][--n_bufs]; - to_next[1] = bi1 = smm->tx_buffers[thread_index][--n_bufs]; + pbi = smm->tx_buffers[thread_index][n_bufs - 3]; + pb = vlib_get_buffer (vm, pbi); + vlib_prefetch_buffer_header (pb, STORE); + pbi = smm->tx_buffers[thread_index][n_bufs - 4]; + pb = vlib_get_buffer (vm, pbi); + vlib_prefetch_buffer_header (pb, STORE); - b0 = vlib_get_buffer (vm, bi0); - b1 = vlib_get_buffer (vm, bi1); + to_next[0] = bi0 = smm->tx_buffers[thread_index][--n_bufs]; + to_next[1] = bi1 = smm->tx_buffers[thread_index][--n_bufs]; - session_tx_fill_buffer (vm, ctx, b0, &n_bufs, peek_data); - session_tx_fill_buffer (vm, ctx, b1, &n_bufs, peek_data); + b0 = vlib_get_buffer (vm, bi0); + b1 = vlib_get_buffer (vm, bi1); - ctx->transport_vft->push_header (ctx->tc, b0); - ctx->transport_vft->push_header (ctx->tc, b1); + session_tx_fill_buffer (vm, ctx, b0, &n_bufs, peek_data); + session_tx_fill_buffer (vm, ctx, b1, &n_bufs, peek_data); - to_next += 2; - n_left_to_next -= 2; - n_packets += 2; + ctx->transport_vft->push_header (ctx->tc, b0); + ctx->transport_vft->push_header (ctx->tc, b1); - VLIB_BUFFER_TRACE_TRAJECTORY_INIT (b0); - VLIB_BUFFER_TRACE_TRAJECTORY_INIT (b1); + to_next += 2; + n_left_to_next -= 2; + n_left -= 2; - if (PREDICT_FALSE (n_trace > 0)) - { - session_tx_trace_buffer (vm, node, next_index, b0, s, &n_trace); - if (n_trace) - session_tx_trace_buffer (vm, node, next_index, b1, s, - &n_trace); - } + VLIB_BUFFER_TRACE_TRAJECTORY_INIT (b0); + VLIB_BUFFER_TRACE_TRAJECTORY_INIT (b1); - vlib_validate_buffer_enqueue_x2 (vm, node, next_index, to_next, - n_left_to_next, bi0, bi1, - next0, next1); - } - while (ctx->left_to_snd && n_left_to_next) - { - vlib_buffer_t *b0; - u32 bi0; + vlib_validate_buffer_enqueue_x2 (vm, node, next_index, to_next, + n_left_to_next, bi0, bi1, next0, + next1); + } + while (n_left) + { + vlib_buffer_t *b0; + u32 bi0; - ASSERT (n_bufs >= 1); - to_next[0] = bi0 = smm->tx_buffers[thread_index][--n_bufs]; - b0 = vlib_get_buffer (vm, bi0); - session_tx_fill_buffer (vm, ctx, b0, &n_bufs, peek_data); + ASSERT (n_bufs >= 1); + to_next[0] = bi0 = smm->tx_buffers[thread_index][--n_bufs]; + b0 = vlib_get_buffer (vm, bi0); + session_tx_fill_buffer (vm, ctx, b0, &n_bufs, peek_data); - /* Ask transport to push header after current_length and - * total_length_not_including_first_buffer are updated */ - ctx->transport_vft->push_header (ctx->tc, b0); + /* Ask transport to push header after current_length and + * total_length_not_including_first_buffer are updated */ + ctx->transport_vft->push_header (ctx->tc, b0); - to_next += 1; - n_left_to_next -= 1; - n_packets += 1; + to_next += 1; + n_left_to_next -= 1; + n_left -= 1; - VLIB_BUFFER_TRACE_TRAJECTORY_INIT (b0); - if (PREDICT_FALSE (n_trace > 0)) - session_tx_trace_buffer (vm, node, next_index, b0, s, &n_trace); + VLIB_BUFFER_TRACE_TRAJECTORY_INIT (b0); - vlib_validate_buffer_enqueue_x1 (vm, node, next_index, to_next, - n_left_to_next, bi0, next0); - } + vlib_validate_buffer_enqueue_x1 (vm, node, next_index, to_next, + n_left_to_next, bi0, next0); } + + if (PREDICT_FALSE (n_trace > 0)) + session_tx_trace_frame (vm, node, next_index, to_next, + ctx->n_segs_per_evt, s, n_trace); + _vec_len (smm->tx_buffers[thread_index]) = n_bufs; - *n_tx_packets += n_packets; + *n_tx_packets += ctx->n_segs_per_evt; vlib_put_next_frame (vm, node, next_index, n_left_to_next); /* If we couldn't dequeue all bytes mark as partially read */ + ASSERT (ctx->left_to_snd == 0); if (ctx->max_len_to_snd < ctx->max_dequeue) if (svm_fifo_set_event (s->server_tx_fifo)) vec_add1 (smm->pending_event_vector[thread_index], *e); @@ -681,40 +691,39 @@ session_queue_node_fn (vlib_main_t * vm, vlib_node_runtime_t * node, vlib_frame_t * frame) { session_manager_main_t *smm = vnet_get_session_manager_main (); - session_fifo_event_t *my_pending_event_vector, *pending_disconnects, *e; + session_fifo_event_t *my_pending_event_vector, *e; session_fifo_event_t *my_fifo_events; u32 n_to_dequeue, n_events; svm_queue_t *q; application_t *app; int n_tx_packets = 0; - u32 my_thread_index = vm->thread_index; + u32 thread_index = vm->thread_index; int i, rv; f64 now = vlib_time_now (vm); void (*fp) (void *); - SESSION_EVT_DBG (SESSION_EVT_POLL_GAP_TRACK, smm, my_thread_index); + SESSION_EVT_DBG (SESSION_EVT_POLL_GAP_TRACK, smm, thread_index); /* * Update transport time */ - transport_update_time (now, my_thread_index); + transport_update_time (now, thread_index); /* * Get vpp queue events */ - q = smm->vpp_event_queues[my_thread_index]; + q = smm->vpp_event_queues[thread_index]; if (PREDICT_FALSE (q == 0)) return 0; - my_fifo_events = smm->free_event_vector[my_thread_index]; + my_fifo_events = smm->free_event_vector[thread_index]; /* min number of events we can dequeue without blocking */ n_to_dequeue = q->cursize; - my_pending_event_vector = smm->pending_event_vector[my_thread_index]; - pending_disconnects = smm->pending_disconnects[my_thread_index]; + my_pending_event_vector = smm->pending_event_vector[thread_index]; if (!n_to_dequeue && !vec_len (my_pending_event_vector) - && !vec_len (pending_disconnects)) + && !vec_len (smm->pending_disconnects[thread_index])) return 0; SESSION_EVT_DBG (SESSION_EVT_DEQ_NODE, 0); @@ -746,11 +755,11 @@ session_queue_node_fn (vlib_main_t * vm, vlib_node_runtime_t * node, pthread_mutex_unlock (&q->mutex); vec_append (my_fifo_events, my_pending_event_vector); - vec_append (my_fifo_events, smm->pending_disconnects[my_thread_index]); + vec_append (my_fifo_events, smm->pending_disconnects[thread_index]); _vec_len (my_pending_event_vector) = 0; - smm->pending_event_vector[my_thread_index] = my_pending_event_vector; - _vec_len (smm->pending_disconnects[my_thread_index]) = 0; + smm->pending_event_vector[thread_index] = my_pending_event_vector; + _vec_len (smm->pending_disconnects[thread_index]) = 0; skip_dequeue: n_events = vec_len (my_fifo_events); @@ -760,17 +769,22 @@ skip_dequeue: session_fifo_event_t *e0; e0 = &my_fifo_events[i]; - switch (e0->event_type) { case FIFO_EVENT_APP_TX: - s0 = session_event_get_session (e0, my_thread_index); + if (n_tx_packets == VLIB_FRAME_SIZE) + { + vec_add1 (smm->pending_event_vector[thread_index], *e0); + break; + } + s0 = session_event_get_session (e0, thread_index); if (PREDICT_FALSE (!s0)) { clib_warning ("It's dead, Jim!"); continue; } + /* Spray packets in per session type frames, since they go to * different nodes */ rv = (smm->session_tx_fns[s0->session_type]) (vm, node, e0, s0, @@ -784,18 +798,26 @@ skip_dequeue: } break; case FIFO_EVENT_DISCONNECT: - /* Make sure disconnects run after the pending list is drained */ + /* Make sure stream disconnects run after the pending list is + * drained */ s0 = session_get_from_handle (e0->session_handle); - if (!e0->postponed || svm_fifo_max_dequeue (s0->server_tx_fifo)) + if (!e0->postponed) { e0->postponed = 1; - vec_add1 (smm->pending_disconnects[my_thread_index], *e0); + vec_add1 (smm->pending_disconnects[thread_index], *e0); continue; } + /* If tx queue is still not empty, wait */ + if (svm_fifo_max_dequeue (s0->server_tx_fifo)) + { + vec_add1 (smm->pending_disconnects[thread_index], *e0); + continue; + } + stream_session_disconnect_transport (s0); break; case FIFO_EVENT_BUILTIN_RX: - s0 = session_event_get_session (e0, my_thread_index); + s0 = session_event_get_session (e0, thread_index); if (PREDICT_FALSE (!s0)) continue; svm_fifo_unset_event (s0->server_rx_fifo); @@ -813,12 +835,12 @@ skip_dequeue: } _vec_len (my_fifo_events) = 0; - smm->free_event_vector[my_thread_index] = my_fifo_events; + smm->free_event_vector[thread_index] = my_fifo_events; vlib_node_increment_counter (vm, session_queue_node.index, SESSION_QUEUE_ERROR_TX, n_tx_packets); - SESSION_EVT_DBG (SESSION_EVT_DEQ_NODE, 1); + SESSION_EVT_DBG (SESSION_EVT_DISPATCH_END, smm, thread_index); return n_tx_packets; } @@ -855,6 +877,51 @@ session_queue_exit (vlib_main_t * vm) VLIB_MAIN_LOOP_EXIT_FUNCTION (session_queue_exit); +static uword +session_queue_process (vlib_main_t * vm, vlib_node_runtime_t * rt, + vlib_frame_t * f) +{ + f64 now, timeout = 1.0; + uword *event_data = 0; + uword event_type; + + while (1) + { + vlib_process_wait_for_event_or_clock (vm, timeout); + now = vlib_time_now (vm); + event_type = vlib_process_get_events (vm, (uword **) & event_data); + + switch (event_type) + { + case SESSION_Q_PROCESS_FLUSH_FRAMES: + /* Flush the frames by updating all transports times */ + transport_update_time (now, 0); + break; + case SESSION_Q_PROCESS_STOP: + timeout = 100000.0; + break; + case ~0: + /* Timed out. Update time for all transports to trigger all + * outstanding retransmits. */ + transport_update_time (now, 0); + break; + } + vec_reset_length (event_data); + } + return 0; +} + +/* *INDENT-OFF* */ +VLIB_REGISTER_NODE (session_queue_process_node) = +{ + .function = session_queue_process, + .type = VLIB_NODE_TYPE_PROCESS, + .name = "session-queue-process", + .state = VLIB_NODE_STATE_DISABLED, +}; +/* *INDENT-ON* */ + + /* * fd.io coding-style-patch-verification: ON *