Session layer improvements
[vpp.git] / src / vnet / session / session.c
index e6cfe7d..d17c93f 100644 (file)
@@ -377,33 +377,6 @@ stream_session_lookup_transport6 (ip6_address_t * lcl, ip6_address_t * rmt,
   return 0;
 }
 
-/**
- * Allocate vpp event queue (once) per worker thread
- */
-void
-session_vpp_event_queue_allocate (session_manager_main_t * smm,
-                                 u32 thread_index)
-{
-  api_main_t *am = &api_main;
-  void *oldheap;
-
-  if (smm->vpp_event_queues[thread_index] == 0)
-    {
-      /* Allocate event fifo in the /vpe-api shared-memory segment */
-      oldheap = svm_push_data_heap (am->vlib_rp);
-
-      smm->vpp_event_queues[thread_index] =
-       unix_shared_memory_queue_init (2048 /* nels $$$$ config */ ,
-                                      sizeof (session_fifo_event_t),
-                                      0 /* consumer pid */ ,
-                                      0
-                                      /* (do not) send signal when queue non-empty */
-       );
-
-      svm_pop_heap (oldheap);
-    }
-}
-
 int
 stream_session_create_i (segment_manager_t * sm, transport_connection_t * tc,
                         stream_session_t ** ret_s)
