+session_accepted_handler (session_accepted_msg_t * mp)
+{
+ app_session_evt_t _app_evt, *app_evt = &_app_evt;
+ udp_echo_main_t *utm = &udp_echo_main;
+ session_accepted_reply_msg_t *rmp;
+ svm_fifo_t *rx_fifo, *tx_fifo;
+ app_session_t *session;
+ static f64 start_time;
+ u32 session_index;
+ int rv = 0;
+
+ if (start_time == 0.0)
+ start_time = clib_time_now (&utm->clib_time);
+
+ utm->vpp_event_queue = uword_to_pointer (mp->vpp_event_queue_address,
+ svm_msg_q_t *);
+ rx_fifo = uword_to_pointer (mp->server_rx_fifo, svm_fifo_t *);
+ tx_fifo = uword_to_pointer (mp->server_tx_fifo, svm_fifo_t *);
+
+ pool_get (utm->sessions, session);
+ memset (session, 0, sizeof (*session));
+ session_index = session - utm->sessions;
+
+ /* Cut-through case */
+ if (mp->server_event_queue_address)
+ {
+ clib_warning ("cut-through session");
+ utm->our_event_queue = uword_to_pointer (mp->server_event_queue_address,
+ svm_msg_q_t *);
+ rx_fifo->master_session_index = session_index;
+ tx_fifo->master_session_index = session_index;
+ utm->cut_through_session_index = session_index;
+ session->rx_fifo = rx_fifo;
+ session->tx_fifo = tx_fifo;
+
+ rv = pthread_create (&utm->cut_through_thread_handle,
+ NULL /*attr */ , cut_through_thread_fn, 0);
+ if (rv)
+ {
+ clib_warning ("pthread_create returned %d", rv);
+ rv = VNET_API_ERROR_SYSCALL_ERROR_1;
+ }
+ utm->do_echo = 1;
+ }
+ else
+ {
+ rx_fifo->client_session_index = session_index;
+ tx_fifo->client_session_index = session_index;
+ session->rx_fifo = rx_fifo;
+ session->tx_fifo = tx_fifo;
+ clib_memcpy (&session->transport.rmt_ip, mp->ip,
+ sizeof (ip46_address_t));
+ session->transport.is_ip4 = mp->is_ip4;
+ session->transport.rmt_port = mp->port;
+ }
+
+ hash_set (utm->session_index_by_vpp_handles, mp->handle, session_index);
+ if (pool_elts (utm->sessions) && (pool_elts (utm->sessions) % 20000) == 0)
+ {
+ f64 now = clib_time_now (&utm->clib_time);
+ fformat (stdout, "%d active sessions in %.2f seconds, %.2f/sec...\n",
+ pool_elts (utm->sessions), now - start_time,
+ (f64) pool_elts (utm->sessions) / (now - start_time));
+ }
+
+ app_alloc_ctrl_evt_to_vpp (session->vpp_evt_q, app_evt,
+ SESSION_CTRL_EVT_ACCEPTED_REPLY);
+ rmp = (session_accepted_reply_msg_t *) app_evt->evt->data;
+ rmp->handle = mp->handle;
+ rmp->context = mp->context;
+ rmp->retval = rv;
+ app_send_ctrl_evt_to_vpp (session->vpp_evt_q, app_evt);
+
+ CLIB_MEMORY_BARRIER ();
+ utm->state = STATE_READY;
+}
+
+static void
+session_disconnected_handler (session_disconnected_msg_t * mp)
+{
+ app_session_evt_t _app_evt, *app_evt = &_app_evt;
+ udp_echo_main_t *utm = &udp_echo_main;
+ session_disconnected_reply_msg_t *rmp;
+ app_session_t *session;
+ uword *p;
+ int rv = 0;
+
+ p = hash_get (utm->session_index_by_vpp_handles, mp->handle);
+
+ if (p)
+ {
+ session = pool_elt_at_index (utm->sessions, p[0]);
+ hash_unset (utm->session_index_by_vpp_handles, mp->handle);
+ pool_put (utm->sessions, session);
+ }
+ else
+ {
+ clib_warning ("couldn't find session key %llx", mp->handle);
+ return;
+ }
+
+ app_alloc_ctrl_evt_to_vpp (session->vpp_evt_q, app_evt,
+ SESSION_CTRL_EVT_DISCONNECTED_REPLY);
+ rmp = (session_disconnected_reply_msg_t *) app_evt->evt->data;
+ rmp->retval = rv;
+ rmp->handle = mp->handle;
+ rmp->context = mp->context;
+ app_send_ctrl_evt_to_vpp (session->vpp_evt_q, app_evt);
+}
+
+static void
+session_connected_handler (session_connected_msg_t * mp)
+{
+ udp_echo_main_t *utm = &udp_echo_main;
+ unformat_input_t _input, *input = &_input;
+ session_endpoint_extended_t _sep, *sep = &_sep;
+ app_session_t *session;
+
+ ASSERT (utm->i_am_server == 0);
+
+ if (mp->retval)
+ {
+ clib_warning ("failed connect");
+ return;
+ }
+
+ ASSERT (mp->server_rx_fifo && mp->server_tx_fifo);
+
+ pool_get (utm->sessions, session);
+ session->rx_fifo = uword_to_pointer (mp->server_rx_fifo, svm_fifo_t *);
+ session->tx_fifo = uword_to_pointer (mp->server_tx_fifo, svm_fifo_t *);
+ session->vpp_evt_q = uword_to_pointer (mp->vpp_event_queue_address,
+ svm_msg_q_t *);
+ /* Cut-through case */
+ if (mp->client_event_queue_address)
+ {
+ clib_warning ("cut-through session");
+ utm->cut_through_session_index = session - utm->sessions;
+ utm->vpp_event_queue = uword_to_pointer (mp->vpp_event_queue_address,
+ svm_msg_q_t *);
+ utm->our_event_queue = uword_to_pointer (mp->client_event_queue_address,
+ svm_msg_q_t *);
+ utm->do_echo = 1;
+ }
+ else
+ {
+ utm->connected_session = session - utm->sessions;
+ utm->vpp_event_queue = uword_to_pointer (mp->vpp_event_queue_address,
+ svm_msg_q_t *);
+
+ clib_memcpy (&session->transport.lcl_ip, mp->lcl_ip,
+ sizeof (ip46_address_t));
+ session->transport.is_ip4 = mp->is_ip4;
+ session->transport.lcl_port = mp->lcl_port;
+
+ unformat_init_vector (input, utm->connect_uri);
+ if (!unformat (input, "%U", unformat_uri, sep))
+ {
+ clib_warning ("can't figure out remote ip and port");
+ utm->state = STATE_FAILED;
+ unformat_free (input);
+ return;
+ }
+ unformat_free (input);
+ clib_memcpy (&session->transport.rmt_ip, &sep->ip,
+ sizeof (ip46_address_t));
+ session->transport.rmt_port = sep->port;
+ session->is_dgram = !utm->is_connected;
+ }
+ utm->state = STATE_READY;
+}
+
+static void
+handle_mq_event (session_event_t * e)
+{
+ switch (e->event_type)
+ {
+ case SESSION_CTRL_EVT_ACCEPTED:
+ session_accepted_handler ((session_accepted_msg_t *) e->data);
+ break;
+ case SESSION_CTRL_EVT_CONNECTED:
+ session_connected_handler ((session_connected_msg_t *) e->data);
+ break;
+ case SESSION_CTRL_EVT_DISCONNECTED:
+ session_disconnected_handler ((session_disconnected_msg_t *) e->data);
+ break;
+ default:
+ clib_warning ("unhandled %u", e->event_type);
+ }
+}
+
+static void
+udp_client_send_connect (udp_echo_main_t * utm)