Session layer improvements 09/6309/7
authorFlorin Coras <fcoras@cisco.com>
Wed, 19 Apr 2017 20:00:05 +0000 (13:00 -0700)
committerDave Barach <openvpp@barachs.net>
Mon, 24 Apr 2017 12:02:14 +0000 (12:02 +0000)
Among others:
- Moved app event queue to shared memory segment
- Use private memory segment for builtin apps
- Remove pid from svm fifo
- Protect session fifo (de)allocation
- Use fifo event for session disconnects
- Have session queue node poll in all wk threads

Change-Id: I89dbf7fdfebef12f5ef2b34ba3ef3c2c07f49ff2
Signed-off-by: Florin Coras <fcoras@cisco.com>
25 files changed:
src/svm/svm_fifo.c
src/svm/svm_fifo.h
src/svm/svm_fifo_segment.c
src/svm/svm_fifo_segment.h
src/svm/test_svm_fifo1.c
src/uri/uri_tcp_test.c
src/uri/uri_udp_test.c
src/vnet/session/application.c
src/vnet/session/application.h
src/vnet/session/application_interface.c
src/vnet/session/application_interface.h
src/vnet/session/node.c
src/vnet/session/segment_manager.c
src/vnet/session/segment_manager.h
src/vnet/session/session.c
src/vnet/session/session.h
src/vnet/session/session_api.c
src/vnet/tcp/builtin_client.c
src/vnet/tcp/builtin_server.c
src/vnet/tcp/tcp.c
src/vnet/tcp/tcp_input.c
src/vnet/tcp/tcp_output.c
src/vnet/tcp/tcp_test.c
src/vnet/udp/builtin_server.c
src/vnet/udp/udp_input.c

index f428d3e..8f2ed0c 100644 (file)
@@ -57,7 +57,7 @@ format_svm_fifo (u8 * s, va_list * args)
   if (verbose > 1)
     s = format
       (s, "server session %d thread %d client session %d thread %d\n",
-       f->server_session_index, f->server_thread_index,
+       f->master_session_index, f->master_thread_index,
        f->client_session_index, f->client_thread_index);
 
   if (verbose)
@@ -353,8 +353,7 @@ ooo_segment_try_collect (svm_fifo_t * f, u32 n_bytes_enqueued)
 }
 
 static int
