quic: Add support for unidirectional streams 59/23759/5
authorNathan Skrzypczak <nathan.skrzypczak@gmail.com>
Tue, 3 Dec 2019 15:25:11 +0000 (16:25 +0100)
committerFlorin Coras <florin.coras@gmail.com>
Wed, 11 Dec 2019 16:44:24 +0000 (16:44 +0000)
Type: feature

Change-Id: I3a642626a444504594c5e3df40dbc92df54136f0
Signed-off-by: Nathan Skrzypczak <nathan.skrzypczak@gmail.com>
src/plugins/hs_apps/sapi/vpp_echo_bapi.c
src/plugins/hs_apps/sapi/vpp_echo_common.h
src/plugins/hs_apps/sapi/vpp_echo_proto_quic.c
src/plugins/quic/quic.c
src/vnet/session/application_interface.h
src/vnet/session/session_api.c
src/vnet/session/session_node.c
src/vnet/session/session_types.h

index 955d606..04eba24 100644 (file)
@@ -167,6 +167,7 @@ echo_send_connect (echo_main_t * em, void *args)
   mp->parent_handle = a->parent_session_handle;
   mp->ckpair_index = em->ckpair_index;
   mp->crypto_engine = em->crypto_engine;
+  mp->flags = em->connect_flag;
   app_send_ctrl_evt_to_vpp (mq, app_evt);
 }
 
index 5588b27..617acac 100644 (file)
@@ -94,6 +94,7 @@
   _(ECHO_FAIL_MISSING_START_EVENT, "ECHO_FAIL_MISSING_START_EVENT")     \
   _(ECHO_FAIL_MISSING_END_EVENT, "ECHO_FAIL_MISSING_END_EVENT")         \
   _(ECHO_FAIL_TEST_ASSERT_RX_TOTAL, "ECHO_FAIL_TEST_ASSERT_RX_TOTAL")   \
+  _(ECHO_FAIL_UNIDIRECTIONAL, "ECHO_FAIL_UNIDIRECTIONAL")               \
   _(ECHO_FAIL_TEST_ASSERT_TX_TOTAL, "ECHO_FAIL_TEST_ASSERT_TX_TOTAL")   \
   _(ECHO_FAIL_TEST_ASSERT_ALL_SESSIONS_CLOSED,                          \
     "ECHO_FAIL_TEST_ASSERT_ALL_SESSIONS_CLOSED")                        \
@@ -317,6 +318,7 @@ typedef struct
   u32 evt_q_size;              /* Size of the vpp MQ (app<->vpp events) */
   u32 ckpair_index;            /* Cert key pair used */
   u8 crypto_engine;            /* crypto engine used */
+  u8 connect_flag;             /* flags to pass to mq connect */
 
   u8 *appns_id;
   u64 appns_flags;
index 55fd890..5da81cc 100644 (file)
@@ -370,6 +370,12 @@ quic_echo_accepted_cb (session_accepted_msg_t * mp, echo_session_t * session)
       if (em->stats.accepted_count.s % LOGGING_BATCH == 0)
        ECHO_LOG (0, "Accepted S %d / %d", em->stats.accepted_count.s,
                  em->n_clients);
+
+      if (em->connect_flag && !(mp->flags & em->connect_flag))
+       {
+         ECHO_FAIL (ECHO_FAIL_UNIDIRECTIONAL,
+                    "expected unidirectional streams");
+       }
     }
 
   if (em->n_clients_connected == em->n_clients
@@ -448,11 +454,14 @@ quic_echo_unformat_setup_vft (unformat_input_t * input, va_list * args)
 static int
 quic_echo_process_opts_cb (unformat_input_t * a)
 {
+  echo_main_t *em = &echo_main;
   quic_echo_proto_main_t *eqm = &quic_echo_proto_main;
   if (unformat (a, "quic-streams %d", &eqm->n_stream_clients))
     ;
   else if (unformat (a, "quic-setup %U", quic_echo_unformat_setup_vft))
     ;
+  else if (unformat (a, "uni"))
+    em->connect_flag = SESSION_F_UNIDIRECTIONAL;
   else if (unformat (a, "qclose=%U",
                     echo_unformat_close, &eqm->send_quic_disconnects))
     ;
@@ -503,6 +512,7 @@ quic_echo_print_usage_cb ()
           "                      OPT=default : Client open N connections.\n"
           "                       On each one client opens M streams\n"
           "  qclose=[Y|N|W]      When connection is done send[Y]|nop[N]|wait[W] for close\n"
+          "  uni                 Use unidirectional streams\n"
           "\n"
           "  quic-streams N      Open N QUIC streams (defaults to 1)\n");
 }
index d59504a..04e969d 100644 (file)
@@ -792,6 +792,8 @@ quic_on_stream_open (quicly_stream_open_t * self, quicly_stream_t * stream)
   sctx->stream = stream;
   sctx->c_flags |= TRANSPORT_CONNECTION_F_NO_LOOKUP;
   sctx->flags |= QUIC_F_IS_STREAM;
+  if (quicly_stream_is_unidirectional (stream->stream_id))
+    stream_session->flags |= SESSION_F_UNIDIRECTIONAL;
 
   stream_data = (quic_stream_data_t *) stream->data;
   stream_data->ctx_id = sctx_id;
@@ -966,7 +968,7 @@ quic_expired_timers_dispatch (u32 * expired_timers)
 /* Transport proto functions */
 
 static int
