tcp: reuse session infra for syns and resets 22/26222/4
authorFlorin Coras <fcoras@cisco.com>
Fri, 27 Mar 2020 23:55:06 +0000 (23:55 +0000)
committerDave Barach <openvpp@barachs.net>
Mon, 30 Mar 2020 20:34:48 +0000 (20:34 +0000)
Type: improvement

Signed-off-by: Florin Coras <fcoras@cisco.com>
Change-Id: I71df27049ef0193578f0c42f8f8bbd5c54e4d53e

src/vnet/session/session.c
src/vnet/session/session.h
src/vnet/session/session_node.c
src/vnet/tcp/tcp.c
src/vnet/tcp/tcp.h
src/vnet/tcp/tcp_output.c

index fa4cd85..ce3f581 100644 (file)
@@ -1548,11 +1548,11 @@ listen_session_get_transport (session_t * s)
 }
 
 void
-session_flush_frames_main_thread (vlib_main_t * vm)
+session_queue_run_on_main_thread (vlib_main_t * vm)
 {
   ASSERT (vlib_get_thread_index () == 0);
   vlib_process_signal_event_mt (vm, session_queue_process_node.index,
-                               SESSION_Q_PROCESS_FLUSH_FRAMES, 0);
+                               SESSION_Q_PROCESS_RUN_ON_MAIN, 0);
 }
 
 static clib_error_t *
index d72763a..681b42d 100644 (file)
@@ -202,8 +202,11 @@ extern vlib_node_registration_t session_queue_node;
 extern vlib_node_registration_t session_queue_process_node;
 extern vlib_node_registration_t session_queue_pre_input_node;
 
-#define SESSION_Q_PROCESS_FLUSH_FRAMES 1
-#define SESSION_Q_PROCESS_STOP         2
+typedef enum session_q_process_evt_
+{
+  SESSION_Q_PROCESS_RUN_ON_MAIN = 1,
+  SESSION_Q_PROCESS_STOP
+} session_q_process_evt_t;
 
 #define TRANSPORT_PROTO_INVALID (session_main.last_transport_proto_type + 1)
 #define TRANSPORT_N_PROTOS (session_main.last_transport_proto_type + 1)
@@ -641,14 +644,22 @@ do {                                                                      \
 
 int session_main_flush_enqueue_events (u8 proto, u32 thread_index);
 int session_main_flush_all_enqueue_events (u8 transport_proto);
-void session_flush_frames_main_thread (vlib_main_t * vm);
+void session_queue_run_on_main_thread (vlib_main_t * vm);
 
+/**
+ * Add session node pending buffer with custom node
+ *
+ * @param thread_index         worker thread expected to send the buffer
+ * @param bi           buffer index
+ * @param next_node    next node edge index for buffer. Edge to next node
+ *                     must exist
+ */
 always_inline void
-session_add_pending_tx_buffer (session_type_t st, u32 thread_index, u32 bi)
+session_add_pending_tx_buffer (u32 thread_index, u32 bi, u32 next_node)
 {
   session_worker_t *wrk = session_main_get_worker (thread_index);
   vec_add1 (wrk->pending_tx_buffers, bi);
-  vec_add1 (wrk->pending_tx_nexts, session_main.session_type_to_next[st]);
+  vec_add1 (wrk->pending_tx_nexts, next_node);
 }
 
 ssvm_private_t *session_main_get_evt_q_segment (void);
index b5f4321..2a4bb6b 100644 (file)
@@ -1563,33 +1563,40 @@ session_queue_exit (vlib_main_t * vm)
 
 VLIB_MAIN_LOOP_EXIT_FUNCTION (session_queue_exit);
 
+static uword
+session_queue_run_on_main (vlib_main_t * vm)
+{
+  vlib_node_runtime_t *node;
+
+  node = vlib_node_get_runtime (vm, session_queue_node.index);
+  return session_queue_node_fn (vm, node, 0);
+}
+
 static uword
 session_queue_process (vlib_main_t * vm, vlib_node_runtime_t * rt,
                       vlib_frame_t * f)
 {
-  f64 now, timeout = 1.0;
   uword *event_data = 0;
+  f64 timeout = 1.0;
   uword event_type;
 
   while (1)
     {
       vlib_process_wait_for_event_or_clock (vm, timeout);
-      now = vlib_time_now (vm);
       event_type = vlib_process_get_events (vm, (uword **) & event_data);
 
       switch (event_type)
        {
-       case SESSION_Q_PROCESS_FLUSH_FRAMES:
-         /* Flush the frames by updating all transports times */
-         transport_update_time (now, 0);
+       case SESSION_Q_PROCESS_RUN_ON_MAIN:
+         /* Run session queue node on main thread */
+         session_queue_run_on_main (vm);
          break;
        case SESSION_Q_PROCESS_STOP:
          timeout = 100000.0;
          break;
        case ~0:
-         /* Timed out. Update time for all transports to trigger all
-          * outstanding retransmits. */
-         transport_update_time (now, 0);
+         /* Timed out. Run on main to ensure all events are handled */
+         session_queue_run_on_main (vm);
          break;
        }
       vec_reset_length (event_data);
index 25a3a44..a1d774d 100644 (file)
@@ -1460,28 +1460,7 @@ tcp_dispatch_pending_timers (tcp_worker_ctx_t * wrk)
     }
 
   if (thread_index == 0 && clib_fifo_elts (wrk->pending_timers))
-    vlib_process_signal_event_mt (wrk->vm, session_queue_process_node.index,
-                                 SESSION_Q_PROCESS_FLUSH_FRAMES, 0);
-}
-
-/**
- * Flush ip lookup tx frames populated by timer pops
- */
-static void
-tcp_flush_frames_to_output (tcp_worker_ctx_t * wrk)
-{
-  if (wrk->ip_lookup_tx_frames[0])
-    {
-      vlib_put_frame_to_node (wrk->vm, ip4_lookup_node.index,
-                             wrk->ip_lookup_tx_frames[0]);
-      wrk->ip_lookup_tx_frames[0] = 0;
-    }
-  if (wrk->ip_lookup_tx_frames[1])
-    {
-      vlib_put_frame_to_node (wrk->vm, ip6_lookup_node.index,
-                             wrk->ip_lookup_tx_frames[1]);
-      wrk->ip_lookup_tx_frames[1] = 0;
-    }
+    session_queue_run_on_main_thread (wrk->vm);
 }
 
 static void
@@ -1514,7 +1493,6 @@ tcp_update_time (f64 now, u8 thread_index)
   tcp_handle_cleanups (wrk, now);
   tw_timer_expire_timers_16t_2w_512sl (&wrk->timer_wheel, now);
   tcp_dispatch_pending_timers (wrk);
-  tcp_flush_frames_to_output (wrk);
 }
 
 static void
