diff --git a/src/vnet/session/session_node.c b/src/vnet/session/session_node.c
index a072bfa0f77..b5f4321c9a8 100644
--- a/src/vnet/session/session_node.c
+++ b/src/vnet/session/session_node.c
@@ -117,6 +117,7 @@ session_mq_connect_handler (void *data)
   a->sep_ext.parent_handle = mp->parent_handle;
   a->sep_ext.ckpair_index = mp->ckpair_index;
   a->sep_ext.crypto_engine = mp->crypto_engine;
+  a->sep_ext.flags = mp->flags;
   if (mp->hostname_len)
     {
       vec_validate (a->sep_ext.hostname, mp->hostname_len - 1);
@@ -309,8 +310,8 @@ session_mq_reset_reply_handler (void *data)
   session_parse_handle (mp->handle, &index, &thread_index);
   s = session_get_if_valid (index, thread_index);
 
-  /* Session was already closed or already cleaned up */
-  if (!s || s->session_state != SESSION_STATE_TRANSPORT_CLOSING)
+  /* No session or not the right session */
+  if (!s || s->session_state < SESSION_STATE_TRANSPORT_CLOSING)
     return;
 
   app_wrk = app_worker_get (s->app_wrk_index);
@@ -500,7 +501,7 @@ format_session_queue_trace (u8 * s, va_list * args)
   CLIB_UNUSED (vlib_node_t * node) = va_arg (*args, vlib_node_t *);
   session_queue_trace_t *t = va_arg (*args, session_queue_trace_t *);
 
-  s = format (s, "SESSION_QUEUE: session index %d, server thread index %d",
+  s = format (s, "session index %d thread index %d",
 	      t->session_index, t->server_thread_index);
   return s;
 }
@@ -542,7 +543,7 @@ session_tx_trace_frame (vlib_main_t * vm, vlib_node_runtime_t * node,
 
   for (i = 0; i < clib_min (n_trace, n_segs); i++)
     {
-      b = vlib_get_buffer (vm, to_next[i - n_segs]);
+      b = vlib_get_buffer (vm, to_next[i]);
       vlib_trace_buffer (vm, node, next_index, b, 1 /* follow_chain */ );
       t = vlib_add_trace (vm, node, b, sizeof (*t));
       t->session_index = s->session_index;
@@ -566,7 +567,7 @@ session_tx_fifo_chain_tail (vlib_main_t * vm, session_tx_context_t * ctx,
 
   b->total_length_not_including_first_buffer = 0;
   chain_b = b;
-  left_from_seg = clib_min (ctx->snd_mss - b->current_length,
+  left_from_seg = clib_min (ctx->sp.snd_mss - b->current_length,
 			    ctx->left_to_snd);
   to_deq = left_from_seg;
   for (j = 1; j < ctx->n_bufs_per_seg; j++)
@@ -582,8 +583,8 @@ session_tx_fifo_chain_tail (vlib_main_t * vm, session_tx_context_t * ctx,
       if (peek_data)
 	{
 	  n_bytes_read = svm_fifo_peek (ctx->s->tx_fifo,
-					ctx->tx_offset, len_to_deq, data);
-	  ctx->tx_offset += n_bytes_read;
+					ctx->sp.tx_offset, len_to_deq, data);
+	  ctx->sp.tx_offset += n_bytes_read;
 	}
       else
 	{
@@ -650,12 +651,12 @@ session_tx_fill_buffer (vlib_main_t * vm, session_tx_context_t * ctx,
 
   if (peek_data)
     {
-      n_bytes_read = svm_fifo_peek (ctx->s->tx_fifo, ctx->tx_offset,
+      n_bytes_read = svm_fifo_peek (ctx->s->tx_fifo, ctx->sp.tx_offset,
 				    len_to_deq, data0);
       ASSERT (n_bytes_read > 0);
       /* Keep track of progress locally, transport is also supposed to
       * increment it independently when pushing the header */
-      ctx->tx_offset += n_bytes_read;
+      ctx->sp.tx_offset += n_bytes_read;
     }
   else
     {
@@ -755,13 +756,12 @@ session_tx_set_dequeue_params (vlib_main_t * vm, session_tx_context_t * ctx,
   if (peek_data)
     {
       /* Offset in rx fifo from where to peek data */
-      ctx->tx_offset = ctx->transport_vft->tx_fifo_offset (ctx->tc);
-      if (PREDICT_FALSE (ctx->tx_offset >= ctx->max_dequeue))
+      if (PREDICT_FALSE (ctx->sp.tx_offset >= ctx->max_dequeue))
	{
	  ctx->max_len_to_snd = 0;
	  return;
	}
-      ctx->max_dequeue -= ctx->tx_offset;
+      ctx->max_dequeue -= ctx->sp.tx_offset;
     }
   else
     {
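
Two hunks above deserve a note. The reset-reply handler now bails out with "<" instead of "!=": session states are ordered along the connection lifecycle, so the handler accepts replies for any session at or past TRANSPORT_CLOSING rather than only the exact CLOSING state. The sketch below is a minimal, self-contained illustration of that ordering check; the state names and values are illustrative stand-ins, not VPP's actual definitions from session_types.h.

    #include <stdio.h>

    /* Illustrative subset of session states, in lifecycle order. The real
     * enum lives in vnet/session/session_types.h and has more members;
     * only the relative ordering matters here. */
    typedef enum
    {
      TOY_STATE_READY,
      TOY_STATE_TRANSPORT_CLOSING,
      TOY_STATE_TRANSPORT_CLOSED,
      TOY_STATE_CLOSED,
    } toy_session_state_t;

    /* Old check: only the exact TRANSPORT_CLOSING state was accepted, so a
     * reset reply arriving after the state had advanced was dropped.
     * New check: any state at or past TRANSPORT_CLOSING is accepted. */
    static int
    reset_reply_accepted (toy_session_state_t state)
    {
      return !(state < TOY_STATE_TRANSPORT_CLOSING);
    }

    int
    main (void)
    {
      printf ("%d\n", reset_reply_accepted (TOY_STATE_READY));            /* 0 */
      printf ("%d\n", reset_reply_accepted (TOY_STATE_TRANSPORT_CLOSED)); /* 1 */
      return 0;
    }

The other hunks in this group are mostly mechanical: connect requests now carry endpoint flags, the queue trace format string is trimmed, the tx trace indexes to_next from the start of the traced segments instead of using an end-relative offset, and snd_mss/tx_offset move into the new ctx->sp send parameters.
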
@@ -781,38 +781,51 @@ session_tx_set_dequeue_params (vlib_main_t * vm, session_tx_context_t * ctx,
 
   ASSERT (ctx->max_dequeue > 0);
 
   /* Ensure we're not writing more than transport window allows */
-  if (ctx->max_dequeue < ctx->snd_space)
+  if (ctx->max_dequeue < ctx->sp.snd_space)
     {
       /* Constrained by tx queue. Try to send only fully formed segments */
-      ctx->max_len_to_snd =
-	(ctx->max_dequeue > ctx->snd_mss) ?
-	ctx->max_dequeue - ctx->max_dequeue % ctx->snd_mss : ctx->max_dequeue;
+      ctx->max_len_to_snd = (ctx->max_dequeue > ctx->sp.snd_mss) ?
+	(ctx->max_dequeue - (ctx->max_dequeue % ctx->sp.snd_mss)) :
+	ctx->max_dequeue;
       /* TODO Nagle ? */
     }
   else
     {
       /* Expectation is that snd_space0 is already a multiple of snd_mss */
-      ctx->max_len_to_snd = ctx->snd_space;
+      ctx->max_len_to_snd = ctx->sp.snd_space;
     }
 
   /* Check if we're tx constrained by the node */
-  ctx->n_segs_per_evt = ceil ((f64) ctx->max_len_to_snd / ctx->snd_mss);
+  ctx->n_segs_per_evt = ceil ((f64) ctx->max_len_to_snd / ctx->sp.snd_mss);
   if (ctx->n_segs_per_evt > max_segs)
     {
       ctx->n_segs_per_evt = max_segs;
-      ctx->max_len_to_snd = max_segs * ctx->snd_mss;
+      ctx->max_len_to_snd = max_segs * ctx->sp.snd_mss;
     }
 
   n_bytes_per_buf = vlib_buffer_get_default_data_size (vm);
   ASSERT (n_bytes_per_buf > TRANSPORT_MAX_HDRS_LEN);
-  n_bytes_per_seg = TRANSPORT_MAX_HDRS_LEN + ctx->snd_mss;
+  n_bytes_per_seg = TRANSPORT_MAX_HDRS_LEN + ctx->sp.snd_mss;
   ctx->n_bufs_per_seg = ceil ((f64) n_bytes_per_seg / n_bytes_per_buf);
-  ctx->deq_per_buf = clib_min (ctx->snd_mss, n_bytes_per_buf);
-  ctx->deq_per_first_buf = clib_min (ctx->snd_mss,
+  ctx->deq_per_buf = clib_min (ctx->sp.snd_mss, n_bytes_per_buf);
+  ctx->deq_per_first_buf = clib_min (ctx->sp.snd_mss,
 				     n_bytes_per_buf - TRANSPORT_MAX_HDRS_LEN);
 }
 
+always_inline void
+session_tx_maybe_reschedule (session_worker_t * wrk,
+			     session_tx_context_t * ctx,
+			     session_evt_elt_t * elt)
+{
+  session_t *s = ctx->s;
+
+  svm_fifo_unset_event (s->tx_fifo);
+  if (svm_fifo_max_dequeue_cons (s->tx_fifo) > ctx->sp.tx_offset)
+    if (svm_fifo_set_event (s->tx_fifo))
+      session_evt_add_head_old (wrk, elt);
+}
+
 always_inline int
 session_tx_fifo_read_and_snd_i (session_worker_t * wrk,
 				vlib_node_runtime_t * node,
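
session_tx_maybe_reschedule, added above, centralizes the re-arm pattern used after a send attempt: clear the fifo's io event, re-check whether bytes remain past sp.tx_offset, and requeue the session at the head of the old-events list only if the event flag was newly set. Clearing before checking ensures an application enqueue that races with the check is not lost. Below is a toy, single-threaded sketch of the pattern; the fifo type and helpers are hypothetical stand-ins (the real svm_fifo event flag is manipulated atomically).

    #include <stdio.h>

    /* Hypothetical stand-in for the bits of svm_fifo this pattern touches */
    typedef struct
    {
      unsigned max_dequeue;  /* bytes currently in the fifo */
      int has_event;         /* an io event is already queued for this fifo */
    } toy_fifo_t;

    static void
    toy_fifo_unset_event (toy_fifo_t * f)
    {
      f->has_event = 0;
    }

    /* Returns 1 only when the flag flips from unset to set, mirroring the
     * "should an event be enqueued" contract of svm_fifo_set_event */
    static int
    toy_fifo_set_event (toy_fifo_t * f)
    {
      int was_set = f->has_event;
      f->has_event = 1;
      return !was_set;
    }

    /* Shape of session_tx_maybe_reschedule: clear the event, then re-check
     * for bytes past tx_offset; if some remain and the event was newly set,
     * the caller requeues the session (session_evt_add_head_old) */
    static int
    toy_maybe_reschedule (toy_fifo_t * f, unsigned tx_offset)
    {
      toy_fifo_unset_event (f);
      if (f->max_dequeue > tx_offset)
        if (toy_fifo_set_event (f))
          return 1;
      return 0;
    }

    int
    main (void)
    {
      toy_fifo_t f = { .max_dequeue = 4096, .has_event = 1 };
      printf ("%d\n", toy_maybe_reschedule (&f, 1024)); /* 1: data left */
      printf ("%d\n", toy_maybe_reschedule (&f, 4096)); /* 0: fully drained */
      return 0;
    }
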
@@ -859,27 +872,30 @@ session_tx_fifo_read_and_snd_i (session_worker_t * wrk,
 	  (ctx->s->session_state >= SESSION_STATE_TRANSPORT_CLOSED))
 	return SESSION_TX_OK;
       max_burst -= n_custom_tx;
-      if (!max_burst)
+      if (!max_burst || (ctx->s->flags & SESSION_F_CUSTOM_TX))
 	{
 	  session_evt_add_old (wrk, elt);
 	  return SESSION_TX_OK;
 	}
     }
 
-  ctx->snd_mss = ctx->transport_vft->send_mss (ctx->tc);
-  if (PREDICT_FALSE (ctx->snd_mss == 0))
-    {
-      session_evt_add_old (wrk, elt);
-      return SESSION_TX_NO_DATA;
-    }
+  transport_connection_snd_params (ctx->tc, &ctx->sp);
 
-  ctx->snd_space = transport_connection_snd_space (ctx->tc);
-
-  /* This flow queue is "empty" so it should be re-evaluated before
-   * the ones that have data to send. */
-  if (!ctx->snd_space)
+  if (!ctx->sp.snd_space)
     {
-      session_evt_add_head_old (wrk, elt);
+      /* If the deschedule flag was set, remove session from scheduler.
+       * Transport is responsible for rescheduling this session. */
+      if (ctx->sp.flags & TRANSPORT_SND_F_DESCHED)
+	transport_connection_deschedule (ctx->tc);
+      /* Request to postpone the session, e.g., zero-wnd and transport
+       * is not currently probing */
+      else if (ctx->sp.flags & TRANSPORT_SND_F_POSTPONE)
+	session_evt_add_old (wrk, elt);
+      /* This flow queue is "empty" so it should be re-evaluated before
+       * the ones that have data to send. */
+      else
+	session_evt_add_head_old (wrk, elt);
+
       return SESSION_TX_NO_DATA;
     }
 
@@ -891,20 +907,18 @@ session_tx_fifo_read_and_snd_i (session_worker_t * wrk,
 	  session_evt_add_head_old (wrk, elt);
 	  return SESSION_TX_NO_DATA;
 	}
-      snd_space = clib_min (ctx->snd_space, snd_space);
-      ctx->snd_space = snd_space >= ctx->snd_mss ?
-	snd_space - snd_space % ctx->snd_mss : snd_space;
+      snd_space = clib_min (ctx->sp.snd_space, snd_space);
+      ctx->sp.snd_space = snd_space >= ctx->sp.snd_mss ?
+	snd_space - snd_space % ctx->sp.snd_mss : snd_space;
     }
 
-  /* Allow enqueuing of a new event */
-  svm_fifo_unset_event (ctx->s->tx_fifo);
-
   /* Check how much we can pull. */
   session_tx_set_dequeue_params (vm, ctx, max_burst, peek_data);
 
   if (PREDICT_FALSE (!ctx->max_len_to_snd))
     {
       transport_connection_tx_pacer_reset_bucket (ctx->tc, 0);
+      session_tx_maybe_reschedule (wrk, ctx, elt);
       return SESSION_TX_NO_DATA;
     }
 
@@ -916,8 +930,7 @@ session_tx_fifo_read_and_snd_i (session_worker_t * wrk,
     {
       if (n_bufs)
	vlib_buffer_free (vm, wrk->tx_buffers, n_bufs);
-      if (svm_fifo_set_event (ctx->s->tx_fifo))
-	session_evt_add_head_old (wrk, elt);
+      session_evt_add_head_old (wrk, elt);
       vlib_node_increment_counter (wrk->vm, node->node_index,
 				   SESSION_QUEUE_ERROR_NO_BUFFER, 1);
       return SESSION_TX_NO_BUFFERS;
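
With the change above, transports advertise send parameters through transport_connection_snd_params, and a zero snd_space no longer has a single outcome. The flags select one of three dispositions: deschedule entirely (the transport promises to reschedule the session itself), postpone to the tail of the old-events list (for example a zero receive window with no probe in flight), or requeue at the head so the empty flow is re-evaluated before flows that still have data. A small self-contained sketch of that three-way dispatch; the flag values and names below are illustrative, not the real TRANSPORT_SND_F_* definitions.

    #include <stdio.h>

    /* Illustrative flag bits; the real TRANSPORT_SND_F_* values live in
     * vnet/session/transport_types.h */
    #define TOY_SND_F_DESCHED  (1 << 0)
    #define TOY_SND_F_POSTPONE (1 << 1)

    typedef enum
    {
      TOY_DESCHEDULE, /* transport will reschedule the session itself */
      TOY_OLD_TAIL,   /* postpone: e.g., zero window, no probe pending */
      TOY_OLD_HEAD,   /* re-evaluate before flows that have data to send */
    } toy_sched_action_t;

    /* Mirrors the dispatch on ctx->sp.flags when sp.snd_space is zero */
    static toy_sched_action_t
    toy_no_space_action (unsigned flags)
    {
      if (flags & TOY_SND_F_DESCHED)
        return TOY_DESCHEDULE;
      else if (flags & TOY_SND_F_POSTPONE)
        return TOY_OLD_TAIL;
      return TOY_OLD_HEAD;
    }

    int
    main (void)
    {
      printf ("%d %d %d\n", toy_no_space_action (TOY_SND_F_DESCHED),
              toy_no_space_action (TOY_SND_F_POSTPONE),
              toy_no_space_action (0)); /* prints: 0 1 2 */
      return 0;
    }
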
@@ -1001,11 +1014,14 @@ session_tx_fifo_read_and_snd_i (session_worker_t * wrk,
   SESSION_EVT (SESSION_EVT_DEQ, ctx->s, ctx->max_len_to_snd, ctx->max_dequeue,
 	       ctx->s->tx_fifo->has_event, wrk->last_vlib_time);
 
-  /* If we couldn't dequeue all bytes mark as partially read */
   ASSERT (ctx->left_to_snd == 0);
+
+  /* If we couldn't dequeue all bytes reschedule as old flow. Otherwise,
+   * check if application enqueued more data and reschedule accordingly */
   if (ctx->max_len_to_snd < ctx->max_dequeue)
-    if (svm_fifo_set_event (ctx->s->tx_fifo))
-      session_evt_add_old (wrk, elt);
+    session_evt_add_old (wrk, elt);
+  else
+    session_tx_maybe_reschedule (wrk, ctx, elt);
 
   if (!peek_data
       && ctx->transport_vft->transport_options.tx_type == TRANSPORT_TX_DGRAM)
@@ -1162,10 +1178,7 @@ session_event_dispatch_io (session_worker_t * wrk, vlib_node_runtime_t * node,
     case SESSION_IO_EVT_TX:
       s = session_event_get_session (e, thread_index);
       if (PREDICT_FALSE (!s))
-	{
-	  clib_warning ("session %u was freed!", e->session_index);
-	  break;
-	}
+	break;
       CLIB_PREFETCH (s->tx_fifo, 2 * CLIB_CACHE_LINE_BYTES, LOAD);
       wrk->ctx.s = s;
       /* Spray packets in per session type frames, since they go to
@@ -1259,9 +1272,9 @@ session_queue_node_fn (vlib_main_t * vm, vlib_node_runtime_t * node,
   u32 thread_index = vm->thread_index, n_to_dequeue;
   session_worker_t *wrk = &smm->wrk[thread_index];
   session_evt_elt_t *elt, *ctrl_he, *new_he, *old_he;
+  clib_llist_index_t ei, next_ei, old_ti;
   svm_msg_q_msg_t _msg, *msg = &_msg;
-  clib_llist_index_t old_ti;
-  int i, n_tx_packets = 0;
+  int i, n_tx_packets;
   session_event_t *evt;
   svm_msg_q_t *mq;
 
@@ -1274,6 +1287,7 @@ session_queue_node_fn (vlib_main_t * vm, vlib_node_runtime_t * node,
    * Update transport time
    */
   transport_update_time (wrk->last_vlib_time, thread_index);
+  n_tx_packets = vec_len (wrk->pending_tx_buffers);
   /*
    * Dequeue and handle new events
    */
@@ -1316,24 +1330,14 @@ session_queue_node_fn (vlib_main_t * vm, vlib_node_runtime_t * node,
 
   old_he = pool_elt_at_index (wrk->event_elts, wrk->old_head);
   old_ti = clib_llist_prev_index (old_he, evt_list);
-  /* *INDENT-OFF* */
-  clib_llist_foreach_safe (wrk->event_elts, evt_list, new_he, elt, ({
-    session_evt_type_t et;
-
-    et = elt->evt.event_type;
-    clib_llist_remove (wrk->event_elts, evt_list, elt);
-
-    /* Postpone tx events if we can't handle them this dispatch cycle */
-    if (n_tx_packets >= VLIB_FRAME_SIZE
-	&& (et == SESSION_IO_EVT_TX || et == SESSION_IO_EVT_TX_FLUSH))
-      {
-	clib_llist_add (wrk->event_elts, evt_list, elt, new_he);
-	continue;
-      }
-
-    session_event_dispatch_io (wrk, node, elt, thread_index, &n_tx_packets);
-  }));
-  /* *INDENT-ON* */
+  ei = clib_llist_next_index (new_he, evt_list);
+  while (ei != wrk->new_head && n_tx_packets < VLIB_FRAME_SIZE)
+    {
+      elt = pool_elt_at_index (wrk->event_elts, ei);
+      ei = clib_llist_next_index (elt, evt_list);
+      clib_llist_remove (wrk->event_elts, evt_list, elt);
+      session_event_dispatch_io (wrk, node, elt, thread_index, &n_tx_packets);
+    }
 
   /*
    * Handle the old io events, if we had any prior to processing the new ones
@@ -1341,8 +1345,6 @@ session_queue_node_fn (vlib_main_t * vm, vlib_node_runtime_t * node,
    */
   if (old_ti != wrk->old_head)
     {
-      clib_llist_index_t ei, next_ei;
-
       old_he = pool_elt_at_index (wrk->event_elts, wrk->old_head);
       ei = clib_llist_next_index (old_he, evt_list);
 
@@ -1612,6 +1614,7 @@ session_queue_pre_input_inline (vlib_main_t * vm, vlib_node_runtime_t * node,
   session_main_t *sm = &session_main;
   if (!sm->wrk[0].vpp_event_queue)
     return 0;
+  node = vlib_node_get_runtime (vm, session_queue_node.index);
   return session_queue_node_fn (vm, node, frame);
 }
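
The dispatch-loop rewrite above drops clib_llist_foreach_safe in favor of an explicit walk that stops as soon as n_tx_packets reaches VLIB_FRAME_SIZE; together with seeding n_tx_packets from vec_len (wrk->pending_tx_buffers), this bounds the work done per node invocation without the old requeue-and-continue logic, since undispatched events simply remain on the list for the next run. The sketch below shows the same bounded walk over a toy index-linked list with a head sentinel, loosely in the style of clib_llist; all names and the frame size are hypothetical stand-ins.

    #include <stdio.h>

    #define TOY_FRAME_SIZE 4 /* stands in for VLIB_FRAME_SIZE */

    /* Toy index-linked event list: element 0 is the head sentinel and
     * next[i] gives the index of the following element, circularly */
    typedef struct
    {
      int next[8];
      int data[8];
    } toy_llist_t;

    /* Shape of the new-events loop: walk from the head, stop at the
     * sentinel or when the tx budget for this dispatch is used up.
     * Whatever is left on the list is handled on the next invocation. */
    static void
    toy_dispatch_bounded (toy_llist_t * l, int *n_tx_packets)
    {
      int ei = l->next[0];
      while (ei != 0 && *n_tx_packets < TOY_FRAME_SIZE)
        {
          int next_ei = l->next[ei];
          printf ("dispatch event %d\n", l->data[ei]);
          (*n_tx_packets)++;    /* real dispatch may add several */
          l->next[0] = next_ei; /* unlink, as clib_llist_remove would */
          ei = next_ei;
        }
    }

    int
    main (void)
    {
      /* head sentinel 0 -> 1 -> 2 -> 3 -> 4 -> 5 -> back to 0 */
      toy_llist_t l = { .next = { 1, 2, 3, 4, 5, 0 },
                        .data = { 0, 10, 20, 30, 40, 50 } };
      int n_tx_packets = 1; /* seeded, like vec_len (wrk->pending_tx_buffers) */
      toy_dispatch_bounded (&l, &n_tx_packets); /* dispatches 10, 20, 30 */
      return 0;
    }
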