{
old_state = s->session_state;
s->session_state = SESSION_STATE_READY;
- if (!svm_fifo_is_empty (s->rx_fifo))
+
+ if (!svm_fifo_is_empty_prod (s->rx_fifo))
app_worker_lock_and_send_event (app_wrk, s, SESSION_IO_EVT_RX);
/* Closed while waiting for app to reply. Resend disconnect */
if (old_state >= SESSION_STATE_TRANSPORT_CLOSING)
{
- application_t *app = application_get (app_wrk->app_index);
- app->cb_fns.session_disconnect_callback (s);
+ app_worker_close_notify (app_wrk, s);
s->session_state = old_state;
return;
}
app_worker_lock_and_send_event (app_wrk, s, SESSION_IO_EVT_RX);
if (s->session_state >= SESSION_STATE_TRANSPORT_CLOSING)
- app->cb_fns.session_disconnect_callback (s);
+ app_worker_close_notify (app_wrk, s);
}
vlib_node_registration_t session_queue_node;
}
else
{
- if (ctx->transport_vft->tx_type == TRANSPORT_TX_DGRAM)
+ if (ctx->transport_vft->transport_options.tx_type ==
+ TRANSPORT_TX_DGRAM)
{
svm_fifo_t *f = ctx->s->tx_fifo;
session_dgram_hdr_t *hdr = &ctx->hdr;
}
}
else
- n_bytes_read = svm_fifo_dequeue_nowait (ctx->s->tx_fifo,
- len_to_deq, data);
+ n_bytes_read = svm_fifo_dequeue (ctx->s->tx_fifo,
+ len_to_deq, data);
}
ASSERT (n_bytes_read == len_to_deq);
chain_b->current_length = n_bytes_read;
}
else
{
- if (ctx->transport_vft->tx_type == TRANSPORT_TX_DGRAM)
+ if (ctx->transport_vft->transport_options.tx_type == TRANSPORT_TX_DGRAM)
{
session_dgram_hdr_t *hdr = &ctx->hdr;
svm_fifo_t *f = ctx->s->tx_fifo;
}
else
{
- n_bytes_read = svm_fifo_dequeue_nowait (ctx->s->tx_fifo,
- len_to_deq, data0);
+ n_bytes_read = svm_fifo_dequeue (ctx->s->tx_fifo,
+ len_to_deq, data0);
ASSERT (n_bytes_read > 0);
}
}
u32 max_segs, u8 peek_data)
{
u32 n_bytes_per_buf, n_bytes_per_seg;
- ctx->max_dequeue = svm_fifo_max_dequeue (ctx->s->tx_fifo);
+ ctx->max_dequeue = svm_fifo_max_dequeue_cons (ctx->s->tx_fifo);
if (peek_data)
{
/* Offset in rx fifo from where to peek data */
}
else
{
- if (ctx->transport_vft->tx_type == TRANSPORT_TX_DGRAM)
+ if (ctx->transport_vft->transport_options.tx_type == TRANSPORT_TX_DGRAM)
{
if (ctx->max_dequeue <= sizeof (ctx->hdr))
{
if (svm_fifo_set_event (ctx->s->tx_fifo))
vec_add1 (wrk->pending_event_vector, *e);
- if (!peek_data && ctx->transport_vft->tx_type == TRANSPORT_TX_DGRAM)
+ if (!peek_data
+ && ctx->transport_vft->transport_options.tx_type == TRANSPORT_TX_DGRAM)
{
/* Fix dgram pre header */
if (ctx->max_len_to_snd < ctx->max_dequeue)
svm_fifo_overwrite_head (ctx->s->tx_fifo, (u8 *) & ctx->hdr,
sizeof (session_dgram_pre_hdr_t));
/* More data needs to be read */
- else if (svm_fifo_max_dequeue (ctx->s->tx_fifo) > 0)
+ else if (svm_fifo_max_dequeue_cons (ctx->s->tx_fifo) > 0)
if (svm_fifo_set_event (ctx->s->tx_fifo))
vec_add1 (wrk->pending_event_vector, *e);
+
+ if (svm_fifo_needs_deq_ntf (ctx->s->tx_fifo, ctx->max_len_to_snd))
+ session_dequeue_notify (ctx->s);
}
return SESSION_TX_OK;
}
for (i = 0; i < n_events; i++)
{
- session_t *s; /* $$$ prefetch 1 ahead maybe */
session_event_t *e;
- u8 need_tx_ntf;
+ session_t *s;
e = &fifo_events[i];
switch (e->event_type)
* different nodes */
rv = (smm->session_tx_fns[s->session_type]) (vm, node, wrk, e,
&n_tx_packets);
- if (PREDICT_TRUE (rv == SESSION_TX_OK))
- {
- need_tx_ntf = svm_fifo_needs_tx_ntf (s->tx_fifo,
- wrk->ctx.max_len_to_snd);
- if (PREDICT_FALSE (need_tx_ntf))
- session_dequeue_notify (s);
- }
- else if (PREDICT_FALSE (rv == SESSION_TX_NO_BUFFERS))
+ if (PREDICT_FALSE (rv == SESSION_TX_NO_BUFFERS))
{
vlib_node_increment_counter (vm, node->node_index,
SESSION_QUEUE_ERROR_NO_BUFFER, 1);
continue;
}
break;
+ case SESSION_IO_EVT_RX:
+ s = session_event_get_session (e, thread_index);
+ if (!s)
+ break;
+ transport_app_rx_evt (session_get_transport_proto (s),
+ s->connection_index, s->thread_index);
+ break;
case SESSION_CTRL_EVT_CLOSE:
s = session_get_from_handle_if_valid (e->session_handle);
if (PREDICT_FALSE (!s))
* and the tx queue is still not empty, try to wait for some
* dispatch cycles */
if (!e->postponed
- || (e->postponed < 200 && svm_fifo_max_dequeue (s->tx_fifo)))
+ || (e->postponed < 200
+ && svm_fifo_max_dequeue_cons (s->tx_fifo)))
{
e->postponed += 1;
vec_add1 (wrk->pending_disconnects, *e);