Add support for tcp/session buffer chains 13/6613/11
authorFlorin Coras <fcoras@cisco.com>
Mon, 8 May 2017 02:12:02 +0000 (19:12 -0700)
committerDamjan Marion <dmarion.lists@gmail.com>
Tue, 9 May 2017 14:38:56 +0000 (14:38 +0000)
Change-Id: I01c6e3dc3a1b2785df37bb66b19c4b5cbb8f3211
Signed-off-by: Florin Coras <fcoras@cisco.com>
src/scripts/vnet/uri/dummy_app.py
src/uri/uri_socket_server.c
src/vnet/session/node.c
src/vnet/session/session.c
src/vnet/session/session.h
src/vnet/tcp/tcp_input.c
src/vnet/tcp/tcp_output.c

index 5033392..ff00f2f 100644 (file)
@@ -6,14 +6,28 @@ import time
 
 # action can be reflect or drop 
 action = "drop"
+test = 0
+
+def test_data (data, n_rcvd):
+    n_read = len (data);
+    for i in range(n_read):
+        expected = (n_rcvd + i) & 0xff
+        byte_got = ord (data[i])
+        if (byte_got != expected):
+            print("Difference at byte {}. Expected {} got {}"
+                  .format(n_rcvd + i, expected, byte_got))
+    return n_read
 
 def handle_connection (connection, client_address):
     print("Received connection from {}".format(repr(client_address)))
+    n_rcvd = 0
     try:
         while True:
             data = connection.recv(4096)
             if not data:
                 break;
+            if (test == 1):
+                n_rcvd += test_data (data, n_rcvd)
             if (action != "drop"):
                 connection.sendall(data)
     finally:
@@ -78,8 +92,9 @@ def run(mode, ip, port):
 
 if __name__ == "__main__":
     if (len(sys.argv)) < 4:
-        raise Exception("Usage: ./dummy_app <mode> <ip> <port> [<action>]")
-    if (len(sys.argv) == 5):
+        raise Exception("Usage: ./dummy_app <mode> <ip> <port> [<action> <test>]")
+    if (len(sys.argv) == 6):
         action = sys.argv[4]
+        test = int(sys.argv[5])
 
     run (sys.argv[1], sys.argv[2], int(sys.argv[3]))
index 64d3b49..2366f42 100644 (file)
@@ -17,6 +17,7 @@
 #include <sys/types.h>
 #include <sys/socket.h>
 #include <netinet/in.h>
+#include <arpa/inet.h>
 #include <netdb.h>
 #include <vppinfra/format.h>
 #include <signal.h>
