X-Git-Url: https://gerrit.fd.io/r/gitweb?a=blobdiff_plain;f=src%2Fvnet%2Fsession%2Fsession_node.c;h=5f765784d26020fcdd287a2a533c19ffdeadd029;hb=2de9c0f92;hp=fde1931aaf979cccb67185b1c8d5a2ae4ab4867a;hpb=79f89537c6fd3baeac03354a3381f42895fe2ca8;p=vpp.git diff --git a/src/vnet/session/session_node.c b/src/vnet/session/session_node.c index fde1931aaf9..5f765784d26 100644 --- a/src/vnet/session/session_node.c +++ b/src/vnet/session/session_node.c @@ -55,6 +55,7 @@ session_mq_listen_handler (void *data) a->sep.sw_if_index = ENDPOINT_INVALID_INDEX; a->sep.transport_proto = mp->proto; a->sep_ext.ckpair_index = mp->ckpair_index; + a->sep_ext.crypto_engine = mp->crypto_engine; a->app_index = app->app_index; a->wrk_map_index = mp->wrk_index; @@ -111,9 +112,12 @@ session_mq_connect_handler (void *data) a->sep.port = mp->port; a->sep.transport_proto = mp->proto; a->sep.peer.fib_index = mp->vrf; + clib_memcpy_fast (&a->sep.peer.ip, &mp->lcl_ip, sizeof (mp->lcl_ip)); a->sep.peer.sw_if_index = ENDPOINT_INVALID_INDEX; a->sep_ext.parent_handle = mp->parent_handle; a->sep_ext.ckpair_index = mp->ckpair_index; + a->sep_ext.crypto_engine = mp->crypto_engine; + a->sep_ext.flags = mp->flags; if (mp->hostname_len) { vec_validate (a->sep_ext.hostname, mp->hostname_len - 1); @@ -306,8 +310,8 @@ session_mq_reset_reply_handler (void *data) session_parse_handle (mp->handle, &index, &thread_index); s = session_get_if_valid (index, thread_index); - /* Session was already closed or already cleaned up */ - if (!s || s->session_state != SESSION_STATE_TRANSPORT_CLOSING) + /* No session or not the right session */ + if (!s || s->session_state < SESSION_STATE_TRANSPORT_CLOSING) return; app_wrk = app_worker_get (s->app_wrk_index); @@ -497,7 +501,7 @@ format_session_queue_trace (u8 * s, va_list * args) CLIB_UNUSED (vlib_node_t * node) = va_arg (*args, vlib_node_t *); session_queue_trace_t *t = va_arg (*args, session_queue_trace_t *); - s = format (s, "SESSION_QUEUE: session index %d, server thread index %d", + s = format (s, "session index %d thread index %d", t->session_index, t->server_thread_index); return s; } @@ -539,7 +543,7 @@ session_tx_trace_frame (vlib_main_t * vm, vlib_node_runtime_t * node, for (i = 0; i < clib_min (n_trace, n_segs); i++) { - b = vlib_get_buffer (vm, to_next[i - n_segs]); + b = vlib_get_buffer (vm, to_next[i]); vlib_trace_buffer (vm, node, next_index, b, 1 /* follow_chain */ ); t = vlib_add_trace (vm, node, b, sizeof (*t)); t->session_index = s->session_index; @@ -810,14 +814,26 @@ session_tx_set_dequeue_params (vlib_main_t * vm, session_tx_context_t * ctx, TRANSPORT_MAX_HDRS_LEN); } +always_inline void +session_tx_maybe_reschedule (session_worker_t * wrk, + session_tx_context_t * ctx, + session_evt_elt_t * elt, u8 is_peek) +{ + session_t *s = ctx->s; + + svm_fifo_unset_event (s->tx_fifo); + if (svm_fifo_max_dequeue_cons (s->tx_fifo) > is_peek ? ctx->tx_offset : 0) + if (svm_fifo_set_event (s->tx_fifo)) + session_evt_add_head_old (wrk, elt); +} + always_inline int session_tx_fifo_read_and_snd_i (session_worker_t * wrk, vlib_node_runtime_t * node, session_evt_elt_t * elt, int *n_tx_packets, u8 peek_data) { - u32 next_index, next0, next1, *to_next, n_left_to_next, max_burst; - u32 n_trace, n_bufs_needed = 0, n_left, pbi; + u32 n_trace, n_bufs_needed = 0, n_left, pbi, next_index, max_burst; session_tx_context_t *ctx = &wrk->ctx; session_main_t *smm = &session_main; session_event_t *e = &elt->evt; @@ -834,7 +850,6 @@ session_tx_fifo_read_and_snd_i (session_worker_t * wrk, } next_index = smm->session_type_to_next[ctx->s->session_type]; - next0 = next1 = next_index; max_burst = VLIB_FRAME_SIZE - *n_tx_packets; tp = session_get_transport_proto (ctx->s); @@ -845,6 +860,7 @@ session_tx_fifo_read_and_snd_i (session_worker_t * wrk, { if (ctx->transport_vft->flush_data) ctx->transport_vft->flush_data (ctx->tc); + e->event_type = SESSION_IO_EVT_TX; } if (ctx->s->flags & SESSION_F_CUSTOM_TX) @@ -865,25 +881,44 @@ session_tx_fifo_read_and_snd_i (session_worker_t * wrk, } ctx->snd_mss = ctx->transport_vft->send_mss (ctx->tc); - ctx->snd_space = transport_connection_snd_space (ctx->tc, - vm->clib_time. - last_cpu_time, - ctx->snd_mss); - - if (ctx->snd_space == 0 || ctx->snd_mss == 0) + if (PREDICT_FALSE (ctx->snd_mss == 0)) { session_evt_add_old (wrk, elt); return SESSION_TX_NO_DATA; } - /* Allow enqueuing of a new event */ - svm_fifo_unset_event (ctx->s->tx_fifo); + ctx->snd_space = transport_connection_snd_space (ctx->tc); + + /* This flow queue is "empty" so it should be re-evaluated before + * the ones that have data to send. */ + if (!ctx->snd_space) + { + session_evt_add_head_old (wrk, elt); + return SESSION_TX_NO_DATA; + } + + if (transport_connection_is_tx_paced (ctx->tc)) + { + u32 snd_space = transport_connection_tx_pacer_burst (ctx->tc); + if (snd_space < TRANSPORT_PACER_MIN_BURST) + { + session_evt_add_head_old (wrk, elt); + return SESSION_TX_NO_DATA; + } + snd_space = clib_min (ctx->snd_space, snd_space); + ctx->snd_space = snd_space >= ctx->snd_mss ? + snd_space - snd_space % ctx->snd_mss : snd_space; + } /* Check how much we can pull. */ session_tx_set_dequeue_params (vm, ctx, max_burst, peek_data); if (PREDICT_FALSE (!ctx->max_len_to_snd)) - return SESSION_TX_NO_DATA; + { + transport_connection_tx_pacer_reset_bucket (ctx->tc, 0); + session_tx_maybe_reschedule (wrk, ctx, elt, peek_data); + return SESSION_TX_NO_DATA; + } n_bufs_needed = ctx->n_segs_per_evt * ctx->n_bufs_per_seg; vec_validate_aligned (wrk->tx_buffers, n_bufs_needed - 1, @@ -893,21 +928,12 @@ session_tx_fifo_read_and_snd_i (session_worker_t * wrk, { if (n_bufs) vlib_buffer_free (vm, wrk->tx_buffers, n_bufs); - session_evt_add_old (wrk, elt); + session_evt_add_head_old (wrk, elt); vlib_node_increment_counter (wrk->vm, node->node_index, SESSION_QUEUE_ERROR_NO_BUFFER, 1); return SESSION_TX_NO_BUFFERS; } - /* - * Write until we fill up a frame - */ - vlib_get_next_frame (vm, node, next_index, to_next, n_left_to_next); - if (PREDICT_FALSE (ctx->n_segs_per_evt > n_left_to_next)) - { - ctx->n_segs_per_evt = n_left_to_next; - ctx->max_len_to_snd = ctx->snd_mss * n_left_to_next; - } ctx->left_to_snd = ctx->max_len_to_snd; n_left = ctx->n_segs_per_evt; @@ -923,8 +949,8 @@ session_tx_fifo_read_and_snd_i (session_worker_t * wrk, pb = vlib_get_buffer (vm, pbi); vlib_prefetch_buffer_header (pb, STORE); - to_next[0] = bi0 = wrk->tx_buffers[--n_bufs]; - to_next[1] = bi1 = wrk->tx_buffers[--n_bufs]; + bi0 = wrk->tx_buffers[--n_bufs]; + bi1 = wrk->tx_buffers[--n_bufs]; b0 = vlib_get_buffer (vm, bi0); b1 = vlib_get_buffer (vm, bi1); @@ -935,16 +961,15 @@ session_tx_fifo_read_and_snd_i (session_worker_t * wrk, ctx->transport_vft->push_header (ctx->tc, b0); ctx->transport_vft->push_header (ctx->tc, b1); - to_next += 2; - n_left_to_next -= 2; n_left -= 2; VLIB_BUFFER_TRACE_TRAJECTORY_INIT (b0); VLIB_BUFFER_TRACE_TRAJECTORY_INIT (b1); - vlib_validate_buffer_enqueue_x2 (vm, node, next_index, to_next, - n_left_to_next, bi0, bi1, next0, - next1); + vec_add1 (wrk->pending_tx_buffers, bi0); + vec_add1 (wrk->pending_tx_buffers, bi1); + vec_add1 (wrk->pending_tx_nexts, next_index); + vec_add1 (wrk->pending_tx_nexts, next_index); } while (n_left) { @@ -958,7 +983,7 @@ session_tx_fifo_read_and_snd_i (session_worker_t * wrk, vlib_prefetch_buffer_header (pb, STORE); } - to_next[0] = bi0 = wrk->tx_buffers[--n_bufs]; + bi0 = wrk->tx_buffers[--n_bufs]; b0 = vlib_get_buffer (vm, bi0); session_tx_fill_buffer (vm, ctx, b0, &n_bufs, peek_data); @@ -966,18 +991,16 @@ session_tx_fifo_read_and_snd_i (session_worker_t * wrk, * total_length_not_including_first_buffer are updated */ ctx->transport_vft->push_header (ctx->tc, b0); - to_next += 1; - n_left_to_next -= 1; n_left -= 1; VLIB_BUFFER_TRACE_TRAJECTORY_INIT (b0); - vlib_validate_buffer_enqueue_x1 (vm, node, next_index, to_next, - n_left_to_next, bi0, next0); + vec_add1 (wrk->pending_tx_buffers, bi0); + vec_add1 (wrk->pending_tx_nexts, next_index); } if (PREDICT_FALSE ((n_trace = vlib_get_trace_count (vm, node)) > 0)) - session_tx_trace_frame (vm, node, next_index, to_next, + session_tx_trace_frame (vm, node, next_index, wrk->pending_tx_buffers, ctx->n_segs_per_evt, ctx->s, n_trace); if (PREDICT_FALSE (n_bufs)) @@ -985,16 +1008,18 @@ session_tx_fifo_read_and_snd_i (session_worker_t * wrk, *n_tx_packets += ctx->n_segs_per_evt; transport_connection_update_tx_bytes (ctx->tc, ctx->max_len_to_snd); - vlib_put_next_frame (vm, node, next_index, n_left_to_next); SESSION_EVT (SESSION_EVT_DEQ, ctx->s, ctx->max_len_to_snd, ctx->max_dequeue, ctx->s->tx_fifo->has_event, wrk->last_vlib_time); - /* If we couldn't dequeue all bytes mark as partially read */ ASSERT (ctx->left_to_snd == 0); + + /* If we couldn't dequeue all bytes reschedule as old flow. Otherwise, + * check if application enqueued more data and reschedule accordingly */ if (ctx->max_len_to_snd < ctx->max_dequeue) - if (svm_fifo_set_event (ctx->s->tx_fifo)) - session_evt_add_old (wrk, elt); + session_evt_add_old (wrk, elt); + else + session_tx_maybe_reschedule (wrk, ctx, elt, peek_data); if (!peek_data && ctx->transport_vft->transport_options.tx_type == TRANSPORT_TX_DGRAM) @@ -1151,10 +1176,7 @@ session_event_dispatch_io (session_worker_t * wrk, vlib_node_runtime_t * node, case SESSION_IO_EVT_TX: s = session_event_get_session (e, thread_index); if (PREDICT_FALSE (!s)) - { - clib_warning ("session %u was freed!", e->session_index); - break; - } + break; CLIB_PREFETCH (s->tx_fifo, 2 * CLIB_CACHE_LINE_BYTES, LOAD); wrk->ctx.s = s; /* Spray packets in per session type frames, since they go to @@ -1229,6 +1251,17 @@ session_evt_add_to_list (session_worker_t * wrk, session_event_t * evt) } } +static void +session_flush_pending_tx_buffers (session_worker_t * wrk, + vlib_node_runtime_t * node) +{ + vlib_buffer_enqueue_to_next (wrk->vm, node, wrk->pending_tx_buffers, + wrk->pending_tx_nexts, + vec_len (wrk->pending_tx_nexts)); + vec_reset_length (wrk->pending_tx_buffers); + vec_reset_length (wrk->pending_tx_nexts); +} + static uword session_queue_node_fn (vlib_main_t * vm, vlib_node_runtime_t * node, vlib_frame_t * frame) @@ -1237,20 +1270,22 @@ session_queue_node_fn (vlib_main_t * vm, vlib_node_runtime_t * node, u32 thread_index = vm->thread_index, n_to_dequeue; session_worker_t *wrk = &smm->wrk[thread_index]; session_evt_elt_t *elt, *ctrl_he, *new_he, *old_he; + clib_llist_index_t ei, next_ei, old_ti; svm_msg_q_msg_t _msg, *msg = &_msg; - clib_llist_index_t old_ti; - int i, n_tx_packets = 0; + int i, n_tx_packets; session_event_t *evt; svm_msg_q_t *mq; SESSION_EVT (SESSION_EVT_DISPATCH_START, wrk); wrk->last_vlib_time = vlib_time_now (vm); + wrk->last_vlib_us_time = wrk->last_vlib_time * CLIB_US_TIME_FREQ; /* * Update transport time */ transport_update_time (wrk->last_vlib_time, thread_index); + n_tx_packets = vec_len (wrk->pending_tx_buffers); /* * Dequeue and handle new events @@ -1290,45 +1325,45 @@ session_queue_node_fn (vlib_main_t * vm, vlib_node_runtime_t * node, */ new_he = pool_elt_at_index (wrk->event_elts, wrk->new_head); + old_he = pool_elt_at_index (wrk->event_elts, wrk->old_head); + old_ti = clib_llist_prev_index (old_he, evt_list); - /* *INDENT-OFF* */ - clib_llist_foreach_safe (wrk->event_elts, evt_list, new_he, elt, ({ - session_evt_type_t et; - - et = elt->evt.event_type; - clib_llist_remove (wrk->event_elts, evt_list, elt); - - /* Postpone tx events if we can't handle them this dispatch cycle */ - if (n_tx_packets >= VLIB_FRAME_SIZE - && (et == SESSION_IO_EVT_TX || et == SESSION_IO_EVT_TX_FLUSH)) - { - clib_llist_add (wrk->event_elts, evt_list, elt, new_he); - continue; - } - - session_event_dispatch_io (wrk, node, elt, thread_index, &n_tx_packets); - })); - /* *INDENT-ON* */ + ei = clib_llist_next_index (new_he, evt_list); + while (ei != wrk->new_head && n_tx_packets < VLIB_FRAME_SIZE) + { + elt = pool_elt_at_index (wrk->event_elts, ei); + ei = clib_llist_next_index (elt, evt_list); + clib_llist_remove (wrk->event_elts, evt_list, elt); + session_event_dispatch_io (wrk, node, elt, thread_index, &n_tx_packets); + } /* - * Handle the old io events + * Handle the old io events, if we had any prior to processing the new ones */ - old_he = pool_elt_at_index (wrk->event_elts, wrk->old_head); - old_ti = clib_llist_prev_index (old_he, evt_list); - - while (!clib_llist_is_empty (wrk->event_elts, evt_list, old_he)) + if (old_ti != wrk->old_head) { - clib_llist_index_t ei; + old_he = pool_elt_at_index (wrk->event_elts, wrk->old_head); + ei = clib_llist_next_index (old_he, evt_list); - clib_llist_pop_first (wrk->event_elts, evt_list, elt, old_he); - ei = clib_llist_entry_index (wrk->event_elts, elt); - session_event_dispatch_io (wrk, node, elt, thread_index, &n_tx_packets); + while (n_tx_packets < VLIB_FRAME_SIZE) + { + elt = pool_elt_at_index (wrk->event_elts, ei); + next_ei = clib_llist_next_index (elt, evt_list); + clib_llist_remove (wrk->event_elts, evt_list, elt); - old_he = pool_elt_at_index (wrk->event_elts, wrk->old_head); - if (n_tx_packets >= VLIB_FRAME_SIZE || ei == old_ti) - break; - }; + session_event_dispatch_io (wrk, node, elt, thread_index, + &n_tx_packets); + + if (ei == old_ti) + break; + + ei = next_ei; + }; + } + + if (vec_len (wrk->pending_tx_buffers)) + session_flush_pending_tx_buffers (wrk, node); vlib_node_increment_counter (vm, session_queue_node.index, SESSION_QUEUE_ERROR_TX, n_tx_packets); @@ -1420,6 +1455,8 @@ session_node_cmp_event (session_event_t * e, svm_fifo_t * f) case SESSION_IO_EVT_RX: case SESSION_IO_EVT_TX: case SESSION_IO_EVT_BUILTIN_RX: + case SESSION_IO_EVT_BUILTIN_TX: + case SESSION_IO_EVT_TX_FLUSH: if (e->session_index == f->master_session_index) return 1; break; @@ -1469,8 +1506,7 @@ session_node_lookup_fifo_event (svm_fifo_t * f, session_event_t * e) found = session_node_cmp_event (e, f); if (found) return 1; - if (++index == mq->q->maxsize) - index = 0; + index = (index + 1) % mq->q->maxsize; } /* * Search pending events vector @@ -1478,18 +1514,31 @@ session_node_lookup_fifo_event (svm_fifo_t * f, session_event_t * e) /* *INDENT-OFF* */ clib_llist_foreach (wrk->event_elts, evt_list, - pool_elt_at_index (wrk->event_elts, wrk->old_head), + pool_elt_at_index (wrk->event_elts, wrk->new_head), elt, ({ found = session_node_cmp_event (&elt->evt, f); if (found) { clib_memcpy_fast (e, &elt->evt, sizeof (*e)); - break; + goto done; } + })); + /* *INDENT-ON* */ + /* *INDENT-OFF* */ + clib_llist_foreach (wrk->event_elts, evt_list, + pool_elt_at_index (wrk->event_elts, wrk->old_head), + elt, ({ + found = session_node_cmp_event (&elt->evt, f); + if (found) + { + clib_memcpy_fast (e, &elt->evt, sizeof (*e)); + goto done; + } })); /* *INDENT-ON* */ +done: return found; } @@ -1563,6 +1612,7 @@ session_queue_pre_input_inline (vlib_main_t * vm, vlib_node_runtime_t * node, session_main_t *sm = &session_main; if (!sm->wrk[0].vpp_event_queue) return 0; + node = vlib_node_get_runtime (vm, session_queue_node.index); return session_queue_node_fn (vm, node, frame); }