session: support local sessions and deprecate redirects 75/10475/17
authorFlorin Coras <fcoras@cisco.com>
Thu, 8 Feb 2018 23:10:09 +0000 (15:10 -0800)
committerFlorin Coras <fcoras@cisco.com>
Wed, 14 Feb 2018 08:54:43 +0000 (00:54 -0800)
Memfd backed shared memory segments can only be negotiated over sockets.
For such scenarios, the existing redirect mechanism that establishes
cut-through sessions does not work anymore as the two peer application
do not share such a socket.

This patch adds support for local sessions, as opposed to sessions
backed by a transport connection, in a way that is almost transparent to
the two applications by reusing the existing binary api messages.
Moreover, all segment allocations are now entirely done through the
segment manager valloc, so segment overlaps due to independent
allocations previously required for redirects are completely avoided.
The one notable characteristic of local sessions (cut-through from app
perspective) notification messages is that they carry pointers to two
event queues, one for each app peer, instead of one. For
transport-backed sessions one of the queues can be inferred but for
local session they cannot.

Change-Id: Ia443fb63e2d9d8e43490275062a708f039038175
Signed-off-by: Florin Coras <fcoras@cisco.com>
24 files changed:
src/svm/ssvm.c
src/svm/svm_fifo_segment.h
src/tests/vnet/session/tcp_echo.c
src/tests/vnet/session/udp_echo.c
src/vcl.am
src/vnet/session-apps/echo_server.c
src/vnet/session-apps/http_server.c
src/vnet/session-apps/proxy.c
src/vnet/session/application.c
src/vnet/session/application.h
src/vnet/session/application_interface.c
src/vnet/session/application_interface.h
src/vnet/session/segment_manager.c
src/vnet/session/segment_manager.h
src/vnet/session/session.api
src/vnet/session/session.h
src/vnet/session/session_api.c
src/vnet/session/session_lookup.c
src/vnet/session/session_lookup.h
src/vnet/session/session_table.h
src/vnet/session/session_test.c
src/vnet/session/stream_session.h
src/vnet/session/transport.h
test/test_session.py

index 7077f8b..04e0efa 100644 (file)
@@ -33,7 +33,7 @@ ssvm_master_init_shm (ssvm_private_t * ssvm)
   clib_mem_vm_map_t mapa = { 0 };
   u8 junk = 0, *ssvm_filename;
   ssvm_shared_header_t *sh;
-  uword page_size;
+  uword page_size, requested_va = 0;
   void *oldheap;
 
   if (ssvm->ssvm_size == 0)
@@ -75,9 +75,12 @@ ssvm_master_init_shm (ssvm_private_t * ssvm)
 
   page_size = clib_mem_vm_get_page_size (ssvm_fd);
   if (ssvm->requested_va)
-    clib_mem_vm_randomize_va (&ssvm->requested_va, min_log2 (page_size));
+    {
+      requested_va = ssvm->requested_va;
+      clib_mem_vm_randomize_va (&requested_va, min_log2 (page_size));
+    }
 
-  mapa.requested_va = ssvm->requested_va;
+  mapa.requested_va = requested_va;
   mapa.size = ssvm->ssvm_size;
   mapa.fd = ssvm_fd;
   if (clib_mem_vm_ext_map (&mapa))
index bf8d513..1872da1 100644 (file)
@@ -31,8 +31,8 @@ typedef enum
 #define FIFO_SEGMENT_MAX_FIFO_SIZE (8<<20)     /* 8mb max fifo size */
 #define FIFO_SEGMENT_ALLOC_CHUNK_SIZE 32       /* Allocation quantum */
 
-#define FIFO_SEGMENT_F_IS_PREALLOCATED 1 << 0  /* Segment is preallocated */
-#define FIFO_SEGMENT_F_WILL_DELETE     1 << 1  /* Segment will be removed */
+#define FIFO_SEGMENT_F_IS_PREALLOCATED (1 << 0)
+#define FIFO_SEGMENT_F_WILL_DELETE     (1 << 1)
 
 typedef struct
 {
index d48c5d9..a925384 100644 (file)
@@ -479,6 +479,7 @@ vl_api_disconnect_session_t_handler (vl_api_disconnect_session_t * mp)
   rmp->_vl_msg_id = ntohs (VL_API_DISCONNECT_SESSION_REPLY);
   rmp->retval = rv;
   rmp->handle = mp->handle;
+  rmp->context = mp->context;
   vl_msg_api_send_shmem (em->vl_input_queue, (u8 *) & rmp);
 
   if (session)
index e425169..d1363fc 100644 (file)
@@ -126,6 +126,8 @@ typedef struct
   svm_fifo_segment_main_t *segment_main;
 
   u8 *connect_test_data;
+
+  uword *segments_table;
 } uri_udp_test_main_t;
 
 #if CLIB_DEBUG > 0
@@ -173,8 +175,9 @@ application_send_attach (uri_udp_test_main_t * utm)
   bmp->_vl_msg_id = ntohs (VL_API_APPLICATION_ATTACH);
   bmp->client_index = utm->my_client_index;
   bmp->context = ntohl (0xfeedface);
-  bmp->options[APP_OPTIONS_FLAGS] =
-    APP_OPTIONS_FLAGS_ACCEPT_REDIRECT | APP_OPTIONS_FLAGS_ADD_SEGMENT;
+  bmp->options[APP_OPTIONS_FLAGS] = APP_OPTIONS_FLAGS_ADD_SEGMENT;
+  bmp->options[APP_OPTIONS_FLAGS] |= APP_OPTIONS_FLAGS_USE_GLOBAL_SCOPE;
+  bmp->options[APP_OPTIONS_FLAGS] |= APP_OPTIONS_FLAGS_USE_LOCAL_SCOPE;
   bmp->options[APP_OPTIONS_PREALLOC_FIFO_PAIRS] = 2;
   bmp->options[APP_OPTIONS_RX_FIFO_SIZE] = fifo_size;
   bmp->options[APP_OPTIONS_TX_FIFO_SIZE] = fifo_size;
@@ -308,7 +311,7 @@ cut_through_thread_fn (void *arg)
       /* We read from the tx fifo and write to the rx fifo */
       do
        {
-         actual_transfer = svm_fifo_dequeue_nowait (tx_fifo,
+         actual_transfer = svm_fifo_dequeue_nowait (rx_fifo,
                                                     vec_len (my_copy_buffer),
                                                     my_copy_buffer);
        }
@@ -319,7 +322,7 @@ cut_through_thread_fn (void *arg)
       buffer_offset = 0;
       while (actual_transfer > 0)
        {
-         rv = svm_fifo_enqueue_nowait (rx_fifo, actual_transfer,
+         rv = svm_fifo_enqueue_nowait (tx_fifo, actual_transfer,
                                        my_copy_buffer + buffer_offset);
          if (rv > 0)
            {
@@ -605,7 +608,10 @@ vl_api_bind_uri_reply_t_handler (vl_api_bind_uri_reply_t * mp)
 static void
 vl_api_map_another_segment_t_handler (vl_api_map_another_segment_t * mp)
 {
+  uri_udp_test_main_t *utm = &uri_udp_test_main;
   svm_fifo_segment_create_args_t _a, *a = &_a;
+  svm_fifo_segment_private_t *seg;
+  u8 *seg_name;
   int rv;
 
   memset (a, 0, sizeof (*a));
@@ -619,8 +625,35 @@ vl_api_map_another_segment_t_handler (vl_api_map_another_segment_t * mp)
                    mp->segment_name);
       return;
     }
-  clib_warning ("Mapped new segment '%s' size %d", mp->segment_name,
-               mp->segment_size);
+  seg = svm_fifo_segment_get_segment (a->new_segment_indices[0]);
+  clib_warning ("Mapped new segment '%s' size %d", seg->ssvm.name,
+               seg->ssvm.ssvm_size);
+  seg_name = format (0, "%s", (char *) mp->segment_name);
+  hash_set_mem (utm->segments_table, seg_name, a->new_segment_indices[0]);
+  vec_free (seg_name);
+}
+
+static void
+vl_api_unmap_segment_t_handler (vl_api_unmap_segment_t * mp)
+{
+  uri_udp_test_main_t *utm = &uri_udp_test_main;
+  svm_fifo_segment_private_t *seg;
+  u64 *seg_indexp;
+  u8 *seg_name;
+
+
+  seg_name = format (0, "%s", mp->segment_name);
+  seg_indexp = hash_get_mem (utm->segments_table, seg_name);
+  if (!seg_indexp)
+    {
+      clib_warning ("segment not mapped: %s", seg_name);
+      return;
+    }
+  hash_unset_mem (utm->segments_table, seg_name);
+  seg = svm_fifo_segment_get_segment ((u32) seg_indexp[0]);
+  svm_fifo_segment_delete (seg);
+  clib_warning ("Unmapped segment '%s'", seg_name);
+  vec_free (seg_name);
 }
 
 /**
@@ -720,6 +753,8 @@ vl_api_accept_session_t_handler (vl_api_accept_session_t * mp)
   svm_fifo_t *rx_fifo, *tx_fifo;
   session_t *session;
   static f64 start_time;
+  u32 session_index;
+  int rv = 0;
 
   if (start_time == 0.0)
     start_time = clib_time_now (&utm->clib_time);
@@ -727,19 +762,42 @@ vl_api_accept_session_t_handler (vl_api_accept_session_t * mp)
   utm->vpp_event_queue =
     uword_to_pointer (mp->vpp_event_queue_address, svm_queue_t *);
 
-  pool_get (utm->sessions, session);
-
   rx_fifo = uword_to_pointer (mp->server_rx_fifo, svm_fifo_t *);
-  rx_fifo->client_session_index = session - utm->sessions;
   tx_fifo = uword_to_pointer (mp->server_tx_fifo, svm_fifo_t *);
-  tx_fifo->client_session_index = session - utm->sessions;
 
-  session->server_rx_fifo = rx_fifo;
-  session->server_tx_fifo = tx_fifo;
+  pool_get (utm->sessions, session);
+  memset (session, 0, sizeof (*session));
+  session_index = session - utm->sessions;
 
-  hash_set (utm->session_index_by_vpp_handles, mp->handle,
-           session - utm->sessions);
+  /* Cut-through case */
+  if (mp->server_event_queue_address)
+    {
+      clib_warning ("cut-through session");
+      utm->our_event_queue = uword_to_pointer (mp->server_event_queue_address,
+                                              svm_queue_t *);
+      rx_fifo->master_session_index = session_index;
+      tx_fifo->master_session_index = session_index;
+      utm->cut_through_session_index = session_index;
+      session->server_rx_fifo = rx_fifo;
+      session->server_tx_fifo = tx_fifo;
+
+      rv = pthread_create (&utm->cut_through_thread_handle,
+                          NULL /*attr */ , cut_through_thread_fn, 0);
+      if (rv)
+       {
+         clib_warning ("pthread_create returned %d", rv);
+         rv = VNET_API_ERROR_SYSCALL_ERROR_1;
+       }
+    }
+  else
+    {
+      rx_fifo->client_session_index = session_index;
+      tx_fifo->client_session_index = session_index;
+      session->server_rx_fifo = rx_fifo;
+      session->server_tx_fifo = tx_fifo;
+    }
 
+  hash_set (utm->session_index_by_vpp_handles, mp->handle, session_index);
   if (pool_elts (utm->sessions) && (pool_elts (utm->sessions) % 20000) == 0)
     {
       f64 now = clib_time_now (&utm->clib_time);
@@ -753,6 +811,7 @@ vl_api_accept_session_t_handler (vl_api_accept_session_t * mp)
   rmp->_vl_msg_id = ntohs (VL_API_ACCEPT_SESSION_REPLY);
   rmp->handle = mp->handle;
   rmp->context = mp->context;
+  rmp->retval = rv;
   vl_msg_api_send_shmem (utm->vl_input_queue, (u8 *) & rmp);
 
   CLIB_MEMORY_BARRIER ();
@@ -787,6 +846,7 @@ vl_api_disconnect_session_t_handler (vl_api_disconnect_session_t * mp)
   rmp->_vl_msg_id = ntohs (VL_API_DISCONNECT_SESSION_REPLY);
   rmp->retval = rv;
   rmp->handle = mp->handle;
+  rmp->context = mp->context;
   vl_msg_api_send_shmem (utm->vl_input_queue, (u8 *) & rmp);
 }
 
@@ -804,34 +864,6 @@ vl_api_connect_session_reply_t_handler (vl_api_connect_session_reply_t * mp)
       return;
     }
 
-  /* We've been redirected */
-  if (mp->segment_name_length > 0)
-    {
-      svm_fifo_segment_main_t *sm = &svm_fifo_segment_main;
-      svm_fifo_segment_create_args_t _a, *a = &_a;
-      u32 segment_index;
-      svm_fifo_segment_private_t *seg;
-      int rv;
-
-      memset (a, 0, sizeof (*a));
-      a->segment_name = (char *) mp->segment_name;
-
-      sleep (1);
-
-      rv = svm_fifo_segment_attach (a);
-      if (rv)
-       {
-         clib_warning ("sm_fifo_segment_create ('%v') failed",
-                       mp->segment_name);
-         return;
-       }
-
-      segment_index = a->new_segment_indices[0];
-      vec_add2 (utm->seg, seg, 1);
-      memcpy (seg, sm->segments + segment_index, sizeof (*seg));
-      sleep (1);
-    }
-
   pool_get (utm->sessions, session);
   session->server_rx_fifo = uword_to_pointer (mp->server_rx_fifo,
                                              svm_fifo_t *);
@@ -840,8 +872,16 @@ vl_api_connect_session_reply_t_handler (vl_api_connect_session_reply_t * mp)
                                              svm_fifo_t *);
   ASSERT (session->server_tx_fifo);
 