-svm_fifo_enqueue_internal (svm_fifo_t * f,
-                          int pid, u32 max_bytes, u8 * copy_from_here)
+svm_fifo_enqueue_internal (svm_fifo_t * f, u32 max_bytes, u8 * copy_from_here)
 {
   u32 total_copy_bytes, first_copy_bytes, second_copy_bytes;
   u32 cursize, nitems;
@@ -411,10 +410,9 @@ svm_fifo_enqueue_internal (svm_fifo_t * f,
 }
 
 int
-svm_fifo_enqueue_nowait (svm_fifo_t * f,
-                        int pid, u32 max_bytes, u8 * copy_from_here)
+svm_fifo_enqueue_nowait (svm_fifo_t * f, u32 max_bytes, u8 * copy_from_here)
 {
-  return svm_fifo_enqueue_internal (f, pid, max_bytes, copy_from_here);
+  return svm_fifo_enqueue_internal (f, max_bytes, copy_from_here);
 }
 
 /**
@@ -426,7 +424,6 @@ svm_fifo_enqueue_nowait (svm_fifo_t * f,
  */
 static int
 svm_fifo_enqueue_with_offset_internal (svm_fifo_t * f,
-                                      int pid,
                                       u32 offset,
                                       u32 required_bytes,
                                       u8 * copy_from_here)
@@ -439,7 +436,7 @@ svm_fifo_enqueue_with_offset_internal (svm_fifo_t * f,
   /* Users would do well to avoid this */
   if (PREDICT_FALSE (f->tail == (offset % f->nitems)))
     {
-      rv = svm_fifo_enqueue_internal (f, pid, required_bytes, copy_from_here);
+      rv = svm_fifo_enqueue_internal (f, required_bytes, copy_from_here);
       if (rv > 0)
        return 0;
       return -1;
@@ -484,18 +481,16 @@ svm_fifo_enqueue_with_offset_internal (svm_fifo_t * f,
 
 int
 svm_fifo_enqueue_with_offset (svm_fifo_t * f,
-                             int pid,
                              u32 offset,
                              u32 required_bytes, u8 * copy_from_here)
 {
-  return svm_fifo_enqueue_with_offset_internal
-    (f, pid, offset, required_bytes, copy_from_here);
+  return svm_fifo_enqueue_with_offset_internal (f, offset, required_bytes,
+                                               copy_from_here);
 }
 
 
 static int
-svm_fifo_dequeue_internal (svm_fifo_t * f,
-                          int pid, u32 max_bytes, u8 * copy_here)
+svm_fifo_dequeue_internal (svm_fifo_t * f, u32 max_bytes, u8 * copy_here)
 {
   u32 total_copy_bytes, first_copy_bytes, second_copy_bytes;
   u32 cursize, nitems;
@@ -545,14 +540,13 @@ svm_fifo_dequeue_internal (svm_fifo_t * f,
 }
 
 int
-svm_fifo_dequeue_nowait (svm_fifo_t * f,
-                        int pid, u32 max_bytes, u8 * copy_here)
+svm_fifo_dequeue_nowait (svm_fifo_t * f, u32 max_bytes, u8 * copy_here)
 {
-  return svm_fifo_dequeue_internal (f, pid, max_bytes, copy_here);
+  return svm_fifo_dequeue_internal (f, max_bytes, copy_here);
 }
 
 int
-svm_fifo_peek (svm_fifo_t * f, int pid, u32 relative_offset, u32 max_bytes,
+svm_fifo_peek (svm_fifo_t * f, u32 relative_offset, u32 max_bytes,
               u8 * copy_here)
 {
   u32 total_copy_bytes, first_copy_bytes, second_copy_bytes;
@@ -590,7 +584,7 @@ svm_fifo_peek (svm_fifo_t * f, int pid, u32 relative_offset, u32 max_bytes,
 }
 
 int
-svm_fifo_dequeue_drop (svm_fifo_t * f, int pid, u32 max_bytes)
+svm_fifo_dequeue_drop (svm_fifo_t * f, u32 max_bytes)
 {
   u32 total_drop_bytes, first_drop_bytes, second_drop_bytes;
   u32 cursize, nitems;
index 0fff257..d67237c 100644 (file)
 #include <vppinfra/format.h>
 #include <pthread.h>
 
-typedef enum
-{
-  SVM_FIFO_TAG_NOT_HELD = 0,
-  SVM_FIFO_TAG_DEQUEUE,
-  SVM_FIFO_TAG_ENQUEUE,
-} svm_lock_tag_t;
-
 /** Out-of-order segment */
 typedef struct
 {
@@ -37,7 +30,7 @@ typedef struct
   u32 prev;    /**< Previous linked-list element pool index */
 
   u32 start;   /**< Start of segment, normalized*/
-  u32 length;          /**< Length of segment */
+  u32 length;  /**< Length of segment */
 } ooo_segment_t;
 
 format_function_t format_ooo_segment;
@@ -52,12 +45,11 @@ typedef struct
     CLIB_CACHE_LINE_ALIGN_MARK (end_cursize);
 
   volatile u8 has_event;       /**< non-zero if deq event exists */
-  u32 owner_pid;
 
   /* Backpointers */
-  u32 server_session_index;
+  u32 master_session_index;
   u32 client_session_index;
-  u8 server_thread_index;
+  u8 master_thread_index;
   u8 client_thread_index;
   u32 segment_manager;
     CLIB_CACHE_LINE_ALIGN_MARK (end_shared);
@@ -117,19 +109,14 @@ svm_fifo_unset_event (svm_fifo_t * f)
 svm_fifo_t *svm_fifo_create (u32 data_size_in_bytes);
 void svm_fifo_free (svm_fifo_t * f);
 
-int svm_fifo_enqueue_nowait (svm_fifo_t * f, int pid, u32 max_bytes,
+int svm_fifo_enqueue_nowait (svm_fifo_t * f, u32 max_bytes,
                             u8 * copy_from_here);
+int svm_fifo_enqueue_with_offset (svm_fifo_t * f, u32 offset,
+                                 u32 required_bytes, u8 * copy_from_here);
+int svm_fifo_dequeue_nowait (svm_fifo_t * f, u32 max_bytes, u8 * copy_here);
 
-int svm_fifo_enqueue_with_offset (svm_fifo_t * f, int pid,
-                                 u32 offset, u32 required_bytes,
-                                 u8 * copy_from_here);
-
-int svm_fifo_dequeue_nowait (svm_fifo_t * f, int pid, u32 max_bytes,
-                            u8 * copy_here);
-
-int svm_fifo_peek (svm_fifo_t * f, int pid, u32 offset, u32 max_bytes,
-                  u8 * copy_here);
-int svm_fifo_dequeue_drop (svm_fifo_t * f, int pid, u32 max_bytes);
+int svm_fifo_peek (svm_fifo_t * f, u32 offset, u32 max_bytes, u8 * copy_here);
+int svm_fifo_dequeue_drop (svm_fifo_t * f, u32 max_bytes);
 u32 svm_fifo_number_ooo_segments (svm_fifo_t * f);
 ooo_segment_t *svm_fifo_first_ooo_segment (svm_fifo_t * f);
 
index acabb3b..281fae2 100644 (file)
@@ -70,6 +70,44 @@ svm_fifo_segment_create (svm_fifo_segment_create_args_t * a)
   return (0);
 }
 
+/** Create an svm fifo segment in process-private memory */
+int
+svm_fifo_segment_create_process_private (svm_fifo_segment_create_args_t * a)
+{
+  svm_fifo_segment_private_t *s;
+  svm_fifo_segment_main_t *sm = &svm_fifo_segment_main;
+  ssvm_shared_header_t *sh;
+  svm_fifo_segment_header_t *fsh;
+
+  /* Allocate a fresh segment */
+  pool_get (sm->segments, s);
+  memset (s, 0, sizeof (*s));
+
+  s->ssvm.ssvm_size = ~0;
+  s->ssvm.i_am_master = 1;
+  s->ssvm.my_pid = getpid ();
+  s->ssvm.name = (u8 *) a->segment_name;
+  s->ssvm.requested_va = ~0;
+
+  /* Allocate a [sic] shared memory header, in process memory... */
+  sh = clib_mem_alloc_aligned (sizeof (*sh), CLIB_CACHE_LINE_BYTES);
+  s->ssvm.sh = sh;
+
+  memset (sh, 0, sizeof (*sh));
+  sh->heap = clib_mem_get_heap ();
+
+  /* Set up svm_fifo_segment shared header */
+  fsh = clib_mem_alloc (sizeof (*fsh));
+  memset (fsh, 0, sizeof (*fsh));
+  sh->opaque[0] = fsh;
+  s->h = fsh;
+  fsh->segment_name = format (0, "%s%c", a->segment_name, 0);
+
+  sh->ready = 1;
+  a->new_segment_index = s - sm->segments;
+  return (0);
+}
+
 /** (slave) attach to an svm fifo segment */
 int
 svm_fifo_segment_attach (svm_fifo_segment_create_args_t * a)
@@ -82,7 +120,6 @@ svm_fifo_segment_attach (svm_fifo_segment_create_args_t * a)
 
   /* Allocate a fresh segment */
   pool_get (sm->segments, s);
-
   memset (s, 0, sizeof (*s));
 
   s->ssvm.ssvm_size = a->segment_size;
@@ -126,19 +163,22 @@ svm_fifo_segment_alloc_fifo (svm_fifo_segment_private_t * s,
 
   sh = s->ssvm.sh;
   fsh = (svm_fifo_segment_header_t *) sh->opaque[0];
+
+  ssvm_lock (sh, 1, 0);
   oldheap = ssvm_push_heap (sh);
 
   /* Note: this can fail, in which case: create another segment */
   f = svm_fifo_create (data_size_in_bytes);
-  if (f == 0)
+  if (PREDICT_FALSE (f == 0))
     {
       ssvm_pop_heap (oldheap);
+      ssvm_unlock (sh);
       return (0);
     }
 
   vec_add1 (fsh->fifos, f);
-
   ssvm_pop_heap (oldheap);
+  ssvm_unlock (sh);
   return (f);
 }
 
@@ -152,8 +192,9 @@ svm_fifo_segment_free_fifo (svm_fifo_segment_private_t * s, svm_fifo_t * f)
 
   sh = s->ssvm.sh;
   fsh = (svm_fifo_segment_header_t *) sh->opaque[0];
-  oldheap = ssvm_push_heap (sh);
 
+  ssvm_lock (sh, 1, 0);
+  oldheap = ssvm_push_heap (sh);
   for (i = 0; i < vec_len (fsh->fifos); i++)
     {
       if (fsh->fifos[i] == f)
@@ -167,6 +208,7 @@ svm_fifo_segment_free_fifo (svm_fifo_segment_private_t * s, svm_fifo_t * f)
 found:
   clib_mem_free (f);
   ssvm_pop_heap (oldheap);
+  ssvm_unlock (sh);
 }
 
 void
index 9ab47a4..4218013 100644 (file)
@@ -17,6 +17,7 @@
 
 #include <svm/svm_fifo.h>
 #include <svm/ssvm.h>
+#include <vppinfra/lock.h>
 
 typedef struct
 {
@@ -32,6 +33,8 @@ typedef struct
 
 typedef struct
 {
+  volatile u32 lock;
+
   /** pool of segments */
   svm_fifo_segment_private_t *segments;
   /* Where to put the next one */
@@ -78,6 +81,8 @@ typedef enum
 } ssvm_fifo_segment_api_error_enum_t;
 
 int svm_fifo_segment_create (svm_fifo_segment_create_args_t * a);
+int svm_fifo_segment_create_process_private (svm_fifo_segment_create_args_t
+                                            * a);
 int svm_fifo_segment_attach (svm_fifo_segment_create_args_t * a);
 void svm_fifo_segment_delete (svm_fifo_segment_private_t * s);
 
index 355653d..398dd6d 100644 (file)
@@ -25,7 +25,6 @@ hello_world (int verbose)
   u8 *test_data;
   u8 *retrieved_data = 0;
   clib_error_t *error = 0;
-  int pid = getpid ();
 
   memset (a, 0, sizeof (*a));
 
@@ -48,18 +47,16 @@ hello_world (int verbose)
   vec_validate (retrieved_data, vec_len (test_data) - 1);
 
   while (svm_fifo_max_enqueue (f) >= vec_len (test_data))
-    svm_fifo_enqueue_nowait (f, pid, vec_len (test_data), test_data);
+    svm_fifo_enqueue_nowait (f, vec_len (test_data), test_data);
 
   while (svm_fifo_max_dequeue (f) >= vec_len (test_data))
-    svm_fifo_dequeue_nowait (f, pid, vec_len (retrieved_data),
-                            retrieved_data);
+    svm_fifo_dequeue_nowait (f, vec_len (retrieved_data), retrieved_data);
 
   while (svm_fifo_max_enqueue (f) >= vec_len (test_data))
-    svm_fifo_enqueue_nowait (f, pid, vec_len (test_data), test_data);
+    svm_fifo_enqueue_nowait (f, vec_len (test_data), test_data);
 
   while (svm_fifo_max_dequeue (f) >= vec_len (test_data))
-    svm_fifo_dequeue_nowait (f, pid, vec_len (retrieved_data),
-                            retrieved_data);
+    svm_fifo_dequeue_nowait (f, vec_len (retrieved_data), retrieved_data);
 
   if (!memcmp (retrieved_data, test_data, vec_len (test_data)))
     error = clib_error_return (0, "data test OK, got '%s'", retrieved_data);
@@ -81,7 +78,6 @@ master (int verbose)
   u8 *test_data;
   u8 *retrieved_data = 0;
   int i;
-  int pid = getpid ();
 
   memset (a, 0, sizeof (*a));
 
@@ -104,7 +100,7 @@ master (int verbose)
   vec_validate (retrieved_data, vec_len (test_data) - 1);
 
   for (i = 0; i < 1000; i++)
-    svm_fifo_enqueue_nowait (f, pid, vec_len (test_data), test_data);
+    svm_fifo_enqueue_nowait (f, vec_len (test_data), test_data);
 
   return clib_error_return (0, "master (enqueue) done");
 }
@@ -176,7 +172,6 @@ offset (int verbose)
   u32 *test_data = 0;
   u32 *recovered_data = 0;
   int i;
-  int pid = getpid ();
 
   memset (a, 0, sizeof (*a));
 
@@ -199,19 +194,19 @@ offset (int verbose)
     vec_add1 (test_data, i);
 
   /* Enqueue the first 1024 u32's */
-  svm_fifo_enqueue_nowait (f, pid, 4096 /* bytes to enqueue */ ,
+  svm_fifo_enqueue_nowait (f, 4096 /* bytes to enqueue */ ,
                           (u8 *) test_data);
 
   /* Enqueue the third 1024 u32's 2048 ahead of the current tail */
-  svm_fifo_enqueue_with_offset (f, pid, 4096, 4096, (u8 *) & test_data[2048]);
+  svm_fifo_enqueue_with_offset (f, 4096, 4096, (u8 *) & test_data[2048]);
 
   /* Enqueue the second 1024 u32's at the current tail */
-  svm_fifo_enqueue_nowait (f, pid, 4096 /* bytes to enqueue */ ,
+  svm_fifo_enqueue_nowait (f, 4096 /* bytes to enqueue */ ,
                           (u8 *) & test_data[1024]);
 
   vec_validate (recovered_data, (3 * 1024) - 1);
 
-  svm_fifo_dequeue_nowait (f, pid, 3 * 4096, (u8 *) recovered_data);
+  svm_fifo_dequeue_nowait (f, 3 * 4096, (u8 *) recovered_data);
 
   for (i = 0; i < (3 * 1024); i++)
     {
@@ -237,7 +232,6 @@ slave (int verbose)
   int rv;
   u8 *test_data;
   u8 *retrieved_data = 0;
-  int pid = getpid ();
   int i;
 
   memset (a, 0, sizeof (*a));
@@ -262,8 +256,7 @@ slave (int verbose)
 
   for (i = 0; i < 1000; i++)
     {
-      svm_fifo_dequeue_nowait (f, pid, vec_len (retrieved_data),
-                              retrieved_data);
+      svm_fifo_dequeue_nowait (f, vec_len (retrieved_data), retrieved_data);
       if (memcmp (retrieved_data, test_data, vec_len (retrieved_data)))
        return clib_error_return (0, "retrieved data incorrect, '%s'",
                                  retrieved_data);
index 2e15d36..686c93f 100755 (executable)
@@ -45,12 +45,13 @@ typedef struct
   svm_fifo_t *server_rx_fifo;
   svm_fifo_t *server_tx_fifo;
 
-  u32 vpp_session_handle;
+  u64 vpp_session_handle;
 } session_t;
 
 typedef enum
 {
   STATE_START,
+  STATE_ATTACHED,
   STATE_READY,
   STATE_DISCONNECTING,
   STATE_FAILED
@@ -127,6 +128,34 @@ uri_tcp_test_main_t uri_tcp_test_main;
 #define NITER 4000000
 #endif
 
+static u8 *
+format_api_error (u8 * s, va_list * args)
+{
+  uri_tcp_test_main_t *utm = &uri_tcp_test_main;
+  i32 error = va_arg (*args, u32);
+  uword *p;
+
+  p = hash_get (utm->error_string_by_error_number, -error);
+
+  if (p)
+    s = format (s, "%s", p[0]);
+  else
+    s = format (s, "%d", error);
+  return s;
+}
+
+static void
+init_error_string_table (uri_tcp_test_main_t * utm)
+{
+  utm->error_string_by_error_number = hash_create (0, sizeof (uword));
+
+#define _(n,v,s) hash_set (utm->error_string_by_error_number, -v, s);
+  foreach_vnet_api_error;
+#undef _
+
+  hash_set (utm->error_string_by_error_number, 99, "Misc");
+}
+
 int
 wait_for_state_change (uri_tcp_test_main_t * utm, connection_state_t state)
 {
@@ -150,7 +179,7 @@ wait_for_state_change (uri_tcp_test_main_t * utm, connection_state_t state)
 }
 
 void
-application_attach (uri_tcp_test_main_t * utm)
+application_send_attach (uri_tcp_test_main_t * utm)
 {
   vl_api_application_attach_t *bmp;
   u32 fifo_size = 3 << 20;
@@ -160,8 +189,8 @@ application_attach (uri_tcp_test_main_t * utm)
   bmp->_vl_msg_id = ntohs (VL_API_APPLICATION_ATTACH);
   bmp->client_index = utm->my_client_index;
   bmp->context = ntohl (0xfeedface);
-  bmp->options[SESSION_OPTIONS_FLAGS] =
-    SESSION_OPTIONS_FLAGS_USE_FIFO | SESSION_OPTIONS_FLAGS_ADD_SEGMENT;
+  bmp->options[APP_OPTIONS_FLAGS] =
+    APP_OPTIONS_FLAGS_USE_FIFO | APP_OPTIONS_FLAGS_ADD_SEGMENT;
   bmp->options[SESSION_OPTIONS_RX_FIFO_SIZE] = fifo_size;
   bmp->options[SESSION_OPTIONS_TX_FIFO_SIZE] = fifo_size;
   bmp->options[SESSION_OPTIONS_ADD_SEGMENT_SIZE] = 128 << 20;
@@ -169,6 +198,18 @@ application_attach (uri_tcp_test_main_t * utm)
   vl_msg_api_send_shmem (utm->vl_input_queue, (u8 *) & bmp);
 }
 
+int
+application_attach (uri_tcp_test_main_t * utm)
+{
+  application_send_attach (utm);
+  if (wait_for_state_change (utm, STATE_ATTACHED))
+    {
+      clib_warning ("timeout waiting for STATE_ATTACHED");
+      return -1;
+    }
+  return 0;
+}
+
 void
 application_detach (uri_tcp_test_main_t * utm)
 {
@@ -192,8 +233,8 @@ vl_api_application_attach_reply_t_handler (vl_api_application_attach_reply_t *
 
   if (mp->retval)
     {
-      uword *errp = hash_get (utm->error_string_by_error_number, -mp->retval);
-      clib_warning ("attach failed: %s", *errp);
+      clib_warning ("attach failed: %U", format_api_error,
+                   clib_net_to_host_u32 (mp->retval));
       utm->state = STATE_FAILED;
       return;
     }
@@ -220,7 +261,7 @@ vl_api_application_attach_reply_t_handler (vl_api_application_attach_reply_t *
 
   utm->our_event_queue =
     (unix_shared_memory_queue_t *) mp->app_event_queue_address;
-
+  utm->state = STATE_ATTACHED;
 }
 
 static void
@@ -231,18 +272,6 @@ vl_api_application_detach_reply_t_handler (vl_api_application_detach_reply_t *
     clib_warning ("detach returned with err: %d", mp->retval);
 }
 
-static void
-init_error_string_table (uri_tcp_test_main_t * utm)
-{
-  utm->error_string_by_error_number = hash_create (0, sizeof (uword));
-
-#define _(n,v,s) hash_set (utm->error_string_by_error_number, -v, s);
-  foreach_vnet_api_error;
-#undef _
-
-  hash_set (utm->error_string_by_error_number, 99, "Misc");
-}
-
 static void
 stop_signal (int signum)
 {
@@ -392,7 +421,7 @@ client_handle_fifo_event_rx (uri_tcp_test_main_t * utm,
   /* Read the bytes */
   do
     {
-      n_read = svm_fifo_dequeue_nowait (rx_fifo, 0,
+      n_read = svm_fifo_dequeue_nowait (rx_fifo,
                                        clib_min (vec_len (utm->rx_buf),
                                                  bytes), utm->rx_buf);
       if (n_read > 0)
@@ -432,11 +461,11 @@ client_handle_event_queue (uri_tcp_test_main_t * utm)
                                0 /* nowait */ );
   switch (e->event_type)
     {
-    case FIFO_EVENT_SERVER_RX:
+    case FIFO_EVENT_APP_RX:
       client_handle_fifo_event_rx (utm, e);
       break;
 
-    case FIFO_EVENT_SERVER_EXIT:
+    case FIFO_EVENT_DISCONNECT:
       return;
 
     default:
@@ -458,11 +487,11 @@ client_rx_thread_fn (void *arg)
                                    0 /* nowait */ );
       switch (e->event_type)
        {
-       case FIFO_EVENT_SERVER_RX:
+       case FIFO_EVENT_APP_RX:
          client_handle_fifo_event_rx (utm, e);
          break;
 
-       case FIFO_EVENT_SERVER_EXIT:
+       case FIFO_EVENT_DISCONNECT:
          return 0;
        default:
          clib_warning ("unknown event type %d", e->event_type);
@@ -487,9 +516,8 @@ vl_api_connect_uri_reply_t_handler (vl_api_connect_uri_reply_t * mp)
 
   if (mp->retval)
     {
-      uword *errp = hash_get (utm->error_string_by_error_number,
-                             -clib_net_to_host_u32 (mp->retval));
-      clib_warning ("connection failed with code: %s", *errp);
+      clib_warning ("connection failed with code: %U", format_api_error,
+                   clib_net_to_host_u32 (mp->retval));
       utm->state = STATE_FAILED;
       return;
     }
@@ -551,7 +579,7 @@ send_test_chunk (uri_tcp_test_main_t * utm, svm_fifo_t * tx_fifo, int mypid,
     {
       actual_write =
        bytes_to_snd > queue_max_chunk ? queue_max_chunk : bytes_to_snd;
-      rv = svm_fifo_enqueue_nowait (tx_fifo, mypid, actual_write,
+      rv = svm_fifo_enqueue_nowait (tx_fifo, actual_write,
                                    test_data + test_buf_offset);
 
       if (rv > 0)
@@ -564,7 +592,7 @@ send_test_chunk (uri_tcp_test_main_t * utm, svm_fifo_t * tx_fifo, int mypid,
            {
              /* Fabricate TX event, send to vpp */
              evt.fifo = tx_fifo;
-             evt.event_type = FIFO_EVENT_SERVER_TX;
+             evt.event_type = FIFO_EVENT_APP_TX;
              evt.event_id = serial_number++;
 
              unix_shared_memory_queue_add (utm->vpp_event_queue,
@@ -619,7 +647,7 @@ client_send_data (uri_tcp_test_main_t * utm)
 }
 
 void
-client_connect (uri_tcp_test_main_t * utm)
+client_send_connect (uri_tcp_test_main_t * utm)
 {
   vl_api_connect_uri_t *cmp;
   cmp = vl_msg_api_alloc (sizeof (*cmp));
@@ -632,8 +660,20 @@ client_connect (uri_tcp_test_main_t * utm)
   vl_msg_api_send_shmem (utm->vl_input_queue, (u8 *) & cmp);
 }
 
+int
+client_connect (uri_tcp_test_main_t * utm)
+{
+  client_send_connect (utm);
+  if (wait_for_state_change (utm, STATE_READY))
+    {
+      clib_warning ("Connect failed");
+      return -1;
+    }
+  return 0;
+}
+
 void
-client_disconnect (uri_tcp_test_main_t * utm)
+client_send_disconnect (uri_tcp_test_main_t * utm)
 {
   session_t *connected_session;
   vl_api_disconnect_session_t *dmp;
@@ -647,16 +687,29 @@ client_disconnect (uri_tcp_test_main_t * utm)
   vl_msg_api_send_shmem (utm->vl_input_queue, (u8 *) & dmp);
 }
 
+int
+client_disconnect (uri_tcp_test_main_t * utm)
+{
+  client_send_disconnect (utm);
+  if (wait_for_state_change (utm, STATE_START))
+    {
+      clib_warning ("Disconnect failed");
+      return -1;
+    }
+  return 0;
+}
+
 static void
 client_test (uri_tcp_test_main_t * utm)
 {
   int i;
 
-  application_attach (utm);
-  client_connect (utm);
+  if (application_attach (utm))
+    return;
 
-  if (wait_for_state_change (utm, STATE_READY))
+  if (client_connect (utm))
     {
+      application_detach (utm);
       return;
     }
 
@@ -671,11 +724,6 @@ client_test (uri_tcp_test_main_t * utm)
   /* Disconnect */
   client_disconnect (utm);
 
-  if (wait_for_state_change (utm, STATE_START))
-    {
-      clib_warning ("Disconnect failed");
-      return;
-    }
   application_detach (utm);
 }
 
@@ -686,9 +734,8 @@ vl_api_bind_uri_reply_t_handler (vl_api_bind_uri_reply_t * mp)
 
   if (mp->retval)
     {
-      uword *errp = hash_get (utm->error_string_by_error_number,
-                             -clib_net_to_host_u32 (mp->retval));
-      clib_warning ("bind failed: %s", (char *) *errp);
+      clib_warning ("bind failed: %s", format_api_error,
+                   clib_net_to_host_u32 (mp->retval));
       utm->state = STATE_FAILED;
       return;
     }
@@ -869,7 +916,7 @@ server_handle_fifo_event_rx (uri_tcp_test_main_t * utm,
   /* Read the bytes */
   do
     {
-      n_read = svm_fifo_dequeue_nowait (rx_fifo, 0, vec_len (utm->rx_buf),
+      n_read = svm_fifo_dequeue_nowait (rx_fifo, vec_len (utm->rx_buf),
                                        utm->rx_buf);
       if (n_read > 0)
        bytes -= n_read;
@@ -882,7 +929,7 @@ server_handle_fifo_event_rx (uri_tcp_test_main_t * utm,
        {
          do
            {
-             rv = svm_fifo_enqueue_nowait (tx_fifo, 0, n_read, utm->rx_buf);
+             rv = svm_fifo_enqueue_nowait (tx_fifo, n_read, utm->rx_buf);
            }
          while (rv <= 0 && !utm->time_to_stop);
 
@@ -891,7 +938,7 @@ server_handle_fifo_event_rx (uri_tcp_test_main_t * utm,
            {
              /* Fabricate TX event, send to vpp */
              evt.fifo = tx_fifo;
-             evt.event_type = FIFO_EVENT_SERVER_TX;
+             evt.event_type = FIFO_EVENT_APP_TX;
              evt.event_id = e->event_id;
 
              q = utm->vpp_event_queue;
@@ -914,11 +961,11 @@ server_handle_event_queue (uri_tcp_test_main_t * utm)
                                    0 /* nowait */ );
       switch (e->event_type)
        {
-       case FIFO_EVENT_SERVER_RX:
+       case FIFO_EVENT_APP_RX:
          server_handle_fifo_event_rx (utm, e);
          break;
 
-       case FIFO_EVENT_SERVER_EXIT:
+       case FIFO_EVENT_DISCONNECT:
          return;
 
        default:
@@ -936,7 +983,7 @@ server_handle_event_queue (uri_tcp_test_main_t * utm)
 }
 
 void
-server_listen (uri_tcp_test_main_t * utm)
+server_send_listen (uri_tcp_test_main_t * utm)
 {
   vl_api_bind_uri_t *bmp;
   bmp = vl_msg_api_alloc (sizeof (*bmp));
@@ -949,8 +996,20 @@ server_listen (uri_tcp_test_main_t * utm)
   vl_msg_api_send_shmem (utm->vl_input_queue, (u8 *) & bmp);
 }
 
+int
+server_listen (uri_tcp_test_main_t * utm)
+{
+  server_send_listen (utm);
+  if (wait_for_state_change (utm, STATE_READY))
+    {
+      clib_warning ("timeout waiting for STATE_READY");
+      return -1;
+    }
+  return 0;
+}
+
 void
-server_unbind (uri_tcp_test_main_t * utm)
+server_send_unbind (uri_tcp_test_main_t * utm)
 {
   vl_api_unbind_uri_t *ump;
 
@@ -963,31 +1022,33 @@ server_unbind (uri_tcp_test_main_t * utm)
   vl_msg_api_send_shmem (utm->vl_input_queue, (u8 *) & ump);
 }
 
+int
+server_unbind (uri_tcp_test_main_t * utm)
+{
+  server_send_unbind (utm);
+  if (wait_for_state_change (utm, STATE_START))
+    {
+      clib_warning ("timeout waiting for STATE_START");
+      return -1;
+    }
+  return 0;
+}
+
 void
 server_test (uri_tcp_test_main_t * utm)
 {
-  application_attach (utm);
+  if (application_attach (utm))
+    return;
 
   /* Bind to uri */
-  server_listen (utm);
-
-  if (wait_for_state_change (utm, STATE_READY))
-    {
-      clib_warning ("timeout waiting for STATE_READY");
-      return;
-    }
+  if (server_listen (utm))
+    return;
 
   /* Enter handle event loop */
   server_handle_event_queue (utm);
 
   /* Cleanup */
-  server_unbind (utm);
-
-  if (wait_for_state_change (utm, STATE_START))
-    {
-      clib_warning ("timeout waiting for STATE_START");
-      return;
-    }
+  server_send_unbind (utm);
 
   application_detach (utm);
 
index 598052b..266215c 100644 (file)
@@ -164,7 +164,7 @@ setup_signal_handlers (void)
 }
 
 void
-application_attach (uri_udp_test_main_t * utm)
+application_send_attach (uri_udp_test_main_t * utm)
 {
   vl_api_application_attach_t *bmp;
   u32 fifo_size = 3 << 20;
@@ -174,8 +174,8 @@ application_attach (uri_udp_test_main_t * utm)
   bmp->_vl_msg_id = ntohs (VL_API_APPLICATION_ATTACH);
   bmp->client_index = utm->my_client_index;
   bmp->context = ntohl (0xfeedface);
-  bmp->options[SESSION_OPTIONS_FLAGS] =
-    SESSION_OPTIONS_FLAGS_USE_FIFO | SESSION_OPTIONS_FLAGS_ADD_SEGMENT;
+  bmp->options[APP_OPTIONS_FLAGS] =
+    APP_OPTIONS_FLAGS_USE_FIFO | APP_OPTIONS_FLAGS_ADD_SEGMENT;
   bmp->options[SESSION_OPTIONS_RX_FIFO_SIZE] = fifo_size;
   bmp->options[SESSION_OPTIONS_TX_FIFO_SIZE] = fifo_size;
   bmp->options[SESSION_OPTIONS_ADD_SEGMENT_SIZE] = 128 << 20;
@@ -307,7 +307,7 @@ cut_through_thread_fn (void *arg)
       /* We read from the tx fifo and write to the rx fifo */
       do
        {
-         actual_transfer = svm_fifo_dequeue_nowait (tx_fifo, 0,
+         actual_transfer = svm_fifo_dequeue_nowait (tx_fifo,
                                                     vec_len (my_copy_buffer),
                                                     my_copy_buffer);
        }
@@ -318,7 +318,7 @@ cut_through_thread_fn (void *arg)
       buffer_offset = 0;
       while (actual_transfer > 0)
        {
-         rv = svm_fifo_enqueue_nowait (rx_fifo, 0, actual_transfer,
+         rv = svm_fifo_enqueue_nowait (rx_fifo, actual_transfer,
                                        my_copy_buffer + buffer_offset);
          if (rv > 0)
            {
@@ -357,7 +357,6 @@ client_send (uri_udp_test_main_t * utm, session_t * session)
   u64 bytes_received = 0, bytes_sent = 0;
   i32 bytes_to_read;
   int rv;
-  int mypid = getpid ();
   f64 before, after, delta, bytes_per_second;
   svm_fifo_t *rx_fifo, *tx_fifo;
   int buffer_offset, bytes_to_send = 0;
@@ -382,8 +381,7 @@ client_send (uri_udp_test_main_t * utm, session_t * session)
       buffer_offset = 0;
       while (bytes_to_send > 0)
        {
-         rv = svm_fifo_enqueue_nowait (tx_fifo, mypid,
-                                       bytes_to_send,
+         rv = svm_fifo_enqueue_nowait (tx_fifo, bytes_to_send,
                                        test_data + buffer_offset);
 
          if (rv > 0)
@@ -402,7 +400,7 @@ client_send (uri_udp_test_main_t * utm, session_t * session)
       buffer_offset = 0;
       while (bytes_to_read > 0)
        {
-         rv = svm_fifo_dequeue_nowait (rx_fifo, mypid,
+         rv = svm_fifo_dequeue_nowait (rx_fifo,
                                        bytes_to_read,
                                        utm->rx_buf + buffer_offset);
          if (rv > 0)
@@ -415,8 +413,8 @@ client_send (uri_udp_test_main_t * utm, session_t * session)
     }
   while (bytes_received < bytes_sent)
     {
-      rv = svm_fifo_dequeue_nowait (rx_fifo, mypid,
-                                   vec_len (utm->rx_buf), utm->rx_buf);
+      rv =
+       svm_fifo_dequeue_nowait (rx_fifo, vec_len (utm->rx_buf), utm->rx_buf);
       if (rv > 0)
        {
 #if CLIB_DEBUG > 0
@@ -459,7 +457,7 @@ uri_udp_client_test (uri_udp_test_main_t * utm)
 {
   session_t *session;
 
-  application_attach (utm);
+  application_send_attach (utm);
   udp_client_connect (utm);
 
   if (wait_for_state_change (utm, STATE_READY))
@@ -559,8 +557,8 @@ vl_api_connect_uri_t_handler (vl_api_connect_uri_t * mp)
                                                         128 * 1024);
   ASSERT (session->server_tx_fifo);
 
-  session->server_rx_fifo->server_session_index = session - utm->sessions;
-  session->server_tx_fifo->server_session_index = session - utm->sessions;
+  session->server_rx_fifo->master_session_index = session - utm->sessions;
+  session->server_tx_fifo->master_session_index = session - utm->sessions;
   utm->cut_through_session_index = session - utm->sessions;
 
   rv = pthread_create (&utm->cut_through_thread_handle,
@@ -805,19 +803,19 @@ server_handle_fifo_event_rx (uri_udp_test_main_t * utm,
 
   do
     {
-      nbytes = svm_fifo_dequeue_nowait (rx_fifo, 0,
-                                       vec_len (utm->rx_buf), utm->rx_buf);
+      nbytes = svm_fifo_dequeue_nowait (rx_fifo, vec_len (utm->rx_buf),
+                                       utm->rx_buf);
     }
   while (nbytes <= 0);
   do
     {
-      rv = svm_fifo_enqueue_nowait (tx_fifo, 0, nbytes, utm->rx_buf);
+      rv = svm_fifo_enqueue_nowait (tx_fifo, nbytes, utm->rx_buf);
     }
   while (rv == -2);
 
   /* Fabricate TX event, send to vpp */
   evt.fifo = tx_fifo;
-  evt.event_type = FIFO_EVENT_SERVER_TX;
+  evt.event_type = FIFO_EVENT_APP_TX;
   evt.event_id = e->event_id;
 
   if (svm_fifo_set_event (tx_fifo))
@@ -839,11 +837,11 @@ server_handle_event_queue (uri_udp_test_main_t * utm)
                                    0 /* nowait */ );
       switch (e->event_type)
        {
-       case FIFO_EVENT_SERVER_RX:
+       case FIFO_EVENT_APP_RX:
          server_handle_fifo_event_rx (utm, e);
          break;
 
-       case FIFO_EVENT_SERVER_EXIT:
+       case FIFO_EVENT_DISCONNECT:
          return;
 
        default:
@@ -893,7 +891,7 @@ void
 udp_server_test (uri_udp_test_main_t * utm)
 {
 
-  application_attach (utm);
+  application_send_attach (utm);
 
   /* Bind to uri */
   server_listen (utm);
index 5a45537..ccf9837 100644 (file)
@@ -87,14 +87,17 @@ application_new ()
 void
 application_del (application_t * app)
 {
-  api_main_t *am = &api_main;
-  void *oldheap;
   segment_manager_t *sm;
   u64 handle;
   u32 index, *handles = 0;
   int i;
   vnet_unbind_args_t _a, *a = &_a;
 
+  /*
+   * The app event queue allocated in first segment is cleared with
+   * the segment manager. No need to explicitly free it.
+   */
+
   /*
    * Cleanup segment managers
    */
@@ -120,14 +123,6 @@ application_del (application_t * app)
       vnet_unbind (a);
     }
 
-  /*
-   * Free the event fifo in the /vpe-api shared-memory segment
-   */
-  oldheap = svm_push_data_heap (am->vlib_rp);
-  if (app->event_queue)
-    unix_shared_memory_queue_free (app->event_queue);
-  svm_pop_heap (oldheap);
-
   application_table_del (app);
   pool_put (app_pool, app);
 }
@@ -149,30 +144,14 @@ int
 application_init (application_t * app, u32 api_client_index, u64 * options,
                  session_cb_vft_t * cb_fns)
 {
-  api_main_t *am = &api_main;
   segment_manager_t *sm;
   segment_manager_properties_t *props;
-  void *oldheap;
-  u32 app_evt_queue_size;
+  u32 app_evt_queue_size, first_seg_size;
   int rv;
 
   app_evt_queue_size = options[APP_EVT_QUEUE_SIZE] > 0 ?
     options[APP_EVT_QUEUE_SIZE] : default_app_evt_queue_size;
 
-  /* Allocate event fifo in the /vpe-api shared-memory segment */
-  oldheap = svm_push_data_heap (am->vlib_rp);
-
-  /* Allocate server event queue */
-  app->event_queue =
-    unix_shared_memory_queue_init (app_evt_queue_size,
-                                  sizeof (session_fifo_event_t),
-                                  0 /* consumer pid */ ,
-                                  0
-                                  /* (do not) signal when queue non-empty */
-    );
-
-  svm_pop_heap (oldheap);
-
   /* Setup segment manager */
   sm = segment_manager_new ();
   sm->app_index = app->index;
@@ -181,16 +160,21 @@ application_init (application_t * app, u32 api_client_index, u64 * options,
   props->rx_fifo_size = options[SESSION_OPTIONS_RX_FIFO_SIZE];
   props->tx_fifo_size = options[SESSION_OPTIONS_TX_FIFO_SIZE];
   props->add_segment = props->add_segment_size != 0;
+  props->use_private_segment = options[APP_OPTIONS_FLAGS]
+    & APP_OPTIONS_FLAGS_BUILTIN_APP;
 
-  if ((rv = segment_manager_init (sm, props,
-                                 options[SESSION_OPTIONS_SEGMENT_SIZE])))
+  first_seg_size = options[SESSION_OPTIONS_SEGMENT_SIZE];
+  if ((rv = segment_manager_init (sm, props, first_seg_size)))
     return rv;
 
   app->first_segment_manager = segment_manager_index (sm);
   app->api_client_index = api_client_index;
-  app->flags = options[SESSION_OPTIONS_FLAGS];
+  app->flags = options[APP_OPTIONS_FLAGS];
   app->cb_fns = *cb_fns;
 
+  /* Allocate app event queue in the first shared-memory segment */
+  app->event_queue = segment_manager_alloc_queue (sm, app_evt_queue_size);
+
   /* Check that the obvious things are properly set up */
   application_verify_cb_fns (cb_fns);
 
@@ -451,8 +435,8 @@ application_format_connects (application_t * app, int verbose)
            continue;
 
          fifo = fifos[i];
-         session_index = fifo->server_session_index;
-         thread_index = fifo->server_thread_index;
+         session_index = fifo->master_session_index;
+         thread_index = fifo->master_thread_index;
 
          session = stream_session_get (session_index, thread_index);
          str = format (0, "%U", format_stream_session, session, verbose);
index 6bcee9d..35caae8 100644 (file)
@@ -61,18 +61,6 @@ typedef struct _application
   /** Flags */
   u32 flags;
 
-  /* Stream server mode: accept or connect
-   * TODO REMOVE*/
-  u8 mode;
-
-  /** Index of the listen session or connect session
-   * TODO REMOVE*/
-  u32 session_index;
-
-  /** Session thread index for client connect sessions
-   * TODO REMOVE */
-  u32 thread_index;
-
   /*
    * Binary API interface to external app
    */
index 96d2c62..ad44baa 100644 (file)
@@ -142,7 +142,7 @@ vnet_connect_i (u32 app_index, u32 api_context, session_type_t sst,
        * Server is willing to have a direct fifo connection created
        * instead of going through the state machine, etc.
        */
-      if (server->flags & SESSION_OPTIONS_FLAGS_USE_FIFO)
+      if (server->flags & APP_OPTIONS_FLAGS_USE_FIFO)
        return server->cb_fns.
          redirect_connect_callback (server->api_client_index, mp);
     }
@@ -363,7 +363,11 @@ vnet_disconnect_session (vnet_disconnect_args_t * a)
   if (!s || s->app_index != a->app_index)
     return VNET_API_ERROR_INVALID_VALUE;
 
-  stream_session_disconnect (s);
+  /* We're peeking into another's thread pool. Make sure */
+  ASSERT (s->session_index == index);
+
+  session_send_session_evt_to_thread (a->handle, FIFO_EVENT_DISCONNECT,
+                                     thread_index);
   return 0;
 }
 
@@ -395,24 +399,6 @@ vnet_connect (vnet_connect_args_t * a)
   return vnet_connect_i (a->app_index, a->api_context, sst, &a->tep, a->mp);
 }
 
-int
-vnet_disconnect (vnet_disconnect_args_t * a)
-{
-  stream_session_t *session;
-  u32 session_index, thread_index;
-
-  if (api_parse_session_handle (a->handle, &session_index, &thread_index))
-    {
-      clib_warning ("Invalid handle");
-      return -1;
-    }
-
-  session = stream_session_get (session_index, thread_index);
-  stream_session_disconnect (session);
-
-  return 0;
-}
-
 /*
  * fd.io coding-style-patch-verification: ON
  *
index 2c49753..7d924c1 100644 (file)
@@ -30,10 +30,18 @@ typedef enum _session_api_proto
 
 typedef struct _vnet_app_attach_args_t
 {
+  /** Binary API client index */
   u32 api_client_index;
+
+  /** Application and segment manager options */
   u64 *options;
+
+  /** Session to application callback functions */
   session_cb_vft_t *session_cb_vft;
 
+  /** Flag that indicates if app is builtin */
+  u8 builtin;
+
   /*
    * Results
    */
@@ -110,7 +118,7 @@ typedef struct _vnet_disconnect_args_t
 typedef enum
 {
   APP_EVT_QUEUE_SIZE,
-  SESSION_OPTIONS_FLAGS,
+  APP_OPTIONS_FLAGS,
   SESSION_OPTIONS_SEGMENT_SIZE,
   SESSION_OPTIONS_ADD_SEGMENT_SIZE,
   SESSION_OPTIONS_RX_FIFO_SIZE,
@@ -119,11 +127,30 @@ typedef enum
   SESSION_OPTIONS_N_OPTIONS
 } app_attach_options_index_t;
 
-/** Server can handle delegated connect requests from local clients */
-#define SESSION_OPTIONS_FLAGS_USE_FIFO (1<<0)
+#define foreach_app_options_flags                              \
+  _(USE_FIFO, "Use FIFO with redirects")                       \
+  _(ADD_SEGMENT, "Add segment and signal app if needed")       \
+  _(BUILTIN_APP, "Application is builtin")                     \
+
+typedef enum _app_options
+{
+#define _(sym, str) APP_OPTIONS_##sym,
+  foreach_app_options_flags
+#undef _
+} app_options_t;
+
+typedef enum _app_options_flags
+{
+#define _(sym, str) APP_OPTIONS_FLAGS_##sym = 1 << APP_OPTIONS_##sym,
+  foreach_app_options_flags
+#undef _
+} app_options_flags_t;
 
-/** Server wants vpp to add segments when out of memory for fifos */
-#define SESSION_OPTIONS_FLAGS_ADD_SEGMENT   (1<<1)
+///** Server can handle delegated connect requests from local clients */
+//#define APP_OPTIONS_FLAGS_USE_FIFO    (1<<0)
+//
+///** Server wants vpp to add segments when out of memory for fifos */
+//#define APP_OPTIONS_FLAGS_ADD_SEGMENT   (1<<1)
 
 #define VNET_CONNECT_REDIRECTED        123
 
@@ -138,7 +165,6 @@ int vnet_disconnect_session (vnet_disconnect_args_t * a);
 int vnet_bind (vnet_bind_args_t * a);
 int vnet_connect (vnet_connect_args_t * a);
 int vnet_unbind (vnet_unbind_args_t * a);
-int vnet_disconnect (vnet_disconnect_args_t * a);
 
 int
 api_parse_session_handle (u64 handle, u32 * session_index,
index dd211c5..210754f 100644 (file)
@@ -218,8 +218,8 @@ session_tx_fifo_read_and_snd_i (vlib_main_t * vm, vlib_node_runtime_t * node,
           *      2) buffer chains */
          if (peek_data)
            {
-             n_bytes_read = svm_fifo_peek (s0->server_tx_fifo, s0->pid,
-                                           rx_offset, len_to_deq0, data0);
+             n_bytes_read = svm_fifo_peek (s0->server_tx_fifo, rx_offset,
+                                           len_to_deq0, data0);
              if (n_bytes_read <= 0)
                goto dequeue_fail;
 
@@ -230,8 +230,7 @@ session_tx_fifo_read_and_snd_i (vlib_main_t * vm, vlib_node_runtime_t * node,
          else
            {
              n_bytes_read = svm_fifo_dequeue_nowait (s0->server_tx_fifo,
-                                                     s0->pid, len_to_deq0,
-                                                     data0);
+                                                     len_to_deq0, data0);
              if (n_bytes_read <= 0)
                goto dequeue_fail;
            }
@@ -301,6 +300,26 @@ session_tx_fifo_dequeue_and_snd (vlib_main_t * vm, vlib_node_runtime_t * node,
                                         n_tx_pkts, 0);
 }
 
+stream_session_t *
+session_event_get_session (session_fifo_event_t * e0, u8 thread_index)
+{
+  svm_fifo_t *f0;
+  stream_session_t *s0;
+  u32 session_index0;
+
+  f0 = e0->fifo;
+  session_index0 = f0->master_session_index;
+
+  /* $$$ add multiple event queues, per vpp worker thread */
+  ASSERT (f0->master_thread_index == thread_index);
+
+  s0 = stream_session_get_if_valid (session_index0, thread_index);
+
+  ASSERT (s0->thread_index == thread_index);
+
+  return s0;
+}
+
 static uword
 session_queue_node_fn (vlib_main_t * vm, vlib_node_runtime_t * node,
                       vlib_frame_t * frame)
@@ -370,34 +389,24 @@ skip_dequeue:
   n_events = vec_len (my_fifo_events);
   for (i = 0; i < n_events; i++)
     {
-      svm_fifo_t *f0;          /* $$$ prefetch 1 ahead maybe */
-      stream_session_t *s0;
-      u32 session_index0;
+      stream_session_t *s0;    /* $$$ prefetch 1 ahead maybe */
       session_fifo_event_t *e0;
 
       e0 = &my_fifo_events[i];
-      f0 = e0->fifo;
-      session_index0 = f0->server_session_index;
-
-      /* $$$ add multiple event queues, per vpp worker thread */
-      ASSERT (f0->server_thread_index == my_thread_index);
 
-      s0 = stream_session_get_if_valid (session_index0, my_thread_index);
-
-      if (CLIB_DEBUG && !s0)
+      switch (e0->event_type)
        {
-         clib_warning ("It's dead, Jim!");
-         continue;
-       }
-
-      if (PREDICT_FALSE (s0->session_state == SESSION_STATE_CLOSED))
-       continue;
+       case FIFO_EVENT_APP_TX:
+         s0 = session_event_get_session (e0, my_thread_index);
 
-      ASSERT (s0->thread_index == my_thread_index);
+         if (CLIB_DEBUG && !s0)
+           {
+             clib_warning ("It's dead, Jim!");
+             continue;
+           }
 
-      switch (e0->event_type)
-       {
-       case FIFO_EVENT_SERVER_TX:
+         if (PREDICT_FALSE (s0->session_state == SESSION_STATE_CLOSED))
+           continue;
          /* Spray packets in per session type frames, since they go to
           * different nodes */
          rv = (smm->session_tx_fns[s0->session_type]) (vm, node, smm, e0, s0,
@@ -408,10 +417,12 @@ skip_dequeue:
            goto done;
 
          break;
-       case FIFO_EVENT_SERVER_EXIT:
+       case FIFO_EVENT_DISCONNECT:
+         s0 = stream_session_get_from_handle (e0->session_handle);
          stream_session_disconnect (s0);
          break;
        case FIFO_EVENT_BUILTIN_RX:
+         s0 = session_event_get_session (e0, my_thread_index);
          svm_fifo_unset_event (s0->server_rx_fifo);
          /* Get session's server */
          app = application_get (s0->app_index);
index 16e5bc5..e053232 100644 (file)
@@ -27,6 +27,11 @@ u32 segment_name_counter = 0;
  */
 segment_manager_t *segment_managers = 0;
 
+/**
+ * Process private segment index
+ */
+u32 private_segment_index = ~0;
+
 /**
  * Default fifo and segment size. TODO config.
  */
@@ -100,6 +105,26 @@ session_manager_add_first_segment (segment_manager_t * sm, u32 segment_size)
   return rv;
 }
 
+static void
+segment_manager_alloc_process_private_segment ()
+{
+  svm_fifo_segment_create_args_t _a, *a = &_a;
+
+  if (private_segment_index != ~0)
+    return;
+
+  memset (a, 0, sizeof (*a));
+  a->segment_name = "process-private-segment";
+  a->segment_size = ~0;
+  a->new_segment_index = ~0;
+
+  if (svm_fifo_segment_create_process_private (a))
+    clib_warning ("Failed to create process private segment");
+
+  private_segment_index = a->new_segment_index;
+  ASSERT (private_segment_index != ~0);
+}
+
 /**
  * Initializes segment manager based on options provided.
  * Returns error if svm segment allocation fails.
@@ -114,7 +139,9 @@ segment_manager_init (segment_manager_t * sm,
   /* app allocates these */
   sm->properties = properties;
 
-  if (first_seg_size > 0)
+  first_seg_size = first_seg_size > 0 ? first_seg_size : default_segment_size;
+
+  if (sm->properties->use_private_segment == 0)
     {
       rv = session_manager_add_first_segment (sm, first_seg_size);
       if (rv)
@@ -123,7 +150,15 @@ segment_manager_init (segment_manager_t * sm,
          return rv;
        }
     }
+  else
+    {
+      if (private_segment_index == ~0)
+       segment_manager_alloc_process_private_segment ();
+      ASSERT (private_segment_index != ~0);
+      vec_add1 (sm->segment_indices, private_segment_index);
+    }
 
+  clib_spinlock_init (&sm->lockp);
   return 0;
 }
 
@@ -162,8 +197,8 @@ segment_manager_del (segment_manager_t * sm)
          stream_session_t *session;
 
          fifo = fifos[i];
-         session_index = fifo->server_session_index;
-         thread_index = fifo->server_thread_index;
+         session_index = fifo->master_session_index;
+         thread_index = fifo->master_thread_index;
 
          session = stream_session_get (session_index, thread_index);
 
@@ -183,7 +218,9 @@ segment_manager_del (segment_manager_t * sm)
                                        deleted_thread_indices[i]);
 
          /* Instead of directly removing the session call disconnect */
-         stream_session_disconnect (session);
+         session_send_session_evt_to_thread (stream_session_handle (session),
+                                             FIFO_EVENT_DISCONNECT,
+                                             deleted_thread_indices[i]);
 
          /*
             stream_session_table_del (smm, session);
@@ -200,6 +237,7 @@ segment_manager_del (segment_manager_t * sm)
       /* svm_fifo_segment_delete (fifo_segment); */
     }
 
+  clib_spinlock_free (&sm->lockp);
   vec_free (deleted_sessions);
   vec_free (deleted_thread_indices);
   pool_put (segment_managers, sm);
@@ -232,9 +270,13 @@ segment_manager_alloc_session_fifos (segment_manager_t * sm,
   u8 added_a_segment = 0;
   int i;
 
-  /* Allocate svm fifos */
   ASSERT (vec_len (sm->segment_indices));
 
+  /* Make sure we don't have multiple threads trying to allocate segments
+   * at the same time. */
+  clib_spinlock_lock (&sm->lockp);
+
+  /* Allocate svm fifos */
 again:
   for (i = 0; i < vec_len (sm->segment_indices); i++)
     {
@@ -283,7 +325,9 @@ again:
            }
 
          if (session_manager_add_segment (sm))
-           return VNET_API_ERROR_URI_FIFO_CREATE_FAILED;
+           {
+             return VNET_API_ERROR_URI_FIFO_CREATE_FAILED;
+           }
 
          added_a_segment = 1;
          goto again;
@@ -295,14 +339,16 @@ again:
        }
     }
 
-  if (added_a_segment)
-    return segment_manager_notify_app_seg_add (sm, *fifo_segment_index);
-
   /* Backpointers to segment manager */
   sm_index = segment_manager_index (sm);
   (*server_tx_fifo)->segment_manager = sm_index;
   (*server_rx_fifo)->segment_manager = sm_index;
 
+  clib_spinlock_unlock (&sm->lockp);
+
+  if (added_a_segment)
+    return segment_manager_notify_app_seg_add (sm, *fifo_segment_index);
+
   return 0;
 }
 
@@ -313,26 +359,72 @@ segment_manager_dealloc_fifos (u32 svm_segment_index, svm_fifo_t * rx_fifo,
   segment_manager_t *sm;
   svm_fifo_segment_private_t *fifo_segment;
 
+  sm = segment_manager_get_if_valid (rx_fifo->segment_manager);
+
+  /* It's possible to have no segment manager if the session was removed
+   * as result of a detach */
+  if (!sm)
+    return;
+
   fifo_segment = svm_fifo_get_segment (svm_segment_index);
   svm_fifo_segment_free_fifo (fifo_segment, rx_fifo);
   svm_fifo_segment_free_fifo (fifo_segment, tx_fifo);
 
-  /* If we have segment manager, try doing some cleanup.
-   * It's possible to have no segment manager if the session was removed
-   * as result of a detach */
-  sm = segment_manager_get_if_valid (rx_fifo->segment_manager);
-  if (sm)
+  /* Remove segment only if it holds no fifos and not the first */
+  if (sm->segment_indices[0] != svm_segment_index
+      && !svm_fifo_segment_has_fifos (fifo_segment))
     {
-      /* Remove segment only if it holds no fifos and not the first */
-      if (sm->segment_indices[0] != svm_segment_index
-         && !svm_fifo_segment_has_fifos (fifo_segment))
-       {
-         svm_fifo_segment_delete (fifo_segment);
-         vec_del1 (sm->segment_indices, svm_segment_index);
-       }
+      svm_fifo_segment_delete (fifo_segment);
+      vec_del1 (sm->segment_indices, svm_segment_index);
     }
 }
 
+/**
+ * Allocates shm queue in the first segment
+ */
+unix_shared_memory_queue_t *
+segment_manager_alloc_queue (segment_manager_t * sm, u32 queue_size)
+{
+  ssvm_shared_header_t *sh;
+  svm_fifo_segment_private_t *segment;
+  unix_shared_memory_queue_t *q;
+  void *oldheap;
+
+  ASSERT (sm->segment_indices != 0);
+
+  segment = svm_fifo_get_segment (sm->segment_indices[0]);
+  sh = segment->ssvm.sh;
+
+  oldheap = ssvm_push_heap (sh);
+  q =
+    unix_shared_memory_queue_init (queue_size, sizeof (session_fifo_event_t),
+                                  0 /* consumer pid */ , 0
+                                  /* signal when queue non-empty */ );
+  ssvm_pop_heap (oldheap);
+  return q;
+}
+
+/**
+ * Frees shm queue allocated in the first segment
+ */
+void
+segment_manager_dealloc_queue (segment_manager_t * sm,
+                              unix_shared_memory_queue_t * q)
+{
+  ssvm_shared_header_t *sh;
+  svm_fifo_segment_private_t *segment;
+  void *oldheap;
+
+  ASSERT (sm->segment_indices != 0);
+
+  segment = svm_fifo_get_segment (sm->segment_indices[0]);
+  sh = segment->ssvm.sh;
+
+  oldheap = ssvm_push_heap (sh);
+  unix_shared_memory_queue_free (q);
+  ssvm_pop_heap (oldheap);
+}
+
 /*
  * fd.io coding-style-patch-verification: ON
  *
index 778d604..2710bb5 100644 (file)
 #include <vnet/vnet.h>
 #include <svm/svm_fifo_segment.h>
 
+#include <vlibmemory/unix_shared_memory_queue.h>
+#include <vlibmemory/api.h>
+#include <vppinfra/lock.h>
+
 typedef struct _segment_manager_properties
 {
   /** Session fifo sizes.  */
@@ -30,10 +34,14 @@ typedef struct _segment_manager_properties
   /** Flag that indicates if additional segments should be created */
   u8 add_segment;
 
+  /** Use private memory segment instead of shared memory */
+  u8 use_private_segment;
 } segment_manager_properties_t;
 
 typedef struct _segment_manager
 {
+  clib_spinlock_t lockp;
+
   /** segments mapped by this manager */
   u32 *segment_indices;
 
@@ -95,6 +103,10 @@ segment_manager_alloc_session_fifos (segment_manager_t * sm,
 void
 segment_manager_dealloc_fifos (u32 svm_segment_index, svm_fifo_t * rx_fifo,
                               svm_fifo_t * tx_fifo);
+unix_shared_memory_queue_t *segment_manager_alloc_queue (segment_manager_t *
+                                                        sm, u32 queue_size);
+void segment_manager_dealloc_queue (segment_manager_t * sm,
+                                   unix_shared_memory_queue_t * q);
 
 #endif /* SRC_VNET_SESSION_SEGMENT_MANAGER_H_ */
 /*
index e6cfe7d..d17c93f 100644 (file)
@@ -377,33 +377,6 @@ stream_session_lookup_transport6 (ip6_address_t * lcl, ip6_address_t * rmt,
   return 0;
 }
 
-/**
- * Allocate vpp event queue (once) per worker thread
- */
-void
-session_vpp_event_queue_allocate (session_manager_main_t * smm,
-                                 u32 thread_index)
-{
-  api_main_t *am = &api_main;
-  void *oldheap;
-
-  if (smm->vpp_event_queues[thread_index] == 0)
-    {
-      /* Allocate event fifo in the /vpe-api shared-memory segment */
-      oldheap = svm_push_data_heap (am->vlib_rp);
-
-      smm->vpp_event_queues[thread_index] =
-       unix_shared_memory_queue_init (2048 /* nels $$$$ config */ ,
-                                      sizeof (session_fifo_event_t),
-                                      0 /* consumer pid */ ,
-                                      0
-                                      /* (do not) send signal when queue non-empty */
-       );
-
-      svm_pop_heap (oldheap);
-    }
-}
-
 int
 stream_session_create_i (segment_manager_t * sm, transport_connection_t * tc,
                         stream_session_t ** ret_s)
@@ -428,11 +401,11 @@ stream_session_create_i (segment_manager_t * sm, transport_connection_t * tc,
 
   /* Initialize backpointers */
   pool_index = s - smm->sessions[thread_index];
-  server_rx_fifo->server_session_index = pool_index;
-  server_rx_fifo->server_thread_index = thread_index;
+  server_rx_fifo->master_session_index = pool_index;
+  server_rx_fifo->master_thread_index = thread_index;
 
-  server_tx_fifo->server_session_index = pool_index;
-  server_tx_fifo->server_thread_index = thread_index;
+  server_tx_fifo->master_session_index = pool_index;
+  server_tx_fifo->master_thread_index = thread_index;
 
   s->server_rx_fifo = server_rx_fifo;
   s->server_tx_fifo = server_tx_fifo;
@@ -485,7 +458,7 @@ stream_session_enqueue_data (transport_connection_t * tc, u8 * data, u16 len,
   if (PREDICT_FALSE (len > svm_fifo_max_enqueue (s->server_rx_fifo)))
     return -1;
 
-  enqueued = svm_fifo_enqueue_nowait (s->server_rx_fifo, s->pid, len, data);
+  enqueued = svm_fifo_enqueue_nowait (s->server_rx_fifo, len, data);
 
   if (queue_event)
     {
@@ -527,14 +500,14 @@ stream_session_peek_bytes (transport_connection_t * tc, u8 * buffer,
                           u32 offset, u32 max_bytes)
 {
   stream_session_t *s = stream_session_get (tc->s_index, tc->thread_index);
-  return svm_fifo_peek (s->server_tx_fifo, s->pid, offset, max_bytes, buffer);
+  return svm_fifo_peek (s->server_tx_fifo, offset, max_bytes, buffer);
 }
 
 u32
 stream_session_dequeue_drop (transport_connection_t * tc, u32 max_bytes)
 {
   stream_session_t *s = stream_session_get (tc->s_index, tc->thread_index);
-  return svm_fifo_dequeue_drop (s->server_tx_fifo, s->pid, max_bytes);
+  return svm_fifo_dequeue_drop (s->server_tx_fifo, max_bytes);
 }
 
 /**
@@ -568,7 +541,7 @@ stream_session_enqueue_notify (stream_session_t * s, u8 block)
     {
       /* Fabricate event */
       evt.fifo = s->server_rx_fifo;
-      evt.event_type = FIFO_EVENT_SERVER_RX;
+      evt.event_type = FIFO_EVENT_APP_RX;
       evt.event_id = serial_number++;
 
       /* Add event to server's event queue */
@@ -899,37 +872,45 @@ stream_session_stop_listen (stream_session_t * s)
   return 0;
 }
 
+void
+session_send_session_evt_to_thread (u64 session_handle,
+                                   fifo_event_type_t evt_type,
+                                   u32 thread_index)
+{
+  static u16 serial_number = 0;
+  session_fifo_event_t evt;
+  unix_shared_memory_queue_t *q;
+
+  /* Fabricate event */
+  evt.session_handle = session_handle;
+  evt.event_type = evt_type;
+  evt.event_id = serial_number++;
+
+  q = session_manager_get_vpp_event_queue (thread_index);
+
+  /* Based on request block (or not) for lack of space */
+  if (PREDICT_TRUE (q->cursize < q->maxsize))
+    unix_shared_memory_queue_add (q, (u8 *) & evt,
+                                 0 /* do wait for mutex */ );
+  else
+    {
+      clib_warning ("queue full");
+      return;
+    }
+}
+
 /**
  * Disconnect session and propagate to transport. This should eventually
  * result in a delete notification that allows us to cleanup session state.
  * Called for both active/passive disconnects.
+ *
+ * Should be called from the session's thread.
  */
 void
 stream_session_disconnect (stream_session_t * s)
 {
-//  session_fifo_event_t evt;
-
   s->session_state = SESSION_STATE_CLOSED;
-  /* RPC to vpp evt queue in the right thread */
-
   tp_vfts[s->session_type].close (s->connection_index, s->thread_index);
-
-//  {
-//  /* Fabricate event */
-//  evt.fifo = s->server_rx_fifo;
-//  evt.event_type = FIFO_EVENT_SERVER_RX;
-//  evt.event_id = serial_number++;
-//
-//  /* Based on request block (or not) for lack of space */
-//  if (PREDICT_TRUE(q->cursize < q->maxsize))
-//    unix_shared_memory_queue_add (app->event_queue, (u8 *) &evt,
-//                                0 /* do wait for mutex */);
-//  else
-//    {
-//      clib_warning("fifo full");
-//      return -1;
-//    }
-//  }
 }
 
 /**
@@ -976,6 +957,33 @@ session_get_transport_vft (u8 type)
   return &tp_vfts[type];
 }
 
+/**
+ * Allocate vpp event queue (once) per worker thread
+ */
+void
+session_vpp_event_queue_allocate (session_manager_main_t * smm,
+                                 u32 thread_index)
+{
+  api_main_t *am = &api_main;
+  void *oldheap;
+
+  if (smm->vpp_event_queues[thread_index] == 0)
+    {
+      /* Allocate event fifo in the /vpe-api shared-memory segment */
+      oldheap = svm_push_data_heap (am->vlib_rp);
+
+      smm->vpp_event_queues[thread_index] =
+       unix_shared_memory_queue_init (2048 /* nels $$$$ config */ ,
+                                      sizeof (session_fifo_event_t),
+                                      0 /* consumer pid */ ,
+                                      0
+                                      /* (do not) send signal when queue non-empty */
+       );
+
+      svm_pop_heap (oldheap);
+    }
+}
+
 static clib_error_t *
 session_manager_main_enable (vlib_main_t * vm)
 {
@@ -1043,6 +1051,18 @@ session_manager_main_enable (vlib_main_t * vm)
   return 0;
 }
 
+void
+session_node_enable_disable (u8 is_en)
+{
+  u8 state = is_en ? VLIB_NODE_STATE_POLLING : VLIB_NODE_STATE_DISABLED;
+  /* *INDENT-OFF* */
+  foreach_vlib_main (({
+    vlib_node_set_state (this_vlib_main, session_queue_node.index,
+                         state);
+  }));
+  /* *INDENT-ON* */
+}
+
 clib_error_t *
 vnet_session_enable_disable (vlib_main_t * vm, u8 is_en)
 {
@@ -1051,16 +1071,14 @@ vnet_session_enable_disable (vlib_main_t * vm, u8 is_en)
       if (session_manager_main.is_enabled)
        return 0;
 
-      vlib_node_set_state (vm, session_queue_node.index,
-                          VLIB_NODE_STATE_POLLING);
+      session_node_enable_disable (is_en);
 
       return session_manager_main_enable (vm);
     }
   else
     {
       session_manager_main.is_enabled = 0;
-      vlib_node_set_state (vm, session_queue_node.index,
-                          VLIB_NODE_STATE_DISABLED);
+      session_node_enable_disable (is_en);
     }
 
   return 0;
index 6e4ea96..8cd72f3 100644 (file)
@@ -17,9 +17,6 @@
 
 #include <vnet/session/transport.h>
 #include <vlibmemory/unix_shared_memory_queue.h>
-#include <vlibmemory/api.h>
-#include <vppinfra/sparse_vec.h>
-#include <svm/svm_fifo_segment.h>
 #include <vnet/session/session_debug.h>
 #include <vnet/session/segment_manager.h>
 
 
 typedef enum
 {
-  FIFO_EVENT_SERVER_RX,
-  FIFO_EVENT_SERVER_TX,
+  FIFO_EVENT_APP_RX,
+  FIFO_EVENT_APP_TX,
   FIFO_EVENT_TIMEOUT,
-  FIFO_EVENT_SERVER_EXIT,
+  FIFO_EVENT_DISCONNECT,
   FIFO_EVENT_BUILTIN_RX
 } fifo_event_type_t;
 
@@ -96,7 +93,11 @@ typedef enum
 
 /* *INDENT-OFF* */
 typedef CLIB_PACKED (struct {
-  svm_fifo_t * fifo;
+  union
+    {
+      svm_fifo_t * fifo;
+      u64 session_handle;
+    };
   u8 event_type;
   u16 event_id;
 }) session_fifo_event_t;
@@ -370,7 +371,9 @@ int stream_session_listen (stream_session_t * s, transport_endpoint_t * tep);
 int stream_session_stop_listen (stream_session_t * s);
 void stream_session_disconnect (stream_session_t * s);
 void stream_session_cleanup (stream_session_t * s);
-
+void session_send_session_evt_to_thread (u64 session_handle,
+                                        fifo_event_type_t evt_type,
+                                        u32 thread_index);
 u8 *format_stream_session (u8 * s, va_list * args);
 
 void session_register_transport (u8 type, const transport_proto_vft_t * vft);
index 8116b67..79d67a2 100755 (executable)
@@ -96,7 +96,7 @@ send_session_accept_callback (stream_session_t * s)
   memset (mp, 0, sizeof (*mp));
 
   mp->_vl_msg_id = clib_host_to_net_u16 (VL_API_ACCEPT_SESSION);
-
+  mp->context = server->index;
   listener = listen_session_get (s->session_type, s->listener_index);
   tp_vft = session_get_transport_vft (s->session_type);
   tc = tp_vft->get_connection (s->connection_index, s->thread_index);
@@ -270,23 +270,6 @@ static session_cb_vft_t uri_session_cb_vft = {
   .redirect_connect_callback = redirect_connect_callback
 };
 
-static int
-api_session_not_valid (u32 session_index, u32 thread_index)
-{
-  session_manager_main_t *smm = vnet_get_session_manager_main ();
-  stream_session_t *pool;
-
-  if (thread_index >= vec_len (smm->sessions))
-    return VNET_API_ERROR_INVALID_VALUE;
-
-  pool = smm->sessions[thread_index];
-
-  if (pool_is_free_index (pool, session_index))
-    return VNET_API_ERROR_INVALID_VALUE_2;
-
-  return 0;
-}
-
 static void
 vl_api_session_enable_disable_t_handler (vl_api_session_enable_disable_t * mp)
 {
@@ -324,9 +307,9 @@ vl_api_application_attach_t_handler (vl_api_application_attach_t * mp)
   rv = vnet_application_attach (a);
 
 done:
+
   /* *INDENT-OFF* */
   REPLY_MACRO2 (VL_API_APPLICATION_ATTACH_REPLY, ({
-    rmp->retval = rv;
     if (!rv)
       {
        rmp->segment_name_length = 0;
@@ -558,24 +541,33 @@ static void
 vl_api_accept_session_reply_t_handler (vl_api_accept_session_reply_t * mp)
 {
   stream_session_t *s;
-  int rv;
   u32 session_index, thread_index;
-  session_index = stream_session_index_from_handle (mp->handle);
-  thread_index = stream_session_thread_from_handle (mp->handle);
-  if (api_session_not_valid (session_index, thread_index))
-    return;
-
-  s = stream_session_get (session_index, thread_index);
-  rv = mp->retval;
+  vnet_disconnect_args_t _a, *a = &_a;
 
-  if (rv)
+  /* Server isn't interested, kill the session */
+  if (mp->retval)
     {
-      /* Server isn't interested, kill the session */
-      stream_session_disconnect (s);
-      return;
+      a->app_index = mp->context;
+      a->handle = mp->handle;
+      vnet_disconnect_session (a);
+    }
+  else
+    {
+      stream_session_parse_handle (mp->handle, &session_index, &thread_index);
+      s = stream_session_get_if_valid (session_index, thread_index);
+      if (!s)
+       {
+         clib_warning ("session doesn't exist");
+         return;
+       }
+      if (s->app_index != mp->context)
+       {
+         clib_warning ("app doesn't own session");
+         return;
+       }
+      /* XXX volatile? */
+      s->session_state = SESSION_STATE_READY;
     }
-
-  s->session_state = SESSION_STATE_READY;
 }
 
 static void
index f8fbf28..276beb2 100644 (file)
@@ -62,8 +62,7 @@ send_test_chunk (tclient_main_t * tm, session_t * s)
   bytes_this_chunk = bytes_this_chunk < s->bytes_to_send
     ? bytes_this_chunk : s->bytes_to_send;
 
-  rv = svm_fifo_enqueue_nowait (s->server_tx_fifo, 0 /*pid */ ,
-                               bytes_this_chunk,
+  rv = svm_fifo_enqueue_nowait (s->server_tx_fifo, bytes_this_chunk,
                                test_data + test_buf_offset);
 
   /* If we managed to enqueue data... */
@@ -95,7 +94,7 @@ send_test_chunk (tclient_main_t * tm, session_t * s)
        {
          /* Fabricate TX event, send to vpp */
          evt.fifo = s->server_tx_fifo;
-         evt.event_type = FIFO_EVENT_SERVER_TX;
+         evt.event_type = FIFO_EVENT_APP_TX;
          evt.event_id = serial_number++;
 
          unix_shared_memory_queue_add (tm->vpp_event_queue, (u8 *) & evt,
@@ -113,7 +112,7 @@ receive_test_chunk (tclient_main_t * tm, session_t * s)
   /* Allow enqueuing of new event */
   // svm_fifo_unset_event (rx_fifo);
 
-  n_read = svm_fifo_dequeue_nowait (rx_fifo, 0, vec_len (tm->rx_buf),
+  n_read = svm_fifo_dequeue_nowait (rx_fifo, vec_len (tm->rx_buf),
                                    tm->rx_buf);
   if (n_read > 0)
     {
@@ -457,6 +456,8 @@ attach_builtin_test_clients ()
 
   options[SESSION_OPTIONS_ACCEPT_COOKIE] = 0x12345678;
   options[SESSION_OPTIONS_SEGMENT_SIZE] = (2 << 30);   /*$$$$ config / arg */
+  options[APP_OPTIONS_FLAGS] = APP_OPTIONS_FLAGS_BUILTIN_APP;
+
   a->options = options;
 
   return vnet_application_attach (a);
index 8308e3d..3468269 100644 (file)
@@ -180,7 +180,7 @@ builtin_server_rx_callback (stream_session_t * s)
   vec_validate (bsm->rx_buf, max_transfer - 1);
   _vec_len (bsm->rx_buf) = max_transfer;
 
-  actual_transfer = svm_fifo_dequeue_nowait (rx_fifo, 0, max_transfer,
+  actual_transfer = svm_fifo_dequeue_nowait (rx_fifo, max_transfer,
                                             bsm->rx_buf);
   ASSERT (actual_transfer == max_transfer);
 
@@ -190,8 +190,7 @@ builtin_server_rx_callback (stream_session_t * s)
    * Echo back
    */
 
-  n_written =
-    svm_fifo_enqueue_nowait (tx_fifo, 0, actual_transfer, bsm->rx_buf);
+  n_written = svm_fifo_enqueue_nowait (tx_fifo, actual_transfer, bsm->rx_buf);
 
   if (n_written != max_transfer)
     clib_warning ("short trout!");
@@ -200,7 +199,7 @@ builtin_server_rx_callback (stream_session_t * s)
     {
       /* Fabricate TX event, send to vpp */
       evt.fifo = tx_fifo;
-      evt.event_type = FIFO_EVENT_SERVER_TX;
+      evt.event_type = FIFO_EVENT_APP_TX;
       evt.event_id = serial_number++;
 
       unix_shared_memory_queue_add (bsm->vpp_queue[s->thread_index],
@@ -288,6 +287,7 @@ server_attach ()
   a->options[SESSION_OPTIONS_SEGMENT_SIZE] = 128 << 20;
   a->options[SESSION_OPTIONS_RX_FIFO_SIZE] = 1 << 16;
   a->options[SESSION_OPTIONS_TX_FIFO_SIZE] = 1 << 16;
+  a->options[APP_OPTIONS_FLAGS] = APP_OPTIONS_FLAGS_BUILTIN_APP;
   a->segment_name = segment_name;
   a->segment_name_length = ARRAY_LEN (segment_name);
 
index 1298258..245a35a 100644 (file)
@@ -487,7 +487,8 @@ u8 *
 format_tcp_connection (u8 * s, va_list * args)
 {
   tcp_connection_t *tc = va_arg (*args, tcp_connection_t *);
-
+  if (!tc)
+    return s;
   if (tc->c_is_ip4)
     {
       s = format (s, "[#%d][%s] %U:%d->%U:%d", tc->c_thread_index, "T",
@@ -747,12 +748,14 @@ void
 tcp_initialize_timer_wheels (tcp_main_t * tm)
 {
   tw_timer_wheel_16t_2w_512sl_t *tw;
-  vec_foreach (tw, tm->timer_wheels)
-  {
+  /* *INDENT-OFF* */
+  foreach_vlib_main (({
+    tw = &tm->timer_wheels[ii];
     tw_timer_wheel_init_16t_2w_512sl (tw, tcp_expired_timers_dispatch,
                                      100e-3 /* timer period 100ms */ , ~0);
-    tw->last_run_time = vlib_time_now (tm->vlib_main);
-  }
+    tw->last_run_time = vlib_time_now (this_vlib_main);
+  }));
+  /* *INDENT-ON* */
 }
 
 clib_error_t *
index 97679aa..3bd5387 100644 (file)
@@ -1011,8 +1011,8 @@ tcp_session_enqueue_ooo (tcp_connection_t * tc, vlib_buffer_t * b,
 
   clib_warning ("ooo: offset %d len %d", offset, data_len);
 
-  rv = svm_fifo_enqueue_with_offset (s0->server_rx_fifo, s0->pid, offset,
-                                    data_len, vlib_buffer_get_current (b));
+  rv = svm_fifo_enqueue_with_offset (s0->server_rx_fifo, offset, data_len,
+                                    vlib_buffer_get_current (b));
 
   /* Nothing written */
   if (rv)
@@ -2392,8 +2392,8 @@ tcp46_input_inline (vlib_main_t * vm, vlib_node_runtime_t * node,
            {
              t0 = vlib_add_trace (vm, node, b0, sizeof (*t0));
              clib_memcpy (&t0->tcp_header, tcp0, sizeof (t0->tcp_header));
-             clib_memcpy (&t0->tcp_connection, tc0,
-                          sizeof (t0->tcp_connection));
+             if (tc0)
+               clib_memcpy (&t0->tcp_connection, tc0, sizeof (*tc0));
            }
 
          vlib_validate_buffer_enqueue_x1 (vm, node, next_index, to_next,
index a7be8bd..4e1a7aa 100644 (file)
@@ -1558,7 +1558,6 @@ tcp46_send_reset_inline (vlib_main_t * vm, vlib_node_runtime_t * node,
          vlib_buffer_t *b0;
          tcp_tx_trace_t *t0;
          tcp_header_t *th0;
-         tcp_connection_t *tc0;
          u32 error0 = TCP_ERROR_RST_SENT, next0 = TCP_RESET_NEXT_IP_LOOKUP;
 
          bi0 = from[0];
@@ -1592,13 +1591,8 @@ tcp46_send_reset_inline (vlib_main_t * vm, vlib_node_runtime_t * node,
                th0 = ip4_next_header ((ip4_header_t *) th0);
              else
                th0 = ip6_next_header ((ip6_header_t *) th0);
-             tc0 =
-               tcp_connection_get (vnet_buffer (b0)->tcp.connection_index,
-                                   my_thread_index);
              t0 = vlib_add_trace (vm, node, b0, sizeof (*t0));
              clib_memcpy (&t0->tcp_header, th0, sizeof (t0->tcp_header));
-             clib_memcpy (&t0->tcp_connection, tc0,
-                          sizeof (t0->tcp_connection));
            }
 
          vlib_validate_buffer_enqueue_x1 (vm, node, next_index, to_next,
index 890e50b..0146154 100644 (file)
@@ -351,8 +351,7 @@ tcp_test_fifo1 (vlib_main_t * vm, unformat_input_t * input)
   /*
    * Enqueue an initial (un-dequeued) chunk
    */
-  rv = svm_fifo_enqueue_nowait (f, 0 /* pid */ ,
-                               sizeof (u32), (u8 *) test_data);
+  rv = svm_fifo_enqueue_nowait (f, sizeof (u32), (u8 *) test_data);
   TCP_TEST ((rv == sizeof (u32)), "enqueued %d", rv);
   TCP_TEST ((f->tail == 4), "fifo tail %u", f->tail);
 
@@ -364,7 +363,7 @@ tcp_test_fifo1 (vlib_main_t * vm, unformat_input_t * input)
     {
       offset = (2 * i + 1) * sizeof (u32);
       data = (u8 *) (test_data + (2 * i + 1));
-      rv = svm_fifo_enqueue_with_offset (f, 0, offset, sizeof (u32), data);
+      rv = svm_fifo_enqueue_with_offset (f, offset, sizeof (u32), data);
       if (verbose)
        vlib_cli_output (vm, "add [%d] [%d, %d]", 2 * i + 1, offset,
                         offset + sizeof (u32));
@@ -393,7 +392,7 @@ tcp_test_fifo1 (vlib_main_t * vm, unformat_input_t * input)
     {
       offset = (2 * i + 0) * sizeof (u32);
       data = (u8 *) (test_data + (2 * i + 0));
-      rv = svm_fifo_enqueue_with_offset (f, 0, offset, sizeof (u32), data);
+      rv = svm_fifo_enqueue_with_offset (f, offset, sizeof (u32), data);
       if (verbose)
        vlib_cli_output (vm, "add [%d] [%d, %d]", 2 * i, offset,
                         offset + sizeof (u32));
@@ -418,8 +417,7 @@ tcp_test_fifo1 (vlib_main_t * vm, unformat_input_t * input)
   /*
    * Enqueue the missing u32
    */
-  rv = svm_fifo_enqueue_nowait (f, 0 /* pid */ , sizeof (u32),
-                               (u8 *) (test_data + 2));
+  rv = svm_fifo_enqueue_nowait (f, sizeof (u32), (u8 *) (test_data + 2));
   if (verbose)
     vlib_cli_output (vm, "fifo after missing link: %U", format_svm_fifo, f,
                     1);
@@ -432,8 +430,7 @@ tcp_test_fifo1 (vlib_main_t * vm, unformat_input_t * input)
    */
   for (i = 0; i < 7; i++)
     {
-      rv = svm_fifo_dequeue_nowait (f, 0 /* pid */ , sizeof (u32),
-                                   (u8 *) & data_word);
+      rv = svm_fifo_dequeue_nowait (f, sizeof (u32), (u8 *) & data_word);
       if (rv != sizeof (u32))
        {
          clib_warning ("bytes dequeues %u", rv);
@@ -457,7 +454,7 @@ tcp_test_fifo1 (vlib_main_t * vm, unformat_input_t * input)
     {
       offset = (2 * i + 1) * sizeof (u32);
       data = (u8 *) (test_data + (2 * i + 1));
-      rv = svm_fifo_enqueue_with_offset (f, 0, offset, sizeof (u32), data);
+      rv = svm_fifo_enqueue_with_offset (f, offset, sizeof (u32), data);
       if (verbose)
        vlib_cli_output (vm, "add [%d] [%d, %d]", 2 * i + 1, offset,
                         offset + sizeof (u32));
@@ -468,13 +465,13 @@ tcp_test_fifo1 (vlib_main_t * vm, unformat_input_t * input)
        }
     }
 
-  rv = svm_fifo_enqueue_with_offset (f, 0, 8, 21, data);
+  rv = svm_fifo_enqueue_with_offset (f, 8, 21, data);
   TCP_TEST ((rv == 0), "ooo enqueued %u", rv);
   TCP_TEST ((svm_fifo_number_ooo_segments (f) == 1),
            "number of ooo segments %u", svm_fifo_number_ooo_segments (f));
 
   vec_validate (data_buf, vec_len (data));
-  svm_fifo_peek (f, 0, 0, vec_len (data), data_buf);
+  svm_fifo_peek (f, 0, vec_len (data), data_buf);
   if (compare_data (data_buf, data, 8, vec_len (data), &j))
     {
       TCP_TEST (0, "[%d] peeked %u expected %u", j, data_buf[j], data[j]);
@@ -491,7 +488,7 @@ tcp_test_fifo1 (vlib_main_t * vm, unformat_input_t * input)
     {
       offset = (2 * i + 1) * sizeof (u32);
       data = (u8 *) (test_data + (2 * i + 1));
-      rv = svm_fifo_enqueue_with_offset (f, 0, offset, sizeof (u32), data);
+      rv = svm_fifo_enqueue_with_offset (f, offset, sizeof (u32), data);
       if (verbose)
        vlib_cli_output (vm, "add [%d] [%d, %d]", 2 * i + 1, offset,
                         offset + sizeof (u32));
@@ -502,13 +499,13 @@ tcp_test_fifo1 (vlib_main_t * vm, unformat_input_t * input)
        }
     }
 
-  rv = svm_fifo_enqueue_nowait (f, 0, 29, data);
+  rv = svm_fifo_enqueue_nowait (f, 29, data);
   TCP_TEST ((rv == 32), "ooo enqueued %u", rv);
   TCP_TEST ((svm_fifo_number_ooo_segments (f) == 0),
            "number of ooo segments %u", svm_fifo_number_ooo_segments (f));
 
   vec_validate (data_buf, vec_len (data));
-  svm_fifo_peek (f, 0, 0, vec_len (data), data_buf);
+  svm_fifo_peek (f, 0, vec_len (data), data_buf);
   if (compare_data (data_buf, data, 0, vec_len (data), &j))
     {
       TCP_TEST (0, "[%d] peeked %u expected %u", j, data_buf[j], data[j]);
@@ -551,7 +548,7 @@ tcp_test_fifo2 (vlib_main_t * vm)
     {
       tp = vp + i;
       data64 = tp->offset;
-      rv = svm_fifo_enqueue_with_offset (f, 0, tp->offset, tp->len,
+      rv = svm_fifo_enqueue_with_offset (f, tp->offset, tp->len,
                                         (u8 *) & data64);
     }
 
@@ -565,7 +562,7 @@ tcp_test_fifo2 (vlib_main_t * vm)
            "first ooo seg length %u", ooo_seg->length);
 
   data64 = 0;
-  rv = svm_fifo_enqueue_nowait (f, 0, sizeof (u32), (u8 *) & data64);
+  rv = svm_fifo_enqueue_nowait (f, sizeof (u32), (u8 *) & data64);
   TCP_TEST ((rv == 3000), "bytes to be enqueued %u", rv);
 
   svm_fifo_free (f);
@@ -581,7 +578,7 @@ tcp_test_fifo2 (vlib_main_t * vm)
     {
       tp = &test_data[i];
       data64 = tp->offset;
-      rv = svm_fifo_enqueue_with_offset (f, 0, tp->offset, tp->len,
+      rv = svm_fifo_enqueue_with_offset (f, tp->offset, tp->len,
                                         (u8 *) & data64);
       if (rv)
        {
@@ -599,7 +596,7 @@ tcp_test_fifo2 (vlib_main_t * vm)
            "first ooo seg length %u", ooo_seg->length);
 
   data64 = 0;
-  rv = svm_fifo_enqueue_nowait (f, 0, sizeof (u32), (u8 *) & data64);
+  rv = svm_fifo_enqueue_nowait (f, sizeof (u32), (u8 *) & data64);
 
   TCP_TEST ((rv == 3000), "bytes to be enqueued %u", rv);
 
@@ -755,7 +752,7 @@ tcp_test_fifo3 (vlib_main_t * vm, unformat_input_t * input)
   for (i = 0; i < vec_len (generate); i++)
     {
       tp = generate + i;
-      rv = svm_fifo_enqueue_with_offset (f, 0, fifo_initial_offset
+      rv = svm_fifo_enqueue_with_offset (f, fifo_initial_offset
                                         + tp->offset, tp->len,
                                         (u8 *) data_pattern + tp->offset);
     }
@@ -776,7 +773,7 @@ tcp_test_fifo3 (vlib_main_t * vm, unformat_input_t * input)
       u32 bytes_to_enq = 1;
       if (in_seq_all)
        bytes_to_enq = total_size;
-      rv = svm_fifo_enqueue_nowait (f, 0, bytes_to_enq, data_pattern + 0);
+      rv = svm_fifo_enqueue_nowait (f, bytes_to_enq, data_pattern + 0);
 
       if (verbose)
        vlib_cli_output (vm, "in-order enqueue returned %d", rv);
@@ -793,7 +790,7 @@ tcp_test_fifo3 (vlib_main_t * vm, unformat_input_t * input)
    * Test if peeked data is the same as original data
    */
   vec_validate (data_buf, vec_len (data_pattern));
-  svm_fifo_peek (f, 0, 0, vec_len (data_pattern), data_buf);
+  svm_fifo_peek (f, 0, vec_len (data_pattern), data_buf);
   if (compare_data (data_buf, data_pattern, 0, vec_len (data_pattern), &j))
     {
       TCP_TEST (0, "[%d] peeked %u expected %u", j, data_buf[j],
@@ -806,11 +803,11 @@ tcp_test_fifo3 (vlib_main_t * vm, unformat_input_t * input)
    */
   if (drop)
     {
-      svm_fifo_dequeue_drop (f, 0, vec_len (data_pattern));
+      svm_fifo_dequeue_drop (f, vec_len (data_pattern));
     }
   else
     {
-      svm_fifo_dequeue_nowait (f, 0, vec_len (data_pattern), data_buf);
+      svm_fifo_dequeue_nowait (f, vec_len (data_pattern), data_buf);
       if (compare_data
          (data_buf, data_pattern, 0, vec_len (data_pattern), &j))
        {
index 8565f04..18684d5 100644 (file)
@@ -59,10 +59,10 @@ builtin_server_rx_callback (stream_session_t * s)
   vec_validate (my_copy_buffer, this_transfer - 1);
   _vec_len (my_copy_buffer) = this_transfer;
 
-  actual_transfer = svm_fifo_dequeue_nowait (rx_fifo, 0, this_transfer,
+  actual_transfer = svm_fifo_dequeue_nowait (rx_fifo, this_transfer,
                                             my_copy_buffer);
   ASSERT (actual_transfer == this_transfer);
-  actual_transfer = svm_fifo_enqueue_nowait (tx_fifo, 0, this_transfer,
+  actual_transfer = svm_fifo_enqueue_nowait (tx_fifo, this_transfer,
                                             my_copy_buffer);
   ASSERT (actual_transfer == this_transfer);
 
@@ -72,7 +72,7 @@ builtin_server_rx_callback (stream_session_t * s)
     {
       /* Fabricate TX event, send to ourselves */
       evt.fifo = tx_fifo;
-      evt.event_type = FIFO_EVENT_SERVER_TX;
+      evt.event_type = FIFO_EVENT_APP_TX;
       evt.event_id = 0;
       q = session_manager_get_vpp_event_queue (s->thread_index);
       unix_shared_memory_queue_add (q, (u8 *) & evt,
@@ -110,6 +110,8 @@ attach_builtin_uri_server ()
 
   options[SESSION_OPTIONS_ACCEPT_COOKIE] = 0x12345678;
   options[SESSION_OPTIONS_SEGMENT_SIZE] = (2 << 30);   /*$$$$ config / arg */
+  options[APP_OPTIONS_FLAGS] = APP_OPTIONS_FLAGS_BUILTIN_APP;
+
   a->options = options;
 
   return vnet_application_attach (a);
index 810278e..e6b4f8f 100644 (file)
@@ -145,8 +145,7 @@ udp4_uri_input_node_fn (vlib_main_t * vm,
                  goto trace0;
                }
 
-             svm_fifo_enqueue_nowait (f0, 0 /* pid */ ,
-                                      udp_len0 - sizeof (*udp0),
+             svm_fifo_enqueue_nowait (f0, udp_len0 - sizeof (*udp0),
                                       (u8 *) (udp0 + 1));
 
              b0->error = node->errors[SESSION_ERROR_ENQUEUED];
@@ -255,7 +254,7 @@ udp4_uri_input_node_fn (vlib_main_t * vm,
        {
          /* Fabricate event */
          evt.fifo = s0->server_rx_fifo;
-         evt.event_type = FIFO_EVENT_SERVER_RX;
+         evt.event_type = FIFO_EVENT_APP_RX;
          evt.event_id = serial_number++;
 
          /* Add event to server's event queue */