quic : Use TX event for app read notification
[vpp.git] / src / plugins / quic / quic.c
index f099d07..93c0162 100644 (file)
@@ -184,18 +184,10 @@ quic_send_datagram (session_t * udp_session, quicly_datagram_t * packet)
   tc = session_get_transport (udp_session);
 
   max_enqueue = svm_fifo_max_enqueue (f);
-  if (max_enqueue <= sizeof (session_dgram_hdr_t))
-    {
-      QUIC_DBG (1, "Not enough space to enqueue header");
-      return QUIC_ERROR_FULL_FIFO;
-    }
-
-  max_enqueue -= sizeof (session_dgram_hdr_t);
-
-  if (max_enqueue < len)
+  if (max_enqueue < SESSION_CONN_HDR_LEN + len)
     {
       QUIC_DBG (1, "Too much data to send, max_enqueue %u, len %u",
-               max_enqueue, len);
+               max_enqueue, len + SESSION_CONN_HDR_LEN);
       return QUIC_ERROR_FULL_FIFO;
     }
 
@@ -243,10 +235,9 @@ static int
 quic_sendable_packet_count (session_t * udp_session)
 {
   u32 max_enqueue;
+  u32 packet_size = QUIC_MAX_PACKET_SIZE + SESSION_CONN_HDR_LEN;
   max_enqueue = svm_fifo_max_enqueue (udp_session->tx_fifo);
-  return clib_min (max_enqueue /
-                  (QUIC_MAX_PACKET_SIZE + sizeof (session_dgram_hdr_t)),
-                  QUIC_SEND_PACKET_VEC_SIZE);
+  return clib_min (max_enqueue / packet_size, QUIC_SEND_PACKET_VEC_SIZE);
 }
 
 static int
@@ -259,7 +250,7 @@ quic_send_packets (quic_ctx_t * ctx)
   quicly_context_t *quicly_context;
   app_worker_t *app_wrk;
   application_t *app;
-  int err;
+  int err = 0;
 
   /* We have sctx, get qctx */
   if (ctx->c_quic_ctx_id.is_stream)
@@ -270,7 +261,10 @@ quic_send_packets (quic_ctx_t * ctx)
   ASSERT (!ctx->c_quic_ctx_id.is_stream);
 
   udp_session =
-    session_get_from_handle (ctx->c_quic_ctx_id.udp_session_handle);
+    session_get_from_handle_if_valid (ctx->c_quic_ctx_id.udp_session_handle);
+  if (!udp_session)
+    goto quicly_error;
+
   conn = ctx->c_quic_ctx_id.conn;
 
   if (!conn)
@@ -311,16 +305,21 @@ quic_send_packets (quic_ctx_t * ctx)
     }
   while (num_packets > 0 && num_packets == max_packets);
 
+stop_sending:
   if (svm_fifo_set_event (udp_session->tx_fifo))
-    session_send_io_evt_to_thread (udp_session->tx_fifo, SESSION_IO_EVT_TX);
+    if ((err =
+        session_send_io_evt_to_thread (udp_session->tx_fifo,
+                                       SESSION_IO_EVT_TX)))
+      clib_warning ("Event enqueue errored %d", err);
 
-stop_sending:
+  QUIC_DBG (3, "%u[TX] %u[RX]", svm_fifo_max_dequeue (udp_session->tx_fifo),
+           svm_fifo_max_dequeue (udp_session->rx_fifo));
   quic_update_timer (ctx);
   return 0;
 
 quicly_error:
-  if ((err != QUICLY_ERROR_PACKET_IGNORED) & (err !=
-                                             QUICLY_ERROR_FREE_CONNECTION))
+  if (err && err != QUICLY_ERROR_PACKET_IGNORED
+      && err != QUICLY_ERROR_FREE_CONNECTION)
     clib_warning ("Quic error '%s'.", quic_format_err (err));
   quic_connection_closed (ctx->c_c_index, ctx->c_thread_index);
   return 1;
@@ -393,6 +392,31 @@ get_stream_session_from_stream (quicly_stream_t * stream)
   return session_get (ctx->c_s_index, stream_data->thread_index);
 }
 
