return 0;
}
-/** Enqueue buffer chain tail */
+/**
+ * Discards bytes from buffer chain
+ *
+ * It discards n_bytes_to_drop starting at first buffer after chain_b
+ */
+always_inline void
+session_enqueue_discard_chain_bytes (vlib_main_t * vm, vlib_buffer_t * b,
+ vlib_buffer_t ** chain_b,
+ u32 n_bytes_to_drop)
+{
+ vlib_buffer_t *next = *chain_b;
+ u32 to_drop = n_bytes_to_drop;
+ ASSERT (b->flags & VLIB_BUFFER_NEXT_PRESENT);
+ while (to_drop && (next->flags & VLIB_BUFFER_NEXT_PRESENT))
+ {
+ next = vlib_get_buffer (vm, next->next_buffer);
+ if (next->current_length > to_drop)
+ {
+ vlib_buffer_advance (next, to_drop);
+ to_drop = 0;
+ }
+ else
+ {
+ to_drop -= next->current_length;
+ next->current_length = 0;
+ }
+ }
+ *chain_b = next;
+
+ if (to_drop == 0)
+ b->total_length_not_including_first_buffer -= n_bytes_to_drop;
+}
+
+/**
+ * Enqueue buffer chain tail
+ */
always_inline int
session_enqueue_chain_tail (stream_session_t * s, vlib_buffer_t * b,
u32 offset, u8 is_in_order)
{
vlib_buffer_t *chain_b;
- u32 chain_bi = b->next_buffer;
+ u32 chain_bi, len, diff;
vlib_main_t *vm = vlib_get_main ();
- u8 *data, len;
- u16 written = 0;
+ u8 *data;
+ u32 written = 0;
int rv = 0;
+ if (is_in_order && offset)
+ {
+ diff = offset - b->current_length;
+ if (diff > b->total_length_not_including_first_buffer)
+ return 0;
+ chain_b = b;
+ session_enqueue_discard_chain_bytes (vm, b, &chain_b, diff);
+ chain_bi = vlib_get_buffer_index (vm, chain_b);
+ }
+ else
+ chain_bi = b->next_buffer;
+
do
{
chain_b = vlib_get_buffer (vm, chain_bi);
data = vlib_buffer_get_current (chain_b);
len = chain_b->current_length;
+ if (!len)
+ continue;
if (is_in_order)
{
rv = svm_fifo_enqueue_nowait (s->server_rx_fifo, len, data);
- if (rv < len)
+ if (rv == len)
+ {
+ written += rv;
+ }
+ else if (rv < len)
{
return (rv > 0) ? (written + rv) : written;
}
- written += rv;
+ else if (rv > len)
+ {
+ written += rv;
+
+ /* written more than what was left in chain */
+ if (written > b->total_length_not_including_first_buffer)
+ return written;
+
+ /* drop the bytes that have already been delivered */
+ session_enqueue_discard_chain_bytes (vm, b, &chain_b, rv - len);
+ }
}
else
{
rv = svm_fifo_enqueue_with_offset (s->server_rx_fifo, offset, len,
data);
if (rv)
- return -1;
+ {
+ clib_warning ("failed to enqueue multi-buffer seg");
+ return -1;
+ }
offset += len;
}
}
u32 offset, u8 queue_event, u8 is_in_order)
{
stream_session_t *s;
- int enqueued = 0, rv;
+ int enqueued = 0, rv, in_order_off;
s = stream_session_get (tc->s_index, tc->thread_index);
if (is_in_order)
{
- enqueued =
- svm_fifo_enqueue_nowait (s->server_rx_fifo, b->current_length,
- vlib_buffer_get_current (b));
- if (PREDICT_FALSE
- ((b->flags & VLIB_BUFFER_NEXT_PRESENT) && enqueued > 0))
+ enqueued = svm_fifo_enqueue_nowait (s->server_rx_fifo,
+ b->current_length,
+ vlib_buffer_get_current (b));
+ if (PREDICT_FALSE ((b->flags & VLIB_BUFFER_NEXT_PRESENT)
+ && enqueued >= 0))
{
- rv = session_enqueue_chain_tail (s, b, 0, 1);
- if (rv <= 0)
- return enqueued;
- enqueued += rv;
+ in_order_off = enqueued > b->current_length ? enqueued : 0;
+ rv = session_enqueue_chain_tail (s, b, in_order_off, 1);
+ if (rv > 0)
+ enqueued += rv;
}
}
else
b->current_length,
vlib_buffer_get_current (b));
if (PREDICT_FALSE ((b->flags & VLIB_BUFFER_NEXT_PRESENT) && !rv))
- rv = session_enqueue_chain_tail (s, b, offset + b->current_length, 0);
- if (rv)
- return -1;
+ session_enqueue_chain_tail (s, b, offset + b->current_length, 0);
+ /* if something was enqueued, report even this as success for ooo
+ * segment handling */
+ return rv;
}
if (queue_event)
stream_session_tx_fifo_max_dequeue (transport_connection_t * tc)
{
stream_session_t *s = stream_session_get (tc->s_index, tc->thread_index);
- if (s->session_state != SESSION_STATE_READY)
+ if (!s->server_tx_fifo)
return 0;
return svm_fifo_max_dequeue (s->server_tx_fifo);
}
application_t *app;
stream_session_t *new_s = 0;
u64 handle;
- u32 api_context = 0;
+ u32 opaque = 0;
int error = 0;
+ u8 st;
+ st = session_type_from_proto_and_ip (tc->transport_proto, tc->is_ip4);
handle = stream_session_half_open_lookup_handle (&tc->lcl_ip, &tc->rmt_ip,
tc->lcl_port, tc->rmt_port,
- tc->transport_proto);
+ st);
if (handle == HALF_OPEN_LOOKUP_INVALID_VALUE)
{
clib_warning ("This can't be good!");
return -1;
}
- /* Get the app's index from the handle we stored when opening connection */
- app = application_get (handle >> 32);
- api_context = tc->s_index;
+ /* Get the app's index from the handle we stored when opening connection
+ * and the opaque (api_context for external apps) from transport session
+ * index*/
+ app = application_get_if_valid (handle >> 32);
+ if (!app)
+ return -1;
+
+ opaque = tc->s_index;
if (!is_fail)
{
}
/* Notify client application */
- if (app->cb_fns.session_connected_callback (app->index, api_context, new_s,
+ if (app->cb_fns.session_connected_callback (app->index, opaque, new_s,
is_fail))
{
clib_warning ("failed to notify app");
/* App might've been removed already */
s = stream_session_get_if_valid (tc->s_index, tc->thread_index);
if (!s)
- {
- return;
- }
+ return;
stream_session_delete (s);
}