/*
- * Copyright (c) 2017 Cisco and/or its affiliates.
+ * Copyright (c) 2017-2019 Cisco and/or its affiliates.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at:
}
app_worker_t *
-application_listener_select_worker (stream_session_t * ls, u8 is_local)
+application_listener_select_worker (session_t * ls, u8 is_local)
{
app_listener_t *app_listener;
application_t *app;
}
int
-app_worker_start_listen (app_worker_t * app_wrk, stream_session_t * ls)
+app_worker_start_listen (app_worker_t * app_wrk, session_t * ls)
{
segment_manager_t *sm;
hash_set (app_wrk->listeners_table, listen_session_get_handle (ls),
segment_manager_index (sm));
- if (!ls->server_rx_fifo
+ if (!ls->rx_fifo
&& session_transport_service_type (ls) == TRANSPORT_SERVICE_CL)
{
if (session_alloc_fifos (sm, ls))
return 0;
}
+int
+app_worker_own_session (app_worker_t * app_wrk, session_t * s)
+{
+ segment_manager_t *sm;
+ svm_fifo_t *rxf, *txf;
+
+ if (s->session_state == SESSION_STATE_LISTENING)
+ {
+ app_worker_t *old_wrk = app_worker_get (s->app_wrk_index);
+ u64 lsh = listen_session_get_handle (s);
+ app_listener_t *app_listener;
+ application_t *app;
+
+ if (!old_wrk)
+ return -1;
+
+ hash_unset (old_wrk->listeners_table, lsh);
+ if (!(sm = app_worker_alloc_segment_manager (app_wrk)))
+ return -1;
+
+ hash_set (app_wrk->listeners_table, lsh, segment_manager_index (sm));
+ s->app_wrk_index = app_wrk->wrk_index;
+
+ app = application_get (old_wrk->app_index);
+ if (!app)
+ return -1;
+
+ app_listener = app_listener_get (app, s->listener_db_index);
+ app_listener->workers = clib_bitmap_set (app_listener->workers,
+ app_wrk->wrk_map_index, 1);
+ app_listener->workers = clib_bitmap_set (app_listener->workers,
+ old_wrk->wrk_map_index, 0);
+ return 0;
+ }
+
+ s->app_wrk_index = app_wrk->wrk_index;
+
+ rxf = s->rx_fifo;
+ txf = s->tx_fifo;
+
+ if (!rxf || !txf)
+ return 0;
+
+ s->rx_fifo = 0;
+ s->tx_fifo = 0;
+
+ sm = app_worker_get_or_alloc_connect_segment_manager (app_wrk);
+ if (session_alloc_fifos (sm, s))
+ return -1;
+
+ if (!svm_fifo_is_empty (rxf))
+ {
+ clib_memcpy_fast (s->rx_fifo->data, rxf->data, rxf->nitems);
+ s->rx_fifo->head = rxf->head;
+ s->rx_fifo->tail = rxf->tail;
+ s->rx_fifo->cursize = rxf->cursize;
+ }
+
+ if (!svm_fifo_is_empty (txf))
+ {
+ clib_memcpy_fast (s->tx_fifo->data, txf->data, txf->nitems);
+ s->tx_fifo->head = txf->head;
+ s->tx_fifo->tail = txf->tail;
+ s->tx_fifo->cursize = txf->cursize;
+ }
+
+ segment_manager_dealloc_fifos (rxf->segment_index, rxf, txf);
+
+ return 0;
+}
+
/**
* Start listening local transport endpoint for requested transport.
*
u32 table_index, fib_proto;
session_endpoint_t *sep;
app_worker_t *app_wrk;
- stream_session_t *ls;
+ session_t *ls;
session_handle_t lh;
session_type_t sst;
session_handle_t handle)
{
app_listener_t *app_listener;
- stream_session_t *listener;
+ session_t *listener;
app_worker_t *app_wrk;
application_t *app;
return segment_manager_get (app->connects_seg_manager);
}
+segment_manager_t *
+app_worker_get_or_alloc_connect_segment_manager (app_worker_t * app_wrk)
+{
+ if (app_wrk->connects_seg_manager == (u32) ~ 0)
+ app_worker_alloc_connects_segment_manager (app_wrk);
+ return segment_manager_get (app_wrk->connects_seg_manager);
+}
+
segment_manager_t *
app_worker_get_listen_segment_manager (app_worker_t * app,
- stream_session_t * listener)
+ session_t * listener)
{
uword *smp;
smp = hash_get (app->listeners_table, listen_session_get_handle (listener));
application_get_local_segment_manager_w_session (app_worker_t * app,
local_session_t * ls)
{
- stream_session_t *listener;
+ session_t *listener;
if (application_local_session_listener_has_transport (ls))
{
listener = listen_session_get (ls->listener_index);
return hash_elts (app->listeners_table);
}
-stream_session_t *
+session_t *
app_worker_first_listener (app_worker_t * app, u8 fib_proto,
u8 transport_proto)
{
- stream_session_t *listener;
+ session_t *listener;
u64 handle;
u32 sm_index;
u8 sst;
return app_wrk->app_is_builtin;
}
-stream_session_t *
+session_t *
application_proxy_listener (app_worker_t * app, u8 fib_proto,
u8 transport_proto)
{
- stream_session_t *listener;
+ session_t *listener;
u64 handle;
u32 sm_index;
u8 sst;
session_endpoint_cfg_t sep = SESSION_ENDPOINT_CFG_NULL;
transport_connection_t *tc;
app_worker_t *app_wrk;
- stream_session_t *s;
+ session_t *s;
u64 handle;
/* TODO decide if we want proxy to be enabled for all workers */
}
static inline int
-app_send_io_evt_rx (app_worker_t * app_wrk, stream_session_t * s, u8 lock)
+app_send_io_evt_rx (app_worker_t * app_wrk, session_t * s, u8 lock)
{
session_event_t *evt;
svm_msg_q_msg_t msg;
{
/* Session is closed so app will never clean up. Flush rx fifo */
if (s->session_state == SESSION_STATE_CLOSED)
- svm_fifo_dequeue_drop_all (s->server_rx_fifo);
+ svm_fifo_dequeue_drop_all (s->rx_fifo);
return 0;
}
return app->cb_fns.builtin_app_rx_callback (s);
}
- if (svm_fifo_has_event (s->server_rx_fifo)
- || svm_fifo_is_empty (s->server_rx_fifo))
+ if (svm_fifo_has_event (s->rx_fifo) || svm_fifo_is_empty (s->rx_fifo))
return 0;
mq = app_wrk->event_queue;
ASSERT (!svm_msg_q_msg_is_invalid (&msg));
evt = (session_event_t *) svm_msg_q_msg_data (mq, &msg);
- evt->fifo = s->server_rx_fifo;
+ evt->fifo = s->rx_fifo;
evt->event_type = FIFO_EVENT_APP_RX;
- (void) svm_fifo_set_event (s->server_rx_fifo);
+ (void) svm_fifo_set_event (s->rx_fifo);
if (app_enqueue_evt (mq, &msg, lock))
return -1;
}
static inline int
-app_send_io_evt_tx (app_worker_t * app_wrk, stream_session_t * s, u8 lock)
+app_send_io_evt_tx (app_worker_t * app_wrk, session_t * s, u8 lock)
{
svm_msg_q_t *mq;
session_event_t *evt;
evt = (session_event_t *) svm_msg_q_msg_data (mq, &msg);
evt->event_type = FIFO_EVENT_APP_TX;
- evt->fifo = s->server_tx_fifo;
+ evt->fifo = s->tx_fifo;
return app_enqueue_evt (mq, &msg, lock);
}
/* *INDENT-OFF* */
typedef int (app_send_evt_handler_fn) (app_worker_t *app,
- stream_session_t *s,
+ session_t *s,
u8 lock);
static app_send_evt_handler_fn * const app_send_evt_handler_fns[3] = {
app_send_io_evt_rx,
* not enough space to enqueue a message, we return.
*/
int
-app_worker_send_event (app_worker_t * app, stream_session_t * s, u8 evt_type)
+app_worker_send_event (app_worker_t * app, session_t * s, u8 evt_type)
{
ASSERT (app && evt_type <= FIFO_EVENT_APP_TX);
return app_send_evt_handler_fns[evt_type] (app, s, 0 /* lock */ );
* we return.
*/
int
-app_worker_lock_and_send_event (app_worker_t * app, stream_session_t * s,
+app_worker_lock_and_send_event (app_worker_t * app, session_t * s,
u8 evt_type)
{
return app_send_evt_handler_fns[evt_type] (app, s, 1 /* lock */ );
u32 table_index, ll_index, server_index;
app_listener_t *app_listener;
app_worker_t *server_wrk;
- stream_session_t *sl = 0;
+ session_t *sl = 0;
local_session_t *ll, *ls;
application_t *server;
round_tx_fifo_sz = 1 << max_log2 (props->tx_fifo_size);
seg_size = round_rx_fifo_sz + round_tx_fifo_sz + evt_q_sz + margin;
- has_transport = session_has_transport ((stream_session_t *) ll);
+ has_transport = session_has_transport ((session_t *) ll);
if (!has_transport)
{
/* Local sessions don't have backing transport */
}
else
{
- stream_session_t *sl = (stream_session_t *) ll;
+ session_t *sl = (session_t *) ll;
transport_connection_t *tc;
tc = listen_session_get_transport (sl);
ls->port = tc->lcl_port;
ls->client_evt_q = pointer_to_uword (cq);
rv = segment_manager_try_alloc_fifos (seg, props->rx_fifo_size,
props->tx_fifo_size,
- &ls->server_rx_fifo,
- &ls->server_tx_fifo);
+ &ls->rx_fifo, &ls->tx_fifo);
if (rv)
{
clib_warning ("failed to add fifos in cut-through segment");
goto failed;
}
sm_index = segment_manager_index (sm);
- ls->server_rx_fifo->ct_session_index = ls->session_index;
- ls->server_tx_fifo->ct_session_index = ls->session_index;
- ls->server_rx_fifo->segment_manager = sm_index;
- ls->server_tx_fifo->segment_manager = sm_index;
- ls->server_rx_fifo->segment_index = seg_index;
- ls->server_tx_fifo->segment_index = seg_index;
+ ls->rx_fifo->ct_session_index = ls->session_index;
+ ls->tx_fifo->ct_session_index = ls->session_index;
+ ls->rx_fifo->segment_manager = sm_index;
+ ls->tx_fifo->segment_manager = sm_index;
+ ls->rx_fifo->segment_index = seg_index;
+ ls->tx_fifo->segment_index = seg_index;
ls->svm_segment_index = seg_index;
ls->listener_index = ll->session_index;
ls->client_wrk_index = client_wrk->wrk_index;
goto failed;
}
segment_manager_segment_reader_unlock (sm);
- if ((rv = server->cb_fns.session_accept_callback ((stream_session_t *) ls)))
+ if ((rv = server->cb_fns.session_accept_callback ((session_t *) ls)))
{
clib_warning ("failed to send accept cut-through notify to server");
goto failed;
return rv;
}
-static uword
+static u64
application_client_local_connect_key (local_session_t * ls)
{
- return ((uword) ls->app_wrk_index << 32 | (uword) ls->session_index);
+ return (((u64) ls->app_wrk_index) << 32 | (u64) ls->session_index);
}
static void
-application_client_local_connect_key_parse (uword key, u32 * app_wrk_index,
+application_client_local_connect_key_parse (u64 key, u32 * app_wrk_index,
u32 * session_index)
{
*app_wrk_index = key >> 32;
application_t *client;
int rv, is_fail = 0;
u64 segment_handle;
- uword client_key;
+ u64 client_key;
client_wrk = app_worker_get (ls->client_wrk_index);
server_wrk = app_worker_get (ls->app_wrk_index);
client->cb_fns.session_connected_callback (client_wrk->wrk_index,
ls->client_opaque,
- (stream_session_t *) ls,
- is_fail);
+ (session_t *) ls, is_fail);
client_key = application_client_local_connect_key (ls);
hash_set (client_wrk->local_connects, client_key, client_key);
local_session_t * ls)
{
svm_fifo_segment_private_t *seg;
- stream_session_t *listener;
+ session_t *listener;
segment_manager_t *sm;
- uword client_key;
+ u64 client_key;
u8 has_transport;
/* Retrieve listener transport type as it is the one that decides where
application_t *client = application_get (client_wrk->app_index);
client->cb_fns.session_connected_callback (client_wrk->wrk_index,
ls->client_opaque,
- (stream_session_t *) ls,
+ (session_t *) ls,
1 /* is_fail */ );
ls->session_state = SESSION_STATE_CLOSED;
return application_local_session_cleanup (client_wrk, server_wrk,
u64 handle = va_arg (*args, u64);
u32 sm_index = va_arg (*args, u32);
int verbose = va_arg (*args, int);
- stream_session_t *listener;
+ session_t *listener;
const u8 *app_name;
u8 *str;
while (fifo)
{
u32 session_index, thread_index;
- stream_session_t *session;
+ session_t *session;
session_index = fifo->master_session_index;
thread_index = fifo->master_thread_index;
u32 app_wrk_index, session_index;
app_worker_t *server_wrk;
local_session_t *ls;
- uword client_key;
+ u64 client_key;
u64 value;
/* Header */