+static void
+quic_ack_rx_data (session_t * stream_session)
+{
+  u32 max_deq;
+  quic_ctx_t *sctx;
+  svm_fifo_t *f;
+  quicly_stream_t *stream;
+  quic_stream_data_t *stream_data;
+
+  sctx =
+    quic_ctx_get (stream_session->connection_index,
+                 stream_session->thread_index);
+  ASSERT (sctx->c_quic_ctx_id.is_stream);
+  stream = sctx->c_quic_ctx_id.stream;
+  stream_data = (quic_stream_data_t *) stream->data;
+
+  f = stream_session->rx_fifo;
+  max_deq = svm_fifo_max_dequeue (f);
+
+  ASSERT (stream_data->app_rx_data_len >= max_deq);
+  quicly_stream_sync_recvbuf (stream, stream_data->app_rx_data_len - max_deq);
+  QUIC_DBG (3, "Acking %u bytes", stream_data->app_rx_data_len - max_deq);
+  stream_data->app_rx_data_len = max_deq;
+}
+
 static int
 quic_on_receive (quicly_stream_t * stream, size_t off, const void *src,
                 size_t len)
@@ -413,26 +437,28 @@ quic_on_receive (quicly_stream_t * stream, size_t off, const void *src,
 
   max_enq = svm_fifo_max_enqueue_prod (f);
   QUIC_DBG (3, "Enqueuing %u at off %u in %u space", len, off, max_enq);
-  if (off + len > max_enq)
+  if (off - stream_data->app_rx_data_len + len > max_enq)
     {
-      /* TODO : can we find a better solution, listening on RX fifo evts ? */
-      QUIC_DBG (3, "Ingoring packet, RX fifo is full");
-      return QUICLY_ERROR_PACKET_IGNORED;
+      QUIC_DBG (1, "Error RX fifo is full");
+      return 1;
     }
-  if (off == 0)
+  if (off == stream_data->app_rx_data_len)
     {
+      /* Streams live on the same thread so (f, stream_data) should stay consistent */
       rlen = svm_fifo_enqueue (f, len, (u8 *) src);
+      stream_data->app_rx_data_len += rlen;
       ASSERT (rlen >= len);
-
-      quicly_stream_sync_recvbuf (stream, rlen);
       app_wrk = app_worker_get_if_valid (stream_session->app_wrk_index);
       if (PREDICT_TRUE (app_wrk != 0))
        app_worker_lock_and_send_event (app_wrk, stream_session,
                                        SESSION_IO_EVT_RX);
+      quic_ack_rx_data (stream_session);
     }
   else
     {
-      rlen = svm_fifo_enqueue_with_offset (f, off, len, (u8 *) src);
+      rlen =
+       svm_fifo_enqueue_with_offset (f, off - stream_data->app_rx_data_len,
+                                     len, (u8 *) src);
       ASSERT (rlen == 0);
     }
   return 0;
@@ -443,11 +469,13 @@ quic_fifo_egress_shift (quicly_stream_t * stream, size_t delta)
 {
   session_t *stream_session;
   svm_fifo_t *f;
+  int rv;
 
   stream_session = get_stream_session_from_stream (stream);
   f = stream_session->tx_fifo;
 
-  ASSERT (svm_fifo_dequeue_drop (f, delta) == delta);
+  rv = svm_fifo_dequeue_drop (f, delta);
+  ASSERT (rv == delta);
   quicly_stream_sync_sendbuf (stream, 0);
 }
 
@@ -472,9 +500,9 @@ quic_fifo_egress_emit (quicly_stream_t * stream, size_t off, void *dst,
     }
   else
     {
-      QUIC_DBG (3, "Wrote ALL");
       *wrote_all = 1;
       *len = deq_max - off;
+      QUIC_DBG (3, "Wrote ALL, %u", *len);
     }
 
   /* TODO, use something like : return svm_fifo_peek (f, off, *len, dst); */
@@ -538,6 +566,7 @@ quic_accept_stream (void *s)
   stream_data = (quic_stream_data_t *) stream->data;
   stream_data->ctx_id = sctx_id;
   stream_data->thread_index = sctx->c_thread_index;
+  stream_data->app_rx_data_len = 0;
 
   sctx->c_s_index = stream_session->session_index;
   stream_session->session_state = SESSION_STATE_CREATED;
@@ -557,6 +586,9 @@ quic_accept_stream (void *s)
       quicly_reset_stream (stream, QUIC_APP_ALLOCATION_ERROR);
       return;
     }
