X-Git-Url: https://gerrit.fd.io/r/gitweb?a=blobdiff_plain;f=src%2Fvcl%2Fvppcom.c;h=dcbbfc44a390815718edd5a931809a71d1ef78f0;hb=369db83f91a411977015748a74c5a4579170a16c;hp=e70aa0017723cd84f244af062e15f65b007ea30e;hpb=317b8e08367c769b90463613231b9fcfad486098;p=vpp.git diff --git a/src/vcl/vppcom.c b/src/vcl/vppcom.c index e70aa001772..dcbbfc44a39 100644 --- a/src/vcl/vppcom.c +++ b/src/vcl/vppcom.c @@ -15,10 +15,10 @@ #include #include -#include #include #include #include +#include __thread uword __vcl_worker_index = ~0; @@ -44,13 +44,13 @@ vcl_wait_for_segment (u64 segment_handle) } static inline int -vcl_mq_dequeue_batch (vcl_worker_t * wrk, svm_msg_q_t * mq) +vcl_mq_dequeue_batch (vcl_worker_t * wrk, svm_msg_q_t * mq, u32 n_max_msg) { svm_msg_q_msg_t *msg; u32 n_msgs; int i; - n_msgs = svm_msg_q_size (mq); + n_msgs = clib_min (svm_msg_q_size (mq), n_max_msg); for (i = 0; i < n_msgs; i++) { vec_add2 (wrk->mq_msg_vector, msg, 1); @@ -260,7 +260,8 @@ vcl_send_session_worker_update (vcl_worker_t * wrk, vcl_session_t * s, } static u32 -vcl_session_accepted_handler (vcl_worker_t * wrk, session_accepted_msg_t * mp) +vcl_session_accepted_handler (vcl_worker_t * wrk, session_accepted_msg_t * mp, + u32 ls_index) { vcl_session_t *session, *listen_session; svm_fifo_t *rx_fifo, *tx_fifo; @@ -269,29 +270,23 @@ vcl_session_accepted_handler (vcl_worker_t * wrk, session_accepted_msg_t * mp) session = vcl_session_alloc (wrk); - listen_session = vcl_session_table_lookup_listener (wrk, - mp->listener_handle); - if (!listen_session) + listen_session = vcl_session_get (wrk, ls_index); + if (listen_session->vpp_handle != mp->listener_handle) { - evt_q = uword_to_pointer (mp->vpp_event_queue_address, svm_msg_q_t *); - VDBG (0, "ERROR: couldn't find listen session: unknown vpp listener " - "handle %llx", mp->listener_handle); - vcl_send_session_accepted_reply (evt_q, mp->context, mp->handle, - VNET_API_ERROR_INVALID_ARGUMENT); - vcl_session_free (wrk, session); - return VCL_INVALID_SESSION_INDEX; + VDBG (0, "ERROR: listener handle %lu does not match session %u", + mp->listener_handle, ls_index); + goto error; } - rx_fifo = uword_to_pointer (mp->server_rx_fifo, svm_fifo_t *); - tx_fifo = uword_to_pointer (mp->server_tx_fifo, svm_fifo_t *); - if (vcl_wait_for_segment (mp->segment_handle)) { - VDBG (0, "segment for session %u couldn't be mounted!", + VDBG (0, "ERROR: segment for session %u couldn't be mounted!", session->session_index); - return VCL_INVALID_SESSION_INDEX; + goto error; } + rx_fifo = uword_to_pointer (mp->server_rx_fifo, svm_fifo_t *); + tx_fifo = uword_to_pointer (mp->server_tx_fifo, svm_fifo_t *); session->vpp_evt_q = uword_to_pointer (mp->vpp_event_queue_address, svm_msg_q_t *); rx_fifo->client_session_index = session->session_index; @@ -304,14 +299,13 @@ vcl_session_accepted_handler (vcl_worker_t * wrk, session_accepted_msg_t * mp) session->vpp_handle = mp->handle; session->vpp_thread_index = rx_fifo->master_thread_index; - session->client_context = mp->context; session->rx_fifo = rx_fifo; session->tx_fifo = tx_fifo; session->session_state = STATE_ACCEPT; - session->transport.rmt_port = mp->port; - session->transport.is_ip4 = mp->is_ip4; - clib_memcpy_fast (&session->transport.rmt_ip, mp->ip, + session->transport.rmt_port = mp->rmt.port; + session->transport.is_ip4 = mp->rmt.is_ip4; + clib_memcpy_fast (&session->transport.rmt_ip, &mp->rmt.ip, sizeof (ip46_address_t)); vcl_session_table_add_vpp_handle (wrk, mp->handle, session->session_index); @@ -322,12 +316,22 @@ vcl_session_accepted_handler (vcl_worker_t * wrk, session_accepted_msg_t * mp) VDBG (1, "session %u [0x%llx]: client accept request from %s address %U" " port %d queue %p!", session->session_index, mp->handle, - mp->is_ip4 ? "IPv4" : "IPv6", format_ip46_address, &mp->ip, - mp->is_ip4 ? IP46_TYPE_IP4 : IP46_TYPE_IP6, - clib_net_to_host_u16 (mp->port), session->vpp_evt_q); + mp->rmt.is_ip4 ? "IPv4" : "IPv6", format_ip46_address, &mp->rmt.ip, + mp->rmt.is_ip4 ? IP46_TYPE_IP4 : IP46_TYPE_IP6, + clib_net_to_host_u16 (mp->rmt.port), session->vpp_evt_q); vcl_evt (VCL_EVT_ACCEPT, session, listen_session, session_index); + vcl_send_session_accepted_reply (session->vpp_evt_q, mp->context, + session->vpp_handle, 0); + return session->session_index; + +error: + evt_q = uword_to_pointer (mp->vpp_event_queue_address, svm_msg_q_t *); + vcl_send_session_accepted_reply (evt_q, mp->context, mp->handle, + VNET_API_ERROR_INVALID_ARGUMENT); + vcl_session_free (wrk, session); + return VCL_INVALID_SESSION_INDEX; } static u32 @@ -393,10 +397,10 @@ vcl_session_connected_handler (vcl_worker_t * wrk, session->tx_fifo = tx_fifo; session->vpp_handle = mp->handle; session->vpp_thread_index = rx_fifo->master_thread_index; - session->transport.is_ip4 = mp->is_ip4; - clib_memcpy_fast (&session->transport.lcl_ip, mp->lcl_ip, + session->transport.is_ip4 = mp->lcl.is_ip4; + clib_memcpy_fast (&session->transport.lcl_ip, &mp->lcl.ip, sizeof (session->transport.lcl_ip)); - session->transport.lcl_port = mp->lcl_port; + session->transport.lcl_port = mp->lcl.port; session->session_state = STATE_CONNECT; /* Add it to lookup table */ @@ -764,7 +768,7 @@ vcl_flush_mq_events (void) mq = wrk->app_event_queue; svm_msg_q_lock (mq); - vcl_mq_dequeue_batch (wrk, mq); + vcl_mq_dequeue_batch (wrk, mq, ~0); svm_msg_q_unlock (mq); for (i = 0; i < vec_len (wrk->mq_msg_vector); i++) @@ -818,13 +822,27 @@ static int vppcom_session_unbind (u32 session_handle) { vcl_worker_t *wrk = vcl_worker_get_current (); + session_accepted_msg_t *accepted_msg; vcl_session_t *session = 0; + vcl_session_msg_t *evt; u64 vpp_handle; session = vcl_session_get_w_handle (wrk, session_handle); if (!session) return VPPCOM_EBADFD; + /* Flush pending accept events, if any */ + while (clib_fifo_elts (session->accept_evts_fifo)) + { + clib_fifo_sub2 (session->accept_evts_fifo, evt); + accepted_msg = &evt->accepted_msg; + vcl_session_table_del_vpp_handle (wrk, accepted_msg->handle); + vcl_send_session_accepted_reply (session->vpp_evt_q, + accepted_msg->context, + session->vpp_handle, -1); + } + clib_fifo_free (session->accept_evts_fifo); + vpp_handle = session->vpp_handle; session->vpp_handle = ~0; session->session_state = STATE_DISCONNECT; @@ -924,8 +942,8 @@ vppcom_app_create (char *app_name) vcm->main_pid = getpid (); vcm->app_name = format (0, "%s", app_name); vppcom_init_error_string_table (); - svm_fifo_segment_main_init (&vcm->segment_main, vcl_cfg->segment_baseva, - 20 /* timeout in secs */ ); + fifo_segment_main_init (&vcm->segment_main, vcl_cfg->segment_baseva, + 20 /* timeout in secs */ ); pool_alloc (vcm->workers, vcl_cfg->max_workers); clib_spinlock_init (&vcm->workers_lock); clib_rwlock_init (&vcm->segment_table_lock); @@ -1324,7 +1342,7 @@ vppcom_session_accept (uint32_t listen_session_handle, vppcom_endpt_t * ep, e = svm_msg_q_msg_data (wrk->app_event_queue, &msg); if (e->event_type != SESSION_CTRL_EVT_ACCEPTED) { - clib_warning ("discarded event: %u", e->event_type); + VDBG (0, "discarded event: %u", e->event_type); svm_msg_q_free_msg (wrk->app_event_queue, &msg); continue; } @@ -1335,7 +1353,11 @@ vppcom_session_accept (uint32_t listen_session_handle, vppcom_endpt_t * ep, handle: - client_session_index = vcl_session_accepted_handler (wrk, &accepted_msg); + client_session_index = vcl_session_accepted_handler (wrk, &accepted_msg, + listen_session_index); + if (client_session_index == VCL_INVALID_SESSION_INDEX) + return VPPCOM_ECONNABORTED; + listen_session = vcl_session_get (wrk, listen_session_index); client_session = vcl_session_get (wrk, client_session_index); @@ -1360,10 +1382,6 @@ handle: sizeof (ip6_address_t)); } - vcl_send_session_accepted_reply (client_session->vpp_evt_q, - client_session->client_context, - client_session->vpp_handle, 0); - VDBG (0, "listener %u [0x%llx] accepted %u [0x%llx] peer: %U:%u " "local: %U:%u", listen_session_handle, listen_session->vpp_handle, client_session_index, client_session->vpp_handle, @@ -1531,11 +1549,11 @@ vppcom_session_read_internal (uint32_t session_handle, void *buf, int n, svm_fifo_unset_event (s->rx_fifo); /* Cut-through sessions might request tx notifications on rx fifos */ - if (PREDICT_FALSE (rx_fifo->want_tx_ntf)) + if (PREDICT_FALSE (rx_fifo->want_deq_ntf)) { app_send_io_evt_to_vpp (s->vpp_evt_q, s->rx_fifo->master_session_index, SESSION_IO_EVT_RX, SVM_Q_WAIT); - svm_fifo_reset_tx_ntf (s->rx_fifo); + svm_fifo_reset_has_deq_ntf (s->rx_fifo); } VDBG (2, "session %u[0x%llx]: read %d bytes from (%p)", s->session_index, @@ -1611,7 +1629,7 @@ vppcom_session_read_segments (uint32_t session_handle, } } - n_read = svm_fifo_segments (rx_fifo, (svm_fifo_segment_t *) ds); + n_read = svm_fifo_segments (rx_fifo, (svm_fifo_seg_t *) ds); svm_fifo_unset_event (rx_fifo); return n_read; @@ -1628,7 +1646,7 @@ vppcom_session_free_segments (uint32_t session_handle, if (PREDICT_FALSE (!s || s->is_vep)) return; - svm_fifo_segments_free (s->rx_fifo, (svm_fifo_segment_t *) ds); + svm_fifo_segments_free (s->rx_fifo, (svm_fifo_seg_t *) ds); } int @@ -1699,7 +1717,7 @@ vppcom_session_write_inline (uint32_t session_handle, void *buf, size_t n, } while (svm_fifo_is_full_prod (tx_fifo)) { - svm_fifo_add_want_tx_ntf (tx_fifo, SVM_FIFO_WANT_TX_NOTIF); + svm_fifo_add_want_deq_ntf (tx_fifo, SVM_FIFO_WANT_DEQ_NOTIF); if (vcl_session_is_closing (s)) return vcl_session_closing_error (s); svm_msg_q_lock (mq); @@ -1895,7 +1913,7 @@ vcl_select_handle_mq (vcl_worker_t * wrk, svm_msg_q_t * mq, } } } - vcl_mq_dequeue_batch (wrk, mq); + vcl_mq_dequeue_batch (wrk, mq, ~0); svm_msg_q_unlock (mq); for (i = 0; i < vec_len (wrk->mq_msg_vector); i++) @@ -2019,7 +2037,7 @@ vppcom_select (int n_bits, vcl_si_set * read_map, vcl_si_set * write_map, bits_set++; } else - svm_fifo_add_want_tx_ntf (session->tx_fifo, SVM_FIFO_WANT_TX_NOTIF); + svm_fifo_add_want_deq_ntf (session->tx_fifo, SVM_FIFO_WANT_DEQ_NOTIF); })); check_rd: @@ -2228,8 +2246,8 @@ vppcom_epoll_ctl (uint32_t vep_handle, int op, uint32_t session_handle, vep_session->vep.next_sh = session_handle; if (session->tx_fifo) - svm_fifo_add_want_tx_ntf (session->tx_fifo, - SVM_FIFO_WANT_TX_NOTIF_IF_FULL); + svm_fifo_add_want_deq_ntf (session->tx_fifo, + SVM_FIFO_WANT_DEQ_NOTIF_IF_FULL); VDBG (1, "EPOLL_CTL_ADD: vep_sh %u, sh %u, events 0x%x, data 0x%llx!", vep_handle, session_handle, event->events, event->data.u64); @@ -2313,7 +2331,7 @@ vppcom_epoll_ctl (uint32_t vep_handle, int op, uint32_t session_handle, session->is_vep_session = 0; if (session->tx_fifo) - svm_fifo_del_want_tx_ntf (session->tx_fifo, SVM_FIFO_NO_TX_NOTIF); + svm_fifo_del_want_deq_ntf (session->tx_fifo, SVM_FIFO_NO_DEQ_NOTIF); VDBG (1, "EPOLL_CTL_DEL: vep_idx %u, sh %u!", vep_handle, session_handle); @@ -2367,7 +2385,7 @@ vcl_epoll_wait_handle_mq_event (vcl_worker_t * wrk, session_event_t * e, add_event = 1; events[*num_ev].events |= EPOLLOUT; session_evt_data = session->vep.ev.data.u64; - svm_fifo_reset_tx_ntf (session->tx_fifo); + svm_fifo_reset_has_deq_ntf (session->tx_fifo); break; case SESSION_CTRL_EVT_ACCEPTED: session = vcl_session_accepted (wrk, @@ -2479,7 +2497,8 @@ vcl_epoll_wait_handle_mq (vcl_worker_t * wrk, svm_msg_q_t * mq, } } } - vcl_mq_dequeue_batch (wrk, mq); + ASSERT (maxevents > *num_ev); + vcl_mq_dequeue_batch (wrk, mq, maxevents - *num_ev); svm_msg_q_unlock (mq); handle_dequeued: @@ -2487,10 +2506,7 @@ handle_dequeued: { msg = vec_elt_at_index (wrk->mq_msg_vector, i); e = svm_msg_q_msg_data (mq, msg); - if (*num_ev < maxevents) - vcl_epoll_wait_handle_mq_event (wrk, e, events, num_ev); - else - vec_add1 (wrk->unhandled_evts_vector, *e); + vcl_epoll_wait_handle_mq_event (wrk, e, events, num_ev); svm_msg_q_free_msg (mq, msg); } vec_reset_length (wrk->mq_msg_vector); @@ -2502,7 +2518,7 @@ static int vppcom_epoll_wait_condvar (vcl_worker_t * wrk, struct epoll_event *events, int maxevents, u32 n_evts, double wait_for_time) { - double wait = 0, start = 0; + double wait = 0, start = 0, now; if (!n_evts) { @@ -2519,7 +2535,9 @@ vppcom_epoll_wait_condvar (vcl_worker_t * wrk, struct epoll_event *events, if (wait == -1) continue; - wait = wait - (clib_time_now (&wrk->clib_time) - start); + now = clib_time_now (&wrk->clib_time); + wait -= now - start; + start = now; } while (wait > 0); @@ -2586,11 +2604,11 @@ vppcom_epoll_wait (uint32_t vep_handle, struct epoll_event *events, events, &n_evts); if (n_evts == maxevents) { - i += 1; - break; + vec_delete (wrk->unhandled_evts_vector, i + 1, 0); + return n_evts; } } - vec_delete (wrk->unhandled_evts_vector, i, 0); + vec_reset_length (wrk->unhandled_evts_vector); } if (vcm->cfg.use_mq_eventfd) @@ -3339,6 +3357,13 @@ vppcom_worker_register (void) return VPPCOM_OK; } +void +vppcom_worker_unregister (void) +{ + vcl_worker_cleanup (vcl_worker_get_current (), 1 /* notify vpp */ ); + vcl_set_worker_index (~0); +} + int vppcom_worker_index (void) {