X-Git-Url: https://gerrit.fd.io/r/gitweb?a=blobdiff_plain;f=src%2Fvnet%2Fsession%2Fsession_node.c;h=880f16388b87f2d38878f336a5ca48c7cd7b8865;hb=30e79c2e388a98160a3660f4f03103890c9b1b7c;hp=64c873cc75831644d5d20282c5d6172b69ef2fdb;hpb=7ac053b27fee8f9e437cf7b61357943356381061;p=vpp.git diff --git a/src/vnet/session/session_node.c b/src/vnet/session/session_node.c index 64c873cc758..880f16388b8 100644 --- a/src/vnet/session/session_node.c +++ b/src/vnet/session/session_node.c @@ -29,6 +29,7 @@ session_mq_accepted_reply_handler (void *data) { session_accepted_reply_msg_t *mp = (session_accepted_reply_msg_t *) data; vnet_disconnect_args_t _a = { 0 }, *a = &_a; + stream_session_state_t old_state; app_worker_t *app_wrk; local_session_t *ls; stream_session_t *s; @@ -65,25 +66,35 @@ session_mq_accepted_reply_handler (void *data) { s = session_get_from_handle_if_valid (mp->handle); if (!s) - { - clib_warning ("session doesn't exist"); - return; - } + return; + app_wrk = app_worker_get (s->app_wrk_index); if (app_wrk->app_index != mp->context) { clib_warning ("app doesn't own session"); return; } + + old_state = s->session_state; s->session_state = SESSION_STATE_READY; if (!svm_fifo_is_empty (s->server_rx_fifo)) app_worker_lock_and_send_event (app_wrk, s, FIFO_EVENT_APP_RX); + + /* Closed while waiting for app to reply. Resend disconnect */ + if (old_state >= SESSION_STATE_TRANSPORT_CLOSING) + { + application_t *app = application_get (app_wrk->app_index); + app->cb_fns.session_disconnect_callback (s); + s->session_state = old_state; + return; + } } } static void session_mq_reset_reply_handler (void *data) { + vnet_disconnect_args_t _a = { 0 }, *a = &_a; session_reset_reply_msg_t *mp; app_worker_t *app_wrk; stream_session_t *s; @@ -91,17 +102,17 @@ session_mq_reset_reply_handler (void *data) u32 index, thread_index; mp = (session_reset_reply_msg_t *) data; - app = application_lookup (mp->client_index); + app = application_lookup (mp->context); if (!app) return; session_parse_handle (mp->handle, &index, &thread_index); s = session_get_if_valid (index, thread_index); - if (!s) - { - clib_warning ("Invalid session!"); - return; - } + + /* Session was already closed or already cleaned up */ + if (!s || s->session_state != SESSION_STATE_TRANSPORT_CLOSING) + return; + app_wrk = app_worker_get (s->app_wrk_index); if (!app_wrk || app_wrk->app_index != app->app_index) { @@ -119,7 +130,9 @@ session_mq_reset_reply_handler (void *data) /* This comes as a response to a reset, transport only waiting for * confirmation to remove connection state, no need to disconnect */ - stream_session_cleanup (s); + a->handle = mp->handle; + a->app_index = app->app_index; + vnet_disconnect_session (a); } static void @@ -160,7 +173,7 @@ session_mq_disconnected_handler (void *data) svm_msg_q_unlock (app_wrk->event_queue); evt = svm_msg_q_msg_data (app_wrk->event_queue, msg); clib_memset (evt, 0, sizeof (*evt)); - evt->event_type = SESSION_CTRL_EVT_DISCONNECTED; + evt->event_type = SESSION_CTRL_EVT_DISCONNECTED_REPLY; rmp = (session_disconnected_reply_msg_t *) evt->data; rmp->handle = mp->handle; rmp->context = mp->context; @@ -194,6 +207,86 @@ session_mq_disconnected_reply_handler (void *data) } } +static void +session_mq_worker_update_handler (void *data) +{ + session_worker_update_msg_t *mp = (session_worker_update_msg_t *) data; + session_worker_update_reply_msg_t *rmp; + svm_msg_q_msg_t _msg, *msg = &_msg; + app_worker_t *app_wrk; + u32 owner_app_wrk_map; + session_event_t *evt; + stream_session_t *s; + application_t *app; + + app = application_lookup (mp->client_index); + if (!app) + return; + if (!(s = session_get_from_handle_if_valid (mp->handle))) + { + clib_warning ("invalid handle %llu", mp->handle); + return; + } + app_wrk = app_worker_get (s->app_wrk_index); + if (app_wrk->app_index != app->app_index) + { + clib_warning ("app %u does not own session %llu", app->app_index, + mp->handle); + return; + } + owner_app_wrk_map = app_wrk->wrk_map_index; + app_wrk = application_get_worker (app, mp->wrk_index); + + /* This needs to come from the new owner */ + if (mp->req_wrk_index == owner_app_wrk_map) + { + session_req_worker_update_msg_t *wump; + + svm_msg_q_lock_and_alloc_msg_w_ring (app_wrk->event_queue, + SESSION_MQ_CTRL_EVT_RING, + SVM_Q_WAIT, msg); + svm_msg_q_unlock (app_wrk->event_queue); + evt = svm_msg_q_msg_data (app_wrk->event_queue, msg); + clib_memset (evt, 0, sizeof (*evt)); + evt->event_type = SESSION_CTRL_EVT_REQ_WORKER_UPDATE; + wump = (session_req_worker_update_msg_t *) evt->data; + wump->session_handle = mp->handle; + svm_msg_q_add (app_wrk->event_queue, msg, SVM_Q_WAIT); + return; + } + + app_worker_own_session (app_wrk, s); + + /* + * Send reply + */ + svm_msg_q_lock_and_alloc_msg_w_ring (app_wrk->event_queue, + SESSION_MQ_CTRL_EVT_RING, + SVM_Q_WAIT, msg); + svm_msg_q_unlock (app_wrk->event_queue); + evt = svm_msg_q_msg_data (app_wrk->event_queue, msg); + clib_memset (evt, 0, sizeof (*evt)); + evt->event_type = SESSION_CTRL_EVT_WORKER_UPDATE_REPLY; + rmp = (session_worker_update_reply_msg_t *) evt->data; + rmp->handle = mp->handle; + rmp->rx_fifo = pointer_to_uword (s->server_rx_fifo); + rmp->tx_fifo = pointer_to_uword (s->server_tx_fifo); + rmp->segment_handle = session_segment_handle (s); + svm_msg_q_add (app_wrk->event_queue, msg, SVM_Q_WAIT); + + /* + * Retransmit messages that may have been lost + */ + if (!svm_fifo_is_empty (s->server_tx_fifo)) + session_send_io_evt_to_thread (s->server_tx_fifo, FIFO_EVENT_APP_TX); + + if (!svm_fifo_is_empty (s->server_rx_fifo)) + app_worker_lock_and_send_event (app_wrk, s, FIFO_EVENT_APP_RX); + + if (s->session_state >= SESSION_STATE_TRANSPORT_CLOSING) + app->cb_fns.session_disconnect_callback (s); +} + vlib_node_registration_t session_queue_node; typedef struct @@ -445,7 +538,7 @@ session_tx_not_ready (stream_session_t * s, u8 peek_data) * session is not ready or closed */ if (s->session_state < SESSION_STATE_READY) return 1; - if (s->session_state == SESSION_STATE_CLOSED) + if (s->session_state >= SESSION_STATE_TRANSPORT_CLOSED) return 2; } return 0; @@ -528,8 +621,7 @@ session_tx_set_dequeue_params (vlib_main_t * vm, session_tx_context_t * ctx, ctx->max_len_to_snd = max_segs * ctx->snd_mss; } - n_bytes_per_buf = vlib_buffer_free_list_buffer_size (vm, - VLIB_BUFFER_DEFAULT_FREE_LIST_INDEX); + n_bytes_per_buf = VLIB_BUFFER_DATA_SIZE; ASSERT (n_bytes_per_buf > MAX_HDRS_LEN); n_bytes_per_seg = MAX_HDRS_LEN + ctx->snd_mss; ctx->n_bufs_per_seg = ceil ((f64) n_bytes_per_seg / n_bytes_per_buf); @@ -569,9 +661,17 @@ session_tx_fifo_read_and_snd_i (vlib_main_t * vm, vlib_node_runtime_t * node, ctx->transport_vft = transport_protocol_get_vft (tp); ctx->tc = session_tx_get_transport (ctx, peek_data); ctx->snd_mss = ctx->transport_vft->send_mss (ctx->tc); - ctx->snd_space = - transport_connection_snd_space (ctx->tc, vm->clib_time.last_cpu_time, - ctx->snd_mss); + + if (PREDICT_FALSE (e->event_type == SESSION_IO_EVT_TX_FLUSH)) + { + if (ctx->transport_vft->flush_data) + ctx->transport_vft->flush_data (ctx->tc); + } + + ctx->snd_space = transport_connection_snd_space (ctx->tc, + vm->clib_time. + last_cpu_time, + ctx->snd_mss); if (ctx->snd_space == 0 || ctx->snd_mss == 0) { vec_add1 (wrk->pending_event_vector, *e); @@ -751,7 +851,7 @@ static void session_update_dispatch_period (session_manager_worker_t * wrk, f64 now, u32 thread_index) { - if (wrk->last_tx_packets > 1) + if (wrk->last_tx_packets) { f64 sample = now - wrk->last_vlib_time; wrk->dispatch_period = (wrk->dispatch_period + sample) * 0.5; @@ -800,7 +900,10 @@ session_queue_node_fn (vlib_main_t * vm, vlib_node_runtime_t * node, { vec_add2 (fifo_events, e, 1); svm_msg_q_sub_w_lock (mq, msg); - clib_memcpy (e, svm_msg_q_msg_data (mq, msg), sizeof (*e)); + /* Works because reply messages are smaller than a session evt. + * If we ever need to support bigger messages this needs to be + * fixed */ + clib_memcpy_fast (e, svm_msg_q_msg_data (mq, msg), sizeof (*e)); svm_msg_q_free_msg (mq, msg); } svm_msg_q_unlock (mq); @@ -825,6 +928,7 @@ session_queue_node_fn (vlib_main_t * vm, vlib_node_runtime_t * node, e = &fifo_events[i]; switch (e->event_type) { + case SESSION_IO_EVT_TX_FLUSH: case FIFO_EVENT_APP_TX: /* Don't try to send more that one frame per dispatch cycle */ if (n_tx_packets == VLIB_FRAME_SIZE) @@ -861,23 +965,24 @@ session_queue_node_fn (vlib_main_t * vm, vlib_node_runtime_t * node, } break; case FIFO_EVENT_DISCONNECT: - /* Make sure stream disconnects run after the pending list is - * drained */ - s = session_get_from_handle (e->session_handle); - if (!e->postponed) - { - e->postponed = 1; - vec_add1 (wrk->pending_disconnects, *e); - continue; - } - /* If tx queue is still not empty, wait */ - if (svm_fifo_max_dequeue (s->server_tx_fifo)) + s = session_get_from_handle_if_valid (e->session_handle); + if (PREDICT_FALSE (!s)) + break; + + /* Make sure session disconnects run after the pending list is + * drained, i.e., postpone if the first time. If not the first + * and the tx queue is still not empty, try to wait for some + * dispatch cycles */ + if (!e->postponed + || (e->postponed < 200 + && svm_fifo_max_dequeue (s->server_tx_fifo))) { + e->postponed += 1; vec_add1 (wrk->pending_disconnects, *e); continue; } - stream_session_disconnect_transport (s); + session_transport_close (s); break; case FIFO_EVENT_BUILTIN_RX: s = session_event_get_session (e, thread_index); @@ -911,6 +1016,9 @@ session_queue_node_fn (vlib_main_t * vm, vlib_node_runtime_t * node, case SESSION_CTRL_EVT_RESET_REPLY: session_mq_reset_reply_handler (e->data); break; + case SESSION_CTRL_EVT_WORKER_UPDATE: + session_mq_worker_update_handler (e->data); + break; default: clib_warning ("unhandled event type %d", e->event_type); } @@ -961,7 +1069,7 @@ dump_thread_0_event_queue (void) { msg = (svm_msg_q_msg_t *) (&mq->q->data[0] + mq->q->elsize * index); ring = svm_msg_q_ring (mq, msg->ring_index); - clib_memcpy (e, svm_msg_q_msg_data (mq, msg), ring->elsize); + clib_memcpy_fast (e, svm_msg_q_msg_data (mq, msg), ring->elsize); switch (e->event_type) { @@ -983,7 +1091,8 @@ dump_thread_0_event_queue (void) case FIFO_EVENT_RPC: fformat (stdout, "[%04d] RPC call %llx with %llx\n", - i, (u64) (e->rpc_args.fp), (u64) (e->rpc_args.arg)); + i, (u64) (uword) (e->rpc_args.fp), + (u64) (uword) (e->rpc_args.arg)); break; default: @@ -1053,7 +1162,7 @@ session_node_lookup_fifo_event (svm_fifo_t * f, session_event_t * e) { msg = (svm_msg_q_msg_t *) (&mq->q->data[0] + mq->q->elsize * index); ring = svm_msg_q_ring (mq, msg->ring_index); - clib_memcpy (e, svm_msg_q_msg_data (mq, msg), ring->elsize); + clib_memcpy_fast (e, svm_msg_q_msg_data (mq, msg), ring->elsize); found = session_node_cmp_event (e, f); if (found) return 1; @@ -1069,7 +1178,7 @@ session_node_lookup_fifo_event (svm_fifo_t * f, session_event_t * e) found = session_node_cmp_event (evt, f); if (found) { - clib_memcpy (e, evt, sizeof (*evt)); + clib_memcpy_fast (e, evt, sizeof (*evt)); break; } }