+  svm_fifo_add_want_tx_ntf (stream_session->rx_fifo,
+                           SVM_FIFO_WANT_TX_NOTIF_IF_FULL |
+                           SVM_FIFO_WANT_TX_NOTIF_IF_EMPTY);
 
   rv = app_worker_accept_notify (app_wrk, stream_session);
   if (rv)
@@ -1211,6 +1243,10 @@ quic_connect_new_stream (session_endpoint_cfg_t * sep)
       return app_worker_connect_notify (app_wrk, NULL, sep->opaque);
     }
 
+  svm_fifo_add_want_tx_ntf (stream_session->rx_fifo,
+                           SVM_FIFO_WANT_TX_NOTIF_IF_FULL |
+                           SVM_FIFO_WANT_TX_NOTIF_IF_EMPTY);
+
   stream_session->session_state = SESSION_STATE_READY;
   if (app_worker_connect_notify (app_wrk, stream_session, sep->opaque))
     {
@@ -1225,6 +1261,7 @@ quic_connect_new_stream (session_endpoint_cfg_t * sep)
   stream_data = (quic_stream_data_t *) stream->data;
   stream_data->ctx_id = sctx->c_c_index;
   stream_data->thread_index = sctx->c_thread_index;
+  stream_data->app_rx_data_len = 0;
   return 0;
 }
 
@@ -1798,6 +1835,17 @@ quic_del_segment_callback (u32 client_index, u64 seg_handle)
   return 0;
 }
 
+
+static int
+quic_custom_app_rx_callback (transport_connection_t * tc)
+{
+  session_t *stream_session = session_get (tc->s_index, tc->thread_index);
+  QUIC_DBG (2, "Received app READ notification");
+  quic_ack_rx_data (stream_session);
+  svm_fifo_reset_tx_ntf (stream_session->rx_fifo);
+  return 0;
+}
+
 static int
 quic_custom_tx_callback (void *s)
 {
@@ -1806,7 +1854,6 @@ quic_custom_tx_callback (void *s)
   quic_ctx_t *ctx;
   int rv;
 
-  svm_fifo_unset_event (stream_session->tx_fifo);
   if (PREDICT_FALSE
       (stream_session->session_state >= SESSION_STATE_TRANSPORT_CLOSING))
     return 0;
@@ -1818,6 +1865,11 @@ quic_custom_tx_callback (void *s)
       goto tx_end;             /* Most probably a reschedule */
     }
 