-  if (mp->segment_name_length > 0)
-    utm->cut_through_session_index = session - utm->sessions;
+  /* Cut-through case */
+  if (mp->client_event_queue_address)
+    {
+      clib_warning ("cut-through session");
+      utm->cut_through_session_index = session - utm->sessions;
+      utm->vpp_event_queue = uword_to_pointer (mp->vpp_event_queue_address,
+                                              svm_queue_t *);
+      utm->our_event_queue = uword_to_pointer (mp->client_event_queue_address,
+                                              svm_queue_t *);
+    }
   else
     {
       utm->connected_session = session - utm->sessions;
@@ -859,6 +899,7 @@ _(UNBIND_URI_REPLY, unbind_uri_reply)               \
 _(ACCEPT_SESSION, accept_session)                      \
 _(DISCONNECT_SESSION, disconnect_session)              \
 _(MAP_ANOTHER_SEGMENT, map_another_segment)            \
+_(UNMAP_SEGMENT, unmap_segment)                                \
 _(APPLICATION_ATTACH_REPLY, application_attach_reply)  \
 _(APPLICATION_DETACH_REPLY, application_detach_reply)  \
 
@@ -1068,6 +1109,7 @@ main (int argc, char **argv)
   utm->session_index_by_vpp_handles = hash_create (0, sizeof (uword));
   utm->my_pid = getpid ();
   utm->configured_segment_size = 1 << 20;
+  utm->segments_table = hash_create_vec (0, sizeof (u8), sizeof (u64));
 
   clib_time_init (&utm->clib_time);
   init_error_string_table (utm);
index f090041..5f48f6d 100644 (file)
@@ -68,3 +68,5 @@ sock_test_client_SOURCES = vcl/sock_test_client.c
 
 nobase_include_HEADERS +=                      \
   vcl/sock_test.h
+
+# vi:syntax=automake
index 024ffa6..0d03508 100644 (file)
@@ -245,7 +245,6 @@ static session_cb_vft_t echo_server_session_cb_vft = {
   .session_disconnect_callback = echo_server_session_disconnect_callback,
   .session_connected_callback = echo_server_session_connected_callback,
   .add_segment_callback = echo_server_add_segment_callback,
-  .redirect_connect_callback = echo_server_redirect_connect_callback,
   .builtin_server_rx_callback = echo_server_rx_callback,
   .session_reset_callback = echo_server_session_reset_callback
 };
@@ -387,7 +386,7 @@ echo_server_create_command_fn (vlib_main_t * vm, unformat_input_t * input,
   u8 server_uri_set = 0, *appns_id = 0;
   u64 tmp, appns_flags = 0, appns_secret = 0;
   char *default_uri = "tcp://0.0.0.0/1234";
-  int rv;
+  int rv, is_stop = 0;
 
   esm->no_echo = 0;
   esm->fifo_size = 64 << 10;
@@ -431,11 +430,29 @@ echo_server_create_command_fn (vlib_main_t * vm, unformat_input_t * input,
        appns_flags |= APP_OPTIONS_FLAGS_USE_GLOBAL_SCOPE;
       else if (unformat (input, "secret %lu", &appns_secret))
        ;
+      else if (unformat (input, "stop"))
+       is_stop = 1;
       else
        return clib_error_return (0, "failed: unknown input `%U'",
                                  format_unformat_error, input);
     }
 
+  if (is_stop)
+    {
+      if (esm->app_index == (u32) ~ 0)
+       {
+         clib_warning ("server not running");
+         return clib_error_return (0, "failed: server not running");
+       }
+      rv = echo_server_detach ();
+      if (rv)
+       {
+         clib_warning ("failed: detach");
+         return clib_error_return (0, "failed: server detach %d", rv);
+       }
+      return 0;
+    }
+
   vnet_session_enable_disable (vm, 1 /* turn on TCP, etc. */ );
 
   if (!server_uri_set)
index 07eaab4..eeb755b 100644 (file)
@@ -471,19 +471,11 @@ http_server_add_segment_callback (u32 client_index, const ssvm_private_t * sp)
   return -1;
 }
 
-static int
-http_server_redirect_connect_callback (u32 client_index, void *mp)
-{
-  clib_warning ("called...");
-  return -1;
-}
-
 static session_cb_vft_t http_server_session_cb_vft = {
   .session_accept_callback = http_server_session_accept_callback,
   .session_disconnect_callback = http_server_session_disconnect_callback,
   .session_connected_callback = http_server_session_connected_callback,
   .add_segment_callback = http_server_add_segment_callback,
-  .redirect_connect_callback = http_server_redirect_connect_callback,
   .builtin_server_rx_callback = http_server_rx_callback,
   .session_reset_callback = http_server_session_reset_callback
 };
index 2fdb63f..1cbacdb 100644 (file)
@@ -145,13 +145,6 @@ proxy_add_segment_callback (u32 client_index, const ssvm_private_t * sp)
   return -1;
 }
 
-static int
-proxy_redirect_connect_callback (u32 client_index, void *mp)
-{
-  clib_warning ("called...");
-  return -1;
-}
-
 static int
 proxy_rx_callback (stream_session_t * s)
 {
@@ -239,7 +232,6 @@ static session_cb_vft_t proxy_session_cb_vft = {
   .session_disconnect_callback = proxy_disconnect_callback,
   .session_connected_callback = proxy_connected_callback,
   .add_segment_callback = proxy_add_segment_callback,
-  .redirect_connect_callback = proxy_redirect_connect_callback,
   .builtin_server_rx_callback = proxy_rx_callback,
   .session_reset_callback = proxy_reset_callback
 };
index 71fc93f..9020d1c 100644 (file)
@@ -28,11 +28,6 @@ static application_t *app_pool;
  */
 static uword *app_by_api_client_index;
 
-/**
- * Default application event queue size
- */
-static u32 default_app_evt_queue_size = 128;
-
 static u8 *
 app_get_name_from_reg_index (application_t * app)
 {
@@ -138,6 +133,7 @@ application_new ()
   app->index = application_get_index (app);
   app->connects_seg_manager = APP_INVALID_SEGMENT_MANAGER_INDEX;
   app->first_segment_manager = APP_INVALID_SEGMENT_MANAGER_INDEX;
+  app->local_segment_manager = APP_INVALID_SEGMENT_MANAGER_INDEX;
   if (CLIB_DEBUG > 1)
     clib_warning ("[%d] New app (%d)", getpid (), app->index);
   return app;
@@ -147,8 +143,8 @@ void
 application_del (application_t * app)
 {
   vnet_unbind_args_t _a, *a = &_a;
-  segment_manager_t *sm;
   u64 handle, *handles = 0;
+  segment_manager_t *sm;
   u32 index;
   int i;
 
@@ -207,6 +203,12 @@ application_del (application_t * app)
          segment_manager_del (sm);
        }
     }
+
+  /*
+   * Local connections cleanup
+   */
+  application_local_sessions_del (app);
+
   application_table_del (app);
   pool_put (app_pool, app);
 }
