TCP/session improvements 44/5744/19
authorFlorin Coras <fcoras@cisco.com>
Mon, 13 Mar 2017 10:49:51 +0000 (03:49 -0700)
committerFlorin Coras <fcoras@cisco.com>
Tue, 28 Mar 2017 06:16:55 +0000 (23:16 -0700)
- Added svm fifo flag for tracking fifo dequeue events (replaces event
  length). Updated all code to switch to the new scheme.
- More session debugging
- Fix peek index wrap
- Add a trivial socket test client
- Fast retransmit/cc fixes
- tx and rx SACK fixes and unit testing
- SRTT computation fix
- remove dupack/ack burst filters
- improve ack rx
- improved segment rx
- builtin client test code

Change-Id: Ic4eb2d5ca446eb2260ccd3ccbcdaa73c64e7f4e1
Signed-off-by: Florin Coras <fcoras@cisco.com>
Signed-off-by: Dave Barach <dbarach@cisco.com>
28 files changed:
src/svm/svm_fifo.c
src/svm/svm_fifo.h
src/svm/svm_fifo_segment.h
src/uri.am
src/uri/uri_socket_test.c [new file with mode: 0644]
src/uri/uri_tcp_test.c
src/uri/uri_udp_test.c
src/vnet.am
src/vnet/session/application.h
src/vnet/session/node.c
src/vnet/session/session.c
src/vnet/session/session.h
src/vnet/session/session_cli.c
src/vnet/session/session_debug.h
src/vnet/session/transport.h
src/vnet/tcp/builtin_client.c [new file with mode: 0644]
src/vnet/tcp/builtin_client.h [new file with mode: 0644]
src/vnet/tcp/builtin_server.c
src/vnet/tcp/tcp.c
src/vnet/tcp/tcp.h
src/vnet/tcp/tcp_debug.h
src/vnet/tcp/tcp_error.def
src/vnet/tcp/tcp_input.c
src/vnet/tcp/tcp_output.c
src/vnet/tcp/tcp_packet.h
src/vnet/tcp/tcp_test.c [new file with mode: 0644]
src/vnet/udp/builtin_server.c
src/vnet/udp/udp_input.c

index e3f534b..07b0d2d 100644 (file)
@@ -13,7 +13,7 @@
  * limitations under the License.
  */
 
-#include "svm_fifo.h"
+#include <svm/svm_fifo.h>
 
 /** create an svm fifo, in the current heap. Fails vs blow up the process */
 svm_fifo_t *
@@ -362,18 +362,19 @@ svm_fifo_enqueue_nowait (svm_fifo_t * f,
   return svm_fifo_enqueue_internal (f, pid, max_bytes, copy_from_here);
 }
 
-/** Enqueue a future segment.
+/**
+ * Enqueue a future segment.
+ *
  * Two choices: either copies the entire segment, or copies nothing
  * Returns 0 of the entire segment was copied
  * Returns -1 if none of the segment was copied due to lack of space
  */
-
 static int
