session tls: support tls descheduling
[vpp.git] / src / vnet / session / session_node.c
index 5f76578..f023a95 100644 (file)
@@ -50,6 +50,8 @@ session_mq_listen_handler (void *data)
   clib_memset (a, 0, sizeof (*a));
   a->sep.is_ip4 = mp->is_ip4;
   clib_memcpy_fast (&a->sep.ip, &mp->ip, sizeof (mp->ip));
+  if (mp->is_ip4)
+    ip46_address_mask_ip4 (&a->sep.ip);
   a->sep.port = mp->port;
   a->sep.fib_index = mp->vrf;
   a->sep.sw_if_index = ENDPOINT_INVALID_INDEX;
@@ -113,11 +115,17 @@ session_mq_connect_handler (void *data)
   a->sep.transport_proto = mp->proto;
   a->sep.peer.fib_index = mp->vrf;
   clib_memcpy_fast (&a->sep.peer.ip, &mp->lcl_ip, sizeof (mp->lcl_ip));
+  if (mp->is_ip4)
+    {
+      ip46_address_mask_ip4 (&a->sep.ip);
+      ip46_address_mask_ip4 (&a->sep.peer.ip);
+    }
+  a->sep.peer.port = mp->lcl_port;
   a->sep.peer.sw_if_index = ENDPOINT_INVALID_INDEX;
   a->sep_ext.parent_handle = mp->parent_handle;
   a->sep_ext.ckpair_index = mp->ckpair_index;
   a->sep_ext.crypto_engine = mp->crypto_engine;
-  a->sep_ext.flags = mp->flags;
+  a->sep_ext.transport_flags = mp->flags;
   if (mp->hostname_len)
     {
       vec_validate (a->sep_ext.hostname, mp->hostname_len - 1);
@@ -131,8 +139,7 @@ session_mq_connect_handler (void *data)
     {
       clib_warning ("connect returned: %U", format_vnet_api_errno, rv);
       app_wrk = application_get_worker (app, mp->wrk_index);
-      mq_send_session_connected_cb (app_wrk->wrk_index, mp->context, 0,
-                                   /* is_fail */ 1);
+      mq_send_session_connected_cb (app_wrk->wrk_index, mp->context, 0, rv);
     }
 
   vec_free (a->sep_ext.hostname);
@@ -161,8 +168,7 @@ session_mq_connect_uri_handler (void *data)
     {
       clib_warning ("connect_uri returned: %d", rv);
       app_wrk = application_get_worker (app, 0 /* default wrk only */ );
-      mq_send_session_connected_cb (app_wrk->wrk_index, mp->context, 0,
-                                   /* is_fail */ 1);
+      mq_send_session_connected_cb (app_wrk->wrk_index, mp->context, 0, rv);
     }
 }
 
