From 60f3e6547aa75a31fad1f78bea11c950cc30d63c Mon Sep 17 00:00:00 2001 From: Nathan Skrzypczak Date: Tue, 19 Mar 2019 13:57:31 +0100 Subject: [PATCH] Integrate first QUIC protocol implementation Currently supports on single stream exposed through standard internal APIs Based on libquicly & picotls by h2o Change-Id: I7bc1ec0e399d1fb02bfd1da91aa7410076d08d14 Signed-off-by: Nathan Skrzypczak --- build/external/Makefile | 7 +- build/external/packages.mk | 32 +- build/external/packages/quicly.mk | 42 + .../patches/quicly/0001-cmake-install.patch | 57 + src/plugins/quic/CMakeLists.txt | 43 + src/plugins/quic/quic.c | 1592 ++++++++++++++++++++ src/plugins/quic/quic.h | 118 ++ src/tests/vnet/session/quic_echo.c | 1412 +++++++++++++++++ src/vnet/CMakeLists.txt | 6 + src/vnet/session/application.h | 4 +- src/vnet/session/session.c | 11 +- src/vnet/session/session.h | 2 + src/vnet/session/transport.c | 10 + src/vnet/session/transport_types.h | 1 + 14 files changed, 3330 insertions(+), 7 deletions(-) create mode 100644 build/external/packages/quicly.mk create mode 100644 build/external/patches/quicly/0001-cmake-install.patch create mode 100644 src/plugins/quic/CMakeLists.txt create mode 100644 src/plugins/quic/quic.c create mode 100644 src/plugins/quic/quic.h create mode 100644 src/tests/vnet/session/quic_echo.c diff --git a/build/external/Makefile b/build/external/Makefile index 084d694e88c..0c2d837aee0 100644 --- a/build/external/Makefile +++ b/build/external/Makefile @@ -30,6 +30,7 @@ I := $(INSTALL_DIR) include packages.mk include packages/nasm.mk include packages/ipsec-mb.mk +include packages/quicly.mk include packages/dpdk.mk include packages/rdma-core.mk @@ -38,7 +39,7 @@ clean: @rm -rf $(B) $(I) .PHONY: install -install: dpdk-install rdma-core-install +install: dpdk-install rdma-core-install quicly-install .PHONY: config config: dpdk-config rdma-core-config @@ -63,7 +64,7 @@ deb/debian/changelog: Makefile $(DEV_DEB): deb/debian/changelog @cd deb && dpkg-buildpackage -b -uc -us - git clean -fdx deb + @git clean -ffdx deb build-deb: $(DEV_DEB) @@ -107,7 +108,7 @@ $(DEV_RPM): Makefile rpm/vpp-ext-deps.spec --define "_release $(PKG_SUFFIX)" \ $(CURDIR)/rpm/vpp-ext-deps.spec mv rpm/RPMS/$(RPM_ARCH)/*.rpm . - @git clean -fdx rpm + @git clean -ffdx rpm build-rpm: $(DEV_RPM) diff --git a/build/external/packages.mk b/build/external/packages.mk index 005c2a958b2..d1126db7475 100644 --- a/build/external/packages.mk +++ b/build/external/packages.mk @@ -71,10 +71,40 @@ $(B)/.$1.extract.ok: $(B)/.$1.download.ok .PHONY: $1-extract $1-extract: $(B)/.$1.extract.ok +############################################################################## +# Git clone & checkout +############################################################################## + +$(B)/.$1.clone.ok: + $$(call h1,"Cloning $1 $($1_repository)") + @mkdir -p $$($1_src_dir) + @git clone --recursive $$($1_repository) $$($1_src_dir) +ifneq ($$($1_version),) + $$(call h1,"Checking out $1 $($1_version)") + cd $$($1_src_dir) && git -c advice.detachedHead=false checkout $$($1_version) + cd $$($1_src_dir) && git submodule update --init +endif + @touch $$@ + +.PHONY: $1-clone +$1-clone: $(B)/.$1.clone.ok + +############################################################################## +# Fetch source : clone or extract +############################################################################## + +ifeq ($$($1_repository),) +$(B)/.$1.fetchsrc.ok: $(B)/.$1.extract.ok + @touch $$@ +else +$(B)/.$1.fetchsrc.ok: $(B)/.$1.clone.ok + @touch $$@ +endif + ############################################################################## # Patch ############################################################################## -$(B)/.$1.patch.ok: $(B)/.$1.extract.ok +$(B)/.$1.patch.ok: $(B)/.$1.fetchsrc.ok $$(call h1,"patching $1 $($1_version)") ifneq ($$(wildcard $$($1_patch_dir)/*.patch),) @for f in $$($1_patch_dir)/*.patch ; do \ diff --git a/build/external/packages/quicly.mk b/build/external/packages/quicly.mk new file mode 100644 index 00000000000..246c73fb02d --- /dev/null +++ b/build/external/packages/quicly.mk @@ -0,0 +1,42 @@ +# Copyright (c) 2019 Cisco and/or its affiliates. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +quicly_repository := https://github.com/h2o/quicly.git +quicly_version := f25b70d37f8974af9cc48a4a565d13a9cdc5fd75 +quicly_patch_dir := $(CURDIR)/patches/quicly + +picotls_build_dir := $(B)/build-picotls + +define quicly_build_cmds + @cd $(quicly_build_dir) && \ + cmake -DCMAKE_INSTALL_PREFIX:PATH=$(quicly_install_dir) \ + $(quicly_src_dir) > $(quicly_build_log) + @$(MAKE) $(MAKE_ARGS) -C $(quicly_build_dir) > $(quicly_build_log) + + @mkdir -p $(picotls_build_dir) + @cd $(picotls_build_dir) && \ + cmake -DCMAKE_INSTALL_PREFIX:PATH=$(quicly_install_dir) \ + $(quicly_src_dir)/deps/picotls > $(quicly_build_log) +endef + +define quicly_config_cmds + @true +endef + +define quicly_install_cmds + @$(MAKE) $(MAKE_ARGS) -C $(quicly_build_dir) install > $(quicly_install_log) + @$(MAKE) $(MAKE_ARGS) -C $(picotls_build_dir) install > $(quicly_install_log) +endef + + +$(eval $(call package,quicly)) diff --git a/build/external/patches/quicly/0001-cmake-install.patch b/build/external/patches/quicly/0001-cmake-install.patch new file mode 100644 index 00000000000..3ac90c7d0aa --- /dev/null +++ b/build/external/patches/quicly/0001-cmake-install.patch @@ -0,0 +1,57 @@ +diff --git a/CMakeLists.txt b/CMakeLists.txt +index 202cc52..b5c2bee 100644 +--- a/CMakeLists.txt ++++ b/CMakeLists.txt +@@ -40,6 +40,8 @@ SET(UNITTEST_SOURCE_FILES + t/stream-concurrency.c + t/test.c) + ++SET(CMAKE_POSITION_INDEPENDENT_CODE ON) ++ + ADD_LIBRARY(quicly ${QUICLY_LIBRARY_FILES}) + + ADD_EXECUTABLE(cli ${PICOTLS_OPENSSL_FILES} src/cli.c) +@@ -50,6 +52,16 @@ TARGET_LINK_LIBRARIES(test.t quicly ${OPENSSL_LIBRARIES} ${CMAKE_DL_LIBS}) + + ADD_EXECUTABLE(udpfw t/udpfw.c) + ++INSTALL ( ++ DIRECTORY ${CMAKE_SOURCE_DIR}/include/ ++ DESTINATION include ++ FILES_MATCHING PATTERN "*.h") ++ ++INSTALL(TARGETS quicly ++ RUNTIME DESTINATION bin ++ LIBRARY DESTINATION lib ++ ARCHIVE DESTINATION lib) ++ + ADD_CUSTOM_TARGET(check env BINARY_DIR=${CMAKE_CURRENT_BINARY_DIR} prove --exec "sh -c" -v ${CMAKE_CURRENT_BINARY_DIR}/*.t t/*.t + WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR} + DEPENDS cli test.t) +Submodule deps/picotls contains untracked content +Submodule deps/picotls contains modified content +diff --git a/deps/picotls/CMakeLists.txt b/deps/picotls/CMakeLists.txt +index cda6aad..62b23b7 100644 +--- a/deps/picotls/CMakeLists.txt ++++ b/deps/picotls/CMakeLists.txt +@@ -5,6 +5,7 @@ PROJECT(picotls) + + FIND_PACKAGE(PkgConfig REQUIRED) + ++SET(CMAKE_POSITION_INDEPENDENT_CODE ON) + SET(CMAKE_C_FLAGS "-std=c99 -Wall -O2 -g ${CC_WARNING_FLAGS} ${CMAKE_C_FLAGS}") + INCLUDE_DIRECTORIES(${OPENSSL_INCLUDE_DIR} deps/cifra/src/ext deps/cifra/src deps/micro-ecc deps/picotest include) + SET(MINICRYPTO_LIBRARY_FILES +@@ -97,3 +98,12 @@ IF (BUILD_FUZZER) + + + ENDIF() ++ ++INSTALL (DIRECTORY ${CMAKE_SOURCE_DIR}/include/ ++ DESTINATION include ++ FILES_MATCHING PATTERN "*.h") ++ ++INSTALL(TARGETS picotls-core picotls-openssl ++ RUNTIME DESTINATION bin ++ LIBRARY DESTINATION lib ++ ARCHIVE DESTINATION lib) diff --git a/src/plugins/quic/CMakeLists.txt b/src/plugins/quic/CMakeLists.txt new file mode 100644 index 00000000000..ccd49682e8c --- /dev/null +++ b/src/plugins/quic/CMakeLists.txt @@ -0,0 +1,43 @@ + +# Copyright (c) 2019 Cisco +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +message(STATUS "Looking for quicly") +unset(QUIC_LINK_LIBRARIES) + +find_path(QUICLY_INCLUDE_DIR NAMES quicly.h) +find_library (QUICLY_LIBRARY NAMES "libquicly.a") +find_library (PICOTLS_CORE_LIBRARY NAMES "libpicotls-core.a") +find_library (PICOTLS_OPENSSL_LIBRARY NAMES "libpicotls-openssl.a") + +list(APPEND QUIC_LINK_LIBRARIES + ${QUICLY_LIBRARY} + ${PICOTLS_CORE_LIBRARY} + ${PICOTLS_OPENSSL_LIBRARY} +) + +if(QUICLY_INCLUDE_DIR AND QUIC_LINK_LIBRARIES) + include_directories (${QUICLY_INCLUDE_DIR}) + add_vpp_plugin(quic + SOURCES + quic.c + + INSTALL_HEADERS + quic.h + + LINK_LIBRARIES ${QUIC_LINK_LIBRARIES} + ) + message(STATUS "Found quicly in ${QUICLY_INCLUDE_DIR}") +else() + message(WARNING "-- quicly not found - quic_plugin disabled") +endif() diff --git a/src/plugins/quic/quic.c b/src/plugins/quic/quic.c new file mode 100644 index 00000000000..82de6de130b --- /dev/null +++ b/src/plugins/quic/quic.c @@ -0,0 +1,1592 @@ +/* + * Copyright (c) 2019 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include + +#include +#include +#include +#include +#include +#include + +#include + +#include + +#include +#include +#include + +static quic_main_t quic_main; + +static void quic_update_timer (quic_ctx_t * ctx); +static int64_t quic_get_time (quicly_now_cb * self); +static void quic_connection_closed (u32 conn_index); +static void quic_disconnect (u32 ctx_index, u32 thread_index); + +#define QUIC_INT_MAX 0x3FFFFFFFFFFFFFFF + +u32 +quic_ctx_half_open_alloc (void) +{ + quic_main_t *qm = &quic_main; + u8 will_expand = 0; + quic_ctx_t *ctx; + u32 ctx_index; + + pool_get_aligned_will_expand (qm->half_open_ctx_pool, will_expand, 0); + if (PREDICT_FALSE (will_expand && vlib_num_workers ())) + { + clib_rwlock_writer_lock (&qm->half_open_rwlock); + pool_get (qm->half_open_ctx_pool, ctx); + ctx_index = ctx - qm->half_open_ctx_pool; + clib_rwlock_writer_unlock (&qm->half_open_rwlock); + } + else + { + /* reader lock assumption: only main thread will call pool_get */ + clib_rwlock_reader_lock (&qm->half_open_rwlock); + pool_get (qm->half_open_ctx_pool, ctx); + ctx_index = ctx - qm->half_open_ctx_pool; + clib_rwlock_reader_unlock (&qm->half_open_rwlock); + } + memset (ctx, 0, sizeof (*ctx)); + return ctx_index; +} + +void +quic_ctx_half_open_free (u32 ho_index) +{ + quic_main_t *qm = &quic_main; + clib_rwlock_writer_lock (&qm->half_open_rwlock); + pool_put_index (qm->half_open_ctx_pool, ho_index); + clib_rwlock_writer_unlock (&qm->half_open_rwlock); +} + +quic_ctx_t * +quic_ctx_half_open_get (u32 ctx_index) +{ + quic_main_t *qm = &quic_main; + clib_rwlock_reader_lock (&qm->half_open_rwlock); + return pool_elt_at_index (qm->half_open_ctx_pool, ctx_index); +} + +void +quic_ctx_half_open_reader_unlock () +{ + clib_rwlock_reader_unlock (&quic_main.half_open_rwlock); +} + +u32 +quic_ctx_half_open_index (quic_ctx_t * ctx) +{ + return (ctx - quic_main.half_open_ctx_pool); +} + +u32 +quic_ctx_alloc () +{ + u8 thread_index = vlib_get_thread_index (); + quic_main_t *qm = &quic_main; + quic_ctx_t *ctx; + + pool_get (qm->ctx_pool[thread_index], ctx); + + memset (ctx, 0, sizeof (quic_ctx_t)); + ctx->c_thread_index = thread_index; + return ctx - qm->ctx_pool[thread_index]; +} + +static void +quic_ctx_free (quic_ctx_t * ctx) +{ + QUIC_DBG (2, "Free ctx %u", ctx->c_c_index); + u32 thread_index = ctx->c_thread_index; + if (CLIB_DEBUG) + memset (ctx, 0xfb, sizeof (*ctx)); + pool_put (quic_main.ctx_pool[thread_index], ctx); +} + +static quic_ctx_t * +quic_ctx_get (u32 ctx_index) +{ + return pool_elt_at_index (quic_main.ctx_pool[vlib_get_thread_index ()], + ctx_index); +} + +static quic_ctx_t * +quic_ctx_get_w_thread (u32 ctx_index, u8 thread_index) +{ + return pool_elt_at_index (quic_main.ctx_pool[thread_index], ctx_index); +} + +static void +quic_disconnect_transport (quic_ctx_t * ctx) +{ + QUIC_DBG (2, "Called quic_disconnect_transport"); + vnet_disconnect_args_t a = { + .handle = ctx->c_quic_ctx_id.quic_session, + .app_index = quic_main.app_index, + }; + + if (vnet_disconnect_session (&a)) + clib_warning ("UDP session disconnect errored"); +} + +static int +quic_send_datagram (session_t * session, quicly_datagram_t * packet) +{ + QUIC_DBG (2, "Called quic_send_datagram at %ld", quic_get_time (NULL)); + u32 max_enqueue; + session_dgram_hdr_t hdr; + int rv; + u32 len; + svm_fifo_t *f; + transport_connection_t *tc; + + len = packet->data.len; + f = session->tx_fifo; + tc = session_get_transport (session); + + max_enqueue = svm_fifo_max_enqueue (f); + if (max_enqueue <= sizeof (session_dgram_hdr_t)) + return 1; + + max_enqueue -= sizeof (session_dgram_hdr_t); + + if (max_enqueue < len) + return 1; + + // Build packet header for fifo + hdr.data_length = len; + hdr.data_offset = 0; + hdr.is_ip4 = tc->is_ip4; + clib_memcpy (&hdr.lcl_ip, &tc->lcl_ip, sizeof (ip46_address_t)); + hdr.lcl_port = tc->lcl_port; + + // Read dest address from quicly-provided sockaddr + if (hdr.is_ip4) + { + ASSERT (packet->sa.sa_family == AF_INET); + struct sockaddr_in *sa4 = (struct sockaddr_in *) &packet->sa; + hdr.rmt_port = sa4->sin_port; + hdr.rmt_ip.ip4.as_u32 = sa4->sin_addr.s_addr; + } + else + { + ASSERT (packet->sa.sa_family == AF_INET6); + struct sockaddr_in6 *sa6 = (struct sockaddr_in6 *) &packet->sa; + hdr.rmt_port = sa6->sin6_port; + clib_memcpy (&hdr.rmt_ip.ip6, &sa6->sin6_addr, 16); + } + + rv = svm_fifo_enqueue_nowait (f, sizeof (hdr), (u8 *) & hdr); + ASSERT (rv == sizeof (hdr)); + if (svm_fifo_enqueue_nowait (f, len, packet->data.base) != len) + return 1; + return 0; +} + +static int +quic_send_packets (quic_ctx_t * ctx) +{ + //QUIC_DBG (2, "Called quic_send_packets"); + quicly_datagram_t *packets[16]; + session_t *quic_session; + quicly_conn_t *conn; + size_t num_packets, i; + int ret; + + quic_session = session_get_from_handle (ctx->c_quic_ctx_id.quic_session); + conn = ctx->c_quic_ctx_id.conn; + + if (!conn) + return 0; + + do + { + num_packets = sizeof (packets) / sizeof (packets[0]); + if ((ret = quicly_send (conn, packets, &num_packets)) == 0) + { + for (i = 0; i != num_packets; ++i) + { + if (quic_send_datagram (quic_session, packets[i])) + { + QUIC_DBG (2, "quic_send_datagram failed"); + goto stop_sending; + } + ret = 0; + quicly_default_free_packet_cb.cb + (&quicly_default_free_packet_cb, packets[i]); + } + } + else + { + QUIC_DBG (2, "quicly_send returned %d, closing connection\n", ret); + return ret; + } + } + while (ret == 0 && num_packets == sizeof (packets) / sizeof (packets[0])); + +stop_sending: + if (svm_fifo_set_event (quic_session->tx_fifo)) + session_send_io_evt_to_thread (quic_session->tx_fifo, FIFO_EVENT_APP_TX); + + quic_update_timer (ctx); + return 0; +} + +/***************************************************************************** + * START QUICLY CALLBACKS + * Called from QUIC lib + *****************************************************************************/ + +static int +quic_on_stop_sending (quicly_stream_t * stream, int error_code) +{ + QUIC_DBG (2, "received STOP_SENDING: %d", error_code); + return 0; +} + +static int +quic_on_receive_reset (quicly_stream_t * stream, int error_code) +{ + QUIC_DBG (2, "received RESET_STREAM: %d", error_code); + return 0; +} + +static int +quic_on_receive (quicly_stream_t * stream, size_t off, const void *src, + size_t len) +{ + QUIC_DBG (2, "received data: %lu bytes", len); + u32 to_enqueue, ctx_id; + quic_ctx_t *ctx; + session_t *app_session; + svm_fifo_t *rx_fifo; + app_worker_t *app_wrk; + + ctx_id = (u64) * quicly_get_data (stream->conn); + ctx = quic_ctx_get (ctx_id); + app_session = session_get_from_handle (ctx->c_quic_ctx_id.app_session); + rx_fifo = app_session->rx_fifo; + to_enqueue = svm_fifo_max_enqueue (rx_fifo); + if (to_enqueue > len) + to_enqueue = len; + // TODO what happens to the excess bytes? + + svm_fifo_enqueue_nowait (rx_fifo, to_enqueue, src); + + // Notify app + app_wrk = app_worker_get_if_valid (app_session->app_wrk_index); + if (PREDICT_TRUE (app_wrk != 0)) + app_worker_lock_and_send_event (app_wrk, app_session, SESSION_IO_EVT_RX); + return 0; +} + +static const quicly_stream_callbacks_t quic_stream_callbacks = { + .on_destroy = quicly_streambuf_destroy, + .on_send_shift = quicly_streambuf_egress_shift, + .on_send_emit = quicly_streambuf_egress_emit, + .on_send_stop = quic_on_stop_sending, + .on_receive = quic_on_receive, + .on_receive_reset = quic_on_receive_reset +}; + +static int +quic_on_stream_open (quicly_stream_open_cb * self, quicly_stream_t * stream) +{ + QUIC_DBG (2, "on_stream_open called"); + int ret; + if ((ret = + quicly_streambuf_create (stream, sizeof (quicly_streambuf_t))) != 0) + { + return ret; + } + stream->callbacks = &quic_stream_callbacks; + return 0; +} + +static quicly_stream_open_cb on_stream_open = { &quic_on_stream_open }; + +static void +quic_on_conn_close (quicly_closed_by_peer_cb * self, quicly_conn_t * conn, + int code, uint64_t frame_type, + const char *reason, size_t reason_len) +{ + QUIC_DBG (2, "connection closed, reason: %s", reason); + u32 ctx_index = (u64) * quicly_get_data (conn); + quic_connection_closed (ctx_index); +} + +static quicly_closed_by_peer_cb on_closed_by_peer = { &quic_on_conn_close }; + + +/***************************************************************************** + * END QUICLY CALLBACKS + *****************************************************************************/ + +/* single-entry session cache */ +struct st_util_session_cache_t +{ + ptls_encrypt_ticket_t super; + uint8_t id[32]; + ptls_iovec_t data; +}; + +static int +encrypt_ticket_cb (ptls_encrypt_ticket_t * _self, ptls_t * tls, + int is_encrypt, ptls_buffer_t * dst, ptls_iovec_t src) +{ + struct st_util_session_cache_t *self = (void *) _self; + int ret; + + if (is_encrypt) + { + + /* replace the cached entry along with a newly generated session id */ + free (self->data.base); + if ((self->data.base = malloc (src.len)) == NULL) + return PTLS_ERROR_NO_MEMORY; + + ptls_get_context (tls)->random_bytes (self->id, sizeof (self->id)); + memcpy (self->data.base, src.base, src.len); + self->data.len = src.len; + + /* store the session id in buffer */ + if ((ret = ptls_buffer_reserve (dst, sizeof (self->id))) != 0) + return ret; + memcpy (dst->base + dst->off, self->id, sizeof (self->id)); + dst->off += sizeof (self->id); + + } + else + { + + /* check if session id is the one stored in cache */ + if (src.len != sizeof (self->id)) + return PTLS_ERROR_SESSION_NOT_FOUND; + if (memcmp (self->id, src.base, sizeof (self->id)) != 0) + return PTLS_ERROR_SESSION_NOT_FOUND; + + /* return the cached value */ + if ((ret = ptls_buffer_reserve (dst, self->data.len)) != 0) + return ret; + memcpy (dst->base + dst->off, self->data.base, self->data.len); + dst->off += self->data.len; + } + + return 0; +} + +static struct st_util_session_cache_t sc = { + .super = { + .cb = encrypt_ticket_cb, + }, +}; + +/* *INDENT-OFF* */ +static ptls_context_t quic_tlsctx = { + .random_bytes = ptls_openssl_random_bytes, + .get_time = &ptls_get_time, + .key_exchanges = ptls_openssl_key_exchanges, + .cipher_suites = ptls_openssl_cipher_suites, + .certificates = { + .list = NULL, + .count = 0 + }, + .esni = NULL, + .on_client_hello = NULL, + .emit_certificate = NULL, + .sign_certificate = NULL, + .verify_certificate = NULL, + .ticket_lifetime = 86400, + .max_early_data_size = 8192, + .hkdf_label_prefix__obsolete = NULL, + .require_dhe_on_psk = 1, + .encrypt_ticket = &sc.super, +}; +/* *INDENT-ON* */ + +static int +ptls_compare_separator_line (const char *line, const char *begin_or_end, + const char *label) +{ + int ret = strncmp (line, "-----", 5); + size_t text_index = 5; + + if (ret == 0) + { + size_t begin_or_end_length = strlen (begin_or_end); + ret = strncmp (line + text_index, begin_or_end, begin_or_end_length); + text_index += begin_or_end_length; + } + + if (ret == 0) + { + ret = line[text_index] - ' '; + text_index++; + } + + if (ret == 0) + { + size_t label_length = strlen (label); + ret = strncmp (line + text_index, label, label_length); + text_index += label_length; + } + + if (ret == 0) + { + ret = strncmp (line + text_index, "-----", 5); + } + + return ret; +} + +static int +ptls_get_bio_pem_object (BIO * bio, const char *label, ptls_buffer_t * buf) +{ + int ret = PTLS_ERROR_PEM_LABEL_NOT_FOUND; + char line[256]; + ptls_base64_decode_state_t state; + + /* Get the label on a line by itself */ + while (BIO_gets (bio, line, 256)) + { + if (ptls_compare_separator_line (line, "BEGIN", label) == 0) + { + ret = 0; + ptls_base64_decode_init (&state); + break; + } + } + /* Get the data in the buffer */ + while (ret == 0 && BIO_gets (bio, line, 256)) + { + if (ptls_compare_separator_line (line, "END", label) == 0) + { + if (state.status == PTLS_BASE64_DECODE_DONE + || (state.status == PTLS_BASE64_DECODE_IN_PROGRESS + && state.nbc == 0)) + { + ret = 0; + } + else + { + ret = PTLS_ERROR_INCORRECT_BASE64; + } + break; + } + else + { + ret = ptls_base64_decode (line, &state, buf); + } + } + + return ret; +} + +int +ptls_load_bio_pem_objects (BIO * bio, const char *label, ptls_iovec_t * list, + size_t list_max, size_t * nb_objects) +{ + int ret = 0; + size_t count = 0; + + *nb_objects = 0; + + if (ret == 0) + { + while (count < list_max) + { + ptls_buffer_t buf; + + ptls_buffer_init (&buf, "", 0); + + ret = ptls_get_bio_pem_object (bio, label, &buf); + + if (ret == 0) + { + if (buf.off > 0 && buf.is_allocated) + { + list[count].base = buf.base; + list[count].len = buf.off; + count++; + } + else + { + ptls_buffer_dispose (&buf); + } + } + else + { + ptls_buffer_dispose (&buf); + break; + } + } + } + + if (ret == PTLS_ERROR_PEM_LABEL_NOT_FOUND && count > 0) + { + ret = 0; + } + + *nb_objects = count; + + return ret; +} + +#define PTLS_MAX_CERTS_IN_CONTEXT 16 + +int +ptls_load_bio_certificates (ptls_context_t * ctx, BIO * bio) +{ + int ret = 0; + + ctx->certificates.list = + (ptls_iovec_t *) malloc (PTLS_MAX_CERTS_IN_CONTEXT * + sizeof (ptls_iovec_t)); + + if (ctx->certificates.list == NULL) + { + ret = PTLS_ERROR_NO_MEMORY; + } + else + { + ret = + ptls_load_bio_pem_objects (bio, "CERTIFICATE", ctx->certificates.list, + PTLS_MAX_CERTS_IN_CONTEXT, + &ctx->certificates.count); + } + + return ret; +} + +static inline void +load_bio_certificate_chain (ptls_context_t * ctx, const char *cert_data) +{ + BIO *cert_bio; + cert_bio = BIO_new_mem_buf (cert_data, -1); + if (ptls_load_bio_certificates (ctx, cert_bio) != 0) + { + BIO_free (cert_bio); + fprintf (stderr, "failed to load certificate:%s\n", strerror (errno)); + exit (1); + } + BIO_free (cert_bio); +} + +static inline void +load_bio_private_key (ptls_context_t * ctx, const char *pk_data) +{ + static ptls_openssl_sign_certificate_t sc; + EVP_PKEY *pkey; + BIO *key_bio; + + key_bio = BIO_new_mem_buf (pk_data, -1); + pkey = PEM_read_bio_PrivateKey (key_bio, NULL, NULL, NULL); + BIO_free (key_bio); + + if (pkey == NULL) + { + fprintf (stderr, "failed to read private key from app configuration\n"); + exit (1); + } + + ptls_openssl_init_sign_certificate (&sc, pkey); + EVP_PKEY_free (pkey); + + ctx->sign_certificate = &sc.super; +} + +static void +quic_connection_closed (u32 ctx_index) +{ + QUIC_DBG (2, "QUIC connection closed"); + quic_ctx_t *ctx; + + ctx = quic_ctx_get (ctx_index); + // TODO if connection is not established, just delete the session + + // Do not try to send anything anymore + ctx->stream = NULL; + quicly_free (ctx->c_quic_ctx_id.conn); + ctx->c_quic_ctx_id.conn = NULL; + session_transport_closing_notify (&ctx->connection); +} + +static int64_t +quic_get_time (quicly_now_cb * self) +{ + // TODO read value set by set_time_now? + // (needs to change it not to call this function) + vlib_main_t *vlib_main = vlib_get_main (); + f64 time = vlib_time_now (vlib_main); + return (int64_t) (time * 1000.f); +} +quicly_now_cb quicly_vpp_now_cb = { quic_get_time }; + +static void +allocate_quicly_ctx (application_t * app, u8 is_client) +{ + QUIC_DBG (2, "Called allocate_quicly_ctx"); + struct + { + quicly_context_t _; + char cid_key[17]; + } *ctx_data; + quicly_context_t *quicly_ctx; + char *cid_key; + + ctx_data = malloc (sizeof (*ctx_data)); + quicly_ctx = &ctx_data->_; + app->quicly_ctx = (u64 *) quicly_ctx; + memcpy (quicly_ctx, &quicly_default_context, sizeof (quicly_context_t)); + + quicly_ctx->tls = &quic_tlsctx; + quicly_ctx->stream_open = &on_stream_open; + quicly_ctx->closed_by_peer = &on_closed_by_peer; + quicly_ctx->now = &quicly_vpp_now_cb; + + quicly_amend_ptls_context (quicly_ctx->tls); + + quicly_ctx->event_log.mask = INT64_MAX; + quicly_ctx->event_log.cb = quicly_new_default_event_log_cb (stderr); + + quicly_ctx->transport_params.max_data = QUIC_INT_MAX; + quicly_ctx->transport_params.max_streams_uni = QUIC_INT_MAX; + quicly_ctx->transport_params.max_streams_bidi = QUIC_INT_MAX; + quicly_ctx->transport_params.max_stream_data.bidi_local = QUIC_INT_MAX; + quicly_ctx->transport_params.max_stream_data.bidi_remote = QUIC_INT_MAX; + quicly_ctx->transport_params.max_stream_data.uni = QUIC_INT_MAX; + + if (!is_client) + { + load_bio_private_key (quicly_ctx->tls, (char *) app->tls_key); + load_bio_certificate_chain (quicly_ctx->tls, (char *) app->tls_cert); + cid_key = ctx_data->cid_key; + quicly_ctx->tls->random_bytes (cid_key, 16); + cid_key[16] = 0; + quicly_ctx->encrypt_cid = + quicly_new_default_encrypt_cid_cb (&ptls_openssl_bfecb, + &ptls_openssl_sha256, + ptls_iovec_init (cid_key, + strlen + (cid_key))); + quicly_ctx->decrypt_cid = + quicly_new_default_decrypt_cid_cb (&ptls_openssl_bfecb, + &ptls_openssl_sha256, + ptls_iovec_init (cid_key, + strlen + (cid_key))); + } +} + + +/***************************************************************************** + * BEGIN TIMERS HANDLING + *****************************************************************************/ + +static u32 +quic_set_time_now (u32 thread_index) +{ + quic_main.wrk_ctx[thread_index].time_now = quic_get_time (NULL); + return quic_main.wrk_ctx[thread_index].time_now; +} + +static void +quic_timer_expired (u32 conn_index) +{ + quic_ctx_t *ctx; + QUIC_DBG (2, "Timer expired for conn %u at %ld", conn_index, + quic_get_time (NULL)); + ctx = quic_ctx_get (conn_index); + ctx->c_quic_ctx_id.timer_handle = QUIC_TIMER_HANDLE_INVALID; + if (quic_send_packets (ctx)) + { + quic_connection_closed (conn_index); + } +} + +static void +quic_update_timer (quic_ctx_t * ctx) +{ + tw_timer_wheel_1t_3w_1024sl_ov_t *tw; + int64_t next_timeout; + + // This timeout is in ms which is the unit of our timer + next_timeout = quicly_get_first_timeout (ctx->c_quic_ctx_id.conn); + tw = &quic_main.wrk_ctx[vlib_get_thread_index ()].timer_wheel; + f64 next_timeout_f = ((f64) next_timeout) / 1000.f; + + clib_warning ("Timer set to %ld (%lf)", next_timeout, next_timeout_f); + + if (ctx->c_quic_ctx_id.timer_handle == QUIC_TIMER_HANDLE_INVALID) + { + if (next_timeout == INT64_MAX) + return; + ctx->c_quic_ctx_id.timer_handle = + tw_timer_start_1t_3w_1024sl_ov (tw, ctx->c_c_index, 0, + next_timeout_f); + } + else + { + if (next_timeout == INT64_MAX) + { + tw_timer_stop_1t_3w_1024sl_ov (tw, ctx->c_quic_ctx_id.timer_handle); + ctx->c_quic_ctx_id.timer_handle = QUIC_TIMER_HANDLE_INVALID; + } + else + tw_timer_update_1t_3w_1024sl_ov (tw, ctx->c_quic_ctx_id.timer_handle, + next_timeout_f); + } +} + +static void +quic_expired_timers_dispatch (u32 * expired_timers) +{ + int i; + + for (i = 0; i < vec_len (expired_timers); i++) + { + quic_timer_expired (expired_timers[i]); + } +} + + +/***************************************************************************** + * END TIMERS HANDLING + * + * BEGIN TRANSPORT PROTO FUNCTIONS + *****************************************************************************/ + +int +quic_connect (transport_endpoint_cfg_t * tep) +{ + QUIC_DBG (2, "Called quic_connect"); + vnet_connect_args_t _cargs = { {}, }, *cargs = &_cargs; + session_endpoint_cfg_t *sep; + quic_main_t *qm = &quic_main; + quic_ctx_t *ctx; + app_worker_t *app_wrk; + application_t *app; + u32 ctx_index; + int error; + + sep = (session_endpoint_cfg_t *) tep; + ctx_index = quic_ctx_half_open_alloc (); + ctx = quic_ctx_half_open_get (ctx_index); + ctx->c_quic_ctx_id.parent_app_wrk_idx = sep->app_wrk_index; + ctx->c_s_index = 0xFAFAFAFA; + ctx->c_quic_ctx_id.udp_is_ip4 = sep->is_ip4; + ctx->c_quic_ctx_id.timer_handle = QUIC_TIMER_HANDLE_INVALID; + ctx->c_quic_ctx_id.conn_state = QUIC_CONN_STATE_HANDSHAKE; + ctx->client_opaque = sep->opaque; + if (sep->hostname) + { + ctx->srv_hostname = format (0, "%v", sep->hostname); + vec_terminate_c_string (ctx->srv_hostname); + } + else + { + // needed by quic for crypto + determining client / server + ctx->srv_hostname = + format (0, "%U", format_ip46_address, &sep->ip, sep->is_ip4); + } + + quic_ctx_half_open_reader_unlock (); + + clib_memcpy (&cargs->sep, sep, sizeof (session_endpoint_cfg_t)); + cargs->sep.transport_proto = TRANSPORT_PROTO_UDP; + cargs->app_index = qm->app_index; + cargs->api_context = ctx_index; + + app_wrk = app_worker_get (sep->app_wrk_index); + app = application_get (app_wrk->app_index); + ctx->c_quic_ctx_id.parent_app_id = app_wrk->app_index; + + allocate_quicly_ctx (app, 1 /* is client */ ); + + if ((error = vnet_connect (cargs))) + return error; + + QUIC_DBG (1, "New connect request %u", ctx_index); + return 0; +} + +static void +quic_disconnect (u32 ctx_index, u32 thread_index) +{ + QUIC_DBG (2, "Called quic_disconnect"); + tw_timer_wheel_1t_3w_1024sl_ov_t *tw; + quic_ctx_t *ctx; + + QUIC_DBG (1, "Disconnecting %x", ctx_index); + + ctx = quic_ctx_get (ctx_index); + if (ctx->c_quic_ctx_id.timer_handle != QUIC_TIMER_HANDLE_INVALID) + { + tw = &quic_main.wrk_ctx[vlib_get_thread_index ()].timer_wheel; + tw_timer_stop_1t_3w_1024sl_ov (tw, ctx->c_quic_ctx_id.timer_handle); + } + quic_disconnect_transport (ctx); + // This removes the session from the lookup table and frees it. + session_transport_delete_notify (&ctx->connection); + quic_ctx_free (ctx); +} + +u32 +quic_start_listen (u32 app_listen_session_index, transport_endpoint_t * tep) +{ + QUIC_DBG (2, "Called quic_start_listen"); + vnet_listen_args_t _bargs, *args = &_bargs; + quic_main_t *qm = &quic_main; + session_handle_t udp_handle; + session_endpoint_cfg_t *sep; + session_t *quic_listen_session, *app_listen_session; + app_worker_t *app_wrk; + application_t *app; + quic_ctx_t *lctx; + u32 lctx_index; + app_listener_t *app_listener; + + sep = (session_endpoint_cfg_t *) tep; + app_wrk = app_worker_get (sep->app_wrk_index); + app = application_get (app_wrk->app_index); + + allocate_quicly_ctx (app, 0 /* is_client */ ); + + sep->transport_proto = TRANSPORT_PROTO_UDP; + memset (args, 0, sizeof (*args)); + args->app_index = qm->app_index; + args->sep_ext = *sep; + args->sep_ext.ns_index = app->ns_index; + if (vnet_listen (args)) + return -1; + + lctx_index = quic_ctx_alloc (); // listener + udp_handle = args->handle; + app_listener = app_listener_get_w_handle (udp_handle); + quic_listen_session = app_listener_get_session (app_listener); + quic_listen_session->opaque = lctx_index; + + app_listen_session = listen_session_get (app_listen_session_index); + + lctx = quic_ctx_get (lctx_index); // listener + lctx->is_listener = 1; + lctx->c_quic_ctx_id.parent_app_wrk_idx = sep->app_wrk_index; + lctx->c_quic_ctx_id.parent_app_id = app_wrk->app_index; + lctx->c_quic_ctx_id.quic_session = udp_handle; + lctx->c_quic_ctx_id.app_session = + listen_session_get_handle (app_listen_session); + lctx->c_quic_ctx_id.udp_is_ip4 = sep->is_ip4; + + QUIC_DBG (1, "Started listening %d", lctx_index); + return lctx_index; +} + +u32 +quic_stop_listen (u32 lctx_index) +{ + QUIC_DBG (2, "Called quic_stop_listen"); + quic_ctx_t *lctx; + + lctx = quic_ctx_get (lctx_index); // listener + vnet_unlisten_args_t a = { + .handle = lctx->c_quic_ctx_id.quic_session, + .app_index = quic_main.app_index, + .wrk_map_index = 0 /* default wrk */ + }; + if (vnet_unlisten (&a)) + clib_warning ("unlisten errored"); + + // TODO: crypto state cleanup + + quic_ctx_free (lctx); // listener + return 0; +} + +transport_connection_t * +quic_connection_get (u32 ctx_index, u32 thread_index) +{ + QUIC_DBG (2, "Called quic_connection_get"); + quic_ctx_t *ctx; + ctx = quic_ctx_get_w_thread (ctx_index, thread_index); + return &ctx->connection; +} + +transport_connection_t * +quic_listener_get (u32 listener_index) +{ + QUIC_DBG (2, "Called quic_listener_get"); + quic_ctx_t *ctx; + ctx = quic_ctx_get (listener_index); + return &ctx->connection; +} + +static void +quic_update_time (f64 now, u8 thread_index) +{ + tw_timer_wheel_1t_3w_1024sl_ov_t *tw; + + tw = &quic_main.wrk_ctx[thread_index].timer_wheel; + quic_set_time_now (thread_index); + tw_timer_expire_timers_1t_3w_1024sl_ov (tw, now); +} + +static u8 * +format_quic_connection (u8 * s, va_list * args) +{ + s = format (s, "[QUIC] connection"); + return s; +} + +static u8 * +format_quic_half_open (u8 * s, va_list * args) +{ + u32 qc_index = va_arg (*args, u32); + quic_ctx_t *ctx = quic_ctx_half_open_get (qc_index); + s = format (s, "[QUIC] half-open app %u", ctx->c_quic_ctx_id.parent_app_id); + quic_ctx_half_open_reader_unlock (); + return s; +} + +// TODO improve +static u8 * +format_quic_listener (u8 * s, va_list * args) +{ + s = format (s, "[QUIC] listener"); + return s; +} + +/***************************************************************************** + * END TRANSPORT PROTO FUNCTIONS + * + * START SESSION CALLBACKS + * Called from UDP layer + *****************************************************************************/ + +static inline void +quic_build_sockaddr (struct sockaddr *sa, socklen_t * salen, + ip46_address_t * addr, u16 port, u8 is_ip4) +{ + if (is_ip4) + { + struct sockaddr_in *sa4 = (struct sockaddr_in *) sa; + sa4->sin_family = AF_INET; + sa4->sin_port = port; + sa4->sin_addr.s_addr = addr->ip4.as_u32; + *salen = sizeof (struct sockaddr_in); + } + else + { + struct sockaddr_in6 *sa6 = (struct sockaddr_in6 *) sa; + sa6->sin6_family = AF_INET6; + sa6->sin6_port = port; + clib_memcpy (&sa6->sin6_addr, &addr->ip6, 16); + *salen = sizeof (struct sockaddr_in6); + } +} + +static int +quic_delayed_notify_app_connected (void *ctx_index) +{ + QUIC_DBG (1, "quic_notify_app_connected"); + session_t *app_session; + app_worker_t *app_wrk; + quic_ctx_t *ctx; + ctx = quic_ctx_get ((u32) (u64) ctx_index); + + app_wrk = app_worker_get_if_valid (ctx->c_quic_ctx_id.parent_app_wrk_idx); + if (!app_wrk) + { + quic_disconnect_transport (ctx); + return -1; + } + + app_session = session_alloc (ctx->c_thread_index); + QUIC_DBG (1, "Created app_session, id %u", app_session->session_index); + ctx->c_s_index = app_session->session_index; + app_session->app_wrk_index = ctx->c_quic_ctx_id.parent_app_wrk_idx; + app_session->connection_index = ctx->c_c_index; + app_session->session_type = + session_type_from_proto_and_ip (TRANSPORT_PROTO_QUIC, + ctx->c_quic_ctx_id.udp_is_ip4); + + if (app_worker_init_connected (app_wrk, app_session)) // TODO dont allocate fifos + { + quic_disconnect (ctx->c_c_index, vlib_get_thread_index ()); + return app_worker_connect_notify (app_wrk, NULL, ctx->client_opaque); + } + + app_session->session_state = SESSION_STATE_CONNECTING; + if (app_worker_connect_notify (app_wrk, app_session, ctx->client_opaque)) + { + QUIC_DBG (1, "failed to notify app"); + quic_disconnect (ctx->c_c_index, vlib_get_thread_index ()); + return -1; + } + + ctx->c_quic_ctx_id.app_session = session_handle (app_session); + app_session->session_state = SESSION_STATE_LISTENING; + session_lookup_add_connection (&ctx->connection, + session_handle (app_session)); + + return 0; +} + +int +quic_session_connected_callback (u32 quic_app_index, u32 ho_ctx_idx, + session_t * s, u8 is_fail) +{ + QUIC_DBG (2, "Called quic_session_connected_callback"); + // This should always be called before quic_connect returns since UDP always + // connects instantly. + struct sockaddr_in6 sa6; + struct sockaddr *sa = (struct sockaddr *) &sa6; + socklen_t salen; + transport_connection_t *tc; + quic_ctx_t *ho_ctx, *ctx; + u32 ctx_index; + int ret; + application_t *app; + app_worker_t *app_wrk; + + ho_ctx = quic_ctx_half_open_get (ho_ctx_idx); + if (is_fail) + { + int (*cb_fn) (u32, u32, session_t *, u8), rv = 0; + u32 wrk_index, api_context; + + wrk_index = ho_ctx->c_quic_ctx_id.parent_app_wrk_idx; + app_wrk = + app_worker_get_if_valid (ho_ctx->c_quic_ctx_id.parent_app_wrk_idx); + if (app_wrk) + { + api_context = ho_ctx->c_s_index; + app = application_get (app_wrk->app_index); + cb_fn = app->cb_fns.session_connected_callback; + rv = cb_fn (wrk_index, api_context, 0, 1 /* failed */ ); + } + quic_ctx_half_open_reader_unlock (); + quic_ctx_half_open_free (ho_ctx_idx); + return rv; + } + + app_wrk = + app_worker_get_if_valid (ho_ctx->c_quic_ctx_id.parent_app_wrk_idx); + if (!app_wrk) + { + QUIC_DBG (1, "Appwrk not found"); + return -1; + } + app = application_get (app_wrk->app_index); + + ctx_index = quic_ctx_alloc (); + ctx = quic_ctx_get (ctx_index); + clib_memcpy (ctx, ho_ctx, sizeof (*ctx)); + quic_ctx_half_open_reader_unlock (); // TODO: this is a race + quic_ctx_half_open_free (ho_ctx_idx); + + ctx->c_thread_index = vlib_get_thread_index (); + ctx->c_c_index = ctx_index; + + QUIC_DBG (1, "Quic connect for returned %u. New connection [%u]%x", + is_fail, vlib_get_thread_index (), (ctx) ? ctx_index : ~0); + + ctx->c_quic_ctx_id.quic_session = session_handle (s); + s->opaque = ctx_index; + s->session_state = SESSION_STATE_READY; + + // Init QUIC lib connection + // Generate required sockaddr & salen + tc = session_get_transport (s); + quic_build_sockaddr (sa, &salen, &tc->rmt_ip, tc->rmt_port, tc->is_ip4); + + ret = + quicly_connect (&ctx->c_quic_ctx_id.conn, + (quicly_context_t *) app->quicly_ctx, + (char *) ctx->srv_hostname, sa, salen, + &quic_main.next_cid, &quic_main.hs_properties, NULL); + ++quic_main.next_cid.master_id; + // Save context handle in quicly connection + *quicly_get_data (ctx->c_quic_ctx_id.conn) = (void *) (u64) ctx_index; + assert (ret == 0); + + if (quic_send_packets (ctx)) + { + quic_connection_closed (ctx_index); + } + return ret; +} + +void +quic_session_disconnect_callback (session_t * s) +{ + clib_warning ("UDP session disconnected???"); +} + +void +quic_session_reset_callback (session_t * s) +{ + clib_warning ("UDP session reset???"); +} + +static int +quic_add_segment_callback (u32 client_index, u64 seg_handle) +{ + QUIC_DBG (2, "Called quic_add_segment_callback"); + QUIC_DBG (2, "NOT IMPLEMENTED"); + /* No-op for builtin */ + return 0; +} + +static int +quic_del_segment_callback (u32 client_index, u64 seg_handle) +{ + QUIC_DBG (2, "Called quic_del_segment_callback"); + QUIC_DBG (2, "NOT IMPLEMENTED"); + /* No-op for builtin */ + return 0; +} + +int +quic_add_vpp_q_builtin_tx_evt (session_t * s) +{ + if (svm_fifo_set_event (s->tx_fifo)) + session_send_io_evt_to_thread_custom (s, s->thread_index, + FIFO_EVENT_BUILTIN_TX); + return 0; +} + +void +quic_open_stream_if_ready (quic_ctx_t * ctx) +{ + quicly_conn_t *conn = ctx->c_quic_ctx_id.conn; + if (ctx->stream) + { + QUIC_DBG (2, "----------- > FOUND Stream id %d", + ctx->stream->stream_id); + QUIC_DBG (2, "----------- > FOUND Stream is_open %d", + ctx->stream->sendstate.is_open); + return; + } + if (quicly_connection_is_ready (conn)) + assert (!quicly_open_stream (conn, &ctx->stream, 0)); + QUIC_DBG (2, "Stream id %d", ctx->stream->stream_id); + QUIC_DBG (2, "Stream is_open %d", ctx->stream->sendstate.is_open); +} + +int +quic_custom_tx_callback (void *session) +{ + QUIC_DBG (2, "Called quic_custom_tx_callback"); + session_t *app_session = (session_t *) session; + quic_ctx_t *ctx; + svm_fifo_t *f; + u32 deq_max; + u8 *data; + + if (PREDICT_FALSE + (app_session->session_state >= SESSION_STATE_TRANSPORT_CLOSING)) + return 0; + ctx = quic_ctx_get (app_session->connection_index); + quic_open_stream_if_ready (ctx); + if (!ctx->stream) + { + quic_add_vpp_q_builtin_tx_evt (app_session); + return 0; + } + + f = app_session->tx_fifo; + deq_max = svm_fifo_max_dequeue (f); + if (!deq_max) + return 0; + + data = svm_fifo_head (f); + if (quicly_streambuf_egress_write (ctx->stream, data, deq_max)) + { + assert (0); + return 0; + } + QUIC_DBG (2, "Sent %u bytes", deq_max); + svm_fifo_dequeue_drop (f, deq_max); + if (quic_send_packets (ctx)) + { + quic_connection_closed (ctx->c_c_index); + } + return 0; +} + +int +quic_find_packet_ctx (quic_ctx_t ** ctx, quicly_conn_t ** conn, + struct sockaddr *sa, socklen_t salen, + quicly_decoded_packet_t packet) +{ + quic_ctx_t *ctx_; + quicly_conn_t *conn_; + /* *INDENT-OFF* */ + pool_foreach (ctx_, quic_main.ctx_pool[vlib_get_thread_index()], + ({ + conn_ = ctx_->c_quic_ctx_id.conn; + if (conn_ && !ctx_->is_listener) + { + if (quicly_is_destination(conn_, sa, salen, &packet)) + { + *conn = conn_; + *ctx = ctx_; + QUIC_DBG (2, "connection_found"); + return 0; + } + } + })); + /* *INDENT-ON* */ + return 0; +} + +static int +quic_receive (quic_ctx_t * ctx, quicly_conn_t * conn, + quicly_decoded_packet_t packet) +{ + quicly_receive (conn, &packet); + // Conn may be set to null if the connection is terminated + if (ctx->c_quic_ctx_id.conn + && ctx->c_quic_ctx_id.conn_state == QUIC_CONN_STATE_HANDSHAKE) + { + if (quicly_connection_is_ready (conn)) + { + ctx->c_quic_ctx_id.conn_state = QUIC_CONN_STATE_READY; + if (quicly_is_client (conn)) + session_send_rpc_evt_to_thread_force (vlib_get_thread_index (), + &quic_delayed_notify_app_connected, + (void *) (u64) + ctx->c_c_index); + } + } + if (quic_send_packets (ctx)) + { + quic_connection_closed (ctx->c_c_index); + } + return 0; +} + +static int +quic_delayed_create_app_session (void *ctx_index) +{ + quic_ctx_t *lctx, *ctx; + session_t *app_session, *app_listen_session; + app_worker_t *app_wrk; + int rv; + + ctx = quic_ctx_get ((u32) (u64) ctx_index); + app_session = session_alloc (ctx->c_thread_index); + app_session->session_state = SESSION_STATE_LISTENING; + ctx->c_s_index = app_session->session_index; + + lctx = quic_ctx_get (ctx->c_quic_ctx_id.listener_ctx_id); + + app_listen_session = + listen_session_get_from_handle (lctx->c_quic_ctx_id.app_session); + app_session->app_wrk_index = lctx->c_quic_ctx_id.parent_app_wrk_idx; + app_session->connection_index = ctx->c_c_index; + app_session->session_type = app_listen_session->session_type; + app_session->listener_index = app_listen_session->session_index; + app_session->app_index = quic_main.app_index; + + // TODO: don't alloc fifos when we don't transfer data on this session + if ((rv = app_worker_init_accepted (app_session))) + { + QUIC_DBG (1, "failed to allocate fifos"); + session_free (app_session); + return rv; + } + ctx->c_quic_ctx_id.app_session = session_handle (app_session); + ctx->c_quic_ctx_id.parent_app_id = lctx->c_quic_ctx_id.parent_app_id; + ctx->c_quic_ctx_id.udp_is_ip4 = lctx->c_quic_ctx_id.udp_is_ip4; + ctx->c_quic_ctx_id.parent_app_wrk_idx = app_session->app_wrk_index; + session_lookup_add_connection (&ctx->connection, + session_handle (app_session)); + app_wrk = app_worker_get (app_session->app_wrk_index); + rv = app_worker_accept_notify (app_wrk, app_session); + if (rv) + { + QUIC_DBG (1, "failed to notify accept worker app"); + return rv; + } + return 0; +} + +static int +quic_create_connection (quicly_context_t * quicly_ctx, + u64 quic_session_handle, u32 lctx_index, + quicly_conn_t * conn, struct sockaddr *sa, + socklen_t salen, quicly_decoded_packet_t packet) +{ + quic_ctx_t *ctx; + u32 ctx_index; + + /* new connection, accept and create context if packet is valid */ + // TODO: check if socket is actually listening? + QUIC_DBG (2, "New connection created"); + if (quicly_accept (&conn, quicly_ctx, sa, salen, + &packet, ptls_iovec_init (NULL, 0), + &quic_main.next_cid, NULL) != 0) + { + // Invalid packet, pass + assert (conn == NULL); + QUIC_DBG (2, "Accept failed"); + return 0; + } + assert (conn != NULL); + + ++quic_main.next_cid.master_id; + // Create context + ctx_index = quic_ctx_alloc (); + ctx = quic_ctx_get (ctx_index); + // Save ctx handle in quicly connection + *quicly_get_data (conn) = (void *) (u64) ctx_index; + + ctx->c_thread_index = vlib_get_thread_index (); + ctx->c_c_index = ctx_index; + ctx->c_quic_ctx_id.quic_session = quic_session_handle; + ctx->c_quic_ctx_id.listener_ctx_id = lctx_index; + ctx->c_quic_ctx_id.timer_handle = QUIC_TIMER_HANDLE_INVALID; + ctx->c_quic_ctx_id.conn = conn; + + session_send_rpc_evt_to_thread_force (vlib_get_thread_index (), + &quic_delayed_create_app_session, + (void *) (u64) ctx_index); + if (quic_send_packets (ctx)) + { + quic_connection_closed (ctx_index); + } + return 0; +} + +static int +quic_reset_connection (quicly_context_t * quicly_ctx, u64 quic_session_handle, + struct sockaddr *sa, socklen_t salen, + quicly_decoded_packet_t packet) +{ + /* short header packet; potentially a dead connection. No need to check the length of the incoming packet, + * because loop is prevented by authenticating the CID (by checking node_id and thread_id). If the peer is also + * sending a reset, then the next CID is highly likely to contain a non-authenticating CID, ... */ + QUIC_DBG (2, "Sending stateless reset"); + quicly_datagram_t *dgram; + session_t *quic_session; + if (packet.cid.dest.plaintext.node_id == 0 + && packet.cid.dest.plaintext.thread_id == 0) + { + dgram = quicly_send_stateless_reset (quicly_ctx, sa, salen, + &packet.cid.dest.plaintext); + quic_session = session_get_from_handle (quic_session_handle); + if (quic_send_datagram (quic_session, dgram)) // TODO : missing event on fifo + QUIC_DBG (2, "Send reset failed"); + } + return 0; +} + +int +quic_app_rx_callback (session_t * quic_session) +{ + // Read data from UDP rx_fifo and pass it to the quicly conn. + QUIC_DBG (2, "Called quic_app_rx_callback"); + + quicly_decoded_packet_t packet; + session_dgram_hdr_t ph; + application_t *app; + quicly_conn_t *conn = NULL; + quic_ctx_t *lctx, *ctx = NULL; + svm_fifo_t *f; + size_t plen; + struct sockaddr_in6 sa6; + struct sockaddr *sa = (struct sockaddr *) &sa6; + socklen_t salen; + u32 max_deq, len; + u8 *data; + u32 lctx_index = quic_session->opaque; + u64 quic_session_handle = session_handle (quic_session); + + f = quic_session->rx_fifo; + + do + { + conn = NULL; + max_deq = svm_fifo_max_dequeue (f); + if (max_deq < sizeof (session_dgram_hdr_t)) + { + svm_fifo_unset_event (f); + return 0; + } + QUIC_DBG (2, "Processing one packet at %ld", quic_get_time (NULL)); + + svm_fifo_unset_event (f); + svm_fifo_peek (f, 0, sizeof (ph), (u8 *) & ph); + ASSERT (ph.data_length >= ph.data_offset); + len = ph.data_length - ph.data_offset; + + quic_build_sockaddr (sa, &salen, &ph.rmt_ip, ph.rmt_port, ph.is_ip4); + + // Quicly can read len bytes from the fifo at offset: + // ph.data_offset + SESSION_CONN_HDR_LEN + data = svm_fifo_head (f) + ph.data_offset + SESSION_CONN_HDR_LEN; + + lctx = quic_ctx_get (lctx_index); + app = application_get (lctx->c_quic_ctx_id.parent_app_id); + + plen = + quicly_decode_packet ((quicly_context_t *) app->quicly_ctx, &packet, + data, len); + if (plen != SIZE_MAX) + { + quic_find_packet_ctx (&ctx, &conn, sa, salen, packet); + if (conn != NULL) + quic_receive (ctx, conn, packet); + else if (QUICLY_PACKET_IS_LONG_HEADER (packet.octets.base[0])) + quic_create_connection ((quicly_context_t *) app->quicly_ctx, + quic_session_handle, lctx_index, conn, + sa, salen, packet); + else if (((quicly_context_t *) app->quicly_ctx)->encrypt_cid) + quic_reset_connection ((quicly_context_t *) app->quicly_ctx, + quic_session_handle, sa, salen, packet); + } + svm_fifo_dequeue_drop (f, + ph.data_length + ph.data_offset + + SESSION_CONN_HDR_LEN); + } + while (1); + return 0; +} + +/***************************************************************************** + * END TRANSPORT PROTO FUNCTIONS +*****************************************************************************/ + +/* *INDENT-OFF* */ +static session_cb_vft_t quic_app_cb_vft = { + .session_accept_callback = NULL, + .session_disconnect_callback = quic_session_disconnect_callback, + .session_connected_callback = quic_session_connected_callback, + .session_reset_callback = quic_session_reset_callback, + .add_segment_callback = quic_add_segment_callback, + .del_segment_callback = quic_del_segment_callback, + .builtin_app_rx_callback = quic_app_rx_callback, +}; + +const static transport_proto_vft_t quic_proto = { + .connect = quic_connect, + .close = quic_disconnect, + .start_listen = quic_start_listen, + .stop_listen = quic_stop_listen, + .get_connection = quic_connection_get, + .get_listener = quic_listener_get, + .update_time = quic_update_time, + .custom_tx = quic_custom_tx_callback, + .tx_type = TRANSPORT_TX_INTERNAL, + .service_type = TRANSPORT_SERVICE_APP, + .format_connection = format_quic_connection, + .format_half_open = format_quic_half_open, + .format_listener = format_quic_listener, +}; +/* *INDENT-ON* */ + +static clib_error_t * +quic_init (vlib_main_t * vm) +{ + QUIC_DBG (2, "Called quic_init"); + vlib_thread_main_t *vtm = vlib_get_thread_main (); + vnet_app_attach_args_t _a, *a = &_a; + u64 options[APP_OPTIONS_N_OPTIONS]; + u32 segment_size = 512 << 20; + quic_main_t *qm = &quic_main; + u32 fifo_size = 64 << 10; + u32 num_threads; + + num_threads = 1 /* main thread */ + vtm->n_threads; + + memset (a, 0, sizeof (*a)); + memset (options, 0, sizeof (options)); + + a->session_cb_vft = &quic_app_cb_vft; + a->api_client_index = APP_INVALID_INDEX; + a->options = options; + a->name = format (0, "quic"); + a->options[APP_OPTIONS_SEGMENT_SIZE] = segment_size; + a->options[APP_OPTIONS_RX_FIFO_SIZE] = fifo_size; + a->options[APP_OPTIONS_TX_FIFO_SIZE] = fifo_size; + a->options[APP_OPTIONS_FLAGS] = APP_OPTIONS_FLAGS_IS_BUILTIN; + a->options[APP_OPTIONS_FLAGS] |= APP_OPTIONS_FLAGS_USE_GLOBAL_SCOPE; + a->options[APP_OPTIONS_FLAGS] |= APP_OPTIONS_FLAGS_IS_TRANSPORT_APP; + + if (vnet_application_attach (a)) + { + clib_warning ("failed to attach quic app"); + return clib_error_return (0, "failed to attach quic app"); + } + + vec_validate (qm->ctx_pool, num_threads - 1); + vec_validate (qm->wrk_ctx, num_threads - 1); + // Timers, one per thread. + tw_timer_wheel_1t_3w_1024sl_ov_t *tw; + /* *INDENT-OFF* */ + foreach_vlib_main (({ + tw = &qm->wrk_ctx[ii].timer_wheel; + tw_timer_wheel_init_1t_3w_1024sl_ov (tw, quic_expired_timers_dispatch, + 10e-3 /* timer period 1ms */ , ~0); + tw->last_run_time = vlib_time_now (this_vlib_main); + })); + /* *INDENT-ON* */ + + if (!qm->ca_cert_path) + qm->ca_cert_path = QUIC_DEFAULT_CA_CERT_PATH; + + qm->app_index = a->app_index; + clib_rwlock_init (&qm->half_open_rwlock); + qm->tstamp_ticks_per_clock = vm->clib_time.seconds_per_clock + / QUIC_TSTAMP_RESOLUTION; + + transport_register_protocol (TRANSPORT_PROTO_QUIC, &quic_proto, + FIB_PROTOCOL_IP4, ~0); + transport_register_protocol (TRANSPORT_PROTO_QUIC, &quic_proto, + FIB_PROTOCOL_IP6, ~0); + + vec_free (a->name); + return 0; +} + +quic_main_t * +vnet_quic_get_main (void) +{ + return &quic_main; +} + +VLIB_INIT_FUNCTION (quic_init); + +/* *INDENT-OFF* */ +VLIB_PLUGIN_REGISTER () = +{ + .version = VPP_BUILD_VER, + .description = "Quic transport protocol", +}; +/* *INDENT-ON* */ + +/* + * fd.io coding-style-patch-verification: ON + * + * Local Variables: + * eval: (c-set-style "gnu") + * End: + */ diff --git a/src/plugins/quic/quic.h b/src/plugins/quic/quic.h new file mode 100644 index 00000000000..f574b29a583 --- /dev/null +++ b/src/plugins/quic/quic.h @@ -0,0 +1,118 @@ +/* + * Copyright (c) 2019 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef __included_quic_h__ +#define __included_quic_h__ + +#include + +#include +#include + +#include + +#define QUIC_DEBUG 4 +#define QUIC_DEBUG_LEVEL_CLIENT 0 +#define QUIC_DEBUG_LEVEL_SERVER 0 + +#define QUIC_DEFAULT_CA_CERT_PATH "/etc/ssl/certs/ca-certificates.crt" + +#define QUIC_TIMER_HANDLE_INVALID ((u32) ~0) + +#define QUIC_TSTAMP_RESOLUTION 0.001 /* QUIC tick resolution (1ms) */ + + +#if QUIC_DEBUG +#define QUIC_DBG(_lvl, _fmt, _args...) \ + if (_lvl <= QUIC_DEBUG) \ + clib_warning (_fmt, ##_args) +#else +#define QUIC_DBG(_lvl, _fmt, _args...) +#endif + +#define QUIC_CONN_STATE_HANDSHAKE 0 +#define QUIC_CONN_STATE_READY 1 + + +/* *INDENT-OFF* */ +typedef CLIB_PACKED (struct quic_ctx_id_ +{ + session_handle_t app_session; + session_handle_t quic_session; + u32 parent_app_wrk_idx; + u32 parent_app_id; + u32 listener_ctx_id; + u32 timer_handle; + quicly_conn_t *conn; + u8 udp_is_ip4; + u8 conn_state; +}) quic_ctx_id_t; +/* *INDENT-ON* */ + +STATIC_ASSERT (sizeof (quic_ctx_id_t) <= 42, "ctx id must be less than 42"); + +typedef struct quic_ctx_ +{ + union + { + transport_connection_t connection; + quic_ctx_id_t c_quic_ctx_id; + }; + + quicly_stream_t *stream; + u8 *srv_hostname; + u32 client_opaque; + u8 is_listener; +} quic_ctx_t; + +typedef struct quic_worker_ctx_ +{ + CLIB_CACHE_LINE_ALIGN_MARK (cacheline0); + u32 time_now; /**< worker time */ + tw_timer_wheel_1t_3w_1024sl_ov_t timer_wheel; /**< worker timer wheel */ + u32 *tx_buffers; /**< tx buffer free list */ +} quic_worker_ctx_t; + +typedef struct quic_main_ +{ + u32 app_index; + quic_ctx_t *half_open_ctx_pool; + quic_ctx_t **ctx_pool; + clib_rwlock_t half_open_rwlock; + quic_worker_ctx_t *wrk_ctx; + f64 tstamp_ticks_per_clock; + + + /* + * Config + */ + quicly_context_t quicly_ctx; + ptls_handshake_properties_t hs_properties; + quicly_cid_plaintext_t next_cid; + u8 use_test_cert_in_ca; + char *ca_cert_path; +} quic_main_t; + +quic_main_t *vnet_quic_get_main (void); + +#endif /* __included_quic_h__ */ + +/* + * fd.io coding-style-patch-verification: ON + * + * Local Variables: + * eval: (c-set-style "gnu") + * End: + */ diff --git a/src/tests/vnet/session/quic_echo.c b/src/tests/vnet/session/quic_echo.c new file mode 100644 index 00000000000..e109fba9ad8 --- /dev/null +++ b/src/tests/vnet/session/quic_echo.c @@ -0,0 +1,1412 @@ +/* + * Copyright (c) 2019 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include + +#include +#include +#include + +#include + +#define vl_typedefs /* define message structures */ +#include +#undef vl_typedefs + +/* declare message handlers for each api */ + +#define vl_endianfun /* define message structures */ +#include +#undef vl_endianfun + +/* instantiate all the print functions we know about */ +#define vl_print(handle, ...) +#define vl_printfun +#include +#undef vl_printfun + +#define TCP_ECHO_DBG 0 +#define DBG(_fmt,_args...) \ + if (TCP_ECHO_DBG) \ + clib_warning (_fmt, _args) + +typedef struct +{ + CLIB_CACHE_LINE_ALIGN_MARK (cacheline0); +#define _(type, name) type name; + foreach_app_session_field +#undef _ + u64 vpp_session_handle; + u64 bytes_sent; + u64 bytes_to_send; + volatile u64 bytes_received; + volatile u64 bytes_to_receive; + f64 start; +} echo_session_t; + +typedef enum +{ + STATE_START, + STATE_ATTACHED, + STATE_LISTEN, + STATE_READY, + STATE_DISCONNECTING, + STATE_FAILED, + STATE_DETACHED +} connection_state_t; + +typedef struct +{ + /* vpe input queue */ + svm_queue_t *vl_input_queue; + + /* API client handle */ + u32 my_client_index; + + /* The URI we're playing with */ + u8 *uri; + + /* Session pool */ + echo_session_t *sessions; + + /* Hash table for disconnect processing */ + uword *session_index_by_vpp_handles; + + /* intermediate rx buffer */ + u8 *rx_buf; + + /* URI for slave's connect */ + u8 *connect_uri; + + u32 connected_session_index; + + int i_am_master; + + /* drop all packets */ + int no_return; + + /* Our event queue */ + svm_msg_q_t *our_event_queue; + + u8 *socket_name; + + pid_t my_pid; + + /* For deadman timers */ + clib_time_t clib_time; + + /* State of the connection, shared between msg RX thread and main thread */ + volatile connection_state_t state; + + /* Signal variables */ + volatile int time_to_stop; + volatile int time_to_print_stats; + + u32 configured_segment_size; + + /* VNET_API_ERROR_FOO -> "Foo" hash table */ + uword *error_string_by_error_number; + + u8 *connect_test_data; + pthread_t *client_thread_handles; + u32 *thread_args; + u32 client_bytes_received; + u8 test_return_packets; + u64 bytes_to_send; + u32 fifo_size; + + u32 n_clients; + u64 tx_total; + u64 rx_total; + + volatile u32 n_clients_connected; + volatile u32 n_active_clients; + + + /** Flag that decides if socket, instead of svm, api is used to connect to + * vpp. If sock api is used, shm binary api is subsequently bootstrapped + * and all other messages are exchanged using shm IPC. */ + u8 use_sock_api; + + svm_fifo_segment_main_t segment_main; +} echo_main_t; + +echo_main_t echo_main; + +#if CLIB_DEBUG > 0 +#define NITER 10000 +#else +#define NITER 4000000 +#endif + +static u8 * +format_api_error (u8 * s, va_list * args) +{ + echo_main_t *em = &echo_main; + i32 error = va_arg (*args, u32); + uword *p; + + p = hash_get (em->error_string_by_error_number, -error); + + if (p) + s = format (s, "%s", p[0]); + else + s = format (s, "%d", error); + return s; +} + +static void +init_error_string_table (echo_main_t * em) +{ + em->error_string_by_error_number = hash_create (0, sizeof (uword)); + +#define _(n,v,s) hash_set (em->error_string_by_error_number, -v, s); + foreach_vnet_api_error; +#undef _ + + hash_set (em->error_string_by_error_number, 99, "Misc"); +} + +static void handle_mq_event (session_event_t * e); + +static int +wait_for_state_change (echo_main_t * em, connection_state_t state) +{ + svm_msg_q_msg_t msg; + session_event_t *e; + f64 timeout; + +#if CLIB_DEBUG > 0 +#define TIMEOUT 600.0 +#else +#define TIMEOUT 600.0 +#endif + + timeout = clib_time_now (&em->clib_time) + TIMEOUT; + + while (clib_time_now (&em->clib_time) < timeout) + { + if (em->state == state) + return 0; + if (em->state == STATE_FAILED) + return -1; + if (em->time_to_stop == 1) + return 0; + if (!em->our_event_queue || em->state < STATE_ATTACHED) + continue; + + if (svm_msg_q_sub (em->our_event_queue, &msg, SVM_Q_NOWAIT, 0)) + continue; + e = svm_msg_q_msg_data (em->our_event_queue, &msg); + handle_mq_event (e); + svm_msg_q_free_msg (em->our_event_queue, &msg); + } + clib_warning ("timeout waiting for state %d", state); + return -1; +} + +void +application_send_attach (echo_main_t * em) +{ + vl_api_application_attach_t *bmp; + vl_api_application_tls_cert_add_t *cert_mp; + vl_api_application_tls_key_add_t *key_mp; + + bmp = vl_msg_api_alloc (sizeof (*bmp)); + clib_memset (bmp, 0, sizeof (*bmp)); + + bmp->_vl_msg_id = ntohs (VL_API_APPLICATION_ATTACH); + bmp->client_index = em->my_client_index; + bmp->context = ntohl (0xfeedface); + bmp->options[APP_OPTIONS_FLAGS] = APP_OPTIONS_FLAGS_ACCEPT_REDIRECT; + bmp->options[APP_OPTIONS_FLAGS] |= APP_OPTIONS_FLAGS_ADD_SEGMENT; + bmp->options[APP_OPTIONS_PREALLOC_FIFO_PAIRS] = 16; + bmp->options[APP_OPTIONS_RX_FIFO_SIZE] = em->fifo_size; + bmp->options[APP_OPTIONS_TX_FIFO_SIZE] = em->fifo_size; + bmp->options[APP_OPTIONS_ADD_SEGMENT_SIZE] = 128 << 20; + bmp->options[APP_OPTIONS_SEGMENT_SIZE] = 256 << 20; + bmp->options[APP_OPTIONS_EVT_QUEUE_SIZE] = 256; + vl_msg_api_send_shmem (em->vl_input_queue, (u8 *) & bmp); + + cert_mp = vl_msg_api_alloc (sizeof (*cert_mp) + test_srv_crt_rsa_len); + clib_memset (cert_mp, 0, sizeof (*cert_mp)); + cert_mp->_vl_msg_id = ntohs (VL_API_APPLICATION_TLS_CERT_ADD); + cert_mp->client_index = em->my_client_index; + cert_mp->context = ntohl (0xfeedface); + cert_mp->cert_len = clib_host_to_net_u16 (test_srv_crt_rsa_len); + clib_memcpy_fast (cert_mp->cert, test_srv_crt_rsa, test_srv_crt_rsa_len); + vl_msg_api_send_shmem (em->vl_input_queue, (u8 *) & cert_mp); + + key_mp = vl_msg_api_alloc (sizeof (*key_mp) + test_srv_key_rsa_len); + clib_memset (key_mp, 0, sizeof (*key_mp) + test_srv_key_rsa_len); + key_mp->_vl_msg_id = ntohs (VL_API_APPLICATION_TLS_KEY_ADD); + key_mp->client_index = em->my_client_index; + key_mp->context = ntohl (0xfeedface); + key_mp->key_len = clib_host_to_net_u16 (test_srv_key_rsa_len); + clib_memcpy_fast (key_mp->key, test_srv_key_rsa, test_srv_key_rsa_len); + vl_msg_api_send_shmem (em->vl_input_queue, (u8 *) & key_mp); +} + +static int +application_attach (echo_main_t * em) +{ + application_send_attach (em); + if (wait_for_state_change (em, STATE_ATTACHED)) + { + clib_warning ("timeout waiting for STATE_ATTACHED"); + return -1; + } + return 0; +} + +void +application_detach (echo_main_t * em) +{ + vl_api_application_detach_t *bmp; + bmp = vl_msg_api_alloc (sizeof (*bmp)); + clib_memset (bmp, 0, sizeof (*bmp)); + + bmp->_vl_msg_id = ntohs (VL_API_APPLICATION_DETACH); + bmp->client_index = em->my_client_index; + bmp->context = ntohl (0xfeedface); + vl_msg_api_send_shmem (em->vl_input_queue, (u8 *) & bmp); + + DBG ("%s", "Sent detach"); +} + +static int +ssvm_segment_attach (char *name, ssvm_segment_type_t type, int fd) +{ + svm_fifo_segment_create_args_t _a, *a = &_a; + svm_fifo_segment_main_t *sm = &echo_main.segment_main; + int rv; + + clib_memset (a, 0, sizeof (*a)); + a->segment_name = (char *) name; + a->segment_type = type; + + if (type == SSVM_SEGMENT_MEMFD) + a->memfd_fd = fd; + + if ((rv = svm_fifo_segment_attach (sm, a))) + { + clib_warning ("svm_fifo_segment_attach ('%s') failed", name); + return rv; + } + + vec_reset_length (a->new_segment_indices); + return 0; +} + +static void +vl_api_application_attach_reply_t_handler (vl_api_application_attach_reply_t * + mp) +{ + echo_main_t *em = &echo_main; + int *fds = 0; + u32 n_fds = 0; + + if (mp->retval) + { + clib_warning ("attach failed: %U", format_api_error, + clib_net_to_host_u32 (mp->retval)); + goto failed; + } + + if (mp->segment_name_length == 0) + { + clib_warning ("segment_name_length zero"); + goto failed; + } + + ASSERT (mp->app_event_queue_address); + em->our_event_queue = uword_to_pointer (mp->app_event_queue_address, + svm_msg_q_t *); + + if (mp->n_fds) + { + vec_validate (fds, mp->n_fds); + vl_socket_client_recv_fd_msg (fds, mp->n_fds, 5); + + if (mp->fd_flags & SESSION_FD_F_VPP_MQ_SEGMENT) + if (ssvm_segment_attach (0, SSVM_SEGMENT_MEMFD, fds[n_fds++])) + goto failed; + + if (mp->fd_flags & SESSION_FD_F_MEMFD_SEGMENT) + if (ssvm_segment_attach ((char *) mp->segment_name, + SSVM_SEGMENT_MEMFD, fds[n_fds++])) + goto failed; + + if (mp->fd_flags & SESSION_FD_F_MQ_EVENTFD) + svm_msg_q_set_consumer_eventfd (em->our_event_queue, fds[n_fds++]); + + vec_free (fds); + } + else + { + if (ssvm_segment_attach ((char *) mp->segment_name, SSVM_SEGMENT_SHM, + -1)) + goto failed; + } + + em->state = STATE_ATTACHED; + return; +failed: + em->state = STATE_FAILED; + return; +} + +static void +vl_api_application_detach_reply_t_handler (vl_api_application_detach_reply_t * + mp) +{ + if (mp->retval) + clib_warning ("detach returned with err: %d", mp->retval); + echo_main.state = STATE_DETACHED; +} + +static void +stop_signal (int signum) +{ + echo_main_t *um = &echo_main; + um->time_to_stop = 1; +} + +static void +stats_signal (int signum) +{ + echo_main_t *um = &echo_main; + um->time_to_print_stats = 1; +} + +static clib_error_t * +setup_signal_handlers (void) +{ + signal (SIGUSR2, stats_signal); + signal (SIGINT, stop_signal); + signal (SIGQUIT, stop_signal); + signal (SIGTERM, stop_signal); + return 0; +} + +void +vlib_cli_output (struct vlib_main_t *vm, char *fmt, ...) +{ + clib_warning ("BUG"); +} + +int +connect_to_vpp (char *name) +{ + echo_main_t *em = &echo_main; + api_main_t *am = &api_main; + + if (em->use_sock_api) + { + if (vl_socket_client_connect ((char *) em->socket_name, name, + 0 /* default rx, tx buffer */ )) + { + clib_warning ("socket connect failed"); + return -1; + } + + if (vl_socket_client_init_shm (0, 1 /* want_pthread */ )) + { + clib_warning ("init shm api failed"); + return -1; + } + } + else + { + if (vl_client_connect_to_vlib ("/vpe-api", name, 32) < 0) + { + clib_warning ("shmem connect failed"); + return -1; + } + } + em->vl_input_queue = am->shmem_hdr->vl_input_queue; + em->my_client_index = am->my_client_index; + return 0; +} + +void +disconnect_from_vpp (echo_main_t * em) +{ + if (em->use_sock_api) + vl_socket_client_disconnect (); + else + vl_client_disconnect_from_vlib (); +} + +static void +vl_api_map_another_segment_t_handler (vl_api_map_another_segment_t * mp) +{ + svm_fifo_segment_main_t *sm = &echo_main.segment_main; + svm_fifo_segment_create_args_t _a, *a = &_a; + int rv; + + clib_memset (a, 0, sizeof (*a)); + a->segment_name = (char *) mp->segment_name; + a->segment_size = mp->segment_size; + /* Attach to the segment vpp created */ + rv = svm_fifo_segment_attach (sm, a); + if (rv) + { + clib_warning ("svm_fifo_segment_attach ('%s') failed", + mp->segment_name); + return; + } + clib_warning ("Mapped new segment '%s' size %d", mp->segment_name, + mp->segment_size); +} + +static void +session_print_stats (echo_main_t * em, echo_session_t * session) +{ + f64 deltat; + u64 bytes; + + deltat = clib_time_now (&em->clib_time) - session->start; + bytes = em->i_am_master ? session->bytes_received : em->bytes_to_send; + fformat (stdout, "Finished in %.6f\n", deltat); + fformat (stdout, "%.4f Gbit/second\n", (bytes * 8.0) / deltat / 1e9); +} + +static void +test_recv_bytes (echo_session_t * s, u8 * rx_buf, u32 n_read) +{ + int i; + for (i = 0; i < n_read; i++) + { + if (rx_buf[i] != ((s->bytes_received + i) & 0xff)) + { + clib_warning ("error at byte %lld, 0x%x not 0x%x", + s->bytes_received + i, rx_buf[i], + ((s->bytes_received + i) & 0xff)); + } + } +} + +static void +recv_data_chunk (echo_main_t * em, echo_session_t * s, u8 * rx_buf) +{ + int n_to_read, n_read; + + n_to_read = svm_fifo_max_dequeue (s->rx_fifo); + if (!n_to_read) + return; + + do + { + n_read = app_recv_stream ((app_session_t *) s, rx_buf, + vec_len (rx_buf)); + + if (n_read > 0) + { + if (em->test_return_packets) + test_recv_bytes (s, rx_buf, n_read); + + n_to_read -= n_read; + + s->bytes_received += n_read; + s->bytes_to_receive -= n_read; + } + else + break; + } + while (n_to_read > 0); +} + +static void +send_data_chunk (echo_main_t * em, echo_session_t * s) +{ + u64 test_buf_len, bytes_this_chunk, test_buf_offset; + u8 *test_data = em->connect_test_data; + int n_sent; + + test_buf_len = vec_len (test_data); + test_buf_offset = s->bytes_sent % test_buf_len; + bytes_this_chunk = clib_min (test_buf_len - test_buf_offset, + s->bytes_to_send); + + n_sent = app_send_stream ((app_session_t *) s, test_data + test_buf_offset, + bytes_this_chunk, 0); + + if (n_sent > 0) + { + s->bytes_to_send -= n_sent; + s->bytes_sent += n_sent; + } +} + +/* + * Rx/Tx polling thread per connection + */ +static void * +client_thread_fn (void *arg) +{ + echo_main_t *em = &echo_main; + static u8 *rx_buf = 0; + u32 session_index = *(u32 *) arg; + echo_session_t *s; + + vec_validate (rx_buf, 1 << 20); + + while (!em->time_to_stop && em->state != STATE_READY) + ; + + s = pool_elt_at_index (em->sessions, session_index); + while (!em->time_to_stop) + { + send_data_chunk (em, s); + recv_data_chunk (em, s, rx_buf); + if (!s->bytes_to_send && !s->bytes_to_receive) + break; + } + + clib_warning ("GOT OUT"); + DBG ("session %d done", session_index); + em->tx_total += s->bytes_sent; + em->rx_total += s->bytes_received; + em->n_active_clients--; + + pthread_exit (0); +} + +void +client_send_connect (echo_main_t * em) +{ + vl_api_connect_uri_t *cmp; + cmp = vl_msg_api_alloc (sizeof (*cmp)); + clib_memset (cmp, 0, sizeof (*cmp)); + + cmp->_vl_msg_id = ntohs (VL_API_CONNECT_URI); + cmp->client_index = em->my_client_index; + cmp->context = ntohl (0xfeedface); + memcpy (cmp->uri, em->connect_uri, vec_len (em->connect_uri)); + vl_msg_api_send_shmem (em->vl_input_queue, (u8 *) & cmp); +} + +void +client_send_disconnect (echo_main_t * em, echo_session_t * s) +{ + vl_api_disconnect_session_t *dmp; + dmp = vl_msg_api_alloc (sizeof (*dmp)); + clib_memset (dmp, 0, sizeof (*dmp)); + dmp->_vl_msg_id = ntohs (VL_API_DISCONNECT_SESSION); + dmp->client_index = em->my_client_index; + dmp->handle = s->vpp_session_handle; + vl_msg_api_send_shmem (em->vl_input_queue, (u8 *) & dmp); +} + +int +client_disconnect (echo_main_t * em, echo_session_t * s) +{ + client_send_disconnect (em, s); + pool_put (em->sessions, s); + clib_memset (s, 0xfe, sizeof (*s)); + return 0; +} + +static void +session_bound_handler (session_bound_msg_t * mp) +{ + echo_main_t *em = &echo_main; + + if (mp->retval) + { + clib_warning ("bind failed: %U", format_api_error, + clib_net_to_host_u32 (mp->retval)); + em->state = STATE_FAILED; + return; + } + + clib_warning ("listening on %U:%u", format_ip46_address, mp->lcl_ip, + mp->lcl_is_ip4 ? IP46_TYPE_IP4 : IP46_TYPE_IP6, mp->lcl_port); + em->state = STATE_READY; +} + +static void +session_accepted_handler (session_accepted_msg_t * mp) +{ + app_session_evt_t _app_evt, *app_evt = &_app_evt; + session_accepted_reply_msg_t *rmp; + svm_fifo_t *rx_fifo, *tx_fifo; + echo_main_t *em = &echo_main; + echo_session_t *session; + static f64 start_time; + u32 session_index; + u8 *ip_str; + + if (start_time == 0.0) + start_time = clib_time_now (&em->clib_time); + + ip_str = format (0, "%U", format_ip46_address, &mp->ip, mp->is_ip4); + clib_warning ("Accepted session from: %s:%d", ip_str, + clib_net_to_host_u16 (mp->port)); + + /* Allocate local session and set it up */ + pool_get (em->sessions, session); + session_index = session - em->sessions; + + rx_fifo = uword_to_pointer (mp->server_rx_fifo, svm_fifo_t *); + rx_fifo->client_session_index = session_index; + tx_fifo = uword_to_pointer (mp->server_tx_fifo, svm_fifo_t *); + tx_fifo->client_session_index = session_index; + + session->rx_fifo = rx_fifo; + session->tx_fifo = tx_fifo; + session->vpp_evt_q = uword_to_pointer (mp->vpp_event_queue_address, + svm_msg_q_t *); + + /* Add it to lookup table */ + hash_set (em->session_index_by_vpp_handles, mp->handle, session_index); + + em->state = STATE_READY; + + /* Stats printing */ + if (pool_elts (em->sessions) && (pool_elts (em->sessions) % 20000) == 0) + { + f64 now = clib_time_now (&em->clib_time); + fformat (stdout, "%d active sessions in %.2f seconds, %.2f/sec...\n", + pool_elts (em->sessions), now - start_time, + (f64) pool_elts (em->sessions) / (now - start_time)); + } + + /* + * Send accept reply to vpp + */ + app_alloc_ctrl_evt_to_vpp (session->vpp_evt_q, app_evt, + SESSION_CTRL_EVT_ACCEPTED_REPLY); + rmp = (session_accepted_reply_msg_t *) app_evt->evt->data; + rmp->handle = mp->handle; + rmp->context = mp->context; + app_send_ctrl_evt_to_vpp (session->vpp_evt_q, app_evt); + + session->bytes_received = 0; + session->start = clib_time_now (&em->clib_time); +} + +static void +session_connected_handler (session_connected_msg_t * mp) +{ + echo_main_t *em = &echo_main; + echo_session_t *session; + u32 session_index; + svm_fifo_t *rx_fifo, *tx_fifo; + int rv; + + if (mp->retval) + { + clib_warning ("connection failed with code: %U", format_api_error, + clib_net_to_host_u32 (mp->retval)); + em->state = STATE_FAILED; + return; + } + + /* + * Setup session + */ + + pool_get (em->sessions, session); + clib_memset (session, 0, sizeof (*session)); + session_index = session - em->sessions; + + rx_fifo = uword_to_pointer (mp->server_rx_fifo, svm_fifo_t *); + rx_fifo->client_session_index = session_index; + tx_fifo = uword_to_pointer (mp->server_tx_fifo, svm_fifo_t *); + tx_fifo->client_session_index = session_index; + + session->rx_fifo = rx_fifo; + session->tx_fifo = tx_fifo; + session->vpp_session_handle = mp->handle; + session->start = clib_time_now (&em->clib_time); + session->vpp_evt_q = uword_to_pointer (mp->vpp_event_queue_address, + svm_msg_q_t *); + + hash_set (em->session_index_by_vpp_handles, mp->handle, session_index); + + /* + * Start RX thread + */ + em->thread_args[em->n_clients_connected] = session_index; + rv = pthread_create (&em->client_thread_handles[em->n_clients_connected], + NULL /*attr */ , client_thread_fn, + (void *) &em->thread_args[em->n_clients_connected]); + if (rv) + { + clib_warning ("pthread_create returned %d", rv); + return; + } + + em->n_clients_connected += 1; + clib_warning ("session %u (0x%llx) connected with local ip %U port %d", + session_index, mp->handle, format_ip46_address, mp->lcl_ip, + mp->is_ip4, clib_net_to_host_u16 (mp->lcl_port)); +} + +static void +session_disconnected_handler (session_disconnected_msg_t * mp) +{ + app_session_evt_t _app_evt, *app_evt = &_app_evt; + session_disconnected_reply_msg_t *rmp; + echo_main_t *em = &echo_main; + echo_session_t *session = 0; + uword *p; + int rv = 0; + + p = hash_get (em->session_index_by_vpp_handles, mp->handle); + if (!p) + { + clib_warning ("couldn't find session key %llx", mp->handle); + return; + } + + session = pool_elt_at_index (em->sessions, p[0]); + hash_unset (em->session_index_by_vpp_handles, mp->handle); + pool_put (em->sessions, session); + + app_alloc_ctrl_evt_to_vpp (session->vpp_evt_q, app_evt, + SESSION_CTRL_EVT_DISCONNECTED_REPLY); + rmp = (session_disconnected_reply_msg_t *) app_evt->evt->data; + rmp->retval = rv; + rmp->handle = mp->handle; + rmp->context = mp->context; + app_send_ctrl_evt_to_vpp (session->vpp_evt_q, app_evt); + + session_print_stats (em, session); +} + +static void +session_reset_handler (session_reset_msg_t * mp) +{ + app_session_evt_t _app_evt, *app_evt = &_app_evt; + echo_main_t *em = &echo_main; + session_reset_reply_msg_t *rmp; + echo_session_t *session = 0; + uword *p; + int rv = 0; + + p = hash_get (em->session_index_by_vpp_handles, mp->handle); + + if (p) + { + session = pool_elt_at_index (em->sessions, p[0]); + clib_warning ("got reset"); + /* Cleanup later */ + em->time_to_stop = 1; + } + else + { + clib_warning ("couldn't find session key %llx", mp->handle); + return; + } + + app_alloc_ctrl_evt_to_vpp (session->vpp_evt_q, app_evt, + SESSION_CTRL_EVT_RESET_REPLY); + rmp = (session_reset_reply_msg_t *) app_evt->evt->data; + rmp->retval = rv; + rmp->handle = mp->handle; + app_send_ctrl_evt_to_vpp (session->vpp_evt_q, app_evt); +} + +static void +handle_mq_event (session_event_t * e) +{ + switch (e->event_type) + { + case SESSION_CTRL_EVT_BOUND: + session_bound_handler ((session_bound_msg_t *) e->data); + break; + case SESSION_CTRL_EVT_ACCEPTED: + session_accepted_handler ((session_accepted_msg_t *) e->data); + break; + case SESSION_CTRL_EVT_CONNECTED: + session_connected_handler ((session_connected_msg_t *) e->data); + break; + case SESSION_CTRL_EVT_DISCONNECTED: + session_disconnected_handler ((session_disconnected_msg_t *) e->data); + break; + case SESSION_CTRL_EVT_RESET: + session_reset_handler ((session_reset_msg_t *) e->data); + break; + default: + clib_warning ("unhandled %u", e->event_type); + } +} + +static void +clients_run (echo_main_t * em) +{ + f64 start_time, deltat, timeout = 100.0; + svm_msg_q_msg_t msg; + session_event_t *e; + echo_session_t *s; + int i; + + /* Init test data */ + vec_validate (em->connect_test_data, 1024 * 1024 - 1); + for (i = 0; i < vec_len (em->connect_test_data); i++) + em->connect_test_data[i] = i & 0xff; + + /* + * Attach and connect the clients + */ + if (application_attach (em)) + return; + + for (i = 0; i < em->n_clients; i++) + client_send_connect (em); + + start_time = clib_time_now (&em->clib_time); + while (em->n_clients_connected < em->n_clients + && (clib_time_now (&em->clib_time) - start_time < timeout) + && em->state != STATE_FAILED && em->time_to_stop != 1) + + { + int rc = svm_msg_q_sub (em->our_event_queue, &msg, SVM_Q_TIMEDWAIT, 1); + if (rc == ETIMEDOUT && em->time_to_stop) + break; + if (rc == ETIMEDOUT) + continue; + e = svm_msg_q_msg_data (em->our_event_queue, &msg); + handle_mq_event (e); + svm_msg_q_free_msg (em->our_event_queue, &msg); + } + + if (em->n_clients_connected != em->n_clients) + { + clib_warning ("failed to initialize all connections"); + return; + } + + /* + * Initialize connections + */ + for (i = 0; i < em->n_clients; i++) + { + s = pool_elt_at_index (em->sessions, i); + s->bytes_to_send = em->bytes_to_send; + if (!em->no_return) + s->bytes_to_receive = em->bytes_to_send; + } + em->n_active_clients = em->n_clients_connected; + + /* + * Wait for client threads to send the data + */ + start_time = clib_time_now (&em->clib_time); + em->state = STATE_READY; + while (em->n_active_clients) + if (!svm_msg_q_is_empty (em->our_event_queue)) + { + if (svm_msg_q_sub (em->our_event_queue, &msg, SVM_Q_TIMEDWAIT, 0)) + { + clib_warning ("svm msg q returned"); + continue; + } + e = svm_msg_q_msg_data (em->our_event_queue, &msg); + if (e->event_type != FIFO_EVENT_APP_RX) + handle_mq_event (e); + svm_msg_q_free_msg (em->our_event_queue, &msg); + } + + for (i = 0; i < em->n_clients; i++) + { + s = pool_elt_at_index (em->sessions, i); + client_disconnect (em, s); + } + + /* + * Stats and detach + */ + deltat = clib_time_now (&em->clib_time) - start_time; + fformat (stdout, "%lld bytes (%lld mbytes, %lld gbytes) in %.2f seconds\n", + em->tx_total, em->tx_total / (1ULL << 20), + em->tx_total / (1ULL << 30), deltat); + fformat (stdout, "%.4f Gbit/second\n", (em->tx_total * 8.0) / deltat / 1e9); + + application_detach (em); +} + +static void +vl_api_bind_uri_reply_t_handler (vl_api_bind_uri_reply_t * mp) +{ + echo_main_t *em = &echo_main; + + if (mp->retval) + { + clib_warning ("bind failed: %U", format_api_error, + clib_net_to_host_u32 (mp->retval)); + em->state = STATE_FAILED; + return; + } + + em->state = STATE_READY; +} + +static void +vl_api_unbind_uri_reply_t_handler (vl_api_unbind_uri_reply_t * mp) +{ + echo_main_t *em = &echo_main; + + if (mp->retval != 0) + clib_warning ("returned %d", ntohl (mp->retval)); + + em->state = STATE_START; +} + +u8 * +format_ip4_address (u8 * s, va_list * args) +{ + u8 *a = va_arg (*args, u8 *); + return format (s, "%d.%d.%d.%d", a[0], a[1], a[2], a[3]); +} + +u8 * +format_ip6_address (u8 * s, va_list * args) +{ + ip6_address_t *a = va_arg (*args, ip6_address_t *); + u32 i, i_max_n_zero, max_n_zeros, i_first_zero, n_zeros, last_double_colon; + + i_max_n_zero = ARRAY_LEN (a->as_u16); + max_n_zeros = 0; + i_first_zero = i_max_n_zero; + n_zeros = 0; + for (i = 0; i < ARRAY_LEN (a->as_u16); i++) + { + u32 is_zero = a->as_u16[i] == 0; + if (is_zero && i_first_zero >= ARRAY_LEN (a->as_u16)) + { + i_first_zero = i; + n_zeros = 0; + } + n_zeros += is_zero; + if ((!is_zero && n_zeros > max_n_zeros) + || (i + 1 >= ARRAY_LEN (a->as_u16) && n_zeros > max_n_zeros)) + { + i_max_n_zero = i_first_zero; + max_n_zeros = n_zeros; + i_first_zero = ARRAY_LEN (a->as_u16); + n_zeros = 0; + } + } + + last_double_colon = 0; + for (i = 0; i < ARRAY_LEN (a->as_u16); i++) + { + if (i == i_max_n_zero && max_n_zeros > 1) + { + s = format (s, "::"); + i += max_n_zeros - 1; + last_double_colon = 1; + } + else + { + s = format (s, "%s%x", + (last_double_colon || i == 0) ? "" : ":", + clib_net_to_host_u16 (a->as_u16[i])); + last_double_colon = 0; + } + } + + return s; +} + +/* Format an IP46 address. */ +u8 * +format_ip46_address (u8 * s, va_list * args) +{ + ip46_address_t *ip46 = va_arg (*args, ip46_address_t *); + ip46_type_t type = va_arg (*args, ip46_type_t); + int is_ip4 = 1; + + switch (type) + { + case IP46_TYPE_ANY: + is_ip4 = ip46_address_is_ip4 (ip46); + break; + case IP46_TYPE_IP4: + is_ip4 = 1; + break; + case IP46_TYPE_IP6: + is_ip4 = 0; + break; + } + + return is_ip4 ? + format (s, "%U", format_ip4_address, &ip46->ip4) : + format (s, "%U", format_ip6_address, &ip46->ip6); +} + +static void +server_handle_rx (echo_main_t * em, session_event_t * e) +{ + int n_read, max_dequeue, n_sent; + u32 offset, to_dequeue; + echo_session_t *s; + + s = pool_elt_at_index (em->sessions, e->session_index); + + /* Clear event only once. Otherwise, if we do it in the loop by calling + * app_recv_stream, we may end up with a lot of unhandled rx events on the + * message queue */ + svm_fifo_unset_event (s->rx_fifo); + + max_dequeue = svm_fifo_max_dequeue (s->rx_fifo); + if (PREDICT_FALSE (!max_dequeue)) + return; + + do + { + /* The options here are to limit ourselves to max_dequeue or read + * even the data that was enqueued while we were dequeueing and which + * now has an rx event in the mq. Either of the two work. */ + to_dequeue = clib_min (max_dequeue, vec_len (em->rx_buf)); + n_read = app_recv_stream_raw (s->rx_fifo, em->rx_buf, to_dequeue, + 0 /* clear evt */ , 0 /* peek */ ); + if (n_read > 0) + { + max_dequeue -= n_read; + s->bytes_received += n_read; + } + else + break; + + /* Reflect if a non-drop session */ + if (!em->no_return && n_read > 0) + { + offset = 0; + do + { + n_sent = app_send_stream ((app_session_t *) s, + &em->rx_buf[offset], + n_read, SVM_Q_WAIT); + if (n_sent > 0) + { + n_read -= n_sent; + offset += n_sent; + } + } + while ((n_sent <= 0 || n_read > 0) && !em->time_to_stop); + } + } + while (max_dequeue > 0 && !em->time_to_stop); +} + +static void +server_handle_mq (echo_main_t * em) +{ + svm_msg_q_msg_t msg; + session_event_t *e; + + while (1) + { + int rc = svm_msg_q_sub (em->our_event_queue, &msg, SVM_Q_TIMEDWAIT, 1); + if (PREDICT_FALSE (rc == ETIMEDOUT && em->time_to_stop)) + break; + if (PREDICT_FALSE (em->time_to_print_stats == 1)) + { + em->time_to_print_stats = 0; + fformat (stdout, "%d connections\n", pool_elts (em->sessions)); + } + if (rc == ETIMEDOUT) + continue; + e = svm_msg_q_msg_data (em->our_event_queue, &msg); + clib_warning ("Event %d", e->event_type); + switch (e->event_type) + { + case FIFO_EVENT_APP_RX: + server_handle_rx (em, e); + break; + default: + handle_mq_event (e); + break; + } + svm_msg_q_free_msg (em->our_event_queue, &msg); + } +} + +void +server_send_listen (echo_main_t * em) +{ + vl_api_bind_uri_t *bmp; + bmp = vl_msg_api_alloc (sizeof (*bmp)); + clib_memset (bmp, 0, sizeof (*bmp)); + + bmp->_vl_msg_id = ntohs (VL_API_BIND_URI); + bmp->client_index = em->my_client_index; + bmp->context = ntohl (0xfeedface); + memcpy (bmp->uri, em->uri, vec_len (em->uri)); + vl_msg_api_send_shmem (em->vl_input_queue, (u8 *) & bmp); +} + +int +server_listen (echo_main_t * em) +{ + server_send_listen (em); + if (wait_for_state_change (em, STATE_READY)) + { + clib_warning ("timeout waiting for STATE_READY"); + return -1; + } + return 0; +} + +void +server_send_unbind (echo_main_t * em) +{ + vl_api_unbind_uri_t *ump; + + ump = vl_msg_api_alloc (sizeof (*ump)); + clib_memset (ump, 0, sizeof (*ump)); + + ump->_vl_msg_id = ntohs (VL_API_UNBIND_URI); + ump->client_index = em->my_client_index; + memcpy (ump->uri, em->uri, vec_len (em->uri)); + vl_msg_api_send_shmem (em->vl_input_queue, (u8 *) & ump); +} + +void +server_run (echo_main_t * em) +{ + echo_session_t *session; + int i; + + /* $$$$ hack preallocation */ + for (i = 0; i < 200000; i++) + { + pool_get (em->sessions, session); + clib_memset (session, 0, sizeof (*session)); + } + for (i = 0; i < 200000; i++) + pool_put_index (em->sessions, i); + + if (application_attach (em)) + return; + + /* Bind to uri */ + if (server_listen (em)) + return; + + /* Enter handle event loop */ + server_handle_mq (em); + + /* Cleanup */ + server_send_unbind (em); + + application_detach (em); + + fformat (stdout, "Test complete...\n"); +} + +static void +vl_api_disconnect_session_reply_t_handler (vl_api_disconnect_session_reply_t * + mp) +{ + echo_main_t *em = &echo_main; + uword *p; + + if (mp->retval) + { + clib_warning ("vpp complained about disconnect: %d", + ntohl (mp->retval)); + return; + } + + em->state = STATE_START; + + p = hash_get (em->session_index_by_vpp_handles, mp->handle); + if (p) + { + hash_unset (em->session_index_by_vpp_handles, mp->handle); + } + else + { + clib_warning ("couldn't find session key %llx", mp->handle); + } +} + +static void + vl_api_application_tls_cert_add_reply_t_handler + (vl_api_application_tls_cert_add_reply_t * mp) +{ + if (mp->retval) + clib_warning ("failed to add tls cert"); +} + +static void + vl_api_application_tls_key_add_reply_t_handler + (vl_api_application_tls_key_add_reply_t * mp) +{ + if (mp->retval) + clib_warning ("failed to add tls key"); +} + +#define foreach_quic_echo_msg \ +_(BIND_URI_REPLY, bind_uri_reply) \ +_(UNBIND_URI_REPLY, unbind_uri_reply) \ +_(DISCONNECT_SESSION_REPLY, disconnect_session_reply) \ +_(APPLICATION_ATTACH_REPLY, application_attach_reply) \ +_(APPLICATION_DETACH_REPLY, application_detach_reply) \ +_(MAP_ANOTHER_SEGMENT, map_another_segment) \ +_(APPLICATION_TLS_CERT_ADD_REPLY, application_tls_cert_add_reply) \ +_(APPLICATION_TLS_KEY_ADD_REPLY, application_tls_key_add_reply) \ + +void +quic_echo_api_hookup (echo_main_t * em) +{ +#define _(N,n) \ + vl_msg_api_set_handlers(VL_API_##N, #n, \ + vl_api_##n##_t_handler, \ + vl_noop_handler, \ + vl_api_##n##_t_endian, \ + vl_api_##n##_t_print, \ + sizeof(vl_api_##n##_t), 1); + foreach_quic_echo_msg; +#undef _ +} + +int +main (int argc, char **argv) +{ + int i_am_server = 1, test_return_packets = 0; + echo_main_t *em = &echo_main; + svm_fifo_segment_main_t *sm = &em->segment_main; + unformat_input_t _argv, *a = &_argv; + u8 *chroot_prefix; + u8 *uri = 0; + u8 *bind_uri = (u8 *) "quic://0.0.0.0/1234"; + u8 *connect_uri = (u8 *) "quic://6.0.1.1/1234"; + u64 bytes_to_send = 64 << 10, mbytes; + char *app_name; + u32 tmp; + + clib_mem_init_thread_safe (0, 256 << 20); + + clib_memset (em, 0, sizeof (*em)); + em->session_index_by_vpp_handles = hash_create (0, sizeof (uword)); + em->my_pid = getpid (); + em->configured_segment_size = 1 << 20; + em->socket_name = 0; + em->use_sock_api = 1; + em->fifo_size = 64 << 10; + em->n_clients = 1; + + clib_time_init (&em->clib_time); + init_error_string_table (em); + svm_fifo_segment_main_init (sm, HIGH_SEGMENT_BASEVA, 20); + unformat_init_command_line (a, argv); + + while (unformat_check_input (a) != UNFORMAT_END_OF_INPUT) + { + if (unformat (a, "chroot prefix %s", &chroot_prefix)) + { + vl_set_memory_root_path ((char *) chroot_prefix); + } + else if (unformat (a, "uri %s", &uri)) + ; + else if (unformat (a, "segment-size %dM", &tmp)) + em->configured_segment_size = tmp << 20; + else if (unformat (a, "segment-size %dG", &tmp)) + em->configured_segment_size = tmp << 30; + else if (unformat (a, "server")) + i_am_server = 1; + else if (unformat (a, "client")) + i_am_server = 0; + else if (unformat (a, "no-return")) + em->no_return = 1; + else if (unformat (a, "test")) + test_return_packets = 1; + else if (unformat (a, "bytes %lld", &mbytes)) + { + bytes_to_send = mbytes; + } + else if (unformat (a, "mbytes %lld", &mbytes)) + { + bytes_to_send = mbytes << 20; + } + else if (unformat (a, "gbytes %lld", &mbytes)) + { + bytes_to_send = mbytes << 30; + } + else if (unformat (a, "socket-name %s", &em->socket_name)) + ; + else if (unformat (a, "use-svm-api")) + em->use_sock_api = 0; + else if (unformat (a, "fifo-size %d", &tmp)) + em->fifo_size = tmp << 10; + else if (unformat (a, "nclients %d", &em->n_clients)) + ; + else + { + fformat (stderr, "%s: usage [master|slave]\n", argv[0]); + exit (1); + } + } + + if (!em->socket_name) + em->socket_name = format (0, "%s%c", API_SOCKET_FILE, 0); + + if (uri) + { + em->uri = format (0, "%s%c", uri, 0); + em->connect_uri = format (0, "%s%c", uri, 0); + } + else + { + em->uri = format (0, "%s%c", bind_uri, 0); + em->connect_uri = format (0, "%s%c", connect_uri, 0); + } + + em->i_am_master = i_am_server; + em->test_return_packets = test_return_packets; + em->bytes_to_send = bytes_to_send; + em->time_to_stop = 0; + vec_validate (em->rx_buf, 4 << 20); + vec_validate (em->client_thread_handles, em->n_clients - 1); + vec_validate (em->thread_args, em->n_clients - 1); + + setup_signal_handlers (); + quic_echo_api_hookup (em); + + app_name = i_am_server ? "quic_echo_server" : "quic_echo_client"; + if (connect_to_vpp (app_name) < 0) + { + svm_region_exit (); + fformat (stderr, "Couldn't connect to vpe, exiting...\n"); + exit (1); + } + + if (i_am_server == 0) + clients_run (em); + else + server_run (em); + + /* Make sure detach finishes */ + wait_for_state_change (em, STATE_DETACHED); + + disconnect_from_vpp (em); + exit (0); +} + +/* + * fd.io coding-style-patch-verification: ON + * + * Local Variables: + * eval: (c-set-style "gnu") + * End: + */ diff --git a/src/vnet/CMakeLists.txt b/src/vnet/CMakeLists.txt index 8e56ac35c71..c96da2a6c57 100644 --- a/src/vnet/CMakeLists.txt +++ b/src/vnet/CMakeLists.txt @@ -1588,6 +1588,12 @@ if(VPP_BUILD_SESSION_ECHO_APPS) DEPENDS api_headers NO_INSTALL ) + add_vpp_executable(quic_echo + SOURCES ../tests/vnet/session/quic_echo.c + LINK_LIBRARIES vlibmemoryclient svm vppinfra pthread m rt + DEPENDS api_headers + NO_INSTALL + ) add_vpp_executable(udp_echo SOURCES ../tests/vnet/session/udp_echo.c LINK_LIBRARIES vlibmemoryclient svm vppinfra pthread m rt diff --git a/src/vnet/session/application.h b/src/vnet/session/application.h index 3e1590ecb9f..3d601cecfc9 100644 --- a/src/vnet/session/application.h +++ b/src/vnet/session/application.h @@ -109,7 +109,7 @@ typedef struct application_ app_listener_t *listeners; /* - * TLS Specific + * TLS & QUIC Specific */ /** Certificate to be used for listen sessions */ @@ -121,6 +121,8 @@ typedef struct application_ /** Preferred tls engine */ u8 tls_engine; + u64 *quicly_ctx; + } application_t; typedef struct app_main_ diff --git a/src/vnet/session/session.c b/src/vnet/session/session.c index e51b138968e..7769f1faace 100644 --- a/src/vnet/session/session.c +++ b/src/vnet/session/session.c @@ -105,12 +105,19 @@ session_send_ctrl_evt_to_thread (session_t * s, session_evt_type_t evt_type) SESSION_CTRL_EVT_CLOSE); } +void +session_send_rpc_evt_to_thread_force (u32 thread_index, void *fp, + void *rpc_args) +{ + session_send_evt_to_thread (fp, rpc_args, thread_index, + SESSION_CTRL_EVT_RPC); +} + void session_send_rpc_evt_to_thread (u32 thread_index, void *fp, void *rpc_args) { if (thread_index != vlib_get_thread_index ()) - session_send_evt_to_thread (fp, rpc_args, thread_index, - SESSION_CTRL_EVT_RPC); + session_send_rpc_evt_to_thread_force (thread_index, fp, rpc_args); else { void (*fnp) (void *) = fp; diff --git a/src/vnet/session/session.h b/src/vnet/session/session.h index 518fc62c97d..a3b84a6c8ef 100644 --- a/src/vnet/session/session.h +++ b/src/vnet/session/session.h @@ -335,6 +335,8 @@ int session_send_io_evt_to_thread_custom (void *data, u32 thread_index, session_evt_type_t evt_type); void session_send_rpc_evt_to_thread (u32 thread_index, void *fp, void *rpc_args); +void session_send_rpc_evt_to_thread_force (u32 thread_index, void *fp, + void *rpc_args); transport_connection_t *session_get_transport (session_t * s); diff --git a/src/vnet/session/transport.c b/src/vnet/session/transport.c index e453ee01879..abab0865674 100644 --- a/src/vnet/session/transport.c +++ b/src/vnet/session/transport.c @@ -69,6 +69,9 @@ format_transport_proto (u8 * s, va_list * args) case TRANSPORT_PROTO_UDPC: s = format (s, "UDPC"); break; + case TRANSPORT_PROTO_QUIC: + s = format (s, "QUIC"); + break; } return s; } @@ -91,6 +94,9 @@ format_transport_proto_short (u8 * s, va_list * args) case TRANSPORT_PROTO_UDPC: s = format (s, "U"); break; + case TRANSPORT_PROTO_QUIC: + s = format (s, "Q"); + break; } return s; } @@ -175,6 +181,10 @@ unformat_transport_proto (unformat_input_t * input, va_list * args) *proto = TRANSPORT_PROTO_TLS; else if (unformat (input, "TLS")) *proto = TRANSPORT_PROTO_TLS; + else if (unformat (input, "quic")) + *proto = TRANSPORT_PROTO_QUIC; + else if (unformat (input, "QUIC")) + *proto = TRANSPORT_PROTO_QUIC; else return 0; return 1; diff --git a/src/vnet/session/transport_types.h b/src/vnet/session/transport_types.h index d309c581db8..e262ddad583 100644 --- a/src/vnet/session/transport_types.h +++ b/src/vnet/session/transport_types.h @@ -130,6 +130,7 @@ typedef enum _transport_proto TRANSPORT_PROTO_NONE, TRANSPORT_PROTO_TLS, TRANSPORT_PROTO_UDPC, + TRANSPORT_PROTO_QUIC, TRANSPORT_N_PROTO } transport_proto_t; -- 2.16.6