X-Git-Url: https://gerrit.fd.io/r/gitweb?a=blobdiff_plain;f=src%2Fvnet%2Fsession%2Fsession.c;h=1a304a6f80eb0b68e4dec40c4e28c9f1eb3194a4;hb=refs%2Fchanges%2F28%2F19428%2F5;hp=cd8da50a5c9bec0312d5acc34b03f59028d1c2f9;hpb=d5c604d6872bdb5576a900d51b96f2e6736b63a5;p=vpp.git diff --git a/src/vnet/session/session.c b/src/vnet/session/session.c index cd8da50a5c9..1a304a6f80e 100644 --- a/src/vnet/session/session.c +++ b/src/vnet/session/session.c @@ -105,12 +105,19 @@ session_send_ctrl_evt_to_thread (session_t * s, session_evt_type_t evt_type) SESSION_CTRL_EVT_CLOSE); } +void +session_send_rpc_evt_to_thread_force (u32 thread_index, void *fp, + void *rpc_args) +{ + session_send_evt_to_thread (fp, rpc_args, thread_index, + SESSION_CTRL_EVT_RPC); +} + void session_send_rpc_evt_to_thread (u32 thread_index, void *fp, void *rpc_args) { if (thread_index != vlib_get_thread_index ()) - session_send_evt_to_thread (fp, rpc_args, thread_index, - SESSION_CTRL_EVT_RPC); + session_send_rpc_evt_to_thread_force (thread_index, fp, rpc_args); else { void (*fnp) (void *) = fp; @@ -167,9 +174,15 @@ session_alloc (u32 thread_index) void session_free (session_t * s) { - pool_put (session_main.wrk[s->thread_index].sessions, s); if (CLIB_DEBUG) - clib_memset (s, 0xFA, sizeof (*s)); + { + u8 thread_index = s->thread_index; + clib_memset (s, 0xFA, sizeof (*s)); + pool_put (session_main.wrk[thread_index].sessions, s); + return; + } + SESSION_EVT_DBG (SESSION_EVT_FREE, s); + pool_put (session_main.wrk[s->thread_index].sessions, s); } void @@ -283,7 +296,7 @@ session_enqueue_chain_tail (session_t * s, vlib_buffer_t * b, continue; if (is_in_order) { - rv = svm_fifo_enqueue_nowait (s->rx_fifo, len, data); + rv = svm_fifo_enqueue (s->rx_fifo, len, data); if (rv == len) { written += rv; @@ -350,9 +363,9 @@ session_enqueue_stream_connection (transport_connection_t * tc, if (is_in_order) { - enqueued = svm_fifo_enqueue_nowait (s->rx_fifo, - b->current_length, - vlib_buffer_get_current (b)); + enqueued = svm_fifo_enqueue (s->rx_fifo, + b->current_length, + vlib_buffer_get_current (b)); if (PREDICT_FALSE ((b->flags & VLIB_BUFFER_NEXT_PRESENT) && enqueued >= 0)) { @@ -398,13 +411,12 @@ session_enqueue_dgram_connection (session_t * s, { int enqueued = 0, rv, in_order_off; - ASSERT (svm_fifo_max_enqueue (s->rx_fifo) + ASSERT (svm_fifo_max_enqueue_prod (s->rx_fifo) >= b->current_length + sizeof (*hdr)); - svm_fifo_enqueue_nowait (s->rx_fifo, sizeof (session_dgram_hdr_t), - (u8 *) hdr); - enqueued = svm_fifo_enqueue_nowait (s->rx_fifo, b->current_length, - vlib_buffer_get_current (b)); + svm_fifo_enqueue (s->rx_fifo, sizeof (session_dgram_hdr_t), (u8 *) hdr); + enqueued = svm_fifo_enqueue (s->rx_fifo, b->current_length, + vlib_buffer_get_current (b)); if (PREDICT_FALSE ((b->flags & VLIB_BUFFER_NEXT_PRESENT) && enqueued >= 0)) { in_order_off = enqueued > b->current_length ? enqueued : 0; @@ -440,6 +452,10 @@ u32 session_tx_fifo_dequeue_drop (transport_connection_t * tc, u32 max_bytes) { session_t *s = session_get (tc->s_index, tc->thread_index); + + if (svm_fifo_needs_tx_ntf (s->tx_fifo, max_bytes)) + session_dequeue_notify (s); + return svm_fifo_dequeue_drop (s->tx_fifo, max_bytes); } @@ -479,6 +495,11 @@ static inline int session_enqueue_notify_inline (session_t * s) { app_worker_t *app_wrk; + u32 session_index; + u8 n_subscribers; + + session_index = s->session_index; + n_subscribers = svm_fifo_n_subscribers (s->rx_fifo); app_wrk = app_worker_get_if_valid (s->app_wrk_index); if (PREDICT_FALSE (!app_wrk)) @@ -490,7 +511,7 @@ session_enqueue_notify_inline (session_t * s) /* *INDENT-OFF* */ SESSION_EVT_DBG(SESSION_EVT_ENQ, s, ({ ed->data[0] = SESSION_IO_EVT_RX; - ed->data[1] = svm_fifo_max_dequeue (s->rx_fifo); + ed->data[1] = svm_fifo_max_dequeue_prod (s->rx_fifo); })); /* *INDENT-ON* */ @@ -499,9 +520,12 @@ session_enqueue_notify_inline (session_t * s) SESSION_IO_EVT_RX))) return -1; - if (PREDICT_FALSE (svm_fifo_n_subscribers (s->rx_fifo))) - return session_notify_subscribers (app_wrk->app_index, s, - s->rx_fifo, SESSION_IO_EVT_RX); + if (PREDICT_FALSE (n_subscribers)) + { + s = session_get (session_index, vlib_get_thread_index ()); + return session_notify_subscribers (app_wrk->app_index, s, + s->rx_fifo, SESSION_IO_EVT_RX); + } return 0; } @@ -561,9 +585,6 @@ session_main_flush_enqueue_events (u8 transport_proto, u32 thread_index) continue; } - if (svm_fifo_has_event (s->rx_fifo) || svm_fifo_is_empty (s->rx_fifo)) - continue; - if (PREDICT_FALSE (session_enqueue_notify_inline (s))) errors++; } @@ -818,17 +839,16 @@ session_transport_closed_notify (transport_connection_t * tc) void session_transport_reset_notify (transport_connection_t * tc) { - session_t *s; app_worker_t *app_wrk; - application_t *app; + session_t *s; + s = session_get (tc->s_index, tc->thread_index); svm_fifo_dequeue_drop_all (s->tx_fifo); if (s->session_state >= SESSION_STATE_TRANSPORT_CLOSING) return; s->session_state = SESSION_STATE_TRANSPORT_CLOSING; app_wrk = app_worker_get (s->app_wrk_index); - app = application_get (app_wrk->app_index); - app->cb_fns.session_reset_callback (s); + app_worker_reset_notify (app_wrk, s); } int @@ -880,6 +900,7 @@ session_open_cl (u32 app_wrk_index, session_endpoint_t * rmt, u32 opaque) transport_connection_t *tc; transport_endpoint_cfg_t *tep; app_worker_t *app_wrk; + session_handle_t sh; session_t *s; int rv; @@ -904,6 +925,9 @@ session_open_cl (u32 app_wrk_index, session_endpoint_t * rmt, u32 opaque) return -1; } + sh = session_handle (s); + session_lookup_add_connection (tc, sh); + return app_worker_connect_notify (app_wrk, s, opaque); } @@ -1091,7 +1115,7 @@ session_transport_close (session_t * s) * point, either after sending everything or after a timeout, call delete * notify. This will finally lead to the complete cleanup of the session. */ - if (svm_fifo_max_dequeue (s->tx_fifo)) + if (svm_fifo_max_dequeue_cons (s->tx_fifo)) s->session_state = SESSION_STATE_CLOSED_WAITING; else s->session_state = SESSION_STATE_CLOSED; @@ -1207,7 +1231,7 @@ session_segment_handle (session_t * s) { svm_fifo_t *f; - if (s->session_state == SESSION_STATE_LISTENING) + if (!s->rx_fifo) return SESSION_INVALID_HANDLE; f = s->rx_fifo; @@ -1270,6 +1294,18 @@ session_get_transport (session_t * s) s->connection_index); } +void +session_get_endpoint (session_t * s, transport_endpoint_t * tep, u8 is_lcl) +{ + if (s->session_state != SESSION_STATE_LISTENING) + return transport_get_endpoint (session_get_transport_proto (s), + s->connection_index, s->thread_index, tep, + is_lcl); + else + return transport_get_listener_endpoint (session_get_transport_proto (s), + s->connection_index, tep, is_lcl); +} + transport_connection_t * listen_session_get_transport (session_t * s) {