session: support half-close connection
[vpp.git] / src / vnet / session / session_node.c
index f3713d0..b68ff53 100644 (file)
@@ -24,6 +24,7 @@
 #include <vnet/session/application_local.h>
 #include <vnet/session/session_debug.h>
 #include <svm/queue.h>
+#include <sys/timerfd.h>
 
 #define app_check_thread_and_barrier(_fn, _arg)                                \
   if (!vlib_thread_is_main_w_barrier ())                               \
       return;                                                          \
    }
 
+static transport_endpt_ext_cfg_t *
+session_mq_get_ext_config (application_t *app, uword offset)
+{
+  svm_fifo_chunk_t *c;
+  fifo_segment_t *fs;
+
+  fs = application_get_rx_mqs_segment (app);
+  c = fs_chunk_ptr (fs->h, offset);
+  return (transport_endpt_ext_cfg_t *) c->data;
+}
+
+static void
+session_mq_free_ext_config (application_t *app, uword offset)
+{
+  svm_fifo_chunk_t *c;
+  fifo_segment_t *fs;
+
+  fs = application_get_rx_mqs_segment (app);
+  c = fs_chunk_ptr (fs->h, offset);
+  fifo_segment_collect_chunk (fs, 0 /* only one slice */, c);
+}
+
 static void
 session_mq_listen_handler (void *data)
 {
@@ -54,18 +77,21 @@ session_mq_listen_handler (void *data)
   a->sep.fib_index = mp->vrf;
   a->sep.sw_if_index = ENDPOINT_INVALID_INDEX;
   a->sep.transport_proto = mp->proto;
-  a->sep_ext.ckpair_index = mp->ckpair_index;
-  a->sep_ext.crypto_engine = mp->crypto_engine;
   a->app_index = app->app_index;
   a->wrk_map_index = mp->wrk_index;
   a->sep_ext.transport_flags = mp->flags;
 
+  if (mp->ext_config)
+    a->sep_ext.ext_cfg = session_mq_get_ext_config (app, mp->ext_config);
+
   if ((rv = vnet_listen (a)))
     clib_warning ("listen returned: %U", format_session_error, rv);
 
   app_wrk = application_get_worker (app, mp->wrk_index);
   mq_send_session_bound_cb (app_wrk->wrk_index, mp->context, a->handle, rv);
-  return;
+
+  if (mp->ext_config)
+    session_mq_free_ext_config (app, mp->ext_config);
 }
 
 static void
@@ -122,18 +148,14 @@ session_mq_connect_handler (void *data)
   a->sep.peer.port = mp->lcl_port;
   a->sep.peer.sw_if_index = ENDPOINT_INVALID_INDEX;
   a->sep_ext.parent_handle = mp->parent_handle;
-  a->sep_ext.ckpair_index = mp->ckpair_index;
-  a->sep_ext.crypto_engine = mp->crypto_engine;
   a->sep_ext.transport_flags = mp->flags;
-  if (mp->hostname_len)
-    {
-      vec_validate (a->sep_ext.hostname, mp->hostname_len - 1);
-      clib_memcpy_fast (a->sep_ext.hostname, mp->hostname, mp->hostname_len);
-    }
   a->api_context = mp->context;
   a->app_index = app->app_index;
   a->wrk_map_index = mp->wrk_index;
 
+  if (mp->ext_config)
+    a->sep_ext.ext_cfg = session_mq_get_ext_config (app, mp->ext_config);
+
   if ((rv = vnet_connect (a)))
     {
       clib_warning ("connect returned: %U", format_session_error, rv);
@@ -141,7 +163,8 @@ session_mq_connect_handler (void *data)
       mq_send_session_connected_cb (app_wrk->wrk_index, mp->context, 0, rv);
     }
 
-  vec_free (a->sep_ext.hostname);
+  if (mp->ext_config)
+    session_mq_free_ext_config (app, mp->ext_config);
 }
 
 static void
@@ -171,6 +194,22 @@ session_mq_connect_uri_handler (void *data)
     }
 }
 
+static void
+session_mq_shutdown_handler (void *data)
+{
+  session_shutdown_msg_t *mp = (session_shutdown_msg_t *) data;
+  vnet_shutdown_args_t _a, *a = &_a;
+  application_t *app;
+
+  app = application_lookup (mp->client_index);
+  if (!app)
+    return;
+
+  a->app_index = app->app_index;
+  a->handle = mp->handle;
+  vnet_shutdown_session (a);
+}
+
 static void
 session_mq_disconnect_handler (void *data)
 {
@@ -519,6 +558,51 @@ session_mq_app_wrk_rpc_handler (void *data)
   svm_msg_q_add_and_unlock (app_wrk->event_queue, msg);
 }
 
