session: use vpp to switch io events for ct sessions 05/18005/17
authorFlorin Coras <fcoras@cisco.com>
Mon, 4 Mar 2019 18:56:23 +0000 (10:56 -0800)
committerDave Barach <openvpp@barachs.net>
Wed, 6 Mar 2019 17:53:39 +0000 (17:53 +0000)
Instead of allocating pairs of message queues per cut-thru session and
having the applications map them, this uses vpp as an io event message
switch.

Change-Id: I51db1c7564df479a7d1a3288342394251fd188bb
Signed-off-by: Florin Coras <fcoras@cisco.com>
17 files changed:
src/tests/vnet/session/udp_echo.c
src/vcl/vcl_bapi.c
src/vcl/vcl_private.c
src/vcl/vcl_private.h
src/vcl/vppcom.c
src/vnet/session-apps/echo_server.c
src/vnet/session/application_interface.h
src/vnet/session/application_local.c
src/vnet/session/application_local.h
src/vnet/session/application_worker.c
src/vnet/session/segment_manager.c
src/vnet/session/session.c
src/vnet/session/session.h
src/vnet/session/session_api.c
src/vnet/session/session_node.c
src/vnet/session/session_types.h
test/test_vcl.py

index 462e113..9fda73d 100644 (file)
@@ -510,39 +510,14 @@ session_accepted_handler (session_accepted_msg_t * mp)
   session_index = session - utm->sessions;
   session->session_index = session_index;
 
-  /* Cut-through case */
-  if (mp->server_event_queue_address)
-    {
-      clib_warning ("cut-through session");
-      session->vpp_evt_q = uword_to_pointer (mp->client_event_queue_address,
-                                            svm_msg_q_t *);
-      sleep (1);
-      rx_fifo->master_session_index = session_index;
-      tx_fifo->master_session_index = session_index;
-      utm->cut_through_session_index = session_index;
-      session->rx_fifo = rx_fifo;
-      session->tx_fifo = tx_fifo;
-      session->is_dgram = 0;
-
-      rv = pthread_create (&utm->cut_through_thread_handle,
-                          NULL /*attr */ , cut_through_thread_fn, 0);
-      if (rv)
-       {
-         clib_warning ("pthread_create returned %d", rv);
-         rv = VNET_API_ERROR_SYSCALL_ERROR_1;
-       }
-    }
-  else
-    {
-      rx_fifo->client_session_index = session_index;
-      tx_fifo->client_session_index = session_index;
-      session->rx_fifo = rx_fifo;
-      session->tx_fifo = tx_fifo;
-      clib_memcpy_fast (&session->transport.rmt_ip, mp->ip,
-                       sizeof (ip46_address_t));
-      session->transport.is_ip4 = mp->is_ip4;
-      session->transport.rmt_port = mp->port;
-    }
+  rx_fifo->client_session_index = session_index;
+  tx_fifo->client_session_index = session_index;
+  session->rx_fifo = rx_fifo;
+  session->tx_fifo = tx_fifo;
+  clib_memcpy_fast (&session->transport.rmt_ip, mp->ip,
+                   sizeof (ip46_address_t));
+  session->transport.is_ip4 = mp->is_ip4;
+  session->transport.rmt_port = mp->port;
 
   hash_set (utm->session_index_by_vpp_handles, mp->handle, session_index);
   if (pool_elts (utm->sessions) && (pool_elts (utm->sessions) % 20000) == 0)
@@ -623,18 +598,17 @@ session_connected_handler (session_connected_msg_t * mp)
   session->tx_fifo = uword_to_pointer (mp->server_tx_fifo, svm_fifo_t *);
 
   /* Cut-through case */