@@ -428,11 +401,11 @@ stream_session_create_i (segment_manager_t * sm, transport_connection_t * tc,
 
   /* Initialize backpointers */
   pool_index = s - smm->sessions[thread_index];
-  server_rx_fifo->server_session_index = pool_index;
-  server_rx_fifo->server_thread_index = thread_index;
+  server_rx_fifo->master_session_index = pool_index;
+  server_rx_fifo->master_thread_index = thread_index;
 
-  server_tx_fifo->server_session_index = pool_index;
-  server_tx_fifo->server_thread_index = thread_index;
+  server_tx_fifo->master_session_index = pool_index;
+  server_tx_fifo->master_thread_index = thread_index;
 
   s->server_rx_fifo = server_rx_fifo;
   s->server_tx_fifo = server_tx_fifo;
@@ -485,7 +458,7 @@ stream_session_enqueue_data (transport_connection_t * tc, u8 * data, u16 len,
   if (PREDICT_FALSE (len > svm_fifo_max_enqueue (s->server_rx_fifo)))
     return -1;
 
-  enqueued = svm_fifo_enqueue_nowait (s->server_rx_fifo, s->pid, len, data);
+  enqueued = svm_fifo_enqueue_nowait (s->server_rx_fifo, len, data);
 
   if (queue_event)
     {
@@ -527,14 +500,14 @@ stream_session_peek_bytes (transport_connection_t * tc, u8 * buffer,
                           u32 offset, u32 max_bytes)
 {
   stream_session_t *s = stream_session_get (tc->s_index, tc->thread_index);
-  return svm_fifo_peek (s->server_tx_fifo, s->pid, offset, max_bytes, buffer);
+  return svm_fifo_peek (s->server_tx_fifo, offset, max_bytes, buffer);
 }
 
 u32
 stream_session_dequeue_drop (transport_connection_t * tc, u32 max_bytes)
 {
   stream_session_t *s = stream_session_get (tc->s_index, tc->thread_index);
-  return svm_fifo_dequeue_drop (s->server_tx_fifo, s->pid, max_bytes);
+  return svm_fifo_dequeue_drop (s->server_tx_fifo, max_bytes);
 }
 
 /**
@@ -568,7 +541,7 @@ stream_session_enqueue_notify (stream_session_t * s, u8 block)
     {
       /* Fabricate event */
       evt.fifo = s->server_rx_fifo;
-      evt.event_type = FIFO_EVENT_SERVER_RX;
+      evt.event_type = FIFO_EVENT_APP_RX;
       evt.event_id = serial_number++;
 
       /* Add event to server's event queue */
@@ -899,37 +872,45 @@ stream_session_stop_listen (stream_session_t * s)
   return 0;
 }
 
+void
+session_send_session_evt_to_thread (u64 session_handle,
+                                   fifo_event_type_t evt_type,
+                                   u32 thread_index)
+{
+  static u16 serial_number = 0;
+  session_fifo_event_t evt;
+  unix_shared_memory_queue_t *q;
+
+  /* Fabricate event */
+  evt.session_handle = session_handle;
+  evt.event_type = evt_type;
+  evt.event_id = serial_number++;
+
+  q = session_manager_get_vpp_event_queue (thread_index);
+
+  /* Based on request block (or not) for lack of space */
+  if (PREDICT_TRUE (q->cursize < q->maxsize))
+    unix_shared_memory_queue_add (q, (u8 *) & evt,
+                                 0 /* do wait for mutex */ );
+  else
+    {
+      clib_warning ("queue full");
+      return;
+    }
+}
+
 /**
  * Disconnect session and propagate to transport. This should eventually
  * result in a delete notification that allows us to cleanup session state.
  * Called for both active/passive disconnects.
+ *
+ * Should be called from the session's thread.
  */
 void
 stream_session_disconnect (stream_session_t * s)
 {
-//  session_fifo_event_t evt;
-
   s->session_state = SESSION_STATE_CLOSED;
-  /* RPC to vpp evt queue in the right thread */
-
   tp_vfts[s->session_type].close (s->connection_index, s->thread_index);
-
-//  {
-//  /* Fabricate event */
-//  evt.fifo = s->server_rx_fifo;
-//  evt.event_type = FIFO_EVENT_SERVER_RX;
-//  evt.event_id = serial_number++;
-//
-//  /* Based on request block (or not) for lack of space */
-//  if (PREDICT_TRUE(q->cursize < q->maxsize))
-//    unix_shared_memory_queue_add (app->event_queue, (u8 *) &evt,
-//                                0 /* do wait for mutex */);
-//  else
-//    {
-//      clib_warning("fifo full");
-//      return -1;
-//    }
-//  }
 }
 
 /**
@@ -976,6 +957,33 @@ session_get_transport_vft (u8 type)
   return &tp_vfts[type];
 }
 
+/**
+ * Allocate vpp event queue (once) per worker thread
+ */
+void
+session_vpp_event_queue_allocate (session_manager_main_t * smm,
+                                 u32 thread_index)
+{
+  api_main_t *am = &api_main;
+  void *oldheap;
+
+  if (smm->vpp_event_queues[thread_index] == 0)
+    {
+      /* Allocate event fifo in the /vpe-api shared-memory segment */
+      oldheap = svm_push_data_heap (am->vlib_rp);
+
+      smm->vpp_event_queues[thread_index] =
+       unix_shared_memory_queue_init (2048 /* nels $$$$ config */ ,
+                                      sizeof (session_fifo_event_t),
+                                      0 /* consumer pid */ ,
+                                      0
+                                      /* (do not) send signal when queue non-empty */
+       );
+
+      svm_pop_heap (oldheap);
+    }
+}
+
 static clib_error_t *
 session_manager_main_enable (vlib_main_t * vm)
 {
@@ -1043,6 +1051,18 @@ session_manager_main_enable (vlib_main_t * vm)
   return 0;
 }
 
+void
+session_node_enable_disable (u8 is_en)
+{
+  u8 state = is_en ? VLIB_NODE_STATE_POLLING : VLIB_NODE_STATE_DISABLED;
+  /* *INDENT-OFF* */
+  foreach_vlib_main (({
+    vlib_node_set_state (this_vlib_main, session_queue_node.index,
+                         state);
+  }));
+  /* *INDENT-ON* */
+}
+
 clib_error_t *
 vnet_session_enable_disable (vlib_main_t * vm, u8 is_en)
 {
@@ -1051,16 +1071,14 @@ vnet_session_enable_disable (vlib_main_t * vm, u8 is_en)
       if (session_manager_main.is_enabled)
        return 0;
 
-      vlib_node_set_state (vm, session_queue_node.index,
-                          VLIB_NODE_STATE_POLLING);
+      session_node_enable_disable (is_en);
 
       return session_manager_main_enable (vm);
     }
   else
     {
       session_manager_main.is_enabled = 0;
-      vlib_node_set_state (vm, session_queue_node.index,
-                          VLIB_NODE_STATE_DISABLED);
+      session_node_enable_disable (is_en);
     }
 
   return 0;