@@ -255,8 +257,8 @@ int
 application_init (application_t * app, u32 api_client_index, u64 * options,
                  session_cb_vft_t * cb_fns)
 {
-  u32 app_evt_queue_size, first_seg_size, prealloc_fifo_pairs;
   ssvm_segment_type_t seg_type = SSVM_SEGMENT_MEMFD;
+  u32 first_seg_size, prealloc_fifo_pairs;
   segment_manager_properties_t *props;
   vl_api_registration_t *reg;
   segment_manager_t *sm;
@@ -298,15 +300,14 @@ application_init (application_t * app, u32 api_client_index, u64 * options,
     props->rx_fifo_size = options[APP_OPTIONS_RX_FIFO_SIZE];
   if (options[APP_OPTIONS_TX_FIFO_SIZE])
     props->tx_fifo_size = options[APP_OPTIONS_TX_FIFO_SIZE];
+  if (options[APP_OPTIONS_EVT_QUEUE_SIZE])
+    props->evt_q_size = options[APP_OPTIONS_EVT_QUEUE_SIZE];
   props->segment_type = seg_type;
 
-  app_evt_queue_size = options[APP_OPTIONS_EVT_QUEUE_SIZE] > 0 ?
-    options[APP_OPTIONS_EVT_QUEUE_SIZE] : default_app_evt_queue_size;
   first_seg_size = options[APP_OPTIONS_SEGMENT_SIZE];
   prealloc_fifo_pairs = options[APP_OPTIONS_PREALLOC_FIFO_PAIRS];
 
-  if ((rv = segment_manager_init (sm, first_seg_size, app_evt_queue_size,
-                                 prealloc_fifo_pairs)))
+  if ((rv = segment_manager_init (sm, first_seg_size, prealloc_fifo_pairs)))
     return rv;
   sm->first_is_protected = 1;
 
@@ -319,6 +320,7 @@ application_init (application_t * app, u32 api_client_index, u64 * options,
   app->cb_fns = *cb_fns;
   app->ns_index = options[APP_OPTIONS_NAMESPACE];
   app->listeners_table = hash_create (0, sizeof (u64));
+  app->local_connects = hash_create (0, sizeof (u64));
   app->proxied_transports = options[APP_OPTIONS_PROXY_TRANSPORT];
   app->event_queue = segment_manager_event_queue (sm);
 
@@ -333,6 +335,13 @@ application_init (application_t * app, u32 api_client_index, u64 * options,
   /* Add app to lookup by api_client_index table */
   application_table_add (app);
 
+  /*
+   * Segment manager for local sessions
+   */
+  sm = segment_manager_new ();
+  sm->app_index = app->index;
+  app->local_segment_manager = segment_manager_index (sm);
+
   return 0;
 }
 
@@ -388,11 +397,11 @@ application_alloc_segment_manager (application_t * app)
  */
 int
 application_start_listen (application_t * srv, session_endpoint_t * sep,
-                         u64 * res)
+                         session_handle_t * res)
 {
   segment_manager_t *sm;
   stream_session_t *s;
-  u64 handle;
+  session_handle_t handle;
   session_type_t sst;
 
   sst = session_type_from_proto_and_ip (sep->transport_proto, sep->is_ip4);
@@ -425,7 +434,7 @@ err:
  * Stop listening on session associated to handle
  */
 int
-application_stop_listen (application_t * srv, u64 handle)
+application_stop_listen (application_t * srv, session_handle_t handle)
 {
   stream_session_t *listener;
   uword *indexp;
@@ -499,6 +508,26 @@ application_get_listen_segment_manager (application_t * app,
   return segment_manager_get (*smp);
 }
 
+segment_manager_t *
+application_get_local_segment_manager (application_t * app)
+{
+  return segment_manager_get (app->local_segment_manager);
+}
+
+segment_manager_t *
+application_get_local_segment_manager_w_session (application_t * app,
+                                                local_session_t * ls)
+{
+  stream_session_t *listener;
+  if (application_local_session_listener_has_transport (ls))
+    {
+      listener = listen_session_get (ls->listener_session_type,
+                                    ls->listener_index);
+      return application_get_listen_segment_manager (app, listener);
+    }
+  return segment_manager_get (app->local_segment_manager);
+}
+
 int
 application_is_proxy (application_t * app)
 {
@@ -731,12 +760,407 @@ application_get_segment_manager_properties (u32 app_index)
   return &app->sm_properties;
 }
 
+local_session_t *
+application_alloc_local_session (application_t * app)
+{
+  local_session_t *s;
+  pool_get (app->local_sessions, s);
+  memset (s, 0, sizeof (*s));
+  s->app_index = app->index;
+  s->session_index = s - app->local_sessions;
+  s->session_type = session_type_from_proto_and_ip (TRANSPORT_PROTO_NONE, 0);
+  return s;
+}
+
+void
+application_free_local_session (application_t * app, local_session_t * s)
+{
+  pool_put (app->local_sessions, s);
+  if (CLIB_DEBUG)
+    memset (s, 0xfc, sizeof (*s));
+}
+
+local_session_t *
+application_get_local_session (application_t * app, u32 session_index)
+{
+  return pool_elt_at_index (app->local_sessions, session_index);
+}
+
+local_session_t *
+application_get_local_session_from_handle (session_handle_t handle)
+{
+  application_t *server;
+  u32 session_index, server_index;
+  local_session_parse_handle (handle, &server_index, &session_index);
+  server = application_get (server_index);
+  return application_get_local_session (server, session_index);
+}
+
+always_inline void
+application_local_listener_session_endpoint (local_session_t * ll,
+                                            session_endpoint_t * sep)
+{
+  sep->transport_proto =
+    session_type_transport_proto (ll->listener_session_type);
+  sep->port = ll->port;
+  sep->is_ip4 = ll->listener_session_type & 1;
+}
+
+int
+application_start_local_listen (application_t * server,
+                               session_endpoint_t * sep,
+                               session_handle_t * handle)
+{
+  session_handle_t lh;
+  local_session_t *ll;
+  u32 table_index;
+
+  table_index = application_local_session_table (server);
+
+  /* An exact sep match, as opposed to session_lookup_local_listener */
+  lh = session_lookup_endpoint_listener (table_index, sep, 1);
+  if (lh != SESSION_INVALID_HANDLE)
+    return VNET_API_ERROR_ADDRESS_IN_USE;
+
+  pool_get (server->local_listen_sessions, ll);
+  memset (ll, 0, sizeof (*ll));
+  ll->session_type = session_type_from_proto_and_ip (TRANSPORT_PROTO_NONE, 0);
+  ll->app_index = server->index;
+  ll->session_index = ll - server->local_listen_sessions;
+  ll->port = sep->port;
+  /* Store the original session type for the unbind */
+  ll->listener_session_type =
+    session_type_from_proto_and_ip (sep->transport_proto, sep->is_ip4);
+
+  *handle = application_local_session_handle (ll);
+  session_lookup_add_session_endpoint (table_index, sep, *handle);
+
+  return 0;
+}
+
+/**
+ * Clean up local session table. If we have a listener session use it to
+ * find the port and proto. If not, the handle must be a local table handle
+ * so parse it.
+ */
+int
+application_stop_local_listen (application_t * server, session_handle_t lh)
+{
+  session_endpoint_t sep = SESSION_ENDPOINT_NULL;
+  u32 table_index, ll_index, server_index;
+  stream_session_t *sl = 0;
+  local_session_t *ll, *ls;
+
+  table_index = application_local_session_table (server);
+
+  /* We have both local and global table binds. Figure from global what
+   * the sep we should be cleaning up is.
+   */
+  if (!session_handle_is_local (lh))
+    {
+      sl = listen_session_get_from_handle (lh);
+      if (!sl || listen_session_get_local_session_endpoint (sl, &sep))
+       {
+         clib_warning ("broken listener");
+         return -1;
+       }
+      lh = session_lookup_endpoint_listener (table_index, &sep, 0);
+      if (lh == SESSION_INVALID_HANDLE)
+       return -1;
+    }
+
+  local_session_parse_handle (lh, &server_index, &ll_index);
+  ASSERT (server->index == server_index);
+  if (!(ll = application_get_local_listen_session (server, ll_index)))
+    {
+      clib_warning ("no local listener");
+      return -1;
+    }
+  application_local_listener_session_endpoint (ll, &sep);
+  session_lookup_del_session_endpoint (table_index, &sep);
+
+  /* *INDENT-OFF* */
+  pool_foreach (ls, server->local_sessions, ({
+    if (ls->listener_index == ll->session_index)
+      application_local_session_disconnect (server->index, ls);
+  }));
+  /* *INDENT-ON* */
+  pool_put_index (server->local_listen_sessions, ll->session_index);
+
+  return 0;
+}
+
+int
+application_local_session_connect (u32 table_index, application_t * client,
+                                  application_t * server,
+                                  local_session_t * ll, u32 opaque)
+{
+  u32 seg_size, evt_q_sz, evt_q_elts, margin = 16 << 10;
+  segment_manager_properties_t *props, *cprops;
+  int rv, has_transport, seg_index;
+  svm_fifo_segment_private_t *seg;
+  segment_manager_t *sm;
+  local_session_t *ls;
+  svm_queue_t *sq, *cq;
+
+  ls = application_alloc_local_session (server);
+
+  props = application_segment_manager_properties (server);
+  cprops = application_segment_manager_properties (client);
+  evt_q_elts = props->evt_q_size + cprops->evt_q_size;
+  evt_q_sz = evt_q_elts * sizeof (session_fifo_event_t);
+  seg_size = props->rx_fifo_size + props->tx_fifo_size + evt_q_sz + margin;
+
+  has_transport = session_has_transport ((stream_session_t *) ll);
+  if (!has_transport)
+    {
+      /* Local sessions don't have backing transport */
+      ls->port = ll->port;
+      sm = application_get_local_segment_manager (server);
+    }
+  else
+    {
+      stream_session_t *sl = (stream_session_t *) ll;
+      transport_connection_t *tc;
+      tc = listen_session_get_transport (sl);
+      ls->port = tc->lcl_port;
+      sm = application_get_listen_segment_manager (server, sl);
+    }
+
+  seg_index = segment_manager_add_segment (sm, seg_size);
+  if (seg_index < 0)
+    {
+      clib_warning ("failed to add new cut-through segment");
+      return seg_index;
+    }
+  seg = segment_manager_get_segment_w_lock (sm, seg_index);
+  sq = segment_manager_alloc_queue (seg, props->evt_q_size);
+  cq = segment_manager_alloc_queue (seg, cprops->evt_q_size);
+  ls->server_evt_q = pointer_to_uword (sq);
+  ls->client_evt_q = pointer_to_uword (cq);
+  rv = segment_manager_try_alloc_fifos (seg, props->rx_fifo_size,
+                                       props->tx_fifo_size,
+                                       &ls->server_rx_fifo,
+                                       &ls->server_tx_fifo);
+  if (rv)
+    {
+      clib_warning ("failed to add fifos in cut-through segment");
+      segment_manager_segment_reader_unlock (sm);
+      goto failed;
+    }
+  ls->server_rx_fifo->master_session_index = ls->session_index;
+  ls->server_tx_fifo->master_session_index = ls->session_index;
+  ls->server_rx_fifo->master_thread_index = ~0;
+  ls->server_tx_fifo->master_thread_index = ~0;
+  ls->svm_segment_index = seg_index;
+  ls->listener_index = ll->session_index;
+  ls->client_index = client->index;
+  ls->client_opaque = opaque;
+  ls->listener_session_type = ll->session_type;
+
+  if ((rv = server->cb_fns.add_segment_callback (server->api_client_index,
+                                                &seg->ssvm)))
+    {
+      clib_warning ("failed to notify server of new segment");
+      segment_manager_segment_reader_unlock (sm);
+      goto failed;
+    }
+  segment_manager_segment_reader_unlock (sm);
+  if ((rv = server->cb_fns.session_accept_callback ((stream_session_t *) ls)))
+    {
+      clib_warning ("failed to send accept cut-through notify to server");
+      goto failed;
+    }
+  if (server->flags & APP_OPTIONS_FLAGS_IS_BUILTIN)
+    application_local_session_connect_notify (ls);
+
+  return 0;
+
+failed:
+  if (!has_transport)
+    segment_manager_del_segment (sm, seg);
+  return rv;
+}
+
+static uword
+application_client_local_connect_key (local_session_t * ls)
+{
+  return ((uword) ls->app_index << 32 | (uword) ls->session_index);
+}
+
+static void
+application_client_local_connect_key_parse (uword key, u32 * app_index,
+                                           u32 * session_index)
+{
+  *app_index = key >> 32;
+  *session_index = key & 0xFFFFFFFF;
+}
+
+int
+application_local_session_connect_notify (local_session_t * ls)
+{
+  svm_fifo_segment_private_t *seg;
+  application_t *client, *server;
+  segment_manager_t *sm;
+  int rv, is_fail = 0;
+  uword client_key;
+
+  client = application_get (ls->client_index);
+  server = application_get (ls->app_index);
+  sm = application_get_local_segment_manager_w_session (server, ls);
+  seg = segment_manager_get_segment_w_lock (sm, ls->svm_segment_index);
+  if ((rv = client->cb_fns.add_segment_callback (client->api_client_index,
+                                                &seg->ssvm)))
+    {
+      clib_warning ("failed to notify client %u of new segment",
+                   ls->client_index);
+      segment_manager_segment_reader_unlock (sm);
+      application_local_session_disconnect (ls->client_index, ls);
+      is_fail = 1;
+    }
+  else
+    {
+      segment_manager_segment_reader_unlock (sm);
+    }
+
+  client->cb_fns.session_connected_callback (client->index, ls->client_opaque,
+                                            (stream_session_t *) ls,
+                                            is_fail);
+
+  client_key = application_client_local_connect_key (ls);
+  hash_set (client->local_connects, client_key, client_key);
+  return 0;
+}
+
+int
+application_local_session_disconnect (u32 app_index, local_session_t * ls)
+{
+  svm_fifo_segment_private_t *seg;
+  application_t *client, *server;
+  segment_manager_t *sm;
+  uword client_key;
+
+  client = application_get_if_valid (ls->client_index);
+  server = application_get (ls->app_index);
+
+  if (ls->session_state == SESSION_STATE_CLOSED)
+    {
+    cleanup:
+      client_key = application_client_local_connect_key (ls);
+      sm = application_get_local_segment_manager_w_session (server, ls);
+      seg = segment_manager_get_segment (sm, ls->svm_segment_index);
+
+      if (client)
+       {
+         hash_unset (client->local_connects, client_key);
+         client->cb_fns.del_segment_callback (client->api_client_index,
+                                              &seg->ssvm);
+       }
+
+      server->cb_fns.del_segment_callback (server->api_client_index,
+                                          &seg->ssvm);
+      segment_manager_del_segment (sm, seg);
+      application_free_local_session (server, ls);
+      return 0;
+    }
+
+  if (app_index == ls->client_index)
+    {
+      send_local_session_disconnect_callback (ls->app_index, ls);
+    }
+  else
+    {
+      if (!client)
+       {
+         goto cleanup;
+       }
+      else if (ls->session_state < SESSION_STATE_READY)
+       {
+         client->cb_fns.session_connected_callback (client->index,
+                                                    ls->client_opaque,
+                                                    (stream_session_t *) ls,
+                                                    1 /* is_fail */ );
+         ls->session_state = SESSION_STATE_CLOSED;
+         goto cleanup;
+       }
+      else
+       {
+         send_local_session_disconnect_callback (ls->client_index, ls);
+       }
+    }
+
+  ls->session_state = SESSION_STATE_CLOSED;
+
+  return 0;
+}
+
+void
+application_local_sessions_del (application_t * app)
+{
+  u32 index, server_index, session_index, table_index;
+  segment_manager_t *sm;
+  u64 handle, *handles = 0;
+  local_session_t *ls, *ll;
+  application_t *server;
+  session_endpoint_t sep;
+  int i;
+
+  /*
+   * Local listens. Don't bother with local sessions, we clean them lower
+   */
+  table_index = application_local_session_table (app);
+  /* *INDENT-OFF* */
+  pool_foreach (ll, app->local_listen_sessions, ({
+    application_local_listener_session_endpoint (ll, &sep);
+    session_lookup_del_session_endpoint (table_index, &sep);
+  }));
+  /* *INDENT-ON* */
+
+  /*
+   * Local sessions
+   */
+  if (app->local_sessions)
+    {
+      /* *INDENT-OFF* */
+      pool_foreach (ls, app->local_sessions, ({
+       application_local_session_disconnect (app->index, ls);
+      }));
+      /* *INDENT-ON* */
+    }
+
+  /*
+   * Local connects
+   */
+  vec_reset_length (handles);
+  /* *INDENT-OFF* */
+  hash_foreach (handle, index, app->local_connects, ({
+    vec_add1 (handles, handle);
+  }));
+  /* *INDENT-ON* */
+
+  for (i = 0; i < vec_len (handles); i++)
+    {
+      application_client_local_connect_key_parse (handles[i], &server_index,
+                                                 &session_index);
+      server = application_get_if_valid (server_index);
+      if (server)
+       {
+         ls = application_get_local_session (server, session_index);
+         application_local_session_disconnect (app->index, ls);
+       }
+    }
+
+  sm = segment_manager_get (app->local_segment_manager);
+  sm->app_index = SEGMENT_MANAGER_INVALID_APP_INDEX;
+  segment_manager_del (sm);
+}
+
 u8 *
 format_application_listener (u8 * s, va_list * args)
 {
   application_t *app = va_arg (*args, application_t *);
   u64 handle = va_arg (*args, u64);
-  u32 index = va_arg (*args, u32);
+  u32 sm_index = va_arg (*args, u32);
   int verbose = va_arg (*args, int);
   stream_session_t *listener;
   u8 *app_name, *str;
@@ -759,7 +1183,7 @@ format_application_listener (u8 * s, va_list * args)
   if (verbose)
     {
       s = format (s, "%-40s%-20s%-15u%-15u%-10u", str, app_name,
-                 app->api_client_index, handle, index);
+                 app->api_client_index, handle, sm_index);
     }
   else
     s = format (s, "%-40s%-20s", str, app_name);
@@ -832,6 +1256,76 @@ application_format_connects (application_t * app, int verbose)
   vec_free (app_name);
 }
 
+void
+application_format_local_sessions (application_t * app, int verbose)
+{
+  vlib_main_t *vm = vlib_get_main ();
+  local_session_t *ls;
+  transport_proto_t tp;
+  u8 *conn = 0;
+
+  /* Header */
+  if (app == 0)
+    {
+      vlib_cli_output (vm, "%-40s%-15s%-20s", "Connection", "ServerApp",
+                      "ClientApp");
+      return;
+    }
+
+  /* *INDENT-OFF* */
+  pool_foreach (ls, app->local_listen_sessions, ({
+    tp = session_type_transport_proto(ls->listener_session_type);
+    conn = format (0, "[L][%U] *:%u", format_transport_proto_short, tp,
+                   ls->port);
+    vlib_cli_output (vm, "%-40v%-15u%-20s", conn, ls->app_index, "*");
+    vec_reset_length (conn);
+  }));
+  pool_foreach (ls, app->local_sessions, ({
+    tp = session_type_transport_proto(ls->listener_session_type);
+    conn = format (0, "[L][%U] *:%u", format_transport_proto_short, tp,
+                   ls->port);
+    vlib_cli_output (vm, "%-40v%-15u%-20u", conn, ls->app_index,
+                     ls->client_index);
+    vec_reset_length (conn);
+  }));
+  /* *INDENT-ON* */
+
+  vec_free (conn);
+}
+
+void
+application_format_local_connects (application_t * app, int verbose)
+{
+  vlib_main_t *vm = vlib_get_main ();
+  u32 app_index, session_index;
+  application_t *server;
+  local_session_t *ls;
+  uword client_key;
+  u64 value;
+
+  /* Header */
+  if (app == 0)
+    {
+      if (verbose)
+       vlib_cli_output (vm, "%-40s%-15s%-20s%-10s", "Connection", "App",
+                        "Peer App", "SegManager");
+      else
+       vlib_cli_output (vm, "%-40s%-15s%-20s", "Connection", "App",
+                        "Peer App");
+      return;
+    }
+
+  /* *INDENT-OFF* */
+  hash_foreach (client_key, value, app->local_connects, ({
+    application_client_local_connect_key_parse (client_key, &app_index,
+                                                &session_index);
+    server = application_get (app_index);
+    ls = application_get_local_session (server, session_index);
+    vlib_cli_output (vm, "%-40s%-15s%-20s", "TODO", ls->app_index, ls->client_index);
+  }));
+  /* *INDENT-ON* */
+}
+
 u8 *
 format_application (u8 * s, va_list * args)
 {
@@ -869,13 +1363,92 @@ format_application (u8 * s, va_list * args)
   return s;
 }
 
+
+void
+application_format_all_listeners (vlib_main_t * vm, int do_local, int verbose)
+{
+  application_t *app;
+  u32 sm_index;
+  u64 handle;
+
+  if (!pool_elts (app_pool))
+    {
+      vlib_cli_output (vm, "No active server bindings");
+      return;
+    }
+
+  if (do_local)
+    {
+      application_format_local_sessions (0, verbose);
+      /* *INDENT-OFF* */
+      pool_foreach (app, app_pool, ({
+        if (!pool_elts (app->local_sessions)
+            && !pool_elts(app->local_connects))
+          continue;
+        application_format_local_sessions (app, verbose);
+      }));
+      /* *INDENT-ON* */
+    }
+  else
+    {
+      vlib_cli_output (vm, "%U", format_application_listener, 0 /* header */ ,
+                      0, 0, verbose);
+
+      /* *INDENT-OFF* */
+      pool_foreach (app, app_pool, ({
+        if (hash_elts (app->listeners_table) == 0)
+          continue;
+        hash_foreach (handle, sm_index, app->listeners_table, ({
+          vlib_cli_output (vm, "%U", format_application_listener, app,
+                           handle, sm_index, verbose);
+        }));
+      }));
+      /* *INDENT-ON* */
+    }
+}
+
+void
+application_format_all_clients (vlib_main_t * vm, int do_local, int verbose)
+{
+  application_t *app;
+
+  if (!pool_elts (app_pool))
+    {
+      vlib_cli_output (vm, "No active apps");
+      return;
+    }
+
+  if (do_local)
+    {
+      application_format_local_connects (0, verbose);
+
+      /* *INDENT-OFF* */
+      pool_foreach (app, app_pool, ({
+        if (app->local_connects)
+          application_format_local_connects (app, verbose);
+      }));
+      /* *INDENT-ON* */
+    }
+  else
+    {
+      application_format_connects (0, verbose);
+
+      /* *INDENT-OFF* */
+      pool_foreach (app, app_pool, ({
+        if (app->connects_seg_manager == (u32)~0)
+          continue;
+        application_format_connects (app, verbose);
+      }));
+      /* *INDENT-ON* */
+    }
+}
+
 static clib_error_t *
 show_app_command_fn (vlib_main_t * vm, unformat_input_t * input,
                     vlib_cli_command_t * cmd)
 {
+  int do_server = 0, do_client = 0, do_local = 0;
   application_t *app;
-  int do_server = 0;
-  int do_client = 0;
   int verbose = 0;
 
   session_cli_return_if_not_enabled ();
@@ -886,6 +1459,8 @@ show_app_command_fn (vlib_main_t * vm, unformat_input_t * input,
        do_server = 1;
       else if (unformat (input, "client"))
        do_client = 1;
+      else if (unformat (input, "local"))
+       do_local = 1;
       else if (unformat (input, "verbose"))
        verbose = 1;
       else
@@ -893,49 +1468,10 @@ show_app_command_fn (vlib_main_t * vm, unformat_input_t * input,
     }
 
   if (do_server)
-    {
-      u64 handle;
-      u32 index;
-      if (pool_elts (app_pool))
-       {
-         vlib_cli_output (vm, "%U", format_application_listener,
-                          0 /* header */ , 0, 0, verbose);
-
-         /* *INDENT-OFF* */
-          pool_foreach (app, app_pool, ({
-            /* App's listener sessions */
-            if (hash_elts (app->listeners_table) == 0)
-              continue;
-            hash_foreach (handle, index, app->listeners_table, ({
-              vlib_cli_output (vm, "%U", format_application_listener, app,
-                                      handle, index, verbose);
-            }));
-          }));
-          /* *INDENT-ON* */
-
-       }
-      else
-       vlib_cli_output (vm, "No active server bindings");
-    }
+    application_format_all_listeners (vm, do_local, verbose);
 
   if (do_client)
-    {
-      if (pool_elts (app_pool))
-       {
-         application_format_connects (0, verbose);
-
-          /* *INDENT-OFF* */
-          pool_foreach (app, app_pool,
-          ({
-            if (app->connects_seg_manager == (u32)~0)
-              continue;
-            application_format_connects (app, verbose);
-          }));
-          /* *INDENT-ON* */
-       }
-      else
-       vlib_cli_output (vm, "No active client bindings");
-    }
+    application_format_all_clients (vm, do_local, verbose);
 
   /* Print app related info */
   if (!do_server && !do_client)
index 36ae6fc..4938bef 100644 (file)
@@ -32,6 +32,9 @@ typedef struct _stream_session_cb_vft
   /** Notify server of new segment */
   int (*add_segment_callback) (u32 api_client_index,
                               const ssvm_private_t * ssvm_seg);
+  /** Notify server of new segment */
+  int (*del_segment_callback) (u32 api_client_index,
+                              const ssvm_private_t * ssvm_seg);
 
   /** Notify server of newly accepted session */
   int (*session_accept_callback) (stream_session_t * new_session);
@@ -49,8 +52,6 @@ typedef struct _stream_session_cb_vft
   /** Direct RX callback, for built-in servers */
   int (*builtin_server_rx_callback) (stream_session_t * session);
 
-  /** Redirect connection to local server */
-  int (*redirect_connect_callback) (u32 api_client_index, void *mp);
 } session_cb_vft_t;
 
 typedef struct _application
@@ -81,8 +82,9 @@ typedef struct _application
   session_cb_vft_t cb_fns;
 
   /*
-   * svm segment management
+   * ssvm (fifo) segment management
    */
+  /** Segment manager used for outgoing connects issued by the app */
   u32 connects_seg_manager;
 
   /** Lookup tables for listeners. Value is segment manager index */
@@ -100,10 +102,25 @@ typedef struct _application
   segment_manager_properties_t sm_properties;
 
   u16 proxied_transports;
+
+  /*
+   * Local "cut through" connections specific
+   */
+
+  /** Segment manager used for incoming "cut through" connects */
+  u32 local_segment_manager;
+
+  /** Pool of local listen sessions */
+  local_session_t *local_listen_sessions;
+
+  /** Pool of local sessions the app owns (as a server) */
+  local_session_t *local_sessions;
+
+  /** Hash table of the app's local connects */
+  uword *local_connects;
 } application_t;
 
 #define APP_INVALID_INDEX ((u32)~0)
-#define APP_DROP_INDEX (((u32)~0) - 1)
 #define APP_NS_INVALID_INDEX ((u32)~0)
 #define APP_INVALID_SEGMENT_MANAGER_INDEX ((u32) ~0)
 
@@ -117,8 +134,14 @@ application_t *application_lookup (u32 api_client_index);
 u32 application_get_index (application_t * app);
 
 int application_start_listen (application_t * app,
-                             session_endpoint_t * tep, u64 * handle);
-int application_stop_listen (application_t * srv, u64 handle);
+                             session_endpoint_t * tep,
+                             session_handle_t * handle);
+int application_start_local_listen (application_t * server,
+                                   session_endpoint_t * sep,
+                                   session_handle_t * handle);
+int application_stop_listen (application_t * srv, session_handle_t handle);
+int application_stop_local_listen (application_t * server,
+                                  session_handle_t listener_handle);
 int application_open_session (application_t * app, session_endpoint_t * tep,
                              u32 api_context);
 int application_api_queue_is_full (application_t * app);
@@ -126,7 +149,7 @@ int application_api_queue_is_full (application_t * app);
 segment_manager_t *application_get_listen_segment_manager (application_t *
                                                           app,
                                                           stream_session_t *
-                                                          s);
+                                                          ls);
 segment_manager_t *application_get_connect_segment_manager (application_t *
                                                            app);
 int application_is_proxy (application_t * app);
@@ -151,6 +174,67 @@ segment_manager_properties_t *application_get_segment_manager_properties (u32
 segment_manager_properties_t
   * application_segment_manager_properties (application_t * app);
 
+local_session_t *application_alloc_local_session (application_t * app);
+void application_free_local_session (application_t * app,
+                                    local_session_t * ls);
+local_session_t *application_get_local_session (application_t * app,
+                                               u32 session_index);
+local_session_t *application_get_local_session_from_handle (session_handle_t
+                                                           handle);
+int application_local_session_connect (u32 table_index,
+                                      application_t * client,
+                                      application_t * server,
+                                      local_session_t * ll, u32 opaque);
+int application_local_session_connect_notify (local_session_t * ls);
+int application_local_session_disconnect (u32 app_index,
+                                         local_session_t * ls);
+void application_local_sessions_del (application_t * app);
+
+always_inline u32
+local_session_id (local_session_t * ll)
+{
+  ASSERT (ll->app_index < (2 << 16) && ll->session_index < (2 << 16));
+  return ((u32) ll->app_index << 16 | (u32) ll->session_index);
+}
+
+always_inline void
+local_session_parse_id (u32 ls_id, u32 * app_index, u32 * session_index)
+{
+  *app_index = ls_id >> 16;
+  *session_index = ls_id & 0xFFF;
+}
+
+always_inline void
+local_session_parse_handle (session_handle_t handle, u32 * server_index,
+                           u32 * session_index)
+{
+  u32 bottom;
+  ASSERT ((handle >> 32) == SESSION_LOCAL_TABLE_PREFIX);
+  bottom = (handle & 0xFFFFFFFF);
+  local_session_parse_id (bottom, server_index, session_index);
+}
+
+always_inline session_handle_t
+application_local_session_handle (local_session_t * ls)
+{
+  return ((u64) SESSION_LOCAL_TABLE_PREFIX << 32) | local_session_id (ls);
+}
+
+always_inline local_session_t *
+application_get_local_listen_session (application_t * app, u32 session_index)
+{
+  return pool_elt_at_index (app->local_listen_sessions, session_index);
+}
+
+always_inline u8
+application_local_session_listener_has_transport (local_session_t * ls)
+{
+  transport_proto_t tp;
+  tp = session_type_transport_proto (ls->listener_session_type);
+  return (tp != TRANSPORT_PROTO_NONE);
+}
+
+
 #endif /* SRC_VNET_SESSION_APPLICATION_H_ */
 
 /*
index efdd3dd..1023c8c 100644 (file)
@@ -119,12 +119,8 @@ vnet_bind_i (u32 app_index, session_endpoint_t * sep, u64 * handle)
    */
   if (application_has_local_scope (app) && session_endpoint_is_zero (sep))
     {
-      table_index = application_local_session_table (app);
-      listener = session_lookup_endpoint_listener (table_index, sep, 1);
-      if (listener != SESSION_INVALID_HANDLE)
-       return VNET_API_ERROR_ADDRESS_IN_USE;
-      session_lookup_add_session_endpoint (table_index, sep, app->index);
-      *handle = session_lookup_local_listener_make_handle (sep);
+      if ((rv = application_start_local_listen (app, sep, handle)))
+       return rv;
       have_local = 1;
     }
 
@@ -143,47 +139,21 @@ vnet_bind_i (u32 app_index, session_endpoint_t * sep, u64 * handle)
 }
 
 int
-vnet_unbind_i (u32 app_index, u64 handle)
+vnet_unbind_i (u32 app_index, session_handle_t handle)
 {
-  application_t *app = application_get_if_valid (app_index);
-  stream_session_t *listener = 0;
-  u32 table_index;
+  application_t *app;
+  int rv;
 
-  if (!app)
+  if (!(app = application_get_if_valid (app_index)))
     {
       SESSION_DBG ("app (%d) not attached", app_index);
       return VNET_API_ERROR_APPLICATION_NOT_ATTACHED;
     }
 
-  /*
-   * Clean up local session table. If we have a listener session use it to
-   * find the port and proto. If not, the handle must be a local table handle
-   * so parse it.
-   */
-
   if (application_has_local_scope (app))
     {
-      session_endpoint_t sep = SESSION_ENDPOINT_NULL;
-      if (!session_lookup_local_is_handle (handle))
-       listener = listen_session_get_from_handle (handle);
-      if (listener)
-       {
-         if (listen_session_get_local_session_endpoint (listener, &sep))
-           {
-             clib_warning ("broken listener");
-             return -1;
-           }
-       }
-      else
-       {
-         if (session_lookup_local_listener_parse_handle (handle, &sep))
-           {
-             clib_warning ("can't parse handle");
-             return -1;
-           }
-       }
-      table_index = application_local_session_table (app);
-      session_lookup_del_session_endpoint (table_index, &sep);
+      if ((rv = application_stop_local_listen (app, handle)))
+       return rv;
     }
 
   /*
@@ -194,53 +164,47 @@ vnet_unbind_i (u32 app_index, u64 handle)
   return 0;
 }
 
-static int
-app_connect_redirect (application_t * server, void *mp)
-{
-  return server->cb_fns.redirect_connect_callback (server->api_client_index,
-                                                  mp);
-}
-
 int
-vnet_connect_i (u32 app_index, u32 api_context, session_endpoint_t * sep,
+vnet_connect_i (u32 client_index, u32 api_context, session_endpoint_t * sep,
                void *mp)
 {
-  application_t *server, *app;
-  u32 table_index, server_index;
+  application_t *server, *client;
+  u32 table_index, server_index, li;
   stream_session_t *listener;
+  local_session_t *ll;
+  u64 lh;
 
   if (session_endpoint_is_zero (sep))
     return VNET_API_ERROR_INVALID_VALUE;
 
-  app = application_get (app_index);
-  session_endpoint_update_for_app (sep, app);
+  client = application_get (client_index);
+  session_endpoint_update_for_app (sep, client);
 
   /*
-   * First check the the local scope for locally attached destinations.
+   * First check the local scope for locally attached destinations.
    * If we have local scope, we pass *all* connects through it since we may
    * have special policy rules even for non-local destinations, think proxy.
    */
-  if (application_has_local_scope (app))
+  if (application_has_local_scope (client))
     {
-      table_index = application_local_session_table (app);
-      server_index = session_lookup_local_endpoint (table_index, sep);
-      if (server_index == APP_DROP_INDEX)
+      table_index = application_local_session_table (client);
+      lh = session_lookup_local_endpoint (table_index, sep);
+      if (lh == SESSION_DROP_HANDLE)
        return VNET_API_ERROR_APP_CONNECT_FILTERED;
 
+      local_session_parse_handle (lh, &server_index, &li);
+
       /*
        * Break loop if rule in local table points to connecting app. This
        * can happen if client is a generic proxy. Route connect through
        * global table instead.
        */
-      if (server_index != app_index)
+      if (server_index != client_index
+         && (server = application_get_if_valid (server_index)))
        {
-         server = application_get (server_index);
-         /*
-          * Server is willing to have a direct fifo connection created
-          * instead of going through the state machine, etc.
-          */
-         if (server && (server->flags & APP_OPTIONS_FLAGS_ACCEPT_REDIRECT))
-           return app_connect_redirect (server, mp);
+         ll = application_get_local_listen_session (server, li);
+         return application_local_session_connect (table_index, client,
+                                                   server, ll, api_context);
        }
     }
 
@@ -251,23 +215,25 @@ vnet_connect_i (u32 app_index, u32 api_context, session_endpoint_t * sep,
   if (session_endpoint_is_local (sep))
     return VNET_API_ERROR_SESSION_CONNECT;
 
-  if (!application_has_global_scope (app))
+  if (!application_has_global_scope (client))
     return VNET_API_ERROR_APP_CONNECT_SCOPE;
 
-  table_index = application_session_table (app,
+  table_index = application_session_table (client,
                                           session_endpoint_fib_proto (sep));
   listener = session_lookup_listener (table_index, sep);
   if (listener)
     {
       server = application_get (listener->app_index);
-      if (server && (server->flags & APP_OPTIONS_FLAGS_ACCEPT_REDIRECT))
-       return app_connect_redirect (server, mp);
+      if (server)
+       return application_local_session_connect (table_index, client, server,
+                                                 (local_session_t *)
+                                                 listener, api_context);
     }
 
   /*
    * Not connecting to a local server, propagate to transport
    */
-  if (application_open_session (app, sep, api_context))
+  if (application_open_session (client, sep, api_context))
     return VNET_API_ERROR_SESSION_CONNECT;
   return 0;
 }
@@ -490,19 +456,30 @@ vnet_connect_uri (vnet_connect_args_t * a)
 int
 vnet_disconnect_session (vnet_disconnect_args_t * a)
 {
-  u32 index, thread_index;
-  stream_session_t *s;
-
-  session_parse_handle (a->handle, &index, &thread_index);
-  s = session_get_if_valid (index, thread_index);
-
-  if (!s || s->app_index != a->app_index)
-    return VNET_API_ERROR_INVALID_VALUE;
+  if (session_handle_is_local (a->handle))
+    {
+      local_session_t *ls;
+      ls = application_get_local_session_from_handle (a->handle);
+      if (ls->app_index != a->app_index && ls->client_index != a->app_index)
+       {
+         clib_warning ("app %u is neither client nor server for session %u",
+                       a->app_index, a->app_index);
+         return VNET_API_ERROR_INVALID_VALUE;
+       }
+      return application_local_session_disconnect (a->app_index, ls);
+    }
+  else
+    {
+      stream_session_t *s;
+      s = session_get_from_handle_if_valid (a->handle);
+      if (!s || s->app_index != a->app_index)
+       return VNET_API_ERROR_INVALID_VALUE;
 
-  /* We're peeking into another's thread pool. Make sure */
-  ASSERT (s->session_index == index);
+      /* We're peeking into another's thread pool. Make sure */
+      ASSERT (s->session_index == session_index_from_handle (a->handle));
 
-  stream_session_disconnect (s);
+      stream_session_disconnect (s);
+    }
   return 0;
 }
 
index 8db318f..4b7a2df 100644 (file)
@@ -87,12 +87,12 @@ typedef struct _vnet_connect_args
 
   /* Used for redirects */
   void *mp;
-  u64 session_handle;
+  session_handle_t session_handle;
 } vnet_connect_args_t;
 
 typedef struct _vnet_disconnect_args_t
 {
-  u64 handle;
+  session_handle_t handle;
   u32 app_index;
 } vnet_disconnect_args_t;
 
@@ -152,6 +152,9 @@ int
 api_parse_session_handle (u64 handle, u32 * session_index,
                          u32 * thread_index);
 
+void send_local_session_disconnect_callback (u32 app_index,
+                                            local_session_t * ls);
+
 #endif /* __included_uri_h__ */
 
 /*
index f8af3fb..eb24557 100644 (file)
@@ -27,8 +27,9 @@ static u32 segment_name_counter = 0;
 /**
  * Default fifo and segment size. TODO config.
  */
-u32 default_fifo_size = 1 << 12;
-u32 default_segment_size = 1 << 20;
+static u32 default_fifo_size = 1 << 12;
+static u32 default_segment_size = 1 << 20;
+static u32 default_app_evt_queue_size = 128;
 
 segment_manager_properties_t *
 segment_manager_properties_get (segment_manager_t * sm)
@@ -42,6 +43,7 @@ segment_manager_properties_init (segment_manager_properties_t * props)
   props->add_segment_size = default_segment_size;
   props->rx_fifo_size = default_fifo_size;
   props->tx_fifo_size = default_fifo_size;
+  props->evt_q_size = default_app_evt_queue_size;
   return props;
 }
 
@@ -67,7 +69,7 @@ segment_manager_segment_index (segment_manager_t * sm,
 /**
  * Remove segment without lock
  */
-always_inline void
+void
 segment_manager_del_segment (segment_manager_t * sm,
                             svm_fifo_segment_private_t * fs)
 {
@@ -131,6 +133,7 @@ segment_manager_get_segment_w_lock (segment_manager_t * sm, u32 segment_index)
 void
 segment_manager_segment_reader_unlock (segment_manager_t * sm)
 {
+  ASSERT (sm->segments_rwlock->n_readers > 0);
   clib_rwlock_reader_unlock (&sm->segments_rwlock);
 }
 
@@ -146,7 +149,7 @@ segment_manager_segment_writer_unlock (segment_manager_t * sm)
  * If needed a writer's lock is acquired before allocating a new segment
  * to avoid affecting any of the segments pool readers.
  */
-always_inline int
+int
 segment_manager_add_segment (segment_manager_t * sm, u32 segment_size)
 {
   segment_manager_main_t *smm = &segment_manager_main;
@@ -243,7 +246,7 @@ segment_manager_new ()
  */
 int
 segment_manager_init (segment_manager_t * sm, u32 first_seg_size,
-                     u32 evt_q_size, u32 prealloc_fifo_pairs)
+                     u32 prealloc_fifo_pairs)
 {
   u32 rx_fifo_size, tx_fifo_size, pair_size;
   u32 rx_rounded_data_size, tx_rounded_data_size;
@@ -283,10 +286,11 @@ segment_manager_init (segment_manager_t * sm, u32 first_seg_size,
              return seg_index;
            }
 
+         segment = segment_manager_get_segment (sm, seg_index);
          if (i == 0)
-           sm->event_queue = segment_manager_alloc_queue (sm, evt_q_size);
+           sm->event_queue = segment_manager_alloc_queue (segment,
+                                                          props->evt_q_size);
 
-         segment = segment_manager_get_segment (sm, seg_index);
          svm_fifo_segment_preallocate_fifo_pairs (segment,
                                                   props->rx_fifo_size,
                                                   props->tx_fifo_size,
@@ -304,7 +308,9 @@ segment_manager_init (segment_manager_t * sm, u32 first_seg_size,
          clib_warning ("Failed to allocate segment");
          return seg_index;
        }
-      sm->event_queue = segment_manager_alloc_queue (sm, evt_q_size);
+      segment = segment_manager_get_segment (sm, seg_index);
+      sm->event_queue = segment_manager_alloc_queue (segment,
+                                                    props->evt_q_size);
     }
 
   return 0;
@@ -422,10 +428,10 @@ segment_manager_init_del (segment_manager_t * sm)
     }
 }
 
-always_inline int
-segment_try_alloc_fifos (svm_fifo_segment_private_t * fifo_segment,
-                        u32 rx_fifo_size, u32 tx_fifo_size,
-                        svm_fifo_t ** rx_fifo, svm_fifo_t ** tx_fifo)
+int
+segment_manager_try_alloc_fifos (svm_fifo_segment_private_t * fifo_segment,
+                                u32 rx_fifo_size, u32 tx_fifo_size,
+                                svm_fifo_t ** rx_fifo, svm_fifo_t ** tx_fifo)
 {
   rx_fifo_size = clib_max (rx_fifo_size, default_fifo_size);
   *rx_fifo = svm_fifo_segment_alloc_fifo (fifo_segment, rx_fifo_size,
@@ -466,7 +472,7 @@ segment_manager_alloc_session_fifos (segment_manager_t * sm,
                                     svm_fifo_t ** tx_fifo,
                                     u32 * fifo_segment_index)
 {
-  svm_fifo_segment_private_t *fifo_segment;
+  svm_fifo_segment_private_t *fifo_segment = 0;
   int alloc_fail = 1, rv = 0, new_fs_index;
   segment_manager_properties_t *props;
   u8 added_a_segment = 0;
@@ -481,9 +487,10 @@ segment_manager_alloc_session_fifos (segment_manager_t * sm,
 
   /* *INDENT-OFF* */
   segment_manager_foreach_segment_w_lock (fifo_segment, sm, ({
-    alloc_fail = segment_try_alloc_fifos (fifo_segment, props->rx_fifo_size,
-                                          props->tx_fifo_size, rx_fifo,
-                                          tx_fifo);
+    alloc_fail = segment_manager_try_alloc_fifos (fifo_segment,
+                                                  props->rx_fifo_size,
+                                                  props->tx_fifo_size,
+                                                  rx_fifo, tx_fifo);
     /* Exit with lock held, drop it after notifying app */
     if (!alloc_fail)
       goto alloc_success;
@@ -528,9 +535,10 @@ alloc_check:
          return SESSION_ERROR_SEG_CREATE;
        }
       fifo_segment = segment_manager_get_segment_w_lock (sm, new_fs_index);
-      alloc_fail = segment_try_alloc_fifos (fifo_segment, props->rx_fifo_size,
-                                           props->tx_fifo_size, rx_fifo,
-                                           tx_fifo);
+      alloc_fail = segment_manager_try_alloc_fifos (fifo_segment,
+                                                   props->rx_fifo_size,
+                                                   props->tx_fifo_size,
+                                                   rx_fifo, tx_fifo);
       added_a_segment = 1;
       goto alloc_check;
     }
@@ -588,16 +596,13 @@ segment_manager_dealloc_fifos (u32 segment_index, svm_fifo_t * rx_fifo,
  * Must be called with lock held
  */
 svm_queue_t *
-segment_manager_alloc_queue (segment_manager_t * sm, u32 queue_size)
+segment_manager_alloc_queue (svm_fifo_segment_private_t * segment,
+                            u32 queue_size)
 {
-  svm_fifo_segment_private_t *segment;
   ssvm_shared_header_t *sh;
   svm_queue_t *q;
   void *oldheap;
 
-  ASSERT (!pool_is_free_index (sm->segments, 0));
-
-  segment = segment_manager_get_segment (sm, 0);
   sh = segment->ssvm.sh;
 
   oldheap = ssvm_push_heap (sh);
index 9b1d4cd..62e5e97 100644 (file)
@@ -27,9 +27,7 @@ typedef struct _segment_manager_properties
   /** Session fifo sizes.  */
   u32 rx_fifo_size;
   u32 tx_fifo_size;
-
-  /** Preallocated pool sizes */
-//  u32 preallocated_fifo_pairs;
+  u32 evt_q_size;
 
   /** Configured additional segment size */
   u32 add_segment_size;
@@ -40,8 +38,6 @@ typedef struct _segment_manager_properties
   /** Segment type: if set to SSVM_N_TYPES, private segments are used */
   ssvm_segment_type_t segment_type;
 
-  /** Use one or more private mheaps, instead of the global heap */
-//  u32 private_segment_count;
 } segment_manager_properties_t;
 
 typedef struct _segment_manager
@@ -126,7 +122,7 @@ segment_manager_event_queue (segment_manager_t * sm)
 
 segment_manager_t *segment_manager_new ();
 int segment_manager_init (segment_manager_t * sm, u32 first_seg_size,
-                         u32 evt_q_size, u32 prealloc_fifo_pairs);
+                         u32 prealloc_fifo_pairs);
 
 svm_fifo_segment_private_t *segment_manager_get_segment (segment_manager_t *
                                                         sm,
@@ -134,6 +130,9 @@ svm_fifo_segment_private_t *segment_manager_get_segment (segment_manager_t *
 svm_fifo_segment_private_t
   * segment_manager_get_segment_w_lock (segment_manager_t * sm,
                                        u32 segment_index);
+int segment_manager_add_segment (segment_manager_t * sm, u32 segment_size);
+void segment_manager_del_segment (segment_manager_t * sm,
+                                 svm_fifo_segment_private_t * fs);
 void segment_manager_segment_reader_unlock (segment_manager_t * sm);
 void segment_manager_segment_writer_unlock (segment_manager_t * sm);
 
@@ -143,15 +142,17 @@ void segment_manager_del_sessions (segment_manager_t * sm);
 void segment_manager_del (segment_manager_t * sm);
 void segment_manager_init_del (segment_manager_t * sm);
 u8 segment_manager_has_fifos (segment_manager_t * sm);
-int
-segment_manager_alloc_session_fifos (segment_manager_t * sm,
-                                    svm_fifo_t ** server_rx_fifo,
-                                    svm_fifo_t ** server_tx_fifo,
-                                    u32 * fifo_segment_index);
-void
-segment_manager_dealloc_fifos (u32 svm_segment_index, svm_fifo_t * rx_fifo,
-                              svm_fifo_t * tx_fifo);
-svm_queue_t *segment_manager_alloc_queue (segment_manager_t * sm,
+int segment_manager_alloc_session_fifos (segment_manager_t * sm,
+                                        svm_fifo_t ** server_rx_fifo,
+                                        svm_fifo_t ** server_tx_fifo,
+                                        u32 * fifo_segment_index);
+int segment_manager_try_alloc_fifos (svm_fifo_segment_private_t * fs,
+                                    u32 rx_fifo_size, u32 tx_fifo_size,
+                                    svm_fifo_t ** rx_fifo,
+                                    svm_fifo_t ** tx_fifo);
+void segment_manager_dealloc_fifos (u32 segment_index, svm_fifo_t * rx_fifo,
+                                   svm_fifo_t * tx_fifo);
+svm_queue_t *segment_manager_alloc_queue (svm_fifo_segment_private_t * fs,
                                          u32 queue_size);
 void segment_manager_dealloc_queue (segment_manager_t * sm, svm_queue_t * q);
 void segment_manager_app_detach (segment_manager_t * sm);
index 1c3e84b..a6739fc 100644 (file)
@@ -13,7 +13,7 @@
  * limitations under the License.
  */
 
-option version = "1.0.0";
+option version = "1.0.1";
 
 /** \brief client->vpp, attach application to session layer
     @param client_index - opaque cookie to identify the sender
@@ -72,6 +72,17 @@ autoreply define map_another_segment {
     u8 segment_name[128];
 };
 
+/** \brief vpp->client unmap shared memory segment
+    @param client_index - opaque cookie to identify the sender
+    @param context - sender context, to match reply w/ request
+    @param segment_name - 
+*/
+autoreply define unmap_segment {
+    u32 client_index;
+    u32 context;
+    u8 segment_name[128];
+};
+
  /** \brief Bind to a given URI
     @param client_index - opaque cookie to identify the sender
     @param context - sender context, to match reply w/ request
@@ -122,10 +133,12 @@ autoreply define connect_uri {
     @param context - sender context, to match reply w/ request
     @param listener_handle - tells client which listener this pertains to
     @param handle - unique session identifier
-    @param session_thread_index - thread index of new session
     @param rx_fifo_address - rx (vpp -> vpp-client) fifo address 
     @param tx_fifo_address - tx (vpp-client -> vpp) fifo address 
-    @param vpp_event_queue_address - vpp's event queue address
+    @param vpp_event_queue_address - vpp's event queue address or client's
+                                                                       event queue for cut through
+    @param server_event_queue_address - server's event queue address for
+                                                                          cut through sessions
     @param port - remote port
     @param is_ip4 - 1 if the ip is ip4
     @param ip - remote ip
@@ -138,6 +151,7 @@ define accept_session {
   u64 server_rx_fifo;
   u64 server_tx_fifo;
   u64 vpp_event_queue_address;
+  u64 server_event_queue_address;
   u16 port;
   u8 is_ip4;
   u8 ip[16];
@@ -176,7 +190,6 @@ define disconnect_session {
     @param handle - session handle
 */
 define disconnect_session_reply {
-  u32 client_index;
   u32 context;
   i32 retval;
   u64 handle;
@@ -302,6 +315,7 @@ define connect_session {
     @param server_rx_fifo - rx (vpp -> vpp-client) fifo address 
     @param server_tx_fifo - tx (vpp-client -> vpp) fifo address 
     @param vpp_event_queue_address - vpp's event queue address
+    @param client_event_queue_address - client's event queue address
     @param segment_size - size of segment to be attached. Only for redirects.
     @param segment_name_length - non-zero if the client needs to attach to 
                                  the fifo segment
@@ -317,6 +331,7 @@ define connect_session_reply {
   u64 server_rx_fifo;
   u64 server_tx_fifo;
   u64 vpp_event_queue_address;
+  u64 client_event_queue_address;
   u32 segment_size;
   u8 segment_name_length;
   u8 segment_name[128];
index ec00e29..108e5fe 100644 (file)
@@ -87,12 +87,14 @@ typedef struct
   void *arg;
 } rpc_args_t;
 
+typedef u64 session_handle_t;
+
 /* *INDENT-OFF* */
 typedef CLIB_PACKED (struct {
   union
     {
       svm_fifo_t * fifo;
-      u64 session_handle;
+      session_handle_t session_handle;
       rpc_args_t rpc_args;
     };
   u8 event_type;
@@ -251,38 +253,61 @@ session_get_if_valid (u64 si, u32 thread_index)
   return pool_elt_at_index (session_manager_main.sessions[thread_index], si);
 }
 
-always_inline u64
+always_inline session_handle_t
 session_handle (stream_session_t * s)
 {
   return ((u64) s->thread_index << 32) | (u64) s->session_index;
 }
 
 always_inline u32
-session_index_from_handle (u64 handle)
+session_index_from_handle (session_handle_t handle)
 {
   return handle & 0xFFFFFFFF;
 }
 
 always_inline u32
-session_thread_from_handle (u64 handle)
+session_thread_from_handle (session_handle_t handle)
 {
   return handle >> 32;
 }
 
 always_inline void
-session_parse_handle (u64 handle, u32 * index, u32 * thread_index)
+session_parse_handle (session_handle_t handle, u32 * index,
+                     u32 * thread_index)
 {
   *index = session_index_from_handle (handle);
   *thread_index = session_thread_from_handle (handle);
 }
 
 always_inline stream_session_t *
-session_get_from_handle (u64 handle)
+session_get_from_handle (session_handle_t handle)
 {
   session_manager_main_t *smm = &session_manager_main;
-  return
-    pool_elt_at_index (smm->sessions[session_thread_from_handle (handle)],
-                      session_index_from_handle (handle));
+  u32 session_index, thread_index;
+  session_parse_handle (handle, &session_index, &thread_index);
+  return pool_elt_at_index (smm->sessions[thread_index], session_index);
+}
+
+always_inline stream_session_t *
+session_get_from_handle_if_valid (session_handle_t handle)
+{
+  u32 session_index, thread_index;
+  session_parse_handle (handle, &session_index, &thread_index);
+  return session_get_if_valid (session_index, thread_index);
+}
+
+always_inline u8
+session_handle_is_local (session_handle_t handle)
+{
+  if ((handle >> 32) == SESSION_LOCAL_TABLE_PREFIX)
+    return 1;
+  return 0;
+}
+
+always_inline transport_proto_t
+session_type_transport_proto (session_type_t st)
+{
+  return (st >> 1);
 }
 
 always_inline transport_proto_t
@@ -291,12 +316,25 @@ session_get_transport_proto (stream_session_t * s)
   return (s->session_type >> 1);
 }
 
+always_inline fib_protocol_t
+session_get_fib_proto (stream_session_t * s)
+{
+  u8 is_ip4 = s->session_type & 1;
+  return (is_ip4 ? FIB_PROTOCOL_IP4 : FIB_PROTOCOL_IP6);
+}
+
 always_inline session_type_t
 session_type_from_proto_and_ip (transport_proto_t proto, u8 is_ip4)
 {
   return (proto << 1 | is_ip4);
 }
 
+always_inline u8
+session_has_transport (stream_session_t * s)
+{
+  return (session_get_transport_proto (s) != TRANSPORT_PROTO_NONE);
+}
+
 /**
  * Acquires a lock that blocks a session pool from expanding.
  *
@@ -470,7 +508,7 @@ listen_session_get_handle (stream_session_t * s)
 }
 
 always_inline stream_session_t *
-listen_session_get_from_handle (u64 handle)
+listen_session_get_from_handle (session_handle_t handle)
 {
   session_manager_main_t *smm = &session_manager_main;
   stream_session_t *s;
index 57af960..11fa0fa 100755 (executable)
@@ -110,6 +110,40 @@ send_add_segment_callback (u32 api_client_index, const ssvm_private_t * sp)
   return 0;
 }
 
+static int
+send_del_segment_callback (u32 api_client_index, const ssvm_private_t * fs)
+{
+  vl_api_unmap_segment_t *mp;
+  vl_api_registration_t *reg;
+
+  reg = vl_mem_api_client_index_to_registration (api_client_index);
+  if (!reg)
+    {
+      clib_warning ("no registration: %u", api_client_index);
+      return -1;
+    }
+
+  if (ssvm_type (fs) == SSVM_SEGMENT_MEMFD
+      && vl_api_registration_file_index (reg) == VL_API_INVALID_FI)
+    {
+      clib_warning ("can't send memfd fd");
+      return -1;
+    }
+
+  mp = vl_msg_api_alloc_as_if_client (sizeof (*mp));
+  memset (mp, 0, sizeof (*mp));
+  mp->_vl_msg_id = clib_host_to_net_u16 (VL_API_UNMAP_SEGMENT);
+  strncpy ((char *) mp->segment_name, (char *) fs->name,
+          sizeof (mp->segment_name) - 1);
+
+  vl_msg_api_send_shmem (reg->vl_input_queue, (u8 *) & mp);
+
+  if (ssvm_type (fs) == SSVM_SEGMENT_MEMFD)
+    return session_send_memfd_fd (reg, fs);
+
+  return 0;
+}
+
 static int
 send_session_accept_callback (stream_session_t * s)
 {
@@ -121,7 +155,6 @@ send_session_accept_callback (stream_session_t * s)
   stream_session_t *listener;
   svm_queue_t *vpp_queue;
 
-  vpp_queue = session_manager_get_vpp_event_queue (s->thread_index);
   reg = vl_mem_api_client_index_to_registration (server->api_client_index);
   if (!reg)
     {
@@ -134,32 +167,78 @@ send_session_accept_callback (stream_session_t * s)
 
   mp->_vl_msg_id = clib_host_to_net_u16 (VL_API_ACCEPT_SESSION);
   mp->context = server->index;
-  listener = listen_session_get (s->session_type, s->listener_index);
-  tp_vft = transport_protocol_get_vft (session_get_transport_proto (s));
-  tc = tp_vft->get_connection (s->connection_index, s->thread_index);
-  mp->listener_handle = listen_session_get_handle (listener);
+  mp->server_rx_fifo = pointer_to_uword (s->server_rx_fifo);
+  mp->server_tx_fifo = pointer_to_uword (s->server_tx_fifo);
 
-  if (application_is_proxy (server))
+  if (session_has_transport (s))
     {
-      listener =
-       application_first_listener (server,
-                                   transport_connection_fib_proto (tc),
-                                   tc->proto);
-      if (listener)
-       mp->listener_handle = listen_session_get_handle (listener);
+      listener = listen_session_get (s->session_type, s->listener_index);
+      mp->listener_handle = listen_session_get_handle (listener);
+      if (application_is_proxy (server))
+       {
+         listener =
+           application_first_listener (server, session_get_fib_proto (s),
+                                       session_get_transport_proto (s));
+         if (listener)
+           mp->listener_handle = listen_session_get_handle (listener);
+       }
+      vpp_queue = session_manager_get_vpp_event_queue (s->thread_index);
+      mp->vpp_event_queue_address = pointer_to_uword (vpp_queue);
+      mp->handle = session_handle (s);
+      tp_vft = transport_protocol_get_vft (session_get_transport_proto (s));
+      tc = tp_vft->get_connection (s->connection_index, s->thread_index);
+      mp->port = tc->rmt_port;
+      mp->is_ip4 = tc->is_ip4;
+      clib_memcpy (&mp->ip, &tc->rmt_ip, sizeof (tc->rmt_ip));
+    }
+  else
+    {
+      local_session_t *ls = (local_session_t *) s;
+      local_session_t *ll;
+      if (application_local_session_listener_has_transport (ls))
+       {
+         listener = listen_session_get (ls->listener_session_type,
+                                        ls->listener_index);
+         mp->listener_handle = listen_session_get_handle (listener);
+       }
+      else
+       {
+         ll = application_get_local_listen_session (server,
+                                                    ls->listener_index);
+         mp->listener_handle = application_local_session_handle (ll);
+       }
+      mp->handle = application_local_session_handle (ls);
+      mp->port = ls->port;
+      mp->vpp_event_queue_address = ls->client_evt_q;
+      mp->server_event_queue_address = ls->server_evt_q;
     }
-  mp->handle = session_handle (s);
-  mp->server_rx_fifo = pointer_to_uword (s->server_rx_fifo);
-  mp->server_tx_fifo = pointer_to_uword (s->server_tx_fifo);
-  mp->vpp_event_queue_address = pointer_to_uword (vpp_queue);
-  mp->port = tc->rmt_port;
-  mp->is_ip4 = tc->is_ip4;
-  clib_memcpy (&mp->ip, &tc->rmt_ip, sizeof (tc->rmt_ip));
   vl_msg_api_send_shmem (reg->vl_input_queue, (u8 *) & mp);
 
   return 0;
 }
 
+void
+send_local_session_disconnect_callback (u32 app_index, local_session_t * ls)
+{
+  application_t *app = application_get (app_index);
+  vl_api_disconnect_session_t *mp;
+  vl_api_registration_t *reg;
+
+  reg = vl_mem_api_client_index_to_registration (app->api_client_index);
+  if (!reg)
+    {
+      clib_warning ("no registration: %u", app->api_client_index);
+      return;
+    }
+
+  mp = vl_mem_api_alloc_as_if_client_w_reg (reg, sizeof (*mp));
+  memset (mp, 0, sizeof (*mp));
+  mp->_vl_msg_id = clib_host_to_net_u16 (VL_API_DISCONNECT_SESSION);
+  mp->handle = application_local_session_handle (ls);
+  mp->context = app->api_client_index;
+  vl_msg_api_send_shmem (reg->vl_input_queue, (u8 *) & mp);
+}
+
 static void
 send_session_disconnect_callback (stream_session_t * s)
 {
@@ -178,6 +257,7 @@ send_session_disconnect_callback (stream_session_t * s)
   memset (mp, 0, sizeof (*mp));
   mp->_vl_msg_id = clib_host_to_net_u16 (VL_API_DISCONNECT_SESSION);
   mp->handle = session_handle (s);
+  mp->context = app->api_client_index;
   vl_msg_api_send_shmem (reg->vl_input_queue, (u8 *) & mp);
 }
 
@@ -227,21 +307,34 @@ send_session_connected_callback (u32 app_index, u32 api_context,
   if (is_fail)
     goto done;
 
-  tc = session_get_transport (s);
-  if (!tc)
+  if (session_has_transport (s))
     {
-      is_fail = 1;
-      goto done;
-    }
+      tc = session_get_transport (s);
+      if (!tc)
+       {
+         is_fail = 1;
+         goto done;
+       }
 
-  vpp_queue = session_manager_get_vpp_event_queue (s->thread_index);
-  mp->server_rx_fifo = pointer_to_uword (s->server_rx_fifo);
-  mp->server_tx_fifo = pointer_to_uword (s->server_tx_fifo);
-  mp->handle = session_handle (s);
-  mp->vpp_event_queue_address = pointer_to_uword (vpp_queue);
-  clib_memcpy (mp->lcl_ip, &tc->lcl_ip, sizeof (tc->lcl_ip));
-  mp->is_ip4 = tc->is_ip4;
-  mp->lcl_port = tc->lcl_port;
+      vpp_queue = session_manager_get_vpp_event_queue (s->thread_index);
+      mp->handle = session_handle (s);
+      mp->vpp_event_queue_address = pointer_to_uword (vpp_queue);
+      clib_memcpy (mp->lcl_ip, &tc->lcl_ip, sizeof (tc->lcl_ip));
+      mp->is_ip4 = tc->is_ip4;
+      mp->lcl_port = tc->lcl_port;
+      mp->server_rx_fifo = pointer_to_uword (s->server_rx_fifo);
+      mp->server_tx_fifo = pointer_to_uword (s->server_tx_fifo);
+    }
+  else
+    {
+      local_session_t *ls = (local_session_t *) s;
+      mp->handle = application_local_session_handle (ls);
+      mp->lcl_port = ls->port;
+      mp->vpp_event_queue_address = ls->server_evt_q;
+      mp->client_event_queue_address = ls->client_evt_q;
+      mp->server_rx_fifo = pointer_to_uword (s->server_tx_fifo);
+      mp->server_tx_fifo = pointer_to_uword (s->server_rx_fifo);
+    }
 
 done:
   mp->retval = is_fail ?
@@ -250,87 +343,13 @@ done:
   return 0;
 }
 
-/**
- * Redirect a connect_uri message to the indicated server.
- * Only sent if the server has bound the related port with
- * URI_OPTIONS_FLAGS_USE_FIFO
- */
-static int
-redirect_connect_callback (u32 server_api_client_index, void *mp_arg)
-{
-  vl_api_connect_sock_t *mp = mp_arg;
-  svm_queue_t *server_q, *client_q;
-  segment_manager_properties_t *props;
-  vlib_main_t *vm = vlib_get_main ();
-  f64 timeout = vlib_time_now (vm) + 0.5;
-  application_t *app;
-  int rv = 0;
-
-  server_q = vl_api_client_index_to_input_queue (server_api_client_index);
-
-  if (!server_q)
-    {
-      rv = VNET_API_ERROR_INVALID_VALUE;
-      goto out;
-    }
-
-  client_q = vl_api_client_index_to_input_queue (mp->client_index);
-  if (!client_q)
-    {
-      rv = VNET_API_ERROR_INVALID_VALUE_2;
-      goto out;
-    }
-
-  /* Tell the server the client's API queue address, so it can reply */
-  mp->client_queue_address = pointer_to_uword (client_q);
-  app = application_lookup (mp->client_index);
-  if (!app)
-    {
-      clib_warning ("no client application");
-      return -1;
-    }
-
-  props = application_segment_manager_properties (app);
-  mp->options[APP_OPTIONS_RX_FIFO_SIZE] = props->rx_fifo_size;
-  mp->options[APP_OPTIONS_TX_FIFO_SIZE] = props->tx_fifo_size;
-
-  /*
-   * Bounce message handlers MUST NOT block the data-plane.
-   * Spin waiting for the queue lock, but
-   */
-
-  while (vlib_time_now (vm) < timeout)
-    {
-      rv = svm_queue_add (server_q, (u8 *) & mp, 1 /*nowait */ );
-      switch (rv)
-       {
-         /* correctly enqueued */
-       case 0:
-         return VNET_API_ERROR_SESSION_REDIRECT;
-
-         /* continue spinning, wait for pthread_mutex_trylock to work */
-       case -1:
-         continue;
-
-         /* queue stuffed, drop the msg */
-       case -2:
-         rv = VNET_API_ERROR_QUEUE_FULL;
-         goto out;
-       }
-    }
-out:
-  /* Dispose of the message */
-  vl_msg_api_free (mp);
-  return rv;
-}
-
 static session_cb_vft_t session_cb_vft = {
   .session_accept_callback = send_session_accept_callback,
   .session_disconnect_callback = send_session_disconnect_callback,
   .session_connected_callback = send_session_connected_callback,
   .session_reset_callback = send_session_reset_callback,
   .add_segment_callback = send_add_segment_callback,
-  .redirect_connect_callback = redirect_connect_callback
+  .del_segment_callback = send_del_segment_callback,
 };
 
 static void
@@ -601,7 +620,7 @@ vl_api_disconnect_session_reply_t_handler (vl_api_disconnect_session_reply_t *
     }
 
   /* Disconnect has been confirmed. Confirm close to transport */
-  app = application_lookup (mp->client_index);
+  app = application_lookup (mp->context);
   if (app)
     {
       a->handle = mp->handle;
@@ -644,9 +663,9 @@ vl_api_reset_session_reply_t_handler (vl_api_reset_session_reply_t * mp)
 static void
 vl_api_accept_session_reply_t_handler (vl_api_accept_session_reply_t * mp)
 {
+  vnet_disconnect_args_t _a = { 0 }, *a = &_a;
+  local_session_t *ls;
   stream_session_t *s;
-  u32 session_index, thread_index;
-  vnet_disconnect_args_t _a, *a = &_a;
 
   /* Server isn't interested, kill the session */
   if (mp->retval)
@@ -654,11 +673,25 @@ vl_api_accept_session_reply_t_handler (vl_api_accept_session_reply_t * mp)
       a->app_index = mp->context;
       a->handle = mp->handle;
       vnet_disconnect_session (a);
+      return;
+    }
+
+  if (session_handle_is_local (mp->handle))
+    {
+      ls = application_get_local_session_from_handle (mp->handle);
+      if (!ls || ls->app_index != mp->context)
+       {
+         clib_warning ("server %u doesn't own local handle %llu",
+                       mp->context, mp->handle);
+         return;
+       }
+      if (application_local_session_connect_notify (ls))
+       return;
+      ls->session_state = SESSION_STATE_READY;
     }
   else
     {
-      session_parse_handle (mp->handle, &session_index, &thread_index);
-      s = session_get_if_valid (session_index, thread_index);
+      s = session_get_from_handle_if_valid (mp->handle);
       if (!s)
        {
          clib_warning ("session doesn't exist");
index 77f6f4b..9ce0b1a 100644 (file)
@@ -388,17 +388,18 @@ session_lookup_action_index_is_valid (u32 action_index)
   return 1;
 }
 
-static u32
-session_lookup_action_to_app_index (u32 action_index)
+static u64
+session_lookup_action_to_handle (u32 action_index)
 {
   switch (action_index)
     {
     case SESSION_RULES_TABLE_ACTION_DROP:
-      return APP_DROP_INDEX;
+      return SESSION_DROP_HANDLE;
     case SESSION_RULES_TABLE_ACTION_ALLOW:
     case SESSION_RULES_TABLE_INVALID_INDEX:
-      return APP_INVALID_INDEX;
+      return SESSION_INVALID_HANDLE;
     default:
+      /* application index */
       return action_index;
     }
 }
@@ -420,12 +421,13 @@ session_lookup_action_to_session (u32 action_index, u8 fib_proto,
                                  u8 transport_proto)
 {
   u32 app_index;
-  app_index = session_lookup_action_to_app_index (action_index);
+  app_index = session_lookup_action_to_handle (action_index);
   /* Nothing sophisticated for now, action index is app index */
   return session_lookup_app_listen_session (app_index, fib_proto,
                                            transport_proto);
 }
 
+/** UNUSED */
 stream_session_t *
 session_lookup_rules_table_session4 (session_table_t * st, u8 proto,
                                     ip4_address_t * lcl, u16 lcl_port,
@@ -435,12 +437,13 @@ session_lookup_rules_table_session4 (session_table_t * st, u8 proto,
   u32 action_index, app_index;
   action_index = session_rules_table_lookup4 (srt, lcl, rmt, lcl_port,
                                              rmt_port);
-  app_index = session_lookup_action_to_app_index (action_index);
+  app_index = session_lookup_action_to_handle (action_index);
   /* Nothing sophisticated for now, action index is app index */
   return session_lookup_app_listen_session (app_index, FIB_PROTOCOL_IP4,
                                            proto);
 }
 
+/** UNUSED */
 stream_session_t *
 session_lookup_rules_table_session6 (session_table_t * st, u8 proto,
                                     ip6_address_t * lcl, u16 lcl_port,
@@ -450,7 +453,7 @@ session_lookup_rules_table_session6 (session_table_t * st, u8 proto,
   u32 action_index, app_index;
   action_index = session_rules_table_lookup6 (srt, lcl, rmt, lcl_port,
                                              rmt_port);
-  app_index = session_lookup_action_to_app_index (action_index);
+  app_index = session_lookup_action_to_handle (action_index);
   return session_lookup_app_listen_session (app_index, FIB_PROTOCOL_IP6,
                                            proto);
 }
@@ -463,7 +466,7 @@ session_lookup_rules_table_session6 (session_table_t * st, u8 proto,
  * @param use_rules flag that indicates if the session rules of the table
  *                 should be used
  * @return invalid handle if nothing is found, the handle of a valid listener
- *        or an action_index if a rule is hit
+ *        or an action derived handle if a rule is hit
  */
 u64
 session_lookup_endpoint_listener (u32 table_index, session_endpoint_t * sep,
@@ -494,7 +497,7 @@ session_lookup_endpoint_listener (u32 table_index, session_endpoint_t * sep,
          ai = session_rules_table_lookup4 (srt, &lcl4, &sep->ip.ip4, 0,
                                            sep->port);
          if (session_lookup_action_index_is_valid (ai))
-           return session_lookup_action_to_app_index (ai);
+           return session_lookup_action_to_handle (ai);
        }
     }
   else
@@ -515,7 +518,7 @@ session_lookup_endpoint_listener (u32 table_index, session_endpoint_t * sep,
          ai = session_rules_table_lookup6 (srt, &lcl6, &sep->ip.ip6, 0,
                                            sep->port);
          if (session_lookup_action_index_is_valid (ai))
-           return session_lookup_action_to_app_index (ai);
+           return session_lookup_action_to_handle (ai);
        }
     }
   return SESSION_INVALID_HANDLE;
@@ -535,9 +538,9 @@ session_lookup_endpoint_listener (u32 table_index, session_endpoint_t * sep,
  *
  * @param table_index table where the lookup should be done
  * @param sep session endpoint to be looked up
- * @return index that can be interpreted as an app index or drop action.
+ * @return session handle that can be interpreted as an adjacency
  */
-u32
+u64
 session_lookup_local_endpoint (u32 table_index, session_endpoint_t * sep)
 {
   session_rules_table_t *srt;
@@ -563,7 +566,7 @@ session_lookup_local_endpoint (u32 table_index, session_endpoint_t * sep)
       ai = session_rules_table_lookup4 (srt, &lcl4, &sep->ip.ip4, 0,
                                        sep->port);
       if (session_lookup_action_index_is_valid (ai))
-       return session_lookup_action_to_app_index (ai);
+       return session_lookup_action_to_handle (ai);
 
       /*
        * Check if session endpoint is a listener
@@ -572,7 +575,7 @@ session_lookup_local_endpoint (u32 table_index, session_endpoint_t * sep)
                           sep->transport_proto);
       rv = clib_bihash_search_inline_16_8 (&st->v4_session_hash, &kv4);
       if (rv == 0)
-       return (u32) kv4.value;
+       return kv4.value;
 
       /*
        * Zero out the ip. Logic is that connect to local ips, say
@@ -581,7 +584,7 @@ session_lookup_local_endpoint (u32 table_index, session_endpoint_t * sep)
       kv4.key[0] = 0;
       rv = clib_bihash_search_inline_16_8 (&st->v4_session_hash, &kv4);
       if (rv == 0)
-       return (u32) kv4.value;
+       return kv4.value;
 
       /*
        * Zero out the port and check if we have proxy
@@ -589,7 +592,7 @@ session_lookup_local_endpoint (u32 table_index, session_endpoint_t * sep)
       kv4.key[1] = 0;
       rv = clib_bihash_search_inline_16_8 (&st->v4_session_hash, &kv4);
       if (rv == 0)
-       return (u32) kv4.value;
+       return kv4.value;
     }
   else
     {
@@ -601,13 +604,13 @@ session_lookup_local_endpoint (u32 table_index, session_endpoint_t * sep)
       ai = session_rules_table_lookup6 (srt, &lcl6, &sep->ip.ip6, 0,
                                        sep->port);
       if (session_lookup_action_index_is_valid (ai))
-       return session_lookup_action_to_app_index (ai);
+       return session_lookup_action_to_handle (ai);
 
       make_v6_listener_kv (&kv6, &sep->ip.ip6, sep->port,
                           sep->transport_proto);
       rv = clib_bihash_search_inline_48_8 (&st->v6_session_hash, &kv6);
       if (rv == 0)
-       return (u32) kv6.value;
+       return kv6.value;
 
       /*
        * Zero out the ip. Same logic as above.
@@ -615,7 +618,7 @@ session_lookup_local_endpoint (u32 table_index, session_endpoint_t * sep)
       kv6.key[0] = kv6.key[1] = 0;
       rv = clib_bihash_search_inline_48_8 (&st->v6_session_hash, &kv6);
       if (rv == 0)
-       return (u32) kv6.value;
+       return kv6.value;
 
       /*
        * Zero out the port. Same logic as above.
@@ -623,9 +626,9 @@ session_lookup_local_endpoint (u32 table_index, session_endpoint_t * sep)
       kv6.key[4] = kv6.key[5] = 0;
       rv = clib_bihash_search_inline_48_8 (&st->v6_session_hash, &kv6);
       if (rv == 0)
-       return (u32) kv6.value;
+       return kv6.value;
     }
-  return APP_INVALID_INDEX;
+  return SESSION_INVALID_HANDLE;
 }
 
 static stream_session_t *
@@ -1234,37 +1237,6 @@ session_lookup_safe6 (u32 fib_index, ip6_address_t * lcl, ip6_address_t * rmt,
   return 0;
 }
 
-u64
-session_lookup_local_listener_make_handle (session_endpoint_t * sep)
-{
-  return ((u64) SESSION_LOCAL_TABLE_PREFIX << 32
-         | (u32) sep->port << 16 | (u32) sep->transport_proto << 8
-         | (u32) sep->is_ip4);
-}
-
-u8
-session_lookup_local_is_handle (u64 handle)
-{
-  if (handle >> 32 == SESSION_LOCAL_TABLE_PREFIX)
-    return 1;
-  return 0;
-}
-
-int
-session_lookup_local_listener_parse_handle (u64 handle,
-                                           session_endpoint_t * sep)
-{
-  u32 local_table_handle;
-  if (handle >> 32 != SESSION_LOCAL_TABLE_PREFIX)
-    return -1;
-  local_table_handle = handle & 0xFFFFFFFFULL;
-  sep->is_ip4 = local_table_handle & 0xff;
-  local_table_handle >>= 8;
-  sep->transport_proto = local_table_handle & 0xff;
-  sep->port = local_table_handle >> 8;
-  return 0;
-}
-
 clib_error_t *
 vnet_session_rule_add_del (session_rule_add_del_args_t * args)
 {
index 1bdf6d1..1acf923 100644 (file)
@@ -64,7 +64,7 @@ int session_lookup_del_connection (transport_connection_t * tc);
 u64 session_lookup_endpoint_listener (u32 table_index,
                                      session_endpoint_t * sepi,
                                      u8 use_rules);
-u32 session_lookup_local_endpoint (u32 table_index, session_endpoint_t * sep);
+u64 session_lookup_local_endpoint (u32 table_index, session_endpoint_t * sep);
 stream_session_t *session_lookup_global_session_endpoint (session_endpoint_t
                                                          *);
 int session_lookup_add_session_endpoint (u32 table_index,
@@ -80,11 +80,6 @@ transport_connection_t *session_lookup_half_open_connection (u64 handle,
                                                             u8 is_ip4);
 u32 session_lookup_get_index_for_fib (u32 fib_proto, u32 fib_index);
 
-u64 session_lookup_local_listener_make_handle (session_endpoint_t * sep);
-u8 session_lookup_local_is_handle (u64 handle);
-int session_lookup_local_listener_parse_handle (u64 handle,
-                                               session_endpoint_t * sep);
-
 void session_lookup_show_table_entries (vlib_main_t * vm,
                                        session_table_t * table, u8 type,
                                        u8 is_local);
index 636ee44..31b1f64 100644 (file)
@@ -55,6 +55,7 @@ typedef struct _session_lookup_table
 #define SESSION_LOCAL_TABLE_PREFIX ((u32)~0)
 #define SESSION_INVALID_INDEX ((u32)~0)
 #define SESSION_INVALID_HANDLE ((u64)~0)
+#define SESSION_DROP_HANDLE (((u64)~0) - 1)
 
 typedef int (*ip4_session_table_walk_fn_t) (clib_bihash_kv_16_8_t * kvp,
                                            void *ctx);
index febe1b7..85e8732 100644 (file)
@@ -53,12 +53,20 @@ dummy_session_connected_callback (u32 app_index, u32 api_context,
   return -1;
 }
 
+static u32 dummy_segment_count;
+
 int
-dummy_add_segment_callback (u32 client_index, const u8 * seg_name,
-                           u32 seg_size)
+dummy_add_segment_callback (u32 client_index, const ssvm_private_t * fs)
 {
-  clib_warning ("called...");
-  return -1;
+  dummy_segment_count = 1;
+  return 0;
+}
+
+int
+dummy_del_segment_callback (u32 client_index, const ssvm_private_t * fs)
+{
+  dummy_segment_count = 0;
+  return 0;
 }
 
 int
@@ -73,11 +81,14 @@ dummy_session_disconnect_callback (stream_session_t * s)
   clib_warning ("called...");
 }
 
+static u32 dummy_accept;
+
 int
 dummy_session_accept_callback (stream_session_t * s)
 {
-  clib_warning ("called...");
-  return -1;
+  dummy_accept = 1;
+  s->session_state = SESSION_STATE_READY;
+  return 0;
 }
 
 int
@@ -94,7 +105,8 @@ static session_cb_vft_t dummy_session_cbs = {
   .session_accept_callback = dummy_session_accept_callback,
   .session_disconnect_callback = dummy_session_disconnect_callback,
   .builtin_server_rx_callback = dummy_server_rx_callback,
-  .redirect_connect_callback = dummy_redirect_connect_callback,
+  .add_segment_callback = dummy_add_segment_callback,
+  .del_segment_callback = dummy_del_segment_callback,
 };
 /* *INDENT-ON* */
 
@@ -146,7 +158,6 @@ session_test_basic (vlib_main_t * vm, unformat_input_t * input)
 
   memset (options, 0, sizeof (options));
   options[APP_OPTIONS_FLAGS] = APP_OPTIONS_FLAGS_IS_BUILTIN;
-  options[APP_OPTIONS_FLAGS] |= APP_OPTIONS_FLAGS_ACCEPT_REDIRECT;
   options[APP_OPTIONS_FLAGS] |= APP_OPTIONS_FLAGS_USE_GLOBAL_SCOPE;
   options[APP_OPTIONS_FLAGS] |= APP_OPTIONS_FLAGS_USE_LOCAL_SCOPE;
   vnet_app_attach_args_t attach_args = {
@@ -205,7 +216,7 @@ session_test_namespace (vlib_main_t * vm, unformat_input_t * input)
 {
   u64 options[APP_OPTIONS_N_OPTIONS], dummy_secret = 1234;
   u32 server_index, server_st_index, server_local_st_index;
-  u32 dummy_port = 1234, local_listener, client_index;
+  u32 dummy_port = 1234, client_index;
   u32 dummy_api_context = 4321, dummy_client_api_index = 1234;
   u32 dummy_server_api_index = ~0, sw_if_index = 0;
   session_endpoint_t server_sep = SESSION_ENDPOINT_NULL;
@@ -216,6 +227,7 @@ session_test_namespace (vlib_main_t * vm, unformat_input_t * input)
   app_namespace_t *app_ns;
   application_t *server;
   stream_session_t *s;
+  u64 handle;
   int code;
 
   server_sep.is_ip4 = 1;
@@ -225,7 +237,6 @@ session_test_namespace (vlib_main_t * vm, unformat_input_t * input)
   memset (options, 0, sizeof (options));
 
   options[APP_OPTIONS_FLAGS] = APP_OPTIONS_FLAGS_IS_BUILTIN;
-  options[APP_OPTIONS_FLAGS] |= APP_OPTIONS_FLAGS_ACCEPT_REDIRECT;
   vnet_app_attach_args_t attach_args = {
     .api_client_index = ~0,
     .options = options,
@@ -361,9 +372,8 @@ session_test_namespace (vlib_main_t * vm, unformat_input_t * input)
   SESSION_TEST ((s->app_index == server_index), "app_index should be that of "
                "the server");
   server_local_st_index = application_local_session_table (server);
-  local_listener =
-    session_lookup_local_endpoint (server_local_st_index, &server_sep);
-  SESSION_TEST ((local_listener != SESSION_INVALID_INDEX),
+  handle = session_lookup_local_endpoint (server_local_st_index, &server_sep);
+  SESSION_TEST ((handle != SESSION_INVALID_HANDLE),
                "listener should exist in local table");
 
   /*
@@ -381,12 +391,15 @@ session_test_namespace (vlib_main_t * vm, unformat_input_t * input)
   code = clib_error_get_code (error);
   SESSION_TEST ((code == VNET_API_ERROR_INVALID_VALUE),
                "error code should be invalid value (zero ip)");
+  SESSION_TEST ((dummy_segment_count == 0),
+               "shouldn't have received request to map new segment");
   connect_args.sep.ip.ip4.as_u8[0] = 127;
   error = vnet_connect (&connect_args);
-  SESSION_TEST ((error != 0), "client connect should return error code");
+  SESSION_TEST ((error == 0), "client connect should not return error code");
   code = clib_error_get_code (error);
-  SESSION_TEST ((code == VNET_API_ERROR_SESSION_REDIRECT),
-               "error code should be redirect");
+  SESSION_TEST ((dummy_segment_count == 1),
+               "should've received request to map new segment");
+  SESSION_TEST ((dummy_accept == 1), "should've received accept request");
   detach_args.app_index = client_index;
   vnet_application_detach (&detach_args);
 
@@ -413,9 +426,9 @@ session_test_namespace (vlib_main_t * vm, unformat_input_t * input)
 
   s = session_lookup_listener (server_st_index, &server_sep);
   SESSION_TEST ((s == 0), "listener should not exist in global table");
-  local_listener =
-    session_lookup_local_endpoint (server_local_st_index, &server_sep);
-  SESSION_TEST ((s == 0), "listener should not exist in local table");
+  handle = session_lookup_local_endpoint (server_local_st_index, &server_sep);
+  SESSION_TEST ((handle == SESSION_INVALID_HANDLE),
+               "listener should not exist in local table");
 
   detach_args.app_index = server_index;
   vnet_application_detach (&detach_args);
@@ -438,18 +451,16 @@ session_test_namespace (vlib_main_t * vm, unformat_input_t * input)
   s = session_lookup_listener (server_st_index, &server_sep);
   SESSION_TEST ((s == 0), "listener should not exist in global table");
   server_local_st_index = application_local_session_table (server);
-  local_listener =
-    session_lookup_local_endpoint (server_local_st_index, &server_sep);
-  SESSION_TEST ((local_listener != SESSION_INVALID_INDEX),
+  handle = session_lookup_local_endpoint (server_local_st_index, &server_sep);
+  SESSION_TEST ((handle != SESSION_INVALID_HANDLE),
                "listener should exist in local table");
 
   unbind_args.handle = bind_args.handle;
   error = vnet_unbind (&unbind_args);
   SESSION_TEST ((error == 0), "unbind should work");
 
-  local_listener =
-    session_lookup_local_endpoint (server_local_st_index, &server_sep);
-  SESSION_TEST ((local_listener == SESSION_INVALID_INDEX),
+  handle = session_lookup_local_endpoint (server_local_st_index, &server_sep);
+  SESSION_TEST ((handle == SESSION_INVALID_HANDLE),
                "listener should not exist in local table");
 
   /*
@@ -510,9 +521,8 @@ session_test_namespace (vlib_main_t * vm, unformat_input_t * input)
   SESSION_TEST ((s->app_index == server_index), "app_index should be that of "
                "the server");
   server_local_st_index = application_local_session_table (server);
-  local_listener =
-    session_lookup_local_endpoint (server_local_st_index, &server_sep);
-  SESSION_TEST ((local_listener != SESSION_INVALID_INDEX),
+  handle = session_lookup_local_endpoint (server_local_st_index, &server_sep);
+  SESSION_TEST ((handle != SESSION_INVALID_HANDLE),
                "zero listener should exist in local table");
   detach_args.app_index = server_index;
   vnet_application_detach (&detach_args);
@@ -800,7 +810,7 @@ session_test_rules (vlib_main_t * vm, unformat_input_t * input)
   session_endpoint_t server_sep = SESSION_ENDPOINT_NULL;
   u64 options[APP_OPTIONS_N_OPTIONS];
   u16 lcl_port = 1234, rmt_port = 4321;
-  u32 server_index, server_index2, app_index;
+  u32 server_index, server_index2;
   u32 dummy_server_api_index = ~0;
   transport_connection_t *tc;
   u32 dummy_port = 1111;
@@ -811,6 +821,7 @@ session_test_rules (vlib_main_t * vm, unformat_input_t * input)
   u32 local_ns_index = default_ns->local_table_index;
   int verbose = 0, rv;
   app_namespace_t *app_ns;
+  u64 handle;
 
   while (unformat_check_input (input) != UNFORMAT_END_OF_INPUT)
     {
@@ -844,7 +855,6 @@ session_test_rules (vlib_main_t * vm, unformat_input_t * input)
    * Attach server with global and local default scope
    */
   options[APP_OPTIONS_FLAGS] = APP_OPTIONS_FLAGS_IS_BUILTIN;
-  options[APP_OPTIONS_FLAGS] |= APP_OPTIONS_FLAGS_ACCEPT_REDIRECT;
   options[APP_OPTIONS_FLAGS] |= APP_OPTIONS_FLAGS_USE_GLOBAL_SCOPE;
   options[APP_OPTIONS_FLAGS] |= APP_OPTIONS_FLAGS_USE_LOCAL_SCOPE;
   attach_args.namespace_id = 0;
@@ -918,8 +928,8 @@ session_test_rules (vlib_main_t * vm, unformat_input_t * input)
     .port = rmt_port,
     .transport_proto = TRANSPORT_PROTO_TCP,
   };
-  app_index = session_lookup_local_endpoint (local_ns_index, &sep);
-  SESSION_TEST ((app_index != server_index), "local session endpoint lookup "
+  handle = session_lookup_local_endpoint (local_ns_index, &sep);
+  SESSION_TEST ((handle != server_index), "local session endpoint lookup "
                "should not work (global scope)");
 
   tc = session_lookup_connection_wt4 (0, &lcl_pref.fp_addr.ip4,
@@ -943,8 +953,8 @@ session_test_rules (vlib_main_t * vm, unformat_input_t * input)
                                      &is_filtered);
   SESSION_TEST ((tc->c_index == listener->connection_index),
                "optimized lookup for lcl port + 1 should work");
-  app_index = session_lookup_local_endpoint (local_ns_index, &sep);
-  SESSION_TEST ((app_index == server_index), "local session endpoint lookup "
+  handle = session_lookup_local_endpoint (local_ns_index, &sep);
+  SESSION_TEST ((handle == server_index), "local session endpoint lookup "
                "should work (lcl ip was zeroed)");
 
   /*
@@ -976,8 +986,8 @@ session_test_rules (vlib_main_t * vm, unformat_input_t * input)
                "should fail (deny rule)");
   SESSION_TEST ((is_filtered == 1), "lookup should be filtered (deny)");
 
-  app_index = session_lookup_local_endpoint (local_ns_index, &sep);
-  SESSION_TEST ((app_index == APP_DROP_INDEX), "lookup for 1.2.3.4/32 1234 "
+  handle = session_lookup_local_endpoint (local_ns_index, &sep);
+  SESSION_TEST ((handle == SESSION_DROP_HANDLE), "lookup for 1.2.3.4/32 1234 "
                "5.6.7.8/16 4321 in local table should return deny");
 
   tc = session_lookup_connection_wt4 (0, &lcl_pref.fp_addr.ip4,
@@ -1011,9 +1021,9 @@ session_test_rules (vlib_main_t * vm, unformat_input_t * input)
                "should fail (allow without app)");
   SESSION_TEST ((is_filtered == 0), "lookup should NOT be filtered");
 
-  app_index = session_lookup_local_endpoint (local_ns_index, &sep);
-  SESSION_TEST ((app_index == APP_INVALID_INDEX), "lookup for 1.2.3.4/32 1234"
-               " 5.6.7.8/32 4321 in local table should return invalid");
+  handle = session_lookup_local_endpoint (local_ns_index, &sep);
+  SESSION_TEST ((handle == SESSION_INVALID_HANDLE), "lookup for 1.2.3.4/32 "
+               "1234 5.6.7.8/32 4321 in local table should return invalid");
 
   if (verbose)
     {
@@ -1023,8 +1033,8 @@ session_test_rules (vlib_main_t * vm, unformat_input_t * input)
     }
 
   sep.ip.ip4.as_u32 += 1 << 24;
-  app_index = session_lookup_local_endpoint (local_ns_index, &sep);
-  SESSION_TEST ((app_index == APP_DROP_INDEX), "lookup for 1.2.3.4/32 1234"
+  handle = session_lookup_local_endpoint (local_ns_index, &sep);
+  SESSION_TEST ((handle == SESSION_DROP_HANDLE), "lookup for 1.2.3.4/32 1234"
                " 5.6.7.9/32 4321 in local table should return deny");
 
   vnet_connect_args_t connect_args = {
@@ -1075,8 +1085,8 @@ session_test_rules (vlib_main_t * vm, unformat_input_t * input)
                                             TRANSPORT_PROTO_TCP);
     }
 
-  app_index = session_lookup_local_endpoint (local_ns_index, &sep);
-  SESSION_TEST ((app_index == APP_DROP_INDEX),
+  handle = session_lookup_local_endpoint (local_ns_index, &sep);
+  SESSION_TEST ((handle == SESSION_DROP_HANDLE),
                "local session endpoint lookup should return deny");
 
   /*
@@ -1091,8 +1101,8 @@ session_test_rules (vlib_main_t * vm, unformat_input_t * input)
   error = vnet_session_rule_add_del (&args);
   SESSION_TEST ((error == 0), "Del 1.2.3.4/32 1234 5.6.7.8/32 4321 deny");
 
-  app_index = session_lookup_local_endpoint (local_ns_index, &sep);
-  SESSION_TEST ((app_index == APP_INVALID_INDEX),
+  handle = session_lookup_local_endpoint (local_ns_index, &sep);
+  SESSION_TEST ((handle == SESSION_INVALID_HANDLE),
                "local session endpoint lookup should return invalid");
 
   /*
@@ -1108,8 +1118,8 @@ session_test_rules (vlib_main_t * vm, unformat_input_t * input)
   args.table_args.rmt_port = 4321;
   error = vnet_session_rule_add_del (&args);
   SESSION_TEST ((error == 0), "Del 0/0 * 5.6.7.8/16 4321");
-  app_index = session_lookup_local_endpoint (local_ns_index, &sep);
-  SESSION_TEST ((app_index != server_index), "local session endpoint lookup "
+  handle = session_lookup_local_endpoint (local_ns_index, &sep);
+  SESSION_TEST ((handle != server_index), "local session endpoint lookup "
                "should not work (removed)");
 
   args.table_args.is_add = 0;
@@ -1296,14 +1306,14 @@ session_test_rules (vlib_main_t * vm, unformat_input_t * input)
   /*
    * Lookup default namespace
    */
-  app_index = session_lookup_local_endpoint (local_ns_index, &sep);
-  SESSION_TEST ((app_index == APP_INVALID_INDEX),
+  handle = session_lookup_local_endpoint (local_ns_index, &sep);
+  SESSION_TEST ((handle == SESSION_INVALID_HANDLE),
                "lookup for 1.2.3.4/32 1234 5.6.7.8/32 4321 in local table "
                "should return allow (invalid)");
 
   sep.port += 1;
-  app_index = session_lookup_local_endpoint (local_ns_index, &sep);
-  SESSION_TEST ((app_index == APP_DROP_INDEX), "lookup for 1.2.3.4/32 1234 "
+  handle = session_lookup_local_endpoint (local_ns_index, &sep);
+  SESSION_TEST ((handle == SESSION_DROP_HANDLE), "lookup for 1.2.3.4/32 1234 "
                "5.6.7.8/16 432*2* in local table should return deny");
 
   connect_args.app_index = server_index;
@@ -1317,8 +1327,8 @@ session_test_rules (vlib_main_t * vm, unformat_input_t * input)
   /*
    * Lookup test namespace
    */
-  app_index = session_lookup_local_endpoint (app_ns->local_table_index, &sep);
-  SESSION_TEST ((app_index == APP_DROP_INDEX), "lookup for 1.2.3.4/32 1234 "
+  handle = session_lookup_local_endpoint (app_ns->local_table_index, &sep);
+  SESSION_TEST ((handle == SESSION_DROP_HANDLE), "lookup for 1.2.3.4/32 1234 "
                "5.6.7.8/16 4321 in local table should return deny");
 
   connect_args.app_index = server_index;
index b484efe..57d256c 100644 (file)
@@ -36,6 +36,26 @@ typedef enum
   SESSION_STATE_N_STATES,
 } stream_session_state_t;
 
+/* TODO convert to macro once cleanup completed */
+typedef struct app_session_
+{
+  /** fifo pointers. Once allocated, these do not move */
+  svm_fifo_t *server_rx_fifo;
+  svm_fifo_t *server_tx_fifo;
+
+  /** Type */
+  session_type_t session_type;
+
+  /** State */
+  volatile u8 session_state;
+
+  /** Session index in owning pool */
+  u32 session_index;
+
+  /** Application index */
+  u32 app_index;
+} app_session_t;
+
 typedef struct _stream_session_t
 {
   /** fifo pointers. Once allocated, these do not move */
@@ -48,6 +68,12 @@ typedef struct _stream_session_t
   /** State */
   volatile u8 session_state;
 
+  /** Session index in per_thread pool */
+  u32 session_index;
+
+  /** stream server pool index */
+  u32 app_index;
+
   u8 thread_index;
 
   /** To avoid n**2 "one event per frame" check */
@@ -56,21 +82,56 @@ typedef struct _stream_session_t
   /** svm segment index where fifos were allocated */
   u32 svm_segment_index;
 
-  /** Session index in per_thread pool */
-  u32 session_index;
-
   /** Transport specific */
   u32 connection_index;
 
-  /** stream server pool index */
-  u32 app_index;
-
   /** Parent listener session if the result of an accept */
   u32 listener_index;
 
     CLIB_CACHE_LINE_ALIGN_MARK (pad);
 } stream_session_t;
 
+typedef struct local_session_
+{
+  /** fifo pointers. Once allocated, these do not move */
+  svm_fifo_t *server_rx_fifo;
+  svm_fifo_t *server_tx_fifo;
+
+  /** Type */
+  session_type_t session_type;
+
+  /** State */
+  volatile u8 session_state;
+
+  /** Session index */
+  u32 session_index;
+
+  /** Server index */
+  u32 app_index;
+
+  /** Segment index where fifos were allocated */
+  u32 svm_segment_index;
+
+  u32 listener_index;
+
+  /** Port for connection */
+  u16 port;
+
+  /** Has transport embedded when listener not purely local */
+  session_type_t listener_session_type;
+
+  /**
+   * Client data
+   */
+  u32 client_index;
+  u32 client_opaque;
+
+  u64 server_evt_q;
+  u64 client_evt_q;
+
+    CLIB_CACHE_LINE_ALIGN_MARK (pad);
+} local_session_t;
+
 typedef struct _session_endpoint
 {
   /*
index 12b6a05..76ee226 100644 (file)
@@ -71,6 +71,7 @@ typedef enum _transport_proto
   TRANSPORT_PROTO_TCP,
   TRANSPORT_PROTO_UDP,
   TRANSPORT_PROTO_SCTP,
+  TRANSPORT_PROTO_NONE,
   TRANSPORT_N_PROTO
 } transport_proto_t;
 
index 80e27c0..047b8ca 100644 (file)
@@ -85,6 +85,9 @@ class TestSession(VppTestCase):
             self.logger.critical(error)
             self.assertEqual(error.find("failed"), -1)
 
+        if self.vpp_dead:
+            self.assert_equal(0)
+
         # Delete inter-table routes
         ip_t01.remove_vpp_config()
         ip_t10.remove_vpp_config()