@@ -1629,8 +1607,7 @@ tcp_expired_timers_dispatch (u32 * expired_timers)
                                       max_per_loop);
 
   if (thread_index == 0)
-    vlib_process_signal_event_mt (wrk->vm, session_queue_process_node.index,
-                                 SESSION_Q_PROCESS_FLUSH_FRAMES, 0);
+    session_queue_run_on_main_thread (wrk->vm);
 }
 
 static void
@@ -1691,6 +1668,12 @@ tcp_main_enable (vlib_main_t * vm)
   n_workers = num_threads == 1 ? 1 : vtm->n_threads;
   prealloc_conn_per_wrk = tcp_cfg.preallocated_connections / n_workers;
 
+  wrk = &tm->wrk_ctx[0];
+  wrk->tco_next_node[0] = vlib_node_get_next (vm, session_queue_node.index,
+                                             tcp4_output_node.index);
+  wrk->tco_next_node[1] = vlib_node_get_next (vm, session_queue_node.index,
+                                             tcp6_output_node.index);
+
   for (thread = 0; thread < num_threads; thread++)
     {
       wrk = &tm->wrk_ctx[thread];
@@ -1704,6 +1687,12 @@ tcp_main_enable (vlib_main_t * vm)
       wrk->vm = vlib_mains[thread];
       wrk->max_timers_per_loop = 10;
 
+      if (thread > 0)
+       {
+         wrk->tco_next_node[0] = tm->wrk_ctx[0].tco_next_node[0];
+         wrk->tco_next_node[1] = tm->wrk_ctx[0].tco_next_node[1];
+       }
+
       /*
        * Preallocate connections. Assume that thread 0 won't
        * use preallocated threads when running multi-core
@@ -1734,6 +1723,11 @@ tcp_main_enable (vlib_main_t * vm)
 
   tm->bytes_per_buffer = vlib_buffer_get_default_data_size (vm);
   tm->cc_last_type = TCP_CC_LAST;
+
+  tm->ipl_next_node[0] = vlib_node_get_next (vm, session_queue_node.index,
+                                            ip4_lookup_node.index);
+  tm->ipl_next_node[1] = vlib_node_get_next (vm, session_queue_node.index,
+                                            ip6_lookup_node.index);
   return error;
 }
 
index 361abe2..30c95a4 100644 (file)
@@ -559,8 +559,11 @@ typedef struct tcp_worker_ctx_
   /* Max timers to be handled per dispatch loop */
   u32 max_timers_per_loop;
 
-  /** tx frames for ip 4/6 lookup nodes */
-  vlib_frame_t *ip_lookup_tx_frames[2];
+  /** Session layer edge indices to tcp output */
+  u32 tco_next_node[2];
+
+  /* Fifo of pending timer expirations */
+  u32 *pending_timers;
 
     CLIB_CACHE_LINE_ALIGN_MARK (cacheline1);
 
@@ -570,9 +573,6 @@ typedef struct tcp_worker_ctx_
   /** tx buffer free list */
   u32 *tx_buffers;
 
-  /* Fifo of pending timer expirations */
-  u32 *pending_timers;
-
   /* fifo of pending free requests */
   tcp_cleanup_req_t *pending_cleanups;
 
@@ -679,6 +679,9 @@ typedef struct _tcp_main
   /** vlib buffer size */
   u32 bytes_per_buffer;
 
+  /** Session layer edge indices to ip lookup (syns, rst) */
+  u32 ipl_next_node[2];
+
   /** Dispatch table by state and flags */
   tcp_lookup_dispatch_t dispatch_table[TCP_N_STATES][64];
 
index aff2d93..d07fb2e 100644 (file)
@@ -620,13 +620,12 @@ tcp_make_synack (tcp_connection_t * tc, vlib_buffer_t * b)
   th->checksum = tcp_compute_checksum (tc, b);
 }
 
