X-Git-Url: https://gerrit.fd.io/r/gitweb?a=blobdiff_plain;f=src%2Fvcl%2Fvcl_event.c;h=38a78535ee70d8ea1266500bfb5b885a908b5742;hb=134a996a0279577080b626d5172c1739efa00b92;hp=dafa2500d1d4f251b51f8c6b8044cfafde9fe496;hpb=410bcca41c1a3e7c3d4b4c2940120f9b21732d49;p=vpp.git diff --git a/src/vcl/vcl_event.c b/src/vcl/vcl_event.c index dafa2500d1d..38a78535ee7 100644 --- a/src/vcl/vcl_event.c +++ b/src/vcl/vcl_event.c @@ -18,7 +18,10 @@ #include #include +#include #include +#include + /** * @file * @brief VPP Communications Library (VCL) event handler. @@ -26,8 +29,17 @@ * Definitions for generic event handling in VCL. */ +#define VCL_EVENTS_LOCK() \ + clib_spinlock_lock (&(vcm->event_thread.events_lockp)) +#define VCL_EVENTS_UNLOCK() \ + clib_spinlock_unlock (&(vcm->event_thread.events_lockp)) +#define VCL_IO_SESSIONS_LOCK() \ + clib_spinlock_lock (&(vcm->session_io_thread.io_sessions_lockp)) +#define VCL_IO_SESSIONS_UNLOCK() \ + clib_spinlock_unlock (&(vcm->session_io_thread.io_sessions_lockp)) + int -vce_generate_event (vce_event_thread_t *evt, u32 ev_idx) +vce_generate_event (vce_event_thread_t * evt, u32 ev_idx) { int elts, rv = 0; vce_event_t *p; @@ -36,9 +48,9 @@ vce_generate_event (vce_event_thread_t *evt, u32 ev_idx) /* Check there is event data for this event */ - clib_spinlock_lock (&(evt->events_lockp)); - p = pool_elt_at_index (evt->vce_events, ev_idx); - ASSERT(p); + VCE_EVENTS_LOCK (); + p = pool_elt_at_index (evt->vce_events, ev_idx); + ASSERT (p); elts = (int) clib_fifo_free_elts (evt->event_index_fifo); if (PREDICT_TRUE (elts)) @@ -52,50 +64,50 @@ vce_generate_event (vce_event_thread_t *evt, u32 ev_idx) rv = VNET_API_ERROR_QUEUE_FULL; } - clib_spinlock_unlock (&(evt->events_lockp)); + VCE_EVENTS_UNLOCK (); pthread_mutex_unlock (&(evt->generator_lock)); return rv; } void -vce_clear_event (vce_event_thread_t *evt, u32 ev_idx) +vce_clear_event (vce_event_thread_t * evt, u32 ev_idx) { - clib_spinlock_lock (&(evt->events_lockp)); + VCE_EVENTS_LOCK (); pool_put_index (evt->vce_events, ev_idx); - clib_spinlock_unlock (&(evt->events_lockp)); + VCE_EVENTS_UNLOCK (); } vce_event_t * -vce_get_event_from_index(vce_event_thread_t *evt, u32 ev_idx) +vce_get_event_from_index (vce_event_thread_t * evt, u32 ev_idx) { vce_event_t *ev = 0; /* Assumes caller has obtained the spinlock (evt->events_lockp) */ - if ( ! pool_is_free_index (evt->vce_events, ev_idx)) + if (!pool_is_free_index (evt->vce_events, ev_idx)) ev = pool_elt_at_index (evt->vce_events, ev_idx); return ev; } vce_event_handler_reg_t * -vce_get_event_handler (vce_event_thread_t *evt, vce_event_key_t *evk) +vce_get_event_handler (vce_event_thread_t * evt, vce_event_key_t * evk) { vce_event_handler_reg_t *handler = 0; uword *p; - clib_spinlock_lock (&evt->handlers_lockp); + VCE_HANDLERS_LOCK (); p = hash_get (evt->handlers_index_by_event_key, evk->as_u64); if (p) handler = pool_elt_at_index (evt->vce_event_handlers, p[0]); - clib_spinlock_unlock (&evt->handlers_lockp); + VCE_HANDLERS_UNLOCK (); return handler; } vce_event_handler_reg_t * -vce_register_handler (vce_event_thread_t *evt, vce_event_key_t *evk, - vce_event_callback_t cb, void *cb_args) +vce_register_handler (vce_event_thread_t * evt, vce_event_key_t * evk, + vce_event_callback_t cb, void *cb_args) { vce_event_handler_reg_t *handler; vce_event_handler_reg_t *old_handler = 0; @@ -105,7 +117,7 @@ vce_register_handler (vce_event_thread_t *evt, vce_event_key_t *evk, /* TODO - multiple handler support. For now we can replace * and re-instate, which is useful for event recycling */ - clib_spinlock_lock (&evt->handlers_lockp); + VCE_HANDLERS_LOCK (); p = hash_get (evt->handlers_index_by_event_key, evk->as_u64); if (p) @@ -114,17 +126,17 @@ vce_register_handler (vce_event_thread_t *evt, vce_event_key_t *evk, /* If we are just re-registering, ignore and move on * else store the old handler_fn for unregister to re-instate */ if (old_handler->handler_fn == cb) - { + { - clib_spinlock_unlock (&evt->handlers_lockp); + VCE_HANDLERS_UNLOCK (); - /* Signal event thread that a handler exists in case any - * recycled events requiring this handler are pending */ - pthread_mutex_lock (&(evt->generator_lock)); - pthread_cond_signal (&(evt->generator_cond)); - pthread_mutex_unlock (&(evt->generator_lock)); - return old_handler; - } + /* Signal event thread that a handler exists in case any + * recycled events requiring this handler are pending */ + pthread_mutex_lock (&(evt->generator_lock)); + pthread_cond_signal (&(evt->generator_cond)); + pthread_mutex_unlock (&(evt->generator_lock)); + return old_handler; + } } pool_get (evt->vce_event_handlers, handler); @@ -132,7 +144,7 @@ vce_register_handler (vce_event_thread_t *evt, vce_event_key_t *evk, handler->handler_fn = cb; handler->replaced_handler_idx = (u32) ((p) ? p[0] : ~0); - handler->ev_idx = (u32) ~0; //This will be set by the event thread if event happens + handler->ev_idx = (u32) ~ 0; //This will be set by the event thread if event happens handler->evk = evk->as_u64; handler->handler_fn_args = cb_args; @@ -141,7 +153,7 @@ vce_register_handler (vce_event_thread_t *evt, vce_event_key_t *evk, pthread_cond_init (&(handler->handler_cond), NULL); pthread_mutex_init (&(handler->handler_lock), NULL); - clib_spinlock_unlock (&evt->handlers_lockp); + VCE_HANDLERS_UNLOCK (); /* Signal event thread that a new handler exists in case any * recycled events requiring this handler are pending */ @@ -153,19 +165,19 @@ vce_register_handler (vce_event_thread_t *evt, vce_event_key_t *evk, } int -vce_unregister_handler (vce_event_thread_t *evt, - vce_event_handler_reg_t *handler) +vce_unregister_handler (vce_event_thread_t * evt, + vce_event_handler_reg_t * handler) { uword *p; u64 evk = handler->evk; u8 generate_signal = 0; - clib_spinlock_lock (&evt->handlers_lockp); + VCE_HANDLERS_LOCK (); p = hash_get (evt->handlers_index_by_event_key, evk); if (!p) { - clib_spinlock_unlock (&evt->handlers_lockp); + VCE_HANDLERS_UNLOCK (); return VNET_API_ERROR_NO_SUCH_ENTRY; } @@ -187,7 +199,7 @@ vce_unregister_handler (vce_event_thread_t *evt, pthread_cond_destroy (&(handler->handler_cond)); pool_put (evt->vce_event_handlers, handler); - clib_spinlock_unlock (&evt->handlers_lockp); + VCE_HANDLERS_UNLOCK (); if (generate_signal) { @@ -216,54 +228,54 @@ vce_event_thread_fn (void *arg) { uword fifo_depth = clib_fifo_elts (evt->event_index_fifo); while ((fifo_depth == 0) || (recycle_count == fifo_depth)) - { - recycle_count = 0; - pthread_cond_wait (&(evt->generator_cond), &(evt->generator_lock)); - fifo_depth = clib_fifo_elts (evt->event_index_fifo); - } + { + recycle_count = 0; + pthread_cond_wait (&(evt->generator_cond), &(evt->generator_lock)); + fifo_depth = clib_fifo_elts (evt->event_index_fifo); + } /* Remove event */ - clib_spinlock_lock (&(evt->events_lockp)); + VCE_EVENTS_LOCK (); clib_fifo_sub1 (evt->event_index_fifo, ev_idx); ev = vce_get_event_from_index (evt, ev_idx); - ASSERT(ev); + ASSERT (ev); if (recycle_count && ev->recycle) - { - clib_fifo_add1 (evt->event_index_fifo, ev_idx); - clib_spinlock_unlock (&(evt->events_lockp)); - continue; - } - clib_spinlock_lock (&evt->handlers_lockp); + { + clib_fifo_add1 (evt->event_index_fifo, ev_idx); + VCE_EVENTS_UNLOCK (); + continue; + } + VCE_HANDLERS_LOCK (); p = hash_get (evt->handlers_index_by_event_key, ev->evk.as_u64); if (!p) - { - /* If an event falls in the woods, and there is no handler to hear it, - * does it make any sound? - * I don't know either, so lets biff the event */ - pool_put(evt->vce_events, ev); - clib_spinlock_unlock (&(evt->events_lockp)); - clib_spinlock_unlock (&evt->handlers_lockp); - pthread_mutex_unlock (&(evt->generator_lock)); - } + { + /* If an event falls in the woods, and there is no handler to hear it, + * does it make any sound? + * I don't know either, so lets biff the event */ + pool_put (evt->vce_events, ev); + VCE_EVENTS_UNLOCK (); + VCE_HANDLERS_UNLOCK (); + pthread_mutex_unlock (&(evt->generator_lock)); + } else - { - u32 evt_recycle = ev->recycle; - handler = pool_elt_at_index (evt->vce_event_handlers, p[0]); - handler->ev_idx = ev_idx; - ev->recycle = 0; + { + u32 evt_recycle = ev->recycle; + handler = pool_elt_at_index (evt->vce_event_handlers, p[0]); + handler->ev_idx = ev_idx; + ev->recycle = 0; - clib_spinlock_unlock (&(evt->events_lockp)); - clib_spinlock_unlock (&evt->handlers_lockp); - pthread_mutex_unlock (&(evt->generator_lock)); + VCE_EVENTS_UNLOCK (); + VCE_HANDLERS_UNLOCK (); + pthread_mutex_unlock (&(evt->generator_lock)); - (handler->handler_fn)(handler); + (handler->handler_fn) (handler); - clib_spinlock_lock (&(evt->events_lockp)); - ev = vce_get_event_from_index (evt, ev_idx); - recycle_count += (!evt_recycle && ev && ev->recycle) ? 1 : 0; - clib_spinlock_unlock(&(evt->events_lockp)); - } + VCE_EVENTS_LOCK (); + ev = vce_get_event_from_index (evt, ev_idx); + recycle_count += (!evt_recycle && ev && ev->recycle) ? 1 : 0; + VCE_EVENTS_UNLOCK (); + } pthread_mutex_lock (&(evt->generator_lock)); } @@ -271,7 +283,7 @@ vce_event_thread_fn (void *arg) } int -vce_start_event_thread (vce_event_thread_t *evt, u8 max_events) +vce_start_event_thread (vce_event_thread_t * evt, u8 max_events) { clib_fifo_validate (evt->event_index_fifo, max_events); evt->handlers_index_by_event_key = hash_create (0, sizeof (uword)); @@ -283,6 +295,260 @@ vce_start_event_thread (vce_event_thread_t *evt, u8 max_events) clib_spinlock_init (&(evt->handlers_lockp)); return pthread_create (&(evt->thread), NULL /* attr */ , - vce_event_thread_fn, evt); + vce_event_thread_fn, evt); +} + +static void * +vppcom_session_io_thread_fn (void *arg) +{ + vppcom_session_io_thread_t *evt = (vppcom_session_io_thread_t *) arg; + u32 *session_indexes = 0, *session_index; + int i; + u32 bytes = 0; + vcl_session_t *session; + + while (1) + { + vec_reset_length (session_indexes); + VCE_IO_SESSIONS_LOCK (); + /* *INDENT-OFF* */ + pool_foreach (session_index, evt->active_session_indexes, ({ + vec_add1 (session_indexes, *session_index); + })); + /* *INDENT-ON* */ + VCE_IO_SESSIONS_UNLOCK (); + if (session_indexes) + { + for (i = 0; i < vec_len (session_indexes); ++i) + { + session = vcl_session_get (0, session_indexes[i]); + if (!session) + return NULL; + bytes = svm_fifo_max_dequeue (session->rx_fifo); + + if (bytes) + { + vppcom_ioevent_t *eio; + vce_event_t *ev; + u32 ev_idx; + + VCL_EVENTS_LOCK (); + + pool_get (vcm->event_thread.vce_events, ev); + ev_idx = (u32) (ev - vcm->event_thread.vce_events); + eio = vce_get_event_data (ev, sizeof (*eio)); + ev->evk.eid = VCL_EVENT_IOEVENT_RX_FIFO; + ev->evk.session_index = session_indexes[i]; + eio->bytes = bytes; + eio->session_index = session_indexes[i]; + + VCL_EVENTS_UNLOCK (); + + vce_generate_event (&vcm->event_thread, ev_idx); + } + } + } + struct timespec ts; + ts.tv_sec = 0; + ts.tv_nsec = 1000000; /* 1 millisecond */ + nanosleep (&ts, NULL); + } + return NULL; +} + +static int +vppcom_start_io_event_thread (vppcom_session_io_thread_t * evt, + u8 max_sessions) +{ + pthread_cond_init (&(evt->vce_io_cond), NULL); + pthread_mutex_init (&(evt->vce_io_lock), NULL); + + clib_spinlock_init (&(evt->io_sessions_lockp)); + + return pthread_create (&(evt->thread), NULL /* attr */ , + vppcom_session_io_thread_fn, evt); +} + +static void +vce_registered_ioevent_handler_fn (void *arg) +{ + vce_event_handler_reg_t *reg = (vce_event_handler_reg_t *) arg; + vppcom_ioevent_t *eio; + vce_event_t *ev; + u32 ioevt_ndx = (u64) (reg->handler_fn_args); + vppcom_session_ioevent_t *ioevent, ioevent_; + + VCL_EVENTS_LOCK (); + ev = vce_get_event_from_index (&vcm->event_thread, reg->ev_idx); + eio = vce_get_event_data (ev, sizeof (*eio)); + VCL_EVENTS_UNLOCK (); + + VCL_IO_SESSIONS_LOCK (); + ioevent = pool_elt_at_index (vcm->session_io_thread.ioevents, ioevt_ndx); + ioevent_ = *ioevent; + VCL_IO_SESSIONS_UNLOCK (); + (ioevent_.user_cb) (eio, ioevent_.user_cb_data); + vce_clear_event (&vcm->event_thread, reg->ev_idx); + return; + + /*TODO - Unregister check in close for this listener */ + +} + +void +vce_registered_listener_connect_handler_fn (void *arg) +{ + vce_event_handler_reg_t *reg = (vce_event_handler_reg_t *) arg; + vce_event_connect_request_t *ecr; + vce_event_t *ev; + vppcom_endpt_t ep; + vcl_session_t *new_session; + + vppcom_session_listener_t *session_listener = + (vppcom_session_listener_t *) reg->handler_fn_args; + + VCL_EVENTS_LOCK (); + ev = vce_get_event_from_index (&vcm->event_thread, reg->ev_idx); + ecr = vce_get_event_data (ev, sizeof (*ecr)); + VCL_EVENTS_UNLOCK (); + new_session = vcl_session_get (0, ecr->accepted_session_index); + if (!new_session) + return; + + ep.is_ip4 = new_session->transport.is_ip4; + ep.port = new_session->transport.rmt_port; + if (new_session->transport.is_ip4) + clib_memcpy (&ep.ip, &new_session->transport.rmt_ip.ip4, + sizeof (ip4_address_t)); + else + clib_memcpy (&ep.ip, &new_session->transport.rmt_ip.ip6, + sizeof (ip6_address_t)); + + vppcom_send_accept_session_reply (new_session->vpp_handle, + new_session->client_context, + 0 /* retval OK */ ); + + (session_listener->user_cb) (ecr->accepted_session_index, &ep, + session_listener->user_cb_data); + + if (vcm->session_io_thread.io_sessions_lockp) + { + /* Throw this new accepted session index into the rx poll thread pool */ + VCL_IO_SESSIONS_LOCK (); + u32 *active_session_index; + pool_get (vcm->session_io_thread.active_session_indexes, + active_session_index); + *active_session_index = ecr->accepted_session_index; + VCL_IO_SESSIONS_UNLOCK (); + } + + /*TODO - Unregister check in close for this listener */ + return; + + ASSERT (0); // If we can't get a lock or accepted session fails, lets blow up. +} + +/** + * @brief vce_poll_wait_connect_request_handler_fn + * - used by vppcom_epoll_xxxx() for listener sessions + * - when a vl_api_accept_session_t_handler() generates an event + * this callback is alerted and sets the fields that vppcom_epoll_wait() + * expects to see. + * + * @param arg - void* to be cast to vce_event_handler_reg_t* + */ +void +vce_poll_wait_connect_request_handler_fn (void *arg) +{ + vce_event_handler_reg_t *reg = (vce_event_handler_reg_t *) arg; + vce_event_t *ev; + /* Retrieve the VCL_EVENT_CONNECT_REQ_ACCEPTED event */ + ev = vce_get_event_from_index (&vcm->event_thread, reg->ev_idx); + vce_event_connect_request_t *ecr = vce_get_event_data (ev, sizeof (*ecr)); + + /* Recycling the event. */ + VCL_EVENTS_LOCK (); + ev->recycle = 1; + clib_fifo_add1 (vcm->event_thread.event_index_fifo, reg->ev_idx); + VCL_EVENTS_UNLOCK (); } +int +vppcom_session_register_ioevent_cb (uint32_t session_index, + vppcom_session_ioevent_cb cb, + uint8_t rx, void *ptr) +{ + int rv = VPPCOM_OK; + vce_event_key_t evk; + vppcom_session_ioevent_t *ioevent; + + if (!vcm->session_io_thread.io_sessions_lockp) + rv = vppcom_start_io_event_thread (&vcm->session_io_thread, 100); /* DAW_TODO: ??? hard-coded value */ + + if (rv == VPPCOM_OK) + { + void *io_evt_ndx; + + /* Register handler for ioevent on session_index */ + VCL_IO_SESSIONS_LOCK (); + pool_get (vcm->session_io_thread.ioevents, ioevent); + io_evt_ndx = (void *) (ioevent - vcm->session_io_thread.ioevents); + ioevent->user_cb = cb; + ioevent->user_cb_data = ptr; + VCL_IO_SESSIONS_UNLOCK (); + + evk.session_index = session_index; + evk.eid = rx ? VCL_EVENT_IOEVENT_RX_FIFO : VCL_EVENT_IOEVENT_TX_FIFO; + + (void) vce_register_handler (&vcm->event_thread, &evk, + vce_registered_ioevent_handler_fn, + io_evt_ndx); + } + return rv; +} + +int +vppcom_session_register_listener (uint32_t session_index, + vppcom_session_listener_cb cb, + vppcom_session_listener_errcb + errcb, uint8_t flags, int q_len, void *ptr) +{ + int rv = VPPCOM_OK; + vce_event_key_t evk; + vppcom_session_listener_t *listener_args; + + if (!vcm->session_io_thread.io_sessions_lockp) + rv = vppcom_start_io_event_thread (&vcm->session_io_thread, 100); /* DAW_TODO: ??? hard-coded value */ + if (rv) + { + goto done; + } + rv = vppcom_session_listen (session_index, q_len); + if (rv) + { + goto done; + } + + /* Register handler for connect_request event on listen_session_index */ + listener_args = clib_mem_alloc (sizeof (vppcom_session_listener_t)); // DAW_TODO: Use a pool instead of thrashing the memory allocator! + listener_args->user_cb = cb; + listener_args->user_cb_data = ptr; + listener_args->user_errcb = errcb; + + evk.session_index = session_index; + evk.eid = VCL_EVENT_CONNECT_REQ_ACCEPTED; + (void) vce_register_handler (&vcm->event_thread, &evk, + vce_registered_listener_connect_handler_fn, + listener_args); + +done: + return rv; +} + +/* + * fd.io coding-style-patch-verification: ON + * + * Local Variables: + * eval: (c-set-style "gnu") + * End: + */