break;
}
+ if (CLIB_DEBUG)
+ {
+ f->master_session_index = ~0;
+ f->master_thread_index = ~0;
+ }
+
ssvm_pop_heap (oldheap);
ssvm_unlock_non_recursive (sh);
}
s->server_tx_fifo = server_tx_fifo;
/* Initialize state machine, such as it is... */
- s->session_type = tc->proto;
+ s->session_type = session_type_from_proto_and_ip (tc->transport_proto,
+ tc->is_ip4);
s->session_state = SESSION_STATE_CONNECTING;
s->svm_segment_index = fifo_segment_index;
s->thread_index = thread_index;
}
int
-stream_session_connect_notify (transport_connection_t * tc, u8 sst,
- u8 is_fail)
+stream_session_connect_notify (transport_connection_t * tc, u8 is_fail)
{
application_t *app;
stream_session_t *new_s = 0;
handle = stream_session_half_open_lookup_handle (&tc->lcl_ip, &tc->rmt_ip,
tc->lcl_port, tc->rmt_port,
- tc->proto);
+ tc->transport_proto);
if (handle == HALF_OPEN_LOOKUP_INVALID_VALUE)
{
clib_warning ("This can't be good!");
new_s->app_index = app->index;
}
- /* Notify client */
+ /* Notify client application */
if (app->cb_fns.session_connected_callback (app->index, api_context, new_s,
is_fail))
{
}
/* Cleanup session lookup */
- stream_session_half_open_table_del (sst, tc);
+ stream_session_half_open_table_del (tc);
return error;
}
handle = (((u64) app_index) << 32) | (u64) tc->c_index;
/* Add to the half-open lookup table */
- stream_session_half_open_table_add (st, tc, handle);
+ stream_session_half_open_table_add (tc, handle);
*res = tc;
u32 offset, u32 max_bytes);
u32 stream_session_dequeue_drop (transport_connection_t * tc, u32 max_bytes);
-int stream_session_connect_notify (transport_connection_t * tc, u8 sst,
- u8 is_fail);
+int stream_session_connect_notify (transport_connection_t * tc, u8 is_fail);
void stream_session_init_fifos_pointers (transport_connection_t * tc,
u32 rx_pointer, u32 tx_pointer);
u8 *str = 0;
tp_vft = session_get_transport_vft (ss->session_type);
- if (verbose == 1)
+ if (verbose == 1 && ss->session_state >= SESSION_STATE_ACCEPTING)
str = format (0, "%-10u%-10u%-10lld",
svm_fifo_max_dequeue (ss->server_rx_fifo),
svm_fifo_max_enqueue (ss->server_tx_fifo),
make_v4_ss_kv_from_tc (session_kv4_t * kv, transport_connection_t * t)
{
return make_v4_ss_kv (kv, &t->lcl_ip.ip4, &t->rmt_ip.ip4, t->lcl_port,
- t->rmt_port, t->proto);
+ t->rmt_port, t->transport_proto);
}
always_inline void
make_v6_ss_kv_from_tc (session_kv6_t * kv, transport_connection_t * t)
{
make_v6_ss_kv (kv, &t->lcl_ip.ip6, &t->rmt_ip.ip6, t->lcl_port,
- t->rmt_port, t->proto);
+ t->rmt_port, t->transport_proto);
}
/*
session_kv4_t kv4;
session_kv6_t kv6;
- switch (tc->proto)
+ if (tc->is_ip4)
{
- case SESSION_TYPE_IP4_UDP:
- case SESSION_TYPE_IP4_TCP:
make_v4_ss_kv_from_tc (&kv4, tc);
kv4.value = value;
clib_bihash_add_del_16_8 (&sl->v4_session_hash, &kv4, 1 /* is_add */ );
- break;
- case SESSION_TYPE_IP6_UDP:
- case SESSION_TYPE_IP6_TCP:
+ }
+ else
+ {
make_v6_ss_kv_from_tc (&kv6, tc);
kv6.value = value;
clib_bihash_add_del_48_8 (&sl->v6_session_hash, &kv6, 1 /* is_add */ );
- break;
- default:
- clib_warning ("Session type not supported");
- ASSERT (0);
}
}
stream_session_table_add_for_tc (tc, value);
}
-void
-stream_session_half_open_table_add (session_type_t sst,
- transport_connection_t * tc, u64 value)
-{
- session_lookup_t *sl = &session_lookup;
- session_kv4_t kv4;
- session_kv6_t kv6;
-
- switch (sst)
- {
- case SESSION_TYPE_IP4_UDP:
- case SESSION_TYPE_IP4_TCP:
- make_v4_ss_kv_from_tc (&kv4, tc);
- kv4.value = value;
- clib_bihash_add_del_16_8 (&sl->v4_half_open_hash, &kv4,
- 1 /* is_add */ );
- break;
- case SESSION_TYPE_IP6_UDP:
- case SESSION_TYPE_IP6_TCP:
- make_v6_ss_kv_from_tc (&kv6, tc);
- kv6.value = value;
- clib_bihash_add_del_48_8 (&sl->v6_half_open_hash, &kv6,
- 1 /* is_add */ );
- break;
- default:
- clib_warning ("Session type not supported");
- ASSERT (0);
- }
-}
-
int
stream_session_table_del_for_tc (transport_connection_t * tc)
{
session_lookup_t *sl = &session_lookup;
session_kv4_t kv4;
session_kv6_t kv6;
- switch (tc->proto)
+
+ if (tc->is_ip4)
{
- case SESSION_TYPE_IP4_UDP:
- case SESSION_TYPE_IP4_TCP:
make_v4_ss_kv_from_tc (&kv4, tc);
return clib_bihash_add_del_16_8 (&sl->v4_session_hash, &kv4,
0 /* is_add */ );
- break;
- case SESSION_TYPE_IP6_UDP:
- case SESSION_TYPE_IP6_TCP:
+ }
+ else
+ {
make_v6_ss_kv_from_tc (&kv6, tc);
return clib_bihash_add_del_48_8 (&sl->v6_session_hash, &kv6,
0 /* is_add */ );
- break;
- default:
- clib_warning ("Session type not supported");
- ASSERT (0);
}
return 0;
return stream_session_table_del_for_tc (ts);
}
+
void
-stream_session_half_open_table_del (u8 sst, transport_connection_t * tc)
+stream_session_half_open_table_add (transport_connection_t * tc, u64 value)
{
session_lookup_t *sl = &session_lookup;
session_kv4_t kv4;
session_kv6_t kv6;
- switch (sst)
+ if (tc->is_ip4)
+ {
+ make_v4_ss_kv_from_tc (&kv4, tc);
+ kv4.value = value;
+ clib_bihash_add_del_16_8 (&sl->v4_half_open_hash, &kv4,
+ 1 /* is_add */ );
+ }
+ else
+ {
+ make_v6_ss_kv_from_tc (&kv6, tc);
+ kv6.value = value;
+ clib_bihash_add_del_48_8 (&sl->v6_half_open_hash, &kv6,
+ 1 /* is_add */ );
+ }
+}
+
+void
+stream_session_half_open_table_del (transport_connection_t * tc)
+{
+ session_lookup_t *sl = &session_lookup;
+ session_kv4_t kv4;
+ session_kv6_t kv6;
+
+ if (tc->is_ip4)
{
- case SESSION_TYPE_IP4_UDP:
- case SESSION_TYPE_IP4_TCP:
make_v4_ss_kv_from_tc (&kv4, tc);
clib_bihash_add_del_16_8 (&sl->v4_half_open_hash, &kv4,
0 /* is_add */ );
- break;
- case SESSION_TYPE_IP6_UDP:
- case SESSION_TYPE_IP6_TCP:
+ }
+ else
+ {
make_v6_ss_kv_from_tc (&kv6, tc);
clib_bihash_add_del_48_8 (&sl->v6_half_open_hash, &kv6,
0 /* is_add */ );
- break;
- default:
- clib_warning ("Session type not supported");
- ASSERT (0);
}
}
void stream_session_table_add_for_tc (transport_connection_t * tc, u64 value);
int stream_session_table_del_for_tc (transport_connection_t * tc);
int stream_session_table_del (stream_session_t * s);
-void stream_session_half_open_table_del (u8 sst, transport_connection_t * tc);
-void stream_session_half_open_table_add (session_type_t sst,
- transport_connection_t * tc,
+void stream_session_half_open_table_del (transport_connection_t * tc);
+void stream_session_half_open_table_add (transport_connection_t * tc,
u64 value);
void session_lookup_init (void);
ip46_address_t lcl_ip; /**< Local IP */
u16 lcl_port; /**< Local port */
u16 rmt_port; /**< Remote port */
- u8 proto; /**< Protocol id (also session type) */
+ u8 transport_proto; /**< Protocol id */
+ u8 is_ip4; /**< Flag if IP4 connection */
u32 vrf; /**< FIB table id */
u32 s_index; /**< Parent session index */
u32 c_index; /**< Connection index in transport pool */
- u8 is_ip4; /**< Flag if IP4 connection */
u32 thread_index; /**< Worker-thread index */
fib_node_index_t rmt_fei; /**< FIB entry index for rmt */
#define c_rmt_ip6 connection.rmt_ip.ip6
#define c_lcl_port connection.lcl_port
#define c_rmt_port connection.rmt_port
-#define c_proto connection.proto
+#define c_transport_proto connection.transport_proto
#define c_vrf connection.vrf
#define c_state connection.state
#define c_s_index connection.s_index
* @param vft - virtual function table
*/
void
-session_register_transport (u8 session_type,
+session_register_transport (transport_proto_t transport_proto, u8 is_ip4,
const transport_proto_vft_t * vft)
{
+ u8 session_type;
+ session_type = session_type_from_proto_and_ip (transport_proto, is_ip4);
+
vec_validate (tp_vfts, session_type);
tp_vfts[session_type] = *vft;
void transport_endpoint_table_del (transport_endpoint_table_t * ht,
transport_endpoint_t * te);
-void session_register_transport (u8 session_type,
+void session_register_transport (transport_proto_t transport_proto, u8 is_ip4,
const transport_proto_vft_t * vft);
transport_proto_vft_t *session_get_transport_vft (u8 session_type);
#define TCP_BUILTIN_CLIENT_DBG (0)
+static void
+signal_evt_to_cli_i (int *code)
+{
+ tclient_main_t *tm = &tclient_main;
+ ASSERT (vlib_get_thread_index () == 0);
+ vlib_process_signal_event (tm->vlib_main, tm->cli_node_index, *code, 0);
+}
+
+static void
+signal_evt_to_cli (int code)
+{
+ if (vlib_get_thread_index () != 0)
+ vl_api_rpc_call_main_thread (signal_evt_to_cli_i, (u8 *) & code,
+ sizeof (code));
+ else
+ signal_evt_to_cli_i (&code);
+}
+
static void
send_test_chunk (tclient_main_t * tm, session_t * s)
{
u32 bytes_this_chunk;
session_fifo_event_t evt;
static int serial_number = 0;
+ svm_fifo_t *txf;
int rv;
ASSERT (vec_len (test_data) > 0);
bytes_this_chunk = bytes_this_chunk < s->bytes_to_send
? bytes_this_chunk : s->bytes_to_send;
- rv = svm_fifo_enqueue_nowait (s->server_tx_fifo, bytes_this_chunk,
+ txf = s->server_tx_fifo;
+ rv = svm_fifo_enqueue_nowait (txf, bytes_this_chunk,
test_data + test_buf_offset);
/* If we managed to enqueue data... */
}
/* Poke the session layer */
- if (svm_fifo_set_event (s->server_tx_fifo))
+ if (svm_fifo_set_event (txf))
{
/* Fabricate TX event, send to vpp */
- evt.fifo = s->server_tx_fifo;
+ evt.fifo = txf;
evt.event_type = FIFO_EVENT_APP_TX;
evt.event_id = serial_number++;
- if (unix_shared_memory_queue_add (tm->vpp_event_queue, (u8 *) & evt,
- 0 /* do wait for mutex */ ))
+ if (unix_shared_memory_queue_add
+ (tm->vpp_event_queue[txf->master_thread_index], (u8 *) & evt,
+ 0 /* do wait for mutex */ ))
clib_warning ("could not enqueue event");
}
}
{
svm_fifo_t *rx_fifo = s->server_rx_fifo;
int n_read, test_bytes = 0;
+ u32 my_thread_index = vlib_get_thread_index ();
/* Allow enqueuing of new event */
// svm_fifo_unset_event (rx_fifo);
if (test_bytes)
{
- n_read = svm_fifo_dequeue_nowait (rx_fifo, vec_len (tm->rx_buf),
- tm->rx_buf);
+ n_read = svm_fifo_dequeue_nowait (rx_fifo,
+ vec_len (tm->rx_buf[my_thread_index]),
+ tm->rx_buf[my_thread_index]);
}
else
{
int i;
for (i = 0; i < n_read; i++)
{
- if (tm->rx_buf[i] != ((s->bytes_received + i) & 0xff))
+ if (tm->rx_buf[my_thread_index][i]
+ != ((s->bytes_received + i) & 0xff))
{
clib_warning ("read %d error at byte %lld, 0x%x not 0x%x",
- n_read, s->bytes_received + i, tm->rx_buf[i],
+ n_read, s->bytes_received + i,
+ tm->rx_buf[my_thread_index][i],
((s->bytes_received + i) & 0xff));
}
}
if (s)
{
- stream_session_disconnect (s);
+ vnet_disconnect_args_t _a, *a = &_a;
+ a->handle = stream_session_handle (s);
+ a->app_index = tm->app_index;
+ vnet_disconnect_session (a);
+
vec_delete (connections_this_batch, 1, i);
i--;
__sync_fetch_and_add (&tm->ready_connections, -1);
/* Kick the debug CLI process */
if (tm->ready_connections == 0)
{
- tm->test_end_time = vlib_time_now (vm);
- vlib_process_signal_event (vm, tm->cli_node_index,
- 2, 0 /* data */ );
+ signal_evt_to_cli (2);
}
}
}
tcp_test_clients_init (vlib_main_t * vm)
{
tclient_main_t *tm = &tclient_main;
- vlib_thread_main_t *thread_main = vlib_get_thread_main ();
+ vlib_thread_main_t *vtm = vlib_get_thread_main ();
+ u32 num_threads;
int i;
tclient_api_hookup (vm);
if (create_api_loopback (tm))
return -1;
+ num_threads = 1 /* main thread */ + vtm->n_threads;
+
/* Init test data. Big buffer */
vec_validate (tm->connect_test_data, 1024 * 1024 - 1);
for (i = 0; i < vec_len (tm->connect_test_data); i++)
tm->connect_test_data[i] = i & 0xff;
- tm->session_index_by_vpp_handles = hash_create (0, sizeof (uword));
- vec_validate (tm->rx_buf, vec_len (tm->connect_test_data) - 1);
+ vec_validate (tm->rx_buf, num_threads - 1);
+ for (i = 0; i < num_threads; i++)
+ vec_validate (tm->rx_buf[i], vec_len (tm->connect_test_data) - 1);
tm->is_init = 1;
- tm->vlib_main = vm;
- vec_validate (tm->connection_index_by_thread, thread_main->n_vlib_mains);
- vec_validate (tm->connections_this_batch_by_thread,
- thread_main->n_vlib_mains);
+ vec_validate (tm->connection_index_by_thread, vtm->n_vlib_mains);
+ vec_validate (tm->connections_this_batch_by_thread, vtm->n_vlib_mains);
+ vec_validate (tm->vpp_event_queue, vtm->n_vlib_mains);
+
return 0;
}
tclient_main_t *tm = &tclient_main;
session_t *session;
u32 session_index;
- int i;
+ u8 thread_index = vlib_get_thread_index ();
+
+ ASSERT (s->thread_index == thread_index);
if (is_fail)
{
clib_warning ("connection %d failed!", api_context);
- vlib_process_signal_event (tm->vlib_main, tm->cli_node_index, -1,
- 0 /* data */ );
- return -1;
+ signal_evt_to_cli (-1);
+ return 0;
}
- tm->our_event_queue = session_manager_get_vpp_event_queue (s->thread_index);
- tm->vpp_event_queue = session_manager_get_vpp_event_queue (s->thread_index);
+ if (!tm->vpp_event_queue[thread_index])
+ tm->vpp_event_queue[thread_index] =
+ session_manager_get_vpp_event_queue (thread_index);
/*
* Setup session
*/
+ clib_spinlock_lock_if_init (&tm->sessions_lock);
pool_get (tm->sessions, session);
+ clib_spinlock_unlock_if_init (&tm->sessions_lock);
+
memset (session, 0, sizeof (*session));
session_index = session - tm->sessions;
session->bytes_to_send = tm->bytes_to_send;
session->server_tx_fifo->client_session_index = session_index;
session->vpp_session_handle = stream_session_handle (s);
- /* Add it to the session lookup table */
- hash_set (tm->session_index_by_vpp_handles, session->vpp_session_handle,
- session_index);
-
- if (tm->ready_connections == tm->expected_connections - 1)
- {
- vlib_thread_main_t *thread_main = vlib_get_thread_main ();
- int thread_index;
-
- thread_index = 0;
- for (i = 0; i < pool_elts (tm->sessions); i++)
- {
- vec_add1 (tm->connection_index_by_thread[thread_index], i);
- thread_index++;
- if (thread_index == thread_main->n_vlib_mains)
- thread_index = 0;
- }
- }
+ vec_add1 (tm->connection_index_by_thread[thread_index], session_index);
__sync_fetch_and_add (&tm->ready_connections, 1);
if (tm->ready_connections == tm->expected_connections)
{
tm->run_test = 1;
- tm->test_start_time = vlib_time_now (tm->vlib_main);
/* Signal the CLI process that the action is starting... */
- vlib_process_signal_event (tm->vlib_main, tm->cli_node_index, 1,
- 0 /* data */ );
+ signal_evt_to_cli (1);
}
return 0;
tm->connections_per_batch = 1000;
tm->private_segment_count = 0;
tm->private_segment_size = 0;
-
+ tm->vlib_main = vm;
+ if (thread_main->n_vlib_mains > 1)
+ clib_spinlock_init (&tm->sessions_lock);
vec_free (tm->connect_uri);
while (unformat_check_input (input) != UNFORMAT_END_OF_INPUT)
start_tx_pthread ();
#endif
+ vlib_worker_thread_barrier_sync (vm);
vnet_session_enable_disable (vm, 1 /* turn on TCP, etc. */ );
+ vlib_worker_thread_barrier_release (vm);
if (tm->test_client_attached == 0)
{
clients_connect (vm, uri, n_clients);
/* Park until the sessions come up, or ten seconds elapse... */
- vlib_process_wait_for_event_or_clock (vm, 10.0 /* timeout, seconds */ );
+ vlib_process_wait_for_event_or_clock (vm, 10 /* timeout, seconds */ );
event_type = vlib_process_get_events (vm, &event_data);
-
switch (event_type)
{
case ~0:
goto cleanup;
case 1:
+ tm->test_start_time = vlib_time_now (tm->vlib_main);
vlib_cli_output (vm, "Test started at %.6f", tm->test_start_time);
break;
/* Now wait for the sessions to finish... */
vlib_process_wait_for_event_or_clock (vm, cli_timeout);
event_type = vlib_process_get_events (vm, &event_data);
-
switch (event_type)
{
case ~0:
goto cleanup;
case 2:
+ tm->test_end_time = vlib_time_now (vm);
vlib_cli_output (vm, "Test finished at %.6f", tm->test_end_time);
break;
vec_reset_length (tm->connection_index_by_thread[i]);
vec_reset_length (tm->connections_this_batch_by_thread[i]);
}
+
pool_free (tm->sessions);
return 0;
.short_help = "test tcp clients [nclients %d]"
"[iterations %d] [bytes %d] [uri tcp://6.0.1.1/1234]",
.function = test_tcp_clients_command_fn,
+ .is_mp_safe = 1,
};
/* *INDENT-ON* */
* Application setup parameters
*/
unix_shared_memory_queue_t *vl_input_queue; /**< vpe input queue */
- unix_shared_memory_queue_t *our_event_queue; /**< Our event queue */
- unix_shared_memory_queue_t *vpp_event_queue; /**< $$$ single thread */
+ unix_shared_memory_queue_t **vpp_event_queue;
u32 cli_node_index; /**< cli process node index */
u32 my_client_index; /**< loopback API client handle */
/*
* Test state variables
*/
- session_t *sessions; /**< Sessions pool */
- u8 *rx_buf; /**< intermediate rx buffer */
- uword *session_index_by_vpp_handles; /**< Hash table for disconnecting */
+ session_t *sessions; /**< Session pool, shared */
+ clib_spinlock_t sessions_lock;
+ u8 **rx_buf; /**< intermediate rx buffers */
u8 *connect_test_data; /**< Pre-computed test data */
u32 **connection_index_by_thread;
u32 **connections_this_batch_by_thread; /**< active connection batch */
{
listener->c_lcl_ip4.as_u32 = lcl->ip.ip4.as_u32;
listener->c_is_ip4 = 1;
- listener->c_proto = SESSION_TYPE_IP4_TCP;
}
else
{
clib_memcpy (&listener->c_lcl_ip6, &lcl->ip.ip6,
sizeof (ip6_address_t));
- listener->c_proto = SESSION_TYPE_IP6_TCP;
- }
+ }
+ listener->c_transport_proto = TRANSPORT_PROTO_TCP;
listener->c_s_index = session_index;
listener->state = TCP_STATE_LISTEN;
return &tc->connection;
}
+always_inline void
+transport_endpoint_del (u32 tepi)
+{
+ tcp_main_t *tm = vnet_get_tcp_main ();
+ clib_spinlock_lock_if_init (&tm->local_endpoints_lock);
+ pool_put_index (tm->local_endpoints, tepi);
+ clib_spinlock_unlock_if_init (&tm->local_endpoints_lock);
+}
+
+always_inline transport_endpoint_t *
+transport_endpoint_new (void)
+{
+ tcp_main_t *tm = vnet_get_tcp_main ();
+ transport_endpoint_t *tep;
+ pool_get (tm->local_endpoints, tep);
+ return tep;
+}
+
+/**
+ * Cleanup half-open connection
+ *
+ */
+void
+tcp_half_open_connection_del (tcp_connection_t * tc)
+{
+ tcp_main_t *tm = vnet_get_tcp_main ();
+ clib_spinlock_lock_if_init (&tm->half_open_lock);
+ pool_put_index (tm->half_open_connections, tc->c_c_index);
+ if (CLIB_DEBUG)
+ memset (tc, 0xFA, sizeof (*tc));
+ clib_spinlock_unlock_if_init (&tm->half_open_lock);
+}
+
+/**
+ * Try to cleanup half-open connection
+ *
+ * If called from a thread that doesn't own tc, the call won't have any
+ * effect.
+ *
+ * @param tc - connection to be cleaned up
+ * @return non-zero if cleanup failed.
+ */
+int
+tcp_half_open_connection_cleanup (tcp_connection_t * tc)
+{
+ /* Make sure this is the owning thread */
+ if (tc->c_thread_index != vlib_get_thread_index ())
+ return 1;
+ tcp_timer_reset (tc, TCP_TIMER_ESTABLISH);
+ tcp_timer_reset (tc, TCP_TIMER_RETRANSMIT_SYN);
+ tcp_half_open_connection_del (tc);
+ return 0;
+}
+
+tcp_connection_t *
+tcp_half_open_connection_new (void)
+{
+ tcp_main_t *tm = vnet_get_tcp_main ();
+ tcp_connection_t *tc = 0;
+ pool_get (tm->half_open_connections, tc);
+ memset (tc, 0, sizeof (*tc));
+ tc->c_c_index = tc - tm->half_open_connections;
+ return tc;
+}
+
/**
* Cleans up connection state.
*
/* Cleanup local endpoint if this was an active connect */
tepi = transport_endpoint_lookup (&tm->local_endpoints_table, &tc->c_lcl_ip,
tc->c_lcl_port);
-
- /*XXX lock */
if (tepi != TRANSPORT_ENDPOINT_INVALID_INDEX)
{
tep = pool_elt_at_index (tm->local_endpoints, tepi);
transport_endpoint_table_del (&tm->local_endpoints_table, tep);
- pool_put (tm->local_endpoints, tep);
+ transport_endpoint_del (tepi);
}
- /* Make sure all timers are cleared */
- tcp_connection_timers_reset (tc);
-
- /* Check if half-open */
+ /* Check if connection is not yet fully established */
if (tc->state == TCP_STATE_SYN_SENT)
{
- tcp_half_open_connection_del (tc);
+ /* Try to remove the half-open connection. If this is not the owning
+ * thread, tc won't be removed. Retransmit or establish timers will
+ * eventually expire and call again cleanup on the right thread. */
+ tcp_half_open_connection_cleanup (tc);
}
else
{
int thread_index = tc->c_thread_index;
+
+ /* Make sure all timers are cleared */
+ tcp_connection_timers_reset (tc);
+
/* Poison the entry */
if (CLIB_DEBUG > 0)
memset (tc, 0xFA, sizeof (*tc));
tcp_connection_cleanup (tc);
}
-/**
- * Cleanup half-open connection
- */
-void
-tcp_half_open_connection_del (tcp_connection_t * tc)
-{
- tcp_main_t *tm = vnet_get_tcp_main ();
- if (CLIB_DEBUG)
- memset (tc, 0xFA, sizeof (*tc));
- clib_spinlock_lock_if_init (&tm->half_open_lock);
- pool_put (tm->half_open_connections, tc);
- clib_spinlock_unlock_if_init (&tm->half_open_lock);
-}
-
-tcp_connection_t *
-tcp_half_open_connection_new ()
-{
- tcp_main_t *tm = vnet_get_tcp_main ();
- tcp_connection_t *tc = 0;
- clib_spinlock_lock_if_init (&tm->half_open_lock);
- pool_get (tm->half_open_connections, tc);
- clib_spinlock_unlock_if_init (&tm->half_open_lock);
- memset (tc, 0, sizeof (*tc));
- return tc;
-}
-
tcp_connection_t *
tcp_connection_new (u8 thread_index)
{
tcp_connection_cleanup (tc);
break;
case TCP_STATE_SYN_SENT:
- /* XXX remove sst from call */
- stream_session_connect_notify (&tc->connection, tc->connection.proto,
- 1 /* fail */ );
+ stream_session_connect_notify (&tc->connection, 1 /* fail */ );
tcp_connection_cleanup (tc);
break;
case TCP_STATE_ESTABLISHED:
stream_session_reset_notify (&tc->connection);
/* Wait for cleanup from session layer but not forever */
- tcp_timer_set (tc, TCP_TIMER_WAITCLOSE, TCP_CLEANUP_TIME);
+ tcp_timer_update (tc, TCP_TIMER_WAITCLOSE, TCP_CLEANUP_TIME);
break;
case TCP_STATE_CLOSED:
return;
* table to mark the pair as used.
*/
int
-tcp_allocate_local_port (tcp_main_t * tm, ip46_address_t * ip)
+tcp_allocate_local_port (ip46_address_t * ip)
{
+ tcp_main_t *tm = vnet_get_tcp_main ();
transport_endpoint_t *tep;
u32 time_now, tei;
u16 min = 1024, max = 65535; /* XXX configurable ? */
/* Only support active opens from thread 0 */
ASSERT (vlib_get_thread_index () == 0);
- /* Start at random point or max */
- pool_get (tm->local_endpoints, tep);
- clib_memcpy (&tep->ip, ip, sizeof (*ip));
-
/* Search for first free slot */
for (; tries >= 0; tries--)
{
break;
}
- tep->port = port;
-
/* Look it up */
- tei = transport_endpoint_lookup (&tm->local_endpoints_table, &tep->ip,
- tep->port);
+ tei = transport_endpoint_lookup (&tm->local_endpoints_table, ip, port);
/* If not found, we're done */
if (tei == TRANSPORT_ENDPOINT_INVALID_INDEX)
{
+ clib_spinlock_lock_if_init (&tm->local_endpoints_lock);
+ tep = transport_endpoint_new ();
+ clib_memcpy (&tep->ip, ip, sizeof (*ip));
+ tep->port = port;
transport_endpoint_table_add (&tm->local_endpoints_table, tep,
tep - tm->local_endpoints);
+ clib_spinlock_unlock_if_init (&tm->local_endpoints_lock);
+
return tep->port;
}
}
- /* No free ports */
- pool_put (tm->local_endpoints, tep);
return -1;
}
}
/* Allocate source port */
- lcl_port = tcp_allocate_local_port (tm, &lcl_addr);
+ lcl_port = tcp_allocate_local_port (&lcl_addr);
if (lcl_port < 1)
{
clib_warning ("Failed to allocate src port");
/*
* Create connection and send SYN
*/
-
+ clib_spinlock_lock_if_init (&tm->half_open_lock);
tc = tcp_half_open_connection_new ();
-
clib_memcpy (&tc->c_rmt_ip, &rmt->ip, sizeof (ip46_address_t));
clib_memcpy (&tc->c_lcl_ip, &lcl_addr, sizeof (ip46_address_t));
tc->c_rmt_port = clib_host_to_net_u16 (rmt->port);
tc->c_lcl_port = clib_host_to_net_u16 (lcl_port);
- tc->c_c_index = tc - tm->half_open_connections;
tc->c_is_ip4 = rmt->is_ip4;
- tc->c_proto = rmt->is_ip4 ? SESSION_TYPE_IP4_TCP : SESSION_TYPE_IP6_TCP;
+ tc->c_transport_proto = TRANSPORT_PROTO_TCP;
tc->c_vrf = rmt->vrf;
/* The other connection vars will be initialized after SYN ACK */
tcp_connection_timers_init (tc);
TCP_EVT_DBG (TCP_EVT_OPEN, tc);
tc->state = TCP_STATE_SYN_SENT;
tcp_send_syn (tc);
+ clib_spinlock_unlock_if_init (&tm->half_open_lock);
return tc->c_c_index;
}
tcp_timer_establish_handler (u32 conn_index)
{
tcp_connection_t *tc;
- u8 sst;
tc = tcp_half_open_connection_get (conn_index);
tc->timers[TCP_TIMER_ESTABLISH] = TCP_TIMER_HANDLE_INVALID;
ASSERT (tc->state == TCP_STATE_SYN_SENT);
-
- sst = tc->c_is_ip4 ? SESSION_TYPE_IP4_TCP : SESSION_TYPE_IP6_TCP;
- stream_session_connect_notify (&tc->connection, sst, 1 /* fail */ );
-
+ stream_session_connect_notify (&tc->connection, 1 /* fail */ );
tcp_connection_cleanup (tc);
}
tcp_connection_t *tc;
tc = tcp_connection_get (conn_index, thread_index);
+ if (!tc)
+ return;
tc->timers[TCP_TIMER_WAITCLOSE] = TCP_TIMER_HANDLE_INVALID;
/* Session didn't come back with a close(). Send FIN either way
ip4_register_protocol (IP_PROTOCOL_TCP, tcp4_input_node.index);
/* Register as transport with session layer */
- session_register_transport (SESSION_TYPE_IP4_TCP, &tcp_proto);
- session_register_transport (SESSION_TYPE_IP6_TCP, &tcp_proto);
+ session_register_transport (TRANSPORT_PROTO_TCP, 1, &tcp_proto);
+ session_register_transport (TRANSPORT_PROTO_TCP, 0, &tcp_proto);
/*
* Initialize data structures
200000 /* $$$$ config parameter nbuckets */ ,
(64 << 20) /*$$$ config parameter table size */ );
if (num_threads > 1)
- clib_spinlock_init (&tm->half_open_lock);
+ {
+ clib_spinlock_init (&tm->half_open_lock);
+ clib_spinlock_init (&tm->local_endpoints_lock);
+ }
return error;
}
_(SENT_RCV_WND0, "Sent 0 receive window") \
_(RECOVERY, "Recovery on") \
_(FAST_RECOVERY, "Fast Recovery on") \
- _(FR_1_SMSS, "Sent 1 SMSS")
+ _(FR_1_SMSS, "Sent 1 SMSS") \
+ _(HALF_OPEN_DONE, "Half-open completed")
typedef enum _tcp_connection_flag_bits
{
/* Local endpoints lookup table */
transport_endpoint_table_t local_endpoints_table;
+ clib_spinlock_t local_endpoints_lock;
/* Congestion control algorithms registered */
tcp_cc_algorithm_t *cc_algos;
always_inline tcp_connection_t *
tcp_connection_get (u32 conn_index, u32 thread_index)
{
- if (pool_is_free_index (tcp_main.connections[thread_index], conn_index))
+ if (PREDICT_FALSE
+ (pool_is_free_index (tcp_main.connections[thread_index], conn_index)))
return 0;
return pool_elt_at_index (tcp_main.connections[thread_index], conn_index);
}
void tcp_connection_close (tcp_connection_t * tc);
void tcp_connection_cleanup (tcp_connection_t * tc);
void tcp_connection_del (tcp_connection_t * tc);
-void tcp_half_open_connection_del (tcp_connection_t * tc);
+int tcp_half_open_connection_cleanup (tcp_connection_t * tc);
tcp_connection_t *tcp_connection_new (u8 thread_index);
void tcp_connection_reset (tcp_connection_t * tc);
always_inline tcp_connection_t *
tcp_half_open_connection_get (u32 conn_index)
{
- if (pool_is_free_index (tcp_main.half_open_connections, conn_index))
- return 0;
- return pool_elt_at_index (tcp_main.half_open_connections, conn_index);
+ tcp_connection_t *tc = 0;
+ clib_spinlock_lock_if_init (&tcp_main.half_open_lock);
+ if (!pool_is_free_index (tcp_main.half_open_connections, conn_index))
+ tc = pool_elt_at_index (tcp_main.half_open_connections, conn_index);
+ clib_spinlock_unlock_if_init (&tcp_main.half_open_lock);
+ return tc;
}
void tcp_make_ack (tcp_connection_t * ts, vlib_buffer_t * b);
#include <vlib/vlib.h>
#define TCP_DEBUG (1)
-#define TCP_DEBUG_SM (2)
-#define TCP_DEBUG_CC (0)
-#define TCP_DEBUG_CC_STAT (0)
+#define TCP_DEBUG_SM (0)
+#define TCP_DEBUG_CC (1)
+#define TCP_DEBUG_CC_STAT (1)
#define foreach_tcp_dbg_evt \
_(INIT, "") \
ed->data[0] = _tc->c_c_index; \
}
+#define TCP_EVT_SYN_RCVD_HANDLER(_tc, ...) \
+{ \
+ TCP_EVT_INIT_HANDLER(_tc, 0); \
+ ELOG_TYPE_DECLARE (_e) = \
+ { \
+ .format = "syn-rx: irs %u", \
+ .format_args = "i4", \
+ }; \
+ DECLARE_ETD(_tc, _e, 1); \
+ ed->data[0] = _tc->irs; \
+ TCP_EVT_STATE_CHANGE_HANDLER(_tc); \
+}
+
#define TCP_EVT_UNBIND_HANDLER(_tc, ...) \
{ \
TCP_EVT_DEALLOC_HANDLER(_tc); \
ed->data[0] = _tc->state; \
}
-#define TCP_EVT_SYN_RCVD_HANDLER(_tc, ...) \
-{ \
- TCP_EVT_INIT_HANDLER(_tc, 0); \
- ELOG_TYPE_DECLARE (_e) = \
- { \
- .format = "syn-rx: irs %u", \
- .format_args = "i4", \
- }; \
- DECLARE_ETD(_tc, _e, 1); \
- ed->data[0] = _tc->irs; \
- TCP_EVT_STATE_CHANGE_HANDLER(_tc); \
-}
-
#define TCP_EVT_SYN_SENT_HANDLER(_tc, ...) \
{ \
ELOG_TYPE_DECLARE (_e) = \
* in CLOSE-WAIT, set timer (reuse WAITCLOSE). */
tc0->state = TCP_STATE_CLOSE_WAIT;
TCP_EVT_DBG (TCP_EVT_FIN_RCVD, tc0);
- tc0->rcv_nxt += (vnet_buffer (b0)->tcp.data_len == 0);
+ if (vnet_buffer (b0)->tcp.data_len == 0)
+ {
+ tc0->rcv_nxt += 1;
+ next0 = TCP_ESTABLISHED_NEXT_DROP;
+ }
stream_session_disconnect_notify (&tc0->connection);
- tcp_timer_set (tc0, TCP_TIMER_WAITCLOSE, TCP_CLOSEWAIT_TIME);
+ tcp_timer_update (tc0, TCP_TIMER_WAITCLOSE, TCP_CLOSEWAIT_TIME);
}
done:
tcp_main_t *tm = vnet_get_tcp_main ();
u32 n_left_from, next_index, *from, *to_next;
u32 my_thread_index = vm->thread_index, errors = 0;
- u8 sst = is_ip4 ? SESSION_TYPE_IP4_TCP : SESSION_TYPE_IP6_TCP;
from = vlib_frame_vector_args (from_frame);
n_left_from = from_frame->n_vectors;
if (tcp_options_parse (tcp0, &tc0->rcv_opts))
goto drop;
- /* Stop connection establishment and retransmit timers */
- tcp_timer_reset (tc0, TCP_TIMER_ESTABLISH);
- tcp_timer_reset (tc0, TCP_TIMER_RETRANSMIT_SYN);
-
/* Valid SYN or SYN-ACK. Move connection from half-open pool to
* current thread pool. */
pool_get (tm->connections[my_thread_index], new_tc0);
new_tc0->c_thread_index = my_thread_index;
new_tc0->rcv_nxt = vnet_buffer (b0)->tcp.seq_end;
new_tc0->irs = seq0;
- tcp_half_open_connection_del (tc0);
+ new_tc0->timers[TCP_TIMER_ESTABLISH] = TCP_TIMER_HANDLE_INVALID;
+ new_tc0->timers[TCP_TIMER_RETRANSMIT_SYN] =
+ TCP_TIMER_HANDLE_INVALID;
+
+ /* If this is not the owning thread, wait for syn retransmit to
+ * expire and cleanup then */
+ if (tcp_half_open_connection_cleanup (tc0))
+ tc0->flags |= TCP_CONN_HALF_OPEN_DONE;
if (tcp_opts_tstamp (&new_tc0->rcv_opts))
{
/* Notify app that we have connection. If session layer can't
* allocate session send reset */
- if (stream_session_connect_notify (&new_tc0->connection, sst,
- 0))
+ if (stream_session_connect_notify (&new_tc0->connection, 0))
{
+ tcp_send_reset (new_tc0, b0, is_ip4);
tcp_connection_cleanup (new_tc0);
- tcp_send_reset (tc0, b0, is_ip4);
goto drop;
}
new_tc0->state = TCP_STATE_SYN_RCVD;
/* Notify app that we have connection */
- if (stream_session_connect_notify
- (&new_tc0->connection, sst, 0))
+ if (stream_session_connect_notify (&new_tc0->connection, 0))
{
tcp_connection_cleanup (new_tc0);
tcp_send_reset (tc0, b0, is_ip4);
if (tc0->snd_una == tc0->snd_una_max)
{
ASSERT (tcp_fin (tcp0));
+ tc0->rcv_nxt += 1;
tc0->state = TCP_STATE_FIN_WAIT_2;
TCP_EVT_DBG (TCP_EVT_STATE_CHANGE, tc0);
* acknowledged ("ok") but do not delete the TCB. */
if (tcp_rcv_ack (tc0, b0, tcp0, &next0, &error0))
goto drop;
+
/* check if rtx queue is empty and ack CLOSE TODO */
break;
case TCP_STATE_CLOSE_WAIT:
/* Got FIN, send ACK! */
tc0->state = TCP_STATE_TIME_WAIT;
tcp_connection_timers_reset (tc0);
- tcp_timer_set (tc0, TCP_TIMER_WAITCLOSE, TCP_CLOSEWAIT_TIME);
+ tcp_timer_update (tc0, TCP_TIMER_WAITCLOSE, TCP_CLOSEWAIT_TIME);
tcp_make_ack (tc0, b0);
next0 = tcp_next_output (is_ip4);
TCP_EVT_DBG (TCP_EVT_STATE_CHANGE, tc0);
if ((tmp =
stream_session_half_open_lookup (&tc->c_lcl_ip, &tc->c_rmt_ip,
tc->c_lcl_port, tc->c_rmt_port,
- tc->c_proto)))
+ tc->c_transport_proto)))
{
if (tmp->lcl_port == hdr->dst_port
&& tmp->rmt_port == hdr->src_port)
if (is_syn)
{
tc = tcp_half_open_connection_get (index);
+ tc->timers[TCP_TIMER_RETRANSMIT_SYN] = TCP_TIMER_HANDLE_INVALID;
}
else
{
tc = tcp_connection_get (index, thread_index);
+ tc->timers[TCP_TIMER_RETRANSMIT] = TCP_TIMER_HANDLE_INVALID;
}
- /* Make sure timer handle is set to invalid */
- tc->timers[TCP_TIMER_RETRANSMIT] = TCP_TIMER_HANDLE_INVALID;
-
if (!tcp_in_recovery (tc) && tc->rto_boff > 0
&& tc->state >= TCP_STATE_ESTABLISHED)
{
/* Retransmit for SYN/SYNACK */
else if (tc->state == TCP_STATE_SYN_RCVD || tc->state == TCP_STATE_SYN_SENT)
{
+ /* Half-open connection actually moved to established but we were
+ * waiting for syn retransmit to pop to call cleanup from the right
+ * thread. */
+ if (tc->flags & TCP_CONN_HALF_OPEN_DONE)
+ {
+ ASSERT (tc->state == TCP_STATE_SYN_SENT);
+ if (tcp_half_open_connection_cleanup (tc))
+ {
+ clib_warning ("could not remove half-open connection");
+ ASSERT (0);
+ }
+ return;
+ }
+
/* Try without increasing RTO a number of times. If this fails,
* start growing RTO exponentially */
if (tc->rto_boff > TCP_RTO_SYN_RETRIES)
tc->connection.rmt_ip.ip4.as_u32 = clib_host_to_net_u32 (0x06000103);
tc->connection.lcl_port = 35051;
tc->connection.rmt_port = 53764;
- tc->connection.proto = 0;
+ tc->connection.transport_proto = 0;
clib_memcpy (tc1, &tc->connection, sizeof (*tc1));
pool_get (session_manager_main.sessions[0], s);
tc->connection.rmt_ip.ip4.as_u32 = clib_host_to_net_u32 (0x06000102);
tc->connection.lcl_port = 38225;
tc->connection.rmt_port = 53764;
- tc->connection.proto = 0;
+ tc->connection.transport_proto = 0;
clib_memcpy (tc2, &tc->connection, sizeof (*tc2));
/*
tconn = stream_session_lookup_transport_wt4 (&tc1->lcl_ip.ip4,
&tc1->rmt_ip.ip4,
tc1->lcl_port, tc1->rmt_port,
- tc1->proto, 0);
+ tc1->transport_proto, 0);
cmp = (memcmp (&tconn->rmt_ip, &tc1->rmt_ip, sizeof (tc1->rmt_ip)) == 0);
TCP_TEST ((cmp), "rmt ip is identical %d", cmp);
TCP_TEST ((tconn->lcl_port == tc1->lcl_port),
tconn = stream_session_lookup_transport_wt4 (&tc2->lcl_ip.ip4,
&tc2->rmt_ip.ip4,
tc2->lcl_port, tc2->rmt_port,
- tc2->proto, 0);
+ tc2->transport_proto, 0);
TCP_TEST ((tconn == 0), "lookup result should be null");
/*
tconn = stream_session_lookup_transport_wt4 (&tc1->lcl_ip.ip4,
&tc1->rmt_ip.ip4,
tc1->lcl_port, tc1->rmt_port,
- tc1->proto, 0);
+ tc1->transport_proto, 0);
TCP_TEST ((tconn == 0), "lookup result should be null");
tconn = stream_session_lookup_transport_wt4 (&tc2->lcl_ip.ip4,
&tc2->rmt_ip.ip4,
tc2->lcl_port, tc2->rmt_port,
- tc2->proto, 0);
+ tc2->transport_proto, 0);
TCP_TEST ((tconn == 0), "lookup result should be null");
/*
tconn = stream_session_lookup_transport_wt4 (&tc2->lcl_ip.ip4,
&tc2->rmt_ip.ip4,
tc2->lcl_port, tc2->rmt_port,
- tc2->proto, 0);
+ tc2->transport_proto, 0);
TCP_TEST ((tconn == 0), "lookup result should be null");
return 0;
memset (listener, 0, sizeof (udp_connection_t));
listener->c_lcl_port = clib_host_to_net_u16 (lcl->port);
listener->c_lcl_ip4.as_u32 = lcl->ip.ip4.as_u32;
- listener->c_proto = SESSION_TYPE_IP4_UDP;
+ listener->c_transport_proto = TRANSPORT_PROTO_UDP;
udp_register_dst_port (um->vlib_main, lcl->port, udp4_uri_input_node.index,
1 /* is_ipv4 */ );
return 0;
pool_get (um->udp_listeners, listener);
listener->c_lcl_port = clib_host_to_net_u16 (lcl->port);
clib_memcpy (&listener->c_lcl_ip6, &lcl->ip.ip6, sizeof (ip6_address_t));
- listener->c_proto = SESSION_TYPE_IP6_UDP;
+ listener->c_transport_proto = TRANSPORT_PROTO_UDP;
udp_register_dst_port (um->vlib_main, lcl->port,
udp4_uri_input_node.index, 0 /* is_ipv4 */ );
return 0;
/* Register as transport with URI */
- session_register_transport (SESSION_TYPE_IP4_UDP, &udp4_proto);
- session_register_transport (SESSION_TYPE_IP6_UDP, &udp6_proto);
+ session_register_transport (TRANSPORT_PROTO_UDP, 1, &udp4_proto);
+ session_register_transport (TRANSPORT_PROTO_UDP, 0, &udp6_proto);
/*
* Initialize data structures
us->c_rmt_ip4.as_u32 = ip0->src_address.as_u32;
us->c_lcl_port = udp0->dst_port;
us->c_rmt_port = udp0->src_port;
- us->c_proto = SESSION_TYPE_IP4_UDP;
+ us->c_transport_proto = TRANSPORT_PROTO_UDP;
us->c_c_index = us - um->udp_sessions[my_thread_index];
/*