X-Git-Url: https://gerrit.fd.io/r/gitweb?a=blobdiff_plain;f=src%2Fvnet%2Fsession%2Fsession.c;h=8b268c00b6496c25a1e39b6ef8aa157c2ea961b1;hb=6080e0d15e152e38811b01306eef6719a682c007;hp=2cde1f99bb6dd410056b6a3be9ae5b453a2baebe;hpb=be237bf02382854118986e8ea84c7544e42023f2;p=vpp.git diff --git a/src/vnet/session/session.c b/src/vnet/session/session.c index 2cde1f99bb6..8b268c00b64 100644 --- a/src/vnet/session/session.c +++ b/src/vnet/session/session.c @@ -36,7 +36,8 @@ session_send_evt_to_thread (void *data, void *args, u32 thread_index, mq = session_main_get_vpp_event_queue (thread_index); if (PREDICT_FALSE (svm_msg_q_lock (mq))) return -1; - if (PREDICT_FALSE (svm_msg_q_ring_is_full (mq, SESSION_MQ_IO_EVT_RING))) + if (PREDICT_FALSE (svm_msg_q_is_full (mq) + || svm_msg_q_ring_is_full (mq, SESSION_MQ_IO_EVT_RING))) { svm_msg_q_unlock (mq); return -2; @@ -129,7 +130,8 @@ session_add_self_custom_tx_evt (transport_connection_t * tc, u8 has_prio) if (!(s->flags & SESSION_F_CUSTOM_TX)) { s->flags |= SESSION_F_CUSTOM_TX; - if (svm_fifo_set_event (s->tx_fifo)) + if (svm_fifo_set_event (s->tx_fifo) + || transport_connection_is_descheduled (tc)) { session_worker_t *wrk; session_evt_elt_t *elt; @@ -140,10 +142,24 @@ session_add_self_custom_tx_evt (transport_connection_t * tc, u8 has_prio) elt = session_evt_alloc_old (wrk); elt->evt.session_index = tc->s_index; elt->evt.event_type = SESSION_IO_EVT_TX; + tc->flags &= ~TRANSPORT_CONNECTION_F_DESCHED; } } } +void +sesssion_reschedule_tx (transport_connection_t * tc) +{ + session_worker_t *wrk = session_main_get_worker (tc->thread_index); + session_evt_elt_t *elt; + + ASSERT (tc->thread_index == vlib_get_thread_index ()); + + elt = session_evt_alloc_new (wrk); + elt->evt.session_index = tc->s_index; + elt->evt.event_type = SESSION_IO_EVT_TX; +} + static void session_program_transport_ctrl_evt (session_t * s, session_evt_type_t evt) { @@ -205,6 +221,32 @@ session_free (session_t * s) pool_put (session_main.wrk[s->thread_index].sessions, s); } +u8 +session_is_valid (u32 si, u8 thread_index) +{ + session_t *s; + transport_connection_t *tc; + + s = pool_elt_at_index (session_main.wrk[thread_index].sessions, si); + + if (!s) + return 1; + + if (s->thread_index != thread_index || s->session_index != si) + return 0; + + if (s->session_state == SESSION_STATE_TRANSPORT_DELETED + || s->session_state <= SESSION_STATE_LISTENING) + return 1; + + tc = session_get_transport (s); + if (s->connection_index != tc->c_index + || s->thread_index != tc->thread_index || tc->s_index != si) + return 0; + + return 1; +} + static void session_cleanup_notify (session_t * s, session_cleanup_ntf_t ntf) { @@ -369,6 +411,24 @@ session_enqueue_chain_tail (session_t * s, vlib_buffer_t * b, return 0; } +void +session_fifo_tuning (session_t * s, svm_fifo_t * f, + session_ft_action_t act, u32 len) +{ + if (s->flags & SESSION_F_CUSTOM_FIFO_TUNING) + { + app_worker_t *app_wrk = app_worker_get (s->app_wrk_index); + app_worker_session_fifo_tuning (app_wrk, s, f, act, len); + if (CLIB_ASSERT_ENABLE) + { + segment_manager_t *sm; + sm = segment_manager_get (f->segment_manager); + ASSERT (f->size >= 4096); + ASSERT (f->size <= sm->max_fifo_size); + } + } +} + /* * Enqueue data for delivery to session peer. Does not notify peer of enqueue * event but on request can queue notification events for later delivery by @@ -431,6 +491,8 @@ session_enqueue_stream_connection (transport_connection_t * tc, s->flags |= SESSION_F_RX_EVT; vec_add1 (wrk->session_to_enqueue[tc->proto], s->session_index); } + + session_fifo_tuning (s, s->rx_fifo, SESSION_FT_ACTION_ENQUEUED, 0); } return enqueued; @@ -468,6 +530,8 @@ session_enqueue_dgram_connection (session_t * s, s->flags |= SESSION_F_RX_EVT; vec_add1 (wrk->session_to_enqueue[proto], s->session_index); } + + session_fifo_tuning (s, s->rx_fifo, SESSION_FT_ACTION_ENQUEUED, 0); } return enqueued; } @@ -487,6 +551,7 @@ session_tx_fifo_dequeue_drop (transport_connection_t * tc, u32 max_bytes) u32 rv; rv = svm_fifo_dequeue_drop (s->tx_fifo, max_bytes); + session_fifo_tuning (s, s->tx_fifo, SESSION_FT_ACTION_DEQUEUED, rv); if (svm_fifo_needs_deq_ntf (s->tx_fifo, max_bytes)) session_dequeue_notify (s); @@ -647,6 +712,9 @@ session_main_flush_enqueue_events (u8 transport_proto, u32 thread_index) continue; } + session_fifo_tuning (s, s->rx_fifo, SESSION_FT_ACTION_ENQUEUED, + 0 /* TODO/not needed */ ); + if (PREDICT_FALSE (session_enqueue_notify_inline (s))) errors++; } @@ -864,7 +932,8 @@ session_transport_delete_notify (transport_connection_t * tc) /* Session was created but accept notification was not yet sent to the * app. Cleanup everything. */ session_lookup_del_session (s); - session_free_w_fifos (s); + segment_manager_dealloc_fifos (s->rx_fifo, s->tx_fifo); + session_free (s); break; case SESSION_STATE_ACCEPTING: case SESSION_STATE_TRANSPORT_CLOSING: @@ -887,7 +956,7 @@ session_transport_delete_notify (transport_connection_t * tc) * session is just removed because both transport and app have * confirmed the close*/ session_lookup_del_session (s); - s->session_state = SESSION_STATE_CLOSED; + s->session_state = SESSION_STATE_TRANSPORT_DELETED; session_cleanup_notify (s, SESSION_CLEANUP_TRANSPORT); svm_fifo_dequeue_drop_all (s->tx_fifo); session_program_transport_ctrl_evt (s, SESSION_CTRL_EVT_CLOSE); @@ -975,7 +1044,14 @@ session_stream_accept_notify (transport_connection_t * tc) if (!app_wrk) return -1; s->session_state = SESSION_STATE_ACCEPTING; - return app_worker_accept_notify (app_wrk, s); + if (app_worker_accept_notify (app_wrk, s)) + { + /* On transport delete, no notifications should be sent. Unless, the + * accept is retried and successful. */ + s->session_state = SESSION_STATE_CREATED; + return -1; + } + return 0; } /** @@ -1282,11 +1358,13 @@ session_transport_cleanup (session_t * s) { /* Delete from main lookup table before we axe the the transport */ session_lookup_del_session (s); - transport_cleanup (session_get_transport_proto (s), s->connection_index, - s->thread_index); + if (s->session_state != SESSION_STATE_TRANSPORT_DELETED) + transport_cleanup (session_get_transport_proto (s), s->connection_index, + s->thread_index); /* Since we called cleanup, no delete notification will come. So, make * sure the session is properly freed. */ - session_free_w_fifos (s); + segment_manager_dealloc_fifos (s->rx_fifo, s->tx_fifo); + session_free (s); } /** @@ -1303,7 +1381,6 @@ session_vpp_event_queues_allocate (session_main_t * smm) { u32 evt_q_length = 2048, evt_size = sizeof (session_event_t); ssvm_private_t *eqs = &smm->evt_qs_segment; - api_main_t *am = &api_main; uword eqs_size = 64 << 20; pid_t vpp_pid = getpid (); void *oldheap; @@ -1333,7 +1410,7 @@ session_vpp_event_queues_allocate (session_main_t * smm) if (smm->evt_qs_use_memfd_seg) oldheap = ssvm_push_heap (eqs->sh); else - oldheap = svm_push_data_heap (am->vlib_rp); + oldheap = vl_msg_push_heap (); for (i = 0; i < vec_len (smm->wrk); i++) { @@ -1358,7 +1435,7 @@ session_vpp_event_queues_allocate (session_main_t * smm) if (smm->evt_qs_use_memfd_seg) ssvm_pop_heap (oldheap); else - svm_pop_heap (oldheap); + vl_msg_pop_heap (oldheap); } ssvm_private_t * @@ -1492,6 +1569,7 @@ session_manager_main_enable (vlib_main_t * vm) wrk->old_head = clib_llist_make_head (wrk->event_elts, evt_list); wrk->vm = vlib_mains[i]; wrk->last_vlib_time = vlib_time_now (vlib_mains[i]); + wrk->last_vlib_us_time = wrk->last_vlib_time * CLIB_US_TIME_FREQ; if (num_threads > 1) clib_rwlock_init (&smm->wrk[i].peekers_rw_locks); @@ -1535,7 +1613,6 @@ session_manager_main_enable (vlib_main_t * vm) /* Enable transports */ transport_enable_disable (vm, 1); - transport_init_tx_pacers_period (); return 0; } @@ -1607,10 +1684,25 @@ session_manager_main_init (vlib_main_t * vm) smm->evt_qs_segment_size = 1 << 20; #endif smm->is_enabled = 0; + smm->session_enable_asap = 0; + return 0; +} + +static clib_error_t * +session_main_init (vlib_main_t * vm) +{ + session_main_t *smm = &session_main; + if (smm->session_enable_asap) + { + vlib_worker_thread_barrier_sync (vm); + vnet_session_enable_disable (vm, 1 /* is_en */ ); + vlib_worker_thread_barrier_release (vm); + } return 0; } VLIB_INIT_FUNCTION (session_manager_main_init); +VLIB_MAIN_LOOP_ENTER_FUNCTION (session_main_init); static clib_error_t * session_config_fn (vlib_main_t * vm, unformat_input_t * input) @@ -1692,7 +1784,7 @@ session_config_fn (vlib_main_t * vm, unformat_input_t * input) &smm->evt_qs_segment_size)) ; else if (unformat (input, "enable")) - vnet_session_enable_disable (vm, 1 /* is_en */ ); + smm->session_enable_asap = 1; else return clib_error_return (0, "unknown input `%U'", format_unformat_error, input);