Memfd backed shared memory segments can only be negotiated over sockets.
For such scenarios, the existing redirect mechanism that establishes
cut-through sessions does not work anymore as the two peer application
do not share such a socket.
This patch adds support for local sessions, as opposed to sessions
backed by a transport connection, in a way that is almost transparent to
the two applications by reusing the existing binary api messages.
Moreover, all segment allocations are now entirely done through the
segment manager valloc, so segment overlaps due to independent
allocations previously required for redirects are completely avoided.
The one notable characteristic of local sessions (cut-through from app
perspective) notification messages is that they carry pointers to two
event queues, one for each app peer, instead of one. For
transport-backed sessions one of the queues can be inferred but for
local session they cannot.
Change-Id: Ia443fb63e2d9d8e43490275062a708f039038175
Signed-off-by: Florin Coras <fcoras@cisco.com>
clib_mem_vm_map_t mapa = { 0 };
u8 junk = 0, *ssvm_filename;
ssvm_shared_header_t *sh;
- uword page_size;
+ uword page_size, requested_va = 0;
void *oldheap;
if (ssvm->ssvm_size == 0)
page_size = clib_mem_vm_get_page_size (ssvm_fd);
if (ssvm->requested_va)
- clib_mem_vm_randomize_va (&ssvm->requested_va, min_log2 (page_size));
+ {
+ requested_va = ssvm->requested_va;
+ clib_mem_vm_randomize_va (&requested_va, min_log2 (page_size));
+ }
- mapa.requested_va = ssvm->requested_va;
+ mapa.requested_va = requested_va;
mapa.size = ssvm->ssvm_size;
mapa.fd = ssvm_fd;
if (clib_mem_vm_ext_map (&mapa))
#define FIFO_SEGMENT_MAX_FIFO_SIZE (8<<20) /* 8mb max fifo size */
#define FIFO_SEGMENT_ALLOC_CHUNK_SIZE 32 /* Allocation quantum */
-#define FIFO_SEGMENT_F_IS_PREALLOCATED 1 << 0 /* Segment is preallocated */
-#define FIFO_SEGMENT_F_WILL_DELETE 1 << 1 /* Segment will be removed */
+#define FIFO_SEGMENT_F_IS_PREALLOCATED (1 << 0)
+#define FIFO_SEGMENT_F_WILL_DELETE (1 << 1)
typedef struct
{
rmp->_vl_msg_id = ntohs (VL_API_DISCONNECT_SESSION_REPLY);
rmp->retval = rv;
rmp->handle = mp->handle;
+ rmp->context = mp->context;
vl_msg_api_send_shmem (em->vl_input_queue, (u8 *) & rmp);
if (session)
svm_fifo_segment_main_t *segment_main;
u8 *connect_test_data;
+
+ uword *segments_table;
} uri_udp_test_main_t;
#if CLIB_DEBUG > 0
bmp->_vl_msg_id = ntohs (VL_API_APPLICATION_ATTACH);
bmp->client_index = utm->my_client_index;
bmp->context = ntohl (0xfeedface);
- bmp->options[APP_OPTIONS_FLAGS] =
- APP_OPTIONS_FLAGS_ACCEPT_REDIRECT | APP_OPTIONS_FLAGS_ADD_SEGMENT;
+ bmp->options[APP_OPTIONS_FLAGS] = APP_OPTIONS_FLAGS_ADD_SEGMENT;
+ bmp->options[APP_OPTIONS_FLAGS] |= APP_OPTIONS_FLAGS_USE_GLOBAL_SCOPE;
+ bmp->options[APP_OPTIONS_FLAGS] |= APP_OPTIONS_FLAGS_USE_LOCAL_SCOPE;
bmp->options[APP_OPTIONS_PREALLOC_FIFO_PAIRS] = 2;
bmp->options[APP_OPTIONS_RX_FIFO_SIZE] = fifo_size;
bmp->options[APP_OPTIONS_TX_FIFO_SIZE] = fifo_size;
/* We read from the tx fifo and write to the rx fifo */
do
{
- actual_transfer = svm_fifo_dequeue_nowait (tx_fifo,
+ actual_transfer = svm_fifo_dequeue_nowait (rx_fifo,
vec_len (my_copy_buffer),
my_copy_buffer);
}
buffer_offset = 0;
while (actual_transfer > 0)
{
- rv = svm_fifo_enqueue_nowait (rx_fifo, actual_transfer,
+ rv = svm_fifo_enqueue_nowait (tx_fifo, actual_transfer,
my_copy_buffer + buffer_offset);
if (rv > 0)
{
static void
vl_api_map_another_segment_t_handler (vl_api_map_another_segment_t * mp)
{
+ uri_udp_test_main_t *utm = &uri_udp_test_main;
svm_fifo_segment_create_args_t _a, *a = &_a;
+ svm_fifo_segment_private_t *seg;
+ u8 *seg_name;
int rv;
memset (a, 0, sizeof (*a));
mp->segment_name);
return;
}
- clib_warning ("Mapped new segment '%s' size %d", mp->segment_name,
- mp->segment_size);
+ seg = svm_fifo_segment_get_segment (a->new_segment_indices[0]);
+ clib_warning ("Mapped new segment '%s' size %d", seg->ssvm.name,
+ seg->ssvm.ssvm_size);
+ seg_name = format (0, "%s", (char *) mp->segment_name);
+ hash_set_mem (utm->segments_table, seg_name, a->new_segment_indices[0]);
+ vec_free (seg_name);
+}
+
+static void
+vl_api_unmap_segment_t_handler (vl_api_unmap_segment_t * mp)
+{
+ uri_udp_test_main_t *utm = &uri_udp_test_main;
+ svm_fifo_segment_private_t *seg;
+ u64 *seg_indexp;
+ u8 *seg_name;
+
+
+ seg_name = format (0, "%s", mp->segment_name);
+ seg_indexp = hash_get_mem (utm->segments_table, seg_name);
+ if (!seg_indexp)
+ {
+ clib_warning ("segment not mapped: %s", seg_name);
+ return;
+ }
+ hash_unset_mem (utm->segments_table, seg_name);
+ seg = svm_fifo_segment_get_segment ((u32) seg_indexp[0]);
+ svm_fifo_segment_delete (seg);
+ clib_warning ("Unmapped segment '%s'", seg_name);
+ vec_free (seg_name);
}
/**
svm_fifo_t *rx_fifo, *tx_fifo;
session_t *session;
static f64 start_time;
+ u32 session_index;
+ int rv = 0;
if (start_time == 0.0)
start_time = clib_time_now (&utm->clib_time);
utm->vpp_event_queue =
uword_to_pointer (mp->vpp_event_queue_address, svm_queue_t *);
- pool_get (utm->sessions, session);
-
rx_fifo = uword_to_pointer (mp->server_rx_fifo, svm_fifo_t *);
- rx_fifo->client_session_index = session - utm->sessions;
tx_fifo = uword_to_pointer (mp->server_tx_fifo, svm_fifo_t *);
- tx_fifo->client_session_index = session - utm->sessions;
- session->server_rx_fifo = rx_fifo;
- session->server_tx_fifo = tx_fifo;
+ pool_get (utm->sessions, session);
+ memset (session, 0, sizeof (*session));
+ session_index = session - utm->sessions;
- hash_set (utm->session_index_by_vpp_handles, mp->handle,
- session - utm->sessions);
+ /* Cut-through case */
+ if (mp->server_event_queue_address)
+ {
+ clib_warning ("cut-through session");
+ utm->our_event_queue = uword_to_pointer (mp->server_event_queue_address,
+ svm_queue_t *);
+ rx_fifo->master_session_index = session_index;
+ tx_fifo->master_session_index = session_index;
+ utm->cut_through_session_index = session_index;
+ session->server_rx_fifo = rx_fifo;
+ session->server_tx_fifo = tx_fifo;
+
+ rv = pthread_create (&utm->cut_through_thread_handle,
+ NULL /*attr */ , cut_through_thread_fn, 0);
+ if (rv)
+ {
+ clib_warning ("pthread_create returned %d", rv);
+ rv = VNET_API_ERROR_SYSCALL_ERROR_1;
+ }
+ }
+ else
+ {
+ rx_fifo->client_session_index = session_index;
+ tx_fifo->client_session_index = session_index;
+ session->server_rx_fifo = rx_fifo;
+ session->server_tx_fifo = tx_fifo;
+ }
+ hash_set (utm->session_index_by_vpp_handles, mp->handle, session_index);
if (pool_elts (utm->sessions) && (pool_elts (utm->sessions) % 20000) == 0)
{
f64 now = clib_time_now (&utm->clib_time);
rmp->_vl_msg_id = ntohs (VL_API_ACCEPT_SESSION_REPLY);
rmp->handle = mp->handle;
rmp->context = mp->context;
+ rmp->retval = rv;
vl_msg_api_send_shmem (utm->vl_input_queue, (u8 *) & rmp);
CLIB_MEMORY_BARRIER ();
rmp->_vl_msg_id = ntohs (VL_API_DISCONNECT_SESSION_REPLY);
rmp->retval = rv;
rmp->handle = mp->handle;
+ rmp->context = mp->context;
vl_msg_api_send_shmem (utm->vl_input_queue, (u8 *) & rmp);
}
return;
}
- /* We've been redirected */
- if (mp->segment_name_length > 0)
- {
- svm_fifo_segment_main_t *sm = &svm_fifo_segment_main;
- svm_fifo_segment_create_args_t _a, *a = &_a;
- u32 segment_index;
- svm_fifo_segment_private_t *seg;
- int rv;
-
- memset (a, 0, sizeof (*a));
- a->segment_name = (char *) mp->segment_name;
-
- sleep (1);
-
- rv = svm_fifo_segment_attach (a);
- if (rv)
- {
- clib_warning ("sm_fifo_segment_create ('%v') failed",
- mp->segment_name);
- return;
- }
-
- segment_index = a->new_segment_indices[0];
- vec_add2 (utm->seg, seg, 1);
- memcpy (seg, sm->segments + segment_index, sizeof (*seg));
- sleep (1);
- }
-
pool_get (utm->sessions, session);
session->server_rx_fifo = uword_to_pointer (mp->server_rx_fifo,
svm_fifo_t *);
svm_fifo_t *);
ASSERT (session->server_tx_fifo);
- if (mp->segment_name_length > 0)
- utm->cut_through_session_index = session - utm->sessions;
+ /* Cut-through case */
+ if (mp->client_event_queue_address)
+ {
+ clib_warning ("cut-through session");
+ utm->cut_through_session_index = session - utm->sessions;
+ utm->vpp_event_queue = uword_to_pointer (mp->vpp_event_queue_address,
+ svm_queue_t *);
+ utm->our_event_queue = uword_to_pointer (mp->client_event_queue_address,
+ svm_queue_t *);
+ }
else
{
utm->connected_session = session - utm->sessions;
_(ACCEPT_SESSION, accept_session) \
_(DISCONNECT_SESSION, disconnect_session) \
_(MAP_ANOTHER_SEGMENT, map_another_segment) \
+_(UNMAP_SEGMENT, unmap_segment) \
_(APPLICATION_ATTACH_REPLY, application_attach_reply) \
_(APPLICATION_DETACH_REPLY, application_detach_reply) \
utm->session_index_by_vpp_handles = hash_create (0, sizeof (uword));
utm->my_pid = getpid ();
utm->configured_segment_size = 1 << 20;
+ utm->segments_table = hash_create_vec (0, sizeof (u8), sizeof (u64));
clib_time_init (&utm->clib_time);
init_error_string_table (utm);
nobase_include_HEADERS += \
vcl/sock_test.h
+
+# vi:syntax=automake
.session_disconnect_callback = echo_server_session_disconnect_callback,
.session_connected_callback = echo_server_session_connected_callback,
.add_segment_callback = echo_server_add_segment_callback,
- .redirect_connect_callback = echo_server_redirect_connect_callback,
.builtin_server_rx_callback = echo_server_rx_callback,
.session_reset_callback = echo_server_session_reset_callback
};
u8 server_uri_set = 0, *appns_id = 0;
u64 tmp, appns_flags = 0, appns_secret = 0;
char *default_uri = "tcp://0.0.0.0/1234";
- int rv;
+ int rv, is_stop = 0;
esm->no_echo = 0;
esm->fifo_size = 64 << 10;
appns_flags |= APP_OPTIONS_FLAGS_USE_GLOBAL_SCOPE;
else if (unformat (input, "secret %lu", &appns_secret))
;
+ else if (unformat (input, "stop"))
+ is_stop = 1;
else
return clib_error_return (0, "failed: unknown input `%U'",
format_unformat_error, input);
}
+ if (is_stop)
+ {
+ if (esm->app_index == (u32) ~ 0)
+ {
+ clib_warning ("server not running");
+ return clib_error_return (0, "failed: server not running");
+ }
+ rv = echo_server_detach ();
+ if (rv)
+ {
+ clib_warning ("failed: detach");
+ return clib_error_return (0, "failed: server detach %d", rv);
+ }
+ return 0;
+ }
+
vnet_session_enable_disable (vm, 1 /* turn on TCP, etc. */ );
if (!server_uri_set)
return -1;
}
-static int
-http_server_redirect_connect_callback (u32 client_index, void *mp)
-{
- clib_warning ("called...");
- return -1;
-}
-
static session_cb_vft_t http_server_session_cb_vft = {
.session_accept_callback = http_server_session_accept_callback,
.session_disconnect_callback = http_server_session_disconnect_callback,
.session_connected_callback = http_server_session_connected_callback,
.add_segment_callback = http_server_add_segment_callback,
- .redirect_connect_callback = http_server_redirect_connect_callback,
.builtin_server_rx_callback = http_server_rx_callback,
.session_reset_callback = http_server_session_reset_callback
};
return -1;
}
-static int
-proxy_redirect_connect_callback (u32 client_index, void *mp)
-{
- clib_warning ("called...");
- return -1;
-}
-
static int
proxy_rx_callback (stream_session_t * s)
{
.session_disconnect_callback = proxy_disconnect_callback,
.session_connected_callback = proxy_connected_callback,
.add_segment_callback = proxy_add_segment_callback,
- .redirect_connect_callback = proxy_redirect_connect_callback,
.builtin_server_rx_callback = proxy_rx_callback,
.session_reset_callback = proxy_reset_callback
};
*/
static uword *app_by_api_client_index;
-/**
- * Default application event queue size
- */
-static u32 default_app_evt_queue_size = 128;
-
static u8 *
app_get_name_from_reg_index (application_t * app)
{
app->index = application_get_index (app);
app->connects_seg_manager = APP_INVALID_SEGMENT_MANAGER_INDEX;
app->first_segment_manager = APP_INVALID_SEGMENT_MANAGER_INDEX;
+ app->local_segment_manager = APP_INVALID_SEGMENT_MANAGER_INDEX;
if (CLIB_DEBUG > 1)
clib_warning ("[%d] New app (%d)", getpid (), app->index);
return app;
application_del (application_t * app)
{
vnet_unbind_args_t _a, *a = &_a;
- segment_manager_t *sm;
u64 handle, *handles = 0;
+ segment_manager_t *sm;
u32 index;
int i;
segment_manager_del (sm);
}
}
+
+ /*
+ * Local connections cleanup
+ */
+ application_local_sessions_del (app);
+
application_table_del (app);
pool_put (app_pool, app);
}
application_init (application_t * app, u32 api_client_index, u64 * options,
session_cb_vft_t * cb_fns)
{
- u32 app_evt_queue_size, first_seg_size, prealloc_fifo_pairs;
ssvm_segment_type_t seg_type = SSVM_SEGMENT_MEMFD;
+ u32 first_seg_size, prealloc_fifo_pairs;
segment_manager_properties_t *props;
vl_api_registration_t *reg;
segment_manager_t *sm;
props->rx_fifo_size = options[APP_OPTIONS_RX_FIFO_SIZE];
if (options[APP_OPTIONS_TX_FIFO_SIZE])
props->tx_fifo_size = options[APP_OPTIONS_TX_FIFO_SIZE];
+ if (options[APP_OPTIONS_EVT_QUEUE_SIZE])
+ props->evt_q_size = options[APP_OPTIONS_EVT_QUEUE_SIZE];
props->segment_type = seg_type;
- app_evt_queue_size = options[APP_OPTIONS_EVT_QUEUE_SIZE] > 0 ?
- options[APP_OPTIONS_EVT_QUEUE_SIZE] : default_app_evt_queue_size;
first_seg_size = options[APP_OPTIONS_SEGMENT_SIZE];
prealloc_fifo_pairs = options[APP_OPTIONS_PREALLOC_FIFO_PAIRS];
- if ((rv = segment_manager_init (sm, first_seg_size, app_evt_queue_size,
- prealloc_fifo_pairs)))
+ if ((rv = segment_manager_init (sm, first_seg_size, prealloc_fifo_pairs)))
return rv;
sm->first_is_protected = 1;
app->cb_fns = *cb_fns;
app->ns_index = options[APP_OPTIONS_NAMESPACE];
app->listeners_table = hash_create (0, sizeof (u64));
+ app->local_connects = hash_create (0, sizeof (u64));
app->proxied_transports = options[APP_OPTIONS_PROXY_TRANSPORT];
app->event_queue = segment_manager_event_queue (sm);
/* Add app to lookup by api_client_index table */
application_table_add (app);
+ /*
+ * Segment manager for local sessions
+ */
+ sm = segment_manager_new ();
+ sm->app_index = app->index;
+ app->local_segment_manager = segment_manager_index (sm);
+
return 0;
}
*/
int
application_start_listen (application_t * srv, session_endpoint_t * sep,
- u64 * res)
+ session_handle_t * res)
{
segment_manager_t *sm;
stream_session_t *s;
- u64 handle;
+ session_handle_t handle;
session_type_t sst;
sst = session_type_from_proto_and_ip (sep->transport_proto, sep->is_ip4);
* Stop listening on session associated to handle
*/
int
-application_stop_listen (application_t * srv, u64 handle)
+application_stop_listen (application_t * srv, session_handle_t handle)
{
stream_session_t *listener;
uword *indexp;
return segment_manager_get (*smp);
}
+segment_manager_t *
+application_get_local_segment_manager (application_t * app)
+{
+ return segment_manager_get (app->local_segment_manager);
+}
+
+segment_manager_t *
+application_get_local_segment_manager_w_session (application_t * app,
+ local_session_t * ls)
+{
+ stream_session_t *listener;
+ if (application_local_session_listener_has_transport (ls))
+ {
+ listener = listen_session_get (ls->listener_session_type,
+ ls->listener_index);
+ return application_get_listen_segment_manager (app, listener);
+ }
+ return segment_manager_get (app->local_segment_manager);
+}
+
int
application_is_proxy (application_t * app)
{
return &app->sm_properties;
}
+local_session_t *
+application_alloc_local_session (application_t * app)
+{
+ local_session_t *s;
+ pool_get (app->local_sessions, s);
+ memset (s, 0, sizeof (*s));
+ s->app_index = app->index;
+ s->session_index = s - app->local_sessions;
+ s->session_type = session_type_from_proto_and_ip (TRANSPORT_PROTO_NONE, 0);
+ return s;
+}
+
+void
+application_free_local_session (application_t * app, local_session_t * s)
+{
+ pool_put (app->local_sessions, s);
+ if (CLIB_DEBUG)
+ memset (s, 0xfc, sizeof (*s));
+}
+
+local_session_t *
+application_get_local_session (application_t * app, u32 session_index)
+{
+ return pool_elt_at_index (app->local_sessions, session_index);
+}
+
+local_session_t *
+application_get_local_session_from_handle (session_handle_t handle)
+{
+ application_t *server;
+ u32 session_index, server_index;
+ local_session_parse_handle (handle, &server_index, &session_index);
+ server = application_get (server_index);
+ return application_get_local_session (server, session_index);
+}
+
+always_inline void
+application_local_listener_session_endpoint (local_session_t * ll,
+ session_endpoint_t * sep)
+{
+ sep->transport_proto =
+ session_type_transport_proto (ll->listener_session_type);
+ sep->port = ll->port;
+ sep->is_ip4 = ll->listener_session_type & 1;
+}
+
+int
+application_start_local_listen (application_t * server,
+ session_endpoint_t * sep,
+ session_handle_t * handle)
+{
+ session_handle_t lh;
+ local_session_t *ll;
+ u32 table_index;
+
+ table_index = application_local_session_table (server);
+
+ /* An exact sep match, as opposed to session_lookup_local_listener */
+ lh = session_lookup_endpoint_listener (table_index, sep, 1);
+ if (lh != SESSION_INVALID_HANDLE)
+ return VNET_API_ERROR_ADDRESS_IN_USE;
+
+ pool_get (server->local_listen_sessions, ll);
+ memset (ll, 0, sizeof (*ll));
+ ll->session_type = session_type_from_proto_and_ip (TRANSPORT_PROTO_NONE, 0);
+ ll->app_index = server->index;
+ ll->session_index = ll - server->local_listen_sessions;
+ ll->port = sep->port;
+ /* Store the original session type for the unbind */
+ ll->listener_session_type =
+ session_type_from_proto_and_ip (sep->transport_proto, sep->is_ip4);
+
+ *handle = application_local_session_handle (ll);
+ session_lookup_add_session_endpoint (table_index, sep, *handle);
+
+ return 0;
+}
+
+/**
+ * Clean up local session table. If we have a listener session use it to
+ * find the port and proto. If not, the handle must be a local table handle
+ * so parse it.
+ */
+int
+application_stop_local_listen (application_t * server, session_handle_t lh)
+{
+ session_endpoint_t sep = SESSION_ENDPOINT_NULL;
+ u32 table_index, ll_index, server_index;
+ stream_session_t *sl = 0;
+ local_session_t *ll, *ls;
+
+ table_index = application_local_session_table (server);
+
+ /* We have both local and global table binds. Figure from global what
+ * the sep we should be cleaning up is.
+ */
+ if (!session_handle_is_local (lh))
+ {
+ sl = listen_session_get_from_handle (lh);
+ if (!sl || listen_session_get_local_session_endpoint (sl, &sep))
+ {
+ clib_warning ("broken listener");
+ return -1;
+ }
+ lh = session_lookup_endpoint_listener (table_index, &sep, 0);
+ if (lh == SESSION_INVALID_HANDLE)
+ return -1;
+ }
+
+ local_session_parse_handle (lh, &server_index, &ll_index);
+ ASSERT (server->index == server_index);
+ if (!(ll = application_get_local_listen_session (server, ll_index)))
+ {
+ clib_warning ("no local listener");
+ return -1;
+ }
+ application_local_listener_session_endpoint (ll, &sep);
+ session_lookup_del_session_endpoint (table_index, &sep);
+
+ /* *INDENT-OFF* */
+ pool_foreach (ls, server->local_sessions, ({
+ if (ls->listener_index == ll->session_index)
+ application_local_session_disconnect (server->index, ls);
+ }));
+ /* *INDENT-ON* */
+ pool_put_index (server->local_listen_sessions, ll->session_index);
+
+ return 0;
+}
+
+int
+application_local_session_connect (u32 table_index, application_t * client,
+ application_t * server,
+ local_session_t * ll, u32 opaque)
+{
+ u32 seg_size, evt_q_sz, evt_q_elts, margin = 16 << 10;
+ segment_manager_properties_t *props, *cprops;
+ int rv, has_transport, seg_index;
+ svm_fifo_segment_private_t *seg;
+ segment_manager_t *sm;
+ local_session_t *ls;
+ svm_queue_t *sq, *cq;
+
+ ls = application_alloc_local_session (server);
+
+ props = application_segment_manager_properties (server);
+ cprops = application_segment_manager_properties (client);
+ evt_q_elts = props->evt_q_size + cprops->evt_q_size;
+ evt_q_sz = evt_q_elts * sizeof (session_fifo_event_t);
+ seg_size = props->rx_fifo_size + props->tx_fifo_size + evt_q_sz + margin;
+
+ has_transport = session_has_transport ((stream_session_t *) ll);
+ if (!has_transport)
+ {
+ /* Local sessions don't have backing transport */
+ ls->port = ll->port;
+ sm = application_get_local_segment_manager (server);
+ }
+ else
+ {
+ stream_session_t *sl = (stream_session_t *) ll;
+ transport_connection_t *tc;
+ tc = listen_session_get_transport (sl);
+ ls->port = tc->lcl_port;
+ sm = application_get_listen_segment_manager (server, sl);
+ }
+
+ seg_index = segment_manager_add_segment (sm, seg_size);
+ if (seg_index < 0)
+ {
+ clib_warning ("failed to add new cut-through segment");
+ return seg_index;
+ }
+ seg = segment_manager_get_segment_w_lock (sm, seg_index);
+ sq = segment_manager_alloc_queue (seg, props->evt_q_size);
+ cq = segment_manager_alloc_queue (seg, cprops->evt_q_size);
+ ls->server_evt_q = pointer_to_uword (sq);
+ ls->client_evt_q = pointer_to_uword (cq);
+ rv = segment_manager_try_alloc_fifos (seg, props->rx_fifo_size,
+ props->tx_fifo_size,
+ &ls->server_rx_fifo,
+ &ls->server_tx_fifo);
+ if (rv)
+ {
+ clib_warning ("failed to add fifos in cut-through segment");
+ segment_manager_segment_reader_unlock (sm);
+ goto failed;
+ }
+ ls->server_rx_fifo->master_session_index = ls->session_index;
+ ls->server_tx_fifo->master_session_index = ls->session_index;
+ ls->server_rx_fifo->master_thread_index = ~0;
+ ls->server_tx_fifo->master_thread_index = ~0;
+ ls->svm_segment_index = seg_index;
+ ls->listener_index = ll->session_index;
+ ls->client_index = client->index;
+ ls->client_opaque = opaque;
+ ls->listener_session_type = ll->session_type;
+
+ if ((rv = server->cb_fns.add_segment_callback (server->api_client_index,
+ &seg->ssvm)))
+ {
+ clib_warning ("failed to notify server of new segment");
+ segment_manager_segment_reader_unlock (sm);
+ goto failed;
+ }
+ segment_manager_segment_reader_unlock (sm);
+ if ((rv = server->cb_fns.session_accept_callback ((stream_session_t *) ls)))
+ {
+ clib_warning ("failed to send accept cut-through notify to server");
+ goto failed;
+ }
+ if (server->flags & APP_OPTIONS_FLAGS_IS_BUILTIN)
+ application_local_session_connect_notify (ls);
+
+ return 0;
+
+failed:
+ if (!has_transport)
+ segment_manager_del_segment (sm, seg);
+ return rv;
+}
+
+static uword
+application_client_local_connect_key (local_session_t * ls)
+{
+ return ((uword) ls->app_index << 32 | (uword) ls->session_index);
+}
+
+static void
+application_client_local_connect_key_parse (uword key, u32 * app_index,
+ u32 * session_index)
+{
+ *app_index = key >> 32;
+ *session_index = key & 0xFFFFFFFF;
+}
+
+int
+application_local_session_connect_notify (local_session_t * ls)
+{
+ svm_fifo_segment_private_t *seg;
+ application_t *client, *server;
+ segment_manager_t *sm;
+ int rv, is_fail = 0;
+ uword client_key;
+
+ client = application_get (ls->client_index);
+ server = application_get (ls->app_index);
+ sm = application_get_local_segment_manager_w_session (server, ls);
+ seg = segment_manager_get_segment_w_lock (sm, ls->svm_segment_index);
+ if ((rv = client->cb_fns.add_segment_callback (client->api_client_index,
+ &seg->ssvm)))
+ {
+ clib_warning ("failed to notify client %u of new segment",
+ ls->client_index);
+ segment_manager_segment_reader_unlock (sm);
+ application_local_session_disconnect (ls->client_index, ls);
+ is_fail = 1;
+ }
+ else
+ {
+ segment_manager_segment_reader_unlock (sm);
+ }
+
+ client->cb_fns.session_connected_callback (client->index, ls->client_opaque,
+ (stream_session_t *) ls,
+ is_fail);
+
+ client_key = application_client_local_connect_key (ls);
+ hash_set (client->local_connects, client_key, client_key);
+ return 0;
+}
+
+int
+application_local_session_disconnect (u32 app_index, local_session_t * ls)
+{
+ svm_fifo_segment_private_t *seg;
+ application_t *client, *server;
+ segment_manager_t *sm;
+ uword client_key;
+
+ client = application_get_if_valid (ls->client_index);
+ server = application_get (ls->app_index);
+
+ if (ls->session_state == SESSION_STATE_CLOSED)
+ {
+ cleanup:
+ client_key = application_client_local_connect_key (ls);
+ sm = application_get_local_segment_manager_w_session (server, ls);
+ seg = segment_manager_get_segment (sm, ls->svm_segment_index);
+
+ if (client)
+ {
+ hash_unset (client->local_connects, client_key);
+ client->cb_fns.del_segment_callback (client->api_client_index,
+ &seg->ssvm);
+ }
+
+ server->cb_fns.del_segment_callback (server->api_client_index,
+ &seg->ssvm);
+ segment_manager_del_segment (sm, seg);
+ application_free_local_session (server, ls);
+ return 0;
+ }
+
+ if (app_index == ls->client_index)
+ {
+ send_local_session_disconnect_callback (ls->app_index, ls);
+ }
+ else
+ {
+ if (!client)
+ {
+ goto cleanup;
+ }
+ else if (ls->session_state < SESSION_STATE_READY)
+ {
+ client->cb_fns.session_connected_callback (client->index,
+ ls->client_opaque,
+ (stream_session_t *) ls,
+ 1 /* is_fail */ );
+ ls->session_state = SESSION_STATE_CLOSED;
+ goto cleanup;
+ }
+ else
+ {
+ send_local_session_disconnect_callback (ls->client_index, ls);
+ }
+ }
+
+ ls->session_state = SESSION_STATE_CLOSED;
+
+ return 0;
+}
+
+void
+application_local_sessions_del (application_t * app)
+{
+ u32 index, server_index, session_index, table_index;
+ segment_manager_t *sm;
+ u64 handle, *handles = 0;
+ local_session_t *ls, *ll;
+ application_t *server;
+ session_endpoint_t sep;
+ int i;
+
+ /*
+ * Local listens. Don't bother with local sessions, we clean them lower
+ */
+ table_index = application_local_session_table (app);
+ /* *INDENT-OFF* */
+ pool_foreach (ll, app->local_listen_sessions, ({
+ application_local_listener_session_endpoint (ll, &sep);
+ session_lookup_del_session_endpoint (table_index, &sep);
+ }));
+ /* *INDENT-ON* */
+
+ /*
+ * Local sessions
+ */
+ if (app->local_sessions)
+ {
+ /* *INDENT-OFF* */
+ pool_foreach (ls, app->local_sessions, ({
+ application_local_session_disconnect (app->index, ls);
+ }));
+ /* *INDENT-ON* */
+ }
+
+ /*
+ * Local connects
+ */
+ vec_reset_length (handles);
+ /* *INDENT-OFF* */
+ hash_foreach (handle, index, app->local_connects, ({
+ vec_add1 (handles, handle);
+ }));
+ /* *INDENT-ON* */
+
+ for (i = 0; i < vec_len (handles); i++)
+ {
+ application_client_local_connect_key_parse (handles[i], &server_index,
+ &session_index);
+ server = application_get_if_valid (server_index);
+ if (server)
+ {
+ ls = application_get_local_session (server, session_index);
+ application_local_session_disconnect (app->index, ls);
+ }
+ }
+
+ sm = segment_manager_get (app->local_segment_manager);
+ sm->app_index = SEGMENT_MANAGER_INVALID_APP_INDEX;
+ segment_manager_del (sm);
+}
+
u8 *
format_application_listener (u8 * s, va_list * args)
{
application_t *app = va_arg (*args, application_t *);
u64 handle = va_arg (*args, u64);
- u32 index = va_arg (*args, u32);
+ u32 sm_index = va_arg (*args, u32);
int verbose = va_arg (*args, int);
stream_session_t *listener;
u8 *app_name, *str;
if (verbose)
{
s = format (s, "%-40s%-20s%-15u%-15u%-10u", str, app_name,
- app->api_client_index, handle, index);
+ app->api_client_index, handle, sm_index);
}
else
s = format (s, "%-40s%-20s", str, app_name);
vec_free (app_name);
}
+void
+application_format_local_sessions (application_t * app, int verbose)
+{
+ vlib_main_t *vm = vlib_get_main ();
+ local_session_t *ls;
+ transport_proto_t tp;
+ u8 *conn = 0;
+
+ /* Header */
+ if (app == 0)
+ {
+ vlib_cli_output (vm, "%-40s%-15s%-20s", "Connection", "ServerApp",
+ "ClientApp");
+ return;
+ }
+
+ /* *INDENT-OFF* */
+ pool_foreach (ls, app->local_listen_sessions, ({
+ tp = session_type_transport_proto(ls->listener_session_type);
+ conn = format (0, "[L][%U] *:%u", format_transport_proto_short, tp,
+ ls->port);
+ vlib_cli_output (vm, "%-40v%-15u%-20s", conn, ls->app_index, "*");
+ vec_reset_length (conn);
+ }));
+ pool_foreach (ls, app->local_sessions, ({
+ tp = session_type_transport_proto(ls->listener_session_type);
+ conn = format (0, "[L][%U] *:%u", format_transport_proto_short, tp,
+ ls->port);
+ vlib_cli_output (vm, "%-40v%-15u%-20u", conn, ls->app_index,
+ ls->client_index);
+ vec_reset_length (conn);
+ }));
+ /* *INDENT-ON* */
+
+ vec_free (conn);
+}
+
+void
+application_format_local_connects (application_t * app, int verbose)
+{
+ vlib_main_t *vm = vlib_get_main ();
+ u32 app_index, session_index;
+ application_t *server;
+ local_session_t *ls;
+ uword client_key;
+ u64 value;
+
+ /* Header */
+ if (app == 0)
+ {
+ if (verbose)
+ vlib_cli_output (vm, "%-40s%-15s%-20s%-10s", "Connection", "App",
+ "Peer App", "SegManager");
+ else
+ vlib_cli_output (vm, "%-40s%-15s%-20s", "Connection", "App",
+ "Peer App");
+ return;
+ }
+
+ /* *INDENT-OFF* */
+ hash_foreach (client_key, value, app->local_connects, ({
+ application_client_local_connect_key_parse (client_key, &app_index,
+ &session_index);
+ server = application_get (app_index);
+ ls = application_get_local_session (server, session_index);
+ vlib_cli_output (vm, "%-40s%-15s%-20s", "TODO", ls->app_index, ls->client_index);
+ }));
+ /* *INDENT-ON* */
+}
+
u8 *
format_application (u8 * s, va_list * args)
{
return s;
}
+
+void
+application_format_all_listeners (vlib_main_t * vm, int do_local, int verbose)
+{
+ application_t *app;
+ u32 sm_index;
+ u64 handle;
+
+ if (!pool_elts (app_pool))
+ {
+ vlib_cli_output (vm, "No active server bindings");
+ return;
+ }
+
+ if (do_local)
+ {
+ application_format_local_sessions (0, verbose);
+ /* *INDENT-OFF* */
+ pool_foreach (app, app_pool, ({
+ if (!pool_elts (app->local_sessions)
+ && !pool_elts(app->local_connects))
+ continue;
+ application_format_local_sessions (app, verbose);
+ }));
+ /* *INDENT-ON* */
+ }
+ else
+ {
+ vlib_cli_output (vm, "%U", format_application_listener, 0 /* header */ ,
+ 0, 0, verbose);
+
+ /* *INDENT-OFF* */
+ pool_foreach (app, app_pool, ({
+ if (hash_elts (app->listeners_table) == 0)
+ continue;
+ hash_foreach (handle, sm_index, app->listeners_table, ({
+ vlib_cli_output (vm, "%U", format_application_listener, app,
+ handle, sm_index, verbose);
+ }));
+ }));
+ /* *INDENT-ON* */
+ }
+}
+
+void
+application_format_all_clients (vlib_main_t * vm, int do_local, int verbose)
+{
+ application_t *app;
+
+ if (!pool_elts (app_pool))
+ {
+ vlib_cli_output (vm, "No active apps");
+ return;
+ }
+
+ if (do_local)
+ {
+ application_format_local_connects (0, verbose);
+
+ /* *INDENT-OFF* */
+ pool_foreach (app, app_pool, ({
+ if (app->local_connects)
+ application_format_local_connects (app, verbose);
+ }));
+ /* *INDENT-ON* */
+ }
+ else
+ {
+ application_format_connects (0, verbose);
+
+ /* *INDENT-OFF* */
+ pool_foreach (app, app_pool, ({
+ if (app->connects_seg_manager == (u32)~0)
+ continue;
+ application_format_connects (app, verbose);
+ }));
+ /* *INDENT-ON* */
+ }
+}
+
static clib_error_t *
show_app_command_fn (vlib_main_t * vm, unformat_input_t * input,
vlib_cli_command_t * cmd)
{
+ int do_server = 0, do_client = 0, do_local = 0;
application_t *app;
- int do_server = 0;
- int do_client = 0;
int verbose = 0;
session_cli_return_if_not_enabled ();
do_server = 1;
else if (unformat (input, "client"))
do_client = 1;
+ else if (unformat (input, "local"))
+ do_local = 1;
else if (unformat (input, "verbose"))
verbose = 1;
else
}
if (do_server)
- {
- u64 handle;
- u32 index;
- if (pool_elts (app_pool))
- {
- vlib_cli_output (vm, "%U", format_application_listener,
- 0 /* header */ , 0, 0, verbose);
-
- /* *INDENT-OFF* */
- pool_foreach (app, app_pool, ({
- /* App's listener sessions */
- if (hash_elts (app->listeners_table) == 0)
- continue;
- hash_foreach (handle, index, app->listeners_table, ({
- vlib_cli_output (vm, "%U", format_application_listener, app,
- handle, index, verbose);
- }));
- }));
- /* *INDENT-ON* */
-
- }
- else
- vlib_cli_output (vm, "No active server bindings");
- }
+ application_format_all_listeners (vm, do_local, verbose);
if (do_client)
- {
- if (pool_elts (app_pool))
- {
- application_format_connects (0, verbose);
-
- /* *INDENT-OFF* */
- pool_foreach (app, app_pool,
- ({
- if (app->connects_seg_manager == (u32)~0)
- continue;
- application_format_connects (app, verbose);
- }));
- /* *INDENT-ON* */
- }
- else
- vlib_cli_output (vm, "No active client bindings");
- }
+ application_format_all_clients (vm, do_local, verbose);
/* Print app related info */
if (!do_server && !do_client)
/** Notify server of new segment */
int (*add_segment_callback) (u32 api_client_index,
const ssvm_private_t * ssvm_seg);
+ /** Notify server of new segment */
+ int (*del_segment_callback) (u32 api_client_index,
+ const ssvm_private_t * ssvm_seg);
/** Notify server of newly accepted session */
int (*session_accept_callback) (stream_session_t * new_session);
/** Direct RX callback, for built-in servers */
int (*builtin_server_rx_callback) (stream_session_t * session);
- /** Redirect connection to local server */
- int (*redirect_connect_callback) (u32 api_client_index, void *mp);
} session_cb_vft_t;
typedef struct _application
session_cb_vft_t cb_fns;
/*
- * svm segment management
+ * ssvm (fifo) segment management
*/
+ /** Segment manager used for outgoing connects issued by the app */
u32 connects_seg_manager;
/** Lookup tables for listeners. Value is segment manager index */
segment_manager_properties_t sm_properties;
u16 proxied_transports;
+
+ /*
+ * Local "cut through" connections specific
+ */
+
+ /** Segment manager used for incoming "cut through" connects */
+ u32 local_segment_manager;
+
+ /** Pool of local listen sessions */
+ local_session_t *local_listen_sessions;
+
+ /** Pool of local sessions the app owns (as a server) */
+ local_session_t *local_sessions;
+
+ /** Hash table of the app's local connects */
+ uword *local_connects;
} application_t;
#define APP_INVALID_INDEX ((u32)~0)
-#define APP_DROP_INDEX (((u32)~0) - 1)
#define APP_NS_INVALID_INDEX ((u32)~0)
#define APP_INVALID_SEGMENT_MANAGER_INDEX ((u32) ~0)
u32 application_get_index (application_t * app);
int application_start_listen (application_t * app,
- session_endpoint_t * tep, u64 * handle);
-int application_stop_listen (application_t * srv, u64 handle);
+ session_endpoint_t * tep,
+ session_handle_t * handle);
+int application_start_local_listen (application_t * server,
+ session_endpoint_t * sep,
+ session_handle_t * handle);
+int application_stop_listen (application_t * srv, session_handle_t handle);
+int application_stop_local_listen (application_t * server,
+ session_handle_t listener_handle);
int application_open_session (application_t * app, session_endpoint_t * tep,
u32 api_context);
int application_api_queue_is_full (application_t * app);
segment_manager_t *application_get_listen_segment_manager (application_t *
app,
stream_session_t *
- s);
+ ls);
segment_manager_t *application_get_connect_segment_manager (application_t *
app);
int application_is_proxy (application_t * app);
segment_manager_properties_t
* application_segment_manager_properties (application_t * app);
+local_session_t *application_alloc_local_session (application_t * app);
+void application_free_local_session (application_t * app,
+ local_session_t * ls);
+local_session_t *application_get_local_session (application_t * app,
+ u32 session_index);
+local_session_t *application_get_local_session_from_handle (session_handle_t
+ handle);
+int application_local_session_connect (u32 table_index,
+ application_t * client,
+ application_t * server,
+ local_session_t * ll, u32 opaque);
+int application_local_session_connect_notify (local_session_t * ls);
+int application_local_session_disconnect (u32 app_index,
+ local_session_t * ls);
+void application_local_sessions_del (application_t * app);
+
+always_inline u32
+local_session_id (local_session_t * ll)
+{
+ ASSERT (ll->app_index < (2 << 16) && ll->session_index < (2 << 16));
+ return ((u32) ll->app_index << 16 | (u32) ll->session_index);
+}
+
+always_inline void
+local_session_parse_id (u32 ls_id, u32 * app_index, u32 * session_index)
+{
+ *app_index = ls_id >> 16;
+ *session_index = ls_id & 0xFFF;
+}
+
+always_inline void
+local_session_parse_handle (session_handle_t handle, u32 * server_index,
+ u32 * session_index)
+{
+ u32 bottom;
+ ASSERT ((handle >> 32) == SESSION_LOCAL_TABLE_PREFIX);
+ bottom = (handle & 0xFFFFFFFF);
+ local_session_parse_id (bottom, server_index, session_index);
+}
+
+always_inline session_handle_t
+application_local_session_handle (local_session_t * ls)
+{
+ return ((u64) SESSION_LOCAL_TABLE_PREFIX << 32) | local_session_id (ls);
+}
+
+always_inline local_session_t *
+application_get_local_listen_session (application_t * app, u32 session_index)
+{
+ return pool_elt_at_index (app->local_listen_sessions, session_index);
+}
+
+always_inline u8
+application_local_session_listener_has_transport (local_session_t * ls)
+{
+ transport_proto_t tp;
+ tp = session_type_transport_proto (ls->listener_session_type);
+ return (tp != TRANSPORT_PROTO_NONE);
+}
+
+
#endif /* SRC_VNET_SESSION_APPLICATION_H_ */
/*
*/
if (application_has_local_scope (app) && session_endpoint_is_zero (sep))
{
- table_index = application_local_session_table (app);
- listener = session_lookup_endpoint_listener (table_index, sep, 1);
- if (listener != SESSION_INVALID_HANDLE)
- return VNET_API_ERROR_ADDRESS_IN_USE;
- session_lookup_add_session_endpoint (table_index, sep, app->index);
- *handle = session_lookup_local_listener_make_handle (sep);
+ if ((rv = application_start_local_listen (app, sep, handle)))
+ return rv;
have_local = 1;
}
}
int
-vnet_unbind_i (u32 app_index, u64 handle)
+vnet_unbind_i (u32 app_index, session_handle_t handle)
{
- application_t *app = application_get_if_valid (app_index);
- stream_session_t *listener = 0;
- u32 table_index;
+ application_t *app;
+ int rv;
- if (!app)
+ if (!(app = application_get_if_valid (app_index)))
{
SESSION_DBG ("app (%d) not attached", app_index);
return VNET_API_ERROR_APPLICATION_NOT_ATTACHED;
}
- /*
- * Clean up local session table. If we have a listener session use it to
- * find the port and proto. If not, the handle must be a local table handle
- * so parse it.
- */
-
if (application_has_local_scope (app))
{
- session_endpoint_t sep = SESSION_ENDPOINT_NULL;
- if (!session_lookup_local_is_handle (handle))
- listener = listen_session_get_from_handle (handle);
- if (listener)
- {
- if (listen_session_get_local_session_endpoint (listener, &sep))
- {
- clib_warning ("broken listener");
- return -1;
- }
- }
- else
- {
- if (session_lookup_local_listener_parse_handle (handle, &sep))
- {
- clib_warning ("can't parse handle");
- return -1;
- }
- }
- table_index = application_local_session_table (app);
- session_lookup_del_session_endpoint (table_index, &sep);
+ if ((rv = application_stop_local_listen (app, handle)))
+ return rv;
}
/*
return 0;
}
-static int
-app_connect_redirect (application_t * server, void *mp)
-{
- return server->cb_fns.redirect_connect_callback (server->api_client_index,
- mp);
-}
-
int
-vnet_connect_i (u32 app_index, u32 api_context, session_endpoint_t * sep,
+vnet_connect_i (u32 client_index, u32 api_context, session_endpoint_t * sep,
void *mp)
{
- application_t *server, *app;
- u32 table_index, server_index;
+ application_t *server, *client;
+ u32 table_index, server_index, li;
stream_session_t *listener;
+ local_session_t *ll;
+ u64 lh;
if (session_endpoint_is_zero (sep))
return VNET_API_ERROR_INVALID_VALUE;
- app = application_get (app_index);
- session_endpoint_update_for_app (sep, app);
+ client = application_get (client_index);
+ session_endpoint_update_for_app (sep, client);
/*
- * First check the the local scope for locally attached destinations.
+ * First check the local scope for locally attached destinations.
* If we have local scope, we pass *all* connects through it since we may
* have special policy rules even for non-local destinations, think proxy.
*/
- if (application_has_local_scope (app))
+ if (application_has_local_scope (client))
{
- table_index = application_local_session_table (app);
- server_index = session_lookup_local_endpoint (table_index, sep);
- if (server_index == APP_DROP_INDEX)
+ table_index = application_local_session_table (client);
+ lh = session_lookup_local_endpoint (table_index, sep);
+ if (lh == SESSION_DROP_HANDLE)
return VNET_API_ERROR_APP_CONNECT_FILTERED;
+ local_session_parse_handle (lh, &server_index, &li);
+
/*
* Break loop if rule in local table points to connecting app. This
* can happen if client is a generic proxy. Route connect through
* global table instead.
*/
- if (server_index != app_index)
+ if (server_index != client_index
+ && (server = application_get_if_valid (server_index)))
{
- server = application_get (server_index);
- /*
- * Server is willing to have a direct fifo connection created
- * instead of going through the state machine, etc.
- */
- if (server && (server->flags & APP_OPTIONS_FLAGS_ACCEPT_REDIRECT))
- return app_connect_redirect (server, mp);
+ ll = application_get_local_listen_session (server, li);
+ return application_local_session_connect (table_index, client,
+ server, ll, api_context);
}
}
if (session_endpoint_is_local (sep))
return VNET_API_ERROR_SESSION_CONNECT;
- if (!application_has_global_scope (app))
+ if (!application_has_global_scope (client))
return VNET_API_ERROR_APP_CONNECT_SCOPE;
- table_index = application_session_table (app,
+ table_index = application_session_table (client,
session_endpoint_fib_proto (sep));
listener = session_lookup_listener (table_index, sep);
if (listener)
{
server = application_get (listener->app_index);
- if (server && (server->flags & APP_OPTIONS_FLAGS_ACCEPT_REDIRECT))
- return app_connect_redirect (server, mp);
+ if (server)
+ return application_local_session_connect (table_index, client, server,
+ (local_session_t *)
+ listener, api_context);
}
/*
* Not connecting to a local server, propagate to transport
*/
- if (application_open_session (app, sep, api_context))
+ if (application_open_session (client, sep, api_context))
return VNET_API_ERROR_SESSION_CONNECT;
return 0;
}
int
vnet_disconnect_session (vnet_disconnect_args_t * a)
{
- u32 index, thread_index;
- stream_session_t *s;
-
- session_parse_handle (a->handle, &index, &thread_index);
- s = session_get_if_valid (index, thread_index);
-
- if (!s || s->app_index != a->app_index)
- return VNET_API_ERROR_INVALID_VALUE;
+ if (session_handle_is_local (a->handle))
+ {
+ local_session_t *ls;
+ ls = application_get_local_session_from_handle (a->handle);
+ if (ls->app_index != a->app_index && ls->client_index != a->app_index)
+ {
+ clib_warning ("app %u is neither client nor server for session %u",
+ a->app_index, a->app_index);
+ return VNET_API_ERROR_INVALID_VALUE;
+ }
+ return application_local_session_disconnect (a->app_index, ls);
+ }
+ else
+ {
+ stream_session_t *s;
+ s = session_get_from_handle_if_valid (a->handle);
+ if (!s || s->app_index != a->app_index)
+ return VNET_API_ERROR_INVALID_VALUE;
- /* We're peeking into another's thread pool. Make sure */
- ASSERT (s->session_index == index);
+ /* We're peeking into another's thread pool. Make sure */
+ ASSERT (s->session_index == session_index_from_handle (a->handle));
- stream_session_disconnect (s);
+ stream_session_disconnect (s);
+ }
return 0;
}
/* Used for redirects */
void *mp;
- u64 session_handle;
+ session_handle_t session_handle;
} vnet_connect_args_t;
typedef struct _vnet_disconnect_args_t
{
- u64 handle;
+ session_handle_t handle;
u32 app_index;
} vnet_disconnect_args_t;
api_parse_session_handle (u64 handle, u32 * session_index,
u32 * thread_index);
+void send_local_session_disconnect_callback (u32 app_index,
+ local_session_t * ls);
+
#endif /* __included_uri_h__ */
/*
/**
* Default fifo and segment size. TODO config.
*/
-u32 default_fifo_size = 1 << 12;
-u32 default_segment_size = 1 << 20;
+static u32 default_fifo_size = 1 << 12;
+static u32 default_segment_size = 1 << 20;
+static u32 default_app_evt_queue_size = 128;
segment_manager_properties_t *
segment_manager_properties_get (segment_manager_t * sm)
props->add_segment_size = default_segment_size;
props->rx_fifo_size = default_fifo_size;
props->tx_fifo_size = default_fifo_size;
+ props->evt_q_size = default_app_evt_queue_size;
return props;
}
/**
* Remove segment without lock
*/
-always_inline void
+void
segment_manager_del_segment (segment_manager_t * sm,
svm_fifo_segment_private_t * fs)
{
void
segment_manager_segment_reader_unlock (segment_manager_t * sm)
{
+ ASSERT (sm->segments_rwlock->n_readers > 0);
clib_rwlock_reader_unlock (&sm->segments_rwlock);
}
* If needed a writer's lock is acquired before allocating a new segment
* to avoid affecting any of the segments pool readers.
*/
-always_inline int
+int
segment_manager_add_segment (segment_manager_t * sm, u32 segment_size)
{
segment_manager_main_t *smm = &segment_manager_main;
*/
int
segment_manager_init (segment_manager_t * sm, u32 first_seg_size,
- u32 evt_q_size, u32 prealloc_fifo_pairs)
+ u32 prealloc_fifo_pairs)
{
u32 rx_fifo_size, tx_fifo_size, pair_size;
u32 rx_rounded_data_size, tx_rounded_data_size;
return seg_index;
}
+ segment = segment_manager_get_segment (sm, seg_index);
if (i == 0)
- sm->event_queue = segment_manager_alloc_queue (sm, evt_q_size);
+ sm->event_queue = segment_manager_alloc_queue (segment,
+ props->evt_q_size);
- segment = segment_manager_get_segment (sm, seg_index);
svm_fifo_segment_preallocate_fifo_pairs (segment,
props->rx_fifo_size,
props->tx_fifo_size,
clib_warning ("Failed to allocate segment");
return seg_index;
}
- sm->event_queue = segment_manager_alloc_queue (sm, evt_q_size);
+ segment = segment_manager_get_segment (sm, seg_index);
+ sm->event_queue = segment_manager_alloc_queue (segment,
+ props->evt_q_size);
}
return 0;
}
}
-always_inline int
-segment_try_alloc_fifos (svm_fifo_segment_private_t * fifo_segment,
- u32 rx_fifo_size, u32 tx_fifo_size,
- svm_fifo_t ** rx_fifo, svm_fifo_t ** tx_fifo)
+int
+segment_manager_try_alloc_fifos (svm_fifo_segment_private_t * fifo_segment,
+ u32 rx_fifo_size, u32 tx_fifo_size,
+ svm_fifo_t ** rx_fifo, svm_fifo_t ** tx_fifo)
{
rx_fifo_size = clib_max (rx_fifo_size, default_fifo_size);
*rx_fifo = svm_fifo_segment_alloc_fifo (fifo_segment, rx_fifo_size,
svm_fifo_t ** tx_fifo,
u32 * fifo_segment_index)
{
- svm_fifo_segment_private_t *fifo_segment;
+ svm_fifo_segment_private_t *fifo_segment = 0;
int alloc_fail = 1, rv = 0, new_fs_index;
segment_manager_properties_t *props;
u8 added_a_segment = 0;
/* *INDENT-OFF* */
segment_manager_foreach_segment_w_lock (fifo_segment, sm, ({
- alloc_fail = segment_try_alloc_fifos (fifo_segment, props->rx_fifo_size,
- props->tx_fifo_size, rx_fifo,
- tx_fifo);
+ alloc_fail = segment_manager_try_alloc_fifos (fifo_segment,
+ props->rx_fifo_size,
+ props->tx_fifo_size,
+ rx_fifo, tx_fifo);
/* Exit with lock held, drop it after notifying app */
if (!alloc_fail)
goto alloc_success;
return SESSION_ERROR_SEG_CREATE;
}
fifo_segment = segment_manager_get_segment_w_lock (sm, new_fs_index);
- alloc_fail = segment_try_alloc_fifos (fifo_segment, props->rx_fifo_size,
- props->tx_fifo_size, rx_fifo,
- tx_fifo);
+ alloc_fail = segment_manager_try_alloc_fifos (fifo_segment,
+ props->rx_fifo_size,
+ props->tx_fifo_size,
+ rx_fifo, tx_fifo);
added_a_segment = 1;
goto alloc_check;
}
* Must be called with lock held
*/
svm_queue_t *
-segment_manager_alloc_queue (segment_manager_t * sm, u32 queue_size)
+segment_manager_alloc_queue (svm_fifo_segment_private_t * segment,
+ u32 queue_size)
{
- svm_fifo_segment_private_t *segment;
ssvm_shared_header_t *sh;
svm_queue_t *q;
void *oldheap;
- ASSERT (!pool_is_free_index (sm->segments, 0));
-
- segment = segment_manager_get_segment (sm, 0);
sh = segment->ssvm.sh;
oldheap = ssvm_push_heap (sh);
/** Session fifo sizes. */
u32 rx_fifo_size;
u32 tx_fifo_size;
-
- /** Preallocated pool sizes */
-// u32 preallocated_fifo_pairs;
+ u32 evt_q_size;
/** Configured additional segment size */
u32 add_segment_size;
/** Segment type: if set to SSVM_N_TYPES, private segments are used */
ssvm_segment_type_t segment_type;
- /** Use one or more private mheaps, instead of the global heap */
-// u32 private_segment_count;
} segment_manager_properties_t;
typedef struct _segment_manager
segment_manager_t *segment_manager_new ();
int segment_manager_init (segment_manager_t * sm, u32 first_seg_size,
- u32 evt_q_size, u32 prealloc_fifo_pairs);
+ u32 prealloc_fifo_pairs);
svm_fifo_segment_private_t *segment_manager_get_segment (segment_manager_t *
sm,
svm_fifo_segment_private_t
* segment_manager_get_segment_w_lock (segment_manager_t * sm,
u32 segment_index);
+int segment_manager_add_segment (segment_manager_t * sm, u32 segment_size);
+void segment_manager_del_segment (segment_manager_t * sm,
+ svm_fifo_segment_private_t * fs);
void segment_manager_segment_reader_unlock (segment_manager_t * sm);
void segment_manager_segment_writer_unlock (segment_manager_t * sm);
void segment_manager_del (segment_manager_t * sm);
void segment_manager_init_del (segment_manager_t * sm);
u8 segment_manager_has_fifos (segment_manager_t * sm);
-int
-segment_manager_alloc_session_fifos (segment_manager_t * sm,
- svm_fifo_t ** server_rx_fifo,
- svm_fifo_t ** server_tx_fifo,
- u32 * fifo_segment_index);
-void
-segment_manager_dealloc_fifos (u32 svm_segment_index, svm_fifo_t * rx_fifo,
- svm_fifo_t * tx_fifo);
-svm_queue_t *segment_manager_alloc_queue (segment_manager_t * sm,
+int segment_manager_alloc_session_fifos (segment_manager_t * sm,
+ svm_fifo_t ** server_rx_fifo,
+ svm_fifo_t ** server_tx_fifo,
+ u32 * fifo_segment_index);
+int segment_manager_try_alloc_fifos (svm_fifo_segment_private_t * fs,
+ u32 rx_fifo_size, u32 tx_fifo_size,
+ svm_fifo_t ** rx_fifo,
+ svm_fifo_t ** tx_fifo);
+void segment_manager_dealloc_fifos (u32 segment_index, svm_fifo_t * rx_fifo,
+ svm_fifo_t * tx_fifo);
+svm_queue_t *segment_manager_alloc_queue (svm_fifo_segment_private_t * fs,
u32 queue_size);
void segment_manager_dealloc_queue (segment_manager_t * sm, svm_queue_t * q);
void segment_manager_app_detach (segment_manager_t * sm);
* limitations under the License.
*/
-option version = "1.0.0";
+option version = "1.0.1";
/** \brief client->vpp, attach application to session layer
@param client_index - opaque cookie to identify the sender
u8 segment_name[128];
};
+/** \brief vpp->client unmap shared memory segment
+ @param client_index - opaque cookie to identify the sender
+ @param context - sender context, to match reply w/ request
+ @param segment_name -
+*/
+autoreply define unmap_segment {
+ u32 client_index;
+ u32 context;
+ u8 segment_name[128];
+};
+
/** \brief Bind to a given URI
@param client_index - opaque cookie to identify the sender
@param context - sender context, to match reply w/ request
@param context - sender context, to match reply w/ request
@param listener_handle - tells client which listener this pertains to
@param handle - unique session identifier
- @param session_thread_index - thread index of new session
@param rx_fifo_address - rx (vpp -> vpp-client) fifo address
@param tx_fifo_address - tx (vpp-client -> vpp) fifo address
- @param vpp_event_queue_address - vpp's event queue address
+ @param vpp_event_queue_address - vpp's event queue address or client's
+ event queue for cut through
+ @param server_event_queue_address - server's event queue address for
+ cut through sessions
@param port - remote port
@param is_ip4 - 1 if the ip is ip4
@param ip - remote ip
u64 server_rx_fifo;
u64 server_tx_fifo;
u64 vpp_event_queue_address;
+ u64 server_event_queue_address;
u16 port;
u8 is_ip4;
u8 ip[16];
@param handle - session handle
*/
define disconnect_session_reply {
- u32 client_index;
u32 context;
i32 retval;
u64 handle;
@param server_rx_fifo - rx (vpp -> vpp-client) fifo address
@param server_tx_fifo - tx (vpp-client -> vpp) fifo address
@param vpp_event_queue_address - vpp's event queue address
+ @param client_event_queue_address - client's event queue address
@param segment_size - size of segment to be attached. Only for redirects.
@param segment_name_length - non-zero if the client needs to attach to
the fifo segment
u64 server_rx_fifo;
u64 server_tx_fifo;
u64 vpp_event_queue_address;
+ u64 client_event_queue_address;
u32 segment_size;
u8 segment_name_length;
u8 segment_name[128];
void *arg;
} rpc_args_t;
+typedef u64 session_handle_t;
+
/* *INDENT-OFF* */
typedef CLIB_PACKED (struct {
union
{
svm_fifo_t * fifo;
- u64 session_handle;
+ session_handle_t session_handle;
rpc_args_t rpc_args;
};
u8 event_type;
return pool_elt_at_index (session_manager_main.sessions[thread_index], si);
}
-always_inline u64
+always_inline session_handle_t
session_handle (stream_session_t * s)
{
return ((u64) s->thread_index << 32) | (u64) s->session_index;
}
always_inline u32
-session_index_from_handle (u64 handle)
+session_index_from_handle (session_handle_t handle)
{
return handle & 0xFFFFFFFF;
}
always_inline u32
-session_thread_from_handle (u64 handle)
+session_thread_from_handle (session_handle_t handle)
{
return handle >> 32;
}
always_inline void
-session_parse_handle (u64 handle, u32 * index, u32 * thread_index)
+session_parse_handle (session_handle_t handle, u32 * index,
+ u32 * thread_index)
{
*index = session_index_from_handle (handle);
*thread_index = session_thread_from_handle (handle);
}
always_inline stream_session_t *
-session_get_from_handle (u64 handle)
+session_get_from_handle (session_handle_t handle)
{
session_manager_main_t *smm = &session_manager_main;
- return
- pool_elt_at_index (smm->sessions[session_thread_from_handle (handle)],
- session_index_from_handle (handle));
+ u32 session_index, thread_index;
+ session_parse_handle (handle, &session_index, &thread_index);
+ return pool_elt_at_index (smm->sessions[thread_index], session_index);
+}
+
+always_inline stream_session_t *
+session_get_from_handle_if_valid (session_handle_t handle)
+{
+ u32 session_index, thread_index;
+ session_parse_handle (handle, &session_index, &thread_index);
+ return session_get_if_valid (session_index, thread_index);
+}
+
+always_inline u8
+session_handle_is_local (session_handle_t handle)
+{
+ if ((handle >> 32) == SESSION_LOCAL_TABLE_PREFIX)
+ return 1;
+ return 0;
+}
+
+always_inline transport_proto_t
+session_type_transport_proto (session_type_t st)
+{
+ return (st >> 1);
}
always_inline transport_proto_t
return (s->session_type >> 1);
}
+always_inline fib_protocol_t
+session_get_fib_proto (stream_session_t * s)
+{
+ u8 is_ip4 = s->session_type & 1;
+ return (is_ip4 ? FIB_PROTOCOL_IP4 : FIB_PROTOCOL_IP6);
+}
+
always_inline session_type_t
session_type_from_proto_and_ip (transport_proto_t proto, u8 is_ip4)
{
return (proto << 1 | is_ip4);
}
+always_inline u8
+session_has_transport (stream_session_t * s)
+{
+ return (session_get_transport_proto (s) != TRANSPORT_PROTO_NONE);
+}
+
/**
* Acquires a lock that blocks a session pool from expanding.
*
}
always_inline stream_session_t *
-listen_session_get_from_handle (u64 handle)
+listen_session_get_from_handle (session_handle_t handle)
{
session_manager_main_t *smm = &session_manager_main;
stream_session_t *s;
return 0;
}
+static int
+send_del_segment_callback (u32 api_client_index, const ssvm_private_t * fs)
+{
+ vl_api_unmap_segment_t *mp;
+ vl_api_registration_t *reg;
+
+ reg = vl_mem_api_client_index_to_registration (api_client_index);
+ if (!reg)
+ {
+ clib_warning ("no registration: %u", api_client_index);
+ return -1;
+ }
+
+ if (ssvm_type (fs) == SSVM_SEGMENT_MEMFD
+ && vl_api_registration_file_index (reg) == VL_API_INVALID_FI)
+ {
+ clib_warning ("can't send memfd fd");
+ return -1;
+ }
+
+ mp = vl_msg_api_alloc_as_if_client (sizeof (*mp));
+ memset (mp, 0, sizeof (*mp));
+ mp->_vl_msg_id = clib_host_to_net_u16 (VL_API_UNMAP_SEGMENT);
+ strncpy ((char *) mp->segment_name, (char *) fs->name,
+ sizeof (mp->segment_name) - 1);
+
+ vl_msg_api_send_shmem (reg->vl_input_queue, (u8 *) & mp);
+
+ if (ssvm_type (fs) == SSVM_SEGMENT_MEMFD)
+ return session_send_memfd_fd (reg, fs);
+
+ return 0;
+}
+
static int
send_session_accept_callback (stream_session_t * s)
{
stream_session_t *listener;
svm_queue_t *vpp_queue;
- vpp_queue = session_manager_get_vpp_event_queue (s->thread_index);
reg = vl_mem_api_client_index_to_registration (server->api_client_index);
if (!reg)
{
mp->_vl_msg_id = clib_host_to_net_u16 (VL_API_ACCEPT_SESSION);
mp->context = server->index;
- listener = listen_session_get (s->session_type, s->listener_index);
- tp_vft = transport_protocol_get_vft (session_get_transport_proto (s));
- tc = tp_vft->get_connection (s->connection_index, s->thread_index);
- mp->listener_handle = listen_session_get_handle (listener);
+ mp->server_rx_fifo = pointer_to_uword (s->server_rx_fifo);
+ mp->server_tx_fifo = pointer_to_uword (s->server_tx_fifo);
- if (application_is_proxy (server))
+ if (session_has_transport (s))
{
- listener =
- application_first_listener (server,
- transport_connection_fib_proto (tc),
- tc->proto);
- if (listener)
- mp->listener_handle = listen_session_get_handle (listener);
+ listener = listen_session_get (s->session_type, s->listener_index);
+ mp->listener_handle = listen_session_get_handle (listener);
+ if (application_is_proxy (server))
+ {
+ listener =
+ application_first_listener (server, session_get_fib_proto (s),
+ session_get_transport_proto (s));
+ if (listener)
+ mp->listener_handle = listen_session_get_handle (listener);
+ }
+ vpp_queue = session_manager_get_vpp_event_queue (s->thread_index);
+ mp->vpp_event_queue_address = pointer_to_uword (vpp_queue);
+ mp->handle = session_handle (s);
+ tp_vft = transport_protocol_get_vft (session_get_transport_proto (s));
+ tc = tp_vft->get_connection (s->connection_index, s->thread_index);
+ mp->port = tc->rmt_port;
+ mp->is_ip4 = tc->is_ip4;
+ clib_memcpy (&mp->ip, &tc->rmt_ip, sizeof (tc->rmt_ip));
+ }
+ else
+ {
+ local_session_t *ls = (local_session_t *) s;
+ local_session_t *ll;
+ if (application_local_session_listener_has_transport (ls))
+ {
+ listener = listen_session_get (ls->listener_session_type,
+ ls->listener_index);
+ mp->listener_handle = listen_session_get_handle (listener);
+ }
+ else
+ {
+ ll = application_get_local_listen_session (server,
+ ls->listener_index);
+ mp->listener_handle = application_local_session_handle (ll);
+ }
+ mp->handle = application_local_session_handle (ls);
+ mp->port = ls->port;
+ mp->vpp_event_queue_address = ls->client_evt_q;
+ mp->server_event_queue_address = ls->server_evt_q;
}
- mp->handle = session_handle (s);
- mp->server_rx_fifo = pointer_to_uword (s->server_rx_fifo);
- mp->server_tx_fifo = pointer_to_uword (s->server_tx_fifo);
- mp->vpp_event_queue_address = pointer_to_uword (vpp_queue);
- mp->port = tc->rmt_port;
- mp->is_ip4 = tc->is_ip4;
- clib_memcpy (&mp->ip, &tc->rmt_ip, sizeof (tc->rmt_ip));
vl_msg_api_send_shmem (reg->vl_input_queue, (u8 *) & mp);
return 0;
}
+void
+send_local_session_disconnect_callback (u32 app_index, local_session_t * ls)
+{
+ application_t *app = application_get (app_index);
+ vl_api_disconnect_session_t *mp;
+ vl_api_registration_t *reg;
+
+ reg = vl_mem_api_client_index_to_registration (app->api_client_index);
+ if (!reg)
+ {
+ clib_warning ("no registration: %u", app->api_client_index);
+ return;
+ }
+
+ mp = vl_mem_api_alloc_as_if_client_w_reg (reg, sizeof (*mp));
+ memset (mp, 0, sizeof (*mp));
+ mp->_vl_msg_id = clib_host_to_net_u16 (VL_API_DISCONNECT_SESSION);
+ mp->handle = application_local_session_handle (ls);
+ mp->context = app->api_client_index;
+ vl_msg_api_send_shmem (reg->vl_input_queue, (u8 *) & mp);
+}
+
static void
send_session_disconnect_callback (stream_session_t * s)
{
memset (mp, 0, sizeof (*mp));
mp->_vl_msg_id = clib_host_to_net_u16 (VL_API_DISCONNECT_SESSION);
mp->handle = session_handle (s);
+ mp->context = app->api_client_index;
vl_msg_api_send_shmem (reg->vl_input_queue, (u8 *) & mp);
}
if (is_fail)
goto done;
- tc = session_get_transport (s);
- if (!tc)
+ if (session_has_transport (s))
{
- is_fail = 1;
- goto done;
- }
+ tc = session_get_transport (s);
+ if (!tc)
+ {
+ is_fail = 1;
+ goto done;
+ }
- vpp_queue = session_manager_get_vpp_event_queue (s->thread_index);
- mp->server_rx_fifo = pointer_to_uword (s->server_rx_fifo);
- mp->server_tx_fifo = pointer_to_uword (s->server_tx_fifo);
- mp->handle = session_handle (s);
- mp->vpp_event_queue_address = pointer_to_uword (vpp_queue);
- clib_memcpy (mp->lcl_ip, &tc->lcl_ip, sizeof (tc->lcl_ip));
- mp->is_ip4 = tc->is_ip4;
- mp->lcl_port = tc->lcl_port;
+ vpp_queue = session_manager_get_vpp_event_queue (s->thread_index);
+ mp->handle = session_handle (s);
+ mp->vpp_event_queue_address = pointer_to_uword (vpp_queue);
+ clib_memcpy (mp->lcl_ip, &tc->lcl_ip, sizeof (tc->lcl_ip));
+ mp->is_ip4 = tc->is_ip4;
+ mp->lcl_port = tc->lcl_port;
+ mp->server_rx_fifo = pointer_to_uword (s->server_rx_fifo);
+ mp->server_tx_fifo = pointer_to_uword (s->server_tx_fifo);
+ }
+ else
+ {
+ local_session_t *ls = (local_session_t *) s;
+ mp->handle = application_local_session_handle (ls);
+ mp->lcl_port = ls->port;
+ mp->vpp_event_queue_address = ls->server_evt_q;
+ mp->client_event_queue_address = ls->client_evt_q;
+ mp->server_rx_fifo = pointer_to_uword (s->server_tx_fifo);
+ mp->server_tx_fifo = pointer_to_uword (s->server_rx_fifo);
+ }
done:
mp->retval = is_fail ?
return 0;
}
-/**
- * Redirect a connect_uri message to the indicated server.
- * Only sent if the server has bound the related port with
- * URI_OPTIONS_FLAGS_USE_FIFO
- */
-static int
-redirect_connect_callback (u32 server_api_client_index, void *mp_arg)
-{
- vl_api_connect_sock_t *mp = mp_arg;
- svm_queue_t *server_q, *client_q;
- segment_manager_properties_t *props;
- vlib_main_t *vm = vlib_get_main ();
- f64 timeout = vlib_time_now (vm) + 0.5;
- application_t *app;
- int rv = 0;
-
- server_q = vl_api_client_index_to_input_queue (server_api_client_index);
-
- if (!server_q)
- {
- rv = VNET_API_ERROR_INVALID_VALUE;
- goto out;
- }
-
- client_q = vl_api_client_index_to_input_queue (mp->client_index);
- if (!client_q)
- {
- rv = VNET_API_ERROR_INVALID_VALUE_2;
- goto out;
- }
-
- /* Tell the server the client's API queue address, so it can reply */
- mp->client_queue_address = pointer_to_uword (client_q);
- app = application_lookup (mp->client_index);
- if (!app)
- {
- clib_warning ("no client application");
- return -1;
- }
-
- props = application_segment_manager_properties (app);
- mp->options[APP_OPTIONS_RX_FIFO_SIZE] = props->rx_fifo_size;
- mp->options[APP_OPTIONS_TX_FIFO_SIZE] = props->tx_fifo_size;
-
- /*
- * Bounce message handlers MUST NOT block the data-plane.
- * Spin waiting for the queue lock, but
- */
-
- while (vlib_time_now (vm) < timeout)
- {
- rv = svm_queue_add (server_q, (u8 *) & mp, 1 /*nowait */ );
- switch (rv)
- {
- /* correctly enqueued */
- case 0:
- return VNET_API_ERROR_SESSION_REDIRECT;
-
- /* continue spinning, wait for pthread_mutex_trylock to work */
- case -1:
- continue;
-
- /* queue stuffed, drop the msg */
- case -2:
- rv = VNET_API_ERROR_QUEUE_FULL;
- goto out;
- }
- }
-out:
- /* Dispose of the message */
- vl_msg_api_free (mp);
- return rv;
-}
-
static session_cb_vft_t session_cb_vft = {
.session_accept_callback = send_session_accept_callback,
.session_disconnect_callback = send_session_disconnect_callback,
.session_connected_callback = send_session_connected_callback,
.session_reset_callback = send_session_reset_callback,
.add_segment_callback = send_add_segment_callback,
- .redirect_connect_callback = redirect_connect_callback
+ .del_segment_callback = send_del_segment_callback,
};
static void
}
/* Disconnect has been confirmed. Confirm close to transport */
- app = application_lookup (mp->client_index);
+ app = application_lookup (mp->context);
if (app)
{
a->handle = mp->handle;
static void
vl_api_accept_session_reply_t_handler (vl_api_accept_session_reply_t * mp)
{
+ vnet_disconnect_args_t _a = { 0 }, *a = &_a;
+ local_session_t *ls;
stream_session_t *s;
- u32 session_index, thread_index;
- vnet_disconnect_args_t _a, *a = &_a;
/* Server isn't interested, kill the session */
if (mp->retval)
a->app_index = mp->context;
a->handle = mp->handle;
vnet_disconnect_session (a);
+ return;
+ }
+
+ if (session_handle_is_local (mp->handle))
+ {
+ ls = application_get_local_session_from_handle (mp->handle);
+ if (!ls || ls->app_index != mp->context)
+ {
+ clib_warning ("server %u doesn't own local handle %llu",
+ mp->context, mp->handle);
+ return;
+ }
+ if (application_local_session_connect_notify (ls))
+ return;
+ ls->session_state = SESSION_STATE_READY;
}
else
{
- session_parse_handle (mp->handle, &session_index, &thread_index);
- s = session_get_if_valid (session_index, thread_index);
+ s = session_get_from_handle_if_valid (mp->handle);
if (!s)
{
clib_warning ("session doesn't exist");
return 1;
}
-static u32
-session_lookup_action_to_app_index (u32 action_index)
+static u64
+session_lookup_action_to_handle (u32 action_index)
{
switch (action_index)
{
case SESSION_RULES_TABLE_ACTION_DROP:
- return APP_DROP_INDEX;
+ return SESSION_DROP_HANDLE;
case SESSION_RULES_TABLE_ACTION_ALLOW:
case SESSION_RULES_TABLE_INVALID_INDEX:
- return APP_INVALID_INDEX;
+ return SESSION_INVALID_HANDLE;
default:
+ /* application index */
return action_index;
}
}
u8 transport_proto)
{
u32 app_index;
- app_index = session_lookup_action_to_app_index (action_index);
+ app_index = session_lookup_action_to_handle (action_index);
/* Nothing sophisticated for now, action index is app index */
return session_lookup_app_listen_session (app_index, fib_proto,
transport_proto);
}
+/** UNUSED */
stream_session_t *
session_lookup_rules_table_session4 (session_table_t * st, u8 proto,
ip4_address_t * lcl, u16 lcl_port,
u32 action_index, app_index;
action_index = session_rules_table_lookup4 (srt, lcl, rmt, lcl_port,
rmt_port);
- app_index = session_lookup_action_to_app_index (action_index);
+ app_index = session_lookup_action_to_handle (action_index);
/* Nothing sophisticated for now, action index is app index */
return session_lookup_app_listen_session (app_index, FIB_PROTOCOL_IP4,
proto);
}
+/** UNUSED */
stream_session_t *
session_lookup_rules_table_session6 (session_table_t * st, u8 proto,
ip6_address_t * lcl, u16 lcl_port,
u32 action_index, app_index;
action_index = session_rules_table_lookup6 (srt, lcl, rmt, lcl_port,
rmt_port);
- app_index = session_lookup_action_to_app_index (action_index);
+ app_index = session_lookup_action_to_handle (action_index);
return session_lookup_app_listen_session (app_index, FIB_PROTOCOL_IP6,
proto);
}
* @param use_rules flag that indicates if the session rules of the table
* should be used
* @return invalid handle if nothing is found, the handle of a valid listener
- * or an action_index if a rule is hit
+ * or an action derived handle if a rule is hit
*/
u64
session_lookup_endpoint_listener (u32 table_index, session_endpoint_t * sep,
ai = session_rules_table_lookup4 (srt, &lcl4, &sep->ip.ip4, 0,
sep->port);
if (session_lookup_action_index_is_valid (ai))
- return session_lookup_action_to_app_index (ai);
+ return session_lookup_action_to_handle (ai);
}
}
else
ai = session_rules_table_lookup6 (srt, &lcl6, &sep->ip.ip6, 0,
sep->port);
if (session_lookup_action_index_is_valid (ai))
- return session_lookup_action_to_app_index (ai);
+ return session_lookup_action_to_handle (ai);
}
}
return SESSION_INVALID_HANDLE;
*
* @param table_index table where the lookup should be done
* @param sep session endpoint to be looked up
- * @return index that can be interpreted as an app index or drop action.
+ * @return session handle that can be interpreted as an adjacency
*/
-u32
+u64
session_lookup_local_endpoint (u32 table_index, session_endpoint_t * sep)
{
session_rules_table_t *srt;
ai = session_rules_table_lookup4 (srt, &lcl4, &sep->ip.ip4, 0,
sep->port);
if (session_lookup_action_index_is_valid (ai))
- return session_lookup_action_to_app_index (ai);
+ return session_lookup_action_to_handle (ai);
/*
* Check if session endpoint is a listener
sep->transport_proto);
rv = clib_bihash_search_inline_16_8 (&st->v4_session_hash, &kv4);
if (rv == 0)
- return (u32) kv4.value;
+ return kv4.value;
/*
* Zero out the ip. Logic is that connect to local ips, say
kv4.key[0] = 0;
rv = clib_bihash_search_inline_16_8 (&st->v4_session_hash, &kv4);
if (rv == 0)
- return (u32) kv4.value;
+ return kv4.value;
/*
* Zero out the port and check if we have proxy
kv4.key[1] = 0;
rv = clib_bihash_search_inline_16_8 (&st->v4_session_hash, &kv4);
if (rv == 0)
- return (u32) kv4.value;
+ return kv4.value;
}
else
{
ai = session_rules_table_lookup6 (srt, &lcl6, &sep->ip.ip6, 0,
sep->port);
if (session_lookup_action_index_is_valid (ai))
- return session_lookup_action_to_app_index (ai);
+ return session_lookup_action_to_handle (ai);
make_v6_listener_kv (&kv6, &sep->ip.ip6, sep->port,
sep->transport_proto);
rv = clib_bihash_search_inline_48_8 (&st->v6_session_hash, &kv6);
if (rv == 0)
- return (u32) kv6.value;
+ return kv6.value;
/*
* Zero out the ip. Same logic as above.
kv6.key[0] = kv6.key[1] = 0;
rv = clib_bihash_search_inline_48_8 (&st->v6_session_hash, &kv6);
if (rv == 0)
- return (u32) kv6.value;
+ return kv6.value;
/*
* Zero out the port. Same logic as above.
kv6.key[4] = kv6.key[5] = 0;
rv = clib_bihash_search_inline_48_8 (&st->v6_session_hash, &kv6);
if (rv == 0)
- return (u32) kv6.value;
+ return kv6.value;
}
- return APP_INVALID_INDEX;
+ return SESSION_INVALID_HANDLE;
}
static stream_session_t *
return 0;
}
-u64
-session_lookup_local_listener_make_handle (session_endpoint_t * sep)
-{
- return ((u64) SESSION_LOCAL_TABLE_PREFIX << 32
- | (u32) sep->port << 16 | (u32) sep->transport_proto << 8
- | (u32) sep->is_ip4);
-}
-
-u8
-session_lookup_local_is_handle (u64 handle)
-{
- if (handle >> 32 == SESSION_LOCAL_TABLE_PREFIX)
- return 1;
- return 0;
-}
-
-int
-session_lookup_local_listener_parse_handle (u64 handle,
- session_endpoint_t * sep)
-{
- u32 local_table_handle;
- if (handle >> 32 != SESSION_LOCAL_TABLE_PREFIX)
- return -1;
- local_table_handle = handle & 0xFFFFFFFFULL;
- sep->is_ip4 = local_table_handle & 0xff;
- local_table_handle >>= 8;
- sep->transport_proto = local_table_handle & 0xff;
- sep->port = local_table_handle >> 8;
- return 0;
-}
-
clib_error_t *
vnet_session_rule_add_del (session_rule_add_del_args_t * args)
{
u64 session_lookup_endpoint_listener (u32 table_index,
session_endpoint_t * sepi,
u8 use_rules);
-u32 session_lookup_local_endpoint (u32 table_index, session_endpoint_t * sep);
+u64 session_lookup_local_endpoint (u32 table_index, session_endpoint_t * sep);
stream_session_t *session_lookup_global_session_endpoint (session_endpoint_t
*);
int session_lookup_add_session_endpoint (u32 table_index,
u8 is_ip4);
u32 session_lookup_get_index_for_fib (u32 fib_proto, u32 fib_index);
-u64 session_lookup_local_listener_make_handle (session_endpoint_t * sep);
-u8 session_lookup_local_is_handle (u64 handle);
-int session_lookup_local_listener_parse_handle (u64 handle,
- session_endpoint_t * sep);
-
void session_lookup_show_table_entries (vlib_main_t * vm,
session_table_t * table, u8 type,
u8 is_local);
#define SESSION_LOCAL_TABLE_PREFIX ((u32)~0)
#define SESSION_INVALID_INDEX ((u32)~0)
#define SESSION_INVALID_HANDLE ((u64)~0)
+#define SESSION_DROP_HANDLE (((u64)~0) - 1)
typedef int (*ip4_session_table_walk_fn_t) (clib_bihash_kv_16_8_t * kvp,
void *ctx);
return -1;
}
+static u32 dummy_segment_count;
+
int
-dummy_add_segment_callback (u32 client_index, const u8 * seg_name,
- u32 seg_size)
+dummy_add_segment_callback (u32 client_index, const ssvm_private_t * fs)
{
- clib_warning ("called...");
- return -1;
+ dummy_segment_count = 1;
+ return 0;
+}
+
+int
+dummy_del_segment_callback (u32 client_index, const ssvm_private_t * fs)
+{
+ dummy_segment_count = 0;
+ return 0;
}
int
clib_warning ("called...");
}
+static u32 dummy_accept;
+
int
dummy_session_accept_callback (stream_session_t * s)
{
- clib_warning ("called...");
- return -1;
+ dummy_accept = 1;
+ s->session_state = SESSION_STATE_READY;
+ return 0;
}
int
.session_accept_callback = dummy_session_accept_callback,
.session_disconnect_callback = dummy_session_disconnect_callback,
.builtin_server_rx_callback = dummy_server_rx_callback,
- .redirect_connect_callback = dummy_redirect_connect_callback,
+ .add_segment_callback = dummy_add_segment_callback,
+ .del_segment_callback = dummy_del_segment_callback,
};
/* *INDENT-ON* */
memset (options, 0, sizeof (options));
options[APP_OPTIONS_FLAGS] = APP_OPTIONS_FLAGS_IS_BUILTIN;
- options[APP_OPTIONS_FLAGS] |= APP_OPTIONS_FLAGS_ACCEPT_REDIRECT;
options[APP_OPTIONS_FLAGS] |= APP_OPTIONS_FLAGS_USE_GLOBAL_SCOPE;
options[APP_OPTIONS_FLAGS] |= APP_OPTIONS_FLAGS_USE_LOCAL_SCOPE;
vnet_app_attach_args_t attach_args = {
{
u64 options[APP_OPTIONS_N_OPTIONS], dummy_secret = 1234;
u32 server_index, server_st_index, server_local_st_index;
- u32 dummy_port = 1234, local_listener, client_index;
+ u32 dummy_port = 1234, client_index;
u32 dummy_api_context = 4321, dummy_client_api_index = 1234;
u32 dummy_server_api_index = ~0, sw_if_index = 0;
session_endpoint_t server_sep = SESSION_ENDPOINT_NULL;
app_namespace_t *app_ns;
application_t *server;
stream_session_t *s;
+ u64 handle;
int code;
server_sep.is_ip4 = 1;
memset (options, 0, sizeof (options));
options[APP_OPTIONS_FLAGS] = APP_OPTIONS_FLAGS_IS_BUILTIN;
- options[APP_OPTIONS_FLAGS] |= APP_OPTIONS_FLAGS_ACCEPT_REDIRECT;
vnet_app_attach_args_t attach_args = {
.api_client_index = ~0,
.options = options,
SESSION_TEST ((s->app_index == server_index), "app_index should be that of "
"the server");
server_local_st_index = application_local_session_table (server);
- local_listener =
- session_lookup_local_endpoint (server_local_st_index, &server_sep);
- SESSION_TEST ((local_listener != SESSION_INVALID_INDEX),
+ handle = session_lookup_local_endpoint (server_local_st_index, &server_sep);
+ SESSION_TEST ((handle != SESSION_INVALID_HANDLE),
"listener should exist in local table");
/*
code = clib_error_get_code (error);
SESSION_TEST ((code == VNET_API_ERROR_INVALID_VALUE),
"error code should be invalid value (zero ip)");
+ SESSION_TEST ((dummy_segment_count == 0),
+ "shouldn't have received request to map new segment");
connect_args.sep.ip.ip4.as_u8[0] = 127;
error = vnet_connect (&connect_args);
- SESSION_TEST ((error != 0), "client connect should return error code");
+ SESSION_TEST ((error == 0), "client connect should not return error code");
code = clib_error_get_code (error);
- SESSION_TEST ((code == VNET_API_ERROR_SESSION_REDIRECT),
- "error code should be redirect");
+ SESSION_TEST ((dummy_segment_count == 1),
+ "should've received request to map new segment");
+ SESSION_TEST ((dummy_accept == 1), "should've received accept request");
detach_args.app_index = client_index;
vnet_application_detach (&detach_args);
s = session_lookup_listener (server_st_index, &server_sep);
SESSION_TEST ((s == 0), "listener should not exist in global table");
- local_listener =
- session_lookup_local_endpoint (server_local_st_index, &server_sep);
- SESSION_TEST ((s == 0), "listener should not exist in local table");
+ handle = session_lookup_local_endpoint (server_local_st_index, &server_sep);
+ SESSION_TEST ((handle == SESSION_INVALID_HANDLE),
+ "listener should not exist in local table");
detach_args.app_index = server_index;
vnet_application_detach (&detach_args);
s = session_lookup_listener (server_st_index, &server_sep);
SESSION_TEST ((s == 0), "listener should not exist in global table");
server_local_st_index = application_local_session_table (server);
- local_listener =
- session_lookup_local_endpoint (server_local_st_index, &server_sep);
- SESSION_TEST ((local_listener != SESSION_INVALID_INDEX),
+ handle = session_lookup_local_endpoint (server_local_st_index, &server_sep);
+ SESSION_TEST ((handle != SESSION_INVALID_HANDLE),
"listener should exist in local table");
unbind_args.handle = bind_args.handle;
error = vnet_unbind (&unbind_args);
SESSION_TEST ((error == 0), "unbind should work");
- local_listener =
- session_lookup_local_endpoint (server_local_st_index, &server_sep);
- SESSION_TEST ((local_listener == SESSION_INVALID_INDEX),
+ handle = session_lookup_local_endpoint (server_local_st_index, &server_sep);
+ SESSION_TEST ((handle == SESSION_INVALID_HANDLE),
"listener should not exist in local table");
/*
SESSION_TEST ((s->app_index == server_index), "app_index should be that of "
"the server");
server_local_st_index = application_local_session_table (server);
- local_listener =
- session_lookup_local_endpoint (server_local_st_index, &server_sep);
- SESSION_TEST ((local_listener != SESSION_INVALID_INDEX),
+ handle = session_lookup_local_endpoint (server_local_st_index, &server_sep);
+ SESSION_TEST ((handle != SESSION_INVALID_HANDLE),
"zero listener should exist in local table");
detach_args.app_index = server_index;
vnet_application_detach (&detach_args);
session_endpoint_t server_sep = SESSION_ENDPOINT_NULL;
u64 options[APP_OPTIONS_N_OPTIONS];
u16 lcl_port = 1234, rmt_port = 4321;
- u32 server_index, server_index2, app_index;
+ u32 server_index, server_index2;
u32 dummy_server_api_index = ~0;
transport_connection_t *tc;
u32 dummy_port = 1111;
u32 local_ns_index = default_ns->local_table_index;
int verbose = 0, rv;
app_namespace_t *app_ns;
+ u64 handle;
while (unformat_check_input (input) != UNFORMAT_END_OF_INPUT)
{
* Attach server with global and local default scope
*/
options[APP_OPTIONS_FLAGS] = APP_OPTIONS_FLAGS_IS_BUILTIN;
- options[APP_OPTIONS_FLAGS] |= APP_OPTIONS_FLAGS_ACCEPT_REDIRECT;
options[APP_OPTIONS_FLAGS] |= APP_OPTIONS_FLAGS_USE_GLOBAL_SCOPE;
options[APP_OPTIONS_FLAGS] |= APP_OPTIONS_FLAGS_USE_LOCAL_SCOPE;
attach_args.namespace_id = 0;
.port = rmt_port,
.transport_proto = TRANSPORT_PROTO_TCP,
};
- app_index = session_lookup_local_endpoint (local_ns_index, &sep);
- SESSION_TEST ((app_index != server_index), "local session endpoint lookup "
+ handle = session_lookup_local_endpoint (local_ns_index, &sep);
+ SESSION_TEST ((handle != server_index), "local session endpoint lookup "
"should not work (global scope)");
tc = session_lookup_connection_wt4 (0, &lcl_pref.fp_addr.ip4,
&is_filtered);
SESSION_TEST ((tc->c_index == listener->connection_index),
"optimized lookup for lcl port + 1 should work");
- app_index = session_lookup_local_endpoint (local_ns_index, &sep);
- SESSION_TEST ((app_index == server_index), "local session endpoint lookup "
+ handle = session_lookup_local_endpoint (local_ns_index, &sep);
+ SESSION_TEST ((handle == server_index), "local session endpoint lookup "
"should work (lcl ip was zeroed)");
/*
"should fail (deny rule)");
SESSION_TEST ((is_filtered == 1), "lookup should be filtered (deny)");
- app_index = session_lookup_local_endpoint (local_ns_index, &sep);
- SESSION_TEST ((app_index == APP_DROP_INDEX), "lookup for 1.2.3.4/32 1234 "
+ handle = session_lookup_local_endpoint (local_ns_index, &sep);
+ SESSION_TEST ((handle == SESSION_DROP_HANDLE), "lookup for 1.2.3.4/32 1234 "
"5.6.7.8/16 4321 in local table should return deny");
tc = session_lookup_connection_wt4 (0, &lcl_pref.fp_addr.ip4,
"should fail (allow without app)");
SESSION_TEST ((is_filtered == 0), "lookup should NOT be filtered");
- app_index = session_lookup_local_endpoint (local_ns_index, &sep);
- SESSION_TEST ((app_index == APP_INVALID_INDEX), "lookup for 1.2.3.4/32 1234"
- " 5.6.7.8/32 4321 in local table should return invalid");
+ handle = session_lookup_local_endpoint (local_ns_index, &sep);
+ SESSION_TEST ((handle == SESSION_INVALID_HANDLE), "lookup for 1.2.3.4/32 "
+ "1234 5.6.7.8/32 4321 in local table should return invalid");
if (verbose)
{
}
sep.ip.ip4.as_u32 += 1 << 24;
- app_index = session_lookup_local_endpoint (local_ns_index, &sep);
- SESSION_TEST ((app_index == APP_DROP_INDEX), "lookup for 1.2.3.4/32 1234"
+ handle = session_lookup_local_endpoint (local_ns_index, &sep);
+ SESSION_TEST ((handle == SESSION_DROP_HANDLE), "lookup for 1.2.3.4/32 1234"
" 5.6.7.9/32 4321 in local table should return deny");
vnet_connect_args_t connect_args = {
TRANSPORT_PROTO_TCP);
}
- app_index = session_lookup_local_endpoint (local_ns_index, &sep);
- SESSION_TEST ((app_index == APP_DROP_INDEX),
+ handle = session_lookup_local_endpoint (local_ns_index, &sep);
+ SESSION_TEST ((handle == SESSION_DROP_HANDLE),
"local session endpoint lookup should return deny");
/*
error = vnet_session_rule_add_del (&args);
SESSION_TEST ((error == 0), "Del 1.2.3.4/32 1234 5.6.7.8/32 4321 deny");
- app_index = session_lookup_local_endpoint (local_ns_index, &sep);
- SESSION_TEST ((app_index == APP_INVALID_INDEX),
+ handle = session_lookup_local_endpoint (local_ns_index, &sep);
+ SESSION_TEST ((handle == SESSION_INVALID_HANDLE),
"local session endpoint lookup should return invalid");
/*
args.table_args.rmt_port = 4321;
error = vnet_session_rule_add_del (&args);
SESSION_TEST ((error == 0), "Del 0/0 * 5.6.7.8/16 4321");
- app_index = session_lookup_local_endpoint (local_ns_index, &sep);
- SESSION_TEST ((app_index != server_index), "local session endpoint lookup "
+ handle = session_lookup_local_endpoint (local_ns_index, &sep);
+ SESSION_TEST ((handle != server_index), "local session endpoint lookup "
"should not work (removed)");
args.table_args.is_add = 0;
/*
* Lookup default namespace
*/
- app_index = session_lookup_local_endpoint (local_ns_index, &sep);
- SESSION_TEST ((app_index == APP_INVALID_INDEX),
+ handle = session_lookup_local_endpoint (local_ns_index, &sep);
+ SESSION_TEST ((handle == SESSION_INVALID_HANDLE),
"lookup for 1.2.3.4/32 1234 5.6.7.8/32 4321 in local table "
"should return allow (invalid)");
sep.port += 1;
- app_index = session_lookup_local_endpoint (local_ns_index, &sep);
- SESSION_TEST ((app_index == APP_DROP_INDEX), "lookup for 1.2.3.4/32 1234 "
+ handle = session_lookup_local_endpoint (local_ns_index, &sep);
+ SESSION_TEST ((handle == SESSION_DROP_HANDLE), "lookup for 1.2.3.4/32 1234 "
"5.6.7.8/16 432*2* in local table should return deny");
connect_args.app_index = server_index;
/*
* Lookup test namespace
*/
- app_index = session_lookup_local_endpoint (app_ns->local_table_index, &sep);
- SESSION_TEST ((app_index == APP_DROP_INDEX), "lookup for 1.2.3.4/32 1234 "
+ handle = session_lookup_local_endpoint (app_ns->local_table_index, &sep);
+ SESSION_TEST ((handle == SESSION_DROP_HANDLE), "lookup for 1.2.3.4/32 1234 "
"5.6.7.8/16 4321 in local table should return deny");
connect_args.app_index = server_index;
SESSION_STATE_N_STATES,
} stream_session_state_t;
+/* TODO convert to macro once cleanup completed */
+typedef struct app_session_
+{
+ /** fifo pointers. Once allocated, these do not move */
+ svm_fifo_t *server_rx_fifo;
+ svm_fifo_t *server_tx_fifo;
+
+ /** Type */
+ session_type_t session_type;
+
+ /** State */
+ volatile u8 session_state;
+
+ /** Session index in owning pool */
+ u32 session_index;
+
+ /** Application index */
+ u32 app_index;
+} app_session_t;
+
typedef struct _stream_session_t
{
/** fifo pointers. Once allocated, these do not move */
/** State */
volatile u8 session_state;
+ /** Session index in per_thread pool */
+ u32 session_index;
+
+ /** stream server pool index */
+ u32 app_index;
+
u8 thread_index;
/** To avoid n**2 "one event per frame" check */
/** svm segment index where fifos were allocated */
u32 svm_segment_index;
- /** Session index in per_thread pool */
- u32 session_index;
-
/** Transport specific */
u32 connection_index;
- /** stream server pool index */
- u32 app_index;
-
/** Parent listener session if the result of an accept */
u32 listener_index;
CLIB_CACHE_LINE_ALIGN_MARK (pad);
} stream_session_t;
+typedef struct local_session_
+{
+ /** fifo pointers. Once allocated, these do not move */
+ svm_fifo_t *server_rx_fifo;
+ svm_fifo_t *server_tx_fifo;
+
+ /** Type */
+ session_type_t session_type;
+
+ /** State */
+ volatile u8 session_state;
+
+ /** Session index */
+ u32 session_index;
+
+ /** Server index */
+ u32 app_index;
+
+ /** Segment index where fifos were allocated */
+ u32 svm_segment_index;
+
+ u32 listener_index;
+
+ /** Port for connection */
+ u16 port;
+
+ /** Has transport embedded when listener not purely local */
+ session_type_t listener_session_type;
+
+ /**
+ * Client data
+ */
+ u32 client_index;
+ u32 client_opaque;
+
+ u64 server_evt_q;
+ u64 client_evt_q;
+
+ CLIB_CACHE_LINE_ALIGN_MARK (pad);
+} local_session_t;
+
typedef struct _session_endpoint
{
/*
TRANSPORT_PROTO_TCP,
TRANSPORT_PROTO_UDP,
TRANSPORT_PROTO_SCTP,
+ TRANSPORT_PROTO_NONE,
TRANSPORT_N_PROTO
} transport_proto_t;
self.logger.critical(error)
self.assertEqual(error.find("failed"), -1)
+ if self.vpp_dead:
+ self.assert_equal(0)
+
# Delete inter-table routes
ip_t01.remove_vpp_config()
ip_t10.remove_vpp_config()