From: Florin Coras Date: Mon, 8 May 2017 02:12:02 +0000 (-0700) Subject: Add support for tcp/session buffer chains X-Git-Tag: v17.07-rc1~226 X-Git-Url: https://gerrit.fd.io/r/gitweb?p=vpp.git;a=commitdiff_plain;h=f6d68ed2db2bcd41c9b7ddde5e411073c1566c29 Add support for tcp/session buffer chains Change-Id: I01c6e3dc3a1b2785df37bb66b19c4b5cbb8f3211 Signed-off-by: Florin Coras --- diff --git a/src/scripts/vnet/uri/dummy_app.py b/src/scripts/vnet/uri/dummy_app.py index 50333923d6e..ff00f2fc8c6 100644 --- a/src/scripts/vnet/uri/dummy_app.py +++ b/src/scripts/vnet/uri/dummy_app.py @@ -6,14 +6,28 @@ import time # action can be reflect or drop action = "drop" +test = 0 + +def test_data (data, n_rcvd): + n_read = len (data); + for i in range(n_read): + expected = (n_rcvd + i) & 0xff + byte_got = ord (data[i]) + if (byte_got != expected): + print("Difference at byte {}. Expected {} got {}" + .format(n_rcvd + i, expected, byte_got)) + return n_read def handle_connection (connection, client_address): print("Received connection from {}".format(repr(client_address))) + n_rcvd = 0 try: while True: data = connection.recv(4096) if not data: break; + if (test == 1): + n_rcvd += test_data (data, n_rcvd) if (action != "drop"): connection.sendall(data) finally: @@ -78,8 +92,9 @@ def run(mode, ip, port): if __name__ == "__main__": if (len(sys.argv)) < 4: - raise Exception("Usage: ./dummy_app []") - if (len(sys.argv) == 5): + raise Exception("Usage: ./dummy_app [ ]") + if (len(sys.argv) == 6): action = sys.argv[4] + test = int(sys.argv[5]) run (sys.argv[1], sys.argv[2], int(sys.argv[3])) diff --git a/src/uri/uri_socket_server.c b/src/uri/uri_socket_server.c index 64d3b49252a..2366f420126 100644 --- a/src/uri/uri_socket_server.c +++ b/src/uri/uri_socket_server.c @@ -17,6 +17,7 @@ #include #include #include +#include #include #include #include @@ -72,32 +73,59 @@ setup_signal_handler (void) int main (int argc, char *argv[]) { - int sockfd, portno, n, sent, accfd; + int sockfd, portno, n, sent, accfd, reuse; + socklen_t client_addr_len; struct sockaddr_in serv_addr; + struct sockaddr_in client; struct hostent *server; u8 *rx_buffer = 0; - if (0 && argc < 3) + if (argc > 1 && argc < 3) { - fformat (stderr, "usage %s hostname port\n", argv[0]); + fformat (stderr, "usage %s host port\n", argv[0]); exit (0); } + if (argc >= 3) + { + portno = atoi (argv[2]); + server = gethostbyname (argv[1]); + if (server == NULL) + { + clib_unix_warning ("gethostbyname"); + exit (1); + } + } + else + { + /* Defaults */ + portno = 1234; + server = gethostbyname ("6.0.1.1"); + if (server == NULL) + { + clib_unix_warning ("gethostbyname"); + exit (1); + } + } + + setup_signal_handler (); - portno = 1234; // atoi(argv[2]); sockfd = socket (AF_INET, SOCK_STREAM, 0); if (sockfd < 0) { clib_unix_error ("socket"); exit (1); } - server = gethostbyname ("6.0.1.1"); - if (server == NULL) + + reuse = 1; + if (setsockopt (sockfd, SOL_SOCKET, SO_REUSEADDR, (const char *) &reuse, + sizeof (reuse)) < 0) { - clib_unix_warning ("gethostbyname"); + clib_unix_error ("setsockopt(SO_REUSEADDR) failed"); exit (1); } + bzero ((char *) &serv_addr, sizeof (serv_addr)); serv_addr.sin_family = AF_INET; bcopy ((char *) server->h_addr, @@ -123,12 +151,15 @@ main (int argc, char *argv[]) if (signal_received) break; - accfd = accept (sockfd, 0 /* don't care */ , 0); + client_addr_len = sizeof (struct sockaddr); + accfd = accept (sockfd, (struct sockaddr *) &client, &client_addr_len); if (accfd < 0) { clib_unix_warning ("accept"); continue; } + fformat (stderr, "Accepted connection from: %s : %d\n", + inet_ntoa (client.sin_addr), client.sin_port); while (1) { n = recv (accfd, rx_buffer, vec_len (rx_buffer), 0 /* flags */ ); diff --git a/src/vnet/session/node.c b/src/vnet/session/node.c index 2d12ee2bac9..ce7c38683f5 100644 --- a/src/vnet/session/node.c +++ b/src/vnet/session/node.c @@ -70,6 +70,58 @@ static u32 session_type_to_next[] = { SESSION_QUEUE_NEXT_IP6_LOOKUP, }; +always_inline void +session_tx_fifo_chain_tail (session_manager_main_t * smm, vlib_main_t * vm, + u8 thread_index, svm_fifo_t * fifo, + vlib_buffer_t * b0, u32 bi0, u8 n_bufs_per_seg, + u32 * left_to_snd0, u16 * n_bufs, u32 * rx_offset, + u16 deq_per_buf, u8 peek_data) +{ + vlib_buffer_t *chain_b0, *prev_b0; + u32 chain_bi0; + u16 len_to_deq0, n_bytes_read; + u8 *data0, j; + + chain_bi0 = bi0; + chain_b0 = b0; + for (j = 1; j < n_bufs_per_seg; j++) + { + prev_b0 = chain_b0; + len_to_deq0 = clib_min (*left_to_snd0, deq_per_buf); + + *n_bufs -= 1; + chain_bi0 = smm->tx_buffers[thread_index][*n_bufs]; + _vec_len (smm->tx_buffers[thread_index]) = *n_bufs; + + chain_b0 = vlib_get_buffer (vm, chain_bi0); + chain_b0->current_data = 0; + data0 = vlib_buffer_get_current (chain_b0); + if (peek_data) + { + n_bytes_read = svm_fifo_peek (fifo, *rx_offset, len_to_deq0, data0); + *rx_offset += n_bytes_read; + } + else + { + n_bytes_read = svm_fifo_dequeue_nowait (fifo, len_to_deq0, data0); + } + ASSERT (n_bytes_read == len_to_deq0); + chain_b0->current_length = n_bytes_read; + b0->total_length_not_including_first_buffer += chain_b0->current_length; + + /* update previous buffer */ + prev_b0->next_buffer = chain_bi0; + prev_b0->flags |= VLIB_BUFFER_NEXT_PRESENT; + + /* update current buffer */ + chain_b0->next_buffer = 0; + + *left_to_snd0 -= n_bytes_read; + if (*left_to_snd0 == 0) + break; + } +} + always_inline int session_tx_fifo_read_and_snd_i (vlib_main_t * vm, vlib_node_runtime_t * node, session_manager_main_t * smm, @@ -78,16 +130,17 @@ session_tx_fifo_read_and_snd_i (vlib_main_t * vm, vlib_node_runtime_t * node, int *n_tx_packets, u8 peek_data) { u32 n_trace = vlib_get_trace_count (vm, node); - u32 left_to_snd0, max_len_to_snd0, len_to_deq0, n_bufs, snd_space0; - u32 n_frame_bytes, n_frames_per_evt; + u32 left_to_snd0, max_len_to_snd0, len_to_deq0, snd_space0; + u32 n_bufs_per_evt, n_frames_per_evt; transport_connection_t *tc0; transport_proto_vft_t *transport_vft; u32 next_index, next0, *to_next, n_left_to_next, bi0; vlib_buffer_t *b0; - u32 rx_offset = 0, max_dequeue0; - u16 snd_mss0; + u32 rx_offset = 0, max_dequeue0, n_bytes_per_seg; + u16 snd_mss0, n_bufs_per_seg, n_bufs; u8 *data0; int i, n_bytes_read; + u32 n_bytes_per_buf, deq_per_buf; next_index = next0 = session_type_to_next[s0->session_type]; @@ -134,8 +187,15 @@ session_tx_fifo_read_and_snd_i (vlib_main_t * vm, vlib_node_runtime_t * node, max_len_to_snd0 = snd_space0; } - n_frame_bytes = snd_mss0 * VLIB_FRAME_SIZE; - n_frames_per_evt = ceil ((double) max_len_to_snd0 / n_frame_bytes); + n_bytes_per_buf = vlib_buffer_free_list_buffer_size (vm, + VLIB_BUFFER_DEFAULT_FREE_LIST_INDEX); + n_bytes_per_seg = MAX_HDRS_LEN + snd_mss0; + n_bufs_per_seg = ceil ((double) n_bytes_per_seg / n_bytes_per_buf); + n_bufs_per_evt = (ceil ((double) max_len_to_snd0 / n_bytes_per_seg)) + * n_bufs_per_seg; + n_frames_per_evt = ceil ((double) n_bufs_per_evt / VLIB_FRAME_SIZE); + + deq_per_buf = clib_min (snd_mss0, n_bytes_per_buf); n_bufs = vec_len (smm->tx_buffers[thread_index]); left_to_snd0 = max_len_to_snd0; @@ -146,9 +206,9 @@ session_tx_fifo_read_and_snd_i (vlib_main_t * vm, vlib_node_runtime_t * node, { vec_validate (smm->tx_buffers[thread_index], n_bufs + VLIB_FRAME_SIZE - 1); - n_bufs += - vlib_buffer_alloc (vm, &smm->tx_buffers[thread_index][n_bufs], - VLIB_FRAME_SIZE); + n_bufs += vlib_buffer_alloc (vm, + &smm->tx_buffers[thread_index][n_bufs], + VLIB_FRAME_SIZE); /* buffer shortage * XXX 0.9 because when debugging we might not get a full frame */ @@ -165,11 +225,14 @@ session_tx_fifo_read_and_snd_i (vlib_main_t * vm, vlib_node_runtime_t * node, } vlib_get_next_frame (vm, node, next_index, to_next, n_left_to_next); - while (left_to_snd0 && n_left_to_next) + while (left_to_snd0 && n_left_to_next >= n_bufs_per_seg) { + /* + * Handle first buffer in chain separately + */ + /* Get free buffer */ - n_bufs--; - bi0 = smm->tx_buffers[thread_index][n_bufs]; + bi0 = smm->tx_buffers[thread_index][--n_bufs]; _vec_len (smm->tx_buffers[thread_index]) = n_bufs; b0 = vlib_get_buffer (vm, bi0); @@ -177,52 +240,19 @@ session_tx_fifo_read_and_snd_i (vlib_main_t * vm, vlib_node_runtime_t * node, b0->flags = VLIB_BUFFER_TOTAL_LENGTH_VALID | VNET_BUFFER_LOCALLY_ORIGINATED; b0->current_data = 0; + b0->total_length_not_including_first_buffer = 0; /* RX on the local interface. tx in default fib */ vnet_buffer (b0)->sw_if_index[VLIB_RX] = 0; vnet_buffer (b0)->sw_if_index[VLIB_TX] = (u32) ~ 0; - /* usual speculation, or the enqueue_x1 macro will barf */ - to_next[0] = bi0; - to_next += 1; - n_left_to_next -= 1; - - VLIB_BUFFER_TRACE_TRAJECTORY_INIT (b0); - if (PREDICT_FALSE (n_trace > 0)) - { - session_queue_trace_t *t0; - vlib_trace_buffer (vm, node, next_index, b0, - 1 /* follow_chain */ ); - vlib_set_trace_count (vm, node, --n_trace); - t0 = vlib_add_trace (vm, node, b0, sizeof (*t0)); - t0->session_index = s0->session_index; - t0->server_thread_index = s0->thread_index; - } + len_to_deq0 = clib_min (left_to_snd0, deq_per_buf); - len_to_deq0 = (left_to_snd0 < snd_mss0) ? left_to_snd0 : snd_mss0; - - /* *INDENT-OFF* */ - SESSION_EVT_DBG(SESSION_EVT_DEQ, s0, ({ - ed->data[0] = e0->event_id; - ed->data[1] = max_dequeue0; - ed->data[2] = len_to_deq0; - ed->data[3] = left_to_snd0; - })); - /* *INDENT-ON* */ - - /* Make room for headers */ data0 = vlib_buffer_make_headroom (b0, MAX_HDRS_LEN); - - /* Dequeue the data - * TODO 1) peek instead of dequeue - * 2) buffer chains */ if (peek_data) { n_bytes_read = svm_fifo_peek (s0->server_tx_fifo, rx_offset, len_to_deq0, data0); - if (n_bytes_read <= 0) - goto dequeue_fail; - /* Keep track of progress locally, transport is also supposed to * increment it independently when pushing the header */ rx_offset += n_bytes_read; @@ -231,18 +261,56 @@ session_tx_fifo_read_and_snd_i (vlib_main_t * vm, vlib_node_runtime_t * node, { n_bytes_read = svm_fifo_dequeue_nowait (s0->server_tx_fifo, len_to_deq0, data0); - if (n_bytes_read <= 0) - goto dequeue_fail; } - b0->current_length = n_bytes_read; + if (n_bytes_read <= 0) + goto dequeue_fail; - /* Ask transport to push header */ - transport_vft->push_header (tc0, b0); + b0->current_length = n_bytes_read; left_to_snd0 -= n_bytes_read; *n_tx_packets = *n_tx_packets + 1; + /* + * Fill in the remaining buffers in the chain, if any + */ + if (PREDICT_FALSE (n_bufs_per_seg > 1)) + session_tx_fifo_chain_tail (smm, vm, thread_index, + s0->server_tx_fifo, b0, bi0, + n_bufs_per_seg, &left_to_snd0, + &n_bufs, &rx_offset, deq_per_buf, + peek_data); + + /* Ask transport to push header after current_length and + * total_length_not_including_first_buffer are updated */ + transport_vft->push_header (tc0, b0); + + /* *INDENT-OFF* */ + SESSION_EVT_DBG(SESSION_EVT_DEQ, s0, ({ + ed->data[0] = e0->event_id; + ed->data[1] = max_dequeue0; + ed->data[2] = len_to_deq0; + ed->data[3] = left_to_snd0; + })); + /* *INDENT-ON* */ + + /* usual speculation, or the enqueue_x1 macro will barf */ + to_next[0] = bi0; + to_next += 1; + n_left_to_next -= 1; + + VLIB_BUFFER_TRACE_TRAJECTORY_INIT (b0); + if (PREDICT_FALSE (n_trace > 0)) + { + session_queue_trace_t *t0; + vlib_trace_buffer (vm, node, next_index, b0, + 1 /* follow_chain */ ); + vlib_set_trace_count (vm, node, --n_trace); + t0 = vlib_add_trace (vm, node, b0, sizeof (*t0)); + t0->session_index = s0->session_index; + t0->server_thread_index = s0->thread_index; + } + vlib_validate_buffer_enqueue_x1 (vm, node, next_index, to_next, n_left_to_next, bi0, next0); diff --git a/src/vnet/session/session.c b/src/vnet/session/session.c index e92bb440601..6e129dde018 100644 --- a/src/vnet/session/session.c +++ b/src/vnet/session/session.c @@ -432,33 +432,97 @@ stream_session_create_i (segment_manager_t * sm, transport_connection_t * tc, return 0; } +/** Enqueue buffer chain tail */ +always_inline int +session_enqueue_chain_tail (stream_session_t * s, vlib_buffer_t * b, + u32 offset, u8 is_in_order) +{ + vlib_buffer_t *chain_b; + u32 chain_bi = b->next_buffer; + vlib_main_t *vm = vlib_get_main (); + u8 *data, len; + u16 written = 0; + int rv = 0; + + do + { + chain_b = vlib_get_buffer (vm, chain_bi); + data = vlib_buffer_get_current (chain_b); + len = chain_b->current_length; + if (is_in_order) + { + rv = svm_fifo_enqueue_nowait (s->server_rx_fifo, len, data); + if (rv < len) + { + return (rv > 0) ? (written + rv) : written; + } + written += rv; + } + else + { + rv = svm_fifo_enqueue_with_offset (s->server_rx_fifo, offset, len, + data); + if (rv) + return -1; + offset += len; + } + } + while ((chain_bi = (chain_b->flags & VLIB_BUFFER_NEXT_PRESENT) + ? chain_b->next_buffer : 0)); + + if (is_in_order) + return written; + + return 0; +} + /* * Enqueue data for delivery to session peer. Does not notify peer of enqueue * event but on request can queue notification events for later delivery by * calling stream_server_flush_enqueue_events(). * * @param tc Transport connection which is to be enqueued data - * @param data Data to be enqueued - * @param len Length of data to be enqueued + * @param b Buffer to be enqueued + * @param offset Offset at which to start enqueueing if out-of-order * @param queue_event Flag to indicate if peer is to be notified or if event * is to be queued. The former is useful when more data is * enqueued and only one event is to be generated. + * @param is_in_order Flag to indicate if data is in order * @return Number of bytes enqueued or a negative value if enqueueing failed. */ int -stream_session_enqueue_data (transport_connection_t * tc, u8 * data, u16 len, - u8 queue_event) +stream_session_enqueue_data (transport_connection_t * tc, vlib_buffer_t * b, + u32 offset, u8 queue_event, u8 is_in_order) { stream_session_t *s; - int enqueued; + int enqueued = 0, rv; s = stream_session_get (tc->s_index, tc->thread_index); - /* Make sure there's enough space left. We might've filled the pipes */ - if (PREDICT_FALSE (len > svm_fifo_max_enqueue (s->server_rx_fifo))) - return -1; - - enqueued = svm_fifo_enqueue_nowait (s->server_rx_fifo, len, data); + if (is_in_order) + { + enqueued = + svm_fifo_enqueue_nowait (s->server_rx_fifo, b->current_length, + vlib_buffer_get_current (b)); + if (PREDICT_FALSE + ((b->flags & VLIB_BUFFER_NEXT_PRESENT) && enqueued > 0)) + { + rv = session_enqueue_chain_tail (s, b, 0, 1); + if (rv <= 0) + return enqueued; + enqueued += rv; + } + } + else + { + rv = svm_fifo_enqueue_with_offset (s->server_rx_fifo, offset, + b->current_length, + vlib_buffer_get_current (b)); + if (PREDICT_FALSE ((b->flags & VLIB_BUFFER_NEXT_PRESENT) && !rv)) + rv = session_enqueue_chain_tail (s, b, offset + b->current_length, 0); + if (rv) + return -1; + } if (queue_event) { @@ -476,7 +540,10 @@ stream_session_enqueue_data (transport_connection_t * tc, u8 * data, u16 len, } } - return enqueued; + if (is_in_order) + return enqueued; + + return 0; } /** Check if we have space in rx fifo to push more bytes */ diff --git a/src/vnet/session/session.h b/src/vnet/session/session.h index f41a8a96eb5..f152a2be0e2 100644 --- a/src/vnet/session/session.h +++ b/src/vnet/session/session.h @@ -345,8 +345,8 @@ stream_session_fifo_size (transport_connection_t * tc) } int -stream_session_enqueue_data (transport_connection_t * tc, u8 * data, u16 len, - u8 queue_event); +stream_session_enqueue_data (transport_connection_t * tc, vlib_buffer_t * b, + u32 offset, u8 queue_event, u8 is_in_order); u32 stream_session_peek_bytes (transport_connection_t * tc, u8 * buffer, u32 offset, u32 max_bytes); diff --git a/src/vnet/tcp/tcp_input.c b/src/vnet/tcp/tcp_input.c index d268251cb38..ceb00fc3e41 100644 --- a/src/vnet/tcp/tcp_input.c +++ b/src/vnet/tcp/tcp_input.c @@ -993,9 +993,8 @@ tcp_session_enqueue_data (tcp_connection_t * tc, vlib_buffer_t * b, return TCP_ERROR_PURE_ACK; } - written = stream_session_enqueue_data (&tc->connection, - vlib_buffer_get_current (b), - data_len, 1 /* queue event */ ); + written = stream_session_enqueue_data (&tc->connection, b, 0, + 1 /* queue event */ , 1); TCP_EVT_DBG (TCP_EVT_INPUT, tc, 0, data_len, written); @@ -1053,12 +1052,10 @@ tcp_session_enqueue_ooo (tcp_connection_t * tc, vlib_buffer_t * b, return TCP_ERROR_PURE_ACK; } - s0 = stream_session_get (tc->c_s_index, tc->c_thread_index); - /* Enqueue out-of-order data with absolute offset */ - rv = svm_fifo_enqueue_with_offset (s0->server_rx_fifo, - vnet_buffer (b)->tcp.seq_number, - data_len, vlib_buffer_get_current (b)); + rv = stream_session_enqueue_data (&tc->connection, b, + vnet_buffer (b)->tcp.seq_number, + 0 /* queue event */ , 0); /* Nothing written */ if (rv) @@ -1075,6 +1072,8 @@ tcp_session_enqueue_ooo (tcp_connection_t * tc, vlib_buffer_t * b, ooo_segment_t *newest; u32 start, end; + s0 = stream_session_get (tc->c_s_index, tc->c_thread_index); + /* Get the newest segment from the fifo */ newest = svm_fifo_newest_ooo_segment (s0->server_rx_fifo); start = ooo_segment_offset (s0->server_rx_fifo, newest); @@ -2543,6 +2542,7 @@ do { \ _(FIN_WAIT_1, TCP_FLAG_FIN, TCP_INPUT_NEXT_RCV_PROCESS, TCP_ERROR_NONE); /* FIN confirming that the peer (app) has closed */ _(FIN_WAIT_2, TCP_FLAG_FIN, TCP_INPUT_NEXT_RCV_PROCESS, TCP_ERROR_NONE); + _(FIN_WAIT_2, TCP_FLAG_ACK, TCP_INPUT_NEXT_RCV_PROCESS, TCP_ERROR_NONE); _(FIN_WAIT_2, TCP_FLAG_FIN | TCP_FLAG_ACK, TCP_INPUT_NEXT_RCV_PROCESS, TCP_ERROR_NONE); _(LAST_ACK, TCP_FLAG_ACK, TCP_INPUT_NEXT_RCV_PROCESS, TCP_ERROR_NONE); diff --git a/src/vnet/tcp/tcp_output.c b/src/vnet/tcp/tcp_output.c index 2a1b140750c..33e599ece32 100644 --- a/src/vnet/tcp/tcp_output.c +++ b/src/vnet/tcp/tcp_output.c @@ -46,7 +46,7 @@ typedef struct tcp_connection_t tcp_connection; } tcp_tx_trace_t; -u16 dummy_mtu = 400; +u16 dummy_mtu = 1460; u8 * format_tcp_tx_trace (u8 * s, va_list * args) @@ -923,7 +923,7 @@ tcp_push_hdr_i (tcp_connection_t * tc, vlib_buffer_t * b, u8 tcp_hdr_opts_len, opts_write_len, flags; tcp_header_t *th; - data_len = b->current_length; + data_len = b->current_length + b->total_length_not_including_first_buffer; vnet_buffer (b)->tcp.flags = 0; if (compute_opts)