s = format (s, "cursize %u nitems %u has_event %d\n",
f->cursize, f->nitems, f->has_event);
- s = format (s, " head %d tail %d\n", f->head, f->tail);
+ s = format (s, " head %d tail %d segment manager %u\n", f->head, f->tail,
+ f->segment_manager);
if (verbose > 1)
s = format
session_send_evt_to_thread (u64 session_handle, fifo_event_type_t evt_type,
u32 thread_index, void *fp, void *rpc_args)
{
- u32 tries = 0;
session_fifo_event_t evt = { {0}, };
svm_queue_t *q;
+ u32 tries = 0, max_tries;
evt.event_type = evt_type;
if (evt_type == FIFO_EVENT_RPC)
q = session_manager_get_vpp_event_queue (thread_index);
while (svm_queue_add (q, (u8 *) & evt, 1))
{
- if (tries++ == 3)
+ max_tries = vlib_get_current_process (vlib_get_main ())? 1e6 : 3;
+ if (tries++ == max_tries)
{
SESSION_DBG ("failed to enqueue evt");
break;
session_fifo_event_t evt;
svm_queue_t *q;
- if (PREDICT_FALSE (s->session_state == SESSION_STATE_CLOSED))
+ if (PREDICT_FALSE (s->session_state >= SESSION_STATE_CLOSING))
{
/* Session is closed so app will never clean up. Flush rx fifo */
u32 to_dequeue = svm_fifo_max_dequeue (s->server_rx_fifo);
+/**
+ * Initiate disconnect of a session.
+ *
+ * Marks the session CLOSING and schedules a FIFO_EVENT_DISCONNECT so that
+ * transport teardown runs on the session's owning thread. Idempotent:
+ * returns early if @a s is NULL or already at/past SESSION_STATE_CLOSING.
+ *
+ * @param s session to disconnect (may be NULL)
+ */
void
stream_session_disconnect (stream_session_t * s)
{
- if (!s || s->session_state == SESSION_STATE_CLOSED)
+ u32 thread_index = vlib_get_thread_index ();
+ session_manager_main_t *smm = &session_manager_main;
+ session_fifo_event_t *evt;
+
+ /* >= catches CLOSING as well as CLOSED, so a second disconnect request
+ * for the same session is a no-op. */
+ if (!s || s->session_state >= SESSION_STATE_CLOSING)
return;
- s->session_state = SESSION_STATE_CLOSED;
- session_send_session_evt_to_thread (session_handle (s),
- FIFO_EVENT_DISCONNECT, s->thread_index);
+ s->session_state = SESSION_STATE_CLOSING;
+
+ /* If we are in the handler thread, or being called with the worker barrier
+ * held (api/cli), just append a new event to the pending disconnects
+ * vector, which is drained by the session queue node.
+ * NOTE(review): assumes vlib_get_current_process () returning 0 implies
+ * the barrier is held — confirm against vlib semantics. */
+ if (thread_index > 0 || !vlib_get_current_process (vlib_get_main ()))
+ {
+ /* Only the owning thread (or main under the barrier) may touch the
+ * per-thread pending_disconnects vector. */
+ ASSERT (s->thread_index == thread_index || thread_index == 0);
+ vec_add2 (smm->pending_disconnects[s->thread_index], evt, 1);
+ memset (evt, 0, sizeof (*evt));
+ evt->session_handle = session_handle (s);
+ evt->event_type = FIFO_EVENT_DISCONNECT;
+ }
+ else
+ /* Main thread with processes running: hand the event to the owning
+ * thread's event queue instead of touching its vectors directly. */
+ session_send_session_evt_to_thread (session_handle (s),
+ FIFO_EVENT_DISCONNECT,
+ s->thread_index);
}
/**
vlib_frame_t * frame)
{
session_manager_main_t *smm = vnet_get_session_manager_main ();
- session_fifo_event_t *my_pending_event_vector, *pending_disconnects, *e;
+ session_fifo_event_t *my_pending_event_vector, *e;
session_fifo_event_t *my_fifo_events;
u32 n_to_dequeue, n_events;
svm_queue_t *q;
/* min number of events we can dequeue without blocking */
n_to_dequeue = q->cursize;
my_pending_event_vector = smm->pending_event_vector[thread_index];
- pending_disconnects = smm->pending_disconnects[thread_index];
if (!n_to_dequeue && !vec_len (my_pending_event_vector)
- && !vec_len (pending_disconnects))
+ && !vec_len (smm->pending_disconnects[thread_index]))
return 0;
SESSION_EVT_DBG (SESSION_EVT_DEQ_NODE, 0);
case FIFO_EVENT_DISCONNECT:
/* Make sure stream disconnects run after the pending list is drained */
s0 = session_get_from_handle (e0->session_handle);
- if (!e0->postponed || svm_fifo_max_dequeue (s0->server_tx_fifo))
+ if (!e0->postponed)
{
e0->postponed = 1;
vec_add1 (smm->pending_disconnects[thread_index], *e0);
continue;
}
+ /* If tx queue is still not empty, wait a bit */
+ if (svm_fifo_max_dequeue (s0->server_tx_fifo)
+ && e0->postponed < 200)
+ {
+ e0->postponed += 1;
+ vec_add1 (smm->pending_disconnects[thread_index], *e0);
+ continue;
+ }
stream_session_disconnect_transport (s0);
break;