"strings"
"time"
+ "github.com/edwarnicke/exechelper"
+
. "fd.io/hs-test/infra"
)
func init() {
+ // Register HTTP/2 suite tests; the MW variants run with multiple VPP workers.
RegisterH2Tests(Http2TcpGetTest, Http2TcpPostTest, Http2MultiplexingTest, Http2TlsTest, Http2ContinuationTxTest, Http2ServerMemLeakTest,
- Http2ClientGetTest, Http2ClientPostTest, Http2ClientPostPtrTest, Http2ClientGetRepeatTest)
- RegisterH2MWTests(Http2MultiplexingMWTest)
+ Http2ClientGetTest, Http2ClientPostTest, Http2ClientPostPtrTest, Http2ClientGetRepeatTest, Http2ClientMultiplexingTest)
+ RegisterH2MWTests(Http2MultiplexingMWTest, Http2ClientMultiplexingMWTest)
RegisterVethTests(Http2CliTlsTest, Http2ClientContinuationTest)
}
s.Log(o)
}
+// Http2ClientMultiplexingTest runs the VPP http client with 10 concurrent
+// streams and 20 repeated requests against an nginx server, verifies via the
+// nginx access log that all 20 requests used a single connection
+// (conn_reqs=20), and then polls until both the TCP session and the HTTP/2
+// stream sessions are cleaned up in VPP.
+func Http2ClientMultiplexingTest(s *Http2Suite) {
+ vpp := s.Containers.Vpp.VppInstance
+ serverAddress := s.HostAddr() + ":" + s.Ports.Port2
+
+ s.CreateNginxServer()
+ s.AssertNil(s.Containers.NginxServer.Start())
+
+ uri := "https://" + serverAddress + "/httpTestFile"
+ cmd := fmt.Sprintf("http client http2 streams %d repeat %d uri %s", 10, 20, uri)
+ o := vpp.Vppctl(cmd)
+ s.Log(o)
+ s.AssertContains(o, "20 request(s)")
+ // nginx logs conn_reqs=$connection_requests; a single entry with
+ // conn_reqs=20 proves all requests were multiplexed over one connection
+ logPath := s.Containers.NginxServer.GetHostWorkDir() + "/" + s.Containers.NginxServer.Name + "-access.log"
+ logContents, err := exechelper.Output("cat " + logPath)
+ s.Log(string(logContents))
+ s.AssertNil(err)
+ s.AssertContains(string(logContents), "conn_reqs=20")
+
+ /* test session cleanup */
+ httpStreamCleanupDone := false
+ tcpSessionCleanupDone := false
+ // poll "show session verbose" for up to 30s; [T] marks TCP sessions,
+ // [H2] marks HTTP/2 stream sessions
+ for nTries := 0; nTries < 30; nTries++ {
+ o := vpp.Vppctl("show session verbose")
+ if !strings.Contains(o, "[T]") {
+ tcpSessionCleanupDone = true
+ }
+ if !strings.Contains(o, "[H2]") {
+ httpStreamCleanupDone = true
+ }
+ if httpStreamCleanupDone && tcpSessionCleanupDone {
+ break
+ }
+ time.Sleep(1 * time.Second)
+ }
+ s.AssertEqual(true, tcpSessionCleanupDone, "TCP session not cleaned up")
+ s.AssertEqual(true, httpStreamCleanupDone, "HTTP/2 stream not cleaned up")
+}
+
+// Http2ClientMultiplexingMWTest is the multi-worker variant: 2 sessions with
+// 5 streams each and 20 total requests, run with 3 VPP cpus. The 20 requests
+// are split evenly, so each of the two connections must log conn_reqs=10.
+func Http2ClientMultiplexingMWTest(s *Http2Suite) {
+ s.CpusPerVppContainer = 3
+ s.SetupTest()
+
+ vpp := s.Containers.Vpp.VppInstance
+ serverAddress := s.HostAddr() + ":" + s.Ports.Port2
+
+ s.CreateNginxServer()
+ s.AssertNil(s.Containers.NginxServer.Start())
+
+ uri := "https://" + serverAddress + "/httpTestFile"
+ cmd := fmt.Sprintf("http client http2 sessions 2 streams %d repeat %d uri %s", 5, 20, uri)
+ o := vpp.Vppctl(cmd)
+ s.Log(o)
+ s.AssertContains(o, "20 request(s)")
+ logPath := s.Containers.NginxServer.GetHostWorkDir() + "/" + s.Containers.NginxServer.Name + "-access.log"
+ logContents, err := exechelper.Output("cat " + logPath)
+ s.Log(string(logContents))
+ s.AssertNil(err)
+ // exactly two connections, each carrying 10 of the 20 requests
+ s.AssertEqual(2, strings.Count(string(logContents), "conn_reqs=10"))
+}
+
func Http2ClientContinuationTest(s *VethsSuite) {
serverAddress := s.Interfaces.Server.Ip4AddressString() + ":" + s.Ports.Port1
}
func (s *Http2Suite) TeardownTest() {
- s.HstSuite.TeardownTest()
+ // defer the suite teardown so the diagnostics below run first
+ defer s.HstSuite.TeardownTest()
+ vpp := s.Containers.Vpp.VppInstance
+ // on spec failure, dump VPP session and error state to aid debugging
+ if CurrentSpecReport().Failed() {
+ s.Log(vpp.Vppctl("show session verbose 2"))
+ s.Log(vpp.Vppctl("show error"))
+ }
}
func (s *Http2Suite) VppAddr() string {
})
testCases := []struct {
- desc string
- portOffset int
+ desc string
+ portOffset int
+ clientExtraArgs string
}{
+ // some tests exercise error conditions after a request has completed, so those runs start the http client with repeat
{desc: "client/1/1", portOffset: 0},
{desc: "client/4.1/1", portOffset: 1},
{desc: "client/4.1/2", portOffset: 2},
//{desc: "client/5.1/6", portOffset: 13},
//{desc: "client/5.1/7", portOffset: 14},
{desc: "client/5.1/8", portOffset: 15},
- {desc: "client/5.1/9", portOffset: 16},
- {desc: "client/5.1/10", portOffset: 17},
+ {desc: "client/5.1/9", portOffset: 16, clientExtraArgs: "repeat 2 "},
+ {desc: "client/5.1/10", portOffset: 17, clientExtraArgs: "repeat 2 "},
{desc: "client/5.1.1/1", portOffset: 18},
{desc: "client/5.4.1/1", portOffset: 19},
{desc: "client/5.4.1/2", portOffset: 20},
//{desc: "client/6.3/2", portOffset: 29},
{desc: "client/6.4/1", portOffset: 30},
{desc: "client/6.4/2", portOffset: 31},
- {desc: "client/6.4/3", portOffset: 32},
+ {desc: "client/6.4/3", portOffset: 32, clientExtraArgs: "repeat 2 "},
{desc: "client/6.5/1", portOffset: 33},
{desc: "client/6.5/2", portOffset: 34},
{desc: "client/6.5/3", portOffset: 35},
{desc: "client/6.9.1/1", portOffset: 49},
// TODO: message framing without content length using END_STREAM flag
//{desc: "client/6.9.1/2", portOffset: 50},
- {desc: "client/6.10/1", portOffset: 51},
+ {desc: "client/6.10/1", portOffset: 51, clientExtraArgs: "repeat 2 "},
{desc: "client/6.10/2", portOffset: 52},
{desc: "client/6.10/3", portOffset: 53},
- {desc: "client/6.10/4", portOffset: 54},
- {desc: "client/6.10/5", portOffset: 55},
+ {desc: "client/6.10/4", portOffset: 54, clientExtraArgs: "repeat 2 "},
+ {desc: "client/6.10/5", portOffset: 55, clientExtraArgs: "repeat 2 "},
{desc: "client/6.10/6", portOffset: 56},
}
go h2spec.RunClientSpec(conf)
- cmd := fmt.Sprintf("http client timeout 5 verbose uri https://%s:%d/", serverAddress, h2specdFromPort+test.portOffset)
- res := s.Containers.Vpp.VppInstance.Vppctl(cmd)
- s.Log(res)
- s.AssertNotContains(res, "error: timeout")
+ cmd := fmt.Sprintf("http client timeout 5 %s uri https://%s:%d/", test.clientExtraArgs, serverAddress, h2specdFromPort+test.portOffset)
+ s.Log(s.Containers.Vpp.VppInstance.Vppctl(cmd))
oChan := make(chan string)
go func() {
}
http {
+ log_format access_log_fmt '$remote_addr - $remote_user [$time_local] '
+ '"$request" $status $body_bytes_sent '
+ '"$http_referer" "$http_user_agent" conn=$connection conn_reqs=$connection_requests';
keepalive_timeout 300s;
keepalive_requests 1000000;
client_body_timeout {{.Timeout}}s;
send_timeout {{.Timeout}}s;
sendfile on;
server {
- access_log /tmp/nginx/{{.LogPrefix}}-access.log;
+ access_log /tmp/nginx/{{.LogPrefix}}-access.log access_log_fmt;
listen {{.Port}};
listen {{.PortSsl}} ssl;
server_name {{.Address}};
#define foreach_hc_s_flag \
_ (1, IS_CLOSED) \
_ (2, PRINTABLE_BODY) \
- _ (4, CHUNKED_BODY)
+ _ (4, CHUNKED_BODY) \
+ _ (8, IS_PARENT)
typedef enum hc_s_flag_
{
typedef struct
{
- u64 req_per_wrk;
+ u64 max_req;
u64 request_count;
f64 start, end;
f64 elapsed_time;
u8 *http_response;
u8 *response_status;
FILE *file_ptr;
+ union
+ {
+ u32 child_count;
+ u32 parent_index;
+ };
+ u32 http_session_index;
} hc_session_t;
typedef struct
bool verbose;
f64 timeout;
http_req_method_t req_method;
- u64 repeat_count;
+ u64 reqs_per_session;
+ u64 reqs_remainder;
f64 duration;
bool repeat;
bool multi_session;
u32 connected_counter;
u32 worker_index;
u32 max_sessions;
+ u32 max_streams;
u32 private_segment_size;
u32 prealloc_fifos;
u32 fifo_size;
HC_GENERIC_ERR,
HC_FOPEN_FAILED,
HC_REPEAT_DONE,
+ HC_MAX_STREAMS_HIT,
} hc_cli_signal_t;
#define mime_printable_max_len 35
pool_get_zero (wrk->sessions, s);
s->session_index = s - wrk->sessions;
s->thread_index = wrk->thread_index;
+ HTTP_DBG (1, "[%u]%u", s->thread_index, s->session_index);
return s;
}
return 0;
}
+/* arguments passed via RPC to the stream-connect handler */
+typedef struct
+{
+ u64 parent_handle; /* session handle of the parent HTTP/2 connection */
+ u32 parent_index; /* hc_session pool index of the parent */
+} hc_connect_streams_args_t;
+
+/* RPC handler, runs on the transport ctrl thread: opens max_streams - 1
+ * additional streams on the parent connection, allocating a half-open
+ * hc_session per stream whose pool index is passed as api_context */
+static void
+hc_connect_streams_rpc (void *rpc_args)
+{
+ hc_connect_streams_args_t *args = rpc_args;
+ hc_main_t *hcm = &hc_main;
+ vnet_connect_args_t _a, *a = &_a;
+ hc_worker_t *wrk;
+ hc_session_t *ho_hs;
+ u32 i;
+ int rv;
+
+ clib_memset (a, 0, sizeof (*a));
+ clib_memcpy (&a->sep_ext, &hcm->connect_sep, sizeof (hcm->connect_sep));
+ /* a set parent_handle makes http_transport_connect open a stream
+ * instead of a new connection */
+ a->sep_ext.parent_handle = args->parent_handle;
+ a->app_index = hcm->app_index;
+
+ for (i = 0; i < (hcm->max_streams - 1); i++)
+ {
+ /* allocate half-open session */
+ wrk = hc_worker_get (transport_cl_thread ());
+ ho_hs = hc_session_alloc (wrk);
+ ho_hs->parent_index = args->parent_index;
+ a->api_context = ho_hs->session_index;
+
+ rv = vnet_connect (a);
+ if (rv)
+ /* NOTE(review): clib_warning's first argument is the format string;
+ * confirm the leading 0 is intended (same pattern as hc_connect_rpc) */
+ clib_warning (0, "connect returned: %U", format_session_error, rv);
+ }
+ vec_free (args);
+}
+
+/* schedule stream connects on the transport ctrl thread; called from the
+ * parent's connected callback (possibly on another worker) */
+static void
+hc_connect_streams (u64 parent_handle, u32 parent_index)
+{
+ hc_connect_streams_args_t *args = 0;
+
+ vec_validate (args, 0);
+ args->parent_handle = parent_handle;
+ args->parent_index = parent_index;
+
+ session_send_rpc_evt_to_thread_force (transport_cl_thread (),
+ hc_connect_streams_rpc, args);
+}
+
static int
-hc_session_connected_callback (u32 app_index, u32 hc_session_index,
- session_t *s, session_error_t err)
+hc_session_connected_callback (u32 app_index, u32 ho_index, session_t *s,
+ session_error_t err)
{
hc_main_t *hcm = &hc_main;
hc_worker_t *wrk;
- hc_session_t *hc_session;
+ hc_session_t *hc_session, *ho_session, *parent_session;
hc_http_header_t *header;
+ http_version_t http_version;
u8 *f = 0;
+ u32 s_index;
if (err)
{
- clib_warning ("hc_session_index[%d] connected error: %U",
- hc_session_index, format_session_error, err);
- vlib_process_signal_event_mt (vlib_get_main (), hcm->cli_node_index,
- HC_CONNECT_FAILED, 0);
+ clib_warning ("connected error: %U", format_session_error, err);
+ if (err == SESSION_E_MAX_STREAMS_HIT)
+ vlib_process_signal_event_mt (vlib_get_main (), hcm->cli_node_index,
+ HC_MAX_STREAMS_HIT, 0);
+ else
+ vlib_process_signal_event_mt (vlib_get_main (), hcm->cli_node_index,
+ HC_CONNECT_FAILED, 0);
return -1;
}
+ ho_session = hc_session_get (ho_index, transport_cl_thread ());
wrk = hc_worker_get (s->thread_index);
-
hc_session = hc_session_alloc (wrk);
+ s_index = hc_session->session_index;
+ clib_memcpy_fast (hc_session, ho_session, sizeof (*hc_session));
+ hc_session->session_index = s_index;
+ hc_session->thread_index = s->thread_index;
+ hc_session->http_session_index = s->session_index;
+
clib_spinlock_lock_if_init (&hcm->lock);
hcm->connected_counter++;
clib_spinlock_unlock_if_init (&hcm->lock);
+ if (hc_session->session_flags & HC_S_FLAG_IS_PARENT)
+ {
+ http_version = http_session_get_version (s);
+ if (http_version == HTTP_VERSION_2 && hcm->max_streams > 1)
+ {
+ HTTP_DBG (1, "parent connected, going to open %u streams",
+ hcm->max_streams - 1);
+ hc_connect_streams (session_handle (s), hc_session->session_index);
+ }
+ }
+ else
+ {
+ parent_session =
+ hc_session_get (hc_session->parent_index, hc_session->thread_index);
+ parent_session->child_count++;
+ }
+
hc_session->thread_index = s->thread_index;
hc_session->body_recv = 0;
s->opaque = hc_session->session_index;
if (hcm->multi_session)
{
- hc_session->stats.req_per_wrk = hcm->repeat_count / hcm->max_sessions;
+ hc_session->stats.max_req = hcm->reqs_per_session;
clib_spinlock_lock_if_init (&hcm->lock);
/* add remaining requests to the first connected session */
if (hcm->connected_counter == 1)
{
- hc_session->stats.req_per_wrk +=
- hcm->repeat_count % hcm->max_sessions;
+ hc_session->stats.max_req += hcm->reqs_remainder;
}
clib_spinlock_unlock_if_init (&hcm->lock);
}
else
{
- hc_session->stats.req_per_wrk = hcm->repeat_count;
+ hc_session->stats.max_req = hcm->reqs_per_session;
hcm->worker_index = s->thread_index;
}
if (hcm->filename)
}
/* send an event when all sessions are closed */
- if (hcm->done_count >= hcm->max_sessions)
+ if (hcm->done_count >= (hcm->max_sessions * hcm->max_streams))
{
if (hcm->was_transport_closed)
{
max_deq = svm_fifo_max_dequeue_cons (s->rx_fifo);
if (PREDICT_FALSE (max_deq == 0))
- goto done;
+ {
+ HTTP_DBG (1, "no data to deq");
+ return 0;
+ }
if (hc_session->to_recv == 0)
{
return -1;
}
+ HTTP_DBG (1, "hc_session_index[%u]%u %U content-length: %lu",
+ s->thread_index, s->opaque, format_http_status_code, msg.code,
+ msg.data.body_len);
+
if (msg.data.headers_len)
{
http_init_header_table_buf (&hc_session->resp_headers, msg);
svm_fifo_max_dequeue (s->rx_fifo));
if (!max_deq)
{
+ HTTP_DBG (1, "body not yet received");
goto done;
}
u32 n_deq = clib_min (hc_session->to_recv, max_deq);
ASSERT (hc_session->to_recv >= rv);
hc_session->to_recv -= rv;
hc_session->body_recv += rv;
+ HTTP_DBG (1, "read %u, left to recv %u", n_deq, hc_session->to_recv);
if (hcm->filename)
{
if (hc_session->file_ptr == NULL)
hc_session->stats.request_count++;
if (hc_session->stats.elapsed_time >= hcm->duration &&
- hc_session->stats.request_count >= hc_session->stats.req_per_wrk)
+ hc_session->stats.request_count >= hc_session->stats.max_req)
{
HTTP_DBG (1, "repeat done");
- hc_session_disconnect_callback (s);
+ if (hc_session->session_flags & HC_S_FLAG_IS_PARENT)
+ {
+ /* parent must be closed last */
+ if (hc_session->child_count != 0)
+ hc_session->session_flags |= HC_S_FLAG_IS_CLOSED;
+ else
+ hc_session_disconnect_callback (s);
+ }
+ else
+ {
+ hc_session_disconnect_callback (s);
+ hc_session_t *parent = hc_session_get (
+ hc_session->parent_index, hc_session->thread_index);
+ parent->child_count--;
+ if (parent->child_count == 0 &&
+ parent->session_flags & HC_S_FLAG_IS_CLOSED)
+ hc_session_disconnect_callback (session_get (
+ parent->http_session_index, parent->thread_index));
+ }
}
else
{
return 0;
}
+/* half-open cleanup: return the half-open hc_session allocated at connect
+ * time to its pool; s->opaque carries the pool index on the ctrl thread */
+static void
+hc_ho_cleanup_callback (session_t *s)
+{
+ HTTP_DBG (1, "ho index %u", s->opaque);
+ hc_worker_t *wrk = hc_worker_get (transport_cl_thread ());
+ pool_put_index (wrk->sessions, s->opaque);
+}
+
static session_cb_vft_t hc_session_cb_vft = {
.session_connected_callback = hc_session_connected_callback,
.session_disconnect_callback = hc_session_disconnect_callback,
.session_reset_callback = hc_session_reset_callback,
.builtin_app_rx_callback = hc_rx_callback,
.builtin_app_tx_callback = hc_tx_callback,
+ /* frees the half-open hc_session when the half-open is cleaned up */
+ .half_open_cleanup_callback = hc_ho_cleanup_callback,
};
static clib_error_t *
return 0;
}
-static int
+static void
hc_connect_rpc (void *rpc_args)
{
vnet_connect_args_t *a = rpc_args;
int rv = ~0;
hc_main_t *hcm = &hc_main;
+ hc_worker_t *wrk;
+ hc_session_t *ho_hs;
for (u32 i = 0; i < hcm->max_sessions; i++)
{
+ /* allocate half-open session */
+ wrk = hc_worker_get (transport_cl_thread ());
+ ho_hs = hc_session_alloc (wrk);
+ /* each top-level connection is a parent that may later open streams */
+ ho_hs->session_flags |= HC_S_FLAG_IS_PARENT;
+ a->api_context = ho_hs->session_index;
+
rv = vnet_connect (a);
- if (rv > 0)
+ if (rv)
clib_warning (0, "connect returned: %U", format_session_error, rv);
}
session_endpoint_free_ext_cfgs (&a->sep_ext);
vec_free (a);
-
- return rv;
}
static void
vnet_connect_args_t *a = 0;
transport_endpt_ext_cfg_t *ext_cfg;
transport_endpt_cfg_http_t http_cfg = { (u32) hcm->timeout, 0 };
+
vec_validate (a, 0);
clib_memset (a, 0, sizeof (a[0]));
clib_memcpy (&a->sep_ext, &hcm->connect_sep, sizeof (hcm->connect_sep));
ext_cfg->crypto.alpn_protos[0] = TLS_ALPN_PROTO_HTTP_1_1;
break;
case HTTP_VERSION_2:
- ext_cfg->crypto.alpn_protos[1] = TLS_ALPN_PROTO_HTTP_2;
+ ext_cfg->crypto.alpn_protos[0] = TLS_ALPN_PROTO_HTTP_2;
break;
default:
break;
hc_session_t *hc_session;
vec_foreach (wrk, hcm->wrk)
{
- vec_foreach (hc_session, wrk->sessions)
+ pool_foreach (hc_session, wrk->sessions)
{
hc_stats.request_count += hc_session->stats.request_count;
hc_session->stats.request_count = 0;
case HC_CONNECT_FAILED:
err = clib_error_return (0, "error: failed to connect");
break;
+ case HC_MAX_STREAMS_HIT:
+ err = clib_error_return (0, "error: max streams hit");
+ break;
case HC_TRANSPORT_CLOSED:
err = clib_error_return (0, "error: transport closed");
break;
HTTP_DBG (1, "worker and worker sessions cleanup");
vec_free (wrk->headers_buf);
- vec_foreach (hc_session, wrk->sessions)
+ pool_foreach (hc_session, wrk->sessions)
{
http_free_header_table (&hc_session->resp_headers);
vec_free (hc_session->http_response);
hc_main_t *hcm = &hc_main;
clib_error_t *err = 0;
unformat_input_t _line_input, *line_input = &_line_input;
- u64 mem_size;
+ u64 mem_size, repeat_count = 0;
u8 *appns_id = 0;
u8 *path = 0;
u8 *file_data;
u8 *value;
int rv;
hcm->timeout = 10;
- hcm->repeat_count = 0;
hcm->duration = 0;
hcm->repeat = false;
hcm->multi_session = false;
hcm->done_count = 0;
hcm->connected_counter = 0;
hcm->max_sessions = 1;
+ hcm->max_streams = 1;
hcm->prealloc_fifos = 0;
hcm->private_segment_size = 0;
hcm->fifo_size = 0;
hcm->verbose = true;
else if (unformat (line_input, "timeout %f", &hcm->timeout))
;
- else if (unformat (line_input, "repeat %d", &hcm->repeat_count))
+ else if (unformat (line_input, "repeat %d", &repeat_count))
{
hcm->repeat = true;
}
goto done;
}
}
+ else if (unformat (line_input, "streams %d", &hcm->max_streams))
+ {
+ if (hcm->max_streams <= 1)
+ {
+ err = clib_error_return (0, "streams must be > 1");
+ goto done;
+ }
+ }
else if (unformat (line_input, "prealloc-fifos %d",
&hcm->prealloc_fifos))
;
}
}
- if (hcm->duration && hcm->repeat_count)
+ if (hcm->duration && repeat_count)
{
err = clib_error_return (
0, "combining duration and repeat is not supported");
goto done;
}
+ if (hcm->max_streams > 1 && !hcm->repeat)
+ {
+ err = clib_error_return (
+ 0, "multiple streams are only supported with request repeating");
+ goto done;
+ }
+
+ if (repeat_count)
+ {
+ hcm->reqs_per_session =
+ repeat_count / (hcm->max_sessions * hcm->max_streams);
+ hcm->reqs_remainder =
+ repeat_count % (hcm->max_sessions * hcm->max_streams);
+ }
+
if ((rv = parse_target ((char **) &hcm->uri, (char **) &hcm->target)))
{
err = clib_error_return (0, "target parse error: %U",
http_main_t *hm = &http_main;
u32 num_threads, i;
http_engine_vft_t *http_version;
+ http_worker_t *wrk;
if (!is_en)
{
}
vec_validate (hm->wrk, num_threads - 1);
+ vec_foreach (wrk, hm->wrk)
+ {
+ clib_spinlock_init (&wrk->pending_stream_connects_lock);
+ }
vec_validate (hm->rx_bufs, num_threads - 1);
vec_validate (hm->tx_bufs, num_threads - 1);
vec_validate (hm->app_header_lists, num_threads - 1);
}
static int
-http_transport_connect (transport_endpoint_cfg_t *tep)
+http_connect_connection (session_endpoint_cfg_t *sep)
{
vnet_connect_args_t _cargs, *cargs = &_cargs;
http_main_t *hm = &http_main;
- session_endpoint_cfg_t *sep = (session_endpoint_cfg_t *) tep;
application_t *app;
http_conn_t *hc;
int error;
return 0;
}
+/* open a new stream on the existing HTTP/2 connection identified by
+ * parent_handle; opaque is the app's api context for this stream connect.
+ * Returns -1 if the parent is not an HTTP session, not HTTP/2, or the
+ * connection is already closed. Must run on the parent's thread. */
+static int
+http_connect_stream (u64 parent_handle, u32 opaque)
+{
+ session_t *hs;
+ http_req_handle_t rh;
+ u32 hc_index;
+ http_conn_t *hc;
+
+ hs = session_get_from_handle (parent_handle);
+ if (session_type_transport_proto (hs->session_type) != TRANSPORT_PROTO_HTTP)
+ {
+ HTTP_DBG (1, "received incompatible session");
+ return -1;
+ }
+
+ /* connection_index encodes a versioned request handle */
+ rh.as_u32 = hs->connection_index;
+ if (rh.version != HTTP_VERSION_2)
+ {
+ HTTP_DBG (1, "%U multiplexing not supported", format_http_version,
+ rh.version);
+ return -1;
+ }
+
+ hc_index = http_vfts[rh.version].hc_index_get_by_req_index (
+ rh.req_index, hs->thread_index);
+ HTTP_DBG (1, "hc [%u]%x", hs->thread_index, hc_index);
+
+ hc = http_conn_get_w_thread (hc_index, hs->thread_index);
+
+ if (hc->state == HTTP_CONN_STATE_CLOSED)
+ {
+ HTTP_DBG (1, "conn closed");
+ return -1;
+ }
+
+ /* version-specific engine allocates the stream and notifies the app */
+ return http_vfts[rh.version].conn_connect_stream_callback (hc, opaque);
+}
+
+/* RPC handler on the parent's worker thread: drain queued cross-thread
+ * stream connects in bursts of at most 32 and execute them locally;
+ * re-posts itself while work remains so one burst cannot hog the thread */
+static void
+http_handle_stream_connects_rpc (void *args)
+{
+ clib_thread_index_t thread_index = pointer_to_uword (args);
+ http_worker_t *wrk;
+ u32 n_pending, max_connects, n_connects = 0;
+ http_pending_connect_stream_t *pc;
+
+ wrk = http_worker_get (thread_index);
+
+ clib_spinlock_lock (&wrk->pending_stream_connects_lock);
+
+ n_pending = clib_fifo_elts (wrk->pending_connect_streams);
+ max_connects = clib_min (32, n_pending);
+ /* NOTE(review): vec_validate to index max_connects gives one element
+ * more than needed (max_connects + 1); harmless but confirm */
+ vec_validate (wrk->burst_connect_streams, max_connects);
+
+ /* copy the burst out under the lock, run connects outside it */
+ while (n_connects < max_connects)
+ clib_fifo_sub1 (wrk->pending_connect_streams,
+ wrk->burst_connect_streams[n_connects++]);
+
+ clib_spinlock_unlock (&wrk->pending_stream_connects_lock);
+
+ n_connects = 0;
+ while (n_connects < max_connects)
+ {
+ pc = &wrk->burst_connect_streams[n_connects++];
+ http_connect_stream (pc->parent_handle, pc->opaque);
+ }
+
+ /* more work to do? */
+ if (max_connects < n_pending)
+ session_send_rpc_evt_to_thread_force (
+ thread_index, http_handle_stream_connects_rpc,
+ uword_to_pointer ((uword) thread_index, void *));
+}
+
+/* program a stream connect toward the worker owning the parent connection;
+ * runs the connect inline when already on that worker, otherwise enqueues
+ * it and kicks the drain RPC on the 0 -> 1 queue transition (later entries
+ * are picked up by the already-scheduled handler) */
+static int
+http_program_connect_stream (session_endpoint_cfg_t *sep)
+{
+ clib_thread_index_t parent_thread_index =
+ session_thread_from_handle (sep->parent_handle);
+ http_worker_t *wrk;
+ u32 n_pending;
+
+ ASSERT (session_vlib_thread_is_cl_thread ());
+
+ /* if we are already on same worker as parent, handle connect */
+ if (parent_thread_index == transport_cl_thread ())
+ return http_connect_stream (sep->parent_handle, sep->opaque);
+
+ /* if not on same worker as parent, queue request */
+ wrk = http_worker_get (parent_thread_index);
+
+ clib_spinlock_lock (&wrk->pending_stream_connects_lock);
+
+ http_pending_connect_stream_t p = { .parent_handle = sep->parent_handle,
+ .opaque = sep->opaque };
+ clib_fifo_add1 (wrk->pending_connect_streams, p);
+ n_pending = clib_fifo_elts (wrk->pending_connect_streams);
+
+ clib_spinlock_unlock (&wrk->pending_stream_connects_lock);
+
+ if (n_pending == 1)
+ session_send_rpc_evt_to_thread_force (
+ parent_thread_index, http_handle_stream_connects_rpc,
+ uword_to_pointer ((uword) parent_thread_index, void *));
+
+ return 0;
+}
+
+/* transport connect entry point: a valid parent_handle selects a stream
+ * connect on an existing HTTP/2 connection, otherwise a new connection */
+static int
+http_transport_connect (transport_endpoint_cfg_t *tep)
+{
+ session_endpoint_cfg_t *sep = (session_endpoint_cfg_t *) tep;
+ session_t *hs;
+
+ hs = session_get_from_handle_if_valid (sep->parent_handle);
+ if (hs)
+ return http_program_connect_stream (sep);
+ else
+ return http_connect_connection (sep);
+}
+
static u32
http_start_listen (u32 app_listener_index, transport_endpoint_cfg_t *tep)
{
req = http1_conn_alloc_req (hc);
http_req_state_change (req, HTTP_REQ_STATE_WAIT_APP_METHOD);
- return http_conn_established (hc, req);
+ return http_conn_established (hc, req, hc->hc_pa_app_api_ctx);
}
static void
u8 *unparsed_headers; /* temporary storing rx fragmented headers */
u8 *unsent_headers; /* temporary storing tx fragmented headers */
u32 unsent_headers_offset;
- u32 client_req_index;
+ u32 req_num;
} http2_conn_ctx_t;
typedef struct http2_worker_ctx_
h2c = http2_conn_ctx_get_w_thread (hc);
HTTP_DBG (1, "h2c [%u]%x", hc->c_thread_index, h2c - wrk->conn_pool);
+ ASSERT (h2c->req_num == 0);
hash_free (h2c->req_by_stream_id);
if (hc->flags & HTTP_CONN_F_HAS_REQUEST)
hpack_dynamic_table_free (&h2c->decoder_dynamic_table);
h2c - wrk->conn_pool, req_index);
req->peer_window = h2c->peer_settings.initial_window_size;
req->our_window = h2c->settings.initial_window_size;
+ h2c->req_num++;
return req;
}
static_always_inline void
http2_req_set_stream_id (http2_req_t *req, http2_conn_ctx_t *h2c,
- u32 stream_id)
+ u32 stream_id, u8 unset_old)
{
HTTP_DBG (1, "req_index [%u]%x stream_id %u", req->base.c_thread_index,
((http_req_handle_t) req->base.hr_req_handle).req_index,
stream_id);
+ if (unset_old && req->stream_id)
+ hash_unset (h2c->req_by_stream_id, req->stream_id);
req->stream_id = stream_id;
hash_set (h2c->req_by_stream_id, stream_id,
((http_req_handle_t) req->base.hr_req_handle).req_index);
if (CLIB_DEBUG)
memset (req, 0xba, sizeof (*req));
pool_put (wrk->req_pool, req);
+ h2c->req_num--;
}
static inline void
if (clib_llist_elt_is_linked (req, sched_list))
clib_llist_remove (wrk->req_pool, sched_list, req);
http_buffer_free (&req->base.tx_buf);
- if (req->stream_id)
- hash_unset (h2c->req_by_stream_id, req->stream_id);
- req->flags = 0;
+ req->flags &= ~HTTP2_REQ_F_NEED_WINDOW_UPDATE;
req->stream_state = HTTP2_STREAM_STATE_IDLE;
req->peer_window = h2c->peer_settings.initial_window_size;
req->our_window = h2c->settings.initial_window_size;
}
else
{
- if (hc->flags & HTTP_CONN_F_HAS_REQUEST)
- {
- req = http2_req_get (h2c->client_req_index, hc->c_thread_index);
- session_transport_reset_notify (&req->base.connection);
- }
+ hash_foreach (stream_id, req_index, h2c->req_by_stream_id, ({
+ req = http2_req_get (req_index, hc->c_thread_index);
+ session_transport_reset_notify (
+ &req->base.connection);
+ }));
}
}
if (clib_llist_elt_is_linked (h2c, sched_list))
max_write = clib_min (max_write, h2c->peer_settings.max_frame_size);
stream_id = http2_conn_get_next_stream_id (h2c);
- http2_req_set_stream_id (req, h2c, stream_id);
+ http2_req_set_stream_id (req, h2c, stream_id, 1);
http_io_as_dequeue_notify (&req->base, n_deq);
transport_connection_reschedule (&req->base.connection);
h2c = http2_conn_ctx_get_w_thread (hc);
http2_conn_reset_req (h2c, req, hc->c_thread_index);
+ http_io_as_del_want_read_ntf (&req->base);
}
}
http_io_as_write (&req->base, req->payload, req->payload_len);
http2_error_t *error)
{
int rv;
- u8 payload_offset;
- u64 payload_len;
+ u8 payload_offset = 0;
+ u64 payload_len = 0;
session_dgram_hdr_t hdr;
HTTP_DBG (1, "udp tunnel received data from client");
return HTTP2_ERROR_STREAM_CLOSED;
}
h2c->last_opened_stream_id = fh->stream_id;
- if (hash_elts (h2c->req_by_stream_id) ==
- h2c->settings.max_concurrent_streams)
+ if (h2c->req_num == h2c->settings.max_concurrent_streams)
{
HTTP_DBG (1, "SETTINGS_MAX_CONCURRENT_STREAMS exceeded");
http_io_ts_drain (hc, fh->length);
return HTTP2_ERROR_NO_ERROR;
}
req = http2_conn_alloc_req (hc);
- http2_req_set_stream_id (req, h2c, fh->stream_id);
+ http2_req_set_stream_id (req, h2c, fh->stream_id, 0);
req->dispatch_headers_cb = http2_sched_dispatch_resp_headers;
http_conn_accept_request (hc, &req->base);
http_req_state_change (&req->base, HTTP_REQ_STATE_WAIT_TRANSPORT_METHOD);
req->our_window -= fh->length;
h2c->our_window -= fh->length;
- HTTP_DBG (1, "run state machine");
+ HTTP_DBG (1, "run state machine '%U' req_index %x", format_http_req_state,
+ req->base.state,
+ ((http_req_handle_t) req->base.hr_req_handle).req_index);
return http2_req_run_state_machine (hc, req, 0, 0);
}
h2c->flags &= ~HTTP2_CONN_F_EXPECT_SERVER_SETTINGS;
HTTP_DBG (1, "client connection established");
req = http2_conn_alloc_req (hc);
- h2c->client_req_index =
- ((http_req_handle_t) req->base.hr_req_handle).req_index;
+ req->flags |= HTTP2_REQ_F_IS_PARENT;
hc->flags |= HTTP_CONN_F_HAS_REQUEST;
hpack_dynamic_table_init (
&h2c->decoder_dynamic_table,
http2_default_conn_settings.header_table_size);
http_req_state_change (&req->base, HTTP_REQ_STATE_WAIT_APP_METHOD);
- if (http_conn_established (hc, &req->base))
+ if (http_conn_established (hc, &req->base, hc->hc_pa_app_api_ctx))
return HTTP2_ERROR_INTERNAL_ERROR;
}
if (!(hc->flags & HTTP_CONN_F_IS_SERVER))
{
ASSERT (hc->flags & HTTP_CONN_F_HAS_REQUEST);
- req = http2_req_get (h2c->client_req_index, hc->c_thread_index);
- if (!req)
- return HTTP2_ERROR_NO_ERROR;
- session_transport_closed_notify (&req->base.connection);
+ hash_foreach (stream_id, req_index, h2c->req_by_stream_id, ({
+ req = http2_req_get (req_index, hc->c_thread_index);
+ session_transport_closed_notify (
+ &req->base.connection);
+ }));
}
}
else
{
HTTP_DBG (1, "nothing more to send, confirm close");
session_transport_closed_notify (&req->base.connection);
+ if (req->flags & HTTP2_REQ_F_IS_PARENT)
+ {
+ HTTP_DBG (1, "client app closed parent, closing connection");
+ ASSERT (!(hc->flags & HTTP_CONN_F_IS_SERVER));
+ http_shutdown_transport (hc);
+ }
}
else if (req->base.is_tunnel)
{
h2c->flags |= HTTP2_CONN_F_PREFACE_VERIFIED;
}
+/* open an additional stream on an established client connection;
+ * parent_app_api_ctx is the app's api context (half-open index) for this
+ * stream connect and is echoed back in the connect notification */
+static int
+http2_conn_connect_stream_callback (http_conn_t *hc, u32 parent_app_api_ctx)
+{
+ http2_conn_ctx_t *h2c;
+ http2_req_t *req;
+ app_worker_t *app_wrk;
+
+ HTTP_DBG (1, "hc [%u]%x", hc->c_thread_index, hc->hc_hc_index);
+ h2c = http2_conn_ctx_get_w_thread (hc);
+ ASSERT (!(hc->flags & HTTP_CONN_F_IS_SERVER));
+ ASSERT (!(h2c->flags & HTTP2_CONN_F_EXPECT_SERVER_SETTINGS));
+ app_wrk = app_worker_get_if_valid (hc->hc_pa_wrk_index);
+ if (!app_wrk)
+ return -1;
+ if (h2c->req_num == h2c->settings.max_concurrent_streams)
+ /* notify with this stream's api context, not the parent's, so the
+ * app resolves the correct half-open session */
+ return app_worker_connect_notify (app_wrk, 0, SESSION_E_MAX_STREAMS_HIT,
+ parent_app_api_ctx);
+ req = http2_conn_alloc_req (hc);
+ http_req_state_change (&req->base, HTTP_REQ_STATE_WAIT_APP_METHOD);
+ return http_conn_established (hc, &req->base, parent_app_api_ctx);
+}
+
static void
http2_conn_cleanup_callback (http_conn_t *hc)
{
HTTP_DBG (1, "hc [%u]%x", hc->c_thread_index, hc->hc_hc_index);
h2c = http2_conn_ctx_get_w_thread (hc);
- if (hc->flags & HTTP_CONN_F_IS_SERVER)
- hash_foreach (stream_id, req_index, h2c->req_by_stream_id,
- ({ vec_add1 (req_indices, req_index); }));
- else
- {
- if (hc->flags & HTTP_CONN_F_HAS_REQUEST)
- vec_add1 (req_indices, h2c->client_req_index);
- }
+ hash_foreach (stream_id, req_index, h2c->req_by_stream_id,
+ ({ vec_add1 (req_indices, req_index); }));
vec_foreach (req_index_p, req_indices)
{
.transport_conn_reschedule_callback =
http2_transport_conn_reschedule_callback,
.conn_accept_callback = http2_conn_accept_callback,
+ .conn_connect_stream_callback = http2_conn_connect_stream_callback,
.conn_cleanup_callback = http2_conn_cleanup_callback,
.enable_callback = http2_enable_callback,
.unformat_cfg_callback = http2_unformat_config_callback,
void *opaque; /* version specific data */
} http_conn_t;
+/* a queued cross-thread stream connect */
+typedef struct http_pending_connect_stream_
+{
+ u64 parent_handle; /* session handle of the parent HTTP/2 connection */
+ u32 opaque; /* app api context for the stream connect */
+} http_pending_connect_stream_t;
+
typedef struct http_worker_
{
http_conn_t *conn_pool;
+ /* guards pending_connect_streams, taken from other workers' threads */
+ clib_spinlock_t pending_stream_connects_lock;
+ /* clib_fifo of connects queued for this worker */
+ http_pending_connect_stream_t *pending_connect_streams;
+ /* scratch burst buffer: filled under the lock, processed outside it */
+ http_pending_connect_stream_t *burst_connect_streams;
} http_worker_t;
typedef struct http_main_
void (*transport_reset_callback) (http_conn_t *hc);
void (*transport_conn_reschedule_callback) (http_conn_t *hc);
void (*conn_accept_callback) (http_conn_t *hc); /* optional */
+ int (*conn_connect_stream_callback) (http_conn_t *hc,
+ u32 parent_app_api_ctx); /* optional */
void (*conn_cleanup_callback) (http_conn_t *hc);
void (*enable_callback) (void); /* optional */
uword (*unformat_cfg_callback) (unformat_input_t *input); /* optional */
SVM_FIFO_WANT_DEQ_NOTIF_IF_EMPTY);
}
+/* stop requesting dequeue notifications on the app session's rx fifo
+ * (counterpart of the want-read-ntf setter above) */
+always_inline void
+http_io_as_del_want_read_ntf (http_req_t *req)
+{
+ session_t *as = session_get_from_handle (req->hr_pa_session_handle);
+ svm_fifo_del_want_deq_ntf (as->rx_fifo, SVM_FIFO_WANT_DEQ_NOTIF_IF_FULL |
+ SVM_FIFO_WANT_DEQ_NOTIF_IF_EMPTY);
+}
+
always_inline void
http_io_as_reset_has_read_ntf (http_req_t *req)
{
}
always_inline int
-http_conn_established (http_conn_t *hc, http_req_t *req)
+http_conn_established (http_conn_t *hc, http_req_t *req,
+ u32 parent_app_api_ctx)
{
session_t *as;
app_worker_t *app_wrk;
as->app_wrk_index = hc->hc_pa_wrk_index;
as->connection_index = req->hr_req_handle;
as->session_state = SESSION_STATE_READY;
- as->opaque = hc->hc_pa_app_api_ctx;
+ as->opaque = parent_app_api_ctx;
ts = session_get_from_handle (hc->hc_tc_session_handle);
as->session_type = session_type_from_proto_and_ip (
TRANSPORT_PROTO_HTTP, session_type_is_ip4 (ts->session_type));
return rv;
}
- app_worker_connect_notify (app_wrk, as, 0, hc->hc_pa_app_api_ctx);
+ app_worker_connect_notify (app_wrk, as, 0, parent_app_api_ctx);
req->hr_pa_session_handle = session_handle (as);
req->hr_pa_wrk_index = as->app_wrk_index;