quic: fix stream tx_fifo race condition 54/23754/4
authorNathan Skrzypczak <nathan.skrzypczak@gmail.com>
Tue, 3 Dec 2019 14:08:27 +0000 (15:08 +0100)
committerDave Wallace <dwallacelf@gmail.com>
Wed, 4 Dec 2019 18:08:51 +0000 (18:08 +0000)
Type: fix

There is a race condition in when receiving TX from
a client application :
As egress_emit writes as much data as possible to
the stream, if during egress_emit the app writes
to the fifo, the data will be directly passed to
quicly. Then TX callback happens and triggers
a scheduler update telling quilcy the stream has
data to send. When the next egress_emit is called
and no more data has come, we have nothing to write,
we return len = 0 to quicly which breaks an assert if
a loss happens later on.

Change-Id: I47e00a14dfc9068b5dac7b5c090a89124aea004f
Signed-off-by: Nathan Skrzypczak <nathan.skrzypczak@gmail.com>
src/plugins/quic/quic.c
src/plugins/quic/quic.h

index d1f1884..f51881e 100644 (file)
@@ -169,7 +169,7 @@ quic_ctx_free (quic_ctx_t * ctx)
 {
   QUIC_DBG (2, "Free ctx %u %x", ctx->c_thread_index, ctx->c_c_index);
   u32 thread_index = ctx->c_thread_index;
-  ASSERT (ctx->timer_handle == QUIC_TIMER_HANDLE_INVALID);
+  QUIC_ASSERT (ctx->timer_handle == QUIC_TIMER_HANDLE_INVALID);
   if (CLIB_DEBUG)
     clib_memset (ctx, 0xfb, sizeof (*ctx));
   pool_put (quic_main.ctx_pool[thread_index], ctx);
@@ -295,14 +295,14 @@ quic_ack_rx_data (session_t * stream_session)
 
   sctx = quic_ctx_get (stream_session->connection_index,
                       stream_session->thread_index);
-  ASSERT (quic_ctx_is_stream (sctx));
+  QUIC_ASSERT (quic_ctx_is_stream (sctx));
   stream = sctx->stream;
   stream_data = (quic_stream_data_t *) stream->data;
 
   f = stream_session->rx_fifo;
   max_deq = svm_fifo_max_dequeue (f);
 
-  ASSERT (stream_data->app_rx_data_len >= max_deq);
+  QUIC_ASSERT (stream_data->app_rx_data_len >= max_deq);
   quicly_stream_sync_recvbuf (stream, stream_data->app_rx_data_len - max_deq);
   QUIC_DBG (3, "Acking %u bytes", stream_data->app_rx_data_len - max_deq);
   stream_data->app_rx_data_len = max_deq;
@@ -330,7 +330,7 @@ quic_connection_delete (quic_ctx_t * ctx)
 
   QUIC_DBG (2, "Deleting connection %u", ctx->c_c_index);
 
-  ASSERT (!quic_ctx_is_stream (ctx));
+  QUIC_ASSERT (!quic_ctx_is_stream (ctx));
   quic_stop_ctx_timer (ctx);
 
   /*  Delete the connection from the connection map */
@@ -428,14 +428,14 @@ quic_send_datagram (session_t * udp_session, quicly_datagram_t * packet)
   /*  Read dest address from quicly-provided sockaddr */
   if (hdr.is_ip4)
     {
-      ASSERT (packet->dest.sa.sa_family == AF_INET);
+      QUIC_ASSERT (packet->dest.sa.sa_family == AF_INET);
       struct sockaddr_in *sa4 = (struct sockaddr_in *) &packet->dest.sa;
       hdr.rmt_port = sa4->sin_port;
       hdr.rmt_ip.ip4.as_u32 = sa4->sin_addr.s_addr;
     }
   else
     {
-      ASSERT (packet->dest.sa.sa_family == AF_INET6);
+      QUIC_ASSERT (packet->dest.sa.sa_family == AF_INET6);
       struct sockaddr_in6 *sa6 = (struct sockaddr_in6 *) &packet->dest.sa;
       hdr.rmt_port = sa6->sin6_port;
       clib_memcpy (&hdr.rmt_ip.ip6, &sa6->sin6_addr, 16);
@@ -474,7 +474,7 @@ quic_send_packets (quic_ctx_t * ctx)
   if (quic_ctx_is_stream (ctx))
     ctx = quic_ctx_get (ctx->quic_connection_ctx_id, ctx->c_thread_index);
 
-  ASSERT (!quic_ctx_is_stream (ctx));
+  QUIC_ASSERT (!quic_ctx_is_stream (ctx));
 
   udp_session = session_get_from_handle_if_valid (ctx->udp_session_handle);
   if (!udp_session)
@@ -590,13 +590,12 @@ quic_on_receive (quicly_stream_t * stream, size_t off, const void *src,
                 size_t len)
 {
   QUIC_DBG (3, "received data: %lu bytes, offset %lu", len, off);
-  u32 max_enq;
+  u32 max_enq, rlen, rv;
   quic_ctx_t *sctx;
   session_t *stream_session;
   app_worker_t *app_wrk;
   svm_fifo_t *f;
   quic_stream_data_t *stream_data;
-  int rlen, rv;
 
   stream_data = (quic_stream_data_t *) stream->data;
   sctx = quic_ctx_get (stream_data->ctx_id, stream_data->thread_index);
@@ -640,7 +639,7 @@ quic_on_receive (quicly_stream_t * stream, size_t off, const void *src,
                stream_session->app_wrk_index,
                stream_session->thread_index, f, len, rlen, off, max_enq);
       stream_data->app_rx_data_len += rlen;
-      ASSERT (rlen >= len);
+      QUIC_ASSERT (rlen >= len);
       app_wrk = app_worker_get_if_valid (stream_session->app_wrk_index);
       if (PREDICT_TRUE (app_wrk != 0))
        {
@@ -656,7 +655,7 @@ quic_on_receive (quicly_stream_t * stream, size_t off, const void *src,
       rlen = svm_fifo_enqueue_with_offset (f,
                                           off - stream_data->app_rx_data_len,
                                           len, (u8 *) src);
-      ASSERT (rlen == 0);
+      QUIC_ASSERT (rlen == 0);
     }
   return 0;
 }
@@ -664,16 +663,22 @@ quic_on_receive (quicly_stream_t * stream, size_t off, const void *src,
 void
 quic_fifo_egress_shift (quicly_stream_t * stream, size_t delta)
 {
+  quic_stream_data_t *stream_data;
   session_t *stream_session;
   svm_fifo_t *f;
-  int rv;
+  u32 rv;
 
+  stream_data = (quic_stream_data_t *) stream->data;
   stream_session = get_stream_session_from_stream (stream);
   f = stream_session->tx_fifo;
 
+  QUIC_ASSERT (stream_data->app_tx_data_len >= delta);
+  stream_data->app_tx_data_len -= delta;
   rv = svm_fifo_dequeue_drop (f, delta);
-  ASSERT (rv == delta);
-  quicly_stream_sync_sendbuf (stream, 0);
+  QUIC_ASSERT (rv == delta);
+
+  rv = quicly_stream_sync_sendbuf (stream, 0);
+  QUIC_ASSERT (!rv);
 }
 
 int
@@ -681,16 +686,18 @@ quic_fifo_egress_emit (quicly_stream_t * stream, size_t off, void *dst,
                       size_t * len, int *wrote_all)
 {
   u32 deq_max, first_deq, max_rd_chunk, rem_offset;
+  quic_stream_data_t *stream_data;
   session_t *stream_session;
   svm_fifo_t *f;
 
+  stream_data = (quic_stream_data_t *) stream->data;
   stream_session = get_stream_session_from_stream (stream);
   f = stream_session->tx_fifo;
 
   QUIC_DBG (3, "Emitting %u, offset %u", *len, off);
 
   deq_max = svm_fifo_max_dequeue_cons (f);
-  ASSERT (off <= deq_max);
+  QUIC_ASSERT (off <= deq_max);
   if (off + *len < deq_max)
     {
       *wrote_all = 0;
@@ -699,8 +706,11 @@ quic_fifo_egress_emit (quicly_stream_t * stream, size_t off, void *dst,
     {
       *wrote_all = 1;
       *len = deq_max - off;
-      QUIC_DBG (3, "Wrote ALL, %u", *len);
     }
+  QUIC_ASSERT (*len > 0);
+
+  if (off + *len > stream_data->app_tx_data_len)
+    stream_data->app_tx_data_len = off + *len;
 
   /* TODO, use something like : return svm_fifo_peek (f, off, *len, dst); */
   max_rd_chunk = svm_fifo_max_read_chunk (f);
@@ -780,6 +790,7 @@ quic_on_stream_open (quicly_stream_open_t * self, quicly_stream_t * stream)
   stream_data->ctx_id = sctx_id;
   stream_data->thread_index = sctx->c_thread_index;
   stream_data->app_rx_data_len = 0;
+  stream_data->app_tx_data_len = 0;
 
   sctx->c_s_index = stream_session->session_index;
   stream_session->session_state = SESSION_STATE_CREATED;
@@ -1024,6 +1035,7 @@ quic_connect_stream (session_t * quic_session, u32 opaque)
   stream_data->ctx_id = sctx->c_c_index;
   stream_data->thread_index = sctx->c_thread_index;
   stream_data->app_rx_data_len = 0;
+  stream_data->app_tx_data_len = 0;
   stream_session->session_state = SESSION_STATE_READY;
 
   /* For now we only reset streams. Cleanup will be triggered by timers */
@@ -1235,7 +1247,7 @@ quic_stop_listen (u32 lctx_index)
   QUIC_DBG (2, "Called quic_stop_listen");
   quic_ctx_t *lctx;
   lctx = quic_ctx_get (lctx_index, 0);
-  ASSERT (quic_ctx_is_listener (lctx));
+  QUIC_ASSERT (quic_ctx_is_listener (lctx));
   vnet_unlisten_args_t a = {
     .handle = lctx->udp_session_handle,
     .app_index = quic_main.app_index,
@@ -1470,7 +1482,7 @@ quic_transfer_connection (u32 ctx_index, u32 dest_thread)
   QUIC_DBG (2, "Transferring conn %u to thread %u", ctx_index, dest_thread);
 
   temp_ctx = clib_mem_alloc (sizeof (quic_ctx_t));
-  ASSERT (temp_ctx);
+  QUIC_ASSERT (temp_ctx != NULL);
   ctx = quic_ctx_get (ctx_index, thread_index);
 
   clib_memcpy (temp_ctx, ctx, sizeof (quic_ctx_t));
@@ -1590,9 +1602,9 @@ quic_udp_session_migrate_callback (session_t * s, session_handle_t new_sh)
   quic_ctx_t *ctx;
 
   QUIC_ERR ("Session %x migrated to %lx", s->session_index, new_sh);
-  ASSERT (vlib_get_thread_index () == s->thread_index);
+  QUIC_ASSERT (vlib_get_thread_index () == s->thread_index);
   ctx = quic_ctx_get (s->opaque, s->thread_index);
-  ASSERT (ctx->udp_session_handle == session_handle (s));
+  QUIC_ASSERT (ctx->udp_session_handle == session_handle (s));
 
   ctx->udp_session_handle = new_sh;
 #if QUIC_DEBUG >= 1
@@ -1674,8 +1686,10 @@ static int
 quic_custom_tx_callback (void *s, u32 max_burst_size)
 {
   session_t *stream_session = (session_t *) s;
+  quic_stream_data_t *stream_data;
   quicly_stream_t *stream;
   quic_ctx_t *ctx;
+  u32 max_deq;
   int rv;
 
   if (PREDICT_FALSE
@@ -1690,9 +1704,6 @@ quic_custom_tx_callback (void *s, u32 max_burst_size)
 
   QUIC_DBG (3, "Stream TX event");
   quic_ack_rx_data (stream_session);
-  if (!svm_fifo_max_dequeue (stream_session->tx_fifo))
-    return 0;
-
   stream = ctx->stream;
   if (!quicly_sendstate_is_open (&stream->sendstate))
     {
@@ -1700,8 +1711,18 @@ quic_custom_tx_callback (void *s, u32 max_burst_size)
       return -1;
     }
 
-  if ((rv = quicly_stream_sync_sendbuf (stream, 1)) != 0)
-    return rv;
+  stream_data = (quic_stream_data_t *) stream->data;
+  max_deq = svm_fifo_max_dequeue (stream_session->tx_fifo);
+  QUIC_ASSERT (max_deq >= stream_data->app_tx_data_len);
+  if (max_deq == stream_data->app_tx_data_len)
+    {
+      QUIC_DBG (3, "TX but no data %d / %d", max_deq,
+               stream_data->app_tx_data_len);
+      return 0;
+    }
+  stream_data->app_tx_data_len = max_deq;
+  rv = quicly_stream_sync_sendbuf (stream, 1);
+  QUIC_ASSERT (!rv);
 
 tx_end:
   quic_send_packets (ctx);
@@ -1878,8 +1899,8 @@ quic_process_one_rx_packet (u64 udp_session_handle, svm_fifo_t * f,
 
   ret = svm_fifo_peek (f, fifo_offset,
                       SESSION_CONN_HDR_LEN, (u8 *) & pctx->ph);
-  ASSERT (ret == SESSION_CONN_HDR_LEN);
-  ASSERT (pctx->ph.data_offset == 0);
+  QUIC_ASSERT (ret == SESSION_CONN_HDR_LEN);
+  QUIC_ASSERT (pctx->ph.data_offset == 0);
   full_len = pctx->ph.data_length + SESSION_CONN_HDR_LEN;
   if (full_len > cur_deq)
     {
index dfcb0e6..9433a89 100644 (file)
 #define QUIC_DBG(_lvl, _fmt, _args...)
 #endif
 
+#if CLIB_ASSERT_ENABLE
+#define QUIC_ASSERT(truth) ASSERT (truth)
+#else
+#define QUIC_ASSERT(truth)                        \
+  do {                                            \
+    if (PREDICT_FALSE (! (truth)))                \
+      QUIC_ERR ("ASSERT(%s) failed", # truth);    \
+  } while (0)
+#endif
+
 #define QUIC_ERR(_fmt, _args...)                \
   do {                                          \
     clib_warning ("QUIC-ERR: " _fmt, ##_args);  \
   } while (0)
 
+
+
 extern vlib_node_registration_t quic_input_node;
 
 typedef enum
@@ -167,6 +179,7 @@ typedef struct quic_stream_data_
   u32 ctx_id;
   u32 thread_index;
   u32 app_rx_data_len;         /**< bytes received, to be read by external app */
+  u32 app_tx_data_len;         /**< bytes sent */
 } quic_stream_data_t;
 
 typedef struct quic_worker_ctx_