session: send tx notification to app
[vpp.git] / src / vnet / session / session.c
index ee02526..e486c2b 100644 (file)
@@ -142,7 +142,8 @@ session_alloc_for_connection (transport_connection_t * tc)
   stream_session_t *s;
   u32 thread_index = tc->thread_index;
 
-  ASSERT (thread_index == vlib_get_thread_index ());
+  ASSERT (thread_index == vlib_get_thread_index ()
+         || transport_protocol_is_cl (tc->proto));
 
   s = session_alloc (thread_index);
   s->session_type = session_type_from_proto_and_ip (tc->proto, tc->is_ip4);
@@ -413,7 +414,7 @@ stream_session_no_space (transport_connection_t * tc, u32 thread_index,
 }
 
 u32
-stream_session_tx_fifo_max_dequeue (transport_connection_t * tc)
+session_tx_fifo_max_dequeue (transport_connection_t * tc)
 {
   stream_session_t *s = session_get (tc->s_index, tc->thread_index);
   if (!s->server_tx_fifo)
@@ -451,12 +452,10 @@ session_enqueue_notify (stream_session_t * s, u8 block)
   session_fifo_event_t evt;
   svm_queue_t *q;
 
-  if (PREDICT_FALSE (s->session_state >= SESSION_STATE_CLOSING))
+  if (PREDICT_FALSE (s->session_state == SESSION_STATE_CLOSED))
     {
       /* Session is closed so app will never clean up. Flush rx fifo */
-      u32 to_dequeue = svm_fifo_max_dequeue (s->server_rx_fifo);
-      if (to_dequeue)
-       svm_fifo_dequeue_drop (s->server_rx_fifo, to_dequeue);
+      svm_fifo_dequeue_drop_all (s->server_rx_fifo);
       return 0;
     }
 
@@ -504,6 +503,32 @@ session_enqueue_notify (stream_session_t * s, u8 block)
   return 0;
 }
 
+int
+session_dequeue_notify (stream_session_t * s)
+{
+  application_t *app;
+  svm_queue_t *q;
+
+  app = application_get (s->app_index);
+  if (application_is_builtin (app))
+    return 0;
+
+  q = app->event_queue;
+  if (PREDICT_TRUE (q->cursize < q->maxsize))
+    {
+      session_fifo_event_t evt = {
+       .event_type = FIFO_EVENT_APP_TX,
+       .fifo = s->server_tx_fifo
+      };
+      svm_queue_add (app->event_queue, (u8 *) & evt, SVM_Q_WAIT);
+    }
+  else
+    {
+      return -1;
+    }
+  return 0;
+}
+
 /**
  * Flushes queue of sessions that are to be notified of new data
  * enqueued events.
@@ -726,10 +751,13 @@ stream_session_disconnect_notify (transport_connection_t * tc)
   s = session_get (tc->s_index, tc->thread_index);
   server = application_get (s->app_index);
   server->cb_fns.session_disconnect_callback (s);
+  s->session_state = SESSION_STATE_CLOSING;
 }
 
 /**
  * Cleans up session and lookup table.
+ *
+ * Transport connection must still be valid.
  */
 void
 stream_session_delete (stream_session_t * s)
@@ -1064,13 +1092,23 @@ stream_session_disconnect (stream_session_t * s)
   session_manager_main_t *smm = &session_manager_main;
   session_fifo_event_t *evt;
 
-  if (!s || s->session_state >= SESSION_STATE_CLOSING)
+  if (!s)
     return;
+
+  if (s->session_state >= SESSION_STATE_CLOSING)
+    {
+      /* Session already closed. Clear the tx fifo */
+      if (s->session_state == SESSION_STATE_CLOSED)
+       svm_fifo_dequeue_drop_all (s->server_tx_fifo);
+      return;
+    }
+
   s->session_state = SESSION_STATE_CLOSING;
 
   /* If we are in the handler thread, or being called with the worker barrier
-   * held (api/cli), just append a new event pending disconnects vector. */
-  if (thread_index > 0 || !vlib_get_current_process (vlib_get_main ()))
+   * held (api/cli), just append a new event to pending disconnects vector. */
+  if ((thread_index == 0 && !vlib_get_current_process (vlib_get_main ()))
+      || thread_index == s->thread_index)
     {
       ASSERT (s->thread_index == thread_index || thread_index == 0);
       vec_add2 (smm->pending_disconnects[s->thread_index], evt, 1);
@@ -1102,23 +1140,24 @@ stream_session_disconnect_transport (stream_session_t * s)
 /**
  * Cleanup transport and session state.
  *
- * Notify transport of the cleanup, wait for a delete notify to actually
- * remove the session state.
+ * Notify transport of the cleanup and free the session. This should
+ * be called only if transport reported some error and is already
+ * closed.
  */
 void
 stream_session_cleanup (stream_session_t * s)
 {
-  int rv;
-
   s->session_state = SESSION_STATE_CLOSED;
 
-  /* Delete from the main lookup table to avoid more enqueues */
-  rv = session_lookup_del_session (s);
-  if (rv)
-    clib_warning ("hash delete error, rv %d", rv);
-
+  /* Delete from main lookup table before we axe the the transport */
+  session_lookup_del_session (s);
   tp_vfts[session_get_transport_proto (s)].cleanup (s->connection_index,
                                                    s->thread_index);
+  /* Since we called cleanup, no delete notification will come. So, make
+   * sure the session is properly freed. */
+  segment_manager_dealloc_fifos (s->svm_segment_index, s->server_rx_fifo,
+                                s->server_tx_fifo);
+  session_free (s);
 }
 
 transport_service_type_t