-svm_fifo_enqueue_with_offset_internal2 (svm_fifo_t * f,
-                                       int pid,
-                                       u32 offset,
-                                       u32 required_bytes,
-                                       u8 * copy_from_here)
+svm_fifo_enqueue_with_offset_internal (svm_fifo_t * f,
+                                      int pid,
+                                      u32 offset,
+                                      u32 required_bytes,
+                                      u8 * copy_from_here)
 {
   u32 total_copy_bytes, first_copy_bytes, second_copy_bytes;
   u32 cursize, nitems;
@@ -424,14 +425,14 @@ svm_fifo_enqueue_with_offset (svm_fifo_t * f,
                              u32 offset,
                              u32 required_bytes, u8 * copy_from_here)
 {
-  return svm_fifo_enqueue_with_offset_internal2
+  return svm_fifo_enqueue_with_offset_internal
     (f, pid, offset, required_bytes, copy_from_here);
 }
 
 
 static int
-svm_fifo_dequeue_internal2 (svm_fifo_t * f,
-                           int pid, u32 max_bytes, u8 * copy_here)
+svm_fifo_dequeue_internal (svm_fifo_t * f,
+                          int pid, u32 max_bytes, u8 * copy_here)
 {
   u32 total_copy_bytes, first_copy_bytes, second_copy_bytes;
   u32 cursize, nitems;
@@ -484,7 +485,7 @@ int
 svm_fifo_dequeue_nowait (svm_fifo_t * f,
                         int pid, u32 max_bytes, u8 * copy_here)
 {
-  return svm_fifo_dequeue_internal2 (f, pid, max_bytes, copy_here);
+  return svm_fifo_dequeue_internal (f, pid, max_bytes, copy_here);
 }
 
 int
@@ -492,7 +493,7 @@ svm_fifo_peek (svm_fifo_t * f, int pid, u32 offset, u32 max_bytes,
               u8 * copy_here)
 {
   u32 total_copy_bytes, first_copy_bytes, second_copy_bytes;
-  u32 cursize, nitems;
+  u32 cursize, nitems, real_head;
 
   if (PREDICT_FALSE (f->cursize == 0))
     return -2;                 /* nothing in the fifo */
@@ -500,6 +501,8 @@ svm_fifo_peek (svm_fifo_t * f, int pid, u32 offset, u32 max_bytes,
   /* read cursize, which can only increase while we're working */
   cursize = f->cursize;
   nitems = f->nitems;
+  real_head = f->head + offset;
+  real_head = real_head >= nitems ? real_head - nitems : real_head;
 
   /* Number of bytes we're going to copy */
   total_copy_bytes = (cursize < max_bytes) ? cursize : max_bytes;
@@ -508,9 +511,9 @@ svm_fifo_peek (svm_fifo_t * f, int pid, u32 offset, u32 max_bytes,
     {
       /* Number of bytes in first copy segment */
       first_copy_bytes =
-       ((nitems - f->head + offset) < total_copy_bytes) ?
-       (nitems - f->head + offset) : total_copy_bytes;
-      clib_memcpy (copy_here, &f->data[f->head + offset], first_copy_bytes);
+       ((nitems - real_head) < total_copy_bytes) ?
+       (nitems - real_head) : total_copy_bytes;
+      clib_memcpy (copy_here, &f->data[real_head], first_copy_bytes);
 
       /* Number of bytes in second copy segment, if any */
       second_copy_bytes = total_copy_bytes - first_copy_bytes;
index 70624b7..3955617 100644 (file)
@@ -46,9 +46,11 @@ typedef struct
 {
   pthread_mutex_t mutex;       /* 8 bytes */
   pthread_cond_t condvar;      /* 8 bytes */
-  u32 owner_pid;
   svm_lock_tag_t tag;
-  volatile u32 cursize;
+
+  volatile u32 cursize;                /**< current fifo size */
+  volatile u8 has_event;       /**< non-zero if deq event exists */
+  u32 owner_pid;
   u32 nitems;
 
   /* Backpointers */
@@ -112,6 +114,28 @@ svm_fifo_has_ooo_data (svm_fifo_t * f)
   return f->ooos_list_head != OOO_SEGMENT_INVALID_INDEX;
 }
 
+/**
+ * Sets fifo event flag.
+ *
+ * @return 1 if flag was not set.
+ */
+always_inline u8
+svm_fifo_set_event (svm_fifo_t * f)
+{
+  /* Probably doesn't need to be atomic. Still, better avoid surprises */
+  return __sync_lock_test_and_set (&f->has_event, 1) == 0;
+}
+
+/**
+ * Unsets fifo event flag.
+ */
+always_inline void
+svm_fifo_unset_event (svm_fifo_t * f)
+{
+  /* Probably doesn't need to be atomic. Still, better avoid surprises */
+  __sync_lock_test_and_set (&f->has_event, 0);
+}
+
 svm_fifo_t *svm_fifo_create (u32 data_size_in_bytes);
 
 int svm_fifo_enqueue_nowait (svm_fifo_t * f, int pid, u32 max_bytes,
index 793fa7c..ecb5653 100644 (file)
@@ -15,8 +15,8 @@
 #ifndef __included_ssvm_fifo_segment_h__
 #define __included_ssvm_fifo_segment_h__
 
-#include "svm_fifo.h"
-#include "ssvm.h"
+#include <svm/svm_fifo.h>
+#include <svm/ssvm.h>
 
 typedef struct
 {
index 09b5b15..ad4d65d 100644 (file)
@@ -11,7 +11,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-noinst_PROGRAMS += uri_udp_test uri_tcp_test
+noinst_PROGRAMS += uri_udp_test uri_tcp_test uri_socket_test
 
 uri_udp_test_SOURCES = uri/uri_udp_test.c
 uri_udp_test_LDADD = libvlibmemoryclient.la libvlibapi.la libsvm.la \
@@ -20,3 +20,6 @@ uri_udp_test_LDADD = libvlibmemoryclient.la libvlibapi.la libsvm.la \
 uri_tcp_test_SOURCES = uri/uri_tcp_test.c
 uri_tcp_test_LDADD = libvlibmemoryclient.la libvlibapi.la libsvm.la \
        libvppinfra.la -lpthread -lm -lrt
+
+uri_socket_test_SOURCES = uri/uri_socket_test.c
+uri_socket_test_LDADD = libvppinfra.la -lpthread -lm -lrt
diff --git a/src/uri/uri_socket_test.c b/src/uri/uri_socket_test.c
new file mode 100644 (file)
index 0000000..9f049bd
--- /dev/null
@@ -0,0 +1,126 @@
+/*
+ * Copyright (c) 2017 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <stdio.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <netinet/in.h>
+#include <netdb.h>
+#include <vppinfra/format.h>
+
+int
+main (int argc, char *argv[])
+{
+  int sockfd, portno, n;
+  struct sockaddr_in serv_addr;
+  struct hostent *server;
+  u8 *rx_buffer = 0, *tx_buffer = 0;
+  u32 offset;
+  int iter, i;
+  if (0 && argc < 3)
+    {
+      fformat (stderr, "usage %s hostname port\n", argv[0]);
+      exit (0);
+    }
+
+  portno = 1234;               // atoi(argv[2]);
+  sockfd = socket (AF_INET, SOCK_STREAM, 0);
+  if (sockfd < 0)
+    {
+      clib_unix_error ("socket");
+      exit (1);
+    }
+  server = gethostbyname ("6.0.1.1" /* argv[1] */ );
+  if (server == NULL)
+    {
+      clib_unix_warning ("gethostbyname");
+      exit (1);
+    }
+  bzero ((char *) &serv_addr, sizeof (serv_addr));
+  serv_addr.sin_family = AF_INET;
+  bcopy ((char *) server->h_addr,
+        (char *) &serv_addr.sin_addr.s_addr, server->h_length);
+  serv_addr.sin_port = htons (portno);
+  if (connect (sockfd, (const void *) &serv_addr, sizeof (serv_addr)) < 0)
+    {
+      clib_unix_warning ("connect");
+      exit (1);
+    }
+
+  vec_validate (rx_buffer, 1400);
+  vec_validate (tx_buffer, 1400);
+
+  for (i = 0; i < vec_len (tx_buffer); i++)
+    tx_buffer[i] = (i + 1) % 0xff;
+
+  /*
+   * Send one packet to warm up the RX pipeline
+   */
+  n = send (sockfd, tx_buffer, vec_len (tx_buffer), 0 /* flags */ );
+  if (n != vec_len (tx_buffer))
+    {
+      clib_unix_warning ("write");
+      exit (0);
+    }
+
+  for (iter = 0; iter < 100000; iter++)
+    {
+      if (iter < 99999)
+       {
+         n = send (sockfd, tx_buffer, vec_len (tx_buffer), 0 /* flags */ );
+         if (n != vec_len (tx_buffer))
+           {
+             clib_unix_warning ("write");
+             exit (0);
+           }
+       }
+      offset = 0;
+
+      do
+       {
+         n = recv (sockfd, rx_buffer + offset,
+                   vec_len (rx_buffer) - offset, 0 /* flags */ );
+         if (n < 0)
+           {
+             clib_unix_warning ("read");
+             exit (0);
+           }
+         offset += n;
+       }
+      while (offset < vec_len (rx_buffer));
+
+      for (i = 0; i < vec_len (rx_buffer); i++)
+       {
+         if (rx_buffer[i] != tx_buffer[i])
+           {
+             clib_warning ("[%d] read 0x%x not 0x%x",
+                           rx_buffer[i], tx_buffer[i]);
+             exit (1);
+           }
+       }
+
+    }
+  close (sockfd);
+  return 0;
+}
+
+
+/*
+ * fd.io coding-style-patch-verification: ON
+ *
+ * Local Variables:
+ * eval: (c-set-style "gnu")
+ * End:
+ */
index 406a5f4..e283481 100644 (file)
@@ -116,6 +116,7 @@ typedef struct
   pthread_t client_rx_thread_handle;
   u32 client_bytes_received;
   u8 test_return_packets;
+  u32 bytes_to_send;
 
   /* convenience */
   svm_fifo_segment_main_t *segment_main;
@@ -313,11 +314,16 @@ client_handle_fifo_event_rx (uri_tcp_test_main_t * utm,
 
   rx_fifo = e->fifo;
 
-  bytes = e->enqueue_length;
+  bytes = svm_fifo_max_dequeue (rx_fifo);
+  /* Allow enqueuing of new event */
+  svm_fifo_unset_event (rx_fifo);
+
+  /* Read the bytes */
   do
     {
-      n_read = svm_fifo_dequeue_nowait (rx_fifo, 0, vec_len (utm->rx_buf),
-                                       utm->rx_buf);
+      n_read = svm_fifo_dequeue_nowait (rx_fifo, 0,
+                                       clib_min (vec_len (utm->rx_buf),
+                                                 bytes), utm->rx_buf);
       if (n_read > 0)
        {
          bytes -= n_read;
@@ -333,9 +339,17 @@ client_handle_fifo_event_rx (uri_tcp_test_main_t * utm,
            }
          utm->client_bytes_received += n_read;
        }
+      else
+       {
+         if (n_read == -2)
+           {
+             clib_warning ("weird!");
+             break;
+           }
+       }
 
     }
-  while (n_read < 0 || bytes > 0);
+  while (bytes > 0);
 }
 
 void
@@ -479,47 +493,41 @@ vl_api_connect_uri_reply_t_handler (vl_api_connect_uri_reply_t * mp)
     }
 }
 
-void
-client_send_data (uri_tcp_test_main_t * utm)
+static void
+send_test_chunk (uri_tcp_test_main_t * utm, svm_fifo_t * tx_fifo, int mypid,
+                u32 bytes)
 {
   u8 *test_data = utm->connect_test_data;
   u64 bytes_sent = 0;
-  int rv;
-  int mypid = getpid ();
-  session_t *session;
-  svm_fifo_t *tx_fifo;
-  int buffer_offset, bytes_to_send = 0;
+  int test_buf_offset = 0;
+  u32 bytes_to_snd;
+  u32 queue_max_chunk = 64 << 10, actual_write;
   session_fifo_event_t evt;
   static int serial_number = 0;
-  int i;
-  u32 max_chunk = 64 << 10, write;
-
-  session = pool_elt_at_index (utm->sessions, utm->connected_session_index);
-  tx_fifo = session->server_tx_fifo;
+  int rv;
 
-  vec_validate (utm->rx_buf, vec_len (test_data) - 1);
+  bytes_to_snd = (bytes == 0) ? vec_len (test_data) : bytes;
+  if (bytes_to_snd > vec_len (test_data))
+    bytes_to_snd = vec_len (test_data);
 
-  for (i = 0; i < 1; i++)
+  while (bytes_to_snd > 0)
     {
-      bytes_to_send = vec_len (test_data);
-      buffer_offset = 0;
-      while (bytes_to_send > 0)
+      actual_write =
+       bytes_to_snd > queue_max_chunk ? queue_max_chunk : bytes_to_snd;
+      rv = svm_fifo_enqueue_nowait (tx_fifo, mypid, actual_write,
+                                   test_data + test_buf_offset);
+
+      if (rv > 0)
        {
-         write = bytes_to_send > max_chunk ? max_chunk : bytes_to_send;
-         rv = svm_fifo_enqueue_nowait (tx_fifo, mypid, write,
-                                       test_data + buffer_offset);
+         bytes_to_snd -= rv;
+         test_buf_offset += rv;
+         bytes_sent += rv;
 
-         if (rv > 0)
+         if (svm_fifo_set_event (tx_fifo))
            {
-             bytes_to_send -= rv;
-             buffer_offset += rv;
-             bytes_sent += rv;
-
              /* Fabricate TX event, send to vpp */
              evt.fifo = tx_fifo;
              evt.event_type = FIFO_EVENT_SERVER_TX;
-             /* $$$$ for event logging */
-             evt.enqueue_length = rv;
              evt.event_id = serial_number++;
 
              unix_shared_memory_queue_add (utm->vpp_event_queue,
@@ -528,13 +536,40 @@ client_send_data (uri_tcp_test_main_t * utm)
            }
        }
     }
+}
+
+void
+client_send_data (uri_tcp_test_main_t * utm)
+{
+  u8 *test_data = utm->connect_test_data;
+  int mypid = getpid ();
+  session_t *session;
+  svm_fifo_t *tx_fifo;
+  u32 n_iterations, leftover;
+  int i;
+
+  session = pool_elt_at_index (utm->sessions, utm->connected_session_index);
+  tx_fifo = session->server_tx_fifo;
+
+  vec_validate (utm->rx_buf, vec_len (test_data) - 1);
+  n_iterations = utm->bytes_to_send / vec_len (test_data);
+
+  for (i = 0; i < n_iterations; i++)
+    {
+      send_test_chunk (utm, tx_fifo, mypid, 0);
+    }
+
+  leftover = utm->bytes_to_send % vec_len (test_data);
+  if (leftover)
+    send_test_chunk (utm, tx_fifo, mypid, leftover);
 
   if (utm->test_return_packets)
     {
       f64 timeout = clib_time_now (&utm->clib_time) + 2;
 
       /* Wait for the outstanding packets */
-      while (utm->client_bytes_received < vec_len (test_data))
+      while (utm->client_bytes_received <
+            vec_len (test_data) * n_iterations + leftover)
        {
          if (clib_time_now (&utm->clib_time) > timeout)
            {
@@ -542,9 +577,8 @@ client_send_data (uri_tcp_test_main_t * utm)
              break;
            }
        }
-
-      utm->time_to_stop = 1;
     }
+  utm->time_to_stop = 1;
 }
 
 void
@@ -599,6 +633,11 @@ client_test (uri_tcp_test_main_t * utm)
 
   /* Disconnect */
   client_disconnect (utm);
+
+  if (wait_for_state_change (utm, STATE_START))
+    {
+      return;
+    }
 }
 
 static void
@@ -714,7 +753,6 @@ server_handle_fifo_event_rx (uri_tcp_test_main_t * utm,
 {
   svm_fifo_t *rx_fifo, *tx_fifo;
   int n_read;
-
   session_fifo_event_t evt;
   unix_shared_memory_queue_t *q;
   int rv, bytes;
@@ -722,34 +760,46 @@ server_handle_fifo_event_rx (uri_tcp_test_main_t * utm,
   rx_fifo = e->fifo;
   tx_fifo = utm->sessions[rx_fifo->client_session_index].server_tx_fifo;
 
-  bytes = e->enqueue_length;
+  bytes = svm_fifo_max_dequeue (rx_fifo);
+  /* Allow enqueuing of a new event */
+  svm_fifo_unset_event (rx_fifo);
+
+  if (bytes == 0)
+    return;
+
+  /* Read the bytes */
   do
     {
       n_read = svm_fifo_dequeue_nowait (rx_fifo, 0, vec_len (utm->rx_buf),
                                        utm->rx_buf);
+      if (n_read > 0)
+       bytes -= n_read;
+
+      if (utm->drop_packets)
+       continue;
 
       /* Reflect if a non-drop session */
-      if (!utm->drop_packets && n_read > 0)
+      if (n_read > 0)
        {
          do
            {
              rv = svm_fifo_enqueue_nowait (tx_fifo, 0, n_read, utm->rx_buf);
            }
-         while (rv == -2 && !utm->time_to_stop);
-
-         /* Fabricate TX event, send to vpp */
-         evt.fifo = tx_fifo;
-         evt.event_type = FIFO_EVENT_SERVER_TX;
-         /* $$$$ for event logging */
-         evt.enqueue_length = n_read;
-         evt.event_id = e->event_id;
-         q = utm->vpp_event_queue;
-         unix_shared_memory_queue_add (q, (u8 *) & evt,
-                                       0 /* do wait for mutex */ );
-       }
+         while (rv <= 0 && !utm->time_to_stop);
 
-      if (n_read > 0)
-       bytes -= n_read;
+         /* If event wasn't set, add one */
+         if (svm_fifo_set_event (tx_fifo))
+           {
+             /* Fabricate TX event, send to vpp */
+             evt.fifo = tx_fifo;
+             evt.event_type = FIFO_EVENT_SERVER_TX;
+             evt.event_id = e->event_id;
+
+             q = utm->vpp_event_queue;
+             unix_shared_memory_queue_add (q, (u8 *) & evt,
+                                           0 /* do wait for mutex */ );
+           }
+       }
     }
   while ((n_read < 0 || bytes > 0) && !utm->time_to_stop);
 }
@@ -852,7 +902,10 @@ static void
 vl_api_disconnect_session_reply_t_handler (vl_api_disconnect_session_reply_t *
                                           mp)
 {
+  uri_tcp_test_main_t *utm = &uri_tcp_test_main;
+
   clib_warning ("retval %d", ntohl (mp->retval));
+  utm->state = STATE_START;
 }
 
 #define foreach_uri_msg                                 \
@@ -888,6 +941,7 @@ main (int argc, char **argv)
   u8 *heap, *uri = 0;
   u8 *bind_uri = (u8 *) "tcp://0.0.0.0/1234";
   u8 *connect_uri = (u8 *) "tcp://6.0.1.2/1234";
+  u32 bytes_to_send = 64 << 10, mbytes;
   u32 tmp;
   mheap_t *h;
   session_t *session;
@@ -934,6 +988,10 @@ main (int argc, char **argv)
        drop_packets = 1;
       else if (unformat (a, "test"))
        test_return_packets = 1;
+      else if (unformat (a, "mbytes %d", &mbytes))
+       {
+         bytes_to_send = mbytes << 20;
+       }
       else
        {
          fformat (stderr, "%s: usage [master|slave]\n");
@@ -956,6 +1014,7 @@ main (int argc, char **argv)
   utm->segment_main = &svm_fifo_segment_main;
   utm->drop_packets = drop_packets;
   utm->test_return_packets = test_return_packets;
+  utm->bytes_to_send = bytes_to_send;
 
   setup_signal_handlers ();
   uri_api_hookup (utm);
index 54625d6..e6c239c 100644 (file)
@@ -742,17 +742,20 @@ server_handle_fifo_event_rx (uri_udp_test_main_t * utm,
   /* Fabricate TX event, send to vpp */
   evt.fifo = tx_fifo;
   evt.event_type = FIFO_EVENT_SERVER_TX;
-  /* $$$$ for event logging */
-  evt.enqueue_length = nbytes;
   evt.event_id = e->event_id;
-  q = utm->vpp_event_queue;
-  unix_shared_memory_queue_add (q, (u8 *) & evt, 0 /* do wait for mutex */ );
+
+  if (svm_fifo_set_event (tx_fifo))
+    {
+      q = utm->vpp_event_queue;
+      unix_shared_memory_queue_add (q, (u8 *) & evt,
+                                   0 /* do wait for mutex */ );
+    }
 }
 
 void
 server_handle_event_queue (uri_udp_test_main_t * utm)
 {
-  session_fifo_event_t _e, *e = &_e;;
+  session_fifo_event_t _e, *e = &_e;
 
   while (1)
     {
index 3e73de8..9c55e33 100644 (file)
@@ -462,7 +462,9 @@ libvnet_la_SOURCES +=                               \
  vnet/tcp/tcp_output.c                         \
  vnet/tcp/tcp_input.c                          \
  vnet/tcp/tcp_newreno.c                                \
+ vnet/tcp/builtin_client.c                     \
  vnet/tcp/builtin_server.c                     \
+ vnet/tcp/tcp_test.c                           \
  vnet/tcp/tcp.c
 
 nobase_include_HEADERS +=                      \
index a60a8b8..480828f 100644 (file)
@@ -45,8 +45,7 @@ typedef struct _stream_session_cb_vft
   void (*session_reset_callback) (stream_session_t * s);
 
   /* Direct RX callback, for built-in servers */
-  int (*builtin_server_rx_callback) (stream_session_t * session,
-                                    session_fifo_event_t * ep);
+  int (*builtin_server_rx_callback) (stream_session_t * session);
 
   /* Redirect connection to local server */
   int (*redirect_connect_callback) (u32 api_client_index, void *mp);
index 822afeb..8681105 100644 (file)
  * limitations under the License.
  */
 
+#include <math.h>
 #include <vlib/vlib.h>
 #include <vnet/vnet.h>
-#include <vnet/pg/pg.h>
-#include <vnet/ip/ip.h>
-
 #include <vnet/tcp/tcp.h>
-
-#include <vppinfra/hash.h>
-#include <vppinfra/error.h>
 #include <vppinfra/elog.h>
-#include <vlibmemory/unix_shared_memory_queue.h>
-
-#include <vnet/udp/udp_packet.h>
-#include <math.h>
+#include <vnet/session/application.h>
 #include <vnet/session/session_debug.h>
+#include <vlibmemory/unix_shared_memory_queue.h>
 
 vlib_node_registration_t session_queue_node;
 
@@ -52,8 +45,8 @@ format_session_queue_trace (u8 * s, va_list * args)
 
 vlib_node_registration_t session_queue_node;
 
-#define foreach_session_queue_error                 \
-_(TX, "Packets transmitted")                    \
+#define foreach_session_queue_error            \
+_(TX, "Packets transmitted")                   \
 _(TIMER, "Timer events")
 
 typedef enum
@@ -91,10 +84,10 @@ session_tx_fifo_read_and_snd_i (vlib_main_t * vm, vlib_node_runtime_t * node,
   transport_proto_vft_t *transport_vft;
   u32 next_index, next0, *to_next, n_left_to_next, bi0;
   vlib_buffer_t *b0;
-  u32 rx_offset;
+  u32 rx_offset = 0, max_dequeue0;
   u16 snd_mss0;
   u8 *data0;
-  int i;
+  int i, n_bytes_read;
 
   next_index = next0 = session_type_to_next[s0->session_type];
 
@@ -106,24 +99,33 @@ session_tx_fifo_read_and_snd_i (vlib_main_t * vm, vlib_node_runtime_t * node,
   snd_mss0 = transport_vft->send_mss (tc0);
 
   /* Can't make any progress */
-  if (snd_space0 == 0 || svm_fifo_max_dequeue (s0->server_tx_fifo) == 0
-      || snd_mss0 == 0)
+  if (snd_space0 == 0 || snd_mss0 == 0)
     {
       vec_add1 (smm->evts_partially_read[thread_index], *e0);
       return 0;
     }
 
-  ASSERT (e0->enqueue_length > 0);
-
-  /* Ensure we're not writing more than transport window allows */
-  max_len_to_snd0 = clib_min (e0->enqueue_length, snd_space0);
-
   if (peek_data)
     {
       /* Offset in rx fifo from where to peek data  */
       rx_offset = transport_vft->tx_fifo_offset (tc0);
     }
 
+  /* Check how much we can pull. If buffering, subtract the offset */
+  max_dequeue0 = svm_fifo_max_dequeue (s0->server_tx_fifo) - rx_offset;
+
+  /* Allow enqueuing of a new event */
+  svm_fifo_unset_event (s0->server_tx_fifo);
+
+  /* Nothing to read return */
+  if (max_dequeue0 == 0)
+    {
+      return 0;
+    }
+
+  /* Ensure we're not writing more than transport window allows */
+  max_len_to_snd0 = clib_min (max_dequeue0, snd_space0);
+
   /* TODO check if transport is willing to send len_to_snd0
    * bytes (Nagle) */
 
@@ -147,13 +149,10 @@ session_tx_fifo_read_and_snd_i (vlib_main_t * vm, vlib_node_runtime_t * node,
           * XXX 0.9 because when debugging we might not get a full frame */
          if (PREDICT_FALSE (n_bufs < 0.9 * VLIB_FRAME_SIZE))
            {
-             /* Keep track of how much we've dequeued and exit */
-             if (left_to_snd0 != max_len_to_snd0)
+             if (svm_fifo_set_event (s0->server_tx_fifo))
                {
-                 e0->enqueue_length -= max_len_to_snd0 - left_to_snd0;
                  vec_add1 (smm->evts_partially_read[thread_index], *e0);
                }
-
              return -1;
            }
 
@@ -198,9 +197,9 @@ session_tx_fifo_read_and_snd_i (vlib_main_t * vm, vlib_node_runtime_t * node,
          len_to_deq0 = (left_to_snd0 < snd_mss0) ? left_to_snd0 : snd_mss0;
 
          /* *INDENT-OFF* */
-         SESSION_EVT_DBG(s0, SESSION_EVT_DEQ, ({
+         SESSION_EVT_DBG(SESSION_EVT_DEQ, s0, ({
              ed->data[0] = e0->event_id;
-             ed->data[1] = e0->enqueue_length;
+             ed->data[1] = max_dequeue0;
              ed->data[2] = len_to_deq0;
              ed->data[3] = left_to_snd0;
          }));
@@ -214,29 +213,30 @@ session_tx_fifo_read_and_snd_i (vlib_main_t * vm, vlib_node_runtime_t * node,
           *      2) buffer chains */
          if (peek_data)
            {
-             int n_bytes_read;
              n_bytes_read = svm_fifo_peek (s0->server_tx_fifo, s0->pid,
                                            rx_offset, len_to_deq0, data0);
-             if (n_bytes_read < 0)
+             if (n_bytes_read <= 0)
                goto dequeue_fail;
 
              /* Keep track of progress locally, transport is also supposed to
-              * increment it independently when pushing header */
+              * increment it independently when pushing the header */
              rx_offset += n_bytes_read;
            }
          else
            {
-             if (svm_fifo_dequeue_nowait (s0->server_tx_fifo, s0->pid,
-                                          len_to_deq0, data0) < 0)
+             n_bytes_read = svm_fifo_dequeue_nowait (s0->server_tx_fifo,
+                                                     s0->pid, len_to_deq0,
+                                                     data0);
+             if (n_bytes_read <= 0)
                goto dequeue_fail;
            }
 
-         b0->current_length = len_to_deq0;
+         b0->current_length = n_bytes_read;
 
          /* Ask transport to push header */
          transport_vft->push_header (tc0, b0);
 
-         left_to_snd0 -= len_to_deq0;
+         left_to_snd0 -= n_bytes_read;
          *n_tx_packets = *n_tx_packets + 1;
 
          vlib_validate_buffer_enqueue_x1 (vm, node, next_index,
@@ -246,25 +246,31 @@ session_tx_fifo_read_and_snd_i (vlib_main_t * vm, vlib_node_runtime_t * node,
       vlib_put_next_frame (vm, node, next_index, n_left_to_next);
     }
 
-  /* If we couldn't dequeue all bytes store progress */
-  if (max_len_to_snd0 < e0->enqueue_length)
+  /* If we couldn't dequeue all bytes mark as partially read */
+  if (max_len_to_snd0 < max_dequeue0)
     {
-      e0->enqueue_length -= max_len_to_snd0;
-      vec_add1 (smm->evts_partially_read[thread_index], *e0);
+      /* If we don't already have new event */
+      if (svm_fifo_set_event (s0->server_tx_fifo))
+       {
+         vec_add1 (smm->evts_partially_read[thread_index], *e0);
+       }
     }
   return 0;
 
 dequeue_fail:
-  /* Can't read from fifo. Store event rx progress, save as partially read,
-   * return buff to free list and return  */
-  e0->enqueue_length -= max_len_to_snd0 - left_to_snd0;
-  vec_add1 (smm->evts_partially_read[thread_index], *e0);
+  /*
+   * Can't read from fifo. If we don't already have an event, save as partially
+   * read, return buff to free list and return
+   */
+  clib_warning ("dequeue fail");
 
-  to_next -= 1;
-  n_left_to_next += 1;
+  if (svm_fifo_set_event (s0->server_tx_fifo))
+    {
+      vec_add1 (smm->evts_partially_read[thread_index], *e0);
+    }
+  vlib_put_next_frame (vm, node, next_index, n_left_to_next + 1);
   _vec_len (smm->tx_buffers[thread_index]) += 1;
 
-  clib_warning ("dequeue fail");
   return 0;
 }
 
@@ -298,6 +304,7 @@ session_queue_node_fn (vlib_main_t * vm, vlib_node_runtime_t * node,
   session_fifo_event_t *my_fifo_events, *e;
   u32 n_to_dequeue, n_events;
   unix_shared_memory_queue_t *q;
+  application_t *app;
   int n_tx_packets = 0;
   u32 my_thread_index = vm->cpu_index;
   int i, rv;
@@ -321,13 +328,18 @@ session_queue_node_fn (vlib_main_t * vm, vlib_node_runtime_t * node,
   if (n_to_dequeue == 0 && vec_len (my_fifo_events) == 0)
     return 0;
 
+  SESSION_EVT_DBG (SESSION_EVT_DEQ_NODE, 0);
+
   /*
    * If we didn't manage to process previous events try going
    * over them again without dequeuing new ones.
    */
   /* XXX: Block senders to sessions that can't keep up */
   if (vec_len (my_fifo_events) >= 100)
-    goto skip_dequeue;
+    {
+      clib_warning ("too many fifo events unsolved");
+      goto skip_dequeue;
+    }
 
   /* See you in the next life, don't be late */
   if (pthread_mutex_trylock (&q->mutex))
@@ -352,19 +364,17 @@ skip_dequeue:
     {
       svm_fifo_t *f0;          /* $$$ prefetch 1 ahead maybe */
       stream_session_t *s0;
-      u32 server_session_index0, server_thread_index0;
+      u32 session_index0;
       session_fifo_event_t *e0;
 
       e0 = &my_fifo_events[i];
       f0 = e0->fifo;
-      server_session_index0 = f0->server_session_index;
-      server_thread_index0 = f0->server_thread_index;
+      session_index0 = f0->server_session_index;
 
       /* $$$ add multiple event queues, per vpp worker thread */
-      ASSERT (server_thread_index0 == my_thread_index);
+      ASSERT (f0->server_thread_index == my_thread_index);
 
-      s0 = stream_session_get_if_valid (server_session_index0,
-                                       my_thread_index);
+      s0 = stream_session_get_if_valid (session_index0, my_thread_index);
 
       if (CLIB_DEBUG && !s0)
        {
@@ -385,11 +395,20 @@ skip_dequeue:
          rv = (smm->session_tx_fns[s0->session_type]) (vm, node, smm, e0, s0,
                                                        my_thread_index,
                                                        &n_tx_packets);
+         /* Out of buffers */
          if (rv < 0)
            goto done;
 
          break;
-
+       case FIFO_EVENT_SERVER_EXIT:
+         stream_session_disconnect (s0);
+         break;
+       case FIFO_EVENT_BUILTIN_RX:
+         svm_fifo_unset_event (s0->server_rx_fifo);
+         /* Get session's server */
+         app = application_get (s0->app_index);
+         app->cb_fns.builtin_server_rx_callback (s0);
+         break;
        default:
          clib_warning ("unhandled event type %d", e0->event_type);
        }
@@ -418,6 +437,8 @@ done:
   vlib_node_increment_counter (vm, session_queue_node.index,
                               SESSION_QUEUE_ERROR_TX, n_tx_packets);
 
+  SESSION_EVT_DBG (SESSION_EVT_DEQ_NODE, 1);
+
   return n_tx_packets;
 }
 
index 06e2a09..f10918a 100644 (file)
@@ -804,30 +804,36 @@ stream_session_enqueue_notify (stream_session_t * s, u8 block)
   /* Get session's server */
   app = application_get (s->app_index);
 
-  /* Fabricate event */
-  evt.fifo = s->server_rx_fifo;
-  evt.event_type = FIFO_EVENT_SERVER_RX;
-  evt.event_id = serial_number++;
-  evt.enqueue_length = svm_fifo_max_dequeue (s->server_rx_fifo);
-
   /* Built-in server? Hand event to the callback... */
   if (app->cb_fns.builtin_server_rx_callback)
-    return app->cb_fns.builtin_server_rx_callback (s, &evt);
-
-  /* Add event to server's event queue */
-  q = app->event_queue;
+    return app->cb_fns.builtin_server_rx_callback (s);
 
-  /* Based on request block (or not) for lack of space */
-  if (block || PREDICT_TRUE (q->cursize < q->maxsize))
-    unix_shared_memory_queue_add (app->event_queue, (u8 *) & evt,
-                                 0 /* do wait for mutex */ );
-  else
-    return -1;
+  /* If no event, send one */
+  if (svm_fifo_set_event (s->server_rx_fifo))
+    {
+      /* Fabricate event */
+      evt.fifo = s->server_rx_fifo;
+      evt.event_type = FIFO_EVENT_SERVER_RX;
+      evt.event_id = serial_number++;
+
+      /* Add event to server's event queue */
+      q = app->event_queue;
+
+      /* Based on request block (or not) for lack of space */
+      if (block || PREDICT_TRUE (q->cursize < q->maxsize))
+       unix_shared_memory_queue_add (app->event_queue, (u8 *) & evt,
+                                     0 /* do wait for mutex */ );
+      else
+       {
+         clib_warning ("fifo full");
+         return -1;
+       }
+    }
 
   /* *INDENT-OFF* */
-  SESSION_EVT_DBG(s, SESSION_EVT_ENQ, ({
+  SESSION_EVT_DBG(SESSION_EVT_ENQ, s, ({
       ed->data[0] = evt.event_id;
-      ed->data[1] = evt.enqueue_length;
+      ed->data[1] = svm_fifo_max_dequeue (s->server_rx_fifo);
   }));
   /* *INDENT-ON* */
 
@@ -1192,8 +1198,29 @@ stream_session_open (u8 sst, ip46_address_t * addr, u16 port_host_byte_order,
 void
 stream_session_disconnect (stream_session_t * s)
 {
+//  session_fifo_event_t evt;
+
   s->session_state = SESSION_STATE_CLOSED;
+  /* RPC to vpp evt queue in the right thread */
+
   tp_vfts[s->session_type].close (s->connection_index, s->thread_index);
+
+//  {
+//  /* Fabricate event */
+//  evt.fifo = s->server_rx_fifo;
+//  evt.event_type = FIFO_EVENT_SERVER_RX;
+//  evt.event_id = serial_number++;
+//
+//  /* Based on request block (or not) for lack of space */
+//  if (PREDICT_TRUE(q->cursize < q->maxsize))
+//    unix_shared_memory_queue_add (app->event_queue, (u8 *) &evt,
+//                                0 /* do wait for mutex */);
+//  else
+//    {
+//      clib_warning("fifo full");
+//      return -1;
+//    }
+//  }
 }
 
 /**
index 96c00d8..a39bc06 100644 (file)
@@ -33,6 +33,7 @@ typedef enum
   FIFO_EVENT_SERVER_TX,
   FIFO_EVENT_TIMEOUT,
   FIFO_EVENT_SERVER_EXIT,
+  FIFO_EVENT_BUILTIN_RX
 } fifo_event_type_t;
 
 #define foreach_session_input_error                                         \
@@ -91,14 +92,13 @@ typedef enum
   SESSION_STATE_N_STATES,
 } stream_session_state_t;
 
-typedef CLIB_PACKED (struct
-                    {
-                    svm_fifo_t * fifo;
-                    u8 event_type;
-                    /* $$$$ for event logging */
-                    u16 event_id;
-                    u32 enqueue_length;
-                    }) session_fifo_event_t;
+/* *INDENT-OFF* */
+typedef CLIB_PACKED (struct {
+  svm_fifo_t * fifo;
+  u8 event_type;
+  u16 event_id;
+}) session_fifo_event_t;
+/* *INDENT-ON* */
 
 typedef struct _stream_session_t
 {
@@ -333,7 +333,7 @@ stream_session_get_index (stream_session_t * s)
 }
 
 always_inline u32
-stream_session_max_enqueue (transport_connection_t * tc)
+stream_session_max_rx_enqueue (transport_connection_t * tc)
 {
   stream_session_t *s = stream_session_get (tc->s_index, tc->thread_index);
   return svm_fifo_max_enqueue (s->server_rx_fifo);
@@ -346,7 +346,6 @@ stream_session_fifo_size (transport_connection_t * tc)
   return s->server_rx_fifo->nitems;
 }
 
-
 int
 stream_session_enqueue_data (transport_connection_t * tc, u8 * data, u16 len,
                             u8 queue_event);
index b029ee6..38762af 100644 (file)
@@ -107,7 +107,7 @@ show_session_command_fn (vlib_main_t * vm, unformat_input_t * input,
            {
              if (once_per_pool)
                {
-                 str = format (str, "%-40s%-20s%-20s%-15s",
+                 str = format (str, "%-50s%-20s%-20s%-15s",
                                "Connection", "Rx fifo", "Tx fifo",
                                "Session Index");
                  vlib_cli_output (vm, "%v", str);
index 858f12e..80a97cd 100644 (file)
@@ -21,7 +21,8 @@
 
 #define foreach_session_dbg_evt                \
   _(ENQ, "enqueue")                    \
-  _(DEQ, "dequeue")
+  _(DEQ, "dequeue")                    \
+  _(DEQ_NODE, "dequeue")
 
 typedef enum _session_evt_dbg
 {
@@ -30,7 +31,10 @@ typedef enum _session_evt_dbg
 #undef _
 } session_evt_dbg_e;
 
-#if TRANSPORT_DEBUG
+#define SESSION_DBG (0)
+#define SESSION_DEQ_NODE_EVTS (0)
+
+#if TRANSPORT_DEBUG && SESSION_DBG
 
 #define DEC_SESSION_ETD(_s, _e, _size)                                 \
   struct                                                               \
@@ -44,6 +48,12 @@ typedef enum _session_evt_dbg
   ed = ELOG_TRACK_DATA (&vlib_global_main.elog_main,                   \
                        _e, _tc->elog_track)
 
+#define DEC_SESSION_ED(_e, _size)                                      \
+  struct                                                               \
+  {                                                                    \
+    u32 data[_size];                                                   \
+  } * ed;                                                              \
+  ed = ELOG_DATA (&vlib_global_main.elog_main, _e)
 
 #define SESSION_EVT_DEQ_HANDLER(_s, _body)                             \
 {                                                                      \
@@ -67,13 +77,33 @@ typedef enum _session_evt_dbg
   do { _body; } while (0);                                             \
 }
 
+#if SESSION_DEQ_NODE_EVTS
+#define SESSION_EVT_DEQ_NODE_HANDLER(_node_evt)                                \
+{                                                                      \
+  ELOG_TYPE_DECLARE (_e) =                                             \
+  {                                                                    \
+    .format = "deq-node: %s",                                          \
+    .format_args = "t4",                                               \
+    .n_enum_strings = 2,                                               \
+    .enum_strings = {                                                  \
+      "start",                                                         \
+      "end",                                                           \
+    },                                                                 \
+  };                                                                   \
+  DEC_SESSION_ED(_e, 1);                                               \
+  ed->data[0] = _node_evt;                                             \
+}
+#else
+#define SESSION_EVT_DEQ_NODE_HANDLER(_node_evt)
+#endif
+
 #define CONCAT_HELPER(_a, _b) _a##_b
 #define CC(_a, _b) CONCAT_HELPER(_a, _b)
 
-#define SESSION_EVT_DBG(_s, _evt, _body) CC(_evt, _HANDLER)(_s, _body)
+#define SESSION_EVT_DBG(_evt, _args...) CC(_evt, _HANDLER)(_args)
 
 #else
-#define SESSION_EVT_DBG(_s, _evt, _body)
+#define SESSION_EVT_DBG(_evt, _args...)
 #endif
 
 #endif /* SRC_VNET_SESSION_SESSION_DEBUG_H_ */
index 421121d..2f912cb 100644 (file)
@@ -38,7 +38,7 @@ typedef struct _transport_connection
   u32 thread_index;            /**< Worker-thread index */
 
 #if TRANSPORT_DEBUG
-  elog_track_t elog_track;     /**< Debug purposes */
+  elog_track_t elog_track;     /**< Event logging */
 #endif
 
   /** Macros for 'derived classes' where base is named "connection" */
diff --git a/src/vnet/tcp/builtin_client.c b/src/vnet/tcp/builtin_client.c
new file mode 100644 (file)
index 0000000..a6eeb77
--- /dev/null
@@ -0,0 +1,411 @@
+/*
+ * builtin_client.c - vpp built-in tcp client/connect code
+ *
+ * Copyright (c) 2017 by Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <vnet/vnet.h>
+#include <vnet/plugin/plugin.h>
+#include <vnet/tcp/builtin_client.h>
+
+#include <vlibapi/api.h>
+#include <vlibmemory/api.h>
+#include <vlibsocket/api.h>
+#include <vpp/app/version.h>
+
+/* define message IDs */
+#include <vpp/api/vpe_msg_enum.h>
+
+/* define message structures */
+#define vl_typedefs
+#include <vpp/api/vpe_all_api_h.h>
+#undef vl_typedefs
+
+/* define generated endian-swappers */
+#define vl_endianfun
+#include <vpp/api/vpe_all_api_h.h>
+#undef vl_endianfun
+
+/* instantiate all the print functions we know about */
+#define vl_print(handle, ...) vlib_cli_output (handle, __VA_ARGS__)
+#define vl_printfun
+#include <vpp/api/vpe_all_api_h.h>
+#undef vl_printfun
+
+static void
+send_test_chunk (tclient_main_t * tm, session_t * s)
+{
+  u8 *test_data = tm->connect_test_data;
+  int test_buf_offset = 0;
+  u32 bytes_this_chunk;
+  session_fifo_event_t evt;
+  static int serial_number = 0;
+  int rv;
+
+  while (s->bytes_to_send > 0)
+    {
+      bytes_this_chunk = vec_len (test_data) < s->bytes_to_send
+       ? vec_len (test_data) : s->bytes_to_send;
+
+      rv = svm_fifo_enqueue_nowait (s->server_tx_fifo, 0 /*pid */ ,
+                                   bytes_this_chunk,
+                                   test_data + test_buf_offset);
+
+      if (rv > 0)
+       {
+         s->bytes_to_send -= rv;
+         test_buf_offset += rv;
+
+         if (svm_fifo_set_event (s->server_tx_fifo))
+           {
+             /* Fabricate TX event, send to vpp */
+             evt.fifo = s->server_tx_fifo;
+             evt.event_type = FIFO_EVENT_SERVER_TX;
+             evt.event_id = serial_number++;
+
+             unix_shared_memory_queue_add (tm->vpp_event_queue, (u8 *) & evt,
+                                           0 /* do wait for mutex */ );
+           }
+       }
+    }
+}
+
+static void
+receive_test_chunk (tclient_main_t * tm, session_t * s)
+{
+  svm_fifo_t *rx_fifo = s->server_rx_fifo;
+  int n_read, bytes, i;
+
+  bytes = svm_fifo_max_dequeue (rx_fifo);
+  /* Allow enqueuing of new event */
+  svm_fifo_unset_event (rx_fifo);
+
+  /* Read the bytes */
+  do
+    {
+      n_read = svm_fifo_dequeue_nowait (rx_fifo, 0, vec_len (tm->rx_buf),
+                                       tm->rx_buf);
+      if (n_read > 0)
+       {
+         bytes -= n_read;
+         for (i = 0; i < n_read; i++)
+           {
+             if (tm->rx_buf[i] != ((s->bytes_received + i) & 0xff))
+               {
+                 clib_warning ("read %d error at byte %lld, 0x%x not 0x%x",
+                               n_read, s->bytes_received + i,
+                               tm->rx_buf[i],
+                               ((s->bytes_received + i) & 0xff));
+               }
+           }
+         s->bytes_to_receive -= n_read;
+         s->bytes_received += n_read;
+       }
+
+    }
+  while (n_read < 0 || bytes > 0);
+}
+
+static void *
+tclient_thread_fn (void *arg)
+{
+  tclient_main_t *tm = &tclient_main;
+  vl_api_disconnect_session_t *dmp;
+  session_t *sp;
+  struct timespec ts, tsrem;
+  int i;
+  int try_tx, try_rx;
+  u32 *session_indices = 0;
+
+  /* stats thread wants no signals. */
+  {
+    sigset_t s;
+    sigfillset (&s);
+    pthread_sigmask (SIG_SETMASK, &s, 0);
+  }
+
+  while (1)
+    {
+      /* Wait until we're told to get busy */
+      while (tm->run_test == 0
+            || (tm->ready_connections != tm->expected_connections))
+       {
+         ts.tv_sec = 0;
+         ts.tv_nsec = 100000000;
+         while (nanosleep (&ts, &tsrem) < 0)
+           ts = tsrem;
+       }
+      tm->run_test = 0;
+
+      clib_warning ("Run %d iterations", tm->n_iterations);
+
+      for (i = 0; i < tm->n_iterations; i++)
+       {
+         session_t *sp;
+
+         do
+           {
+             try_tx = try_rx = 0;
+
+             /* *INDENT-OFF* */
+             pool_foreach (sp, tm->sessions, ({
+                if (sp->bytes_to_send > 0)
+                  {
+                    send_test_chunk (tm, sp);
+                    try_tx = 1;
+                  }
+             }));
+             pool_foreach (sp, tm->sessions, ({
+               if (sp->bytes_to_receive > 0)
+                  {
+                    receive_test_chunk (tm, sp);
+                    try_rx = 1;
+                  }
+              }));
+             /* *INDENT-ON* */
+
+           }
+         while (try_tx || try_rx);
+       }
+      clib_warning ("Done %d iterations", tm->n_iterations);
+
+      /* Disconnect sessions... */
+      vec_reset_length (session_indices);
+      pool_foreach (sp, tm->sessions, (
+                                       {
+                                       vec_add1 (session_indices,
+                                                 sp - tm->sessions);
+                                       }
+                   ));
+
+      for (i = 0; i < vec_len (session_indices); i++)
+       {
+         sp = pool_elt_at_index (tm->sessions, session_indices[i]);
+         dmp = vl_msg_api_alloc_as_if_client (sizeof (*dmp));
+         memset (dmp, 0, sizeof (*dmp));
+         dmp->_vl_msg_id = ntohs (VL_API_DISCONNECT_SESSION);
+         dmp->client_index = tm->my_client_index;
+         dmp->session_index = sp->vpp_session_index;
+         dmp->session_thread_index = sp->vpp_session_thread;
+         vl_msg_api_send_shmem (tm->vl_input_queue, (u8 *) & dmp);
+         pool_put (tm->sessions, sp);
+       }
+    }
+  /* NOTREACHED */
+  return 0;
+}
+
+/* So we don't get "no handler for... " msgs */
+static void
+vl_api_memclnt_create_reply_t_handler (vl_api_memclnt_create_reply_t * mp)
+{
+  tclient_main_t *tm = &tclient_main;
+
+  tm->my_client_index = mp->index;
+}
+
+static void
+vl_api_connect_uri_reply_t_handler (vl_api_connect_uri_reply_t * mp)
+{
+  tclient_main_t *tm = &tclient_main;
+  session_t *session;
+  u32 session_index;
+  u64 key;
+  i32 retval = /* clib_net_to_host_u32 ( */ mp->retval /*) */ ;
+
+  if (retval < 0)
+    {
+      clib_warning ("connection failed: retval %d", retval);
+      return;
+    }
+
+  tm->our_event_queue = (unix_shared_memory_queue_t *)
+    mp->vpp_event_queue_address;
+
+  tm->vpp_event_queue = (unix_shared_memory_queue_t *)
+    mp->vpp_event_queue_address;
+
+  /*
+   * Setup session
+   */
+  pool_get (tm->sessions, session);
+  memset (session, 0, sizeof (*session));
+  session_index = session - tm->sessions;
+  session->bytes_to_receive = session->bytes_to_send = tm->bytes_to_send;
+
+  session->server_rx_fifo = (svm_fifo_t *) mp->server_rx_fifo;
+  session->server_rx_fifo->client_session_index = session_index;
+  session->server_tx_fifo = (svm_fifo_t *) mp->server_tx_fifo;
+  session->server_tx_fifo->client_session_index = session_index;
+
+  session->vpp_session_index = mp->session_index;
+  session->vpp_session_thread = mp->session_thread_index;
+
+  /* Add it to the session lookup table */
+  key = (((u64) mp->session_thread_index) << 32) | (u64) mp->session_index;
+  hash_set (tm->session_index_by_vpp_handles, key, session_index);
+
+  tm->ready_connections++;
+}
+
+static void
+create_api_loopback (tclient_main_t * tm)
+{
+  vl_api_memclnt_create_t _m, *mp = &_m;
+  extern void vl_api_memclnt_create_t_handler (vl_api_memclnt_create_t *);
+  api_main_t *am = &api_main;
+  vl_shmem_hdr_t *shmem_hdr;
+
+  /*
+   * Create a "loopback" API client connection
+   * Don't do things like this unless you know what you're doing...
+   */
+
+  shmem_hdr = am->shmem_hdr;
+  tm->vl_input_queue = shmem_hdr->vl_input_queue;
+  memset (mp, 0, sizeof (*mp));
+  mp->_vl_msg_id = VL_API_MEMCLNT_CREATE;
+  mp->context = 0xFEEDFACE;
+  mp->input_queue = (u64) tm->vl_input_queue;
+  strncpy ((char *) mp->name, "tcp_tester", sizeof (mp->name) - 1);
+
+  vl_api_memclnt_create_t_handler (mp);
+}
+
+#define foreach_tclient_static_api_msg         \
+_(MEMCLNT_CREATE_REPLY, memclnt_create_reply)   \
+_(CONNECT_URI_REPLY, connect_uri_reply)
+
+static clib_error_t *
+tclient_api_hookup (vlib_main_t * vm)
+{
+  tclient_main_t *tm = &tclient_main;
+  vl_msg_api_msg_config_t _c, *c = &_c;
+  int i;
+
+  /* Init test data */
+  vec_validate (tm->connect_test_data, 64 * 1024 - 1);
+  for (i = 0; i < vec_len (tm->connect_test_data); i++)
+    tm->connect_test_data[i] = i & 0xff;
+
+  tm->session_index_by_vpp_handles = hash_create (0, sizeof (uword));
+  vec_validate (tm->rx_buf, vec_len (tm->connect_test_data) - 1);
+
+  /* Hook up client-side static APIs to our handlers */
+#define _(N,n) do {                                             \
+    c->id = VL_API_##N;                                         \
+    c->name = #n;                                               \
+    c->handler = vl_api_##n##_t_handler;                        \
+    c->cleanup = vl_noop_handler;                               \
+    c->endian = vl_api_##n##_t_endian;                          \
+    c->print = vl_api_##n##_t_print;                            \
+    c->size = sizeof(vl_api_##n##_t);                           \
+    c->traced = 1; /* trace, so these msgs print */             \
+    c->replay = 0; /* don't replay client create/delete msgs */ \
+    c->message_bounce = 0; /* don't bounce this message */     \
+    vl_msg_api_config(c);} while (0);
+
+  foreach_tclient_static_api_msg;
+#undef _
+
+  return 0;
+}
+
+VLIB_API_INIT_FUNCTION (tclient_api_hookup);
+
+static clib_error_t *
+test_tcp_clients_command_fn (vlib_main_t * vm,
+                            unformat_input_t * input,
+                            vlib_cli_command_t * cmd)
+{
+  u8 *connect_uri = (u8 *) "tcp://6.0.1.2/1234";
+  u8 *uri;
+  tclient_main_t *tm = &tclient_main;
+  int i;
+  u32 n_clients = 1;
+
+  tm->bytes_to_send = 8192;
+  tm->n_iterations = 1;
+  vec_free (tm->connect_uri);
+
+  while (unformat_check_input (input) != UNFORMAT_END_OF_INPUT)
+    {
+      if (unformat (input, "nclients %d", &n_clients))
+       ;
+      else if (unformat (input, "iterations %d", &tm->n_iterations))
+       ;
+      else if (unformat (input, "bytes %d", &tm->bytes_to_send))
+       ;
+      else if (unformat (input, "uri %s", &tm->connect_uri))
+       ;
+      else
+       return clib_error_return (0, "unknown input `%U'",
+                                 format_unformat_error, input);
+    }
+
+  tm->ready_connections = 0;
+  tm->expected_connections = n_clients;
+  uri = connect_uri;
+  if (tm->connect_uri)
+    uri = tm->connect_uri;
+
+  create_api_loopback (tm);
+
+  /* Start a transmit thread */
+  if (tm->client_thread_handle == 0)
+    {
+      int rv = pthread_create (&tm->client_thread_handle,
+                              NULL /*attr */ , tclient_thread_fn, 0);
+      if (rv)
+       {
+         tm->client_thread_handle = 0;
+         return clib_error_return (0, "pthread_create returned %d", rv);
+       }
+    }
+
+  /* Fire off connect requests, in something approaching a normal manner */
+  for (i = 0; i < n_clients; i++)
+    {
+      vl_api_connect_uri_t *cmp;
+      cmp = vl_msg_api_alloc_as_if_client (sizeof (*cmp));
+      memset (cmp, 0, sizeof (*cmp));
+
+      cmp->_vl_msg_id = ntohs (VL_API_CONNECT_URI);
+      cmp->client_index = tm->my_client_index;
+      cmp->context = ntohl (0xfeedface);
+      memcpy (cmp->uri, uri, strlen ((char *) uri) + 1);
+      vl_msg_api_send_shmem (tm->vl_input_queue, (u8 *) & cmp);
+    }
+
+  tm->run_test = 1;
+
+  return 0;
+}
+
+/* *INDENT-OFF* */
+VLIB_CLI_COMMAND (test_clients_command, static) =
+{
+  .path = "test tcp clients",
+  .short_help = "test tcp clients",
+  .function = test_tcp_clients_command_fn,
+};
+/* *INDENT-ON* */
+
+/*
+ * fd.io coding-style-patch-verification: ON
+ *
+ * Local Variables:
+ * eval: (c-set-style "gnu")
+ * End:
+ */
diff --git a/src/vnet/tcp/builtin_client.h b/src/vnet/tcp/builtin_client.h
new file mode 100644 (file)
index 0000000..6403030
--- /dev/null
@@ -0,0 +1,131 @@
+
+/*
+ * tclient.h - skeleton vpp engine plug-in header file
+ *
+ * Copyright (c) <current-year> <your-organization>
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __included_tclient_h__
+#define __included_tclient_h__
+
+#include <vnet/vnet.h>
+#include <vnet/ip/ip.h>
+#include <vnet/ethernet/ethernet.h>
+
+#include <vppinfra/hash.h>
+#include <vppinfra/error.h>
+#include <vlibmemory/unix_shared_memory_queue.h>
+#include <svm/svm_fifo_segment.h>
+#include <vnet/session/session.h>
+#include <vnet/session/application_interface.h>
+
+typedef struct
+{
+  u32 bytes_to_send;
+  u32 bytes_sent;
+  u32 bytes_to_receive;
+  u32 bytes_received;
+
+  svm_fifo_t *server_rx_fifo;
+  svm_fifo_t *server_tx_fifo;
+
+  u32 vpp_session_index;
+  u32 vpp_session_thread;
+} session_t;
+
+typedef struct
+{
+  /* API message ID base */
+  u16 msg_id_base;
+
+  /* vpe input queue */
+  unix_shared_memory_queue_t *vl_input_queue;
+
+  /* API client handle */
+  u32 my_client_index;
+
+  /* The URI we're playing with */
+  u8 *uri;
+
+  /* Session pool */
+  session_t *sessions;
+
+  /* Hash table for disconnect processing */
+  uword *session_index_by_vpp_handles;
+
+  /* intermediate rx buffer */
+  u8 *rx_buf;
+
+  /* URI for slave's connect */
+  u8 *connect_uri;
+
+  u32 connected_session_index;
+
+  int i_am_master;
+
+  /* drop all packets */
+  int drop_packets;
+
+  /* Our event queue */
+  unix_shared_memory_queue_t *our_event_queue;
+
+  /* $$$ single thread only for the moment */
+  unix_shared_memory_queue_t *vpp_event_queue;
+
+  pid_t my_pid;
+
+  /* For deadman timers */
+  clib_time_t clib_time;
+
+  /* Connection counts */
+  u32 expected_connections;
+  volatile u32 ready_connections;
+
+  /* Signal variables */
+  volatile int run_test;
+
+  /* Number of iterations */
+  int n_iterations;
+
+  /* Bytes to send */
+  u32 bytes_to_send;
+
+  u32 configured_segment_size;
+
+  /* VNET_API_ERROR_FOO -> "Foo" hash table */
+  uword *error_string_by_error_number;
+
+  u8 *connect_test_data;
+  pthread_t client_thread_handle;
+  u32 client_bytes_received;
+  u8 test_return_packets;
+
+  /* convenience */
+  vlib_main_t *vlib_main;
+  vnet_main_t *vnet_main;
+  ethernet_main_t *ethernet_main;
+} tclient_main_t;
+
+tclient_main_t tclient_main;
+
+vlib_node_registration_t tclient_node;
+
+#endif /* __included_tclient_h__ */
+
+/*
+ * fd.io coding-style-patch-verification: ON
+ *
+ * Local Variables:
+ * eval: (c-set-style "gnu")
+ * End:
+ */
index dd6759c..efd26e9 100644 (file)
@@ -22,6 +22,7 @@ typedef struct
 {
   u8 *rx_buf;
   unix_shared_memory_queue_t **vpp_queue;
+  u32 byte_index;
   vlib_main_t *vlib_main;
 } builtin_server_main_t;
 
@@ -37,6 +38,7 @@ builtin_session_accept_callback (stream_session_t * s)
   bsm->vpp_queue[s->thread_index] =
     session_manager_get_vpp_event_queue (s->thread_index);
   s->session_state = SESSION_STATE_READY;
+  bsm->byte_index = 0;
   return 0;
 }
 
@@ -80,57 +82,94 @@ builtin_redirect_connect_callback (u32 client_index, void *mp)
   return -1;
 }
 
+void
+test_bytes (builtin_server_main_t * bsm, int actual_transfer)
+{
+  int i;
+
+  for (i = 0; i < actual_transfer; i++)
+    {
+      if (bsm->rx_buf[i] != ((bsm->byte_index + i) & 0xff))
+       {
+         clib_warning ("at %d expected %d got %d", bsm->byte_index + i,
+                       (bsm->byte_index + i) & 0xff, bsm->rx_buf[i]);
+       }
+    }
+  bsm->byte_index += actual_transfer;
+}
+
 int
-builtin_server_rx_callback (stream_session_t * s, session_fifo_event_t * e)
+builtin_server_rx_callback (stream_session_t * s)
 {
-  int n_written, bytes, total_copy_bytes;
-  int n_read;
-  svm_fifo_t *tx_fifo;
+  u32 n_written, max_dequeue, max_enqueue, max_transfer;
+  int actual_transfer;
+  svm_fifo_t *tx_fifo, *rx_fifo;
   builtin_server_main_t *bsm = &builtin_server_main;
   session_fifo_event_t evt;
   static int serial_number = 0;
 
-  bytes = e->enqueue_length;
-  if (PREDICT_FALSE (bytes <= 0))
+  max_dequeue = svm_fifo_max_dequeue (s->server_rx_fifo);
+  max_enqueue = svm_fifo_max_enqueue (s->server_tx_fifo);
+
+  if (PREDICT_FALSE (max_dequeue == 0))
     {
-      clib_warning ("bizarre rx callback: bytes %d", bytes);
       return 0;
     }
 
   tx_fifo = s->server_tx_fifo;
+  rx_fifo = s->server_rx_fifo;
 
   /* Number of bytes we're going to copy */
-  total_copy_bytes = (bytes < (tx_fifo->nitems - tx_fifo->cursize)) ? bytes :
-    tx_fifo->nitems - tx_fifo->cursize;
+  max_transfer = (max_dequeue < max_enqueue) ? max_dequeue : max_enqueue;
 
-  if (PREDICT_FALSE (total_copy_bytes <= 0))
+  /* No space in tx fifo */
+  if (PREDICT_FALSE (max_transfer == 0))
     {
-      clib_warning ("no space in tx fifo, event had %d bytes", bytes);
+      /* XXX timeout for session that are stuck */
+
+      /* Program self-tap to retry */
+      if (svm_fifo_set_event (rx_fifo))
+       {
+         evt.fifo = rx_fifo;
+         evt.event_type = FIFO_EVENT_BUILTIN_RX;
+         evt.event_id = 0;
+         unix_shared_memory_queue_add (bsm->vpp_queue[s->thread_index],
+                                       (u8 *) & evt,
+                                       0 /* do wait for mutex */ );
+       }
+
       return 0;
     }
 
-  vec_validate (bsm->rx_buf, total_copy_bytes - 1);
-  _vec_len (bsm->rx_buf) = total_copy_bytes;
+  svm_fifo_unset_event (rx_fifo);
+
+  vec_validate (bsm->rx_buf, max_transfer - 1);
+  _vec_len (bsm->rx_buf) = max_transfer;
 
-  n_read = svm_fifo_dequeue_nowait (s->server_rx_fifo, 0, total_copy_bytes,
-                                   bsm->rx_buf);
-  ASSERT (n_read == total_copy_bytes);
+  actual_transfer = svm_fifo_dequeue_nowait (rx_fifo, 0, max_transfer,
+                                            bsm->rx_buf);
+  ASSERT (actual_transfer == max_transfer);
+
+//  test_bytes (bsm, actual_transfer);
 
   /*
    * Echo back
    */
 
-  n_written = svm_fifo_enqueue_nowait (tx_fifo, 0, n_read, bsm->rx_buf);
-  ASSERT (n_written == total_copy_bytes);
+  n_written =
+    svm_fifo_enqueue_nowait (tx_fifo, 0, actual_transfer, bsm->rx_buf);
+  ASSERT (n_written == max_transfer);
 
-  /* Fabricate TX event, send to vpp */
-  evt.fifo = tx_fifo;
-  evt.event_type = FIFO_EVENT_SERVER_TX;
-  evt.enqueue_length = total_copy_bytes;
-  evt.event_id = serial_number++;
+  if (svm_fifo_set_event (tx_fifo))
+    {
+      /* Fabricate TX event, send to vpp */
+      evt.fifo = tx_fifo;
+      evt.event_type = FIFO_EVENT_SERVER_TX;
+      evt.event_id = serial_number++;
 
-  unix_shared_memory_queue_add (bsm->vpp_queue[s->thread_index], (u8 *) & evt,
-                               0 /* do wait for mutex */ );
+      unix_shared_memory_queue_add (bsm->vpp_queue[s->thread_index],
+                                   (u8 *) & evt, 0 /* do wait for mutex */ );
+    }
 
   return 0;
 }
@@ -164,7 +203,7 @@ server_create (vlib_main_t * vm)
   a->api_client_index = ~0;
   a->session_cb_vft = &builtin_session_cb_vft;
   a->options = options;
-  a->options[SESSION_OPTIONS_SEGMENT_SIZE] = 256 << 10;
+  a->options[SESSION_OPTIONS_SEGMENT_SIZE] = 128 << 20;
   a->options[SESSION_OPTIONS_RX_FIFO_SIZE] = 64 << 10;
   a->options[SESSION_OPTIONS_TX_FIFO_SIZE] = 64 << 10;
   a->segment_name = segment_name;
index 0d2e6d0..c3df5bc 100644 (file)
@@ -328,7 +328,7 @@ tcp_connection_init_vars (tcp_connection_t * tc)
 {
   tcp_connection_timers_init (tc);
   tcp_set_snd_mss (tc);
-  tc->sack_sb.head = TCP_INVALID_SACK_HOLE_INDEX;
+  scoreboard_init (&tc->sack_sb);
   tcp_cc_init (tc);
 }
 
@@ -558,17 +558,48 @@ tcp_session_send_mss (transport_connection_t * trans_conn)
   return tc->snd_mss;
 }
 
+/**
+ * Compute tx window session is allowed to fill.
+ */
 u32
 tcp_session_send_space (transport_connection_t * trans_conn)
 {
+  u32 snd_space;
   tcp_connection_t *tc = (tcp_connection_t *) trans_conn;
-  return tcp_available_snd_space (tc);
+
+  /* If we haven't gotten dupacks or if we did and have gotten sacked bytes
+   * then we can still send */
+  if (PREDICT_TRUE (tcp_in_fastrecovery (tc) == 0
+                   && (tc->rcv_dupacks == 0
+                       || tc->sack_sb.last_sacked_bytes)))
+    {
+      snd_space = tcp_available_snd_space (tc);
+
+      /* If we can't write at least a segment, don't try at all */
+      if (snd_space < tc->snd_mss)
+       return 0;
+      return snd_space;
+    }
+
+  /* If in fast recovery, send 1 SMSS if wnd allows */
+  if (tcp_in_fastrecovery (tc) && tcp_available_snd_space (tc)
+      && tcp_fastrecovery_sent_1_smss (tc))
+    {
+      tcp_fastrecovery_1_smss_on (tc);
+      return tc->snd_mss;
+    }
+
+  return 0;
 }
 
 u32
 tcp_session_tx_fifo_offset (transport_connection_t * trans_conn)
 {
   tcp_connection_t *tc = (tcp_connection_t *) trans_conn;
+
+  ASSERT (seq_geq (tc->snd_nxt, tc->snd_una));
+
+  /* This still works if fast retransmit is on */
   return (tc->snd_nxt - tc->snd_una);
 }
 
@@ -762,7 +793,7 @@ tcp_main_enable (vlib_main_t * vm)
   vec_validate (tm->timer_wheels, num_threads - 1);
   tcp_initialize_timer_wheels (tm);
 
-  vec_validate (tm->delack_connections, num_threads - 1);
+//  vec_validate (tm->delack_connections, num_threads - 1);
 
   /* Initialize clocks per tick for TCP timestamp. Used to compute
    * monotonically increasing timestamps. */
index 082ab1d..b4286bc 100644 (file)
 #define TCP_PAWS_IDLE 24 * 24 * 60 * 60 * THZ /**< 24 days */
 #define TCP_MAX_OPTION_SPACE 40
 
-#define TCP_DUPACK_THRESHOLD 3
-#define TCP_MAX_RX_FIFO_SIZE 2 << 20
-#define TCP_IW_N_SEGMENTS 10
+#define TCP_DUPACK_THRESHOLD   3
+#define TCP_MAX_RX_FIFO_SIZE   2 << 20
+#define TCP_IW_N_SEGMENTS      10
+#define TCP_ALWAYS_ACK         0       /**< If on, we always ack */
 
 /** TCP FSM state definitions as per RFC793. */
 #define foreach_tcp_fsm_state   \
@@ -102,13 +103,12 @@ void tcp_update_time (f64 now, u32 thread_index);
 
 /** TCP connection flags */
 #define foreach_tcp_connection_flag             \
-  _(DELACK, "Delay ACK")                        \
   _(SNDACK, "Send ACK")                         \
-  _(BURSTACK, "Burst ACK set")                  \
   _(FINSNT, "FIN sent")                                \
   _(SENT_RCV_WND0, "Sent 0 receive window")     \
   _(RECOVERY, "Recovery on")                    \
-  _(FAST_RECOVERY, "Fast Recovery on")
+  _(FAST_RECOVERY, "Fast Recovery on")         \
+  _(FR_1_SMSS, "Sent 1 SMSS")
 
 typedef enum _tcp_connection_flag_bits
 {
@@ -160,8 +160,12 @@ typedef struct _sack_scoreboard_hole
 typedef struct _sack_scoreboard
 {
   sack_scoreboard_hole_t *holes;       /**< Pool of holes */
-  u32 head;                            /**< Index to first entry */
+  u32 head;                            /**< Index of first entry */
+  u32 tail;                            /**< Index of last entry */
   u32 sacked_bytes;                    /**< Number of bytes sacked in sb */
+  u32 last_sacked_bytes;               /**< Number of bytes last sacked */
+  u32 snd_una_adv;                     /**< Bytes to add to snd_una */
+  u32 max_byte_sacked;                 /**< Highest byte acked */
 } sack_scoreboard_t;
 
 typedef enum _tcp_cc_algorithm_type
@@ -214,7 +218,7 @@ typedef struct _tcp_connection
   sack_block_t *snd_sacks;     /**< Vector of SACKs to send. XXX Fixed size? */
   sack_scoreboard_t sack_sb;   /**< SACK "scoreboard" that tracks holes */
 
-  u8 rcv_dupacks;      /**< Number of DUPACKs received */
+  u16 rcv_dupacks;     /**< Number of DUPACKs received */
   u8 snt_dupacks;      /**< Number of DUPACKs sent in a burst */
 
   /* Congestion control */
@@ -224,6 +228,7 @@ typedef struct _tcp_connection
   u32 bytes_acked;     /**< Bytes acknowledged by current segment */
   u32 rtx_bytes;       /**< Retransmitted bytes */
   u32 tsecr_last_ack;  /**< Timestamp echoed to us in last healthy ACK */
+  u32 snd_congestion;  /**< snd_una_max when congestion is detected */
   tcp_cc_algorithm_t *cc_algo; /**< Congestion control algorithm */
 
   /* RTT and RTO */
@@ -250,8 +255,10 @@ struct _tcp_cc_algorithm
 #define tcp_fastrecovery_off(tc) (tc)->flags &= ~TCP_CONN_FAST_RECOVERY
 #define tcp_in_fastrecovery(tc) ((tc)->flags & TCP_CONN_FAST_RECOVERY)
 #define tcp_in_recovery(tc) ((tc)->flags & (TCP_CONN_FAST_RECOVERY | TCP_CONN_RECOVERY))
-#define tcp_recovery_off(tc) ((tc)->flags &= ~(TCP_CONN_FAST_RECOVERY | TCP_CONN_RECOVERY))
 #define tcp_in_slowstart(tc) (tc->cwnd < tc->ssthresh)
+#define tcp_fastrecovery_sent_1_smss(tc) ((tc)->flags & TCP_CONN_FR_1_SMSS)
+#define tcp_fastrecovery_1_smss_on(tc) ((tc)->flags |= TCP_CONN_FR_1_SMSS)
+#define tcp_fastrecovery_1_smss_off(tc) ((tc)->flags &= ~TCP_CONN_FR_1_SMSS)
 
 typedef enum
 {
@@ -293,8 +300,8 @@ typedef struct _tcp_main
   /* Per worker-thread timer wheel for connections timers */
   tw_timer_wheel_16t_2w_512sl_t *timer_wheels;
 
-  /* Convenience per worker-thread vector of connections to DELACK */
-  u32 **delack_connections;
+//  /* Convenience per worker-thread vector of connections to DELACK */
+//  u32 **delack_connections;
 
   /* Pool of half-open connections on which we've sent a SYN */
   tcp_connection_t *half_open_connections;
@@ -397,8 +404,16 @@ tcp_end_seq (tcp_header_t * th, u32 len)
 always_inline u32
 tcp_flight_size (const tcp_connection_t * tc)
 {
-  return tc->snd_una_max - tc->snd_una - tc->sack_sb.sacked_bytes
-    + tc->rtx_bytes;
+  int flight_size;
+
+  flight_size = (int) ((tc->snd_una_max - tc->snd_una) + tc->rtx_bytes)
+    - (tc->rcv_dupacks * tc->snd_mss) /* - tc->sack_sb.sacked_bytes */ ;
+
+  /* Happens if we don't clear sacked bytes */
+  if (flight_size < 0)
+    return 0;
+
+  return flight_size;
 }
 
 /**
@@ -439,9 +454,13 @@ tcp_available_snd_space (const tcp_connection_t * tc)
   return available_wnd - flight_size;
 }
 
+void tcp_update_rcv_wnd (tcp_connection_t * tc);
+
 void tcp_retransmit_first_unacked (tcp_connection_t * tc);
 
 void tcp_fast_retransmit (tcp_connection_t * tc);
+void tcp_cc_congestion (tcp_connection_t * tc);
+void tcp_cc_recover (tcp_connection_t * tc);
 
 always_inline u32
 tcp_time_now (void)
@@ -453,7 +472,7 @@ u32 tcp_push_header (transport_connection_t * tconn, vlib_buffer_t * b);
 
 u32
 tcp_prepare_retransmit_segment (tcp_connection_t * tc, vlib_buffer_t * b,
-                               u32 max_bytes);
+                               u32 offset, u32 max_bytes);
 
 void tcp_connection_timers_init (tcp_connection_t * tc);
 void tcp_connection_timers_reset (tcp_connection_t * tc);
@@ -476,14 +495,6 @@ tcp_timer_set (tcp_connection_t * tc, u8 timer_id, u32 interval)
                                   tc->c_c_index, timer_id, interval);
 }
 
-always_inline void
-tcp_retransmit_timer_set (tcp_connection_t * tc)
-{
-  /* XXX Switch to faster TW */
-  tcp_timer_set (tc, TCP_TIMER_RETRANSMIT,
-                clib_max (tc->rto * TCP_TO_TIMER_TICK, 1));
-}
-
 always_inline void
 tcp_timer_reset (tcp_connection_t * tc, u8 timer_id)
 {
@@ -506,6 +517,27 @@ tcp_timer_update (tcp_connection_t * tc, u8 timer_id, u32 interval)
                                 tc->c_c_index, timer_id, interval);
 }
 
+/* XXX Switch retransmit to faster TW */
+always_inline void
+tcp_retransmit_timer_set (tcp_connection_t * tc)
+{
+  tcp_timer_set (tc, TCP_TIMER_RETRANSMIT,
+                clib_max (tc->rto * TCP_TO_TIMER_TICK, 1));
+}
+
+always_inline void
+tcp_retransmit_timer_update (tcp_connection_t * tc)
+{
+  tcp_timer_update (tc, TCP_TIMER_RETRANSMIT,
+                   clib_max (tc->rto * TCP_TO_TIMER_TICK, 1));
+}
+
+always_inline void
+tcp_retransmit_timer_reset (tcp_connection_t * tc)
+{
+  tcp_timer_reset (tc, TCP_TIMER_RETRANSMIT);
+}
+
 always_inline u8
 tcp_timer_is_active (tcp_connection_t * tc, tcp_timers_e timer)
 {
@@ -516,6 +548,14 @@ void
 scoreboard_remove_hole (sack_scoreboard_t * sb,
                        sack_scoreboard_hole_t * hole);
 
+always_inline sack_scoreboard_hole_t *
+scoreboard_get_hole (sack_scoreboard_t * sb, u32 index)
+{
+  if (index != TCP_INVALID_SACK_HOLE_INDEX)
+    return pool_elt_at_index (sb->holes, index);
+  return 0;
+}
+
 always_inline sack_scoreboard_hole_t *
 scoreboard_next_hole (sack_scoreboard_t * sb, sack_scoreboard_hole_t * hole)
 {
@@ -532,6 +572,14 @@ scoreboard_first_hole (sack_scoreboard_t * sb)
   return 0;
 }
 
+always_inline sack_scoreboard_hole_t *
+scoreboard_last_hole (sack_scoreboard_t * sb)
+{
+  if (sb->tail != TCP_INVALID_SACK_HOLE_INDEX)
+    return pool_elt_at_index (sb->holes, sb->tail);
+  return 0;
+}
+
 always_inline void
 scoreboard_clear (sack_scoreboard_t * sb)
 {
@@ -540,6 +588,10 @@ scoreboard_clear (sack_scoreboard_t * sb)
     {
       scoreboard_remove_hole (sb, hole);
     }
+  sb->sacked_bytes = 0;
+  sb->last_sacked_bytes = 0;
+  sb->snd_una_adv = 0;
+  sb->max_byte_sacked = 0;
 }
 
 always_inline u32
@@ -548,6 +600,21 @@ scoreboard_hole_bytes (sack_scoreboard_hole_t * hole)
   return hole->end - hole->start;
 }
 
+always_inline u32
+scoreboard_hole_index (sack_scoreboard_t * sb, sack_scoreboard_hole_t * hole)
+{
+  return hole - sb->holes;
+}
+
+always_inline void
+scoreboard_init (sack_scoreboard_t * sb)
+{
+  sb->head = TCP_INVALID_SACK_HOLE_INDEX;
+  sb->tail = TCP_INVALID_SACK_HOLE_INDEX;
+}
+
+void tcp_rcv_sacks (tcp_connection_t * tc, u32 ack);
+
 always_inline void
 tcp_cc_algo_register (tcp_cc_algorithm_type_e type,
                      const tcp_cc_algorithm_t * vft)
index 069c512..5a71694 100644 (file)
@@ -19,6 +19,8 @@
 #include <vlib/vlib.h>
 
 #define TCP_DEBUG (1)
+#define TCP_DEBUG_CC (1)
+#define TCP_DEBUG_VERBOSE (0)
 
 #define foreach_tcp_dbg_evt            \
   _(INIT, "")                          \
   _(DELETE, "delete")                  \
   _(SYN_SENT, "SYN sent")              \
   _(FIN_SENT, "FIN sent")              \
+  _(ACK_SENT, "ACK sent")              \
+  _(DUPACK_SENT, "DUPACK sent")                \
   _(RST_SENT, "RST sent")              \
   _(SYN_RCVD, "SYN rcvd")              \
   _(ACK_RCVD, "ACK rcvd")              \
+  _(DUPACK_RCVD, "DUPACK rcvd")                \
   _(FIN_RCVD, "FIN rcvd")              \
   _(RST_RCVD, "RST rcvd")              \
   _(PKTIZE, "packetize")               \
   _(INPUT, "in")                       \
-  _(TIMER_POP, "timer pop")
+  _(SND_WND, "snd_wnd update")         \
+  _(OUTPUT, "output")                  \
+  _(TIMER_POP, "timer pop")            \
+  _(CC_RTX, "retransmit")              \
+  _(CC_EVT, "cc event")                        \
+  _(CC_PACK, "cc partial ack")         \
+  _(SEG_INVALID, "invalid segment")    \
+  _(ACK_RCV_ERR, "invalid ack")                \
 
 typedef enum _tcp_dbg
 {
@@ -73,10 +85,10 @@ typedef enum _tcp_dbg_evt
   ed = ELOG_TRACK_DATA (&vlib_global_main.elog_main,                   \
                        _e, _tc->c_elog_track)
 
-#define TCP_EVT_INIT_HANDLER(_tc, ...)                                 \
+#define TCP_EVT_INIT_HANDLER(_tc, _fmt, ...)                           \
 {                                                                      \
   _tc->c_elog_track.name =                                             \
-       (char *) format (0, "%d%c", _tc->c_c_index, 0);                 \
+       (char *) format (0, _fmt, _tc->c_c_index, 0);                   \
   elog_track_register (&vlib_global_main.elog_main, &_tc->c_elog_track);\
 }
 
@@ -87,7 +99,7 @@ typedef enum _tcp_dbg_evt
 
 #define TCP_EVT_OPEN_HANDLER(_tc, ...)                                 \
 {                                                                      \
-  TCP_EVT_INIT_HANDLER(_tc);                                           \
+  TCP_EVT_INIT_HANDLER(_tc, "s%d%c");                                  \
   ELOG_TYPE_DECLARE (_e) =                                             \
   {                                                                    \
     .format = "open: index %d",                                                \
@@ -110,7 +122,7 @@ typedef enum _tcp_dbg_evt
 
 #define TCP_EVT_BIND_HANDLER(_tc, ...)                                 \
 {                                                                      \
-  TCP_EVT_INIT_HANDLER(_tc);                                           \
+  TCP_EVT_INIT_HANDLER(_tc, "l%d%c");                                  \
   ELOG_TYPE_DECLARE (_e) =                                             \
   {                                                                    \
     .format = "bind: listener %d",                                     \
@@ -138,16 +150,44 @@ typedef enum _tcp_dbg_evt
     .format = "delete: %d",                                            \
     .format_args = "i4",                                               \
   };                                                                   \
-  DECLARE_ETD(_tc, _e, 0);                                             \
+  DECLARE_ETD(_tc, _e, 1);                                             \
   ed->data[0] = _tc->c_c_index;                                                \
   TCP_EVT_DEALLOC_HANDLER(_tc);                                                \
 }
 
+#define TCP_EVT_ACK_SENT_HANDLER(_tc, ...)                             \
+{                                                                      \
+  ELOG_TYPE_DECLARE (_e) =                                             \
+  {                                                                    \
+    .format = "ack_prep: acked %u rcv_nxt %u rcv_wnd %u snd_nxt %u",   \
+    .format_args = "i4i4i4i4",                                         \
+  };                                                                   \
+  DECLARE_ETD(_tc, _e, 4);                                             \
+  ed->data[0] = _tc->rcv_nxt - _tc->rcv_las;                           \
+  ed->data[1] = _tc->rcv_nxt - _tc->irs;                               \
+  ed->data[2] = _tc->rcv_wnd;                                          \
+  ed->data[3] = _tc->snd_nxt - _tc->iss;                               \
+}
+
+#define TCP_EVT_DUPACK_SENT_HANDLER(_tc, ...)                          \
+{                                                                      \
+  ELOG_TYPE_DECLARE (_e) =                                             \
+  {                                                                    \
+    .format = "dack_tx: rcv_nxt %u rcv_wnd %u snd_nxt %u av-wnd %u",   \
+    .format_args = "i4i4i4i4",                                         \
+  };                                                                   \
+  DECLARE_ETD(_tc, _e, 4);                                             \
+  ed->data[0] = _tc->rcv_nxt - _tc->irs;                               \
+  ed->data[1] = _tc->rcv_wnd;                                          \
+  ed->data[2] = _tc->snd_nxt - _tc->iss;                               \
+  ed->data[3] = tcp_available_wnd(_tc);                                        \
+}
+
 #define TCP_EVT_SYN_SENT_HANDLER(_tc, ...)                             \
 {                                                                      \
   ELOG_TYPE_DECLARE (_e) =                                             \
   {                                                                    \
-    .format = "SYN: iss %d",                                           \
+    .format = "SYNtx: iss %u",                                 \
     .format_args = "i4",                                               \
   };                                                                   \
   DECLARE_ETD(_tc, _e, 1);                                             \
@@ -158,7 +198,7 @@ typedef enum _tcp_dbg_evt
 {                                                                      \
   ELOG_TYPE_DECLARE (_e) =                                             \
   {                                                                    \
-    .format = "FIN: snd_nxt %d rcv_nxt %d",                            \
+    .format = "FINtx: snd_nxt %d rcv_nxt %d",                          \
     .format_args = "i4i4",                                             \
   };                                                                   \
   DECLARE_ETD(_tc, _e, 2);                                             \
@@ -170,7 +210,7 @@ typedef enum _tcp_dbg_evt
 {                                                                      \
   ELOG_TYPE_DECLARE (_e) =                                             \
   {                                                                    \
-    .format = "RST: snd_nxt %d rcv_nxt %d",                            \
+    .format = "RSTtx: snd_nxt %d rcv_nxt %d",                          \
     .format_args = "i4i4",                                             \
   };                                                                   \
   DECLARE_ETD(_tc, _e, 2);                                             \
@@ -180,10 +220,10 @@ typedef enum _tcp_dbg_evt
 
 #define TCP_EVT_SYN_RCVD_HANDLER(_tc, ...)                             \
 {                                                                      \
-  TCP_EVT_INIT_HANDLER(_tc);                                           \
+  TCP_EVT_INIT_HANDLER(_tc, "s%d%c");                                  \
   ELOG_TYPE_DECLARE (_e) =                                             \
   {                                                                    \
-    .format = "SYN rcvd: irs %d",                                      \
+    .format = "SYNrx: irs %u",                                         \
     .format_args = "i4",                                               \
   };                                                                   \
   DECLARE_ETD(_tc, _e, 1);                                             \
@@ -194,7 +234,7 @@ typedef enum _tcp_dbg_evt
 {                                                                      \
   ELOG_TYPE_DECLARE (_e) =                                             \
   {                                                                    \
-    .format = "FIN rcvd: snd_nxt %d rcv_nxt %d",                       \
+    .format = "FINrx: snd_nxt %d rcv_nxt %d",                          \
     .format_args = "i4i4",                                             \
   };                                                                   \
   DECLARE_ETD(_tc, _e, 2);                                             \
@@ -206,7 +246,7 @@ typedef enum _tcp_dbg_evt
 {                                                                      \
   ELOG_TYPE_DECLARE (_e) =                                             \
   {                                                                    \
-    .format = "RST rcvd: snd_nxt %d rcv_nxt %d",                       \
+    .format = "RSTrx: snd_nxt %d rcv_nxt %d",                          \
     .format_args = "i4i4",                                             \
   };                                                                   \
   DECLARE_ETD(_tc, _e, 2);                                             \
@@ -214,54 +254,68 @@ typedef enum _tcp_dbg_evt
   ed->data[1] = _tc->rcv_nxt - _tc->irs;                               \
 }
 
-#define TCP_EVT_ACK_RCVD_HANDLER(_tc, ...)                             \
+#define TCP_EVT_ACK_RCVD_HANDLER(_tc, _ack, ...)                       \
 {                                                                      \
   ELOG_TYPE_DECLARE (_e) =                                             \
   {                                                                    \
-    .format = "ACK: acked %u cwnd %u inflight %u",                     \
-    .format_args = "i4i4i4",                                           \
+    .format = "acked: %u snd_una %u ack %u cwnd %u inflight %u",       \
+    .format_args = "i4i4i4i4i4",                                       \
   };                                                                   \
-  DECLARE_ETD(_tc, _e, 3);                                             \
+  DECLARE_ETD(_tc, _e, 5);                                             \
   ed->data[0] = _tc->bytes_acked;                                      \
-  ed->data[1] = _tc->cwnd;                                             \
-  ed->data[2] = tcp_flight_size(_tc);                                  \
+  ed->data[1] = _tc->snd_una - _tc->iss;                               \
+  ed->data[2] = _ack - _tc->iss;                                       \
+  ed->data[3] = _tc->cwnd;                                             \
+  ed->data[4] = tcp_flight_size(_tc);                                  \
 }
 
-#define TCP_EVT_PKTIZE_HANDLER(_tc, ...)                               \
+#define TCP_EVT_DUPACK_RCVD_HANDLER(_tc, ...)                          \
 {                                                                      \
   ELOG_TYPE_DECLARE (_e) =                                             \
   {                                                                    \
-    .format = "pktize: snd_una %u snd_nxt %u una_max %u",              \
-    .format_args = "i4i4i4",                                           \
+    .format = "dack_rx: snd_una %u cwnd %u snd_wnd %u inflight %u",    \
+    .format_args = "i4i4i4i4",                                         \
   };                                                                   \
-  DECLARE_ETD(_tc, _e, 3);                                             \
+  DECLARE_ETD(_tc, _e, 4);                                             \
   ed->data[0] = _tc->snd_una - _tc->iss;                               \
-  ed->data[1] = _tc->snd_nxt - _tc->iss;                               \
-  ed->data[2] = _tc->snd_una_max - _tc->iss;                           \
+  ed->data[1] = _tc->cwnd;                                             \
+  ed->data[2] = _tc->snd_wnd;                                          \
+  ed->data[3] = tcp_flight_size(_tc);                                  \
 }
 
-#define TCP_EVT_OUTPUT_HANDLER(_tc, flags, n_bytes,...)                        \
+#define TCP_EVT_PKTIZE_HANDLER(_tc, ...)                               \
 {                                                                      \
   ELOG_TYPE_DECLARE (_e) =                                             \
   {                                                                    \
-    .format = "out: flags %x, bytes %u",                               \
-    .format_args = "i4i4",                                             \
+    .format = "pktize: una %u snd_nxt %u space %u flight %u rcv_wnd %u",\
+    .format_args = "i4i4i4i4i4",                                       \
   };                                                                   \
-  DECLARE_ETD(_tc, _e, 2);                                             \
-  ed->data[0] = flags;                                                 \
-  ed->data[1] = n_bytes;                                               \
+  DECLARE_ETD(_tc, _e, 5);                                             \
+  ed->data[0] = _tc->snd_una - _tc->iss;                               \
+  ed->data[1] = _tc->snd_nxt - _tc->iss;                               \
+  ed->data[2] = tcp_available_snd_space (_tc);                         \
+  ed->data[3] = tcp_flight_size (_tc);                                 \
+  ed->data[4] = _tc->rcv_wnd;                                          \
 }
 
-#define TCP_EVT_INPUT_HANDLER(_tc, n_bytes, ...)                       \
+#define TCP_EVT_INPUT_HANDLER(_tc, _type, _len, _written, ...)         \
 {                                                                      \
   ELOG_TYPE_DECLARE (_e) =                                             \
   {                                                                    \
-    .format = "in: bytes %u rcv_nxt %u",                               \
-    .format_args = "i4i4",                                             \
+    .format = "in: %s len %u written %d rcv_nxt %u free wnd %d",       \
+    .format_args = "t4i4i4i4i4",                                       \
+    .n_enum_strings = 2,                                               \
+    .enum_strings = {                                                  \
+      "order",                                                         \
+      "ooo",                                                                   \
+    },                                                                 \
   };                                                                   \
-  DECLARE_ETD(_tc, _e, 2);                                             \
-  ed->data[0] = n_bytes;                                               \
-  ed->data[1] = _tc->rcv_nxt - _tc->irs;                               \
+  DECLARE_ETD(_tc, _e, 5);                                             \
+  ed->data[0] = _type;                                                 \
+  ed->data[1] = _len;                                                  \
+  ed->data[2] = _written;                                              \
+  ed->data[3] = (_tc->rcv_nxt - _tc->irs) + _written;                  \
+  ed->data[4] = _tc->rcv_wnd - (_tc->rcv_nxt - _tc->rcv_las);          \
 }
 
 #define TCP_EVT_TIMER_POP_HANDLER(_tc_index, _timer_id, ...)            \
@@ -296,9 +350,131 @@ typedef enum _tcp_dbg_evt
   ed->data[1] = _timer_id;                                             \
 }
 
+#define TCP_EVT_SEG_INVALID_HANDLER(_tc, _seq, _end, ...)              \
+{                                                                      \
+  ELOG_TYPE_DECLARE (_e) =                                             \
+  {                                                                    \
+    .format = "seg-inv: seq %u end %u rcv_las %u rcv_nxt %u wnd %u",   \
+    .format_args = "i4i4i4i4i4",                                       \
+  };                                                                   \
+  DECLARE_ETD(_tc, _e, 5);                                             \
+  ed->data[0] = _seq - _tc->irs;                                       \
+  ed->data[1] = _end - _tc->irs;                                       \
+  ed->data[2] = _tc->rcv_las - _tc->irs;                               \
+  ed->data[3] = _tc->rcv_nxt - _tc->irs;                               \
+  ed->data[4] = _tc->rcv_wnd;                                          \
+}
+
+#define TCP_EVT_ACK_RCV_ERR_HANDLER(_tc, _type, _ack, ...)             \
+{                                                                      \
+  ELOG_TYPE_DECLARE (_e) =                                             \
+  {                                                                    \
+    .format = "ack-err: %s ack %u snd_una %u snd_nxt %u una_max %u",   \
+    .format_args = "t4i4i4i4i4",                                       \
+    .n_enum_strings = 3,                                               \
+    .enum_strings = {                                                  \
+      "invalid",                                                       \
+      "old",                                                           \
+      "future",                                                                \
+    },                                                                         \
+  };                                                                   \
+  DECLARE_ETD(_tc, _e, 5);                                             \
+  ed->data[0] = _type;                                                 \
+  ed->data[1] = _ack - _tc->iss;                                       \
+  ed->data[2] = _tc->snd_una - _tc->iss;                               \
+  ed->data[3] = _tc->snd_nxt - _tc->iss;                               \
+  ed->data[4] = _tc->snd_una_max - _tc->iss;                           \
+}
+
+/*
+ * Congestion Control
+ */
+
+#if TCP_DEBUG_CC
+#define TCP_EVT_CC_RTX_HANDLER(_tc, offset, n_bytes, ...)              \
+{                                                                      \
+  ELOG_TYPE_DECLARE (_e) =                                             \
+  {                                                                    \
+    .format = "rtx: snd_nxt %u offset %u snd %u rtx %u",               \
+    .format_args = "i4i4i4i4",                                         \
+  };                                                                   \
+  DECLARE_ETD(_tc, _e, 4);                                             \
+  ed->data[0] = _tc->snd_nxt - _tc->iss;                               \
+  ed->data[1] = offset;                                                        \
+  ed->data[2] = n_bytes;                                               \
+  ed->data[3] = _tc->rtx_bytes;                                                \
+}
+
+#define TCP_EVT_CC_EVT_HANDLER(_tc, _sub_evt, ...)                     \
+{                                                                      \
+  ELOG_TYPE_DECLARE (_e) =                                             \
+  {                                                                    \
+    .format = "cc: %s wnd %u snd_cong %u rtx_bytes %u",                        \
+    .format_args = "t4i4i4i4",                                         \
+    .n_enum_strings = 5,                                               \
+    .enum_strings = {                                                  \
+      "fast-rtx",                                                      \
+      "rtx-timeout",                                                   \
+      "first-rtx",                                                     \
+      "recovered",                                                     \
+      "congestion",                                                    \
+    },                                                                 \
+  };                                                                   \
+  DECLARE_ETD(_tc, _e, 4);                                             \
+  ed->data[0] = _sub_evt;                                              \
+  ed->data[1] = tcp_available_snd_space (_tc);                         \
+  ed->data[2] = _tc->snd_congestion - _tc->iss;                                \
+  ed->data[3] = _tc->rtx_bytes;                                                \
+}
+
+#define TCP_EVT_CC_PACK_HANDLER(_tc, ...)                              \
+{                                                                      \
+  ELOG_TYPE_DECLARE (_e) =                                             \
+  {                                                                    \
+    .format = "pack: snd_una %u snd_una_max %u",                       \
+    .format_args = "i4i4",                                             \
+  };                                                                   \
+  DECLARE_ETD(_tc, _e, 2);                                             \
+  ed->data[0] = _tc->snd_una - _tc->iss;                               \
+  ed->data[1] = _tc->snd_una_max - _tc->iss;                           \
+}
+
+#else
+#define TCP_EVT_CC_RTX_HANDLER(_tc, offset, n_bytes, ...)
+#define TCP_EVT_CC_EVT_HANDLER(_tc, _sub_evt, _snd_space, ...)
+#define TCP_EVT_CC_PACK_HANDLER(_tc, ...)
+#endif
+
+#if TCP_DBG_VERBOSE
+#define TCP_EVT_SND_WND_HANDLER(_tc, ...)                              \
+{                                                                      \
+  ELOG_TYPE_DECLARE (_e) =                                             \
+  {                                                                    \
+    .format = "snd_wnd update: %u ",                                   \
+    .format_args = "i4",                                               \
+  };                                                                   \
+  DECLARE_ETD(_tc, _e, 1);                                             \
+  ed->data[0] = _tc->snd_wnd;                                          \
+}
+
+#define TCP_EVT_OUTPUT_HANDLER(_tc, flags, n_bytes,...)                        \
+{                                                                      \
+  ELOG_TYPE_DECLARE (_e) =                                             \
+  {                                                                    \
+    .format = "out: flags %x, bytes %u",                               \
+    .format_args = "i4i4",                                             \
+  };                                                                   \
+  DECLARE_ETD(_tc, _e, 2);                                             \
+  ed->data[0] = flags;                                                 \
+  ed->data[1] = n_bytes;                                               \
+}
+#else
+#define TCP_EVT_SND_WND_HANDLER(_tc, ...)
+#define TCP_EVT_OUTPUT_HANDLER(_tc, flags, n_bytes,...)
+#endif
+
 #define CONCAT_HELPER(_a, _b) _a##_b
 #define CC(_a, _b) CONCAT_HELPER(_a, _b)
-
 #define TCP_EVT_DBG(_evt, _args...) CC(_evt, _HANDLER)(_args)
 
 #else
index 2dbdd9b..b91a08c 100644 (file)
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
 tcp_error (NONE, "no error")
 tcp_error (NO_LISTENER, "no listener for dst port")
 tcp_error (LOOKUP_DROPS, "lookup drops")
 tcp_error (DISPATCH, "Dispatch error")
 tcp_error (ENQUEUED, "Packets pushed into rx fifo") 
+tcp_error (PARTIALLY_ENQUEUED, "Packets partially pushed into rx fifo") 
 tcp_error (PURE_ACK, "Pure acks")
 tcp_error (SYNS_RCVD, "SYNs received")
 tcp_error (SYN_ACKS_RCVD, "SYN-ACKs received")
@@ -26,11 +26,14 @@ tcp_error (FIFO_FULL, "Packets dropped for lack of rx fifo space")
 tcp_error (EVENT_FIFO_FULL, "Events not sent for lack of event fifo space") 
 tcp_error (API_QUEUE_FULL, "Sessions not created for lack of API queue space")
 tcp_error (CREATE_SESSION_FAIL, "Sessions couldn't be allocated")
-tcp_error (SEGMENT_INVALID, "Invalid segment")
+tcp_error (SEGMENT_INVALID, "Invalid segments")
+tcp_error (SEGMENT_OLD, "Old segment")
 tcp_error (ACK_INVALID, "Invalid ACK")
 tcp_error (ACK_DUP, "Duplicate ACK")
 tcp_error (ACK_OLD, "Old ACK")
+tcp_error (ACK_FUTURE, "Future ACK")
 tcp_error (PKTS_SENT, "Packets sent")
 tcp_error (FILTERED_DUPACKS, "Filtered duplicate ACKs")
 tcp_error (RST_SENT, "Resets sent")
 tcp_error (INVALID_CONNECTION, "Invalid connection")
+tcp_error (NO_WND, "No window")
\ No newline at end of file
index 67af432..5d11985 100644 (file)
@@ -95,13 +95,21 @@ vlib_node_registration_t tcp6_established_node;
  * or the rcv_nxt at last ack sent instead of rcv_nxt since that's the
  * peer's reference when computing our receive window.
  *
- * This accepts only segments within the window.
+ * This:
+ *  seq_leq (end_seq, tc->rcv_las + tc->rcv_wnd) && seq_geq (seq, tc->rcv_las)
+ * however, is too strict when we have retransmits. Instead we just check that
+ * the seq is not beyond the right edge and that the end of the segment is not
+ * less than the left edge.
+ *
+ * N.B. rcv_nxt and rcv_wnd are both updated in this node if acks are sent, so
+ * use rcv_nxt in the right edge window test instead of rcv_las.
+ *
  */
 always_inline u8
 tcp_segment_in_rcv_wnd (tcp_connection_t * tc, u32 seq, u32 end_seq)
 {
-  return seq_leq (end_seq, tc->rcv_las + tc->rcv_wnd)
-    && seq_geq (seq, tc->rcv_nxt);
+  return (seq_geq (end_seq, tc->rcv_las)
+         && seq_leq (seq, tc->rcv_nxt + tc->rcv_wnd));
 }
 
 void
@@ -253,6 +261,7 @@ tcp_segment_validate (vlib_main_t * vm, tcp_connection_t * tc0,
            {
              tcp_make_ack (tc0, b0);
              *next0 = tcp_next_output (tc0->c_is_ip4);
+             TCP_EVT_DBG (TCP_EVT_DUPACK_SENT, tc0);
              return -1;
            }
        }
@@ -262,13 +271,25 @@ tcp_segment_validate (vlib_main_t * vm, tcp_connection_t * tc0,
   if (!tcp_segment_in_rcv_wnd (tc0, vnet_buffer (b0)->tcp.seq_number,
                               vnet_buffer (b0)->tcp.seq_end))
     {
-      if (!tcp_rst (th0))
+      /* If our window is 0 and the packet is in sequence, let it pass
+       * through for ack processing. It should be dropped later.*/
+      if (tc0->rcv_wnd == 0
+         && tc0->rcv_nxt == vnet_buffer (b0)->tcp.seq_number)
        {
-         /* Send dup ack */
-         tcp_make_ack (tc0, b0);
-         *next0 = tcp_next_output (tc0->c_is_ip4);
+         /* Make it look as if there's nothing to dequeue */
+         vnet_buffer (b0)->tcp.seq_end = vnet_buffer (b0)->tcp.seq_number;
+       }
+      else
+       {
+         /* If not RST, send dup ack */
+         if (!tcp_rst (th0))
+           {
+             tcp_make_ack (tc0, b0);
+             *next0 = tcp_next_output (tc0->c_is_ip4);
+             TCP_EVT_DBG (TCP_EVT_DUPACK_SENT, tc0);
+           }
+         return -1;
        }
-      return -1;
     }
 
   /* 2nd: check the RST bit */
@@ -326,13 +347,13 @@ tcp_estimate_rtt (tcp_connection_t * tc, u32 mrtt)
 
       /* XXX Drop in RTT results in RTTVAR increase and bigger RTO.
        * The increase should be bound */
-      tc->rttvar += (clib_abs (err) - tc->rttvar) >> 2;
+      tc->rttvar += ((int) clib_abs (err) - (int) tc->rttvar) >> 2;
     }
   else
     {
       /* First measurement. */
       tc->srtt = mrtt;
-      tc->rttvar = mrtt << 1;
+      tc->rttvar = mrtt >> 1;
     }
 }
 
@@ -394,7 +415,11 @@ tcp_dequeue_acked (tcp_connection_t * tc, u32 ack)
     }
 }
 
-/** Check if dupack as per RFC5681 Sec. 2 */
+/**
+ * Check if dupack as per RFC5681 Sec. 2
+ *
+ * This works only if called before updating snd_wnd.
+ * */
 always_inline u8
 tcp_ack_is_dupack (tcp_connection_t * tc, vlib_buffer_t * b, u32 new_snd_wnd)
 {
@@ -429,10 +454,10 @@ scoreboard_remove_hole (sack_scoreboard_t * sb, sack_scoreboard_hole_t * hole)
 }
 
 sack_scoreboard_hole_t *
-scoreboard_insert_hole (sack_scoreboard_t * sb, sack_scoreboard_hole_t * prev,
+scoreboard_insert_hole (sack_scoreboard_t * sb, u32 prev_index,
                        u32 start, u32 end)
 {
-  sack_scoreboard_hole_t *hole, *next;
+  sack_scoreboard_hole_t *hole, *next, *prev;
   u32 hole_index;
 
   pool_get (sb->holes, hole);
@@ -442,6 +467,7 @@ scoreboard_insert_hole (sack_scoreboard_t * sb, sack_scoreboard_hole_t * prev,
   hole->end = end;
   hole_index = hole - sb->holes;
 
+  prev = scoreboard_get_hole (sb, prev_index);
   if (prev)
     {
       hole->prev = prev - sb->holes;
@@ -462,28 +488,35 @@ scoreboard_insert_hole (sack_scoreboard_t * sb, sack_scoreboard_hole_t * prev,
   return hole;
 }
 
-static void
+void
 tcp_rcv_sacks (tcp_connection_t * tc, u32 ack)
 {
   sack_scoreboard_t *sb = &tc->sack_sb;
   sack_block_t *blk, tmp;
-  sack_scoreboard_hole_t *hole, *next_hole;
-  u32 blk_index = 0;
+  sack_scoreboard_hole_t *hole, *next_hole, *last_hole, *new_hole;
+  u32 blk_index = 0, old_sacked_bytes, hole_index;
   int i, j;
 
-  if (!tcp_opts_sack (tc) && sb->head == TCP_INVALID_SACK_HOLE_INDEX)
+  sb->last_sacked_bytes = 0;
+  sb->snd_una_adv = 0;
+  old_sacked_bytes = sb->sacked_bytes;
+
+  if (!tcp_opts_sack (&tc->opt) && sb->head == TCP_INVALID_SACK_HOLE_INDEX)
     return;
 
   /* Remove invalid blocks */
-  vec_foreach (blk, tc->opt.sacks)
-  {
-    if (seq_lt (blk->start, blk->end)
-       && seq_gt (blk->start, tc->snd_una)
-       && seq_gt (blk->start, ack) && seq_lt (blk->end, tc->snd_nxt))
-      continue;
-
-    vec_del1 (tc->opt.sacks, blk - tc->opt.sacks);
-  }
+  blk = tc->opt.sacks;
+  while (blk < vec_end (tc->opt.sacks))
+    {
+      if (seq_lt (blk->start, blk->end)
+         && seq_gt (blk->start, tc->snd_una)
+         && seq_gt (blk->start, ack) && seq_leq (blk->end, tc->snd_nxt))
+       {
+         blk++;
+         continue;
+       }
+      vec_del1 (tc->opt.sacks, blk - tc->opt.sacks);
+    }
 
   /* Add block for cumulative ack */
   if (seq_gt (ack, tc->snd_una))
@@ -498,7 +531,7 @@ tcp_rcv_sacks (tcp_connection_t * tc, u32 ack)
 
   /* Make sure blocks are ordered */
   for (i = 0; i < vec_len (tc->opt.sacks); i++)
-    for (j = i; j < vec_len (tc->opt.sacks); j++)
+    for (j = i + 1; j < vec_len (tc->opt.sacks); j++)
       if (seq_lt (tc->opt.sacks[j].start, tc->opt.sacks[i].start))
        {
          tmp = tc->opt.sacks[i];
@@ -506,10 +539,22 @@ tcp_rcv_sacks (tcp_connection_t * tc, u32 ack)
          tc->opt.sacks[j] = tmp;
        }
 
-  /* If no holes, insert the first that covers all outstanding bytes */
   if (sb->head == TCP_INVALID_SACK_HOLE_INDEX)
     {
-      scoreboard_insert_hole (sb, 0, tc->snd_una, tc->snd_una_max);
+      /* If no holes, insert the first that covers all outstanding bytes */
+      last_hole = scoreboard_insert_hole (sb, TCP_INVALID_SACK_HOLE_INDEX,
+                                         tc->snd_una, tc->snd_una_max);
+      sb->tail = scoreboard_hole_index (sb, last_hole);
+    }
+  else
+    {
+      /* If we have holes but snd_una_max is beyond the last hole, update
+       * last hole end */
+      tmp = tc->opt.sacks[vec_len (tc->opt.sacks) - 1];
+      last_hole = scoreboard_last_hole (sb);
+      if (seq_gt (tc->snd_una_max, sb->max_byte_sacked)
+         && seq_gt (tc->snd_una_max, last_hole->end))
+       last_hole->end = tc->snd_una_max;
     }
 
   /* Walk the holes with the SACK blocks */
@@ -526,10 +571,10 @@ tcp_rcv_sacks (tcp_connection_t * tc, u32 ack)
              next_hole = scoreboard_next_hole (sb, hole);
 
              /* Byte accounting */
-             if (seq_lt (hole->end, ack))
+             if (seq_leq (hole->end, ack))
                {
-                 /* Bytes lost because snd wnd left edge advances */
-                 if (seq_lt (next_hole->start, ack))
+                 /* Bytes lost because snd_wnd left edge advances */
+                 if (next_hole && seq_leq (next_hole->start, ack))
                    sb->sacked_bytes -= next_hole->start - hole->end;
                  else
                    sb->sacked_bytes -= ack - hole->end;
@@ -539,35 +584,78 @@ tcp_rcv_sacks (tcp_connection_t * tc, u32 ack)
                  sb->sacked_bytes += scoreboard_hole_bytes (hole);
                }
 
+             /* snd_una needs to be advanced */
+             if (seq_geq (ack, hole->end))
+               {
+                 if (next_hole && seq_lt (ack, next_hole->start))
+                   sb->snd_una_adv = next_hole->start - ack;
+                 else
+                   sb->snd_una_adv = sb->max_byte_sacked - ack;
+
+                 /* all these can be delivered */
+                 sb->sacked_bytes -= sb->snd_una_adv;
+               }
+
+             /* About to remove last hole */
+             if (hole == last_hole)
+               {
+                 sb->tail = hole->prev;
+                 last_hole = scoreboard_last_hole (sb);
+                 /* keep track of max byte sacked in case the last hole
+                  * is acked */
+                 if (seq_gt (hole->end, sb->max_byte_sacked))
+                   sb->max_byte_sacked = hole->end;
+               }
              scoreboard_remove_hole (sb, hole);
              hole = next_hole;
            }
-         /* Partial overlap */
+         /* Partial 'head' overlap */
          else
            {
-             sb->sacked_bytes += blk->end - hole->start;
-             hole->start = blk->end;
+             if (seq_gt (blk->end, hole->start))
+               {
+                 sb->sacked_bytes += blk->end - hole->start;
+                 hole->start = blk->end;
+               }
              blk_index++;
            }
        }
       else
        {
          /* Hole must be split */
-         if (seq_leq (blk->end, hole->end))
+         if (seq_lt (blk->end, hole->end))
            {
              sb->sacked_bytes += blk->end - blk->start;
-             scoreboard_insert_hole (sb, hole, blk->end, hole->end);
-             hole->end = blk->start - 1;
+             hole_index = scoreboard_hole_index (sb, hole);
+             new_hole = scoreboard_insert_hole (sb, hole_index, blk->end,
+                                                hole->end);
+
+             /* Pool might've moved */
+             hole = scoreboard_get_hole (sb, hole_index);
+             hole->end = blk->start;
+
+             /* New or split of tail */
+             if ((last_hole->end == new_hole->end)
+                 || seq_lt (last_hole->end, new_hole->start))
+               {
+                 last_hole = new_hole;
+                 sb->tail = scoreboard_hole_index (sb, new_hole);
+               }
+
              blk_index++;
+             hole = scoreboard_next_hole (sb, hole);
            }
          else
            {
-             sb->sacked_bytes += hole->end - blk->start + 1;
-             hole->end = blk->start - 1;
+             sb->sacked_bytes += hole->end - blk->start;
+             hole->end = blk->start;
              hole = scoreboard_next_hole (sb, hole);
            }
        }
     }
+
+  sb->last_sacked_bytes = sb->sacked_bytes + sb->snd_una_adv
+    - old_sacked_bytes;
 }
 
 /** Update snd_wnd
@@ -577,72 +665,94 @@ tcp_rcv_sacks (tcp_connection_t * tc, u32 ack)
 static void
 tcp_update_snd_wnd (tcp_connection_t * tc, u32 seq, u32 ack, u32 snd_wnd)
 {
-  if (tc->snd_wl1 < seq || (tc->snd_wl1 == seq && tc->snd_wl2 <= ack))
+  if (seq_lt (tc->snd_wl1, seq)
+      || (tc->snd_wl1 == seq && seq_leq (tc->snd_wl2, ack)))
     {
       tc->snd_wnd = snd_wnd;
       tc->snd_wl1 = seq;
       tc->snd_wl2 = ack;
+      TCP_EVT_DBG (TCP_EVT_SND_WND, tc);
     }
 }
 
-static void
+void
 tcp_cc_congestion (tcp_connection_t * tc)
 {
+  tc->snd_congestion = tc->snd_nxt;
   tc->cc_algo->congestion (tc);
+  TCP_EVT_DBG (TCP_EVT_CC_EVT, tc, 4);
 }
 
-static void
+void
 tcp_cc_recover (tcp_connection_t * tc)
 {
-  if (tcp_in_fastrecovery (tc))
-    {
-      tc->cc_algo->recovered (tc);
-      tcp_recovery_off (tc);
-    }
-  else if (tcp_in_recovery (tc))
-    {
-      tcp_recovery_off (tc);
-      tc->cwnd = tcp_loss_wnd (tc);
-    }
+  tc->cc_algo->recovered (tc);
+
+  tc->rtx_bytes = 0;
+  tc->rcv_dupacks = 0;
+  tc->snd_nxt = tc->snd_una;
+
+  tc->cc_algo->rcv_ack (tc);
+  tc->tsecr_last_ack = tc->opt.tsecr;
+
+  tcp_fastrecovery_1_smss_off (tc);
+  tcp_fastrecovery_off (tc);
+
+  TCP_EVT_DBG (TCP_EVT_CC_EVT, tc, 3);
 }
 
 static void
-tcp_cc_rcv_ack (tcp_connection_t * tc)
+tcp_cc_rcv_ack (tcp_connection_t * tc, vlib_buffer_t * b)
 {
   u8 partial_ack;
 
   if (tcp_in_recovery (tc))
     {
-      partial_ack = seq_lt (tc->snd_una, tc->snd_una_max);
+      partial_ack = seq_lt (tc->snd_una, tc->snd_congestion);
       if (!partial_ack)
        {
          /* Clear retransmitted bytes. */
-         tc->rtx_bytes = 0;
          tcp_cc_recover (tc);
        }
       else
        {
+         TCP_EVT_DBG (TCP_EVT_CC_PACK, tc);
+
          /* Clear retransmitted bytes. XXX should we clear all? */
          tc->rtx_bytes = 0;
          tc->cc_algo->rcv_cong_ack (tc, TCP_CC_PARTIALACK);
 
-         /* Retransmit first unacked segment */
-         tcp_retransmit_first_unacked (tc);
+         /* In case snd_nxt is still in the past and output tries to
+          * shove some new bytes */
+         tc->snd_nxt = tc->snd_una;
+
+         /* XXX need proper RFC6675 support */
+         if (tc->sack_sb.last_sacked_bytes)
+           {
+             tcp_fast_retransmit (tc);
+           }
+         else
+           {
+             /* Retransmit first unacked segment */
+             tcp_retransmit_first_unacked (tc);
+             /* If window allows, send 1 SMSS of new data */
+             if (seq_lt (tc->snd_nxt, tc->snd_congestion))
+               tc->snd_nxt = tc->snd_congestion;
+           }
        }
     }
   else
     {
       tc->cc_algo->rcv_ack (tc);
+      tc->tsecr_last_ack = tc->opt.tsecr;
+      tc->rcv_dupacks = 0;
     }
-
-  tc->rcv_dupacks = 0;
-  tc->tsecr_last_ack = tc->opt.tsecr;
 }
 
 static void
 tcp_cc_rcv_dupack (tcp_connection_t * tc, u32 ack)
 {
-  ASSERT (tc->snd_una == ack);
+//  ASSERT (seq_geq(tc->snd_una, ack));
 
   tc->rcv_dupacks++;
   if (tc->rcv_dupacks == TCP_DUPACK_THRESHOLD)
@@ -688,20 +798,39 @@ tcp_rcv_ack (tcp_connection_t * tc, vlib_buffer_t * b,
 {
   u32 new_snd_wnd;
 
-  /* If the ACK acks something not yet sent (SEG.ACK > SND.NXT) then send an
-   * ACK, drop the segment, and return  */
+  /* If the ACK acks something not yet sent (SEG.ACK > SND.NXT) */
   if (seq_gt (vnet_buffer (b)->tcp.ack_number, tc->snd_nxt))
     {
-      tcp_make_ack (tc, b);
-      *next = tcp_next_output (tc->c_is_ip4);
-      *error = TCP_ERROR_ACK_INVALID;
-      return -1;
+      /* If we have outstanding data and this is within the window, accept it,
+       * probably retransmit has timed out. Otherwise ACK segment and then
+       * drop it */
+      if (seq_gt (vnet_buffer (b)->tcp.ack_number, tc->snd_una_max))
+       {
+         tcp_make_ack (tc, b);
+         *next = tcp_next_output (tc->c_is_ip4);
+         *error = TCP_ERROR_ACK_INVALID;
+         TCP_EVT_DBG (TCP_EVT_ACK_RCV_ERR, tc, 0,
+                      vnet_buffer (b)->tcp.ack_number);
+         return -1;
+       }
+
+      tc->snd_nxt = vnet_buffer (b)->tcp.ack_number;
+      *error = TCP_ERROR_ACK_FUTURE;
+      TCP_EVT_DBG (TCP_EVT_ACK_RCV_ERR, tc, 2,
+                  vnet_buffer (b)->tcp.ack_number);
     }
 
-  /* If old ACK, discard */
+  /* If old ACK, probably it's an old dupack */
   if (seq_lt (vnet_buffer (b)->tcp.ack_number, tc->snd_una))
     {
       *error = TCP_ERROR_ACK_OLD;
+      TCP_EVT_DBG (TCP_EVT_ACK_RCV_ERR, tc, 1,
+                  vnet_buffer (b)->tcp.ack_number);
+      if (tcp_in_fastrecovery (tc) && tc->rcv_dupacks == TCP_DUPACK_THRESHOLD)
+       {
+         TCP_EVT_DBG (TCP_EVT_DUPACK_RCVD, tc);
+         tcp_cc_rcv_dupack (tc, vnet_buffer (b)->tcp.ack_number);
+       }
       return -1;
     }
 
@@ -712,32 +841,40 @@ tcp_rcv_ack (tcp_connection_t * tc, vlib_buffer_t * b,
 
   if (tcp_ack_is_dupack (tc, b, new_snd_wnd))
     {
+      TCP_EVT_DBG (TCP_EVT_DUPACK_RCVD, tc, 1);
       tcp_cc_rcv_dupack (tc, vnet_buffer (b)->tcp.ack_number);
       *error = TCP_ERROR_ACK_DUP;
       return -1;
     }
 
-  /* Valid ACK */
+  /*
+   * Valid ACK
+   */
+
   tc->bytes_acked = vnet_buffer (b)->tcp.ack_number - tc->snd_una;
-  tc->snd_una = vnet_buffer (b)->tcp.ack_number;
+  tc->snd_una = vnet_buffer (b)->tcp.ack_number + tc->sack_sb.snd_una_adv;
 
-  /* Dequeue ACKed packet and update RTT */
+  /* Dequeue ACKed data and update RTT */
   tcp_dequeue_acked (tc, vnet_buffer (b)->tcp.ack_number);
-
   tcp_update_snd_wnd (tc, vnet_buffer (b)->tcp.seq_number,
                      vnet_buffer (b)->tcp.ack_number, new_snd_wnd);
 
-  /* Updates congestion control (slow start/congestion avoidance) */
-  tcp_cc_rcv_ack (tc);
+  /* If some of our sent bytes have been acked, update cc and retransmit
+   * timer. */
+  if (tc->bytes_acked)
+    {
+      TCP_EVT_DBG (TCP_EVT_ACK_RCVD, tc, vnet_buffer (b)->tcp.ack_number);
 
-  TCP_EVT_DBG (TCP_EVT_ACK_RCVD, tc);
+      /* Updates congestion control (slow start/congestion avoidance) */
+      tcp_cc_rcv_ack (tc, b);
 
-  /* If everything has been acked, stop retransmit timer
-   * otherwise update */
-  if (tc->snd_una == tc->snd_una_max)
-    tcp_timer_reset (tc, TCP_TIMER_RETRANSMIT);
-  else
-    tcp_timer_update (tc, TCP_TIMER_RETRANSMIT, tc->rto);
+      /* If everything has been acked, stop retransmit timer
+       * otherwise update */
+      if (tc->snd_una == tc->snd_una_max)
+       tcp_retransmit_timer_reset (tc);
+      else
+       tcp_retransmit_timer_update (tc);
+    }
 
   return 0;
 }
@@ -757,9 +894,7 @@ static void
 tcp_update_sack_list (tcp_connection_t * tc, u32 start, u32 end)
 {
   sack_block_t *new_list = 0, block;
-  u32 n_elts;
   int i;
-  u8 new_head = 0;
 
   /* If the first segment is ooo add it to the list. Last write might've moved
    * rcv_nxt over the first segment. */
@@ -768,7 +903,6 @@ tcp_update_sack_list (tcp_connection_t * tc, u32 start, u32 end)
       block.start = start;
       block.end = end;
       vec_add1 (new_list, block);
-      new_head = 1;
     }
 
   /* Find the blocks still worth keeping. */
@@ -782,20 +916,19 @@ tcp_update_sack_list (tcp_connection_t * tc, u32 start, u32 end)
          || seq_leq (tc->snd_sacks[i].start, end))
        continue;
 
-      /* Save subsequent segments to new SACK list. */
-      n_elts = clib_min (vec_len (tc->snd_sacks) - i,
-                        TCP_MAX_SACK_BLOCKS - new_head);
-      vec_insert_elts (new_list, &tc->snd_sacks[i], n_elts, new_head);
-      break;
+      /* Save to new SACK list. */
+      vec_add1 (new_list, tc->snd_sacks[i]);
     }
 
+  ASSERT (vec_len (new_list) < TCP_MAX_SACK_BLOCKS);
+
   /* Replace old vector with new one */
   vec_free (tc->snd_sacks);
   tc->snd_sacks = new_list;
 }
 
 /** Enqueue data for delivery to application */
-always_inline u32
+always_inline int
 tcp_session_enqueue_data (tcp_connection_t * tc, vlib_buffer_t * b,
                          u16 data_len)
 {
@@ -812,6 +945,8 @@ tcp_session_enqueue_data (tcp_connection_t * tc, vlib_buffer_t * b,
                                         vlib_buffer_get_current (b),
                                         data_len, 1 /* queue event */ );
 
+  TCP_EVT_DBG (TCP_EVT_INPUT, tc, 0, data_len, written);
+
   /* Update rcv_nxt */
   if (PREDICT_TRUE (written == data_len))
     {
@@ -824,38 +959,61 @@ tcp_session_enqueue_data (tcp_connection_t * tc, vlib_buffer_t * b,
 
       /* Send ACK confirming the update */
       tc->flags |= TCP_CONN_SNDACK;
+    }
+  else if (written > 0)
+    {
+      /* We've written something but FIFO is probably full now */
+      tc->rcv_nxt += written;
 
-      /* Update SACK list if need be */
-      if (tcp_opts_sack_permitted (&tc->opt))
-       {
-         /* Remove SACK blocks that have been delivered */
-         tcp_update_sack_list (tc, tc->rcv_nxt, tc->rcv_nxt);
-       }
+      /* Depending on how fast the app is, all remaining buffers in burst will
+       * not be enqueued. Should we inform peer of the damage? XXX */
+      return TCP_ERROR_PARTIALLY_ENQUEUED;
     }
   else
     {
-      ASSERT (0);
       return TCP_ERROR_FIFO_FULL;
     }
 
+  /* Update SACK list if need be */
+  if (tcp_opts_sack_permitted (&tc->opt))
+    {
+      /* Remove SACK blocks that have been delivered */
+      tcp_update_sack_list (tc, tc->rcv_nxt, tc->rcv_nxt);
+    }
+
   return TCP_ERROR_ENQUEUED;
 }
 
 /** Enqueue out-of-order data */
-always_inline u32
+always_inline int
 tcp_session_enqueue_ooo (tcp_connection_t * tc, vlib_buffer_t * b,
                         u16 data_len)
 {
   stream_session_t *s0;
   u32 offset, seq;
+  int rv;
+
+  /* Pure ACK. Do nothing */
+  if (PREDICT_FALSE (data_len == 0))
+    {
+      return TCP_ERROR_PURE_ACK;
+    }
 
   s0 = stream_session_get (tc->c_s_index, tc->c_thread_index);
   seq = vnet_buffer (b)->tcp.seq_number;
   offset = seq - tc->rcv_nxt;
 
-  if (svm_fifo_enqueue_with_offset (s0->server_rx_fifo, s0->pid, offset,
-                                   data_len, vlib_buffer_get_current (b)))
-    return TCP_ERROR_FIFO_FULL;
+  rv = svm_fifo_enqueue_with_offset (s0->server_rx_fifo, s0->pid, offset,
+                                    data_len, vlib_buffer_get_current (b));
+
+  /* Nothing written */
+  if (rv)
+    {
+      TCP_EVT_DBG (TCP_EVT_INPUT, tc, 1, data_len, 0);
+      return TCP_ERROR_FIFO_FULL;
+    }
+
+  TCP_EVT_DBG (TCP_EVT_INPUT, tc, 1, data_len, data_len);
 
   /* Update SACK list if in use */
   if (tcp_opts_sack_permitted (&tc->opt))
@@ -875,20 +1033,23 @@ tcp_session_enqueue_ooo (tcp_connection_t * tc, vlib_buffer_t * b,
 }
 
 /**
- * Check if ACK could be delayed. DELACK timer is set only after frame is
- * processed so this can return true for a full bursts of packets.
+ * Check if ACK could be delayed. If ack can be delayed, it should return
+ * true for a full frame. If we're always acking return 0.
  */
 always_inline int
 tcp_can_delack (tcp_connection_t * tc)
 {
-  /* If there's no DELACK timer set and the last window sent wasn't 0 we
-   * can safely delay. */
-  if (!tcp_timer_is_active (tc, TCP_TIMER_DELACK)
-      && (tc->flags & TCP_CONN_SENT_RCV_WND0) == 0
-      && (tc->flags & TCP_CONN_SNDACK) == 0)
-    return 1;
+  /* Send ack if ... */
+  if (TCP_ALWAYS_ACK
+      /* just sent a rcv wnd 0 */
+      || (tc->flags & TCP_CONN_SENT_RCV_WND0) != 0
+      /* constrained to send ack */
+      || (tc->flags & TCP_CONN_SNDACK) != 0
+      /* we're almost out of tx wnd */
+      || tcp_available_snd_space (tc) < 2 * tc->snd_mss)
+    return 0;
 
-  return 0;
+  return 1;
 }
 
 static int
@@ -900,23 +1061,33 @@ tcp_segment_rcv (tcp_main_t * tm, tcp_connection_t * tc, vlib_buffer_t * b,
   /* Handle out-of-order data */
   if (PREDICT_FALSE (vnet_buffer (b)->tcp.seq_number != tc->rcv_nxt))
     {
+      /* Old sequence numbers allowed through because they overlapped
+       * the rx window */
+      if (seq_lt (vnet_buffer (b)->tcp.seq_number, tc->rcv_nxt))
+       {
+         error = TCP_ERROR_SEGMENT_OLD;
+         *next0 = TCP_NEXT_DROP;
+         goto done;
+       }
+
       error = tcp_session_enqueue_ooo (tc, b, n_data_bytes);
 
-      /* Don't send more than 3 dupacks per burst
-       * XXX decide if this is good */
-      if (tc->snt_dupacks < 3)
-       {
-         /* RFC2581: Send DUPACK for fast retransmit */
-         tcp_make_ack (tc, b);
-         *next0 = tcp_next_output (tc->c_is_ip4);
+      /* N.B. Should not filter burst of dupacks. Two issues 1) dupacks open
+       * cwnd on remote peer when congested 2) acks leaving should have the
+       * latest rcv_wnd since the burst may eaten up all of it, so only the
+       * old ones could be filtered.
+       */
 
-         /* Mark as DUPACK. We may filter these in output if
-          * the burst fills the holes. */
-         vnet_buffer (b)->tcp.flags = TCP_BUF_FLAG_DUPACK;
+      /* RFC2581: Send DUPACK for fast retransmit */
+      tcp_make_ack (tc, b);
+      *next0 = tcp_next_output (tc->c_is_ip4);
 
-         tc->snt_dupacks++;
-       }
+      /* Mark as DUPACK. We may filter these in output if
+       * the burst fills the holes. */
+      if (n_data_bytes)
+       vnet_buffer (b)->tcp.flags = TCP_BUF_FLAG_DUPACK;
 
+      TCP_EVT_DBG (TCP_EVT_DUPACK_SENT, tc);
       goto done;
     }
 
@@ -924,63 +1095,45 @@ tcp_segment_rcv (tcp_main_t * tm, tcp_connection_t * tc, vlib_buffer_t * b,
    * segments can be enqueued after fifo tail offset changes. */
   error = tcp_session_enqueue_data (tc, b, n_data_bytes);
 
-  TCP_EVT_DBG (TCP_EVT_INPUT, tc, n_data_bytes);
+  if (n_data_bytes == 0)
+    {
+      *next0 = TCP_NEXT_DROP;
+      goto done;
+    }
+
+  if (PREDICT_FALSE (error == TCP_ERROR_FIFO_FULL))
+    *next0 = TCP_NEXT_DROP;
 
   /* Check if ACK can be delayed */
-  if (tcp_can_delack (tc))
+  if (!tcp_can_delack (tc))
     {
-      /* Nothing to do for pure ACKs */
+      /* Nothing to do for pure ACKs XXX */
       if (n_data_bytes == 0)
        goto done;
 
-      /* If connection has not been previously marked for delay ack
-       * add it to the list and flag it */
-      if (!tc->flags & TCP_CONN_DELACK)
-       {
-         vec_add1 (tm->delack_connections[tc->c_thread_index],
-                   tc->c_c_index);
-         tc->flags |= TCP_CONN_DELACK;
-       }
+      *next0 = tcp_next_output (tc->c_is_ip4);
+      tcp_make_ack (tc, b);
     }
   else
     {
-      /* Check if a packet has already been enqueued to output for burst.
-       * If yes, then drop this one, otherwise, let it pass through to
-       * output */
-      if ((tc->flags & TCP_CONN_BURSTACK) == 0)
-       {
-         *next0 = tcp_next_output (tc->c_is_ip4);
-         tcp_make_ack (tc, b);
-         error = TCP_ERROR_ENQUEUED;
-
-         /* TODO: maybe add counter to ensure N acks will be sent/burst */
-         tc->flags |= TCP_CONN_BURSTACK;
-       }
+      if (!tcp_timer_is_active (tc, TCP_TIMER_DELACK))
+       tcp_timer_set (tc, TCP_TIMER_DELACK, TCP_DELACK_TIME);
     }
 
 done:
   return error;
 }
 
-void
-delack_timers_init (tcp_main_t * tm, u32 thread_index)
+always_inline void
+tcp_established_inc_counter (vlib_main_t * vm, u8 is_ip4, u8 evt, u8 val)
 {
-  tcp_connection_t *tc;
-  u32 i, *conns;
-  tw_timer_wheel_16t_2w_512sl_t *tw;
-
-  tw = &tm->timer_wheels[thread_index];
-  conns = tm->delack_connections[thread_index];
-  for (i = 0; i < vec_len (conns); i++)
-    {
-      tc = pool_elt_at_index (tm->connections[thread_index], conns[i]);
-      ASSERT (0 != tc);
+  if (PREDICT_TRUE (!val))
+    return;
 
-      tc->timers[TCP_TIMER_DELACK]
-       = tw_timer_start_16t_2w_512sl (tw, conns[i],
-                                      TCP_TIMER_DELACK, TCP_DELACK_TIME);
-    }
-  vec_reset_length (tm->delack_connections[thread_index]);
+  if (is_ip4)
+    vlib_node_increment_counter (vm, tcp4_established_node.index, evt, val);
+  else
+    vlib_node_increment_counter (vm, tcp6_established_node.index, evt, val);
 }
 
 always_inline uword
@@ -1027,7 +1180,7 @@ tcp46_established_inline (vlib_main_t * vm, vlib_node_runtime_t * node,
          if (PREDICT_FALSE (tc0 == 0))
            {
              error0 = TCP_ERROR_INVALID_CONNECTION;
-             goto drop;
+             goto done;
            }
 
          /* Checksum computed by ipx_local no need to compute again */
@@ -1061,18 +1214,22 @@ tcp46_established_inline (vlib_main_t * vm, vlib_node_runtime_t * node,
          if (PREDICT_FALSE (tcp_segment_validate (vm, tc0, b0, th0, &next0)))
            {
              error0 = TCP_ERROR_SEGMENT_INVALID;
-             goto drop;
+             TCP_EVT_DBG (TCP_EVT_SEG_INVALID, tc0,
+                          vnet_buffer (b0)->tcp.seq_number,
+                          vnet_buffer (b0)->tcp.seq_end);
+             goto done;
            }
 
          /* 5: check the ACK field  */
          if (tcp_rcv_ack (tc0, b0, th0, &next0, &error0))
            {
-             goto drop;
+             goto done;
            }
 
          /* 6: check the URG bit TODO */
 
          /* 7: process the segment text */
+
          vlib_buffer_advance (b0, n_advance_bytes0);
          error0 = tcp_segment_rcv (tm, tc0, b0, n_data_bytes0, &next0);
 
@@ -1088,7 +1245,7 @@ tcp46_established_inline (vlib_main_t * vm, vlib_node_runtime_t * node,
              tcp_timer_set (tc0, TCP_TIMER_WAITCLOSE, TCP_CLOSEWAIT_TIME);
            }
 
-       drop:
+       done:
          b0->error = node->errors[error0];
          if (PREDICT_FALSE (b0->flags & VLIB_BUFFER_IS_TRACED))
            {
@@ -1103,17 +1260,7 @@ tcp46_established_inline (vlib_main_t * vm, vlib_node_runtime_t * node,
     }
 
   errors = session_manager_flush_enqueue_events (my_thread_index);
-  if (errors)
-    {
-      if (is_ip4)
-       vlib_node_increment_counter (vm, tcp4_established_node.index,
-                                    TCP_ERROR_EVENT_FIFO_FULL, errors);
-      else
-       vlib_node_increment_counter (vm, tcp6_established_node.index,
-                                    TCP_ERROR_EVENT_FIFO_FULL, errors);
-    }
-
-  delack_timers_init (tm, my_thread_index);
+  tcp_established_inc_counter (vm, is_ip4, TCP_ERROR_EVENT_FIFO_FULL, errors);
 
   return from_frame->n_vectors;
 }
@@ -1602,7 +1749,7 @@ tcp46_rcv_process_inline (vlib_main_t * vm, vlib_node_runtime_t * node,
              stream_session_accept_notify (&tc0->connection);
 
              /* Reset SYN-ACK retransmit timer */
-             tcp_timer_reset (tc0, TCP_TIMER_RETRANSMIT);
+             tcp_retransmit_timer_reset (tc0);
              break;
            case TCP_STATE_ESTABLISHED:
              /* We can get packets in established state here because they
@@ -1668,7 +1815,7 @@ tcp46_rcv_process_inline (vlib_main_t * vm, vlib_node_runtime_t * node,
              tcp_timer_update (tc0, TCP_TIMER_WAITCLOSE, TCP_CLEANUP_TIME);
 
              /* Stop retransmit */
-             tcp_timer_reset (tc0, TCP_TIMER_RETRANSMIT);
+             tcp_retransmit_timer_reset (tc0);
 
              goto drop;
 
index 114a5b9..a671f72 100644 (file)
@@ -125,15 +125,33 @@ tcp_initial_window_to_advertise (tcp_connection_t * tc)
 u32
 tcp_window_to_advertise (tcp_connection_t * tc, tcp_state_t state)
 {
-  u32 available_space, max_fifo, observed_wnd;
-
   if (state < TCP_STATE_ESTABLISHED)
     return tcp_initial_window_to_advertise (tc);
 
+  tcp_update_rcv_wnd (tc);
+
+  if (tc->rcv_wnd == 0)
+    {
+      tc->flags |= TCP_CONN_SENT_RCV_WND0;
+    }
+  else
+    {
+      tc->flags &= ~TCP_CONN_SENT_RCV_WND0;
+    }
+
+  return tc->rcv_wnd >> tc->rcv_wscale;
+}
+
+void
+tcp_update_rcv_wnd (tcp_connection_t * tc)
+{
+  i32 observed_wnd;
+  u32 available_space, max_fifo, wnd;
+
   /*
    * Figure out how much space we have available
    */
-  available_space = stream_session_max_enqueue (&tc->connection);
+  available_space = stream_session_max_rx_enqueue (&tc->connection);
   max_fifo = stream_session_fifo_size (&tc->connection);
 
   ASSERT (tc->opt.mss < max_fifo);
@@ -145,23 +163,25 @@ tcp_window_to_advertise (tcp_connection_t * tc, tcp_state_t state)
    * Use the above and what we know about what we've previously advertised
    * to compute the new window
    */
-  observed_wnd = tc->rcv_wnd - (tc->rcv_nxt - tc->rcv_las);
+  observed_wnd = (i32) tc->rcv_wnd - (tc->rcv_nxt - tc->rcv_las);
+  if (observed_wnd < 0)
+    observed_wnd = 0;
 
   /* Bad. Thou shalt not shrink */
   if (available_space < observed_wnd)
     {
-      if (available_space == 0)
-       clib_warning ("Didn't shrink rcv window despite not having space");
+      /* Does happen! */
+      wnd = observed_wnd;
     }
-
-  tc->rcv_wnd = clib_min (available_space, TCP_WND_MAX << tc->rcv_wscale);
-
-  if (tc->rcv_wnd == 0)
+  else
     {
-      tc->flags |= TCP_CONN_SENT_RCV_WND0;
+      wnd = available_space;
     }
 
-  return tc->rcv_wnd >> tc->rcv_wscale;
+  if (wnd && ((wnd << tc->rcv_wscale) >> tc->rcv_wscale != wnd))
+    wnd += 1 << tc->rcv_wscale;
+
+  tc->rcv_wnd = clib_min (wnd, TCP_WND_MAX << tc->rcv_wscale);
 }
 
 /**
@@ -363,7 +383,7 @@ tcp_make_options (tcp_connection_t * tc, tcp_options_t * opts,
 #define tcp_get_free_buffer_index(tm, bidx)                             \
 do {                                                                    \
   u32 *my_tx_buffers, n_free_buffers;                                   \
-  u32 cpu_index = tm->vlib_main->cpu_index;                             \
+  u32 cpu_index = os_get_cpu_number();                                 \
   my_tx_buffers = tm->tx_buffers[cpu_index];                            \
   if (PREDICT_FALSE(vec_len (my_tx_buffers) == 0))                      \
     {                                                                   \
@@ -381,6 +401,14 @@ do {                                                                    \
   _vec_len (my_tx_buffers) -= 1;                                        \
 } while (0)
 
+#define tcp_return_buffer(tm)                                          \
+do {                                                                   \
+  u32 *my_tx_buffers;                                                  \
+  u32 cpu_index = os_get_cpu_number();                                 \
+  my_tx_buffers = tm->tx_buffers[cpu_index];                           \
+  _vec_len (my_tx_buffers) +=1;                                                \
+} while (0)
+
 always_inline void
 tcp_reuse_buffer (vlib_main_t * vm, vlib_buffer_t * b)
 {
@@ -421,8 +449,6 @@ tcp_make_ack_i (tcp_connection_t * tc, vlib_buffer_t * b, tcp_state_t state,
                             tc->rcv_nxt, tcp_hdr_opts_len, flags, wnd);
 
   tcp_options_write ((u8 *) (th + 1), snd_opts);
-
-  /* Mark as ACK */
   vnet_buffer (b)->tcp.connection_index = tc->c_c_index;
 }
 
@@ -432,12 +458,12 @@ tcp_make_ack_i (tcp_connection_t * tc, vlib_buffer_t * b, tcp_state_t state,
 void
 tcp_make_ack (tcp_connection_t * tc, vlib_buffer_t * b)
 {
-  tcp_main_t *tm = vnet_get_tcp_main ();
-  vlib_main_t *vm = tm->vlib_main;
+  vlib_main_t *vm = vlib_get_main ();
 
   tcp_reuse_buffer (vm, b);
   tcp_make_ack_i (tc, b, TCP_STATE_ESTABLISHED, TCP_FLAG_ACK);
   vnet_buffer (b)->tcp.flags = TCP_BUF_FLAG_ACK;
+  TCP_EVT_DBG (TCP_EVT_ACK_SENT, tc);
 }
 
 /**
@@ -446,8 +472,7 @@ tcp_make_ack (tcp_connection_t * tc, vlib_buffer_t * b)
 void
 tcp_make_fin (tcp_connection_t * tc, vlib_buffer_t * b)
 {
-  tcp_main_t *tm = vnet_get_tcp_main ();
-  vlib_main_t *vm = tm->vlib_main;
+  vlib_main_t *vm = vlib_get_main ();
   u8 flags = 0;
 
   tcp_reuse_buffer (vm, b);
@@ -467,8 +492,7 @@ tcp_make_fin (tcp_connection_t * tc, vlib_buffer_t * b)
 void
 tcp_make_synack (tcp_connection_t * tc, vlib_buffer_t * b)
 {
-  tcp_main_t *tm = vnet_get_tcp_main ();
-  vlib_main_t *vm = tm->vlib_main;
+  vlib_main_t *vm = vlib_get_main ();
   tcp_options_t _snd_opts, *snd_opts = &_snd_opts;
   u8 tcp_opts_len, tcp_hdr_opts_len;
   tcp_header_t *th;
@@ -631,7 +655,7 @@ tcp_send_reset (vlib_buffer_t * pkt, u8 is_ip4)
   vlib_buffer_t *b;
   u32 bi;
   tcp_main_t *tm = vnet_get_tcp_main ();
-  vlib_main_t *vm = tm->vlib_main;
+  vlib_main_t *vm = vlib_get_main ();
   u8 tcp_hdr_len, flags = 0;
   tcp_header_t *th, *pkt_th;
   u32 seq, ack;
@@ -736,7 +760,7 @@ tcp_send_syn (tcp_connection_t * tc)
   vlib_buffer_t *b;
   u32 bi;
   tcp_main_t *tm = vnet_get_tcp_main ();
-  vlib_main_t *vm = tm->vlib_main;
+  vlib_main_t *vm = vlib_get_main ();
   u8 tcp_hdr_opts_len, tcp_opts_len;
   tcp_header_t *th;
   u32 time_now;
@@ -795,9 +819,9 @@ tcp_enqueue_to_output (vlib_main_t * vm, vlib_buffer_t * b, u32 bi, u8 is_ip4)
 
   /* Decide where to send the packet */
   next_index = is_ip4 ? tcp4_output_node.index : tcp6_output_node.index;
-  f = vlib_get_frame_to_node (vm, next_index);
 
   /* Enqueue the packet */
+  f = vlib_get_frame_to_node (vm, next_index);
   to_next = vlib_frame_vector_args (f);
   to_next[0] = bi;
   f->n_vectors = 1;
@@ -813,7 +837,7 @@ tcp_send_fin (tcp_connection_t * tc)
   vlib_buffer_t *b;
   u32 bi;
   tcp_main_t *tm = vnet_get_tcp_main ();
-  vlib_main_t *vm = tm->vlib_main;
+  vlib_main_t *vm = vlib_get_main ();
 
   tcp_get_free_buffer_index (tm, &bi);
   b = vlib_get_buffer (vm, bi);
@@ -884,22 +908,21 @@ tcp_push_hdr_i (tcp_connection_t * tc, vlib_buffer_t * b,
   vnet_buffer (b)->tcp.connection_index = tc->c_c_index;
 
   tc->snd_nxt += data_len;
+  /* TODO this is updated in output as well ... */
+  if (tc->snd_nxt > tc->snd_una_max)
+    tc->snd_una_max = tc->snd_nxt;
   TCP_EVT_DBG (TCP_EVT_PKTIZE, tc);
 }
 
-/* Send delayed ACK when timer expires */
 void
-tcp_timer_delack_handler (u32 index)
+tcp_send_ack (tcp_connection_t * tc)
 {
   tcp_main_t *tm = vnet_get_tcp_main ();
-  vlib_main_t *vm = tm->vlib_main;
-  u32 thread_index = os_get_cpu_number ();
-  tcp_connection_t *tc;
+  vlib_main_t *vm = vlib_get_main ();
+
   vlib_buffer_t *b;
   u32 bi;
 
-  tc = tcp_connection_get (index, thread_index);
-
   /* Get buffer */
   tcp_get_free_buffer_index (tm, &bi);
   b = vlib_get_buffer (vm, bi);
@@ -907,12 +930,22 @@ tcp_timer_delack_handler (u32 index)
   /* Fill in the ACK */
   tcp_make_ack (tc, b);
 
-  tc->timers[TCP_TIMER_DELACK] = TCP_TIMER_HANDLE_INVALID;
-  tc->flags &= ~TCP_CONN_DELACK;
-
   tcp_enqueue_to_output (vm, b, bi, tc->c_is_ip4);
 }
 
+/* Send delayed ACK when timer expires */
+void
+tcp_timer_delack_handler (u32 index)
+{
+  u32 thread_index = os_get_cpu_number ();
+  tcp_connection_t *tc;
+
+  tc = tcp_connection_get (index, thread_index);
+  tc->timers[TCP_TIMER_DELACK] = TCP_TIMER_HANDLE_INVALID;
+//  tc->flags &= ~TCP_CONN_DELACK;
+  tcp_send_ack (tc);
+}
+
 /** Build a retransmit segment
  *
  * @return the number of bytes in the segment or 0 if there's nothing to
@@ -920,59 +953,74 @@ tcp_timer_delack_handler (u32 index)
  * */
 u32
 tcp_prepare_retransmit_segment (tcp_connection_t * tc, vlib_buffer_t * b,
-                               u32 max_bytes)
+                               u32 offset, u32 max_bytes)
 {
-  tcp_main_t *tm = vnet_get_tcp_main ();
-  vlib_main_t *vm = tm->vlib_main;
-  u32 n_bytes, offset = 0;
-  sack_scoreboard_hole_t *hole;
-  u32 hole_size;
+  vlib_main_t *vm = vlib_get_main ();
+  u32 n_bytes = 0;
 
   tcp_reuse_buffer (vm, b);
 
   ASSERT (tc->state >= TCP_STATE_ESTABLISHED);
   ASSERT (max_bytes != 0);
 
-  if (tcp_opts_sack_permitted (&tc->opt))
-    {
-      /* XXX get first hole not retransmitted yet  */
-      hole = scoreboard_first_hole (&tc->sack_sb);
-      if (!hole)
-       return 0;
-
-      offset = hole->start - tc->snd_una;
-      hole_size = hole->end - hole->start;
+  max_bytes = clib_min (tc->snd_mss, max_bytes);
 
-      ASSERT (hole_size);
+  /* Start is beyond snd_congestion */
+  if (seq_geq (tc->snd_una + offset, tc->snd_congestion))
+    goto done;
 
-      if (hole_size < max_bytes)
-       max_bytes = hole_size;
-    }
-  else
+  /* Don't overshoot snd_congestion */
+  if (seq_gt (tc->snd_nxt + max_bytes, tc->snd_congestion))
     {
-      if (seq_geq (tc->snd_nxt, tc->snd_una_max))
-       return 0;
+      max_bytes = tc->snd_congestion - tc->snd_nxt;
+      if (max_bytes == 0)
+       goto done;
     }
 
+  ASSERT (max_bytes <= tc->snd_mss);
+
   n_bytes = stream_session_peek_bytes (&tc->connection,
                                       vlib_buffer_get_current (b), offset,
                                       max_bytes);
   ASSERT (n_bytes != 0);
-
+  b->current_length = n_bytes;
   tcp_push_hdr_i (tc, b, tc->state);
 
+done:
+  TCP_EVT_DBG (TCP_EVT_CC_RTX, tc, offset, n_bytes);
   return n_bytes;
 }
 
+/**
+ * Reset congestion control, switch cwnd to loss window and try again.
+ */
+static void
+tcp_rtx_timeout_cc_recover (tcp_connection_t * tc)
+{
+  /* Cleanly recover cc (also clears up fast retransmit) */
+  if (tcp_in_fastrecovery (tc))
+    {
+      tcp_cc_recover (tc);
+    }
+  else
+    {
+      tc->ssthresh = clib_max (tcp_flight_size (tc) / 2, 2 * tc->snd_mss);
+    }
+
+  /* Start again from the beginning */
+  tc->cwnd = tcp_loss_wnd (tc);
+  tc->snd_congestion = tc->snd_una_max;
+}
+
 static void
 tcp_timer_retransmit_handler_i (u32 index, u8 is_syn)
 {
   tcp_main_t *tm = vnet_get_tcp_main ();
-  vlib_main_t *vm = tm->vlib_main;
+  vlib_main_t *vm = vlib_get_main ();
   u32 thread_index = os_get_cpu_number ();
   tcp_connection_t *tc;
   vlib_buffer_t *b;
-  u32 bi, max_bytes, snd_space;
+  u32 bi, snd_space, n_bytes;
 
   if (is_syn)
     {
@@ -998,26 +1046,43 @@ tcp_timer_retransmit_handler_i (u32 index, u8 is_syn)
 
   if (tc->state >= TCP_STATE_ESTABLISHED)
     {
-      tcp_fastrecovery_off (tc);
+      /* First retransmit timeout */
+      if (tc->rto_boff == 1)
+       tcp_rtx_timeout_cc_recover (tc);
 
       /* Exponential backoff */
       tc->rto = clib_min (tc->rto << 1, TCP_RTO_MAX);
 
       /* Figure out what and how many bytes we can send */
       snd_space = tcp_available_snd_space (tc);
-      max_bytes = clib_min (tc->snd_mss, snd_space);
 
-      if (max_bytes == 0)
+      TCP_EVT_DBG (TCP_EVT_CC_EVT, tc, 1);
+
+      if (snd_space == 0)
        {
          clib_warning ("no wnd to retransmit");
+         tcp_return_buffer (tm);
+
+         /* Force one segment */
+         tcp_retransmit_first_unacked (tc);
+
+         /* Re-enable retransmit timer. Output may be unwilling
+          * to do it for us */
+         tcp_retransmit_timer_set (tc);
+
          return;
        }
-      tcp_prepare_retransmit_segment (tc, b, max_bytes);
+      else
+       {
+         /* No fancy recovery for now! */
+         n_bytes = tcp_prepare_retransmit_segment (tc, b, 0, snd_space);
+         scoreboard_clear (&tc->sack_sb);
 
-      tc->rtx_bytes += max_bytes;
+         if (n_bytes == 0)
+           return;
 
-      /* No fancy recovery for now! */
-      scoreboard_clear (&tc->sack_sb);
+         tc->rtx_bytes += n_bytes;
+       }
     }
   else
     {
@@ -1072,63 +1137,110 @@ tcp_timer_retransmit_syn_handler (u32 index)
 }
 
 /**
- * Retansmit first unacked segment */
+ * Retransmit first unacked segment
+ */
 void
 tcp_retransmit_first_unacked (tcp_connection_t * tc)
 {
   tcp_main_t *tm = vnet_get_tcp_main ();
-  u32 snd_nxt = tc->snd_nxt;
+  vlib_main_t *vm = vlib_get_main ();
   vlib_buffer_t *b;
-  u32 bi;
+  u32 bi, n_bytes;
 
   tc->snd_nxt = tc->snd_una;
 
   /* Get buffer */
   tcp_get_free_buffer_index (tm, &bi);
-  b = vlib_get_buffer (tm->vlib_main, bi);
+  b = vlib_get_buffer (vm, bi);
+
+  TCP_EVT_DBG (TCP_EVT_CC_EVT, tc, 2);
 
-  tcp_prepare_retransmit_segment (tc, b, tc->snd_mss);
-  tcp_enqueue_to_output (tm->vlib_main, b, bi, tc->c_is_ip4);
+  n_bytes = tcp_prepare_retransmit_segment (tc, b, 0, tc->snd_mss);
+  if (n_bytes == 0)
+    return;
 
-  tc->snd_nxt = snd_nxt;
-  tc->rtx_bytes += tc->snd_mss;
+  tcp_enqueue_to_output (vm, b, bi, tc->c_is_ip4);
+  tc->rtx_bytes += n_bytes;
+}
+
+sack_scoreboard_hole_t *
+scoreboard_first_rtx_hole (sack_scoreboard_t * sb)
+{
+  sack_scoreboard_hole_t *hole = 0;
+
+//  hole = scoreboard_first_hole (&tc->sack_sb);
+//  if (hole)
+//    {
+//
+//      offset = hole->start - tc->snd_una;
+//      hole_size = hole->end - hole->start;
+//
+//      ASSERT(hole_size);
+//
+//      if (hole_size < max_bytes)
+//      max_bytes = hole_size;
+//    }
+  return hole;
 }
 
+/**
+ * Do fast retransmit.
+ */
 void
 tcp_fast_retransmit (tcp_connection_t * tc)
 {
   tcp_main_t *tm = vnet_get_tcp_main ();
-  u32 snd_space, max_bytes, n_bytes, bi;
+  vlib_main_t *vm = vlib_get_main ();
+  u32 bi;
+  int snd_space;
+  u32 n_written = 0, offset = 0;
   vlib_buffer_t *b;
+  u8 use_sacks = 0;
 
   ASSERT (tcp_in_fastrecovery (tc));
 
-  clib_warning ("fast retransmit!");
-
   /* Start resending from first un-acked segment */
   tc->snd_nxt = tc->snd_una;
 
   snd_space = tcp_available_snd_space (tc);
+  TCP_EVT_DBG (TCP_EVT_CC_EVT, tc, 0);
+
+  /* If we have SACKs use them */
+  if (tcp_opts_sack_permitted (&tc->opt)
+      && scoreboard_first_hole (&tc->sack_sb))
+    use_sacks = 0;
 
-  while (snd_space)
+  while (snd_space > 0)
     {
       tcp_get_free_buffer_index (tm, &bi);
-      b = vlib_get_buffer (tm->vlib_main, bi);
+      b = vlib_get_buffer (vm, bi);
+
+      if (use_sacks)
+       {
+         scoreboard_first_rtx_hole (&tc->sack_sb);
+       }
+      else
+       {
+         offset += n_written;
+       }
 
-      max_bytes = clib_min (tc->snd_mss, snd_space);
-      n_bytes = tcp_prepare_retransmit_segment (tc, b, max_bytes);
+      n_written = tcp_prepare_retransmit_segment (tc, b, offset, snd_space);
 
       /* Nothing left to retransmit */
-      if (n_bytes == 0)
-       return;
-
-      tcp_enqueue_to_output (tm->vlib_main, b, bi, tc->c_is_ip4);
+      if (n_written == 0)
+       {
+         tcp_return_buffer (tm);
+         break;
+       }
 
-      snd_space -= n_bytes;
+      tcp_enqueue_to_output (vm, b, bi, tc->c_is_ip4);
+      tc->rtx_bytes += n_written;
+      snd_space -= n_written;
     }
 
-  /* If window allows, send new data */
-  tc->snd_nxt = tc->snd_una_max;
+  /* If window allows, send 1 SMSS of new data */
+  if (seq_lt (tc->snd_nxt, tc->snd_congestion))
+    tc->snd_nxt = tc->snd_congestion;
 }
 
 always_inline u32
@@ -1209,8 +1321,6 @@ tcp46_output_inline (vlib_main_t * vm,
          if (PREDICT_FALSE
              (vnet_buffer (b0)->tcp.flags & TCP_BUF_FLAG_DUPACK))
            {
-             ASSERT (tc0->snt_dupacks > 0);
-             tc0->snt_dupacks--;
              if (!tcp_session_has_ooo_data (tc0))
                {
                  error0 = TCP_ERROR_FILTERED_DUPACKS;
@@ -1223,8 +1333,7 @@ tcp46_output_inline (vlib_main_t * vm,
          tc0->rcv_las = tc0->rcv_nxt;
 
          /* Stop DELACK timer and fix flags */
-         tc0->flags &=
-           ~(TCP_CONN_SNDACK | TCP_CONN_DELACK | TCP_CONN_BURSTACK);
+         tc0->flags &= ~(TCP_CONN_SNDACK);
          if (tcp_timer_is_active (tc0, TCP_TIMER_DELACK))
            {
              tcp_timer_reset (tc0, TCP_TIMER_DELACK);
index 866c5fd..4f28cf3 100644 (file)
@@ -137,7 +137,7 @@ enum
 typedef struct _sack_block
 {
   u32 start;           /**< Start sequence number */
-  u32 end;             /**< End sequence number */
+  u32 end;             /**< End sequence number (first outside) */
 } sack_block_t;
 
 typedef struct
diff --git a/src/vnet/tcp/tcp_test.c b/src/vnet/tcp/tcp_test.c
new file mode 100644 (file)
index 0000000..0725bb0
--- /dev/null
@@ -0,0 +1,216 @@
+/*
+ * Copyright (c) 2017 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <vnet/tcp/tcp.h>
+
+#define TCP_TEST_I(_cond, _comment, _args...)                  \
+({                                                             \
+  int _evald = (_cond);                                                \
+  if (!(_evald)) {                                             \
+    fformat(stderr, "FAIL:%d: " _comment "\n",                 \
+           __LINE__, ##_args);                                 \
+  } else {                                                     \
+    fformat(stderr, "PASS:%d: " _comment "\n",                 \
+           __LINE__, ##_args);                                 \
+  }                                                            \
+  _evald;                                                      \
+})
+
+#define TCP_TEST(_cond, _comment, _args...)                    \
+{                                                              \
+    if (!TCP_TEST_I(_cond, _comment, ##_args)) {               \
+       return 1;                                               \
+    }                                                          \
+}
+
+static int
+tcp_test_sack ()
+{
+  tcp_connection_t _tc, *tc = &_tc;
+  sack_scoreboard_t *sb = &tc->sack_sb;
+  sack_block_t *sacks = 0, block;
+  sack_scoreboard_hole_t *hole;
+  int i;
+
+  memset (tc, 0, sizeof (*tc));
+
+  tc->snd_una = 0;
+  tc->snd_una_max = 1000;
+  tc->snd_nxt = 1000;
+  tc->opt.flags |= TCP_OPTS_FLAG_SACK;
+  scoreboard_init (&tc->sack_sb);
+
+  for (i = 0; i < 1000 / 100; i++)
+    {
+      block.start = i * 100;
+      block.end = (i + 1) * 100;
+      vec_add1 (sacks, block);
+    }
+
+  /*
+   * Inject even blocks
+   */
+
+  for (i = 0; i < 1000 / 200; i++)
+    {
+      vec_add1 (tc->opt.sacks, sacks[i * 2]);
+    }
+  tc->opt.n_sack_blocks = vec_len (tc->opt.sacks);
+  tcp_rcv_sacks (tc, 0);
+
+  TCP_TEST ((pool_elts (sb->holes) == 5),
+           "scoreboard has %d elements", pool_elts (sb->holes));
+
+  /* First SACK block should be rejected */
+  hole = scoreboard_first_hole (sb);
+  TCP_TEST ((hole->start == 0 && hole->end == 200),
+           "first hole start %u end %u", hole->start, hole->end);
+  hole = scoreboard_last_hole (sb);
+  TCP_TEST ((hole->start == 900 && hole->end == 1000),
+           "last hole start %u end %u", hole->start, hole->end);
+  TCP_TEST ((sb->sacked_bytes == 400), "sacked bytes %d", sb->sacked_bytes);
+  TCP_TEST ((sb->snd_una_adv == 0), "snd_una_adv %u", sb->snd_una_adv);
+  TCP_TEST ((sb->last_sacked_bytes == 400),
+           "last sacked bytes %d", sb->last_sacked_bytes);
+
+  /*
+   * Inject odd blocks
+   */
+
+  vec_reset_length (tc->opt.sacks);
+  for (i = 0; i < 1000 / 200; i++)
+    {
+      vec_add1 (tc->opt.sacks, sacks[i * 2 + 1]);
+    }
+  tc->opt.n_sack_blocks = vec_len (tc->opt.sacks);
+  tcp_rcv_sacks (tc, 0);
+
+  hole = scoreboard_first_hole (sb);
+  TCP_TEST ((pool_elts (sb->holes) == 1),
+           "scoreboard has %d holes", pool_elts (sb->holes));
+  TCP_TEST ((hole->start == 0 && hole->end == 100),
+           "first hole start %u end %u", hole->start, hole->end);
+  TCP_TEST ((sb->sacked_bytes == 900), "sacked bytes %d", sb->sacked_bytes);
+  TCP_TEST ((sb->snd_una_adv == 0), "snd_una_adv %u", sb->snd_una_adv);
+  TCP_TEST ((sb->max_byte_sacked == 1000),
+           "max sacked byte %u", sb->max_byte_sacked);
+  TCP_TEST ((sb->last_sacked_bytes == 500),
+           "last sacked bytes %d", sb->last_sacked_bytes);
+
+  /*
+   *  Ack until byte 100, all bytes are now acked + sacked
+   */
+  tcp_rcv_sacks (tc, 100);
+
+  TCP_TEST ((pool_elts (sb->holes) == 0),
+           "scoreboard has %d elements", pool_elts (sb->holes));
+  TCP_TEST ((sb->snd_una_adv == 900),
+           "snd_una_adv after ack %u", sb->snd_una_adv);
+  TCP_TEST ((sb->max_byte_sacked == 1000),
+           "max sacked byte %u", sb->max_byte_sacked);
+  TCP_TEST ((sb->sacked_bytes == 0), "sacked bytes %d", sb->sacked_bytes);
+  TCP_TEST ((sb->last_sacked_bytes == 0),
+           "last sacked bytes %d", sb->last_sacked_bytes);
+
+  /*
+   * Add new block
+   */
+
+  vec_reset_length (tc->opt.sacks);
+
+  block.start = 1200;
+  block.end = 1300;
+  vec_add1 (tc->opt.sacks, block);
+
+  tc->snd_una_max = 1500;
+  tc->snd_una = 1000;
+  tc->snd_nxt = 1500;
+  tcp_rcv_sacks (tc, 1000);
+
+  TCP_TEST ((sb->snd_una_adv == 0),
+           "snd_una_adv after ack %u", sb->snd_una_adv);
+  TCP_TEST ((pool_elts (sb->holes) == 2),
+           "scoreboard has %d holes", pool_elts (sb->holes));
+  hole = scoreboard_first_hole (sb);
+  TCP_TEST ((hole->start == 1000 && hole->end == 1200),
+           "first hole start %u end %u", hole->start, hole->end);
+  hole = scoreboard_last_hole (sb);
+  TCP_TEST ((hole->start == 1300 && hole->end == 1500),
+           "last hole start %u end %u", hole->start, hole->end);
+  TCP_TEST ((sb->sacked_bytes == 100), "sacked bytes %d", sb->sacked_bytes);
+
+  /*
+   * Ack first hole
+   */
+
+  vec_reset_length (tc->opt.sacks);
+  tcp_rcv_sacks (tc, 1200);
+
+  TCP_TEST ((sb->snd_una_adv == 100),
+           "snd_una_adv after ack %u", sb->snd_una_adv);
+  TCP_TEST ((sb->sacked_bytes == 0), "sacked bytes %d", sb->sacked_bytes);
+  TCP_TEST ((pool_elts (sb->holes) == 1),
+           "scoreboard has %d elements", pool_elts (sb->holes));
+
+  /*
+   * Remove all
+   */
+
+  scoreboard_clear (sb);
+  TCP_TEST ((pool_elts (sb->holes) == 0),
+           "number of holes %d", pool_elts (sb->holes));
+  return 0;
+}
+
+static clib_error_t *
+tcp_test (vlib_main_t * vm,
+         unformat_input_t * input, vlib_cli_command_t * cmd_arg)
+{
+  int res = 0;
+
+  while (unformat_check_input (input) != UNFORMAT_END_OF_INPUT)
+    {
+      if (unformat (input, "sack"))
+       {
+         res = tcp_test_sack ();
+       }
+      else
+       {
+         return clib_error_return (0, "unknown input `%U'",
+                                   format_unformat_error, input);
+       }
+    }
+
+  if (res)
+    {
+      return clib_error_return (0, "TCP unit test failed");
+    }
+  else
+    {
+      return 0;
+    }
+}
+
+VLIB_CLI_COMMAND (tcp_test_command, static) =
+{
+.path = "test tcp",.short_help = "internal tcp unit tests",.function =
+    tcp_test,};
+/*
+ * fd.io coding-style-patch-verification: ON
+ *
+ * Local Variables:
+ * eval: (c-set-style "gnu")
+ * End:
+ */
index 46c8e73..57f774c 100644 (file)
@@ -39,10 +39,10 @@ builtin_session_disconnect_callback (stream_session_t * s)
 }
 
 static int
-builtin_server_rx_callback (stream_session_t * s, session_fifo_event_t * ep)
+builtin_server_rx_callback (stream_session_t * s)
 {
   svm_fifo_t *rx_fifo, *tx_fifo;
-  u32 this_transfer;
+  u32 this_transfer, max_deq, max_enq;
   int actual_transfer;
   u8 *my_copy_buffer;
   session_fifo_event_t evt;
@@ -52,9 +52,9 @@ builtin_server_rx_callback (stream_session_t * s, session_fifo_event_t * ep)
   rx_fifo = s->server_rx_fifo;
   tx_fifo = s->server_tx_fifo;
 
-  this_transfer = svm_fifo_max_enqueue (tx_fifo)
-    < svm_fifo_max_dequeue (rx_fifo) ?
-    svm_fifo_max_enqueue (tx_fifo) : svm_fifo_max_dequeue (rx_fifo);
+  max_deq = svm_fifo_max_dequeue (rx_fifo);
+  max_enq = svm_fifo_max_enqueue (tx_fifo);
+  this_transfer = max_enq < max_deq ? max_enq : max_deq;
 
   vec_validate (my_copy_buffer, this_transfer - 1);
   _vec_len (my_copy_buffer) = this_transfer;
@@ -64,17 +64,20 @@ builtin_server_rx_callback (stream_session_t * s, session_fifo_event_t * ep)
   ASSERT (actual_transfer == this_transfer);
   actual_transfer = svm_fifo_enqueue_nowait (tx_fifo, 0, this_transfer,
                                             my_copy_buffer);
+  ASSERT (actual_transfer == this_transfer);
 
   copy_buffers[s->thread_index] = my_copy_buffer;
 
-  /* Fabricate TX event, send to ourselves */
-  evt.fifo = tx_fifo;
-  evt.event_type = FIFO_EVENT_SERVER_TX;
-  /* $$$$ for event logging */
-  evt.enqueue_length = actual_transfer;
-  evt.event_id = 0;
-  q = session_manager_get_vpp_event_queue (s->thread_index);
-  unix_shared_memory_queue_add (q, (u8 *) & evt, 0 /* do wait for mutex */ );
+  if (svm_fifo_set_event (tx_fifo))
+    {
+      /* Fabricate TX event, send to ourselves */
+      evt.fifo = tx_fifo;
+      evt.event_type = FIFO_EVENT_SERVER_TX;
+      evt.event_id = 0;
+      q = session_manager_get_vpp_event_queue (s->thread_index);
+      unix_shared_memory_queue_add (q, (u8 *) & evt,
+                                   0 /* do wait for mutex */ );
+    }
 
   return 0;
 }
index 8827873..4b22109 100644 (file)
@@ -244,44 +244,53 @@ udp4_uri_input_node_fn (vlib_main_t * vm,
       /* Get session's server */
       server0 = application_get (s0->app_index);
 
-      /* Fabricate event */
-      evt.fifo = s0->server_rx_fifo;
-      evt.event_type = FIFO_EVENT_SERVER_RX;
-      evt.event_id = serial_number++;
-      evt.enqueue_length = svm_fifo_max_dequeue (s0->server_rx_fifo);
-
       /* Built-in server? Deliver the goods... */
       if (server0->cb_fns.builtin_server_rx_callback)
        {
-         server0->cb_fns.builtin_server_rx_callback (s0, &evt);
+         server0->cb_fns.builtin_server_rx_callback (s0);
          continue;
        }
 
-      /* Add event to server's event queue */
-      q = server0->event_queue;
-
-      /* Don't block for lack of space */
-      if (PREDICT_TRUE (q->cursize < q->maxsize))
-       unix_shared_memory_queue_add (server0->event_queue, (u8 *) & evt,
-                                     0 /* do wait for mutex */ );
-      else
+      if (svm_fifo_set_event (s0->server_rx_fifo))
        {
-         vlib_node_increment_counter (vm, udp4_uri_input_node.index,
-                                      SESSION_ERROR_FIFO_FULL, 1);
+         /* Fabricate event */
+         evt.fifo = s0->server_rx_fifo;
+         evt.event_type = FIFO_EVENT_SERVER_RX;
+         evt.event_id = serial_number++;
+
+         /* Add event to server's event queue */
+         q = server0->event_queue;
+
+         /* Don't block for lack of space */
+         if (PREDICT_TRUE (q->cursize < q->maxsize))
+           {
+             unix_shared_memory_queue_add (server0->event_queue,
+                                           (u8 *) & evt,
+                                           0 /* do wait for mutex */ );
+           }
+         else
+           {
+             vlib_node_increment_counter (vm, udp4_uri_input_node.index,
+                                          SESSION_ERROR_FIFO_FULL, 1);
+           }
        }
+      /* *INDENT-OFF* */
       if (1)
        {
          ELOG_TYPE_DECLARE (e) =
          {
-         .format = "evt-enqueue: id %d length %d",.format_args = "i4i4",};
+             .format = "evt-enqueue: id %d length %d",
+             .format_args = "i4i4",};
          struct
          {
            u32 data[2];
          } *ed;
          ed = ELOG_DATA (&vlib_global_main.elog_main, e);
          ed->data[0] = evt.event_id;
-         ed->data[1] = evt.enqueue_length;
+         ed->data[1] = svm_fifo_max_dequeue (s0->server_rx_fifo);
        }
+      /* *INDENT-ON* */
+
     }
 
   vec_reset_length (session_indices_to_enqueue);