stream_session_t *s;
u32 thread_index = tc->thread_index;
- ASSERT (thread_index == vlib_get_thread_index ());
+ ASSERT (thread_index == vlib_get_thread_index ()
+ || transport_protocol_is_cl (tc->proto));
s = session_alloc (thread_index);
s->session_type = session_type_from_proto_and_ip (tc->proto, tc->is_ip4);
}
u32
-stream_session_tx_fifo_max_dequeue (transport_connection_t * tc)
+session_tx_fifo_max_dequeue (transport_connection_t * tc)
{
stream_session_t *s = session_get (tc->s_index, tc->thread_index);
if (!s->server_tx_fifo)
session_fifo_event_t evt;
svm_queue_t *q;
- if (PREDICT_FALSE (s->session_state >= SESSION_STATE_CLOSING))
+ if (PREDICT_FALSE (s->session_state == SESSION_STATE_CLOSED))
{
/* Session is closed so app will never clean up. Flush rx fifo */
- u32 to_dequeue = svm_fifo_max_dequeue (s->server_rx_fifo);
- if (to_dequeue)
- svm_fifo_dequeue_drop (s->server_rx_fifo, to_dequeue);
+ svm_fifo_dequeue_drop_all (s->server_rx_fifo);
return 0;
}
- /* Get session's server */
app = application_get_if_valid (s->app_index);
-
if (PREDICT_FALSE (app == 0))
{
clib_warning ("invalid s->app_index = %d", s->app_index);
return 0;
}
+int
+session_dequeue_notify (stream_session_t * s)
+{
+ application_t *app;
+ svm_queue_t *q;
+
+ app = application_get_if_valid (s->app_index);
+ if (PREDICT_FALSE (!app))
+ return -1;
+
+ if (application_is_builtin (app))
+ return 0;
+
+ q = app->event_queue;
+ if (PREDICT_TRUE (q->cursize < q->maxsize))
+ {
+ session_fifo_event_t evt = {
+ .event_type = FIFO_EVENT_APP_TX,
+ .fifo = s->server_tx_fifo
+ };
+ svm_queue_add (app->event_queue, (u8 *) & evt, SVM_Q_WAIT);
+ }
+ else
+ {
+ return -1;
+ }
+ return 0;
+}
+
/**
* Flushes queue of sessions that are to be notified of new data
* enqueued events.
s = session_get (tc->s_index, tc->thread_index);
server = application_get (s->app_index);
server->cb_fns.session_disconnect_callback (s);
+ s->session_state = SESSION_STATE_CLOSING;
}
/**
* Cleans up session and lookup table.
+ *
+ * Transport connection must still be valid.
*/
void
stream_session_delete (stream_session_t * s)
session_manager_main_t *smm = &session_manager_main;
session_fifo_event_t *evt;
- if (!s || s->session_state >= SESSION_STATE_CLOSING)
+ if (!s)
return;
+
+ if (s->session_state >= SESSION_STATE_CLOSING)
+ {
+ /* Session already closed. Clear the tx fifo */
+ if (s->session_state == SESSION_STATE_CLOSED)
+ svm_fifo_dequeue_drop_all (s->server_tx_fifo);
+ return;
+ }
+
s->session_state = SESSION_STATE_CLOSING;
/* If we are in the handler thread, or being called with the worker barrier
- * held (api/cli), just append a new event pending disconnects vector. */
- if (thread_index > 0 || !vlib_get_current_process (vlib_get_main ()))
+ * held (api/cli), just append a new event to pending disconnects vector. */
+ if ((thread_index == 0 && !vlib_get_current_process (vlib_get_main ()))
+ || thread_index == s->thread_index)
{
ASSERT (s->thread_index == thread_index || thread_index == 0);
vec_add2 (smm->pending_disconnects[s->thread_index], evt, 1);
/**
* Cleanup transport and session state.
*
- * Notify transport of the cleanup, wait for a delete notify to actually
- * remove the session state.
+ * Notify transport of the cleanup and free the session. This should
+ * be called only if transport reported some error and is already
+ * closed.
*/
void
stream_session_cleanup (stream_session_t * s)
{
- int rv;
-
s->session_state = SESSION_STATE_CLOSED;
- /* Delete from the main lookup table to avoid more enqueues */
- rv = session_lookup_del_session (s);
- if (rv)
- clib_warning ("hash delete error, rv %d", rv);
-
+ /* Delete from main lookup table before we axe the the transport */
+ session_lookup_del_session (s);
tp_vfts[session_get_transport_proto (s)].cleanup (s->connection_index,
s->thread_index);
+ /* Since we called cleanup, no delete notification will come. So, make
+ * sure the session is properly freed. */
+ segment_manager_dealloc_fifos (s->svm_segment_index, s->server_rx_fifo,
+ s->server_tx_fifo);
+ session_free (s);
}
transport_service_type_t
eqs->name = format (0, "%s%c", "evt-qs-segment", 0);
eqs->requested_va = smm->session_baseva;
- ssvm_master_init (eqs, SSVM_SEGMENT_MEMFD);
+ if (ssvm_master_init (eqs, SSVM_SEGMENT_MEMFD))
+ {
+ clib_warning ("failed to initialize queue segment");
+ return;
+ }
}
if (smm->evt_qs_use_memfd_seg)