/* * Copyright (c) 2019 Cisco and/or its affiliates. * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at: * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #include #include #include typedef struct _quic_echo_cb_vft { void (*quic_connected_cb) (session_connected_msg_t * mp, u32 session_index); void (*client_stream_connected_cb) (session_connected_msg_t * mp, u32 session_index); void (*server_stream_connected_cb) (session_connected_msg_t * mp, u32 session_index); void (*quic_accepted_cb) (session_accepted_msg_t * mp, u32 session_index); void (*client_stream_accepted_cb) (session_accepted_msg_t * mp, u32 session_index); void (*server_stream_accepted_cb) (session_accepted_msg_t * mp, u32 session_index); } quic_echo_cb_vft_t; typedef struct { quic_echo_cb_vft_t cb_vft; /* cb vft for QUIC scenarios */ u8 send_quic_disconnects; /* actively send disconnect */ u32 n_stream_clients; /* Target Number of STREAM sessions per QUIC session */ volatile u32 n_quic_clients_connected; /* Number of connected QUIC sessions */ } quic_echo_proto_main_t; quic_echo_proto_main_t quic_echo_proto_main; /* * * ECHO Callback definitions * */ static void quic_echo_on_connected_connect (session_connected_msg_t * mp, u32 session_index) { echo_main_t *em = &echo_main; quic_echo_proto_main_t *eqm = &quic_echo_proto_main; echo_connect_args_t _a, *a = &_a; u64 i; a->parent_session_handle = mp->handle; a->context = session_index; clib_memcpy_fast (&a->lcl_ip, &em->lcl_ip, sizeof (ip46_address_t)); clib_memcpy_fast (&a->ip, &em->uri_elts.ip, sizeof (ip46_address_t)); echo_notify_event (em, ECHO_EVT_FIRST_SCONNECT); for (i = 0; i < eqm->n_stream_clients; i++) echo_send_rpc (em, echo_send_connect, (echo_rpc_args_t *) a); ECHO_LOG (1, "Qsession 0x%llx S[%d] connected to %U:%d", mp->handle, session_index, format_ip46_address, &mp->lcl.ip, mp->lcl.is_ip4, clib_net_to_host_u16 (mp->lcl.port)); } static void quic_echo_on_connected_send (session_connected_msg_t * mp, u32 session_index) { static u32 client_index = 0; echo_main_t *em = &echo_main; echo_session_t *session; session = pool_elt_at_index (em->sessions, session_index); session->bytes_to_send = em->bytes_to_send; session->bytes_to_receive = em->bytes_to_receive; session->session_state = ECHO_SESSION_STATE_READY; em->data_thread_args[client_index++] = session->session_index; } static void quic_echo_on_connected_error (session_connected_msg_t * mp, u32 session_index) { ECHO_FAIL (ECHO_FAIL_QUIC_WRONG_CONNECT, "Got a wrong connected on session %u [%lx]", session_index, mp->handle); } static void quic_echo_on_accept_recv (session_accepted_msg_t * mp, u32 session_index) { static u32 client_index = 0; echo_main_t *em = &echo_main; echo_session_t *session; session = pool_elt_at_index (em->sessions, session_index); session->bytes_to_send = em->bytes_to_send; session->bytes_to_receive = em->bytes_to_receive; em->data_thread_args[client_index++] = session->session_index; session->session_state = ECHO_SESSION_STATE_READY; } static void quic_echo_on_accept_connect (session_accepted_msg_t * mp, u32 session_index) { echo_main_t *em = &echo_main; quic_echo_proto_main_t *eqm = &quic_echo_proto_main; ECHO_LOG (2, "Accept on QSession 0x%lx S[%u]", mp->handle, session_index); echo_connect_args_t _a, *a = &_a; u32 i; a->parent_session_handle = mp->handle; a->context = session_index; clib_memcpy_fast (&a->lcl_ip, &em->lcl_ip, sizeof (ip46_address_t)); clib_memcpy_fast (&a->ip, &em->uri_elts.ip, sizeof (ip46_address_t)); echo_notify_event (em, ECHO_EVT_FIRST_SCONNECT); for (i = 0; i < eqm->n_stream_clients; i++) echo_send_rpc (em, echo_send_connect, (echo_rpc_args_t *) a); } static void quic_echo_on_accept_error (session_accepted_msg_t * mp, u32 session_index) { ECHO_FAIL (ECHO_FAIL_QUIC_WRONG_ACCEPT, "Got a wrong accept on session 0x%lx S[%u]", mp->handle, session_index); } static void quic_echo_on_accept_log_ip (session_accepted_msg_t * mp, u32 session_index) { u8 *ip_str; ip_str = format (0, "%U", format_ip46_address, &mp->rmt.ip, mp->rmt.is_ip4); ECHO_LOG (1, "Accepted session from: %s:%d", ip_str, clib_net_to_host_u16 (mp->rmt.port)); } static const quic_echo_cb_vft_t default_cb_vft = { /* Qsessions */ .quic_accepted_cb = quic_echo_on_accept_log_ip, .quic_connected_cb = quic_echo_on_connected_connect, /* client initiated streams */ .server_stream_accepted_cb = quic_echo_on_accept_recv, .client_stream_connected_cb = quic_echo_on_connected_send, /* server initiated streams */ .client_stream_accepted_cb = quic_echo_on_accept_error, .server_stream_connected_cb = quic_echo_on_connected_error, }; static const quic_echo_cb_vft_t server_stream_cb_vft = { /* Qsessions */ .quic_accepted_cb = quic_echo_on_accept_connect, .quic_connected_cb = NULL, /* client initiated streams */ .server_stream_accepted_cb = quic_echo_on_accept_error, .client_stream_connected_cb = quic_echo_on_connected_error, /* server initiated streams */ .client_stream_accepted_cb = quic_echo_on_accept_recv, .server_stream_connected_cb = quic_echo_on_connected_send, }; static void quic_echo_cleanup_cb (echo_session_t * s, u8 parent_died); static inline void quic_echo_cleanup_listener (u32 listener_index, echo_main_t * em, quic_echo_proto_main_t * eqm) { echo_session_t *ls; ls = pool_elt_at_index (em->sessions, listener_index); if (ls->session_type != ECHO_SESSION_TYPE_QUIC) { ECHO_LOG (2, "%U: Invalid listener session type", echo_format_session, ls); return; } if (!clib_atomic_sub_fetch (&ls->accepted_session_count, 1)) { if (eqm->send_quic_disconnects == ECHO_CLOSE_F_ACTIVE) { echo_send_rpc (em, echo_send_disconnect_session, (echo_rpc_args_t *) & ls->vpp_session_handle); clib_atomic_fetch_add (&em->stats.active_count.q, 1); } else if (eqm->send_quic_disconnects == ECHO_CLOSE_F_NONE) { quic_echo_cleanup_cb (ls, 0 /* parent_died */ ); clib_atomic_fetch_add (&em->stats.clean_count.q, 1); } } } static void quic_echo_cleanup_cb (echo_session_t * s, u8 parent_died) { echo_main_t *em = &echo_main; quic_echo_proto_main_t *eqm = &quic_echo_proto_main; if ((em->state == STATE_DATA_DONE) || !(s->session_state < ECHO_SESSION_STATE_CLOSED)) return; ECHO_LOG (3, "%U cleanup (parent_died %d)", echo_format_session, s, parent_died); s->session_state = ECHO_SESSION_STATE_CLOSED; if (s->session_type == ECHO_SESSION_TYPE_QUIC) { if (parent_died) clib_atomic_fetch_add (&em->stats.clean_count.q, 1); /* Don't cleanup listener as it's handled by main() */ clib_atomic_sub_fetch (&eqm->n_quic_clients_connected, 1); } else if (s->session_type == ECHO_SESSION_TYPE_STREAM) { if (parent_died) clib_atomic_fetch_add (&em->stats.clean_count.s, 1); else quic_echo_cleanup_listener (s->listener_index, em, eqm); clib_atomic_sub_fetch (&em->n_clients_connected, 1); } if (!em->n_clients_connected && !eqm->n_quic_clients_connected) em->state = STATE_DATA_DONE; ECHO_LOG (2, "Cleanup sessions (still %uQ %uS): app %U", eqm->n_quic_clients_connected, em->n_clients_connected, echo_format_app_state, em->state); } static void quic_echo_initiate_qsession_close_no_stream (echo_main_t * em) { quic_echo_proto_main_t *eqm = &quic_echo_proto_main; ECHO_LOG (2, "Closing Qsessions"); /* Close Quic session without streams */ echo_session_t *s; /* *INDENT-OFF* */ pool_foreach (s, em->sessions, ({ if (s->session_type == ECHO_SESSION_TYPE_QUIC) { if (eqm->send_quic_disconnects == ECHO_CLOSE_F_ACTIVE) { ECHO_LOG (2,"%U: ACTIVE close", echo_format_session, s); echo_send_rpc (em, echo_send_disconnect_session, (echo_rpc_args_t *) &s->vpp_session_handle); clib_atomic_fetch_add (&em->stats.active_count.q, 1); } else if (eqm->send_quic_disconnects == ECHO_CLOSE_F_NONE) { ECHO_LOG (2,"%U: CLEAN close", echo_format_session, s); quic_echo_cleanup_cb (s, 0 /* parent_died */); clib_atomic_fetch_add (&em->stats.clean_count.q, 1); } else ECHO_LOG (2,"%U: PASSIVE close", echo_format_session, s); } })); /* *INDENT-ON* */ } static void quic_echo_on_connected (session_connected_msg_t * mp, u32 session_index) { echo_main_t *em = &echo_main; quic_echo_proto_main_t *eqm = &quic_echo_proto_main; echo_session_t *listen_session; echo_session_t *session = pool_elt_at_index (em->sessions, session_index); if (session->listener_index == SESSION_INVALID_INDEX) { clib_atomic_fetch_add (&em->stats.connected_count.q, 1); session->session_type = ECHO_SESSION_TYPE_QUIC; ECHO_LOG (2, "Connected %U -> URI", echo_format_session, session); session->accepted_session_count = 0; if (eqm->cb_vft.quic_connected_cb) eqm->cb_vft.quic_connected_cb (mp, session->session_index); clib_atomic_fetch_add (&eqm->n_quic_clients_connected, 1); if (em->stats.connected_count.q % LOGGING_BATCH == 0) ECHO_LOG (0, "Connected Q %d / %d", em->stats.connected_count.q, em->n_connects); } else { clib_atomic_fetch_add (&em->stats.connected_count.s, 1); listen_session = pool_elt_at_index (em->sessions, session->listener_index); session->session_type = ECHO_SESSION_TYPE_STREAM; clib_atomic_fetch_add (&listen_session->accepted_session_count, 1); ECHO_LOG (2, "Connected %U -> %U", echo_format_session, session, echo_format_session, listen_session); if (em->i_am_master && eqm->cb_vft.server_stream_connected_cb) eqm->cb_vft.server_stream_connected_cb (mp, session->session_index); if (!em->i_am_master && eqm->cb_vft.client_stream_connected_cb) eqm->cb_vft.client_stream_connected_cb (mp, session->session_index); clib_atomic_fetch_add (&em->n_clients_connected, 1); if (em->stats.connected_count.s % LOGGING_BATCH == 0) ECHO_LOG (0, "Connected S %d / %d", em->stats.connected_count.s, em->n_clients); } if (em->n_clients_connected == em->n_clients && em->n_clients_connected != 0) echo_notify_event (em, ECHO_EVT_LAST_SCONNECTED); if (eqm->n_quic_clients_connected == em->n_connects && em->state < STATE_READY) { echo_notify_event (em, ECHO_EVT_LAST_QCONNECTED); em->state = STATE_READY; if (eqm->n_stream_clients == 0) quic_echo_initiate_qsession_close_no_stream (em); } } static void quic_echo_connected_cb (session_connected_bundled_msg_t * mp, u32 session_index, u8 is_failed) { if (is_failed) { ECHO_FAIL (ECHO_FAIL_QUIC_WRONG_CONNECT, "Echo connect failed"); return; } return quic_echo_on_connected ((session_connected_msg_t *) mp, session_index); } static void quic_echo_accepted_cb (session_accepted_msg_t * mp, echo_session_t * session) { echo_main_t *em = &echo_main; quic_echo_proto_main_t *eqm = &quic_echo_proto_main; echo_session_t *ls; ls = pool_elt_at_index (em->sessions, session->listener_index); if (ls->session_type == ECHO_SESSION_TYPE_LISTEN) { clib_atomic_fetch_add (&em->stats.accepted_count.q, 1); echo_notify_event (em, ECHO_EVT_FIRST_QCONNECT); session->session_type = ECHO_SESSION_TYPE_QUIC; session->accepted_session_count = 0; if (eqm->cb_vft.quic_accepted_cb) eqm->cb_vft.quic_accepted_cb (mp, session->session_index); clib_atomic_fetch_add (&eqm->n_quic_clients_connected, 1); if (em->stats.accepted_count.q % LOGGING_BATCH == 0) ECHO_LOG (0, "Accepted Q %d / %d", em->stats.accepted_count.q, em->n_connects); } else { clib_atomic_fetch_add (&em->stats.accepted_count.s, 1); session->session_type = ECHO_SESSION_TYPE_STREAM; echo_notify_event (em, ECHO_EVT_FIRST_SCONNECT); clib_atomic_fetch_add (&ls->accepted_session_count, 1); if (em->i_am_master && eqm->cb_vft.server_stream_accepted_cb) eqm->cb_vft.server_stream_accepted_cb (mp, session->session_index); if (!em->i_am_master && eqm->cb_vft.client_stream_accepted_cb) eqm->cb_vft.client_stream_accepted_cb (mp, session->session_index); clib_atomic_fetch_add (&em->n_clients_connected, 1); if (em->stats.accepted_count.s % LOGGING_BATCH == 0) ECHO_LOG (0, "Accepted S %d / %d", em->stats.accepted_count.s, em->n_clients); if (em->connect_flag && !(mp->flags & em->connect_flag)) { ECHO_FAIL (ECHO_FAIL_UNIDIRECTIONAL, "expected unidirectional streams"); } } if (em->n_clients_connected == em->n_clients && em->n_clients_connected != 0) echo_notify_event (em, ECHO_EVT_LAST_SCONNECTED); if (eqm->n_quic_clients_connected == em->n_connects && em->state < STATE_READY) { echo_notify_event (em, ECHO_EVT_LAST_QCONNECTED); em->state = STATE_READY; if (eqm->n_stream_clients == 0) quic_echo_initiate_qsession_close_no_stream (em); } } static void quic_echo_sent_disconnect_cb (echo_session_t * s) { if (s->session_type == ECHO_SESSION_TYPE_STREAM) s->session_state = ECHO_SESSION_STATE_CLOSING; else quic_echo_cleanup_cb (s, 0 /* parent_died */ ); /* We can clean Q/Lsessions right away */ } static void quic_echo_disconnected_cb (session_disconnected_msg_t * mp, echo_session_t * s) { echo_main_t *em = &echo_main; if (s->session_type == ECHO_SESSION_TYPE_STREAM) { echo_session_print_stats (em, s); if (s->bytes_to_receive || s->bytes_to_send) s->session_state = ECHO_SESSION_STATE_AWAIT_DATA; else s->session_state = ECHO_SESSION_STATE_CLOSING; clib_atomic_fetch_add (&em->stats.close_count.s, 1); } else { quic_echo_cleanup_cb (s, 0 /* parent_died */ ); /* We can clean Q/Lsessions right away */ clib_atomic_fetch_add (&em->stats.close_count.q, 1); } } static void quic_echo_reset_cb (session_reset_msg_t * mp, echo_session_t * s) { echo_main_t *em = &echo_main; if (s->session_type == ECHO_SESSION_TYPE_STREAM) { clib_atomic_fetch_add (&em->stats.reset_count.s, 1); s->session_state = ECHO_SESSION_STATE_CLOSING; } else { clib_atomic_fetch_add (&em->stats.reset_count.q, 1); quic_echo_cleanup_cb (s, 0 /* parent_died */ ); /* We can clean Q/Lsessions right away */ } } static uword quic_echo_unformat_setup_vft (unformat_input_t * input, va_list * args) { quic_echo_proto_main_t *eqm = &quic_echo_proto_main; if (unformat (input, "serverstream")) eqm->cb_vft = server_stream_cb_vft; else if (unformat (input, "default")) ; else return 0; return 1; } static int quic_echo_process_opts_cb (unformat_input_t * a) { echo_main_t *em = &echo_main; quic_echo_proto_main_t *eqm = &quic_echo_proto_main; if (unformat (a, "quic-streams %d", &eqm->n_stream_clients)) ; else if (unformat (a, "quic-setup %U", quic_echo_unformat_setup_vft)) ; else if (unformat (a, "uni")) em->connect_flag = SESSION_F_UNIDIRECTIONAL; else if (unformat (a, "qclose=%U", echo_unformat_close, &eqm->send_quic_disconnects)) ; else return 0; return 1; } static void quic_echo_set_defaults_before_opts_cb () { quic_echo_proto_main_t *eqm = &quic_echo_proto_main; eqm->cb_vft = default_cb_vft; eqm->n_stream_clients = 1; } static void quic_echo_set_defaults_after_opts_cb () { quic_echo_proto_main_t *eqm = &quic_echo_proto_main; echo_main_t *em = &echo_main; u8 default_f_active; em->n_connects = em->n_clients; em->n_sessions = clib_max (1, eqm->n_stream_clients) * em->n_clients + em->n_clients + em->n_uris; em->n_clients = eqm->n_stream_clients * em->n_clients; if (em->i_am_master) default_f_active = em->bytes_to_send == 0 ? ECHO_CLOSE_F_ACTIVE : ECHO_CLOSE_F_PASSIVE; else default_f_active = em->bytes_to_receive == 0 ? ECHO_CLOSE_F_PASSIVE : ECHO_CLOSE_F_ACTIVE; if (eqm->send_quic_disconnects == ECHO_CLOSE_F_INVALID) eqm->send_quic_disconnects = default_f_active; } static void quic_echo_print_usage_cb () { fprintf (stderr, "-- QUIC specific options -- \n" " quic-setup OPT OPT=serverstream : Client open N connections. \n" " On each one server opens M streams\n" " OPT=default : Client open N connections.\n" " On each one client opens M streams\n" " qclose=[Y|N|W] When connection is done send[Y]|nop[N]|wait[W] for close\n" " uni Use unidirectional streams\n" "\n" " quic-streams N Open N QUIC streams (defaults to 1)\n"); } echo_proto_cb_vft_t quic_echo_proto_cb_vft = { .disconnected_cb = quic_echo_disconnected_cb, .connected_cb = quic_echo_connected_cb, .accepted_cb = quic_echo_accepted_cb, .reset_cb = quic_echo_reset_cb, .sent_disconnect_cb = quic_echo_sent_disconnect_cb, .cleanup_cb = quic_echo_cleanup_cb, .process_opts_cb = quic_echo_process_opts_cb, .print_usage_cb = quic_echo_print_usage_cb, .set_defaults_before_opts_cb = quic_echo_set_defaults_before_opts_cb, .set_defaults_after_opts_cb = quic_echo_set_defaults_after_opts_cb, }; ECHO_REGISTER_PROTO (TRANSPORT_PROTO_QUIC, quic_echo_proto_cb_vft); /* * fd.io coding-style-patch-verification: ON * * Local Variables: * eval: (c-set-style "gnu") * End: */