-always_inline void
-tcp_enqueue_to_ip_lookup_i (tcp_worker_ctx_t * wrk, vlib_buffer_t * b, u32 bi,
-                           u8 is_ip4, u32 fib_index, u8 flush)
+static void
+tcp_enqueue_to_ip_lookup (tcp_worker_ctx_t * wrk, vlib_buffer_t * b, u32 bi,
+                         u8 is_ip4, u32 fib_index)
 {
+  tcp_main_t *tm = &tcp_main;
   vlib_main_t *vm = wrk->vm;
-  u32 *to_next, next_index;
-  vlib_frame_t *f;
 
   b->flags |= VNET_BUFFER_F_LOCALLY_ORIGINATED;
   b->error = 0;
@@ -634,55 +633,24 @@ tcp_enqueue_to_ip_lookup_i (tcp_worker_ctx_t * wrk, vlib_buffer_t * b, u32 bi,
   vnet_buffer (b)->sw_if_index[VLIB_TX] = fib_index;
   vnet_buffer (b)->sw_if_index[VLIB_RX] = 0;
 
-  /* Send to IP lookup */
-  next_index = is_ip4 ? ip4_lookup_node.index : ip6_lookup_node.index;
   tcp_trajectory_add_start (b, 1);
 
-  f = wrk->ip_lookup_tx_frames[!is_ip4];
-  if (!f)
-    {
-      f = vlib_get_frame_to_node (vm, next_index);
-      ASSERT (f);
-      wrk->ip_lookup_tx_frames[!is_ip4] = f;
-    }
-
-  to_next = vlib_frame_vector_args (f);
-  to_next[f->n_vectors] = bi;
-  f->n_vectors += 1;
-  if (flush || f->n_vectors == VLIB_FRAME_SIZE)
-    {
-      vlib_put_frame_to_node (vm, next_index, f);
-      wrk->ip_lookup_tx_frames[!is_ip4] = 0;
-    }
-}
-
-static void
-tcp_enqueue_to_ip_lookup_now (tcp_worker_ctx_t * wrk, vlib_buffer_t * b,
-                             u32 bi, u8 is_ip4, u32 fib_index)
-{
-  tcp_enqueue_to_ip_lookup_i (wrk, b, bi, is_ip4, fib_index, 1);
-}
+  session_add_pending_tx_buffer (vm->thread_index, bi,
+                                tm->ipl_next_node[!is_ip4]);
 
-static void
-tcp_enqueue_to_ip_lookup (tcp_worker_ctx_t * wrk, vlib_buffer_t * b, u32 bi,
-                         u8 is_ip4, u32 fib_index)
-{
-  tcp_enqueue_to_ip_lookup_i (wrk, b, bi, is_ip4, fib_index, 0);
-  if (wrk->vm->thread_index == 0 && vlib_num_workers ())
-    session_flush_frames_main_thread (wrk->vm);
+  if (vm->thread_index == 0 && vlib_num_workers ())
+    session_queue_run_on_main_thread (wrk->vm);
 }
 
 static void
 tcp_enqueue_to_output (tcp_worker_ctx_t * wrk, vlib_buffer_t * b, u32 bi,
                       u8 is_ip4)
 {
-  session_type_t st;
-
   b->flags |= VNET_BUFFER_F_LOCALLY_ORIGINATED;
   b->error = 0;
 
-  st = session_type_from_proto_and_ip (TRANSPORT_PROTO_TCP, is_ip4);
-  session_add_pending_tx_buffer (st, wrk->vm->thread_index, bi);
+  session_add_pending_tx_buffer (wrk->vm->thread_index, bi,
+                                wrk->tco_next_node[!is_ip4]);
 }
 
 #endif /* CLIB_MARCH_VARIANT */
@@ -846,7 +814,7 @@ tcp_send_reset_w_pkt (tcp_connection_t * tc, vlib_buffer_t * pkt,
       ASSERT (!bogus);
     }
 
-  tcp_enqueue_to_ip_lookup_now (wrk, b, bi, is_ip4, fib_index);
+  tcp_enqueue_to_ip_lookup (wrk, b, bi, is_ip4, fib_index);
   TCP_EVT (TCP_EVT_RST_SENT, tc);
   vlib_node_increment_counter (vm, tcp_node_index (output, tc->c_is_ip4),
                               TCP_ERROR_RST_SENT, 1);