+  QUIC_DBG (3, "Stream TX event");
+  quic_ack_rx_data (stream_session);
+  if (!svm_fifo_max_dequeue (stream_session->tx_fifo))
+    return 0;
+
   stream = ctx->c_quic_ctx_id.stream;
   if (!quicly_sendstate_is_open (&stream->sendstate))
     {
@@ -2013,6 +2065,7 @@ quic_reset_connection (quicly_context_t * quicly_ctx, u64 udp_session_handle,
    * reset, then the next CID is highly likely to contain a non-authenticating
    * CID, ... */
   QUIC_DBG (2, "Sending stateless reset");
+  int rv;
   quicly_datagram_t *dgram;
   session_t *udp_session;
   if (packet.cid.dest.plaintext.node_id == 0
@@ -2023,7 +2076,11 @@ quic_reset_connection (quicly_context_t * quicly_ctx, u64 udp_session_handle,
       if (dgram == NULL)
        return 1;
       udp_session = session_get_from_handle (udp_session_handle);
-      return quic_send_datagram (udp_session, dgram);  /*  TODO : set event on fifo */
+      rv = quic_send_datagram (udp_session, dgram);
+      if (svm_fifo_set_event (udp_session->tx_fifo))
+       session_send_io_evt_to_thread (udp_session->tx_fifo,
+                                      SESSION_IO_EVT_TX);
+      return rv;
     }
   return 0;
 }
@@ -2041,8 +2098,7 @@ quic_app_rx_callback (session_t * udp_session)
   struct sockaddr_in6 sa6;
   struct sockaddr *sa = (struct sockaddr *) &sa6;
   socklen_t salen;
-  u32 max_deq, len, full_len, ctx_index = UINT32_MAX, ctx_thread =
-    UINT32_MAX, ret;
+  u32 max_deq, full_len, ctx_index = UINT32_MAX, ctx_thread = UINT32_MAX, ret;
   u8 *data;
   int err;
   u32 *opening_ctx_pool, *ctx_index_ptr;
@@ -2050,7 +2106,6 @@ quic_app_rx_callback (session_t * udp_session)
   u64 udp_session_handle = session_handle (udp_session);
   int rv = 0;
   u32 thread_index = vlib_get_thread_index ();
-
   app = application_get_if_valid (app_index);
   if (!app)
     {
@@ -2063,24 +2118,23 @@ quic_app_rx_callback (session_t * udp_session)
     {
       udp_session = session_get_from_handle (udp_session_handle);      /*  session alloc might have happened */
       f = udp_session->rx_fifo;
-      svm_fifo_unset_event (f);
       max_deq = svm_fifo_max_dequeue (f);
-      if (max_deq < sizeof (session_dgram_hdr_t))
+      if (max_deq == 0)
        return 0;
 
-      ret = svm_fifo_peek (f, 0, SESSION_CONN_HDR_LEN, (u8 *) & ph);
-      if (ret != SESSION_CONN_HDR_LEN)
+      if (max_deq < SESSION_CONN_HDR_LEN)
        {
-         QUIC_DBG (1, "Not enough data for header in RX");
+         QUIC_DBG (1, "Not enough data for even a header in RX");
          return 1;
        }
-      if (ph.data_length < ph.data_offset)
+      ret = svm_fifo_peek (f, 0, SESSION_CONN_HDR_LEN, (u8 *) & ph);
+      if (ret != SESSION_CONN_HDR_LEN)
        {
-         QUIC_DBG (1, "Not enough data vs offset in RX");
+         QUIC_DBG (1, "Not enough data for header in RX");
          return 1;
        }
-      len = ph.data_length - ph.data_offset;
-      full_len = ph.data_length + ph.data_offset + SESSION_CONN_HDR_LEN;
+      ASSERT (ph.data_offset == 0);
+      full_len = ph.data_length + SESSION_CONN_HDR_LEN;
       if (full_len > max_deq)
        {
          QUIC_DBG (1, "Not enough data in fifo RX");
@@ -2090,9 +2144,7 @@ quic_app_rx_callback (session_t * udp_session)
       /* Quicly can read len bytes from the fifo at offset:
        * ph.data_offset + SESSION_CONN_HDR_LEN */
       data = malloc (ph.data_length);
-      ret =
-       svm_fifo_peek (f, ph.data_offset + SESSION_CONN_HDR_LEN,
-                      ph.data_length, data);
+      ret = svm_fifo_peek (f, SESSION_CONN_HDR_LEN, ph.data_length, data);
       if (ret != ph.data_length)
        {
          QUIC_DBG (1, "Not enough data peeked in RX");
@@ -2100,15 +2152,10 @@ quic_app_rx_callback (session_t * udp_session)
          return 1;
        }
 
-      plen =
-       quicly_decode_packet ((quicly_context_t *) app->quicly_ctx, &packet,
-                             data, len);
-
       rv = 0;
       quic_build_sockaddr (sa, &salen, &ph.rmt_ip, ph.rmt_port, ph.is_ip4);
-      plen =
-       quicly_decode_packet ((quicly_context_t *) app->quicly_ctx, &packet,
-                             data, len);
+      plen = quicly_decode_packet ((quicly_context_t *) app->quicly_ctx,
+                                  &packet, data, ph.data_length);
 
       if (plen != SIZE_MAX)
        {
@@ -2157,9 +2204,7 @@ quic_app_rx_callback (session_t * udp_session)
            }
        }
     ctx_search_done:
-      svm_fifo_dequeue_drop (f,
-                            ph.data_length + ph.data_offset +
-                            SESSION_CONN_HDR_LEN);
+      svm_fifo_dequeue_drop (f, full_len);
       free (data);
     }
   while (1);
@@ -2233,6 +2278,7 @@ static const transport_proto_vft_t quic_proto = {
   .get_connection = quic_connection_get,
   .get_listener = quic_listener_get,
   .update_time = quic_update_time,
+  .app_rx_evt = quic_custom_app_rx_callback,
   .custom_tx = quic_custom_tx_callback,
   .format_connection = format_quic_connection,
   .format_half_open = format_quic_half_open,