@@ -72,32 +73,59 @@ setup_signal_handler (void)
 int
 main (int argc, char *argv[])
 {
-  int sockfd, portno, n, sent, accfd;
+  int sockfd, portno, n, sent, accfd, reuse;
+  socklen_t client_addr_len;
   struct sockaddr_in serv_addr;
+  struct sockaddr_in client;
   struct hostent *server;
   u8 *rx_buffer = 0;
 
-  if (0 && argc < 3)
+  if (argc > 1 && argc < 3)
     {
-      fformat (stderr, "usage %s hostname port\n", argv[0]);
+      fformat (stderr, "usage %s host port\n", argv[0]);
       exit (0);
     }
 
+  if (argc >= 3)
+    {
+      portno = atoi (argv[2]);
+      server = gethostbyname (argv[1]);
+      if (server == NULL)
+       {
+         clib_unix_warning ("gethostbyname");
+         exit (1);
+       }
+    }
+  else
+    {
+      /* Defaults */
+      portno = 1234;
+      server = gethostbyname ("6.0.1.1");
+      if (server == NULL)
+       {
+         clib_unix_warning ("gethostbyname");
+         exit (1);
+       }
+    }
+
+
   setup_signal_handler ();
 
-  portno = 1234;               // atoi(argv[2]);
   sockfd = socket (AF_INET, SOCK_STREAM, 0);
   if (sockfd < 0)
     {
       clib_unix_error ("socket");
       exit (1);
     }
-  server = gethostbyname ("6.0.1.1");
-  if (server == NULL)
+
+  reuse = 1;
+  if (setsockopt (sockfd, SOL_SOCKET, SO_REUSEADDR, (const char *) &reuse,
+                 sizeof (reuse)) < 0)
     {
-      clib_unix_warning ("gethostbyname");
+      clib_unix_error ("setsockopt(SO_REUSEADDR) failed");
       exit (1);
     }
+
   bzero ((char *) &serv_addr, sizeof (serv_addr));
   serv_addr.sin_family = AF_INET;
   bcopy ((char *) server->h_addr,
@@ -123,12 +151,15 @@ main (int argc, char *argv[])
       if (signal_received)
        break;
 
-      accfd = accept (sockfd, 0 /* don't care */ , 0);
+      client_addr_len = sizeof (struct sockaddr);
+      accfd = accept (sockfd, (struct sockaddr *) &client, &client_addr_len);
       if (accfd < 0)
        {
          clib_unix_warning ("accept");
          continue;
        }
+      fformat (stderr, "Accepted connection from: %s : %d\n",
+              inet_ntoa (client.sin_addr), client.sin_port);
       while (1)
        {
          n = recv (accfd, rx_buffer, vec_len (rx_buffer), 0 /* flags */ );
index 2d12ee2..ce7c386 100644 (file)
@@ -70,6 +70,58 @@ static u32 session_type_to_next[] = {
   SESSION_QUEUE_NEXT_IP6_LOOKUP,
 };
 
+always_inline void
+session_tx_fifo_chain_tail (session_manager_main_t * smm, vlib_main_t * vm,
+                           u8 thread_index, svm_fifo_t * fifo,
+                           vlib_buffer_t * b0, u32 bi0, u8 n_bufs_per_seg,
+                           u32 * left_to_snd0, u16 * n_bufs, u32 * rx_offset,
+                           u16 deq_per_buf, u8 peek_data)
+{
+  vlib_buffer_t *chain_b0, *prev_b0;
+  u32 chain_bi0;
+  u16 len_to_deq0, n_bytes_read;
+  u8 *data0, j;
+
+  chain_bi0 = bi0;
+  chain_b0 = b0;
+  for (j = 1; j < n_bufs_per_seg; j++)
+    {
+      prev_b0 = chain_b0;
+      len_to_deq0 = clib_min (*left_to_snd0, deq_per_buf);
+
+      *n_bufs -= 1;
+      chain_bi0 = smm->tx_buffers[thread_index][*n_bufs];
+      _vec_len (smm->tx_buffers[thread_index]) = *n_bufs;
+
+      chain_b0 = vlib_get_buffer (vm, chain_bi0);
+      chain_b0->current_data = 0;
+      data0 = vlib_buffer_get_current (chain_b0);
+      if (peek_data)
+       {
+         n_bytes_read = svm_fifo_peek (fifo, *rx_offset, len_to_deq0, data0);
+         *rx_offset += n_bytes_read;
+       }
+      else
+       {
+         n_bytes_read = svm_fifo_dequeue_nowait (fifo, len_to_deq0, data0);
+       }
+      ASSERT (n_bytes_read == len_to_deq0);
+      chain_b0->current_length = n_bytes_read;
+      b0->total_length_not_including_first_buffer += chain_b0->current_length;
+
+      /* update previous buffer */
+      prev_b0->next_buffer = chain_bi0;
+      prev_b0->flags |= VLIB_BUFFER_NEXT_PRESENT;
+
+      /* update current buffer */
+      chain_b0->next_buffer = 0;
+
+      *left_to_snd0 -= n_bytes_read;
+      if (*left_to_snd0 == 0)
+       break;
+    }
+}
+
 always_inline int
 session_tx_fifo_read_and_snd_i (vlib_main_t * vm, vlib_node_runtime_t * node,
                                session_manager_main_t * smm,
@@ -78,16 +130,17 @@ session_tx_fifo_read_and_snd_i (vlib_main_t * vm, vlib_node_runtime_t * node,
                                int *n_tx_packets, u8 peek_data)
 {
   u32 n_trace = vlib_get_trace_count (vm, node);
-  u32 left_to_snd0, max_len_to_snd0, len_to_deq0, n_bufs, snd_space0;
-  u32 n_frame_bytes, n_frames_per_evt;
+  u32 left_to_snd0, max_len_to_snd0, len_to_deq0, snd_space0;
+  u32 n_bufs_per_evt, n_frames_per_evt;
   transport_connection_t *tc0;
   transport_proto_vft_t *transport_vft;
   u32 next_index, next0, *to_next, n_left_to_next, bi0;
   vlib_buffer_t *b0;
-  u32 rx_offset = 0, max_dequeue0;
-  u16 snd_mss0;
+  u32 rx_offset = 0, max_dequeue0, n_bytes_per_seg;
+  u16 snd_mss0, n_bufs_per_seg, n_bufs;
   u8 *data0;
   int i, n_bytes_read;
+  u32 n_bytes_per_buf, deq_per_buf;
 
   next_index = next0 = session_type_to_next[s0->session_type];
 
@@ -134,8 +187,15 @@ session_tx_fifo_read_and_snd_i (vlib_main_t * vm, vlib_node_runtime_t * node,
       max_len_to_snd0 = snd_space0;
     }
 
-  n_frame_bytes = snd_mss0 * VLIB_FRAME_SIZE;
-  n_frames_per_evt = ceil ((double) max_len_to_snd0 / n_frame_bytes);
+  n_bytes_per_buf = vlib_buffer_free_list_buffer_size (vm,
+                                                      VLIB_BUFFER_DEFAULT_FREE_LIST_INDEX);
+  n_bytes_per_seg = MAX_HDRS_LEN + snd_mss0;
+  n_bufs_per_seg = ceil ((double) n_bytes_per_seg / n_bytes_per_buf);
+  n_bufs_per_evt = (ceil ((double) max_len_to_snd0 / n_bytes_per_seg))
+    * n_bufs_per_seg;
+  n_frames_per_evt = ceil ((double) n_bufs_per_evt / VLIB_FRAME_SIZE);
+
+  deq_per_buf = clib_min (snd_mss0, n_bytes_per_buf);
 
   n_bufs = vec_len (smm->tx_buffers[thread_index]);
   left_to_snd0 = max_len_to_snd0;
@@ -146,9 +206,9 @@ session_tx_fifo_read_and_snd_i (vlib_main_t * vm, vlib_node_runtime_t * node,
        {
          vec_validate (smm->tx_buffers[thread_index],
                        n_bufs + VLIB_FRAME_SIZE - 1);
-         n_bufs +=
-           vlib_buffer_alloc (vm, &smm->tx_buffers[thread_index][n_bufs],
-                              VLIB_FRAME_SIZE);
+         n_bufs += vlib_buffer_alloc (vm,
+                                      &smm->tx_buffers[thread_index][n_bufs],
+                                      VLIB_FRAME_SIZE);
 
          /* buffer shortage
           * XXX 0.9 because when debugging we might not get a full frame */
@@ -165,11 +225,14 @@ session_tx_fifo_read_and_snd_i (vlib_main_t * vm, vlib_node_runtime_t * node,
        }
 
       vlib_get_next_frame (vm, node, next_index, to_next, n_left_to_next);
-      while (left_to_snd0 && n_left_to_next)
+      while (left_to_snd0 && n_left_to_next >= n_bufs_per_seg)
        {
+         /*
+          * Handle first buffer in chain separately
+          */
+
          /* Get free buffer */
-         n_bufs--;
-         bi0 = smm->tx_buffers[thread_index][n_bufs];
+         bi0 = smm->tx_buffers[thread_index][--n_bufs];
          _vec_len (smm->tx_buffers[thread_index]) = n_bufs;
 
          b0 = vlib_get_buffer (vm, bi0);
@@ -177,52 +240,19 @@ session_tx_fifo_read_and_snd_i (vlib_main_t * vm, vlib_node_runtime_t * node,
          b0->flags = VLIB_BUFFER_TOTAL_LENGTH_VALID
            | VNET_BUFFER_LOCALLY_ORIGINATED;
          b0->current_data = 0;
+         b0->total_length_not_including_first_buffer = 0;
 
          /* RX on the local interface. tx in default fib */
          vnet_buffer (b0)->sw_if_index[VLIB_RX] = 0;
          vnet_buffer (b0)->sw_if_index[VLIB_TX] = (u32) ~ 0;
 
-         /* usual speculation, or the enqueue_x1 macro will barf */
-         to_next[0] = bi0;
-         to_next += 1;
-         n_left_to_next -= 1;
-
-         VLIB_BUFFER_TRACE_TRAJECTORY_INIT (b0);
-         if (PREDICT_FALSE (n_trace > 0))
-           {
-             session_queue_trace_t *t0;
-             vlib_trace_buffer (vm, node, next_index, b0,
-                                1 /* follow_chain */ );
-             vlib_set_trace_count (vm, node, --n_trace);
-             t0 = vlib_add_trace (vm, node, b0, sizeof (*t0));
-             t0->session_index = s0->session_index;
-             t0->server_thread_index = s0->thread_index;
-           }
+         len_to_deq0 = clib_min (left_to_snd0, deq_per_buf);
 
-         len_to_deq0 = (left_to_snd0 < snd_mss0) ? left_to_snd0 : snd_mss0;
-
-         /* *INDENT-OFF* */
-         SESSION_EVT_DBG(SESSION_EVT_DEQ, s0, ({
-             ed->data[0] = e0->event_id;
-             ed->data[1] = max_dequeue0;
-             ed->data[2] = len_to_deq0;
-             ed->data[3] = left_to_snd0;
-         }));
-         /* *INDENT-ON* */
-
-         /* Make room for headers */
          data0 = vlib_buffer_make_headroom (b0, MAX_HDRS_LEN);
-
-         /* Dequeue the data
-          * TODO 1) peek instead of dequeue
-          *      2) buffer chains */
          if (peek_data)
            {
              n_bytes_read = svm_fifo_peek (s0->server_tx_fifo, rx_offset,
                                            len_to_deq0, data0);
-             if (n_bytes_read <= 0)
-               goto dequeue_fail;
-
              /* Keep track of progress locally, transport is also supposed to
               * increment it independently when pushing the header */
              rx_offset += n_bytes_read;
@@ -231,18 +261,56 @@ session_tx_fifo_read_and_snd_i (vlib_main_t * vm, vlib_node_runtime_t * node,
            {
              n_bytes_read = svm_fifo_dequeue_nowait (s0->server_tx_fifo,
                                                      len_to_deq0, data0);
-             if (n_bytes_read <= 0)
-               goto dequeue_fail;
            }
 
-         b0->current_length = n_bytes_read;
+         if (n_bytes_read <= 0)
+           goto dequeue_fail;
 
-         /* Ask transport to push header */
-         transport_vft->push_header (tc0, b0);
+         b0->current_length = n_bytes_read;
 
          left_to_snd0 -= n_bytes_read;
          *n_tx_packets = *n_tx_packets + 1;
 
+         /*
+          * Fill in the remaining buffers in the chain, if any
+          */
+         if (PREDICT_FALSE (n_bufs_per_seg > 1))
+           session_tx_fifo_chain_tail (smm, vm, thread_index,
+                                       s0->server_tx_fifo, b0, bi0,
+                                       n_bufs_per_seg, &left_to_snd0,
+                                       &n_bufs, &rx_offset, deq_per_buf,
+                                       peek_data);
+
+         /* Ask transport to push header after current_length and
+          * total_length_not_including_first_buffer are updated */
+         transport_vft->push_header (tc0, b0);
+
+         /* *INDENT-OFF* */
+         SESSION_EVT_DBG(SESSION_EVT_DEQ, s0, ({
+             ed->data[0] = e0->event_id;
+             ed->data[1] = max_dequeue0;
+             ed->data[2] = len_to_deq0;
+             ed->data[3] = left_to_snd0;
+         }));
+         /* *INDENT-ON* */
+
+         /* usual speculation, or the enqueue_x1 macro will barf */
+         to_next[0] = bi0;
+         to_next += 1;
+         n_left_to_next -= 1;
+
+         VLIB_BUFFER_TRACE_TRAJECTORY_INIT (b0);
+         if (PREDICT_FALSE (n_trace > 0))
+           {
+             session_queue_trace_t *t0;
+             vlib_trace_buffer (vm, node, next_index, b0,
+                                1 /* follow_chain */ );
+             vlib_set_trace_count (vm, node, --n_trace);
+             t0 = vlib_add_trace (vm, node, b0, sizeof (*t0));
+             t0->session_index = s0->session_index;
+             t0->server_thread_index = s0->thread_index;
+           }
+
          vlib_validate_buffer_enqueue_x1 (vm, node, next_index,
                                           to_next, n_left_to_next,
                                           bi0, next0);
index e92bb44..6e129dd 100644 (file)
@@ -432,33 +432,97 @@ stream_session_create_i (segment_manager_t * sm, transport_connection_t * tc,
   return 0;
 }
 
+/** Enqueue buffer chain tail */
+always_inline int
+session_enqueue_chain_tail (stream_session_t * s, vlib_buffer_t * b,
+                           u32 offset, u8 is_in_order)
+{
+  vlib_buffer_t *chain_b;
+  u32 chain_bi = b->next_buffer;
+  vlib_main_t *vm = vlib_get_main ();
+  u8 *data, len;
+  u16 written = 0;
+  int rv = 0;
+
+  do
+    {
+      chain_b = vlib_get_buffer (vm, chain_bi);
+      data = vlib_buffer_get_current (chain_b);
+      len = chain_b->current_length;
+      if (is_in_order)
+       {
+         rv = svm_fifo_enqueue_nowait (s->server_rx_fifo, len, data);
+         if (rv < len)
+           {
+             return (rv > 0) ? (written + rv) : written;
+           }
+         written += rv;
+       }
+      else
+       {
+         rv = svm_fifo_enqueue_with_offset (s->server_rx_fifo, offset, len,
+                                            data);
+         if (rv)
+           return -1;
+         offset += len;
+       }
+    }
+  while ((chain_bi = (chain_b->flags & VLIB_BUFFER_NEXT_PRESENT)
+         ? chain_b->next_buffer : 0));
+
+  if (is_in_order)
+    return written;
+
+  return 0;
+}
+
 /*
  * Enqueue data for delivery to session peer. Does not notify peer of enqueue
  * event but on request can queue notification events for later delivery by
  * calling stream_server_flush_enqueue_events().
  *
  * @param tc Transport connection which is to be enqueued data
- * @param data Data to be enqueued
- * @param len Length of data to be enqueued
+ * @param b Buffer to be enqueued
+ * @param offset Offset at which to start enqueueing if out-of-order
  * @param queue_event Flag to indicate if peer is to be notified or if event
  *                    is to be queued. The former is useful when more data is
  *                    enqueued and only one event is to be generated.
+ * @param is_in_order Flag to indicate if data is in order
  * @return Number of bytes enqueued or a negative value if enqueueing failed.
  */
 int
-stream_session_enqueue_data (transport_connection_t * tc, u8 * data, u16 len,
-                            u8 queue_event)
+stream_session_enqueue_data (transport_connection_t * tc, vlib_buffer_t * b,
+                            u32 offset, u8 queue_event, u8 is_in_order)
 {
   stream_session_t *s;
-  int enqueued;
+  int enqueued = 0, rv;
 
   s = stream_session_get (tc->s_index, tc->thread_index);
 
-  /* Make sure there's enough space left. We might've filled the pipes */
-  if (PREDICT_FALSE (len > svm_fifo_max_enqueue (s->server_rx_fifo)))
-    return -1;
-
-  enqueued = svm_fifo_enqueue_nowait (s->server_rx_fifo, len, data);
+  if (is_in_order)
+    {
+      enqueued =
+       svm_fifo_enqueue_nowait (s->server_rx_fifo, b->current_length,
+                                vlib_buffer_get_current (b));
+      if (PREDICT_FALSE
+         ((b->flags & VLIB_BUFFER_NEXT_PRESENT) && enqueued > 0))
+       {
+         rv = session_enqueue_chain_tail (s, b, 0, 1);
+         if (rv <= 0)
+           return enqueued;
+         enqueued += rv;
+       }
+    }
+  else
+    {
+      rv = svm_fifo_enqueue_with_offset (s->server_rx_fifo, offset,
+                                        b->current_length,
+                                        vlib_buffer_get_current (b));
+      if (PREDICT_FALSE ((b->flags & VLIB_BUFFER_NEXT_PRESENT) && !rv))
+       rv = session_enqueue_chain_tail (s, b, offset + b->current_length, 0);
+      if (rv)
+       return -1;
+    }
 
   if (queue_event)
     {
@@ -476,7 +540,10 @@ stream_session_enqueue_data (transport_connection_t * tc, u8 * data, u16 len,
        }
     }
 
-  return enqueued;
+  if (is_in_order)
+    return enqueued;
+
+  return 0;
 }
 
 /** Check if we have space in rx fifo to push more bytes */
index f41a8a9..f152a2b 100644 (file)
@@ -345,8 +345,8 @@ stream_session_fifo_size (transport_connection_t * tc)
 }
 
 int
-stream_session_enqueue_data (transport_connection_t * tc, u8 * data, u16 len,
-                            u8 queue_event);
+stream_session_enqueue_data (transport_connection_t * tc, vlib_buffer_t * b,
+                            u32 offset, u8 queue_event, u8 is_in_order);
 u32
 stream_session_peek_bytes (transport_connection_t * tc, u8 * buffer,
                           u32 offset, u32 max_bytes);
index d268251..ceb00fc 100644 (file)
@@ -993,9 +993,8 @@ tcp_session_enqueue_data (tcp_connection_t * tc, vlib_buffer_t * b,
       return TCP_ERROR_PURE_ACK;
     }
 
-  written = stream_session_enqueue_data (&tc->connection,
-                                        vlib_buffer_get_current (b),
-                                        data_len, 1 /* queue event */ );
+  written = stream_session_enqueue_data (&tc->connection, b, 0,
+                                        1 /* queue event */ , 1);
 
   TCP_EVT_DBG (TCP_EVT_INPUT, tc, 0, data_len, written);
 
@@ -1053,12 +1052,10 @@ tcp_session_enqueue_ooo (tcp_connection_t * tc, vlib_buffer_t * b,
       return TCP_ERROR_PURE_ACK;
     }
 
-  s0 = stream_session_get (tc->c_s_index, tc->c_thread_index);
-
   /* Enqueue out-of-order data with absolute offset */
-  rv = svm_fifo_enqueue_with_offset (s0->server_rx_fifo,
-                                    vnet_buffer (b)->tcp.seq_number,
-                                    data_len, vlib_buffer_get_current (b));
+  rv = stream_session_enqueue_data (&tc->connection, b,
+                                   vnet_buffer (b)->tcp.seq_number,
+                                   0 /* queue event */ , 0);
 
   /* Nothing written */
   if (rv)
@@ -1075,6 +1072,8 @@ tcp_session_enqueue_ooo (tcp_connection_t * tc, vlib_buffer_t * b,
       ooo_segment_t *newest;
       u32 start, end;
 
+      s0 = stream_session_get (tc->c_s_index, tc->c_thread_index);
+
       /* Get the newest segment from the fifo */
       newest = svm_fifo_newest_ooo_segment (s0->server_rx_fifo);
       start = ooo_segment_offset (s0->server_rx_fifo, newest);
@@ -2543,6 +2542,7 @@ do {                                                              \
   _(FIN_WAIT_1, TCP_FLAG_FIN, TCP_INPUT_NEXT_RCV_PROCESS, TCP_ERROR_NONE);
   /* FIN confirming that the peer (app) has closed */
   _(FIN_WAIT_2, TCP_FLAG_FIN, TCP_INPUT_NEXT_RCV_PROCESS, TCP_ERROR_NONE);
+  _(FIN_WAIT_2, TCP_FLAG_ACK, TCP_INPUT_NEXT_RCV_PROCESS, TCP_ERROR_NONE);
   _(FIN_WAIT_2, TCP_FLAG_FIN | TCP_FLAG_ACK, TCP_INPUT_NEXT_RCV_PROCESS,
     TCP_ERROR_NONE);
   _(LAST_ACK, TCP_FLAG_ACK, TCP_INPUT_NEXT_RCV_PROCESS, TCP_ERROR_NONE);
index 2a1b140..33e599e 100644 (file)
@@ -46,7 +46,7 @@ typedef struct
   tcp_connection_t tcp_connection;
 } tcp_tx_trace_t;
 
-u16 dummy_mtu = 400;
+u16 dummy_mtu = 1460;
 
 u8 *
 format_tcp_tx_trace (u8 * s, va_list * args)
@@ -923,7 +923,7 @@ tcp_push_hdr_i (tcp_connection_t * tc, vlib_buffer_t * b,
   u8 tcp_hdr_opts_len, opts_write_len, flags;
   tcp_header_t *th;
 
-  data_len = b->current_length;
+  data_len = b->current_length + b->total_length_not_including_first_buffer;
   vnet_buffer (b)->tcp.flags = 0;
 
   if (compute_opts)