+static void
+session_mq_transport_attr_handler (void *data)
+{
+  session_transport_attr_msg_t *mp = (session_transport_attr_msg_t *) data;
+  session_transport_attr_reply_msg_t *rmp;
+  svm_msg_q_msg_t _msg, *msg = &_msg;
+  app_worker_t *app_wrk;
+  session_event_t *evt;
+  application_t *app;
+  session_t *s;
+  int rv;
+
+  app = application_lookup (mp->client_index);
+  if (!app)
+    return;
+
+  if (!(s = session_get_from_handle_if_valid (mp->handle)))
+    {
+      clib_warning ("invalid handle %llu", mp->handle);
+      return;
+    }
+  app_wrk = app_worker_get (s->app_wrk_index);
+  if (app_wrk->app_index != app->app_index)
+    {
+      clib_warning ("app %u does not own session %llu", app->app_index,
+                   mp->handle);
+      return;
+    }
+
+  rv = session_transport_attribute (s, mp->is_get, &mp->attr);
+
+  svm_msg_q_lock_and_alloc_msg_w_ring (
+    app_wrk->event_queue, SESSION_MQ_CTRL_EVT_RING, SVM_Q_WAIT, msg);
+  evt = svm_msg_q_msg_data (app_wrk->event_queue, msg);
+  clib_memset (evt, 0, sizeof (*evt));
+  evt->event_type = SESSION_CTRL_EVT_TRANSPORT_ATTR_REPLY;
+  rmp = (session_transport_attr_reply_msg_t *) evt->data;
+  rmp->handle = mp->handle;
+  rmp->retval = rv;
+  rmp->is_get = mp->is_get;
+  if (!rv && mp->is_get)
+    rmp->attr = mp->attr;
+  svm_msg_q_add_and_unlock (app_wrk->event_queue, msg);
+}
+
 vlib_node_registration_t session_queue_node;
 
 typedef struct
@@ -1073,9 +1157,6 @@ session_tx_fifo_read_and_snd_i (session_worker_t * wrk,
 
       n_left -= 2;
 
-      VLIB_BUFFER_TRACE_TRAJECTORY_INIT (b0);
-      VLIB_BUFFER_TRACE_TRAJECTORY_INIT (b1);
-
       vec_add1 (wrk->pending_tx_buffers, bi0);
       vec_add1 (wrk->pending_tx_buffers, bi1);
       vec_add1 (wrk->pending_tx_nexts, next_index);
@@ -1103,8 +1184,6 @@ session_tx_fifo_read_and_snd_i (session_worker_t * wrk,
 
       n_left -= 1;
 
-      VLIB_BUFFER_TRACE_TRAJECTORY_INIT (b0);
-
       vec_add1 (wrk->pending_tx_buffers, bi0);
       vec_add1 (wrk->pending_tx_nexts, next_index);
     }
@@ -1224,6 +1303,12 @@ session_event_dispatch_ctrl (session_worker_t * wrk, session_evt_elt_t * elt)
       fp = e->rpc_args.fp;
       (*fp) (e->rpc_args.arg);
       break;
+    case SESSION_CTRL_EVT_HALF_CLOSE:
+      s = session_get_from_handle_if_valid (e->session_handle);
+      if (PREDICT_FALSE (!s))
+       break;
+      session_transport_half_close (s);
+      break;
     case SESSION_CTRL_EVT_CLOSE:
       s = session_get_from_handle_if_valid (e->session_handle);
       if (PREDICT_FALSE (!s))
@@ -1251,6 +1336,9 @@ session_event_dispatch_ctrl (session_worker_t * wrk, session_evt_elt_t * elt)
     case SESSION_CTRL_EVT_CONNECT_URI:
       session_mq_connect_uri_handler (session_evt_ctrl_data (wrk, elt));
       break;
+    case SESSION_CTRL_EVT_SHUTDOWN:
+      session_mq_shutdown_handler (session_evt_ctrl_data (wrk, elt));
+      break;
     case SESSION_CTRL_EVT_DISCONNECT:
       session_mq_disconnect_handler (session_evt_ctrl_data (wrk, elt));
       break;
@@ -1276,6 +1364,9 @@ session_event_dispatch_ctrl (session_worker_t * wrk, session_evt_elt_t * elt)
     case SESSION_CTRL_EVT_APP_WRK_RPC:
       session_mq_app_wrk_rpc_handler (session_evt_ctrl_data (wrk, elt));
       break;
+    case SESSION_CTRL_EVT_TRANSPORT_ATTR:
+      session_mq_transport_attr_handler (session_evt_ctrl_data (wrk, elt));
+      break;
     default:
       clib_warning ("unhandled event type %d", e->event_type);
     }
@@ -1418,6 +1509,79 @@ session_wrk_handle_mq (session_worker_t *wrk, svm_msg_q_t *mq)
   return n_to_dequeue;
 }
 
