}
static int
-hc_request (session_t *s, hc_worker_t *wrk, hc_session_t *hc_session,
- session_error_t err)
+hc_request (session_t *s, hc_worker_t *wrk, hc_session_t *hc_session)
{
hc_main_t *hcm = &hc_main;
u64 to_send;
return 0;
}
-typedef struct
-{
- u64 parent_handle;
- u32 parent_index;
-} hc_connect_streams_args_t;
-
-static void
-hc_connect_streams_rpc (void *rpc_args)
+static int
+hc_connect_streams (u64 parent_handle, u32 parent_index)
{
- hc_connect_streams_args_t *args = rpc_args;
hc_main_t *hcm = &hc_main;
vnet_connect_args_t _a, *a = &_a;
hc_worker_t *wrk;
- hc_session_t *ho_hs;
+ hc_session_t *hs;
u32 i;
int rv;
+ session_t *s;
clib_memset (a, 0, sizeof (*a));
clib_memcpy (&a->sep_ext, &hcm->connect_sep, sizeof (hcm->connect_sep));
- a->sep_ext.parent_handle = args->parent_handle;
+ a->sep_ext.parent_handle = parent_handle;
a->app_index = hcm->app_index;
+ wrk = hc_worker_get (session_thread_from_handle (parent_handle));
+
for (i = 0; i < (hcm->max_streams - 1); i++)
{
- /* allocate half-open session */
- wrk = hc_worker_get (transport_cl_thread ());
- ho_hs = hc_session_alloc (wrk);
- ho_hs->parent_index = args->parent_index;
- a->api_context = ho_hs->session_index;
+ hs = hc_session_alloc (wrk);
+ hs->parent_index = parent_index;
+ a->api_context = hs->session_index;
- rv = vnet_connect (a);
+ rv = vnet_connect_stream (a);
if (rv)
- clib_warning (0, "connect returned: %U", format_session_error, rv);
+ {
+ clib_warning (0, "connect returned: %U", format_session_error, rv);
+ if (rv == SESSION_E_MAX_STREAMS_HIT)
+ vlib_process_signal_event_mt (
+ vlib_get_main (), hcm->cli_node_index, HC_MAX_STREAMS_HIT, 0);
+ else
+ vlib_process_signal_event_mt (
+ vlib_get_main (), hcm->cli_node_index, HC_CONNECT_FAILED, 0);
+ return -1;
+ }
+ s = session_get_from_handle (a->sh);
+ hs->http_session_index = s->session_index;
+ hs->stats.max_req = hcm->reqs_per_session;
+ hs->stats.start = vlib_time_now (wrk->vlib_main);
+ if (hc_request (s, wrk, hs))
+ return -1;
}
- vec_free (args);
-}
-
-static void
-hc_connect_streams (u64 parent_handle, u32 parent_index)
-{
- hc_connect_streams_args_t *args = 0;
- vec_validate (args, 0);
- args->parent_handle = parent_handle;
- args->parent_index = parent_index;
-
- session_send_rpc_evt_to_thread_force (transport_cl_thread (),
- hc_connect_streams_rpc, args);
+ return 0;
}
static int
{
hc_main_t *hcm = &hc_main;
hc_worker_t *wrk;
- hc_session_t *hc_session, *ho_session, *parent_session;
+ hc_session_t *hc_session, *ho_session;
hc_http_header_t *header;
http_version_t http_version;
u8 *f = 0;
if (err)
{
clib_warning ("connected error: %U", format_session_error, err);
- if (err == SESSION_E_MAX_STREAMS_HIT)
- vlib_process_signal_event_mt (vlib_get_main (), hcm->cli_node_index,
- HC_MAX_STREAMS_HIT, 0);
- else
- vlib_process_signal_event_mt (vlib_get_main (), hcm->cli_node_index,
- HC_CONNECT_FAILED, 0);
+ vlib_process_signal_event_mt (vlib_get_main (), hcm->cli_node_index,
+ HC_CONNECT_FAILED, 0);
return -1;
}
hcm->connected_counter++;
clib_spinlock_unlock_if_init (&hcm->lock);
- if (hc_session->session_flags & HC_S_FLAG_IS_PARENT)
- {
- http_version = http_session_get_version (s);
- if (http_version == HTTP_VERSION_2 && hcm->max_streams > 1)
- {
- HTTP_DBG (1, "parent connected, going to open %u streams",
- hcm->max_streams - 1);
- hc_connect_streams (session_handle (s), hc_session->session_index);
- }
- }
- else
- {
- parent_session =
- hc_session_get (hc_session->parent_index, hc_session->thread_index);
- parent_session->child_count++;
- }
-
- hc_session->thread_index = s->thread_index;
hc_session->body_recv = 0;
s->opaque = hc_session->session_index;
wrk->session_index = hc_session->session_index;
hc_session->stats.start = vlib_time_now (wrk->vlib_main);
- return hc_request (s, wrk, hc_session, err);
+ if (hc_request (s, wrk, hc_session))
+ return -1;
+
+ http_version = http_session_get_version (s);
+ if (http_version == HTTP_VERSION_2 && hcm->max_streams > 1)
+ {
+ ASSERT (hc_session->session_flags & HC_S_FLAG_IS_PARENT);
+ HTTP_DBG (1, "parent connected, going to open %u streams",
+ hcm->max_streams - 1);
+ hc_session->child_count = hcm->max_streams - 1;
+ if (hc_connect_streams (session_handle (s), hc_session->session_index))
+ return -1;
+ }
+
+ return 0;
}
static void
http_msg_t msg;
int rv;
u32 max_deq;
- session_error_t session_err = 0;
int send_err = 0;
http_version_t http_version;
else
{
HTTP_DBG (1, "doing another repeat");
- send_err = hc_request (s, wrk, hc_session, session_err);
+ send_err = hc_request (s, wrk, hc_session);
if (send_err)
clib_warning ("failed to send request, error %d", send_err);
}
hcpc_main_t *hcpcm = &hcpc_main;
vnet_connect_args_t _a, *a = &_a;
session_error_t rv;
+ session_t *s;
+ hcpc_session_t *ps;
clib_memset (a, 0, sizeof (*a));
clib_memcpy (&a->sep_ext, &hcpcm->proxy_server_sep,
a->app_index = hcpcm->http_app_index;
a->api_context = session_index;
- rv = vnet_connect (a);
+ rv = vnet_connect_stream (a);
if (rv)
- clib_warning ("connect returned: %U", format_session_error, rv);
+ {
+ clib_warning ("session %u connect error: %U", session_index,
+ format_session_error, rv);
+ clib_spinlock_lock_if_init (&hcpcm->sessions_lock);
+ ps = hcpc_session_get (session_index);
+ ASSERT (ps);
+ ps->state = HCPC_SESSION_CLOSED;
+ ps->http_disconnected = 1;
+ ps->http_establishing = 0;
+ if (!ps->intercept_diconnected)
+ {
+ ps->intercept.rx_fifo->master_thread_index =
+ ps->intercept.tx_fifo->master_thread_index;
+ if (ps->intercept.session_handle != SESSION_INVALID_HANDLE)
+ {
+ session_reset (
+ session_get_from_handle (ps->intercept.session_handle));
+ ps->intercept_diconnected = 1;
+ }
+ else
+ {
+ if (session_thread_from_handle (hcpcm->http_connection_handle) !=
+ ps->intercept.tx_fifo->master_thread_index)
+ {
+ session_send_rpc_evt_to_thread (
+ ps->intercept.tx_fifo->master_thread_index,
+ hcpc_session_postponed_free_rpc,
+ uword_to_pointer (ps->session_index, void *));
+ }
+ else
+ hcpc_session_free (ps);
+ }
+ if (rv == SESSION_E_MAX_STREAMS_HIT)
+ hcpc_worker_stats_inc (vlib_get_thread_index (), max_streams_hit,
+ 1);
+ }
+ clib_spinlock_unlock_if_init (&hcpcm->sessions_lock);
+ return;
+ }
+
+ HCPC_DBG ("stream for session [%u] opened", session_index);
+ s = session_get_from_handle (a->sh);
+ clib_spinlock_lock_if_init (&hcpcm->sessions_lock);
+ ps = hcpc_session_get (session_index);
+ ASSERT (ps);
+ ps->http.session_handle = session_handle (s);
+ ps->http.rx_fifo = s->rx_fifo;
+ ps->http.tx_fifo = s->tx_fifo;
+
+ /* listener session was already closed */
+ if (ps->intercept_diconnected)
+ {
+ hcpc_worker_stats_inc (s->thread_index,
+ client_closed_before_stream_opened, 1);
+ session_reset (s);
+ ps->http_establishing = 0;
+ ps->http_disconnected = 1;
+ clib_spinlock_unlock_if_init (&hcpcm->sessions_lock);
+ return;
+ }
+
+ clib_spinlock_unlock_if_init (&hcpcm->sessions_lock);
+
+ if (svm_fifo_max_dequeue (s->tx_fifo))
+ if (svm_fifo_set_event (s->tx_fifo))
+ session_program_tx_io_evt (s->handle, SESSION_IO_EVT_TX);
}
static void
static void
hcpc_connect_http_stream (u32 session_index)
{
- u32 connects_thread = transport_cl_thread (), thread_index;
+ hcpc_main_t *hcpcm = &hcpc_main;
+ u32 connects_thread, thread_index;
+ connects_thread = session_thread_from_handle (hcpcm->http_connection_handle);
thread_index = vlib_get_thread_index ();
if (thread_index == connects_thread)
}
session_send_rpc_evt_to_thread_force (
- transport_cl_thread (), hcpc_connect_http_stream_rpc,
+ connects_thread, hcpc_connect_http_stream_rpc,
uword_to_pointer (session_index, void *));
}
hcpc_main_t *hcpcm = &hcpc_main;
hcpc_session_t *ps;
+ ASSERT (hcpcm->http_connection_handle == SESSION_INVALID_HANDLE);
+
if (err)
{
- clib_warning ("session %u connect error: %U", session_index,
+ clib_warning ("connect to http proxy server failed: %U",
format_session_error, err);
-
- /* connect to http proxy server failed */
- if (hcpcm->http_connection_handle == SESSION_INVALID_HANDLE)
- return 0;
-
- clib_spinlock_lock_if_init (&hcpcm->sessions_lock);
- ps = hcpc_session_get (session_index);
- ASSERT (ps);
- ps->state = HCPC_SESSION_CLOSED;
- ps->http_disconnected = 1;
- ps->http_establishing = 0;
- if (!ps->intercept_diconnected)
- {
- if (ps->intercept.session_handle != SESSION_INVALID_HANDLE)
- {
- session_reset (
- session_get_from_handle (ps->intercept.session_handle));
- ps->intercept_diconnected = 1;
- }
- else
- hcpc_delete_session (s, 1);
- }
- clib_spinlock_unlock_if_init (&hcpcm->sessions_lock);
- if (err == SESSION_E_MAX_STREAMS_HIT)
- hcpc_worker_stats_inc (vlib_get_thread_index (), max_streams_hit, 1);
- return 0;
- }
-
- if (hcpcm->http_connection_handle == SESSION_INVALID_HANDLE)
- {
- HCPC_DBG ("parent session connected");
- clib_spinlock_lock_if_init (&hcpcm->sessions_lock);
- ps = hcpc_session_alloc ();
- ps->http.session_handle = session_handle (s);
- ps->http.rx_fifo = s->rx_fifo;
- ps->http.tx_fifo = s->tx_fifo;
- ps->flags |= HCPC_SESSION_F_IS_PARENT;
- clib_spinlock_unlock_if_init (&hcpcm->sessions_lock);
- s->opaque = ps->session_index;
- hcpcm->http_connection_handle = session_handle (s);
- vlib_process_signal_event_mt (vlib_get_main (),
- hcpcm->process_node_index,
- HCPC_EVENT_PROXY_CONNECTED, 0);
return 0;
}
- HCPC_DBG ("stream for session [%u] opened", session_index);
+ HCPC_DBG ("parent session connected");
clib_spinlock_lock_if_init (&hcpcm->sessions_lock);
- ps = hcpc_session_get (session_index);
- ASSERT (ps);
+ ps = hcpc_session_alloc ();
ps->http.session_handle = session_handle (s);
ps->http.rx_fifo = s->rx_fifo;
ps->http.tx_fifo = s->tx_fifo;
-
- /* listener session was already closed */
- if (ps->intercept_diconnected)
- {
- hcpc_worker_stats_inc (s->thread_index,
- client_closed_before_stream_opened, 1);
- session_reset (s);
- ps->http_establishing = 0;
- ps->http_disconnected = 1;
- clib_spinlock_unlock_if_init (&hcpcm->sessions_lock);
- return -1;
- }
-
+ ps->flags |= HCPC_SESSION_F_IS_PARENT;
clib_spinlock_unlock_if_init (&hcpcm->sessions_lock);
-
- if (svm_fifo_max_dequeue (s->tx_fifo))
- if (svm_fifo_set_event (s->tx_fifo))
- session_program_tx_io_evt (s->handle, SESSION_IO_EVT_TX);
-
+ s->opaque = ps->session_index;
+ hcpcm->http_connection_handle = session_handle (s);
+ vlib_process_signal_event_mt (vlib_get_main (), hcpcm->process_node_index,
+ HCPC_EVENT_PROXY_CONNECTED, 0);
return 0;
}
u8 *app_headers;
int rv;
- as = session_get_from_handle (req->hr_pa_session_handle);
+ as = session_get (req->c_s_index, req->c_thread_index);
if (msg->data.type == HTTP_MSG_DATA_PTR)
{
u8 *target;
int rv;
- as = session_get_from_handle (req->hr_pa_session_handle);
+ as = session_get (req->c_s_index, req->c_thread_index);
if (msg->data.type == HTTP_MSG_DATA_PTR)
{
void
http_req_tx_buffer_init (http_req_t *req, http_msg_t *msg)
{
- session_t *as = session_get_from_handle (req->hr_pa_session_handle);
+ session_t *as = session_get (req->c_s_index, req->c_thread_index);
http_buffer_init (&req->tx_buf, msg_to_buf_type[msg->data.type], as->tx_fifo,
msg->data.body_len);
}
vec_validate (hm->wrk, num_threads - 1);
vec_foreach (wrk, hm->wrk)
{
- clib_spinlock_init (&wrk->pending_stream_connects_lock);
clib_memset (&wrk->stats, 0, sizeof (wrk->stats));
}
vec_validate (hm->rx_bufs, num_threads - 1);
}
static int
-http_connect_stream (u64 parent_handle, u32 opaque)
+http_connect_stream (u64 parent_handle, u32 *req_index)
{
session_t *hs;
http_req_handle_t rh;
return -1;
}
- return http_vfts[rh.version].conn_connect_stream_callback (hc, opaque);
-}
-
-static void
-http_handle_stream_connects_rpc (void *args)
-{
- clib_thread_index_t thread_index = pointer_to_uword (args);
- http_worker_t *wrk;
- u32 n_pending, max_connects, n_connects = 0;
- http_pending_connect_stream_t *pc;
-
- wrk = http_worker_get (thread_index);
-
- clib_spinlock_lock (&wrk->pending_stream_connects_lock);
-
- n_pending = clib_fifo_elts (wrk->pending_connect_streams);
- max_connects = clib_min (32, n_pending);
- vec_validate (wrk->burst_connect_streams, max_connects);
-
- while (n_connects < max_connects)
- clib_fifo_sub1 (wrk->pending_connect_streams,
- wrk->burst_connect_streams[n_connects++]);
-
- clib_spinlock_unlock (&wrk->pending_stream_connects_lock);
-
- n_connects = 0;
- while (n_connects < max_connects)
- {
- pc = &wrk->burst_connect_streams[n_connects++];
- http_connect_stream (pc->parent_handle, pc->opaque);
- }
-
- /* more work to do? */
- if (max_connects < n_pending)
- session_send_rpc_evt_to_thread_force (
- thread_index, http_handle_stream_connects_rpc,
- uword_to_pointer ((uword) thread_index, void *));
+ return http_vfts[rh.version].conn_connect_stream_callback (hc, req_index);
}
static int
-http_program_connect_stream (session_endpoint_cfg_t *sep)
+http_transport_connect (transport_endpoint_cfg_t *tep)
{
- clib_thread_index_t parent_thread_index =
- session_thread_from_handle (sep->parent_handle);
- http_worker_t *wrk;
- u32 n_pending;
-
- ASSERT (session_vlib_thread_is_cl_thread ());
-
- /* if we are already on same worker as parent, handle connect */
- if (parent_thread_index == transport_cl_thread ())
- return http_connect_stream (sep->parent_handle, sep->opaque);
-
- /* if not on same worker as parent, queue request */
- wrk = http_worker_get (parent_thread_index);
-
- clib_spinlock_lock (&wrk->pending_stream_connects_lock);
-
- http_pending_connect_stream_t p = { .parent_handle = sep->parent_handle,
- .opaque = sep->opaque };
- clib_fifo_add1 (wrk->pending_connect_streams, p);
- n_pending = clib_fifo_elts (wrk->pending_connect_streams);
-
- clib_spinlock_unlock (&wrk->pending_stream_connects_lock);
-
- if (n_pending == 1)
- session_send_rpc_evt_to_thread_force (
- parent_thread_index, http_handle_stream_connects_rpc,
- uword_to_pointer ((uword) parent_thread_index, void *));
+ session_endpoint_cfg_t *sep = (session_endpoint_cfg_t *) tep;
- return 0;
+ ASSERT (sep->parent_handle == SESSION_INVALID_HANDLE);
+ return http_connect_connection (sep);
}
static int
-http_transport_connect (transport_endpoint_cfg_t *tep)
+http_transport_connect_stream (transport_endpoint_cfg_t *tep,
+ CLIB_UNUSED (session_t *stream_session),
+ u32 *conn_index)
{
session_endpoint_cfg_t *sep = (session_endpoint_cfg_t *) tep;
- session_t *hs;
- hs = session_get_from_handle_if_valid (sep->parent_handle);
- if (hs)
- return http_program_connect_stream (sep);
- else
- return http_connect_connection (sep);
+ return http_connect_stream (sep->parent_handle, conn_index);
}
static u32
static const transport_proto_vft_t http_proto = {
.enable = http_transport_enable,
.connect = http_transport_connect,
+ .connect_stream = http_transport_connect_stream,
.start_listen = http_start_listen,
.stop_listen = http_stop_listen,
.half_close = http_transport_shutdown,
pool_get_aligned_safe (h1m->req_pool[hc->c_thread_index], req,
CLIB_CACHE_LINE_BYTES);
clib_memset (req, 0, sizeof (*req));
- req->hr_pa_session_handle = SESSION_INVALID_HANDLE;
+ req->c_s_index = SESSION_INVALID_INDEX;
req_index = req - h1m->req_pool[hc->c_thread_index];
hr_handle.version = HTTP_VERSION_1;
hr_handle.req_index = req_index;
req = http1_conn_alloc_req (hc);
http_req_state_change (req, HTTP_REQ_STATE_WAIT_APP_METHOD);
http_stats_connections_established_inc (hc->c_thread_index);
- return http_conn_established (hc, req, hc->hc_pa_app_api_ctx, 0);
+ return http_conn_established (hc, req, hc->hc_pa_app_api_ctx);
}
static void
pool_get_aligned_safe (wrk->req_pool, req, CLIB_CACHE_LINE_BYTES);
clib_memset (req, 0, sizeof (*req));
- req->base.hr_pa_session_handle = SESSION_INVALID_HANDLE;
+ req->base.c_s_index = SESSION_INVALID_INDEX;
req_index = req - wrk->req_pool;
hr_handle.version = HTTP_VERSION_2;
hr_handle.req_index = req_index;
http2_default_conn_settings.header_table_size);
http_req_state_change (&req->base, HTTP_REQ_STATE_WAIT_APP_METHOD);
http_stats_connections_established_inc (hc->c_thread_index);
- if (http_conn_established (hc, &req->base, hc->hc_pa_app_api_ctx, 0))
+ if (http_conn_established (hc, &req->base, hc->hc_pa_app_api_ctx))
return HTTP2_ERROR_INTERNAL_ERROR;
}
}
static int
-http2_conn_connect_stream_callback (http_conn_t *hc, u32 parent_app_api_ctx)
+http2_conn_connect_stream_callback (http_conn_t *hc, u32 *req_index)
{
http2_conn_ctx_t *h2c;
http2_req_t *req;
- app_worker_t *app_wrk;
- int rv;
HTTP_DBG (1, "hc [%u]%x", hc->c_thread_index, hc->hc_hc_index);
h2c = http2_conn_ctx_get_w_thread (hc);
ASSERT (!(hc->flags & HTTP_CONN_F_IS_SERVER));
ASSERT (!(h2c->flags & HTTP2_CONN_F_EXPECT_SERVER_SETTINGS));
- app_wrk = app_worker_get_if_valid (hc->hc_pa_wrk_index);
- if (!app_wrk)
- return -1;
if (h2c->req_num == h2c->settings.max_concurrent_streams)
- return app_worker_connect_notify (app_wrk, 0, SESSION_E_MAX_STREAMS_HIT,
- parent_app_api_ctx);
+ return SESSION_E_MAX_STREAMS_HIT;
req = http2_conn_alloc_req (hc, 0);
+ req->base.hr_pa_wrk_index = hc->hc_pa_wrk_index;
http_req_state_change (&req->base, HTTP_REQ_STATE_WAIT_APP_METHOD);
- rv = http_conn_established (hc, &req->base, parent_app_api_ctx, 1);
- if (rv != 0)
- http2_conn_free_req (h2c, req, hc->c_thread_index);
- return rv;
+ *req_index = req->base.hr_req_handle;
+ return SESSION_E_NONE;
}
static void
http_req_id_t c_http_req_id;
};
#define hr_pa_wrk_index c_http_req_id.parent_app_wrk_index
-#define hr_pa_session_handle c_http_req_id.app_session_handle
#define hr_hc_index c_http_req_id.hc_index
#define hr_req_handle connection.c_index
typedef struct http_worker_
{
http_conn_t *conn_pool;
- clib_spinlock_t pending_stream_connects_lock;
- http_pending_connect_stream_t *pending_connect_streams;
- http_pending_connect_stream_t *burst_connect_streams;
http_wrk_stats_t stats;
} http_worker_t;
void (*transport_conn_reschedule_callback) (http_conn_t *hc);
void (*conn_accept_callback) (http_conn_t *hc); /* optional */
int (*conn_connect_stream_callback) (http_conn_t *hc,
- u32 parent_app_api_ctx); /* optional */
+ u32 *req_index); /* optional */
void (*conn_cleanup_callback) (http_conn_t *hc);
void (*enable_callback) (void); /* optional */
uword (*unformat_cfg_callback) (unformat_input_t *input); /* optional */
session_t *as;
app_worker_t *app_wrk;
- as = session_get_from_handle (req->hr_pa_session_handle);
+ as = session_get (req->c_s_index, req->c_thread_index);
if (!(as->flags & SESSION_F_RX_EVT))
{
app_wrk = app_worker_get_if_valid (as->app_wrk_index);
session_t *as;
int rv;
- as = session_get_from_handle (req->hr_pa_session_handle);
+ as = session_get (req->c_s_index, req->c_thread_index);
rv = svm_fifo_dequeue (as->tx_fifo, sizeof (*msg), (u8 *) msg);
ASSERT (rv == sizeof (*msg));
}
always_inline void
http_io_as_add_want_deq_ntf (http_req_t *req)
{
- session_t *as = session_get_from_handle (req->hr_pa_session_handle);
+ session_t *as = session_get (req->c_s_index, req->c_thread_index);
svm_fifo_add_want_deq_ntf (as->rx_fifo, SVM_FIFO_WANT_DEQ_NOTIF);
}
always_inline void
http_io_as_add_want_read_ntf (http_req_t *req)
{
- session_t *as = session_get_from_handle (req->hr_pa_session_handle);
+ session_t *as = session_get (req->c_s_index, req->c_thread_index);
svm_fifo_add_want_deq_ntf (as->rx_fifo, SVM_FIFO_WANT_DEQ_NOTIF_IF_FULL |
SVM_FIFO_WANT_DEQ_NOTIF_IF_EMPTY);
}
always_inline void
http_io_as_del_want_read_ntf (http_req_t *req)
{
- session_t *as = session_get_from_handle (req->hr_pa_session_handle);
+ session_t *as = session_get (req->c_s_index, req->c_thread_index);
svm_fifo_del_want_deq_ntf (as->rx_fifo, SVM_FIFO_WANT_DEQ_NOTIF_IF_FULL |
SVM_FIFO_WANT_DEQ_NOTIF_IF_EMPTY);
}
always_inline void
http_io_as_reset_has_read_ntf (http_req_t *req)
{
- session_t *as = session_get_from_handle (req->hr_pa_session_handle);
+ session_t *as = session_get (req->c_s_index, req->c_thread_index);
svm_fifo_reset_has_deq_ntf (as->rx_fifo);
}
always_inline void
http_io_as_dequeue_notify (http_req_t *req, u32 n_last_deq)
{
- session_t *as = session_get_from_handle (req->hr_pa_session_handle);
+ session_t *as = session_get (req->c_s_index, req->c_thread_index);
if (svm_fifo_needs_deq_ntf (as->tx_fifo, n_last_deq))
session_dequeue_notify (as);
}
always_inline u32
http_io_as_max_write (http_req_t *req)
{
- session_t *as = session_get_from_handle (req->hr_pa_session_handle);
+ session_t *as = session_get (req->c_s_index, req->c_thread_index);
return svm_fifo_max_enqueue_prod (as->rx_fifo);
}
always_inline u32
http_io_as_max_read (http_req_t *req)
{
- session_t *as = session_get_from_handle (req->hr_pa_session_handle);
+ session_t *as = session_get (req->c_s_index, req->c_thread_index);
return svm_fifo_max_dequeue_cons (as->tx_fifo);
}
http_io_as_write (http_req_t *req, u8 *data, u32 len)
{
int n_written;
- session_t *as = session_get_from_handle (req->hr_pa_session_handle);
+ session_t *as = session_get (req->c_s_index, req->c_thread_index);
n_written = svm_fifo_enqueue (as->rx_fifo, len, data);
ASSERT (n_written == len);
u32 n_segs)
{
int n_written;
- session_t *as = session_get_from_handle (req->hr_pa_session_handle);
+ session_t *as = session_get (req->c_s_index, req->c_thread_index);
n_written = svm_fifo_enqueue_segments (as->rx_fifo, segs, n_segs, 0);
ASSERT (n_written > 0);
return (u32) n_written;
http_io_as_peek (http_req_t *req, u8 *buf, u32 len, u32 offset)
{
int n_read;
- session_t *as = session_get_from_handle (req->hr_pa_session_handle);
+ session_t *as = session_get (req->c_s_index, req->c_thread_index);
n_read = svm_fifo_peek (as->tx_fifo, offset, len, buf);
ASSERT (n_read > 0);
u32 max_bytes)
{
int n_read;
- session_t *as = session_get_from_handle (req->hr_pa_session_handle);
+ session_t *as = session_get (req->c_s_index, req->c_thread_index);
n_read = svm_fifo_segments (as->tx_fifo, 0, segs, n_segs, max_bytes);
ASSERT (n_read > 0);
return (u32) n_read;
always_inline void
http_io_as_drain (http_req_t *req, u32 len)
{
- session_t *as = session_get_from_handle (req->hr_pa_session_handle);
+ session_t *as = session_get (req->c_s_index, req->c_thread_index);
svm_fifo_dequeue_drop (as->tx_fifo, len);
}
always_inline void
http_io_as_drain_all (http_req_t *req)
{
- session_t *as = session_get_from_handle (req->hr_pa_session_handle);
+ session_t *as = session_get (req->c_s_index, req->c_thread_index);
svm_fifo_dequeue_drop_all (as->tx_fifo);
}
always_inline void
http_io_as_drain_unread (http_req_t *req)
{
- session_t *as = session_get_from_handle (req->hr_pa_session_handle);
+ session_t *as = session_get (req->c_s_index, req->c_thread_index);
svm_fifo_dequeue_drop_all (as->rx_fifo);
}
if ((rv = app_worker_init_accepted (as)))
{
HTTP_DBG (1, "failed to allocate fifos");
- req->hr_pa_session_handle = SESSION_INVALID_HANDLE;
+ req->c_s_index = SESSION_INVALID_INDEX;
session_free (as);
hc->flags |= HTTP_CONN_F_NO_APP_SESSION;
return rv;
}
- req->hr_pa_session_handle = session_handle (as);
req->hr_pa_wrk_index = as->app_wrk_index;
app_wrk = app_worker_get (as->app_wrk_index);
if ((rv = app_worker_accept_notify (app_wrk, as)))
{
HTTP_DBG (1, "app accept returned");
- req->hr_pa_session_handle = SESSION_INVALID_HANDLE;
+ req->c_s_index = SESSION_INVALID_INDEX;
session_free (as);
hc->flags |= HTTP_CONN_F_NO_APP_SESSION;
return rv;
always_inline int
http_conn_established (http_conn_t *hc, http_req_t *req,
- u32 parent_app_api_ctx, u8 is_stream)
+ u32 parent_app_api_ctx)
{
session_t *as;
app_worker_t *app_wrk;
http_conn_t *ho_hc;
int rv;
- if (!is_stream)
- {
- ho_hc = http_ho_conn_get (hc->ho_index);
- /* in chain with TLS there is race on half-open cleanup */
- __atomic_fetch_or (&ho_hc->flags, HTTP_CONN_F_HO_DONE, __ATOMIC_RELEASE);
- }
+ ho_hc = http_ho_conn_get (hc->ho_index);
+ /* in chain with TLS there is race on half-open cleanup */
+ __atomic_fetch_or (&ho_hc->flags, HTTP_CONN_F_HO_DONE, __ATOMIC_RELEASE);
/* allocate app session and initialize */
as = session_alloc (hc->c_thread_index);
app_worker_connect_notify (app_wrk, as, 0, parent_app_api_ctx);
- req->hr_pa_session_handle = session_handle (as);
req->hr_pa_wrk_index = as->app_wrk_index;
return 0;