-quic_connect_stream (session_t * quic_session, u32 opaque)
+quic_connect_stream (session_t * quic_session, session_endpoint_cfg_t * sep)
 {
   uint64_t quic_session_handle;
   session_t *stream_session;
@@ -1019,7 +1021,9 @@ quic_connect_stream (session_t * quic_session, u32 opaque)
   if (!conn || !quicly_connection_is_ready (conn))
     return -1;
 
-  if ((rv = quicly_open_stream (conn, &stream, 0 /* uni */ )))
+  if ((rv =
+       quicly_open_stream (conn, &stream,
+                          sep->flags & SESSION_F_UNIDIRECTIONAL)))
     {
       QUIC_DBG (2, "Stream open failed with %d", rv);
       return -1;
@@ -1038,6 +1042,8 @@ quic_connect_stream (session_t * quic_session, u32 opaque)
   stream_session->listener_handle = quic_session_handle;
   stream_session->session_type =
     session_type_from_proto_and_ip (TRANSPORT_PROTO_QUIC, qctx->udp_is_ip4);
+  if (sep->flags & SESSION_F_UNIDIRECTIONAL)
+    stream_session->flags |= SESSION_F_UNIDIRECTIONAL;
 
   sctx->c_s_index = stream_session->session_index;
   stream_data = (quic_stream_data_t *) stream->data;
@@ -1052,14 +1058,14 @@ quic_connect_stream (session_t * quic_session, u32 opaque)
     {
       QUIC_ERR ("failed to app_worker_init_connected");
       quicly_reset_stream (stream, QUIC_APP_CONNECT_NOTIFY_ERROR);
-      return app_worker_connect_notify (app_wrk, NULL, opaque);
+      return app_worker_connect_notify (app_wrk, NULL, sep->opaque);
     }
 
   svm_fifo_add_want_deq_ntf (stream_session->rx_fifo,
                             SVM_FIFO_WANT_DEQ_NOTIF_IF_FULL |
                             SVM_FIFO_WANT_DEQ_NOTIF_IF_EMPTY);
 
-  if (app_worker_connect_notify (app_wrk, stream_session, opaque))
+  if (app_worker_connect_notify (app_wrk, stream_session, sep->opaque))
     {
       QUIC_ERR ("failed to notify app");
       quic_increment_counter (QUIC_ERROR_CLOSED_STREAM, 1);
@@ -1134,7 +1140,7 @@ quic_connect (transport_endpoint_cfg_t * tep)
 
   quic_session = session_get_from_handle_if_valid (sep->parent_handle);
   if (quic_session)
-    return quic_connect_stream (quic_session, sep->opaque);
+    return quic_connect_stream (quic_session, sep);
   else
     return quic_connect_connection (sep);
 }
@@ -1153,6 +1159,9 @@ quic_proto_on_close (u32 ctx_index, u32 thread_index)
   if (quic_ctx_is_stream (ctx))
     {
       quicly_stream_t *stream = ctx->stream;
+      if (!quicly_stream_has_send_side (quicly_is_client (stream->conn),
+                                       stream->stream_id))
+       return;
       quicly_reset_stream (stream, QUIC_APP_ERROR_CLOSE_NOTIFY);
       quic_send_packets (ctx);
       return;
index 3a4f992..ec29d18 100644 (file)
@@ -361,6 +361,7 @@ typedef struct session_accepted_msg_
   u64 segment_handle;
   uword vpp_event_queue_address;
   transport_endpoint_t rmt;
+  u8 flags;
 } __clib_packed session_accepted_msg_t;
 
 typedef struct session_accepted_reply_msg_
@@ -386,6 +387,7 @@ typedef struct session_connect_msg_
   u64 parent_handle;
   u32 ckpair_index;
   u8 crypto_engine;
+  u8 flags;
 } __clib_packed session_connect_msg_t;
 
 STATIC_ASSERT (sizeof (session_connect_msg_t) <= SESSION_CTRL_MSG_MAX_SIZE,
index 0846e57..99e00a7 100644 (file)
@@ -196,6 +196,7 @@ mq_send_session_accepted_cb (session_t * s)
   mp->server_rx_fifo = pointer_to_uword (s->rx_fifo);
   mp->server_tx_fifo = pointer_to_uword (s->tx_fifo);
   mp->segment_handle = session_segment_handle (s);
+  mp->flags = s->flags;
 
   if (session_has_transport (s))
     {
index 191be7e..f3ba9e7 100644 (file)
@@ -117,6 +117,7 @@ session_mq_connect_handler (void *data)
   a->sep_ext.parent_handle = mp->parent_handle;
   a->sep_ext.ckpair_index = mp->ckpair_index;
   a->sep_ext.crypto_engine = mp->crypto_engine;
+  a->sep_ext.flags = mp->flags;
   if (mp->hostname_len)
     {
       vec_validate (a->sep_ext.hostname, mp->hostname_len - 1);
index 6d7f84f..16d4843 100644 (file)
@@ -47,6 +47,7 @@ typedef struct _session_endpoint_cfg
   u64 parent_handle;
   u32 ckpair_index;
   u8 crypto_engine;
+  u8 flags;
 } session_endpoint_cfg_t;
 
 #define SESSION_IP46_ZERO                      \
@@ -152,6 +153,7 @@ typedef enum
   _(PROXY, "proxy")                                    \
   _(CUSTOM_TX, "custom-tx")                            \
   _(IS_MIGRATING, "migrating")                         \
+  _(UNIDIRECTIONAL, "unidirectional")                  \
 
 typedef enum session_flags_bits_
 {