+static void
+session_wrk_timerfd_update (session_worker_t *wrk, u64 time_ns)
+{
+  struct itimerspec its;
+
+  its.it_value.tv_sec = 0;
+  its.it_value.tv_nsec = time_ns;
+  its.it_interval.tv_sec = 0;
+  its.it_interval.tv_nsec = its.it_value.tv_nsec;
+
+  if (timerfd_settime (wrk->timerfd, 0, &its, NULL) == -1)
+    clib_warning ("timerfd_settime");
+}
+
+always_inline u64
+session_wrk_tfd_timeout (session_wrk_state_t state, u32 thread_index)
+{
+  if (state == SESSION_WRK_INTERRUPT)
+    return thread_index ? 1e6 : vlib_num_workers () ? 5e8 : 1e6;
+  else if (state == SESSION_WRK_IDLE)
+    return thread_index ? 1e8 : vlib_num_workers () ? 5e8 : 1e8;
+  else
+    return 0;
+}
+
+static inline void
+session_wrk_set_state (session_worker_t *wrk, session_wrk_state_t state)
+{
+  u64 time_ns;
+
+  wrk->state = state;
+  time_ns = session_wrk_tfd_timeout (state, wrk->vm->thread_index);
+  session_wrk_timerfd_update (wrk, time_ns);
+}
+
+static void
+session_wrk_update_state (session_worker_t *wrk)
+{
+  vlib_main_t *vm = wrk->vm;
+
+  if (wrk->state == SESSION_WRK_POLLING)
+    {
+      if (pool_elts (wrk->event_elts) == 3 &&
+         vlib_last_vectors_per_main_loop (vm) < 1)
+       {
+         session_wrk_set_state (wrk, SESSION_WRK_INTERRUPT);
+         vlib_node_set_state (vm, session_queue_node.index,
+                              VLIB_NODE_STATE_INTERRUPT);
+       }
+    }
+  else if (wrk->state == SESSION_WRK_INTERRUPT)
+    {
+      if (pool_elts (wrk->event_elts) > 3 ||
+         vlib_last_vectors_per_main_loop (vm) > 1)
+       {
+         session_wrk_set_state (wrk, SESSION_WRK_POLLING);
+         vlib_node_set_state (vm, session_queue_node.index,
+                              VLIB_NODE_STATE_POLLING);
+       }
+      else if (PREDICT_FALSE (!pool_elts (wrk->sessions)))
+       {
+         session_wrk_set_state (wrk, SESSION_WRK_IDLE);
+       }
+    }
+  else
+    {
+      if (pool_elts (wrk->event_elts))
+       {
+         session_wrk_set_state (wrk, SESSION_WRK_INTERRUPT);
+       }
+    }
+}
+
 static uword
 session_queue_node_fn (vlib_main_t * vm, vlib_node_runtime_t * node,
                       vlib_frame_t * frame)
@@ -1431,8 +1595,7 @@ session_queue_node_fn (vlib_main_t * vm, vlib_node_runtime_t * node,
 
   SESSION_EVT (SESSION_EVT_DISPATCH_START, wrk);
 
-  wrk->last_vlib_time = vlib_time_now (vm);
-  wrk->last_vlib_us_time = wrk->last_vlib_time * CLIB_US_TIME_FREQ;
+  session_wrk_update_time (wrk, vlib_time_now (vm));
 
   /*
    *  Update transport time
@@ -1514,6 +1677,9 @@ session_queue_node_fn (vlib_main_t * vm, vlib_node_runtime_t * node,
 
   SESSION_EVT (SESSION_EVT_DISPATCH_END, wrk, n_tx_packets);
 
+  if (wrk->flags & SESSION_WRK_F_ADAPTIVE)
+    session_wrk_update_state (wrk);
+
   return n_tx_packets;
 }
 
@@ -1531,6 +1697,46 @@ VLIB_REGISTER_NODE (session_queue_node) =
 };
 /* *INDENT-ON* */
 
+static clib_error_t *
+session_wrk_tfd_read_ready (clib_file_t *cf)
+{
+  session_worker_t *wrk = session_main_get_worker (cf->private_data);
+  u64 buf;
+  int rv;
+
+  vlib_node_set_interrupt_pending (wrk->vm, session_queue_node.index);
+  rv = read (wrk->timerfd, &buf, sizeof (buf));
+  if (rv < 0 && errno != EAGAIN)
+    clib_unix_warning ("failed");
+  return 0;
+}
+
+static clib_error_t *
+session_wrk_tfd_write_ready (clib_file_t *cf)
+{
+  return 0;
+}
+
+void
+session_wrk_enable_adaptive_mode (session_worker_t *wrk)
+{
+  u32 thread_index = wrk->vm->thread_index;
+  clib_file_t template = { 0 };
+
+  if ((wrk->timerfd = timerfd_create (CLOCK_MONOTONIC, TFD_NONBLOCK)) < 0)
+    clib_warning ("timerfd_create");
+
+  template.read_function = session_wrk_tfd_read_ready;
+  template.write_function = session_wrk_tfd_write_ready;
+  template.file_descriptor = wrk->timerfd;
+  template.private_data = thread_index;
+  template.polling_thread_index = thread_index;
+  template.description = format (0, "session-wrk-tfd-%u", thread_index);
+
+  wrk->timerfd_file = clib_file_add (&file_main, &template);
+  wrk->flags |= SESSION_WRK_F_ADAPTIVE;
+}
+
 static clib_error_t *
 session_queue_exit (vlib_main_t * vm)
 {