-  if (mp->client_event_queue_address)
+  if (mp->ct_rx_fifo)
     {
       clib_warning ("cut-through session");
-      session->vpp_evt_q = uword_to_pointer (mp->server_event_queue_address,
+      session->vpp_evt_q = uword_to_pointer (mp->vpp_event_queue_address,
                                             svm_msg_q_t *);
-      utm->ct_event_queue = uword_to_pointer (mp->client_event_queue_address,
-                                             svm_msg_q_t *);
       utm->cut_through_session_index = session->session_index;
       session->is_dgram = 0;
       sleep (1);
       session->rx_fifo->client_session_index = session->session_index;
       session->tx_fifo->client_session_index = session->session_index;
+      /* TODO use ct fifos */
     }
   else
     {
@@ -744,7 +718,6 @@ send_test_chunk (udp_echo_main_t * utm, app_session_t * s, u32 bytes)
 
   u8 *test_data = utm->connect_test_data;
   u32 bytes_to_snd, enq_space, min_chunk;
-  session_evt_type_t et = FIFO_EVENT_APP_TX;
   int written;
 
   test_buf_len = vec_len (test_data);
@@ -753,17 +726,9 @@ send_test_chunk (udp_echo_main_t * utm, app_session_t * s, u32 bytes)
                               utm->bytes_to_send);
   enq_space = svm_fifo_max_enqueue (s->tx_fifo);
   bytes_this_chunk = clib_min (bytes_this_chunk, enq_space);
-  et += (s->session_index == utm->cut_through_session_index);
-
-  if (s->is_dgram)
-    written = app_send_dgram_raw (s->tx_fifo, &s->transport, s->vpp_evt_q,
-                                 test_data + test_buf_offset,
-                                 bytes_this_chunk, et, SVM_Q_WAIT);
-  else
-    written = app_send_stream_raw (s->tx_fifo, s->vpp_evt_q,
-                                  test_data + test_buf_offset,
-                                  bytes_this_chunk, et, SVM_Q_WAIT);
 
+  written = app_send (s, test_data + test_buf_offset, bytes_this_chunk,
+                     SVM_Q_WAIT);
   if (written > 0)
     {
       utm->bytes_to_send -= written;
@@ -1004,15 +969,12 @@ server_handle_fifo_event_rx (udp_echo_main_t * utm, u32 session_index)
   app_session_t *session;
   int rv;
   u32 max_dequeue, offset, max_transfer, rx_buf_len;
-  session_evt_type_t et = FIFO_EVENT_APP_TX;
 
   session = pool_elt_at_index (utm->sessions, session_index);
   rx_buf_len = vec_len (utm->rx_buf);
   rx_fifo = session->rx_fifo;
   tx_fifo = session->tx_fifo;
 
-  et += (session->session_index == utm->cut_through_session_index);
-
   max_dequeue = svm_fifo_max_dequeue (rx_fifo);
   /* Allow enqueuing of a new event */
   svm_fifo_unset_event (rx_fifo);
@@ -1040,15 +1002,8 @@ server_handle_fifo_event_rx (udp_echo_main_t * utm, u32 session_index)
          offset = 0;
          do
            {
-             if (session->is_dgram)
-               rv = app_send_dgram_raw (tx_fifo, &session->transport,
-                                        session->vpp_evt_q,
-                                        &utm->rx_buf[offset], n_read, et,
-                                        SVM_Q_WAIT);
-             else
-               rv = app_send_stream_raw (tx_fifo, session->vpp_evt_q,
-                                         &utm->rx_buf[offset], n_read, et,
-                                         SVM_Q_WAIT);
+             rv = app_send (session, &utm->rx_buf[offset], n_read,
+                            SVM_Q_WAIT);
              if (rv > 0)
                {
                  n_read -= rv;
@@ -1060,7 +1015,7 @@ server_handle_fifo_event_rx (udp_echo_main_t * utm, u32 session_index)
          /* If event wasn't set, add one */
          if (svm_fifo_set_event (tx_fifo))
            app_send_io_evt_to_vpp (session->vpp_evt_q, tx_fifo,
-                                   et, SVM_Q_WAIT);
+                                   SESSION_IO_EVT_TX, SVM_Q_WAIT);
        }
     }
   while ((n_read < 0 || max_dequeue > 0) && !utm->time_to_stop);
@@ -1087,11 +1042,9 @@ server_handle_event_queue (udp_echo_main_t * utm)
       e = svm_msg_q_msg_data (mq, &msg);
       switch (e->event_type)
        {
-       case FIFO_EVENT_APP_RX:
+       case SESSION_IO_EVT_RX:
          server_handle_fifo_event_rx (utm, e->fifo->client_session_index);
          break;
-       case SESSION_IO_EVT_CT_TX:
-         break;
 
        default:
          handle_mq_event (e);
index 6baa7c1..d86e773 100644 (file)
@@ -293,41 +293,6 @@ vl_api_unmap_segment_t_handler (vl_api_unmap_segment_t * mp)
   VDBG (1, "Unmapped segment: %d", segment_handle);
 }
 
-static void
-  vl_api_app_cut_through_registration_add_t_handler
-  (vl_api_app_cut_through_registration_add_t * mp)
-{
-  vcl_cut_through_registration_t *ctr;
-  u32 mqc_index = ~0;
-  vcl_worker_t *wrk;
-  int *fds = 0;
-
-  if (mp->n_fds)
-    {
-      ASSERT (mp->n_fds == 2);
-      vec_validate (fds, mp->n_fds);
-      vl_socket_client_recv_fd_msg (fds, mp->n_fds, 5);
-    }
-
-  wrk = vcl_worker_get (mp->wrk_index);
-  ctr = vcl_ct_registration_lock_and_alloc (wrk);
-  ctr->mq = uword_to_pointer (mp->evt_q_address, svm_msg_q_t *);
-  ctr->peer_mq = uword_to_pointer (mp->peer_evt_q_address, svm_msg_q_t *);
-  VDBG (0, "Adding ct registration %u", vcl_ct_registration_index (wrk, ctr));
-
-  if (mp->n_fds && (mp->fd_flags & SESSION_FD_F_MQ_EVENTFD))
-    {
-      svm_msg_q_set_consumer_eventfd (ctr->mq, fds[0]);
-      svm_msg_q_set_producer_eventfd (ctr->peer_mq, fds[1]);
-      mqc_index = vcl_mq_epoll_add_evfd (wrk, ctr->mq);
-      ctr->epoll_evt_conn_index = mqc_index;
-      vec_free (fds);
-    }
-  vcl_ct_registration_lookup_add (wrk, mp->evt_q_address,
-                                 vcl_ct_registration_index (wrk, ctr));
-  vcl_ct_registration_unlock (wrk);
-}
-
 static void
 vl_api_bind_sock_reply_t_handler (vl_api_bind_sock_reply_t * mp)
 {
@@ -400,7 +365,6 @@ _(APPLICATION_TLS_CERT_ADD_REPLY, application_tls_cert_add_reply)   \
 _(APPLICATION_TLS_KEY_ADD_REPLY, application_tls_key_add_reply)        \
 _(MAP_ANOTHER_SEGMENT, map_another_segment)                            \
 _(UNMAP_SEGMENT, unmap_segment)                                                \
-_(APP_CUT_THROUGH_REGISTRATION_ADD, app_cut_through_registration_add)  \
 _(APP_WORKER_ADD_DEL_REPLY, app_worker_add_del_reply)                  \
 
 void
index 6c364e3..e38b663 100644 (file)
@@ -68,75 +68,6 @@ vcl_wait_for_app_state_change (app_state_t app_state)
   return VPPCOM_ETIMEDOUT;
 }
 
-vcl_cut_through_registration_t *
-vcl_ct_registration_lock_and_alloc (vcl_worker_t * wrk)
-{
-  vcl_cut_through_registration_t *cr;
-  clib_spinlock_lock (&wrk->ct_registration_lock);
-  pool_get (wrk->cut_through_registrations, cr);
-  memset (cr, 0, sizeof (*cr));
-  cr->epoll_evt_conn_index = -1;
-  return cr;
-}
-
-u32
-vcl_ct_registration_index (vcl_worker_t * wrk,
-                          vcl_cut_through_registration_t * ctr)
-{
-  return (ctr - wrk->cut_through_registrations);
-}
-
-void
-vcl_ct_registration_lock (vcl_worker_t * wrk)
-{
-  clib_spinlock_lock (&wrk->ct_registration_lock);
-}
-
-void
-vcl_ct_registration_unlock (vcl_worker_t * wrk)
-{
-  clib_spinlock_unlock (&wrk->ct_registration_lock);
-}
-
-vcl_cut_through_registration_t *
-vcl_ct_registration_get (vcl_worker_t * wrk, u32 ctr_index)
-{
-  if (pool_is_free_index (wrk->cut_through_registrations, ctr_index))
-    return 0;
-  return pool_elt_at_index (wrk->cut_through_registrations, ctr_index);
-}
-
-vcl_cut_through_registration_t *
-vcl_ct_registration_lock_and_lookup (vcl_worker_t * wrk, uword mq_addr)
-{
-  uword *p;
-  clib_spinlock_lock (&wrk->ct_registration_lock);
-  p = hash_get (wrk->ct_registration_by_mq, mq_addr);
-  if (!p)
-    return 0;
-  return vcl_ct_registration_get (wrk, p[0]);
-}
-
-void
-vcl_ct_registration_lookup_add (vcl_worker_t * wrk, uword mq_addr,
-                               u32 ctr_index)
-{
-  hash_set (wrk->ct_registration_by_mq, mq_addr, ctr_index);
-}
-
-void
-vcl_ct_registration_lookup_del (vcl_worker_t * wrk, uword mq_addr)
-{
-  hash_unset (wrk->ct_registration_by_mq, mq_addr);
-}
-
-void
-vcl_ct_registration_del (vcl_worker_t * wrk,
-                        vcl_cut_through_registration_t * ctr)
-{
-  pool_put (wrk->cut_through_registrations, ctr);
-}
-
 vcl_mq_evt_conn_t *
 vcl_mq_evt_conn_alloc (vcl_worker_t * wrk)
 {
@@ -235,8 +166,6 @@ vcl_worker_cleanup (vcl_worker_t * wrk, u8 notify_vpp)
   if (wrk->mqs_epfd > 0)
     close (wrk->mqs_epfd);
   hash_free (wrk->session_index_by_vpp_handles);
-  hash_free (wrk->ct_registration_by_mq);
-  clib_spinlock_free (&wrk->ct_registration_lock);
   vec_free (wrk->mq_events);
   vec_free (wrk->mq_msg_vector);
   vcl_worker_free (wrk);
@@ -286,8 +215,6 @@ vcl_worker_alloc_and_init ()
     }
 
   wrk->session_index_by_vpp_handles = hash_create (0, sizeof (uword));
-  wrk->ct_registration_by_mq = hash_create (0, sizeof (uword));
-  clib_spinlock_init (&wrk->ct_registration_lock);
   clib_time_init (&wrk->clib_time);
   vec_validate (wrk->mq_events, 64);
   vec_validate (wrk->mq_msg_vector, 128);
@@ -418,6 +345,9 @@ vcl_session_read_ready (vcl_session_t * session)
   if (session->session_state & STATE_LISTEN)
     return clib_fifo_elts (session->accept_evts_fifo);
 
+  if (vcl_session_is_ct (session))
+    return svm_fifo_max_dequeue (session->ct_rx_fifo);
+
   return svm_fifo_max_dequeue (session->rx_fifo);
 }
 
@@ -452,6 +382,9 @@ vcl_session_write_ready (vcl_session_t * session)
       return rv;
     }
 
+  if (vcl_session_is_ct (session))
+    return svm_fifo_max_enqueue (session->ct_tx_fifo);
+
   return svm_fifo_max_enqueue (session->tx_fifo);
 }
 
index 2187499..dfe8b16 100644 (file)
@@ -162,6 +162,9 @@ typedef struct
   u64 vpp_handle;
   u32 vpp_thread_index;
 
+  svm_fifo_t *ct_rx_fifo;
+  svm_fifo_t *ct_tx_fifo;
+
   /* Socket configuration state */
   u8 is_vep;
   u8 is_vep_session;
@@ -270,15 +273,6 @@ typedef struct vcl_worker_
   /** For deadman timers */
   clib_time_t clib_time;
 
-  /** Pool of cut through registrations */
-  vcl_cut_through_registration_t *cut_through_registrations;
-
-  /** Lock for accessing ct registration pool */
-  clib_spinlock_t ct_registration_lock;
-
-  /** Cut-through registration by mq address hash table */
-  uword *ct_registration_by_mq;
-
   /** Vector acting as buffer for mq messages */
   svm_msg_q_msg_t *mq_msg_vector;
 
@@ -481,7 +475,7 @@ const char *vppcom_session_state_str (vcl_session_state_t state);
 static inline u8
 vcl_session_is_ct (vcl_session_t * s)
 {
-  return (s->our_evt_q != 0);
+  return (s->ct_tx_fifo != 0);
 }
 
 static inline u8
@@ -516,19 +510,6 @@ vcl_session_closed_error (vcl_session_t * s)
  * Helpers
  */
 int vcl_wait_for_app_state_change (app_state_t app_state);
-vcl_cut_through_registration_t
-  * vcl_ct_registration_lock_and_alloc (vcl_worker_t * wrk);
-void vcl_ct_registration_del (vcl_worker_t * wrk,
-                             vcl_cut_through_registration_t * ctr);
-u32 vcl_ct_registration_index (vcl_worker_t * wrk,
-                              vcl_cut_through_registration_t * ctr);
-void vcl_ct_registration_lock (vcl_worker_t * wrk);
-void vcl_ct_registration_unlock (vcl_worker_t * wrk);
-vcl_cut_through_registration_t
-  * vcl_ct_registration_lock_and_lookup (vcl_worker_t * wrk, uword mq_addr);
-void vcl_ct_registration_lookup_add (vcl_worker_t * wrk, uword mq_addr,
-                                    u32 ctr_index);
-void vcl_ct_registration_lookup_del (vcl_worker_t * wrk, uword mq_addr);
 vcl_mq_evt_conn_t *vcl_mq_evt_conn_alloc (vcl_worker_t * wrk);
 u32 vcl_mq_evt_conn_index (vcl_worker_t * wrk, vcl_mq_evt_conn_t * mqc);
 vcl_mq_evt_conn_t *vcl_mq_evt_conn_get (vcl_worker_t * wrk, u32 mq_conn_idx);