@@ -567,7 +573,7 @@ session_tx_fifo_chain_tail (vlib_main_t * vm, session_tx_context_t * ctx,
   b->total_length_not_including_first_buffer = 0;
 
   chain_b = b;
-  left_from_seg = clib_min (ctx->snd_mss - b->current_length,
+  left_from_seg = clib_min (ctx->sp.snd_mss - b->current_length,
                            ctx->left_to_snd);
   to_deq = left_from_seg;
   for (j = 1; j < ctx->n_bufs_per_seg; j++)
@@ -583,8 +589,8 @@ session_tx_fifo_chain_tail (vlib_main_t * vm, session_tx_context_t * ctx,
       if (peek_data)
        {
          n_bytes_read = svm_fifo_peek (ctx->s->tx_fifo,
-                                       ctx->tx_offset, len_to_deq, data);
-         ctx->tx_offset += n_bytes_read;
+                                       ctx->sp.tx_offset, len_to_deq, data);
+         ctx->sp.tx_offset += n_bytes_read;
        }
       else
        {
@@ -651,12 +657,12 @@ session_tx_fill_buffer (vlib_main_t * vm, session_tx_context_t * ctx,
 
   if (peek_data)
     {
-      n_bytes_read = svm_fifo_peek (ctx->s->tx_fifo, ctx->tx_offset,
+      n_bytes_read = svm_fifo_peek (ctx->s->tx_fifo, ctx->sp.tx_offset,
                                    len_to_deq, data0);
       ASSERT (n_bytes_read > 0);
       /* Keep track of progress locally, transport is also supposed to
        * increment it independently when pushing the header */
-      ctx->tx_offset += n_bytes_read;
+      ctx->sp.tx_offset += n_bytes_read;
     }
   else
     {
@@ -756,13 +762,12 @@ session_tx_set_dequeue_params (vlib_main_t * vm, session_tx_context_t * ctx,
   if (peek_data)
     {
       /* Offset in rx fifo from where to peek data */
-      ctx->tx_offset = ctx->transport_vft->tx_fifo_offset (ctx->tc);
-      if (PREDICT_FALSE (ctx->tx_offset >= ctx->max_dequeue))
+      if (PREDICT_FALSE (ctx->sp.tx_offset >= ctx->max_dequeue))
        {
          ctx->max_len_to_snd = 0;
          return;
        }
-      ctx->max_dequeue -= ctx->tx_offset;
+      ctx->max_dequeue -= ctx->sp.tx_offset;
     }
   else
     {
@@ -782,34 +787,34 @@ session_tx_set_dequeue_params (vlib_main_t * vm, session_tx_context_t * ctx,
   ASSERT (ctx->max_dequeue > 0);
 
   /* Ensure we're not writing more than transport window allows */
-  if (ctx->max_dequeue < ctx->snd_space)
+  if (ctx->max_dequeue < ctx->sp.snd_space)
     {
       /* Constrained by tx queue. Try to send only fully formed segments */
-      ctx->max_len_to_snd =
-       (ctx->max_dequeue > ctx->snd_mss) ?
-       ctx->max_dequeue - ctx->max_dequeue % ctx->snd_mss : ctx->max_dequeue;
+      ctx->max_len_to_snd = (ctx->max_dequeue > ctx->sp.snd_mss) ?
+       (ctx->max_dequeue - (ctx->max_dequeue % ctx->sp.snd_mss)) :
+       ctx->max_dequeue;
       /* TODO Nagle ? */
     }
   else
     {
       /* Expectation is that snd_space0 is already a multiple of snd_mss */
-      ctx->max_len_to_snd = ctx->snd_space;
+      ctx->max_len_to_snd = ctx->sp.snd_space;
     }
 
   /* Check if we're tx constrained by the node */
-  ctx->n_segs_per_evt = ceil ((f64) ctx->max_len_to_snd / ctx->snd_mss);
+  ctx->n_segs_per_evt = ceil ((f64) ctx->max_len_to_snd / ctx->sp.snd_mss);
   if (ctx->n_segs_per_evt > max_segs)
     {
       ctx->n_segs_per_evt = max_segs;
-      ctx->max_len_to_snd = max_segs * ctx->snd_mss;
+      ctx->max_len_to_snd = max_segs * ctx->sp.snd_mss;
     }
 
   n_bytes_per_buf = vlib_buffer_get_default_data_size (vm);
   ASSERT (n_bytes_per_buf > TRANSPORT_MAX_HDRS_LEN);
-  n_bytes_per_seg = TRANSPORT_MAX_HDRS_LEN + ctx->snd_mss;
+  n_bytes_per_seg = TRANSPORT_MAX_HDRS_LEN + ctx->sp.snd_mss;
   ctx->n_bufs_per_seg = ceil ((f64) n_bytes_per_seg / n_bytes_per_buf);
-  ctx->deq_per_buf = clib_min (ctx->snd_mss, n_bytes_per_buf);
-  ctx->deq_per_first_buf = clib_min (ctx->snd_mss,
+  ctx->deq_per_buf = clib_min (ctx->sp.snd_mss, n_bytes_per_buf);
+  ctx->deq_per_first_buf = clib_min (ctx->sp.snd_mss,
                                     n_bytes_per_buf -
                                     TRANSPORT_MAX_HDRS_LEN);
 }
@@ -817,12 +822,12 @@ session_tx_set_dequeue_params (vlib_main_t * vm, session_tx_context_t * ctx,
 always_inline void
 session_tx_maybe_reschedule (session_worker_t * wrk,
                             session_tx_context_t * ctx,
-                            session_evt_elt_t * elt, u8 is_peek)
+                            session_evt_elt_t * elt)
 {
   session_t *s = ctx->s;
 
   svm_fifo_unset_event (s->tx_fifo);
-  if (svm_fifo_max_dequeue_cons (s->tx_fifo) > is_peek ? ctx->tx_offset : 0)
+  if (svm_fifo_max_dequeue_cons (s->tx_fifo) > ctx->sp.tx_offset)
     if (svm_fifo_set_event (s->tx_fifo))
       session_evt_add_head_old (wrk, elt);
 }
@@ -867,33 +872,37 @@ session_tx_fifo_read_and_snd_i (session_worker_t * wrk,
     {
       u32 n_custom_tx;
       ctx->s->flags &= ~SESSION_F_CUSTOM_TX;
-      n_custom_tx = ctx->transport_vft->custom_tx (ctx->tc, max_burst);
+      ctx->sp.max_burst_size = max_burst;
+      n_custom_tx = ctx->transport_vft->custom_tx (ctx->tc, &ctx->sp);
       *n_tx_packets += n_custom_tx;
       if (PREDICT_FALSE
          (ctx->s->session_state >= SESSION_STATE_TRANSPORT_CLOSED))
        return SESSION_TX_OK;
       max_burst -= n_custom_tx;
-      if (!max_burst)
+      if (!max_burst || (ctx->s->flags & SESSION_F_CUSTOM_TX))
        {
          session_evt_add_old (wrk, elt);
          return SESSION_TX_OK;
        }
     }
 
-  ctx->snd_mss = ctx->transport_vft->send_mss (ctx->tc);
-  if (PREDICT_FALSE (ctx->snd_mss == 0))
-    {
-      session_evt_add_old (wrk, elt);
-      return SESSION_TX_NO_DATA;
-    }
-
-  ctx->snd_space = transport_connection_snd_space (ctx->tc);
+  transport_connection_snd_params (ctx->tc, &ctx->sp);
 
-  /* This flow queue is "empty" so it should be re-evaluated before
-   * the ones that have data to send. */
-  if (!ctx->snd_space)
+  if (!ctx->sp.snd_space)
     {
-      session_evt_add_head_old (wrk, elt);
+      /* If the deschedule flag was set, remove session from scheduler.
+       * Transport is responsible for rescheduling this session. */
+      if (ctx->sp.flags & TRANSPORT_SND_F_DESCHED)
+       transport_connection_deschedule (ctx->tc);
+      /* Request to postpone the session, e.g., zero-wnd and transport
+       * is not currently probing */
+      else if (ctx->sp.flags & TRANSPORT_SND_F_POSTPONE)
+       session_evt_add_old (wrk, elt);
+      /* This flow queue is "empty" so it should be re-evaluated before
+       * the ones that have data to send. */
+      else
+       session_evt_add_head_old (wrk, elt);
+
       return SESSION_TX_NO_DATA;
     }
 
@@ -905,9 +914,9 @@ session_tx_fifo_read_and_snd_i (session_worker_t * wrk,
          session_evt_add_head_old (wrk, elt);
          return SESSION_TX_NO_DATA;
        }
-      snd_space = clib_min (ctx->snd_space, snd_space);
-      ctx->snd_space = snd_space >= ctx->snd_mss ?
-       snd_space - snd_space % ctx->snd_mss : snd_space;
+      snd_space = clib_min (ctx->sp.snd_space, snd_space);
+      ctx->sp.snd_space = snd_space >= ctx->sp.snd_mss ?
+       snd_space - snd_space % ctx->sp.snd_mss : snd_space;
     }
 
   /* Check how much we can pull. */
@@ -916,7 +925,7 @@ session_tx_fifo_read_and_snd_i (session_worker_t * wrk,
   if (PREDICT_FALSE (!ctx->max_len_to_snd))
     {
       transport_connection_tx_pacer_reset_bucket (ctx->tc, 0);
-      session_tx_maybe_reschedule (wrk, ctx, elt, peek_data);
+      session_tx_maybe_reschedule (wrk, ctx, elt);
       return SESSION_TX_NO_DATA;
     }
 
@@ -1019,7 +1028,7 @@ session_tx_fifo_read_and_snd_i (session_worker_t * wrk,
   if (ctx->max_len_to_snd < ctx->max_dequeue)
     session_evt_add_old (wrk, elt);
   else
-    session_tx_maybe_reschedule (wrk, ctx, elt, peek_data);
+    session_tx_maybe_reschedule (wrk, ctx, elt);
 
   if (!peek_data
       && ctx->transport_vft->transport_options.tx_type == TRANSPORT_TX_DGRAM)
@@ -1058,15 +1067,37 @@ session_tx_fifo_dequeue_and_snd (session_worker_t * wrk,
 int
 session_tx_fifo_dequeue_internal (session_worker_t * wrk,
                                  vlib_node_runtime_t * node,
-                                 session_evt_elt_t * e, int *n_tx_packets)
+                                 session_evt_elt_t * elt, int *n_tx_packets)
 {
+  transport_send_params_t *sp = &wrk->ctx.sp;
   session_t *s = wrk->ctx.s;
+  u32 n_packets;
 
   if (PREDICT_FALSE (s->session_state >= SESSION_STATE_TRANSPORT_CLOSED))
     return 0;
-  svm_fifo_unset_event (s->tx_fifo);
-  return transport_custom_tx (session_get_transport_proto (s), s,
-                             VLIB_FRAME_SIZE - *n_tx_packets);
+
+  /* Clear custom-tx flag used to request reschedule for tx */
+  s->flags &= ~SESSION_F_CUSTOM_TX;
+
+  sp->max_burst_size = clib_min (VLIB_FRAME_SIZE - *n_tx_packets,
+                                TRANSPORT_PACER_MAX_BURST_PKTS);
+
+  n_packets = transport_custom_tx (session_get_transport_proto (s), s, sp);
+  *n_tx_packets += n_packets;
+
+  if (s->flags & SESSION_F_CUSTOM_TX)
+    {
+      session_evt_add_old (wrk, elt);
+    }
+  else if (!(sp->flags & TRANSPORT_SND_F_DESCHED))
+    {
+      svm_fifo_unset_event (s->tx_fifo);
+      if (svm_fifo_max_dequeue_cons (s->tx_fifo))
+       if (svm_fifo_set_event (s->tx_fifo))
+         session_evt_add_head_old (wrk, elt);
+    }
+
+  return n_packets;
 }
 
 always_inline session_t *
@@ -1561,33 +1592,40 @@ session_queue_exit (vlib_main_t * vm)
 
 VLIB_MAIN_LOOP_EXIT_FUNCTION (session_queue_exit);
 
+static uword
+session_queue_run_on_main (vlib_main_t * vm)
+{
+  vlib_node_runtime_t *node;
+
+  node = vlib_node_get_runtime (vm, session_queue_node.index);
+  return session_queue_node_fn (vm, node, 0);
+}
+
 static uword
 session_queue_process (vlib_main_t * vm, vlib_node_runtime_t * rt,
                       vlib_frame_t * f)
 {
-  f64 now, timeout = 1.0;
   uword *event_data = 0;
+  f64 timeout = 1.0;
   uword event_type;
 
   while (1)
     {
       vlib_process_wait_for_event_or_clock (vm, timeout);
-      now = vlib_time_now (vm);
       event_type = vlib_process_get_events (vm, (uword **) & event_data);
 
       switch (event_type)
        {
-       case SESSION_Q_PROCESS_FLUSH_FRAMES:
-         /* Flush the frames by updating all transports times */
-         transport_update_time (now, 0);
+       case SESSION_Q_PROCESS_RUN_ON_MAIN:
+         /* Run session queue node on main thread */
+         session_queue_run_on_main (vm);
          break;
        case SESSION_Q_PROCESS_STOP:
          timeout = 100000.0;
          break;
        case ~0:
-         /* Timed out. Update time for all transports to trigger all
-          * outstanding retransmits. */
-         transport_update_time (now, 0);
+         /* Timed out. Run on main to ensure all events are handled */
+         session_queue_run_on_main (vm);
          break;
        }
       vec_reset_length (event_data);