X-Git-Url: https://gerrit.fd.io/r/gitweb?a=blobdiff_plain;f=src%2Fvcl%2Fvcl_event.c;h=e08f5f33adc3b9aaa59a65306786a8d1292f7b37;hb=697faeace706337eddf0407e4e28e0bb8d39c20e;hp=e72b864e9440d0e9ab19506f6b39514405372283;hpb=9c949e72a473195c10a1c1caf503db9467c93f9a;p=vpp.git

diff --git a/src/vcl/vcl_event.c b/src/vcl/vcl_event.c
index e72b864e944..e08f5f33adc 100644
--- a/src/vcl/vcl_event.c
+++ b/src/vcl/vcl_event.c
@@ -20,6 +20,7 @@
 #include
 #include
+#include
 
 /**
  * @file
@@ -29,7 +30,7 @@
  */
 
 int
-vce_generate_event (vce_event_thread_t *evt, u32 ev_idx)
+vce_generate_event (vce_event_thread_t * evt, u32 ev_idx)
 {
   int elts, rv = 0;
   vce_event_t *p;
@@ -38,9 +39,9 @@ vce_generate_event (vce_event_thread_t *evt, u32 ev_idx)
 
   /* Check there is event data for this event */
 
-  VCE_EVENTS_LOCK();
-  p = pool_elt_at_index (evt->vce_events, ev_idx);
-  ASSERT(p);
+  VCE_EVENTS_LOCK ();
+  p = pool_elt_at_index (evt->vce_events, ev_idx);
+  ASSERT (p);
 
   elts = (int) clib_fifo_free_elts (evt->event_index_fifo);
   if (PREDICT_TRUE (elts))
@@ -61,7 +62,7 @@ vce_generate_event (vce_event_thread_t *evt, u32 ev_idx)
 }
 
 void
-vce_clear_event (vce_event_thread_t *evt, u32 ev_idx)
+vce_clear_event (vce_event_thread_t * evt, u32 ev_idx)
 {
   VCE_EVENTS_LOCK ();
   pool_put_index (evt->vce_events, ev_idx);
@@ -69,19 +70,19 @@ vce_clear_event (vce_event_thread_t *evt, u32 ev_idx)
 }
 
 vce_event_t *
-vce_get_event_from_index(vce_event_thread_t *evt, u32 ev_idx)
+vce_get_event_from_index (vce_event_thread_t * evt, u32 ev_idx)
 {
   vce_event_t *ev = 0;
   /* Assumes caller has obtained the spinlock (evt->events_lockp) */
-  if ( ! pool_is_free_index (evt->vce_events, ev_idx))
+  if (!pool_is_free_index (evt->vce_events, ev_idx))
     ev = pool_elt_at_index (evt->vce_events, ev_idx);
 
   return ev;
 }
 
 vce_event_handler_reg_t *
-vce_get_event_handler (vce_event_thread_t *evt, vce_event_key_t *evk)
+vce_get_event_handler (vce_event_thread_t * evt, vce_event_key_t * evk)
 {
   vce_event_handler_reg_t *handler = 0;
   uword *p;
 
@@ -96,8 +97,8 @@ vce_get_event_handler (vce_event_thread_t *evt, vce_event_key_t *evk)
 }
 
 vce_event_handler_reg_t *
-vce_register_handler (vce_event_thread_t *evt, vce_event_key_t *evk,
-                      vce_event_callback_t cb, void *cb_args)
+vce_register_handler (vce_event_thread_t * evt, vce_event_key_t * evk,
+                      vce_event_callback_t cb, void *cb_args)
 {
   vce_event_handler_reg_t *handler;
   vce_event_handler_reg_t *old_handler = 0;
@@ -116,17 +117,17 @@ vce_register_handler (vce_event_thread_t *evt, vce_event_key_t *evk,
       /* If we are just re-registering, ignore and move on
        * else store the old handler_fn for unregister to re-instate */
       if (old_handler->handler_fn == cb)
-        {
+        {
 
-          VCE_HANDLERS_UNLOCK ();
+          VCE_HANDLERS_UNLOCK ();
 
-          /* Signal event thread that a handler exists in case any
-           * recycled events requiring this handler are pending */
-          pthread_mutex_lock (&(evt->generator_lock));
-          pthread_cond_signal (&(evt->generator_cond));
-          pthread_mutex_unlock (&(evt->generator_lock));
-          return old_handler;
-        }
+          /* Signal event thread that a handler exists in case any
+           * recycled events requiring this handler are pending */
+          pthread_mutex_lock (&(evt->generator_lock));
+          pthread_cond_signal (&(evt->generator_cond));
+          pthread_mutex_unlock (&(evt->generator_lock));
+          return old_handler;
+        }
     }
 
   pool_get (evt->vce_event_handlers, handler);
@@ -134,7 +135,7 @@ vce_register_handler (vce_event_thread_t *evt, vce_event_key_t *evk,
 
   handler->handler_fn = cb;
   handler->replaced_handler_idx = (u32) ((p) ? p[0] : ~0);
-  handler->ev_idx = (u32) ~0;	//This will be set by the event thread if event happens
+  handler->ev_idx = (u32) ~ 0;	//This will be set by the event thread if event happens
   handler->evk = evk->as_u64;
   handler->handler_fn_args = cb_args;
 
@@ -155,8 +156,8 @@ vce_register_handler (vce_event_thread_t *evt, vce_event_key_t *evk,
 }
 
 int
-vce_unregister_handler (vce_event_thread_t *evt,
-                        vce_event_handler_reg_t *handler)
+vce_unregister_handler (vce_event_thread_t * evt,
+                        vce_event_handler_reg_t * handler)
 {
   uword *p;
   u64 evk = handler->evk;
@@ -218,54 +219,54 @@ vce_event_thread_fn (void *arg)
     {
       uword fifo_depth = clib_fifo_elts (evt->event_index_fifo);
       while ((fifo_depth == 0) || (recycle_count == fifo_depth))
-        {
-          recycle_count = 0;
-          pthread_cond_wait (&(evt->generator_cond), &(evt->generator_lock));
-          fifo_depth = clib_fifo_elts (evt->event_index_fifo);
-        }
+        {
+          recycle_count = 0;
+          pthread_cond_wait (&(evt->generator_cond), &(evt->generator_lock));
+          fifo_depth = clib_fifo_elts (evt->event_index_fifo);
+        }
 
       /* Remove event */
       VCE_EVENTS_LOCK ();
       clib_fifo_sub1 (evt->event_index_fifo, ev_idx);
       ev = vce_get_event_from_index (evt, ev_idx);
-      ASSERT(ev);
+      ASSERT (ev);
       if (recycle_count && ev->recycle)
-        {
-          clib_fifo_add1 (evt->event_index_fifo, ev_idx);
-          VCE_EVENTS_UNLOCK ();
-          continue;
-        }
+        {
+          clib_fifo_add1 (evt->event_index_fifo, ev_idx);
+          VCE_EVENTS_UNLOCK ();
+          continue;
+        }
       VCE_HANDLERS_LOCK ();
 
       p = hash_get (evt->handlers_index_by_event_key, ev->evk.as_u64);
       if (!p)
-        {
-          /* If an event falls in the woods, and there is no handler to hear it,
-           * does it make any sound?
-           * I don't know either, so lets biff the event */
-          pool_put(evt->vce_events, ev);
-          VCE_EVENTS_UNLOCK ();
-          VCE_HANDLERS_UNLOCK ();
-          pthread_mutex_unlock (&(evt->generator_lock));
-        }
+        {
+          /* If an event falls in the woods, and there is no handler to hear it,
+           * does it make any sound?
+           * I don't know either, so lets biff the event */
+          pool_put (evt->vce_events, ev);
+          VCE_EVENTS_UNLOCK ();
+          VCE_HANDLERS_UNLOCK ();
+          pthread_mutex_unlock (&(evt->generator_lock));
+        }
       else
-        {
-          u32 evt_recycle = ev->recycle;
-          handler = pool_elt_at_index (evt->vce_event_handlers, p[0]);
-          handler->ev_idx = ev_idx;
-          ev->recycle = 0;
+        {
+          u32 evt_recycle = ev->recycle;
+          handler = pool_elt_at_index (evt->vce_event_handlers, p[0]);
+          handler->ev_idx = ev_idx;
+          ev->recycle = 0;
 
-          VCE_EVENTS_UNLOCK ();
-          VCE_HANDLERS_UNLOCK ();
-          pthread_mutex_unlock (&(evt->generator_lock));
+          VCE_EVENTS_UNLOCK ();
+          VCE_HANDLERS_UNLOCK ();
+          pthread_mutex_unlock (&(evt->generator_lock));
 
-          (handler->handler_fn)(handler);
+          (handler->handler_fn) (handler);
 
-          VCE_EVENTS_LOCK ();
-          ev = vce_get_event_from_index (evt, ev_idx);
-          recycle_count += (!evt_recycle && ev && ev->recycle) ? 1 : 0;
-          VCE_EVENTS_UNLOCK ();
-        }
+          VCE_EVENTS_LOCK ();
+          ev = vce_get_event_from_index (evt, ev_idx);
+          recycle_count += (!evt_recycle && ev && ev->recycle) ? 1 : 0;
+          VCE_EVENTS_UNLOCK ();
+        }
       pthread_mutex_lock (&(evt->generator_lock));
     }
@@ -273,7 +274,7 @@ vce_event_thread_fn (void *arg)
 }
 
 int
-vce_start_event_thread (vce_event_thread_t *evt, u8 max_events)
+vce_start_event_thread (vce_event_thread_t * evt, u8 max_events)
 {
   clib_fifo_validate (evt->event_index_fifo, max_events);
   evt->handlers_index_by_event_key = hash_create (0, sizeof (uword));
@@ -285,6 +286,269 @@ vce_start_event_thread (vce_event_thread_t *evt, u8 max_events)
   clib_spinlock_init (&(evt->handlers_lockp));
 
   return pthread_create (&(evt->thread), NULL /* attr */ ,
-                         vce_event_thread_fn, evt);
+                         vce_event_thread_fn, evt);
+}
+
+static void *
+vppcom_session_io_thread_fn (void *arg)
+{
+  vppcom_session_io_thread_t *evt = (vppcom_session_io_thread_t *) arg;
+  u32 *session_indexes = 0, *session_index;
+  int i, rv;
+  u32 bytes = 0;
+  vcl_session_t *session;
+
+  while (1)
+    {
+      vec_reset_length (session_indexes);
+      VCE_IO_SESSIONS_LOCK ();
+      /* *INDENT-OFF* */
+      pool_foreach (session_index, evt->active_session_indexes, ({
+        vec_add1 (session_indexes, *session_index);
+      }));
+      /* *INDENT-ON* */
+      VCE_IO_SESSIONS_UNLOCK ();
+      if (session_indexes)
+        {
+          for (i = 0; i < vec_len (session_indexes); ++i)
+            {
+              VCL_SESSION_LOCK_AND_GET (session_indexes[i], &session);
+              bytes = svm_fifo_max_dequeue (session->rx_fifo);
+              VCL_SESSION_UNLOCK ();
+
+              if (bytes)
+                {
+                  vppcom_ioevent_t *eio;
+                  vce_event_t *ev;
+                  u32 ev_idx;
+
+                  VCL_EVENTS_LOCK ();
+
+                  pool_get (vcm->event_thread.vce_events, ev);
+                  ev_idx = (u32) (ev - vcm->event_thread.vce_events);
+                  eio = vce_get_event_data (ev, sizeof (*eio));
+                  ev->evk.eid = VCL_EVENT_IOEVENT_RX_FIFO;
+                  ev->evk.session_index = session_indexes[i];
+                  eio->bytes = bytes;
+                  eio->session_index = session_indexes[i];
+
+                  VCL_EVENTS_UNLOCK ();
+
+                  rv = vce_generate_event (&vcm->event_thread, ev_idx);
+                }
+            }
+        }
+      struct timespec ts;
+      ts.tv_sec = 0;
+      ts.tv_nsec = 1000000;	/* 1 millisecond */
+      nanosleep (&ts, NULL);
+    }
+done:
+  VCL_SESSION_UNLOCK ();
+  return NULL;
 }
 
+static int
+vppcom_start_io_event_thread (vppcom_session_io_thread_t * evt,
+                              u8 max_sessions)
+{
+  pthread_cond_init (&(evt->vce_io_cond), NULL);
+  pthread_mutex_init (&(evt->vce_io_lock), NULL);
+
+  clib_spinlock_init (&(evt->io_sessions_lockp));
+
+  return pthread_create (&(evt->thread), NULL /* attr */ ,
+                         vppcom_session_io_thread_fn, evt);
+}
+
+static void
+vce_registered_ioevent_handler_fn (void *arg)
+{
+  vce_event_handler_reg_t *reg = (vce_event_handler_reg_t *) arg;
+  vppcom_ioevent_t *eio;
+  vce_event_t *ev;
+  u32 ioevt_ndx = (u64) (reg->handler_fn_args);
+  vppcom_session_ioevent_t *ioevent, ioevent_;
+
+  VCL_EVENTS_LOCK ();
+  ev = vce_get_event_from_index (&vcm->event_thread, reg->ev_idx);
+  eio = vce_get_event_data (ev, sizeof (*eio));
+  VCL_EVENTS_UNLOCK ();
+
+  VCL_IO_SESSIONS_LOCK ();
+  ioevent = pool_elt_at_index (vcm->session_io_thread.ioevents, ioevt_ndx);
+  ioevent_ = *ioevent;
+  VCL_IO_SESSIONS_UNLOCK ();
+  (ioevent_.user_cb) (eio, ioevent_.user_cb_data);
+  vce_clear_event (&vcm->event_thread, reg->ev_idx);
+  return;
+
+  /*TODO - Unregister check in close for this listener */
+
+}
+
+void
+vce_registered_listener_connect_handler_fn (void *arg)
+{
+  vce_event_handler_reg_t *reg = (vce_event_handler_reg_t *) arg;
+  vce_event_connect_request_t *ecr;
+  vce_event_t *ev;
+  vppcom_endpt_t ep;
+
+  vcl_session_t *new_session;
+  int rv;
+
+  vppcom_session_listener_t *session_listener =
+    (vppcom_session_listener_t *) reg->handler_fn_args;
+
+  VCL_EVENTS_LOCK ();
+  ev = vce_get_event_from_index (&vcm->event_thread, reg->ev_idx);
+  ecr = vce_get_event_data (ev, sizeof (*ecr));
+  VCL_EVENTS_UNLOCK ();
+  VCL_SESSION_LOCK_AND_GET (ecr->accepted_session_index, &new_session);
+
+  ep.is_ip4 = new_session->transport.is_ip4;
+  ep.port = new_session->transport.rmt_port;
+  if (new_session->transport.is_ip4)
+    clib_memcpy (&ep.ip, &new_session->transport.rmt_ip.ip4,
+                 sizeof (ip4_address_t));
+  else
+    clib_memcpy (&ep.ip, &new_session->transport.rmt_ip.ip6,
+                 sizeof (ip6_address_t));
+
+  vppcom_send_accept_session_reply (new_session->vpp_handle,
+                                    new_session->client_context,
+                                    0 /* retval OK */ );
+  VCL_SESSION_UNLOCK ();
+
+  (session_listener->user_cb) (ecr->accepted_session_index, &ep,
+                               session_listener->user_cb_data);
+
+  if (vcm->session_io_thread.io_sessions_lockp)
+    {
+      /* Throw this new accepted session index into the rx poll thread pool */
+      VCL_IO_SESSIONS_LOCK ();
+      u32 *active_session_index;
+      pool_get (vcm->session_io_thread.active_session_indexes,
+                active_session_index);
+      *active_session_index = ecr->accepted_session_index;
+      VCL_IO_SESSIONS_UNLOCK ();
+    }
+
+  /*TODO - Unregister check in close for this listener */
+  return;
+
+done:
+  ASSERT (0);			// If we can't get a lock or accepted session fails, lets blow up.
+}
+
+/**
+ * @brief vce_poll_wait_connect_request_handler_fn
+ * - used by vppcom_epoll_xxxx() for listener sessions
+ * - when a vl_api_accept_session_t_handler() generates an event
+ *   this callback is alerted and sets the fields that vppcom_epoll_wait()
+ *   expects to see.
+ *
+ * @param arg - void* to be cast to vce_event_handler_reg_t*
+ */
+void
+vce_poll_wait_connect_request_handler_fn (void *arg)
+{
+  vce_event_handler_reg_t *reg = (vce_event_handler_reg_t *) arg;
+  vce_event_t *ev;
+  /* Retrieve the VCL_EVENT_CONNECT_REQ_ACCEPTED event */
+  ev = vce_get_event_from_index (&vcm->event_thread, reg->ev_idx);
+  vce_event_connect_request_t *ecr = vce_get_event_data (ev, sizeof (*ecr));
+
+  /* Add the accepted_session_index to the FIFO */
+  VCL_ACCEPT_FIFO_LOCK ();
+  clib_fifo_add1 (vcm->client_session_index_fifo,
+                  ecr->accepted_session_index);
+  VCL_ACCEPT_FIFO_UNLOCK ();
+
+  /* Recycling the event. */
+  VCL_EVENTS_LOCK ();
+  ev->recycle = 1;
+  clib_fifo_add1 (vcm->event_thread.event_index_fifo, reg->ev_idx);
+  VCL_EVENTS_UNLOCK ();
+}
+
+int
+vppcom_session_register_ioevent_cb (uint32_t session_index,
+                                    vppcom_session_ioevent_cb cb,
+                                    uint8_t rx, void *ptr)
+{
+  int rv = VPPCOM_OK;
+  vce_event_key_t evk;
+  vppcom_session_ioevent_t *ioevent;
+
+  if (!vcm->session_io_thread.io_sessions_lockp)
+    rv = vppcom_start_io_event_thread (&vcm->session_io_thread, 100);	/* DAW_TODO: ??? hard-coded value */
+
+  if (rv == VPPCOM_OK)
+    {
+      void *io_evt_ndx;
+
+      /* Register handler for ioevent on session_index */
+      VCL_IO_SESSIONS_LOCK ();
+      pool_get (vcm->session_io_thread.ioevents, ioevent);
+      io_evt_ndx = (void *) (ioevent - vcm->session_io_thread.ioevents);
+      ioevent->user_cb = cb;
+      ioevent->user_cb_data = ptr;
+      VCL_IO_SESSIONS_UNLOCK ();
+
+      evk.session_index = session_index;
+      evk.eid = rx ? VCL_EVENT_IOEVENT_RX_FIFO : VCL_EVENT_IOEVENT_TX_FIFO;
+
+      (void) vce_register_handler (&vcm->event_thread, &evk,
+                                   vce_registered_ioevent_handler_fn,
+                                   io_evt_ndx);
+    }
+  return rv;
+}
+
+int
+vppcom_session_register_listener (uint32_t session_index,
+                                  vppcom_session_listener_cb cb,
+                                  vppcom_session_listener_errcb
+                                  errcb, uint8_t flags, int q_len, void *ptr)
+{
+  int rv = VPPCOM_OK;
+  vce_event_key_t evk;
+  vppcom_session_listener_t *listener_args;
+
+  if (!vcm->session_io_thread.io_sessions_lockp)
+    rv = vppcom_start_io_event_thread (&vcm->session_io_thread, 100);	/* DAW_TODO: ??? hard-coded value */
+  if (rv)
+    {
+      goto done;
+    }
+  rv = vppcom_session_listen (session_index, q_len);
+  if (rv)
+    {
+      goto done;
+    }
+
+  /* Register handler for connect_request event on listen_session_index */
+  listener_args = clib_mem_alloc (sizeof (vppcom_session_listener_t));	// DAW_TODO: Use a pool instead of thrashing the memory allocator!
+  listener_args->user_cb = cb;
+  listener_args->user_cb_data = ptr;
+  listener_args->user_errcb = errcb;
+
+  evk.session_index = session_index;
+  evk.eid = VCL_EVENT_CONNECT_REQ_ACCEPTED;
+  (void) vce_register_handler (&vcm->event_thread, &evk,
+                               vce_registered_listener_connect_handler_fn,
+                               listener_args);
+
+done:
+  return rv;
+}
+
+/*
+ * fd.io coding-style-patch-verification: ON
+ *
+ * Local Variables:
+ * eval: (c-set-style "gnu")
+ * End:
+ */
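
The two public entry points added by this patch, vppcom_session_register_listener() and vppcom_session_register_ioevent_cb(), are what an application calls to receive accept and RX-fifo notifications. The following is a minimal, hypothetical usage sketch, not part of the patch: it assumes the application already holds a listen-session index obtained through the usual vppcom calls, that the application-facing header <vcl/vppcom.h> declares these functions and their callback typedefs, and that the callback signatures match the call sites visible in the handlers above. The my_* and register_listener_callbacks names are invented, and errcb is passed as NULL since nothing in this file ever invokes it.

#include <stdio.h>
#include <stdint.h>
#include <stddef.h>
#include <vcl/vppcom.h>		/* assumed application-facing VCL header */

/* Invoked as (eio, user_cb_data) by vce_registered_ioevent_handler_fn */
static void
my_rx_ready (vppcom_ioevent_t * eio, void *arg)
{
  printf ("session %u: %u bytes ready to read\n",
	  eio->session_index, eio->bytes);
}

/* Invoked as (accepted_session_index, &ep, user_cb_data) by
 * vce_registered_listener_connect_handler_fn */
static void
my_accept (uint32_t new_session_index, vppcom_endpt_t * ep, void *arg)
{
  /* Ask for RX-fifo notifications on the accepted session (rx = 1).
   * The IO event thread is started on first use inside the call. */
  vppcom_session_register_ioevent_cb (new_session_index, my_rx_ready,
				      1 /* rx */ , NULL);
}

/* listen_session_index is assumed to be already created and bound;
 * vppcom_session_register_listener() also calls vppcom_session_listen()
 * with the supplied q_len before registering the accept handler. */
int
register_listener_callbacks (uint32_t listen_session_index)
{
  return vppcom_session_register_listener (listen_session_index, my_accept,
					   NULL /* errcb */ , 0 /* flags */ ,
					   10 /* q_len */ , NULL /* ptr */ );
}

Note that both callbacks are dispatched from the VCL event thread (vce_event_thread_fn invokes them through handler->handler_fn), not from the application's own threads, so they should do minimal work and return promptly.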