@@ -581,10 +562,7 @@ vcl_n_workers (void)
 static inline svm_msg_q_t *
 vcl_session_vpp_evt_q (vcl_worker_t * wrk, vcl_session_t * s)
 {
-  if (vcl_session_is_ct (s))
-    return wrk->vpp_event_queues[0];
-  else
-    return wrk->vpp_event_queues[s->vpp_thread_index];
+  return wrk->vpp_event_queues[s->vpp_thread_index];
 }
 
 void vcl_send_session_worker_update (vcl_worker_t * wrk, vcl_session_t * s,
index 7c076d3..3abde98 100644 (file)
@@ -273,7 +273,6 @@ vcl_session_accepted_handler (vcl_worker_t * wrk, session_accepted_msg_t * mp)
                                                      mp->listener_handle);
   if (!listen_session)
     {
-      svm_msg_q_t *evt_q;
       evt_q = uword_to_pointer (mp->vpp_event_queue_address, svm_msg_q_t *);
       clib_warning ("VCL<%d>: ERROR: couldn't find listen session: "
                    "unknown vpp listener handle %llx",
@@ -287,39 +286,23 @@ vcl_session_accepted_handler (vcl_worker_t * wrk, session_accepted_msg_t * mp)
   rx_fifo = uword_to_pointer (mp->server_rx_fifo, svm_fifo_t *);
   tx_fifo = uword_to_pointer (mp->server_tx_fifo, svm_fifo_t *);
 
-  if (mp->server_event_queue_address)
-    {
-      session->vpp_evt_q = uword_to_pointer (mp->client_event_queue_address,
-                                            svm_msg_q_t *);
-      session->our_evt_q = uword_to_pointer (mp->server_event_queue_address,
-                                            svm_msg_q_t *);
-      if (vcl_wait_for_segment (mp->segment_handle))
-       {
-         clib_warning ("segment for session %u couldn't be mounted!",
-                       session->session_index);
-         return VCL_INVALID_SESSION_INDEX;
-       }
-      rx_fifo->master_session_index = session->session_index;
-      tx_fifo->master_session_index = session->session_index;
-      rx_fifo->master_thread_index = vcl_get_worker_index ();
-      tx_fifo->master_thread_index = vcl_get_worker_index ();
-      vec_validate (wrk->vpp_event_queues, 0);
-      evt_q = uword_to_pointer (mp->vpp_event_queue_address, svm_msg_q_t *);
-      wrk->vpp_event_queues[0] = evt_q;
-    }
-  else
+  if (vcl_wait_for_segment (mp->segment_handle))
     {
-      session->vpp_evt_q = uword_to_pointer (mp->vpp_event_queue_address,
-                                            svm_msg_q_t *);
-      rx_fifo->client_session_index = session->session_index;
-      tx_fifo->client_session_index = session->session_index;
-      rx_fifo->client_thread_index = vcl_get_worker_index ();
-      tx_fifo->client_thread_index = vcl_get_worker_index ();
-      vpp_wrk_index = tx_fifo->master_thread_index;
-      vec_validate (wrk->vpp_event_queues, vpp_wrk_index);
-      wrk->vpp_event_queues[vpp_wrk_index] = session->vpp_evt_q;
+      clib_warning ("segment for session %u couldn't be mounted!",
+                   session->session_index);
+      return VCL_INVALID_SESSION_INDEX;
     }
 
+  session->vpp_evt_q = uword_to_pointer (mp->vpp_event_queue_address,
+                                        svm_msg_q_t *);
+  rx_fifo->client_session_index = session->session_index;
+  tx_fifo->client_session_index = session->session_index;
+  rx_fifo->client_thread_index = vcl_get_worker_index ();
+  tx_fifo->client_thread_index = vcl_get_worker_index ();
+  vpp_wrk_index = tx_fifo->master_thread_index;
+  vec_validate (wrk->vpp_event_queues, vpp_wrk_index);
+  wrk->vpp_event_queues[vpp_wrk_index] = session->vpp_evt_q;
+
   session->vpp_handle = mp->handle;
   session->vpp_thread_index = rx_fifo->master_thread_index;
   session->client_context = mp->context;
@@ -356,7 +339,6 @@ vcl_session_connected_handler (vcl_worker_t * wrk,
   u32 session_index, vpp_wrk_index;
   svm_fifo_t *rx_fifo, *tx_fifo;
   vcl_session_t *session = 0;
-  svm_msg_q_t *evt_q;
 
   session_index = mp->context;
   session = vcl_session_get (wrk, session_index);
@@ -380,8 +362,8 @@ vcl_session_connected_handler (vcl_worker_t * wrk,
   tx_fifo = uword_to_pointer (mp->server_tx_fifo, svm_fifo_t *);
   if (vcl_wait_for_segment (mp->segment_handle))
     {
-      clib_warning ("segment for session %u couldn't be mounted!",
-                   session->session_index);
+      VDBG (0, "segment for session %u couldn't be mounted!",
+           session->session_index);
       return VCL_INVALID_SESSION_INDEX;
     }
 
@@ -390,24 +372,22 @@ vcl_session_connected_handler (vcl_worker_t * wrk,
   rx_fifo->client_thread_index = vcl_get_worker_index ();
   tx_fifo->client_thread_index = vcl_get_worker_index ();
 
-  if (mp->client_event_queue_address)
-    {
-      session->vpp_evt_q = uword_to_pointer (mp->server_event_queue_address,
-                                            svm_msg_q_t *);
-      session->our_evt_q = uword_to_pointer (mp->client_event_queue_address,
-                                            svm_msg_q_t *);
+  session->vpp_evt_q = uword_to_pointer (mp->vpp_event_queue_address,
+                                        svm_msg_q_t *);
+  vpp_wrk_index = tx_fifo->master_thread_index;
+  vec_validate (wrk->vpp_event_queues, vpp_wrk_index);
+  wrk->vpp_event_queues[vpp_wrk_index] = session->vpp_evt_q;
 
-      vec_validate (wrk->vpp_event_queues, 0);
-      evt_q = uword_to_pointer (mp->vpp_event_queue_address, svm_msg_q_t *);
-      wrk->vpp_event_queues[0] = evt_q;
-    }
-  else
+  if (mp->ct_rx_fifo)
     {
-      session->vpp_evt_q = uword_to_pointer (mp->vpp_event_queue_address,
-                                            svm_msg_q_t *);
-      vpp_wrk_index = tx_fifo->master_thread_index;
-      vec_validate (wrk->vpp_event_queues, vpp_wrk_index);
-      wrk->vpp_event_queues[vpp_wrk_index] = session->vpp_evt_q;
+      session->ct_rx_fifo = uword_to_pointer (mp->ct_rx_fifo, svm_fifo_t *);
+      session->ct_tx_fifo = uword_to_pointer (mp->ct_tx_fifo, svm_fifo_t *);
+      if (vcl_wait_for_segment (mp->ct_segment_handle))
+       {
+         VDBG (0, "ct segment for session %u couldn't be mounted!",
+               session->session_index);
+         return VCL_INVALID_SESSION_INDEX;
+       }
     }
 
   session->rx_fifo = rx_fifo;
@@ -667,10 +647,11 @@ vcl_handle_mq_event (vcl_worker_t * wrk, session_event_t * e)
 
   switch (e->event_type)
     {
-    case FIFO_EVENT_APP_RX:
-    case FIFO_EVENT_APP_TX:
-    case SESSION_IO_EVT_CT_RX:
-    case SESSION_IO_EVT_CT_TX:
+    case SESSION_IO_EVT_RX:
+    case SESSION_IO_EVT_TX:
+      session = vcl_session_get (wrk, e->fifo->client_session_index);
+      if (!session || !(session->session_state & STATE_OPEN))
+       break;
       vec_add1 (wrk->unhandled_evts_vector, *e);
       break;
     case SESSION_CTRL_EVT_ACCEPTED:
@@ -1123,23 +1104,6 @@ vcl_session_cleanup (vcl_worker_t * wrk, vcl_session_t * session,
        }
     }
 
-  if (vcl_session_is_ct (session))
-    {
-      vcl_cut_through_registration_t *ctr;
-      uword mq_addr;
-
-      mq_addr = pointer_to_uword (session->our_evt_q);
-      ctr = vcl_ct_registration_lock_and_lookup (wrk, mq_addr);
-      ASSERT (ctr);
-      if (ctr->epoll_evt_conn_index != ~0)
-       vcl_mq_epoll_del_evfd (wrk, ctr->epoll_evt_conn_index);
-      VDBG (0, "Removing ct registration %u",
-           vcl_ct_registration_index (wrk, ctr));
-      vcl_ct_registration_del (wrk, ctr);
-      vcl_ct_registration_lookup_del (wrk, mq_addr);
-      vcl_ct_registration_unlock (wrk);
-    }
-
   VDBG (0, "session %u [0x%llx] removed", session->session_index, vpp_handle);
 
 cleanup:
@@ -1336,7 +1300,6 @@ vppcom_session_accept (uint32_t listen_session_handle, vppcom_endpt_t * ep,
   session_accepted_msg_t accepted_msg;
   vcl_session_t *listen_session = 0;
   vcl_session_t *client_session = 0;
-  svm_msg_q_t *vpp_evt_q;
   vcl_session_msg_t *evt;
   u64 listen_vpp_handle;
   svm_msg_q_msg_t msg;
@@ -1411,13 +1374,8 @@ handle:
                          sizeof (ip6_address_t));
     }
 
-  if (accepted_msg.server_event_queue_address)
-    vpp_evt_q = uword_to_pointer (accepted_msg.vpp_event_queue_address,
-                                 svm_msg_q_t *);
-  else
-    vpp_evt_q = client_session->vpp_evt_q;
-
-  vcl_send_session_accepted_reply (vpp_evt_q, client_session->client_context,
+  vcl_send_session_accepted_reply (client_session->vpp_evt_q,
+                                  client_session->client_context,
                                   client_session->vpp_handle, 0);
 
   VDBG (0, "listener %u [0x%llx] accepted %u [0x%llx] peer: %U:%u "
@@ -1533,11 +1491,8 @@ vppcom_session_connect (uint32_t session_handle, vppcom_endpt_t * server_ep)
 static u8
 vcl_is_rx_evt_for_session (session_event_t * e, u32 sid, u8 is_ct)
 {
-  if (!is_ct)
-    return (e->event_type == FIFO_EVENT_APP_RX
-           && e->fifo->client_session_index == sid);
-  else
-    return (e->event_type == SESSION_IO_EVT_CT_TX);
+  return (e->event_type == SESSION_IO_EVT_RX
+         && e->fifo->client_session_index == sid);
 }
 
 static inline int
@@ -1570,17 +1525,18 @@ vppcom_session_read_internal (uint32_t session_handle, void *buf, int n,
 
   is_nonblocking = VCL_SESS_ATTR_TEST (s->attr, VCL_SESS_ATTR_NONBLOCK);
   is_ct = vcl_session_is_ct (s);
-  mq = is_ct ? s->our_evt_q : wrk->app_event_queue;
-  rx_fifo = s->rx_fifo;
+  mq = wrk->app_event_queue;
+  rx_fifo = is_ct ? s->ct_rx_fifo : s->rx_fifo;
   s->has_rx_evt = 0;
 
+  if (is_ct)
+    svm_fifo_unset_event (s->rx_fifo);
+
   if (svm_fifo_is_empty (rx_fifo))
     {
+      svm_fifo_unset_event (rx_fifo);
       if (is_nonblocking)
-       {
-         svm_fifo_unset_event (rx_fifo);
-         return VPPCOM_EWOULDBLOCK;
-       }
+       return VPPCOM_EWOULDBLOCK;
       while (svm_fifo_is_empty (rx_fifo))
        {
          if (vcl_session_is_closing (s))
@@ -1595,7 +1551,10 @@ vppcom_session_read_internal (uint32_t session_handle, void *buf, int n,
          e = svm_msg_q_msg_data (mq, &msg);
          svm_msg_q_unlock (mq);
          if (!vcl_is_rx_evt_for_session (e, s->session_index, is_ct))
-           vcl_handle_mq_event (wrk, e);
+           {
+             clib_warning ("THIS ONE type %u", e->event_type);
+             vcl_handle_mq_event (wrk, e);
+           }
          svm_msg_q_free_msg (mq, &msg);
        }
     }
@@ -1608,13 +1567,6 @@ vppcom_session_read_internal (uint32_t session_handle, void *buf, int n,
   if (svm_fifo_is_empty (rx_fifo))
     svm_fifo_unset_event (rx_fifo);
 
-  if (is_ct && svm_fifo_needs_tx_ntf (rx_fifo, n_read))
-    {
-      svm_fifo_clear_tx_ntf (s->rx_fifo);
-      app_send_io_evt_to_vpp (s->vpp_evt_q, s->rx_fifo, SESSION_IO_EVT_CT_RX,
-                             SVM_Q_WAIT);
-    }
-
   VDBG (2, "vpp handle 0x%llx, sid %u: read %d bytes from (%p)",
        s->vpp_handle, session_handle, n_read, rx_fifo);
 
@@ -1659,6 +1611,9 @@ vppcom_session_read_segments (uint32_t session_handle,
   rx_fifo = s->rx_fifo;
   s->has_rx_evt = 0;
 
+  if (is_ct)
+    svm_fifo_unset_event (s->rx_fifo);
+
   if (svm_fifo_is_empty (rx_fifo))
     {
       if (is_nonblocking)
@@ -1688,14 +1643,6 @@ vppcom_session_read_segments (uint32_t session_handle,
   n_read = svm_fifo_segments (rx_fifo, (svm_fifo_segment_t *) ds);
   svm_fifo_unset_event (rx_fifo);
 
-  if (is_ct && n_read + svm_fifo_max_dequeue (rx_fifo) == rx_fifo->nitems)
-    {
-      /* If the peer is not polling send notification */
-      if (!svm_fifo_has_event (s->rx_fifo))
-       app_send_io_evt_to_vpp (s->vpp_evt_q, s->rx_fifo,
-                               SESSION_IO_EVT_CT_RX, SVM_Q_WAIT);
-    }
-
   return n_read;
 }
 
@@ -1729,11 +1676,8 @@ vppcom_data_segment_copy (void *buf, vppcom_data_segments_t ds, u32 max_bytes)
 static u8
 vcl_is_tx_evt_for_session (session_event_t * e, u32 sid, u8 is_ct)
 {
-  if (!is_ct)
-    return (e->event_type == FIFO_EVENT_APP_TX
-           && e->fifo->client_session_index == sid);
-  else
-    return (e->event_type == SESSION_IO_EVT_CT_RX);
+  return (e->event_type == SESSION_IO_EVT_TX
+         && e->fifo->client_session_index == sid);
 }
 
 static inline int
@@ -1772,10 +1716,10 @@ vppcom_session_write_inline (uint32_t session_handle, void *buf, size_t n,
       return vcl_session_closed_error (s);;
     }
 
-  tx_fifo = s->tx_fifo;
   is_ct = vcl_session_is_ct (s);
+  tx_fifo = is_ct ? s->ct_tx_fifo : s->tx_fifo;
   is_nonblocking = VCL_SESS_ATTR_TEST (s->attr, VCL_SESS_ATTR_NONBLOCK);
-  mq = is_ct ? s->our_evt_q : wrk->app_event_queue;
+  mq = wrk->app_event_queue;
   if (svm_fifo_is_full (tx_fifo))
     {
       if (is_nonblocking)
@@ -1801,17 +1745,20 @@ vppcom_session_write_inline (uint32_t session_handle, void *buf, size_t n,
        }
     }
 
-  ASSERT (FIFO_EVENT_APP_TX + 1 == SESSION_IO_EVT_CT_TX);
-  et = FIFO_EVENT_APP_TX + vcl_session_is_ct (s);
-  if (is_flush && !vcl_session_is_ct (s))
+  et = SESSION_IO_EVT_TX;
+  if (is_flush && !is_ct)
     et = SESSION_IO_EVT_TX_FLUSH;
 
   if (s->is_dgram)
     n_write = app_send_dgram_raw (tx_fifo, &s->transport,
-                                 s->vpp_evt_q, buf, n, et, SVM_Q_WAIT);
+                                 s->vpp_evt_q, buf, n, et,
+                                 !is_ct /* do_evt */ , SVM_Q_WAIT);
   else
     n_write = app_send_stream_raw (tx_fifo, s->vpp_evt_q, buf, n, et,
-                                  SVM_Q_WAIT);
+                                  !is_ct /* do_evt */ , SVM_Q_WAIT);
+
+  if (is_ct && svm_fifo_set_event (s->tx_fifo))
+    app_send_io_evt_to_vpp (s->vpp_evt_q, s->tx_fifo, et, SVM_Q_WAIT);
 
   ASSERT (n_write > 0);
 
@@ -1835,32 +1782,6 @@ vppcom_session_write_msg (uint32_t session_handle, void *buf, size_t n)
                                      1 /* is_flush */ );
 }
 
-
-static vcl_session_t *
-vcl_ct_session_get_from_fifo (vcl_worker_t * wrk, svm_fifo_t * f, u8 type)
-{
-  vcl_session_t *s;
-  s = vcl_session_get (wrk, f->client_session_index);
-  if (s)
-    {
-      /* rx fifo */
-      if (type == 0 && s->rx_fifo == f)
-       return s;
-      /* tx fifo */
-      if (type == 1 && s->tx_fifo == f)
-       return s;
-    }
-  s = vcl_session_get (wrk, f->master_session_index);
-  if (s)
-    {
-      if (type == 0 && s->rx_fifo == f)
-       return s;
-      if (type == 1 && s->tx_fifo == f)
-       return s;
-    }
-  return 0;
-}
-
 #define vcl_fifo_rx_evt_valid_or_break(_fifo)                  \
 if (PREDICT_FALSE (svm_fifo_is_empty (_fifo)))                 \
   {                                                            \
@@ -1905,29 +1826,6 @@ vcl_select_handle_mq_event (vcl_worker_t * wrk, session_event_t * e,
          *bits_set += 1;
        }
       break;
-    case SESSION_IO_EVT_CT_TX:
-      vcl_fifo_rx_evt_valid_or_break (e->fifo);
-      session = vcl_ct_session_get_from_fifo (wrk, e->fifo, 0);
-      if (!session)
-       break;
-      sid = session->session_index;
-      if (sid < n_bits && read_map)
-       {
-         clib_bitmap_set_no_check ((uword *) read_map, sid, 1);
-         *bits_set += 1;
-       }
-      break;
-    case SESSION_IO_EVT_CT_RX:
-      session = vcl_ct_session_get_from_fifo (wrk, e->fifo, 1);
-      if (!session)
-       break;
-      sid = session->session_index;
-      if (sid < n_bits && write_map)
-       {
-         clib_bitmap_set_no_check ((uword *) write_map, sid, 1);
-         *bits_set += 1;
-       }
-      break;
     case SESSION_CTRL_EVT_ACCEPTED:
       session = vcl_session_accepted (wrk,
                                      (session_accepted_msg_t *) e->data);
@@ -2038,31 +1936,9 @@ vppcom_select_condvar (vcl_worker_t * wrk, int n_bits,
                       vcl_si_set * except_map, double time_to_wait,
                       u32 * bits_set)
 {
-  double total_wait = 0, wait_slice;
-  vcl_cut_through_registration_t *cr;
-
   time_to_wait = (time_to_wait == -1) ? 1e6 : time_to_wait;
-  wait_slice = wrk->cut_through_registrations ? 10e-6 : time_to_wait;
-  do
-    {
-      vcl_ct_registration_lock (wrk);
-      /* *INDENT-OFF* */
-      pool_foreach (cr, wrk->cut_through_registrations, ({
-       vcl_select_handle_mq (wrk, cr->mq, n_bits, read_map, write_map, except_map,
-                             0, bits_set);
-      }));
-      /* *INDENT-ON* */
-      vcl_ct_registration_unlock (wrk);
-
-      vcl_select_handle_mq (wrk, wrk->app_event_queue, n_bits, read_map,
-                           write_map, except_map, wait_slice, bits_set);
-      total_wait += wait_slice;
-      if (*bits_set)
-       return *bits_set;
-    }
-  while (total_wait < time_to_wait);
-
-  return 0;
+  return vcl_select_handle_mq (wrk, wrk->app_event_queue, n_bits, read_map,
+                              write_map, except_map, time_to_wait, bits_set);
 }
 
 static int
@@ -2477,7 +2353,7 @@ vcl_epoll_wait_handle_mq_event (vcl_worker_t * wrk, session_event_t * e,
 
   switch (e->event_type)
     {
-    case FIFO_EVENT_APP_RX:
+    case SESSION_IO_EVT_RX:
       ASSERT (e->fifo->client_thread_index == vcl_get_worker_index ());
       vcl_fifo_rx_evt_valid_or_break (e->fifo);
       sid = e->fifo->client_session_index;
@@ -2491,7 +2367,7 @@ vcl_epoll_wait_handle_mq_event (vcl_worker_t * wrk, session_event_t * e,
       session_evt_data = session->vep.ev.data.u64;
       session->has_rx_evt = 1;
       break;
-    case FIFO_EVENT_APP_TX:
+    case SESSION_IO_EVT_TX:
       sid = e->fifo->client_session_index;
       if (!(session = vcl_session_get (wrk, sid)))
        break;
@@ -2503,33 +2379,6 @@ vcl_epoll_wait_handle_mq_event (vcl_worker_t * wrk, session_event_t * e,
       session_evt_data = session->vep.ev.data.u64;
       svm_fifo_reset_tx_ntf (session->tx_fifo);
       break;
-    case SESSION_IO_EVT_CT_TX:
-      vcl_fifo_rx_evt_valid_or_break (e->fifo);
-      session = vcl_ct_session_get_from_fifo (wrk, e->fifo, 0);
-      if (PREDICT_FALSE (!session))
-       break;
-      sid = session->session_index;
-      session_events = session->vep.ev.events;
-      if (!(EPOLLIN & session->vep.ev.events) || session->has_rx_evt)
-       break;
-      add_event = 1;
-      events[*num_ev].events |= EPOLLIN;
-      session_evt_data = session->vep.ev.data.u64;
-      session->has_rx_evt = 1;
-      break;
-    case SESSION_IO_EVT_CT_RX:
-      session = vcl_ct_session_get_from_fifo (wrk, e->fifo, 1);
-      if (PREDICT_FALSE (!session))
-       break;
-      sid = session->session_index;
-      session_events = session->vep.ev.events;
-      if (!(EPOLLOUT & session_events))
-       break;
-      add_event = 1;
-      events[*num_ev].events |= EPOLLOUT;
-      session_evt_data = session->vep.ev.data.u64;
-      svm_fifo_reset_tx_ntf (session->tx_fifo);
-      break;
     case SESSION_CTRL_EVT_ACCEPTED:
       session = vcl_session_accepted (wrk,
                                      (session_accepted_msg_t *) e->data);
@@ -2663,33 +2512,9 @@ static int
 vppcom_epoll_wait_condvar (vcl_worker_t * wrk, struct epoll_event *events,
                           int maxevents, u32 n_evts, double wait_for_time)
 {
-  vcl_cut_through_registration_t *cr;
-  double total_wait = 0, wait_slice;
-  int rv;
-
   wait_for_time = (wait_for_time == -1) ? (double) 1e6 : wait_for_time;
-  wait_slice = wrk->cut_through_registrations ? 10e-6 : wait_for_time;
-
-  do
-    {
-      vcl_ct_registration_lock (wrk);
-      /* *INDENT-OFF* */
-      pool_foreach (cr, wrk->cut_through_registrations, ({
-        vcl_epoll_wait_handle_mq (wrk, cr->mq, events, maxevents, 0, &n_evts);
-      }));
-      /* *INDENT-ON* */
-      vcl_ct_registration_unlock (wrk);
-
-      rv = vcl_epoll_wait_handle_mq (wrk, wrk->app_event_queue, events,
-                                    maxevents, n_evts ? 0 : wait_slice,
-                                    &n_evts);
-      if (rv)
-       total_wait += wait_slice;
-      if (n_evts)
-       return n_evts;
-    }
-  while (total_wait < wait_for_time);
-  return n_evts;
+  return vcl_epoll_wait_handle_mq (wrk, wrk->app_event_queue, events,
+                                  maxevents, wait_for_time, &n_evts);
 }
 
 static int
index 2762347..941c16d 100644 (file)
@@ -240,14 +240,16 @@ echo_server_rx_callback (session_t * s)
       n_written = app_send_stream_raw (tx_fifo,
                                       esm->vpp_queue[thread_index],
                                       esm->rx_buf[thread_index],
-                                      actual_transfer, SESSION_IO_EVT_TX, 0);
+                                      actual_transfer, SESSION_IO_EVT_TX,
+                                      1 /* do_evt */ , 0);
     }
   else
     {
       n_written = app_send_dgram_raw (tx_fifo, &at,
                                      esm->vpp_queue[s->thread_index],
                                      esm->rx_buf[thread_index],
-                                     actual_transfer, SESSION_IO_EVT_TX, 0);
+                                     actual_transfer, SESSION_IO_EVT_TX,
+                                     1 /* do_evt */ , 0);
     }
 
   if (n_written != max_transfer)
index 935a352..d4dfeec 100644 (file)
@@ -283,8 +283,6 @@ typedef struct session_accepted_msg_
   uword server_tx_fifo;
   u64 segment_handle;
   uword vpp_event_queue_address;
-  uword server_event_queue_address;
-  uword client_event_queue_address;
   u16 port;
   u8 is_ip4;
   u8 ip[16];
@@ -309,9 +307,10 @@ typedef struct session_connected_msg_
   uword server_rx_fifo;
   uword server_tx_fifo;
   u64 segment_handle;
+  uword ct_rx_fifo;
+  uword ct_tx_fifo;
+  u64 ct_segment_handle;
   uword vpp_event_queue_address;
-  uword client_event_queue_address;
-  uword server_event_queue_address;
   u32 segment_size;
   u8 segment_name_length;
   u8 segment_name[64];
@@ -454,7 +453,7 @@ app_send_io_evt_to_vpp (svm_msg_q_t * mq, svm_fifo_t * f, u8 evt_type,
 always_inline int
 app_send_dgram_raw (svm_fifo_t * f, app_session_transport_t * at,
                    svm_msg_q_t * vpp_evt_q, u8 * data, u32 len, u8 evt_type,
-                   u8 noblock)
+                   u8 do_evt, u8 noblock)
 {
   u32 max_enqueue, actual_write;
   session_dgram_hdr_t hdr;
@@ -478,7 +477,7 @@ app_send_dgram_raw (svm_fifo_t * f, app_session_transport_t * at,
 
   if ((rv = svm_fifo_enqueue_nowait (f, actual_write, data)) > 0)
     {
-      if (svm_fifo_set_event (f))
+      if (do_evt && svm_fifo_set_event (f))
        app_send_io_evt_to_vpp (vpp_evt_q, f, evt_type, noblock);
     }
   ASSERT (rv);
@@ -489,18 +488,19 @@ always_inline int
 app_send_dgram (app_session_t * s, u8 * data, u32 len, u8 noblock)
 {
   return app_send_dgram_raw (s->tx_fifo, &s->transport, s->vpp_evt_q, data,
-                            len, SESSION_IO_EVT_TX, noblock);
+                            len, SESSION_IO_EVT_TX, 1 /* do_evt */ ,
+                            noblock);
 }
 
 always_inline int
 app_send_stream_raw (svm_fifo_t * f, svm_msg_q_t * vpp_evt_q, u8 * data,
-                    u32 len, u8 evt_type, u8 noblock)
+                    u32 len, u8 evt_type, u8 do_evt, u8 noblock)
 {
   int rv;
 
   if ((rv = svm_fifo_enqueue_nowait (f, len, data)) > 0)
     {
-      if (svm_fifo_set_event (f))
+      if (do_evt && svm_fifo_set_event (f))
        app_send_io_evt_to_vpp (vpp_evt_q, f, evt_type, noblock);
     }
   return rv;
@@ -510,7 +510,7 @@ always_inline int
 app_send_stream (app_session_t * s, u8 * data, u32 len, u8 noblock)
 {
   return app_send_stream_raw (s->tx_fifo, s->vpp_evt_q, data, len,
-                             SESSION_IO_EVT_TX, noblock);
+                             SESSION_IO_EVT_TX, 1 /* do_evt */ , noblock);
 }
 
 always_inline int
index 9378e5e..745b202 100644 (file)
 #include <vnet/session/application_local.h>
 #include <vnet/session/session.h>
 
-ct_connection_t *connections;
+static ct_connection_t *connections;
 
-ct_connection_t *
+static void
+ct_enable_disable_main_pre_input_node (u8 is_add)
+{
+  u32 n_conns;
+
+  n_conns = pool_elts (connections);
+  if (n_conns > 2)
+    return;
+
+  if (n_conns > 0 && is_add)
+    vlib_node_set_state (vlib_get_main (),
+                        session_queue_pre_input_node.index,
+                        VLIB_NODE_STATE_POLLING);
+  else if (n_conns == 0)
+    vlib_node_set_state (vlib_get_main (),
+                        session_queue_pre_input_node.index,
+                        VLIB_NODE_STATE_DISABLED);
+}
+
+static ct_connection_t *
 ct_connection_alloc (void)
 {
   ct_connection_t *ct;
@@ -31,7 +50,7 @@ ct_connection_alloc (void)
   return ct;
 }
 
-ct_connection_t *
+static ct_connection_t *
 ct_connection_get (u32 ct_index)
 {
   if (pool_is_free_index (connections, ct_index))
@@ -39,7 +58,7 @@ ct_connection_get (u32 ct_index)
   return pool_elt_at_index (connections, ct_index);
 }
 
-void
+static void
 ct_connection_free (ct_connection_t * ct)
 {
   if (CLIB_DEBUG)
@@ -110,8 +129,14 @@ ct_session_connect_notify (session_t * ss)
   cs->session_state = SESSION_STATE_CONNECTING;
   cs->app_wrk_index = client_wrk->wrk_index;
   cs->connection_index = cct->c_c_index;
+  cs->t_app_index = client_wrk->app_index;
 
   cct->c_s_index = cs->session_index;
+  cct->client_rx_fifo = ss->tx_fifo;
+  cct->client_tx_fifo = ss->rx_fifo;
+
+  cct->client_rx_fifo->refcnt++;
+  cct->client_tx_fifo->refcnt++;
 
   /* This will allocate fifos for the session. They won't be used for
    * exchanging data but they will be used to close the connection if
@@ -135,47 +160,25 @@ ct_session_connect_notify (session_t * ss)
   return 0;
 }
 
-static void
-ct_session_fix_eventds (svm_msg_q_t * sq, svm_msg_q_t * cq)
-{
-  int fd;
-
-  /*
-   * segment manager initializes only the producer eventds, since vpp is
-   * typically the producer. But for local sessions, we also pass to the
-   * apps the mqs they listen on for events from peer apps, so they are also
-   * consumer fds.
-   */
-  fd = svm_msg_q_get_producer_eventfd (sq);
-  svm_msg_q_set_consumer_eventfd (sq, fd);
-  fd = svm_msg_q_get_producer_eventfd (cq);
-  svm_msg_q_set_consumer_eventfd (cq, fd);
-}
-
-int
+static int
 ct_init_local_session (app_worker_t * client_wrk, app_worker_t * server_wrk,
                       ct_connection_t * ct, session_t * ls, session_t * ll)
 {
-  u32 seg_size, evt_q_sz, evt_q_elts, margin = 16 << 10;
-  u32 round_rx_fifo_sz, round_tx_fifo_sz, sm_index;
-  segment_manager_properties_t *props, *cprops;
+  u32 round_rx_fifo_sz, round_tx_fifo_sz, sm_index, seg_size;
+  segment_manager_properties_t *props;
   svm_fifo_segment_private_t *seg;
-  application_t *server, *client;
+  application_t *server;
   segment_manager_t *sm;
-  svm_msg_q_t *sq, *cq;
+  u32 margin = 16 << 10;
   u64 segment_handle;
   int seg_index, rv;
 
   server = application_get (server_wrk->app_index);
-  client = application_get (client_wrk->app_index);
 
   props = application_segment_manager_properties (server);
-  cprops = application_segment_manager_properties (client);
-  evt_q_elts = props->evt_q_size + cprops->evt_q_size;
-  evt_q_sz = segment_manager_evt_q_expected_size (evt_q_elts);
   round_rx_fifo_sz = 1 << max_log2 (props->rx_fifo_size);
   round_tx_fifo_sz = 1 << max_log2 (props->tx_fifo_size);
-  seg_size = round_rx_fifo_sz + round_tx_fifo_sz + evt_q_sz + margin;
+  seg_size = round_rx_fifo_sz + round_tx_fifo_sz + margin;
 
   sm = app_worker_get_listen_segment_manager (server_wrk, ll);
   seg_index = segment_manager_add_segment (sm, seg_size);
@@ -185,14 +188,7 @@ ct_init_local_session (app_worker_t * client_wrk, app_worker_t * server_wrk,
       return seg_index;
     }
   seg = segment_manager_get_segment_w_lock (sm, seg_index);
-  sq = segment_manager_alloc_queue (seg, props);
-  cq = segment_manager_alloc_queue (seg, cprops);
-
-  if (props->use_mq_eventfd)
-    ct_session_fix_eventds (sq, cq);
 
-  ct->server_evt_q = pointer_to_uword (sq);
-  ct->client_evt_q = pointer_to_uword (cq);
   rv = segment_manager_try_alloc_fifos (seg, props->rx_fifo_size,
                                        props->tx_fifo_size, &ls->rx_fifo,
                                        &ls->tx_fifo);
@@ -204,8 +200,8 @@ ct_init_local_session (app_worker_t * client_wrk, app_worker_t * server_wrk,
     }
 
   sm_index = segment_manager_index (sm);
-  ls->rx_fifo->ct_session_index = ls->session_index;
-  ls->tx_fifo->ct_session_index = ls->session_index;
+  ls->rx_fifo->master_session_index = ls->session_index;
+  ls->tx_fifo->master_session_index = ls->session_index;
   ls->rx_fifo->segment_manager = sm_index;
   ls->tx_fifo->segment_manager = sm_index;
   ls->rx_fifo->segment_index = seg_index;
@@ -228,7 +224,7 @@ failed:
   return rv;
 }
 
-int
+static int
 ct_connect (app_worker_t * client_wrk, session_t * ll,
            session_endpoint_cfg_t * sep)
 {
@@ -255,6 +251,7 @@ ct_connect (app_worker_t * client_wrk, session_t * ll,
   cct->c_is_ip4 = sep->is_ip4;
   clib_memcpy (&cct->c_rmt_ip, &sep->ip, sizeof (sep->ip));
   cct->actual_tp = ll_ct->actual_tp;
+  cct->is_client = 1;
 
   /*
    * Init server transport
@@ -285,6 +282,7 @@ ct_connect (app_worker_t * client_wrk, session_t * ll,
 
   server_wrk = application_listener_select_worker (ll);
   ss->app_wrk_index = server_wrk->wrk_index;
+  ss->t_app_index = server_wrk->app_index;
 
   sct->c_s_index = ss->session_index;
   sct->server_wrk = ss->app_wrk_index;
@@ -306,14 +304,12 @@ ct_connect (app_worker_t * client_wrk, session_t * ll,
       return -1;
     }
 
-  cct->client_evt_q = sct->client_evt_q;
-  cct->server_evt_q = sct->server_evt_q;
   cct->segment_handle = sct->segment_handle;
-
+  ct_enable_disable_main_pre_input_node (1 /* is_add */ );
   return 0;
 }
 
-u32
+static u32
 ct_start_listen (u32 app_listener_index, transport_endpoint_t * tep)
 {
   session_endpoint_cfg_t *sep;
@@ -326,25 +322,27 @@ ct_start_listen (u32 app_listener_index, transport_endpoint_t * tep)
   clib_memcpy (&ct->c_lcl_ip, &sep->ip, sizeof (sep->ip));
   ct->c_lcl_port = sep->port;
   ct->actual_tp = sep->transport_proto;
+  ct_enable_disable_main_pre_input_node (1 /* is_add */ );
   return ct->c_c_index;
 }
 
-u32
+static u32
 ct_stop_listen (u32 ct_index)
 {
   ct_connection_t *ct;
   ct = ct_connection_get (ct_index);
   ct_connection_free (ct);
+  ct_enable_disable_main_pre_input_node (0 /* is_add */ );
   return 0;
 }
 
-transport_connection_t *
+static transport_connection_t *
 ct_listener_get (u32 ct_index)
 {
   return (transport_connection_t *) ct_connection_get (ct_index);
 }
 
-int
+static int
 ct_session_connect (transport_endpoint_cfg_t * tep)
 {
   session_endpoint_cfg_t *sep_ext;
@@ -407,10 +405,11 @@ global_scope:
   return 1;
 }
 
-void
+static void
 ct_session_close (u32 ct_index, u32 thread_index)
 {
   ct_connection_t *ct, *peer_ct;
+  app_worker_t *app_wrk;
   session_t *s;
 
   ct = ct_connection_get (ct_index);
@@ -422,13 +421,18 @@ ct_session_close (u32 ct_index, u32 thread_index)
     }
 
   s = session_get (ct->c_s_index, 0);
-  app_worker_del_segment_notify (app_worker_get (s->app_wrk_index),
-                                ct->segment_handle);
+  app_wrk = app_worker_get_if_valid (s->app_wrk_index);
+  if (app_wrk)
+    app_worker_del_segment_notify (app_wrk, ct->segment_handle);
   session_free_w_fifos (s);
+  if (ct->is_client)
+    segment_manager_dealloc_fifos (ct->client_rx_fifo, ct->client_tx_fifo);
+
   ct_connection_free (ct);
+  ct_enable_disable_main_pre_input_node (0 /* is_add */ );
 }
 
-transport_connection_t *
+static transport_connection_t *
 ct_session_get (u32 ct_index, u32 thread_index)
 {
   return (transport_connection_t *) ct_connection_get (ct_index);
@@ -460,7 +464,7 @@ format_ct_connection_id (u8 * s, va_list * args)
   return s;
 }
 
-u8 *
+static u8 *
 format_ct_listener (u8 * s, va_list * args)
 {
   u32 tc_index = va_arg (*args, u32);
@@ -472,7 +476,7 @@ format_ct_listener (u8 * s, va_list * args)
   return s;
 }
 
-u8 *
+static u8 *
 format_ct_connection (u8 * s, va_list * args)
 {
   ct_connection_t *ct = va_arg (*args, ct_connection_t *);
@@ -492,7 +496,7 @@ format_ct_connection (u8 * s, va_list * args)
   return s;
 }
 
-u8 *
+static u8 *
 format_ct_session (u8 * s, va_list * args)
 {
   u32 ct_index = va_arg (*args, u32);
@@ -526,11 +530,29 @@ const static transport_proto_vft_t cut_thru_proto = {
 };
 /* *INDENT-ON* */
 
+int
+ct_session_tx (session_t * s)
+{
+  ct_connection_t *ct, *peer_ct;
+  session_t *peer_s;
+
+  ct = (ct_connection_t *) session_get_transport (s);
+  peer_ct = ct_connection_get (ct->peer_index);
+  if (!peer_ct)
+    return -1;
+  peer_s = session_get (peer_ct->c_s_index, 0);
+  if (peer_s->session_state >= SESSION_STATE_TRANSPORT_CLOSING)
+    return 0;
+  return session_enqueue_notify (peer_s);
+}
+
 static clib_error_t *
 ct_transport_init (vlib_main_t * vm)
 {
   transport_register_protocol (TRANSPORT_PROTO_NONE, &cut_thru_proto,
                               FIB_PROTOCOL_IP4, ~0);
+  transport_register_protocol (TRANSPORT_PROTO_NONE, &cut_thru_proto,
+                              FIB_PROTOCOL_IP6, ~0);
   return 0;
 }
 
index 5d6e6c1..7b937d3 100644 (file)
@@ -27,16 +27,18 @@ typedef struct ct_connection_
   u32 server_wrk;
   u32 transport_listener_index;
   transport_proto_t actual_tp;
-  u64 server_evt_q;
-  u64 client_evt_q;
   u32 client_opaque;
   u32 peer_index;
   u64 segment_handle;
+  svm_fifo_t *client_rx_fifo;
+  svm_fifo_t *client_tx_fifo;
+  u8 is_client;
 } ct_connection_t;
 
 session_t *ct_session_get_peer (session_t * s);
 void ct_session_endpoint (session_t * ll, session_endpoint_t * sep);
 int ct_session_connect_notify (session_t * ls);
+int ct_session_tx (session_t * s);
 
 #endif /* SRC_VNET_SESSION_APPLICATION_LOCAL_H_ */
 
index 9dfa3aa..7c88888 100644 (file)
@@ -543,7 +543,7 @@ app_send_io_evt_rx (app_worker_t * app_wrk, session_t * s, u8 lock)
       return app->cb_fns.builtin_app_rx_callback (s);
     }
 
-  if (svm_fifo_has_event (s->rx_fifo) || svm_fifo_is_empty (s->rx_fifo))
+  if (svm_fifo_has_event (s->rx_fifo))
     return 0;
 
   mq = app_wrk->event_queue;
@@ -608,9 +608,8 @@ app_send_io_evt_tx (app_worker_t * app_wrk, session_t * s, u8 lock)
 typedef int (app_send_evt_handler_fn) (app_worker_t *app,
                                       session_t *s,
                                       u8 lock);
-static app_send_evt_handler_fn * const app_send_evt_handler_fns[3] = {
+static app_send_evt_handler_fn * const app_send_evt_handler_fns[2] = {
     app_send_io_evt_rx,
-    0,
     app_send_io_evt_tx,
 };
 /* *INDENT-ON* */
index b7467bb..25b641d 100644 (file)
@@ -408,12 +408,10 @@ segment_manager_del_sessions (segment_manager_t * sm)
      */
     while (fifo)
       {
-       if (fifo->ct_session_index != SVM_FIFO_INVALID_SESSION_INDEX)
-         session = session_get (fifo->ct_session_index, 0);
-       else
-         session = session_get (fifo->master_session_index,
-                                fifo->master_thread_index);
-       vec_add1 (handles, session_handle (session));
+       session = session_get_if_valid (fifo->master_session_index,
+                                       fifo->master_thread_index);
+       if (session)
+         vec_add1 (handles, session_handle (session));
        fifo = fifo->next;
       }
 
index 5756792..6e24d56 100644 (file)
@@ -124,13 +124,6 @@ session_program_transport_close (session_t * s)
   session_worker_t *wrk;
   session_event_t *evt;
 
-  if (!session_has_transport (s))
-    {
-      /* Polling may not be enabled on main thread so close now */
-      session_transport_close (s);
-      return;
-    }
-
   /* If we are in the handler thread, or being called with the worker barrier
    * held, just append a new event to pending disconnects vector. */
   if (vlib_thread_is_main_w_barrier () || thread_index == s->thread_index)
@@ -483,7 +476,7 @@ session_notify_subscribers (u32 app_index, session_t * s,
  * @return 0 on success or negative number if failed to send notification.
  */
 static inline int
-session_enqueue_notify (session_t * s)
+session_enqueue_notify_inline (session_t * s)
 {
   app_worker_t *app_wrk;
 
@@ -512,6 +505,12 @@ session_enqueue_notify (session_t * s)
   return 0;
 }
 
+int
+session_enqueue_notify (session_t * s)
+{
+  return session_enqueue_notify_inline (s);
+}
+
 int
 session_dequeue_notify (session_t * s)
 {
@@ -560,7 +559,11 @@ session_main_flush_enqueue_events (u8 transport_proto, u32 thread_index)
          errors++;
          continue;
        }
-      if (PREDICT_FALSE (session_enqueue_notify (s)))
+
+      if (svm_fifo_is_empty (s->rx_fifo))
+       continue;
+
+      if (PREDICT_FALSE (session_enqueue_notify_inline (s)))
        errors++;
     }
 
index a7c9194..cea1b37 100644 (file)
@@ -187,6 +187,7 @@ typedef struct session_main_
 extern session_main_t session_main;
 extern vlib_node_registration_t session_queue_node;
 extern vlib_node_registration_t session_queue_process_node;
+extern vlib_node_registration_t session_queue_pre_input_node;
 
 #define SESSION_Q_PROCESS_FLUSH_FRAMES 1
 #define SESSION_Q_PROCESS_STOP         2
@@ -196,6 +197,9 @@ session_is_valid (u32 si, u8 thread_index)
 {
   session_t *s;
   s = pool_elt_at_index (session_main.wrk[thread_index].sessions, si);
+  if (s->session_state == SESSION_STATE_CLOSED)
+    return 1;
+
   if (s->thread_index != thread_index || s->session_index != si)
     return 0;
   return 1;
@@ -331,6 +335,7 @@ void session_transport_close (session_t * s);
 void session_transport_cleanup (session_t * s);
 int session_send_io_evt_to_thread (svm_fifo_t * f,
                                   session_evt_type_t evt_type);
+int session_enqueue_notify (session_t * s);
 int session_dequeue_notify (session_t * s);
 int session_send_io_evt_to_thread_custom (void *data, u32 thread_index,
                                          session_evt_type_t evt_type);
index 8ee25a9..525f637 100755 (executable)
@@ -149,54 +149,6 @@ send_del_segment_callback (u32 api_client_index, u64 segment_handle)
   return 0;
 }
 
-static int
-send_app_cut_through_registration_add (u32 api_client_index,
-                                      u32 wrk_map_index, u64 mq_addr,
-                                      u64 peer_mq_addr)
-{
-  vl_api_app_cut_through_registration_add_t *mp;
-  vl_api_registration_t *reg;
-  svm_msg_q_t *mq, *peer_mq;
-  int fds[2];
-
-  reg = vl_mem_api_client_index_to_registration (api_client_index);
-  if (!reg)
-    {
-      clib_warning ("no registration: %u", api_client_index);
-      return -1;
-    }
-
-  mp = vl_mem_api_alloc_as_if_client_w_reg (reg, sizeof (*mp));
-  clib_memset (mp, 0, sizeof (*mp));
-  mp->_vl_msg_id =
-    clib_host_to_net_u16 (VL_API_APP_CUT_THROUGH_REGISTRATION_ADD);
-
-  mp->evt_q_address = mq_addr;
-  mp->peer_evt_q_address = peer_mq_addr;
-  mp->wrk_index = wrk_map_index;
-
-  mq = uword_to_pointer (mq_addr, svm_msg_q_t *);
-  peer_mq = uword_to_pointer (peer_mq_addr, svm_msg_q_t *);
-
-  if (svm_msg_q_get_producer_eventfd (mq) != -1)
-    {
-      mp->fd_flags |= SESSION_FD_F_MQ_EVENTFD;
-      mp->n_fds = 2;
-      /* app will overwrite exactly the fds we pass here. So
-       * when we swap mq with peer_mq (accept vs connect) the
-       * fds will still be valid */
-      fds[0] = svm_msg_q_get_consumer_eventfd (mq);
-      fds[1] = svm_msg_q_get_producer_eventfd (peer_mq);
-    }
-
-  vl_msg_api_send_shmem (reg->vl_input_queue, (u8 *) & mp);
-
-  if (mp->n_fds != 0)
-    session_send_fds (reg, fds, mp->n_fds);
-
-  return 0;
-}
-
 static int
 mq_try_lock_and_alloc_msg (svm_msg_q_t * app_mq, svm_msg_q_msg_t * msg)
 {
@@ -268,25 +220,17 @@ mq_send_session_accepted_cb (session_t * s)
     }
   else
     {
-      u8 main_thread = vlib_num_workers ()? 1 : 0;
       ct_connection_t *ct;
 
       ct = (ct_connection_t *) session_get_transport (s);
-      send_app_cut_through_registration_add (app_wrk->api_client_index,
-                                            app_wrk->wrk_map_index,
-                                            ct->server_evt_q,
-                                            ct->client_evt_q);
-
       listener = listen_session_get (s->listener_index);
       al = app_listener_get (app, listener->al_index);
       mp->listener_handle = app_listener_handle (al);
       mp->is_ip4 = session_type_is_ip4 (listener->session_type);
       mp->handle = session_handle (s);
       mp->port = ct->c_rmt_port;
-      vpp_queue = session_main_get_vpp_event_queue (main_thread);
+      vpp_queue = session_main_get_vpp_event_queue (0);
       mp->vpp_event_queue_address = pointer_to_uword (vpp_queue);
-      mp->client_event_queue_address = ct->client_evt_q;
-      mp->server_event_queue_address = ct->server_evt_q;
     }
   svm_msg_q_add_and_unlock (app_mq, msg);
 
@@ -415,26 +359,22 @@ mq_send_session_connected_cb (u32 app_wrk_index, u32 api_context,
     }
   else
     {
-      u8 main_thread = vlib_num_workers ()? 1 : 0;
       ct_connection_t *cct;
       session_t *ss;
 
       cct = (ct_connection_t *) session_get_transport (s);
-      send_app_cut_through_registration_add (app_wrk->api_client_index,
-                                            app_wrk->wrk_map_index,
-                                            cct->client_evt_q,
-                                            cct->server_evt_q);
-
       mp->handle = session_handle (s);
       mp->lcl_port = cct->c_lcl_port;
-      vpp_mq = session_main_get_vpp_event_queue (main_thread);
+      mp->is_ip4 = cct->c_is_ip4;
+      vpp_mq = session_main_get_vpp_event_queue (0);
       mp->vpp_event_queue_address = pointer_to_uword (vpp_mq);
-      mp->client_event_queue_address = cct->client_evt_q;
-      mp->server_event_queue_address = cct->server_evt_q;
+      mp->server_rx_fifo = pointer_to_uword (s->rx_fifo);
+      mp->server_tx_fifo = pointer_to_uword (s->tx_fifo);
+      mp->segment_handle = session_segment_handle (s);
       ss = ct_session_get_peer (s);
-      mp->server_rx_fifo = pointer_to_uword (ss->tx_fifo);
-      mp->server_tx_fifo = pointer_to_uword (ss->rx_fifo);
-      mp->segment_handle = session_segment_handle (ss);
+      mp->ct_rx_fifo = pointer_to_uword (ss->tx_fifo);
+      mp->ct_tx_fifo = pointer_to_uword (ss->rx_fifo);
+      mp->ct_segment_handle = session_segment_handle (ss);
     }
 
 done:
@@ -505,11 +445,20 @@ done:
   return 0;
 }
 
+static int
+mq_app_tx_callback (session_t * s)
+{
+  if (session_has_transport (s))
+    return 0;
+  return ct_session_tx (s);
+}
+
 static session_cb_vft_t session_mq_cb_vft = {
   .session_accept_callback = mq_send_session_accepted_cb,
   .session_disconnect_callback = mq_send_session_disconnected_cb,
   .session_connected_callback = mq_send_session_connected_cb,
   .session_reset_callback = mq_send_session_reset_cb,
+  .builtin_app_tx_callback = mq_app_tx_callback,
   .add_segment_callback = send_add_segment_callback,
   .del_segment_callback = send_del_segment_callback,
 };
index d0936c7..db5123b 100644 (file)
@@ -74,9 +74,9 @@ session_mq_accepted_reply_handler (void *data)
 
   if (!session_has_transport (s))
     {
+      s->session_state = SESSION_STATE_READY;
       if (ct_session_connect_notify (s))
        return;
-      s->session_state = SESSION_STATE_READY;
     }
   else
     {
@@ -1234,6 +1234,26 @@ VLIB_REGISTER_NODE (session_queue_process_node) =
 };
 /* *INDENT-ON* */
 
+static_always_inline uword
+session_queue_pre_input_inline (vlib_main_t * vm, vlib_node_runtime_t * node,
+                               vlib_frame_t * frame)
+{
+  session_main_t *sm = &session_main;
+  if (!sm->wrk[0].vpp_event_queue)
+    return 0;
+  return session_queue_node_fn (vm, node, frame);
+}
+
+/* *INDENT-OFF* */
+VLIB_REGISTER_NODE (session_queue_pre_input_node) =
+{
+  .function = session_queue_pre_input_inline,
+  .type = VLIB_NODE_TYPE_PRE_INPUT,
+  .name = "session-queue-main",
+  .state = VLIB_NODE_STATE_DISABLED,
+};
+/* *INDENT-ON* */
+
 /*
  * fd.io coding-style-patch-verification: ON
  *
index c4240ab..9e51d69 100644 (file)
@@ -278,9 +278,7 @@ session_parse_handle (session_handle_t handle, u32 * index,
 typedef enum
 {
   SESSION_IO_EVT_RX,
-  SESSION_IO_EVT_CT_RX,
   SESSION_IO_EVT_TX,
-  SESSION_IO_EVT_CT_TX,
   SESSION_IO_EVT_TX_FLUSH,
   SESSION_IO_EVT_BUILTIN_RX,
   SESSION_IO_EVT_BUILTIN_TX,
index 71ad902..9a8662d 100644 (file)
@@ -259,8 +259,8 @@ class LDPCutThruTestCase(VCLTestCase):
                                               self.server_port]
 
     def tearDown(self):
+        self.logger.debug(self.vapi.cli("show session verbose 2"))
         self.cut_thru_tear_down()
-
         super(LDPCutThruTestCase, self).tearDown()
 
     @unittest.skipUnless(running_extended_tests, "part of extended tests")