From: Matus Fabian Date: Fri, 30 May 2025 14:51:59 +0000 (-0400) Subject: http: http/2 connect method X-Git-Url: https://gerrit.fd.io/r/gitweb?a=commitdiff_plain;h=4d9e8e6b3b48b8ca986d641a9f96f887b2b6a237;p=vpp.git http: http/2 connect method Type: feature Change-Id: I7dc27a93388a6d680f2a87ccbd2704bb76a91357 Signed-off-by: Matus Fabian --- diff --git a/extras/hs-test/h2spec_extras/h2spec_extras.go b/extras/hs-test/h2spec_extras/h2spec_extras.go index 3c2b9dd76a9..6957557a01b 100644 --- a/extras/hs-test/h2spec_extras/h2spec_extras.go +++ b/extras/hs-test/h2spec_extras/h2spec_extras.go @@ -2,10 +2,12 @@ package h2spec_extras import ( "fmt" + "slices" "github.com/summerwind/h2spec/config" "github.com/summerwind/h2spec/spec" "golang.org/x/net/http2" + "golang.org/x/net/http2/hpack" ) var key = "extras" @@ -25,6 +27,7 @@ func Spec() *spec.TestGroup { } tg.AddTestGroup(FlowControl()) + tg.AddTestGroup(ConnectMethod()) return tg } @@ -53,6 +56,32 @@ func VerifyWindowUpdate(conn *spec.Conn, streamID, expectedIncrement uint32) err return nil } +func VerifyTunnelClosed(conn *spec.Conn) error { + var streamClosed = false + var lastEvent spec.Event + for !conn.Closed { + ev := conn.WaitEvent() + lastEvent = ev + switch event := ev.(type) { + case spec.DataFrameEvent: + if event.StreamEnded() { + streamClosed = true + goto done + } + case spec.TimeoutEvent: + goto done + } + } +done: + if !streamClosed { + return &spec.TestError{ + Expected: []string{spec.ExpectedStreamClosed}, + Actual: lastEvent.String(), + } + } + return nil +} + func FlowControl() *spec.TestGroup { tg := NewTestGroup("1", "Flow control") tg.AddTestCase(&spec.TestCase{ @@ -168,3 +197,202 @@ func FlowControl() *spec.TestGroup { }) return tg } + +func ConnectHeaders(c *config.Config) []hpack.HeaderField { + + return []hpack.HeaderField{ + spec.HeaderField(":method", "CONNECT"), + spec.HeaderField(":authority", c.Path), + } +} + +func ConnectMethod() *spec.TestGroup { + tg := NewTestGroup("2", "CONNECT method") + + tg.AddTestCase(&spec.TestCase{ + Desc: "Tunnel closed by target", + Requirement: "A proxy that receives a TCP segment with the FIN bit set sends a DATA frame with the END_STREAM flag set.", + Run: func(c *config.Config, conn *spec.Conn) error { + var streamID uint32 = 1 + + err := conn.Handshake() + if err != nil { + return err + } + + headers := ConnectHeaders(c) + hp := http2.HeadersFrameParam{ + StreamID: streamID, + EndStream: false, + EndHeaders: true, + BlockFragment: conn.EncodeHeaders(headers), + } + conn.WriteHeaders(hp) + err = spec.VerifyHeadersFrame(conn, streamID) + if err != nil { + return err + } + + // send http/1.0 so target will close connection when send response + conn.WriteData(streamID, false, []byte("GET /index.html HTTP/1.0\r\n\r\n")) + + // wait for DATA frame with END_STREAM flag set + err = VerifyTunnelClosed(conn) + if err != nil { + return err + } + + // client is expected to send DATA frame with the END_STREAM flag set + conn.WriteData(streamID, true, []byte("")) + + return nil + }, + }) + + tg.AddTestCase(&spec.TestCase{ + Desc: "Tunnel closed by client (with attached data)", + Requirement: "A proxy that receives a DATA frame with the END_STREAM flag set sends the attached data with the FIN bit set on the last TCP segment.", + Run: func(c *config.Config, conn *spec.Conn) error { + var streamID uint32 = 1 + + err := conn.Handshake() + if err != nil { + return err + } + + headers := ConnectHeaders(c) + hp := http2.HeadersFrameParam{ + StreamID: streamID, + EndStream: false, + EndHeaders: true, + BlockFragment: conn.EncodeHeaders(headers), + } + conn.WriteHeaders(hp) + err = spec.VerifyHeadersFrame(conn, streamID) + if err != nil { + return err + } + + // close tunnel + conn.WriteData(streamID, true, []byte("HEAD /index.html HTTP/1.1\r\nHost: example.com\r\n\r\n")) + + // wait for DATA frame with END_STREAM flag set + err = VerifyTunnelClosed(conn) + if err != nil { + return err + } + + return nil + }, + }) + + tg.AddTestCase(&spec.TestCase{ + Desc: "Tunnel closed by client (empty DATA frame)", + Requirement: "The final DATA frame could be empty.", + Run: func(c *config.Config, conn *spec.Conn) error { + var streamID uint32 = 1 + + err := conn.Handshake() + if err != nil { + return err + } + + headers := ConnectHeaders(c) + hp := http2.HeadersFrameParam{ + StreamID: streamID, + EndStream: false, + EndHeaders: true, + BlockFragment: conn.EncodeHeaders(headers), + } + conn.WriteHeaders(hp) + err = spec.VerifyHeadersFrame(conn, streamID) + if err != nil { + return err + } + + conn.WriteData(streamID, false, []byte("HEAD /index.html HTTP/1.1\r\nHost: example.com\r\n\r\n")) + + // verify reception of response DATA frame + err = spec.VerifyEventType(conn, spec.EventDataFrame) + if err != nil { + return err + } + + // close tunnel + conn.WriteData(streamID, true, []byte("")) + + // wait for DATA frame with END_STREAM flag set + err = VerifyTunnelClosed(conn) + if err != nil { + return err + } + + return nil + }, + }) + + tg.AddTestCase(&spec.TestCase{ + Desc: "Multiple tunnels", + Requirement: "In HTTP/2, the CONNECT method establishes a tunnel over a single HTTP/2 stream to a remote host, rather than converting the entire connection to a tunnel.", + Run: func(c *config.Config, conn *spec.Conn) error { + var streamID uint32 = 1 + + err := conn.Handshake() + if err != nil { + return err + } + + maxStreams, ok := conn.Settings[http2.SettingMaxConcurrentStreams] + if !ok { + return spec.ErrSkipped + } + + for i := 0; i < int(maxStreams); i++ { + headers := ConnectHeaders(c) + hp := http2.HeadersFrameParam{ + StreamID: streamID, + EndStream: false, + EndHeaders: true, + BlockFragment: conn.EncodeHeaders(headers), + } + conn.WriteHeaders(hp) + err = spec.VerifyHeadersFrame(conn, streamID) + if err != nil { + return err + } + + streamID += 2 + } + + streamID = 1 + for i := 0; i < int(maxStreams); i++ { + conn.WriteData(streamID, false, []byte("HEAD /index.html HTTP/1.1\r\nHost: example.com\r\n\r\n")) + streamID += 2 + } + + var receivedResp []uint32 + for i := 0; i < int(maxStreams); i++ { + actual, passed := conn.WaitEventByType(spec.EventDataFrame) + switch event := actual.(type) { + case spec.DataFrameEvent: + passed = !slices.Contains(receivedResp, event.StreamID) + default: + passed = false + } + if !passed { + expected := []string{ + "Receive one response per stream (tunnel)", + } + + return &spec.TestError{ + Expected: expected, + Actual: actual.String(), + } + } + } + + return nil + }, + }) + return tg +} diff --git a/extras/hs-test/infra/suite_vpp_proxy.go b/extras/hs-test/infra/suite_vpp_proxy.go index 2226358c770..44cc6bb7cbb 100644 --- a/extras/hs-test/infra/suite_vpp_proxy.go +++ b/extras/hs-test/infra/suite_vpp_proxy.go @@ -6,15 +6,21 @@ package hst import ( + "bytes" "fmt" + "io" "net" + "os" "reflect" "runtime" "strconv" "strings" + "time" + "fd.io/hs-test/h2spec_extras" . "fd.io/hs-test/infra/common" . "github.com/onsi/ginkgo/v2" + "github.com/summerwind/h2spec/config" ) const ( @@ -124,6 +130,20 @@ func (s *VppProxySuite) SetupNginxServer() { s.AssertNil(s.Containers.NginxServerTransient.Start()) } +func (s *VppProxySuite) ConfigureVppProxy(proto string, proxyPort uint16) { + vppProxy := s.Containers.VppProxy.VppInstance + cmd := fmt.Sprintf("test proxy server fifo-size 512k server-uri %s://%s:%d", proto, s.VppProxyAddr(), proxyPort) + if proto != "http" && proto != "https" && proto != "udp" { + proto = "tcp" + } + if proto != "http" && proto != "https" { + cmd += fmt.Sprintf(" client-uri %s://%s:%d", proto, s.ServerAddr(), s.Ports.Server) + } + + output := vppProxy.Vppctl(cmd) + s.Log("proxy configured: " + output) +} + func (s *VppProxySuite) ServerAddr() string { return s.Interfaces.Server.Ip4AddressString() } @@ -163,23 +183,45 @@ func (s *VppProxySuite) CurlUploadResource(uri, file string) { s.AssertNotContains(log, "Operation timed out") } -func (s *VppProxySuite) CurlDownloadResourceViaTunnel(uri string, proxyUri string) { - args := fmt.Sprintf("-w @/tmp/write_out_download_connect --max-time %d --insecure --proxy-insecure -p -x %s --remote-name --output-dir /tmp %s", s.maxTimeout, proxyUri, uri) +func (s *VppProxySuite) CurlDownloadResourceViaTunnel(uri string, proxyUri string, extraArgs ...string) (string, string) { + extras := "" + if len(extraArgs) > 0 { + extras = strings.Join(extraArgs, " ") + extras += " " + } + args := fmt.Sprintf("%s-w @/tmp/write_out_download_connect --max-time %d --insecure --proxy-insecure -p -x %s --remote-name --output-dir /tmp %s", extras, s.maxTimeout, proxyUri, uri) writeOut, log := s.RunCurlContainer(s.Containers.Curl, args) - s.AssertContains(writeOut, "CONNECT response code: 200") + if strings.Contains(extras, "proxy-http2") { + // in case of h2 connect response code is 000 in write out + s.AssertContains(log, "CONNECT tunnel established, response 200") + } else { + s.AssertContains(writeOut, "CONNECT response code: 200") + } s.AssertContains(writeOut, "GET response code: 200") s.AssertNotContains(log, "bytes remaining to read") s.AssertNotContains(log, "Operation timed out") s.AssertNotContains(log, "Upgrade:") + return writeOut, log } -func (s *VppProxySuite) CurlUploadResourceViaTunnel(uri, proxyUri, file string) { - args := fmt.Sprintf("-w @/tmp/write_out_upload_connect --max-time %d --insecure --proxy-insecure -p -x %s -T %s %s", s.maxTimeout, proxyUri, file, uri) +func (s *VppProxySuite) CurlUploadResourceViaTunnel(uri, proxyUri, file string, extraArgs ...string) (string, string) { + extras := "" + if len(extraArgs) > 0 { + extras = strings.Join(extraArgs, " ") + extras += " " + } + args := fmt.Sprintf("%s-w @/tmp/write_out_upload_connect --max-time %d --insecure --proxy-insecure -p -x %s -T %s %s", extras, s.maxTimeout, proxyUri, file, uri) writeOut, log := s.RunCurlContainer(s.Containers.Curl, args) - s.AssertContains(writeOut, "CONNECT response code: 200") + if strings.Contains(extras, "proxy-http2") { + // in case of h2 connect response code is 000 in write out + s.AssertContains(log, "CONNECT tunnel established, response 200") + } else { + s.AssertContains(writeOut, "CONNECT response code: 200") + } s.AssertContains(writeOut, "PUT response code: 201") s.AssertNotContains(log, "Operation timed out") s.AssertNotContains(log, "Upgrade:") + return writeOut, log } func handleConn(conn net.Conn) { @@ -270,3 +312,72 @@ var _ = Describe("VppProxySuiteSolo", Ordered, ContinueOnFailure, Serial, func() } } }) + +var _ = Describe("H2SpecProxySuite", Ordered, ContinueOnFailure, func() { + var s VppProxySuite + BeforeAll(func() { + s.SetupSuite() + }) + BeforeEach(func() { + s.SetupTest() + }) + AfterAll(func() { + s.TeardownSuite() + }) + AfterEach(func() { + s.TeardownTest() + }) + + testCases := []struct { + desc string + }{ + {desc: "extras/2/1"}, + {desc: "extras/2/2"}, + {desc: "extras/2/3"}, + {desc: "extras/2/4"}, + } + + for _, test := range testCases { + test := test + testName := "proxy_test.go/h2spec_" + strings.ReplaceAll(test.desc, "/", "_") + It(testName, func(ctx SpecContext) { + s.Log(testName + ": BEGIN") + s.SetupNginxServer() + s.ConfigureVppProxy("https", s.Ports.Proxy) + path := fmt.Sprintf("%s:%d", s.ServerAddr(), s.Ports.Server) + conf := &config.Config{ + Host: s.VppProxyAddr(), + Port: int(s.Ports.Proxy), + Path: path, + Timeout: time.Second * time.Duration(s.maxTimeout), + MaxHeaderLen: 4096, + TLS: true, + Insecure: true, + Sections: []string{test.desc}, + Verbose: true, + } + // capture h2spec output so it will be in log + oldStdout := os.Stdout + r, w, _ := os.Pipe() + os.Stdout = w + + tg := h2spec_extras.Spec() + tg.Test(conf) + + oChan := make(chan string) + go func() { + var buf bytes.Buffer + io.Copy(&buf, r) + oChan <- buf.String() + }() + + // restore to normal state + w.Close() + os.Stdout = oldStdout + o := <-oChan + s.Log(o) + s.AssertEqual(0, tg.FailedCount) + }, SpecTimeout(TestTimeout)) + } + +}) diff --git a/extras/hs-test/proxy_test.go b/extras/hs-test/proxy_test.go index 875d79f7577..ef7748b3312 100644 --- a/extras/hs-test/proxy_test.go +++ b/extras/hs-test/proxy_test.go @@ -22,7 +22,8 @@ import ( func init() { RegisterVppProxyTests(VppProxyHttpGetTcpTest, VppProxyHttpGetTlsTest, VppProxyHttpPutTcpTest, VppProxyHttpPutTlsTest, - VppConnectProxyGetTest, VppConnectProxyPutTest, VppHttpsConnectProxyGetTest) + VppConnectProxyGetTest, VppConnectProxyPutTest, VppHttpsConnectProxyGetTest, VppH2ConnectProxyGetTest, + VppH2ConnectProxyPutTest) RegisterVppProxySoloTests(VppProxyHttpGetTcpMTTest, VppProxyHttpPutTcpMTTest, VppProxyTcpIperfMTTest, VppProxyUdpIperfMTTest, VppConnectProxyStressTest, VppConnectProxyStressMTTest, VppConnectProxyConnectionFailedMTTest) RegisterVppUdpProxyTests(VppProxyUdpTest, VppConnectUdpProxyTest, VppConnectUdpInvalidCapsuleTest, @@ -32,20 +33,6 @@ func init() { RegisterNginxProxySoloTests(NginxMirroringTest, MirrorMultiThreadTest) } -func configureVppProxy(s *VppProxySuite, proto string, proxyPort uint16) { - vppProxy := s.Containers.VppProxy.VppInstance - cmd := fmt.Sprintf("test proxy server fifo-size 512k server-uri %s://%s:%d", proto, s.VppProxyAddr(), proxyPort) - if proto != "http" && proto != "https" && proto != "udp" { - proto = "tcp" - } - if proto != "http" && proto != "https" { - cmd += fmt.Sprintf(" client-uri %s://%s:%d", proto, s.ServerAddr(), s.Ports.Server) - } - - output := vppProxy.Vppctl(cmd) - s.Log("proxy configured: " + output) -} - func VppProxyHttpGetTcpMTTest(s *VppProxySuite) { VppProxyHttpGetTcpTest(s) } @@ -71,9 +58,9 @@ func vppProxyIperfMTTest(s *VppProxySuite, proto string) { s.AssertNil(vppProxy.DeleteTap(s.Interfaces.Client)) s.AssertNil(vppProxy.CreateTap(s.Interfaces.Client, false, 2, uint32(s.Interfaces.Client.Peer.Index), Consistent_qp)) - configureVppProxy(s, "tcp", s.Ports.Proxy) + s.ConfigureVppProxy("tcp", s.Ports.Proxy) if proto == "udp" { - configureVppProxy(s, "udp", s.Ports.Proxy) + s.ConfigureVppProxy("udp", s.Ports.Proxy) proto = "-u" } else { proto = "" @@ -111,14 +98,14 @@ func vppProxyIperfMTTest(s *VppProxySuite, proto string) { func VppProxyHttpGetTcpTest(s *VppProxySuite) { s.SetupNginxServer() - configureVppProxy(s, "tcp", s.Ports.Proxy) + s.ConfigureVppProxy("tcp", s.Ports.Proxy) uri := fmt.Sprintf("http://%s:%d/httpTestFile", s.VppProxyAddr(), s.Ports.Proxy) s.CurlDownloadResource(uri) } func VppProxyHttpGetTlsTest(s *VppProxySuite) { s.SetupNginxServer() - configureVppProxy(s, "tls", s.Ports.Proxy) + s.ConfigureVppProxy("tls", s.Ports.Proxy) uri := fmt.Sprintf("https://%s:%d/httpTestFile", s.VppProxyAddr(), s.Ports.Proxy) s.CurlDownloadResource(uri) } @@ -129,14 +116,14 @@ func VppProxyHttpPutTcpMTTest(s *VppProxySuite) { func VppProxyHttpPutTcpTest(s *VppProxySuite) { s.SetupNginxServer() - configureVppProxy(s, "tcp", s.Ports.Proxy) + s.ConfigureVppProxy("tcp", s.Ports.Proxy) uri := fmt.Sprintf("http://%s:%d/upload/testFile", s.VppProxyAddr(), s.Ports.Proxy) s.CurlUploadResource(uri, CurlContainerTestFile) } func VppProxyHttpPutTlsTest(s *VppProxySuite) { s.SetupNginxServer() - configureVppProxy(s, "tls", s.Ports.Proxy) + s.ConfigureVppProxy("tls", s.Ports.Proxy) uri := fmt.Sprintf("https://%s:%d/upload/testFile", s.VppProxyAddr(), s.Ports.Proxy) s.CurlUploadResource(uri, CurlContainerTestFile) } @@ -173,7 +160,7 @@ func nginxMirroring(s *NginxProxySuite, multiThreadWorkers bool) { func VppConnectProxyGetTest(s *VppProxySuite) { s.SetupNginxServer() - configureVppProxy(s, "http", s.Ports.Proxy) + s.ConfigureVppProxy("http", s.Ports.Proxy) targetUri := fmt.Sprintf("http://%s:%d/httpTestFile", s.ServerAddr(), s.Ports.Server) proxyUri := fmt.Sprintf("http://%s:%d", s.VppProxyAddr(), s.Ports.Proxy) @@ -182,16 +169,27 @@ func VppConnectProxyGetTest(s *VppProxySuite) { func VppHttpsConnectProxyGetTest(s *VppProxySuite) { s.SetupNginxServer() - configureVppProxy(s, "https", s.Ports.Proxy) + s.ConfigureVppProxy("https", s.Ports.Proxy) targetUri := fmt.Sprintf("http://%s:%d/httpTestFile", s.ServerAddr(), s.Ports.Server) proxyUri := fmt.Sprintf("https://%s:%d", s.VppProxyAddr(), s.Ports.Proxy) s.CurlDownloadResourceViaTunnel(targetUri, proxyUri) } +func VppH2ConnectProxyGetTest(s *VppProxySuite) { + s.SetupNginxServer() + s.ConfigureVppProxy("https", s.Ports.Proxy) + + targetUri := fmt.Sprintf("http://%s:%d/httpTestFile", s.ServerAddr(), s.Ports.Server) + proxyUri := fmt.Sprintf("https://%s:%d", s.VppProxyAddr(), s.Ports.Proxy) + _, log := s.CurlDownloadResourceViaTunnel(targetUri, proxyUri, "--proxy-http2") + // ALPN result check + s.AssertContains(log, "CONNECT tunnel: HTTP/2 negotiated") +} + func VppConnectProxyConnectionFailedMTTest(s *VppProxySuite) { s.SetupNginxServer() - configureVppProxy(s, "http", s.Ports.Proxy) + s.ConfigureVppProxy("http", s.Ports.Proxy) targetUri := fmt.Sprintf("http://%s:%d/httpTestFile", s.ServerAddr(), s.Ports.Server+1) proxyUri := fmt.Sprintf("http://%s:%d", s.VppProxyAddr(), s.Ports.Proxy) @@ -201,13 +199,24 @@ func VppConnectProxyConnectionFailedMTTest(s *VppProxySuite) { func VppConnectProxyPutTest(s *VppProxySuite) { s.SetupNginxServer() - configureVppProxy(s, "http", s.Ports.Proxy) + s.ConfigureVppProxy("http", s.Ports.Proxy) proxyUri := fmt.Sprintf("http://%s:%d", s.VppProxyAddr(), s.Ports.Proxy) targetUri := fmt.Sprintf("http://%s:%d/upload/testFile", s.ServerAddr(), s.Ports.Server) s.CurlUploadResourceViaTunnel(targetUri, proxyUri, CurlContainerTestFile) } +func VppH2ConnectProxyPutTest(s *VppProxySuite) { + s.SetupNginxServer() + s.ConfigureVppProxy("https", s.Ports.Proxy) + + proxyUri := fmt.Sprintf("https://%s:%d", s.VppProxyAddr(), s.Ports.Proxy) + targetUri := fmt.Sprintf("http://%s:%d/upload/testFile", s.ServerAddr(), s.Ports.Server) + _, log := s.CurlUploadResourceViaTunnel(targetUri, proxyUri, CurlContainerTestFile, "--proxy-http2") + // ALPN result check + s.AssertContains(log, "CONNECT tunnel: HTTP/2 negotiated") +} + func vppConnectProxyStressLoad(s *VppProxySuite, proxyPort string) { var ( connectError, timeout, readError, writeError, invalidData, total atomic.Uint32 @@ -320,7 +329,7 @@ func VppConnectProxyStressTest(s *VppProxySuite) { remoteServerConn := s.StartEchoServer() defer remoteServerConn.Close() - configureVppProxy(s, "http", s.Ports.Proxy) + s.ConfigureVppProxy("http", s.Ports.Proxy) // no goVPP less noise s.Containers.VppProxy.VppInstance.Disconnect() @@ -340,7 +349,7 @@ func VppConnectProxyStressMTTest(s *VppProxySuite) { s.AssertNil(vppProxy.DeleteTap(s.Interfaces.Client)) s.AssertNil(vppProxy.CreateTap(s.Interfaces.Client, false, 2, uint32(s.Interfaces.Client.Peer.Index), Consistent_qp)) - configureVppProxy(s, "http", s.Ports.Proxy) + s.ConfigureVppProxy("http", s.Ports.Proxy) // no goVPP less noise vppProxy.Disconnect() diff --git a/src/plugins/hs_apps/proxy.c b/src/plugins/hs_apps/proxy.c index c8bdc73a418..445235fec8d 100644 --- a/src/plugins/hs_apps/proxy.c +++ b/src/plugins/hs_apps/proxy.c @@ -1282,8 +1282,6 @@ proxy_server_listen () &a->sep_ext, TRANSPORT_ENDPT_EXT_CFG_CRYPTO, sizeof (transport_endpt_crypto_cfg_t)); ext_cfg->crypto.ckpair_index = pm->ckpair_index; - /* TODO: remove when http/2 connect done */ - ext_cfg->crypto.alpn_protos[0] = TLS_ALPN_PROTO_HTTP_1_1; } } else diff --git a/src/plugins/http/http2/http2.c b/src/plugins/http/http2/http2.c index f9f281f8b90..9cf81816aeb 100644 --- a/src/plugins/http/http2/http2.c +++ b/src/plugins/http/http2/http2.c @@ -7,6 +7,7 @@ #include #include #include +#include #define HTTP2_WIN_SIZE_MAX 0x7FFFFFFF #define HTTP2_INITIAL_WIN_SIZE 65535 @@ -58,6 +59,8 @@ typedef struct http2_req_ clib_llist_anchor_t sched_list; void (*dispatch_headers_cb) (struct http2_req_ *req, http_conn_t *hc, u8 *n_emissions, clib_llist_index_t *next_ri); + void (*dispatch_data_cb) (struct http2_req_ *req, http_conn_t *hc, + u8 *n_emissions); } http2_req_t; #define foreach_http2_conn_flags \ @@ -381,6 +384,18 @@ http2_send_stream_error (http_conn_t *hc, u32 stream_id, http2_error_t error, http_io_ts_after_write (hc, 1); } +always_inline void +http2_tunnel_send_close (http_conn_t *hc, http2_req_t *req) +{ + u8 *response; + + response = http_get_tx_buf (hc); + http2_frame_write_data_header (0, req->stream_id, + HTTP2_FRAME_FLAG_END_STREAM, response); + http_io_ts_write (hc, response, HTTP2_FRAME_HEADER_SIZE, 0); + http_io_ts_after_write (hc, 1); +} + /* send RST_STREAM frame and notify app */ always_inline void http2_stream_error (http_conn_t *hc, http2_req_t *req, http2_error_t error, @@ -458,6 +473,150 @@ http2_send_server_preface (http_conn_t *hc) /* stream TX scheduler */ /***********************/ +static void +http2_sched_dispatch_data (http2_req_t *req, http_conn_t *hc, u8 *n_emissions) +{ + u32 max_write, max_read, n_segs, n_read, n_written = 0; + svm_fifo_seg_t *app_segs, *segs = 0; + http_buffer_t *hb = &req->base.tx_buf; + u8 fh[HTTP2_FRAME_HEADER_SIZE]; + u8 finished = 0, flags = 0; + http2_conn_ctx_t *h2c; + + ASSERT (http_buffer_bytes_left (hb) > 0); + + *n_emissions += hb->type == HTTP_BUFFER_PTR ? HTTP2_SCHED_WEIGHT_DATA_PTR : + HTTP2_SCHED_WEIGHT_DATA_INLINE; + + h2c = http2_conn_ctx_get_w_thread (hc); + + max_write = http_io_ts_max_write (hc, 0); + max_write -= HTTP2_FRAME_HEADER_SIZE; + max_write = clib_min (max_write, (u32) req->peer_window); + max_write = clib_min (max_write, h2c->peer_window); + max_write = clib_min (max_write, h2c->peer_settings.max_frame_size); + + max_read = http_buffer_bytes_left (hb); + + n_read = http_buffer_get_segs (hb, max_write, &app_segs, &n_segs); + if (n_read == 0) + { + HTTP_DBG (1, "no data to deq"); + transport_connection_reschedule (&req->base.connection); + return; + } + + finished = (max_read - n_read) == 0; + flags = finished ? HTTP2_FRAME_FLAG_END_STREAM : 0; + http2_frame_write_data_header (n_read, req->stream_id, flags, fh); + vec_validate (segs, 0); + segs[0].len = HTTP2_FRAME_HEADER_SIZE; + segs[0].data = fh; + vec_append (segs, app_segs); + + n_written = http_io_ts_write_segs (hc, segs, n_segs + 1, 0); + n_written -= HTTP2_FRAME_HEADER_SIZE; + vec_free (segs); + http_buffer_drain (hb, n_written); + req->peer_window -= n_written; + h2c->peer_window -= n_written; + + if (finished) + { + /* all done, close stream */ + http_buffer_free (hb); + if (hc->flags & HTTP_CONN_F_IS_SERVER) + http2_stream_close (req, hc); + else + req->stream_state = HTTP2_STREAM_STATE_HALF_CLOSED; + } + else + { + if (req->peer_window == 0) + { + /* mark that we need window update on stream */ + HTTP_DBG (1, "stream window is full"); + req->flags |= HTTP2_REQ_F_NEED_WINDOW_UPDATE; + } + else + { + /* schedule for next round */ + HTTP_DBG (1, "adding to data queue req_index %x", + ((http_req_handle_t) req->base.hr_req_handle).req_index); + http2_req_schedule_data_tx (hc, req); + http_io_as_dequeue_notify (&req->base, n_written); + } + } + + http_io_ts_after_write (hc, finished); +} + +static void +http2_sched_dispatch_tunnel (http2_req_t *req, http_conn_t *hc, + u8 *n_emissions) +{ + u32 max_write, max_read, n_segs = 2, n_read, n_written = 0; + svm_fifo_seg_t segs[n_segs + 1]; + u8 fh[HTTP2_FRAME_HEADER_SIZE]; + u8 flags = 0; + http2_conn_ctx_t *h2c; + + *n_emissions += HTTP2_SCHED_WEIGHT_DATA_INLINE; + + h2c = http2_conn_ctx_get_w_thread (hc); + + max_read = http_io_as_max_read (&req->base); + if (max_read == 0) + { + HTTP_DBG (2, "max_read == 0"); + transport_connection_reschedule (&req->base.connection); + return; + } + max_write = http_io_ts_max_write (hc, 0); + max_write -= HTTP2_FRAME_HEADER_SIZE; + max_write = clib_min (max_write, (u32) req->peer_window); + max_write = clib_min (max_write, h2c->peer_window); + max_write = clib_min (max_write, h2c->peer_settings.max_frame_size); + n_read = clib_min (max_write, max_read); + + if (req->stream_state == HTTP2_STREAM_STATE_HALF_CLOSED && + max_write >= max_read) + { + HTTP_DBG (1, "closing tunnel"); + session_transport_closed_notify (&req->base.connection); + flags = HTTP2_FRAME_FLAG_END_STREAM; + } + http2_frame_write_data_header (n_read, req->stream_id, flags, fh); + segs[0].len = HTTP2_FRAME_HEADER_SIZE; + segs[0].data = fh; + + http_io_as_read_segs (&req->base, segs + 1, &n_segs, n_read); + + n_written = http_io_ts_write_segs (hc, segs, n_segs + 1, 0); + n_written -= HTTP2_FRAME_HEADER_SIZE; + http_io_as_drain (&req->base, n_written); + req->peer_window -= n_written; + h2c->peer_window -= n_written; + + if (req->peer_window == 0) + { + /* mark that we need window update on stream */ + HTTP_DBG (1, "stream window is full"); + req->flags |= HTTP2_REQ_F_NEED_WINDOW_UPDATE; + } + else if (max_read - n_written) + { + /* schedule for next round if we have more data */ + HTTP_DBG (1, "adding to data queue req_index %x", + ((http_req_handle_t) req->base.hr_req_handle).req_index); + http2_req_schedule_data_tx (hc, req); + } + else + transport_connection_reschedule (&req->base.connection); + + http_io_ts_after_write (hc, 0); +} + static void http2_sched_dispatch_continuation (http2_req_t *req, http_conn_t *hc, u8 *n_emissions, @@ -502,6 +661,7 @@ http2_sched_dispatch_continuation (http2_req_t *req, http_conn_t *hc, if (http_buffer_bytes_left (&req->base.tx_buf)) { /* start sending the actual data */ + req->dispatch_data_cb = http2_sched_dispatch_data; HTTP_DBG (1, "adding to data queue req_index %x", ((http_req_handle_t) req->base.hr_req_handle).req_index); http2_req_schedule_data_tx (hc, req); @@ -567,14 +727,19 @@ http2_sched_dispatch_headers (http2_req_t *req, http_conn_t *hc, stream_id = req->stream_id; + /* tunnel established if 2xx (Successful) response to CONNECT */ + req->base.is_tunnel = + (req->base.is_tunnel && http_status_code_str[msg.code][0] == '2'); + /* END_STREAM flag need to be set in HEADERS frame */ if (msg.data.body_len) { + ASSERT (req->base.is_tunnel == 0); http_req_tx_buffer_init (&req->base, &msg); http_io_as_dequeue_notify (&req->base, n_deq); } else - flags |= HTTP2_FRAME_FLAG_END_STREAM; + flags |= req->base.is_tunnel ? 0 : HTTP2_FRAME_FLAG_END_STREAM; if (headers_len <= max_write) { @@ -584,12 +749,23 @@ http2_sched_dispatch_headers (http2_req_t *req, http_conn_t *hc, if (msg.data.body_len) { /* start sending the actual data */ + req->dispatch_data_cb = http2_sched_dispatch_data; HTTP_DBG (1, "adding to data queue req_index %x", ((http_req_handle_t) req->base.hr_req_handle).req_index); http2_req_schedule_data_tx (hc, req); } + else if (req->base.is_tunnel) + { + req->dispatch_data_cb = http2_sched_dispatch_tunnel; + transport_connection_reschedule (&req->base.connection); + /* cleanup some stuff we don't need anymore in tunnel mode */ + vec_free (req->base.headers); + } else - http2_stream_close (req, hc); + { + /* otherwise we are done */ + http2_stream_close (req, hc); + } } else { @@ -616,84 +792,6 @@ http2_sched_dispatch_headers (http2_req_t *req, http_conn_t *hc, http_io_ts_after_write (hc, 0); } -static void -http2_sched_dispatch_data (http2_req_t *req, http_conn_t *hc, u8 *n_emissions) -{ - u32 max_write, max_read, n_segs, n_read, n_written = 0; - svm_fifo_seg_t *app_segs, *segs = 0; - http_buffer_t *hb = &req->base.tx_buf; - u8 fh[HTTP2_FRAME_HEADER_SIZE]; - u8 finished = 0, flags = 0; - http2_conn_ctx_t *h2c; - - ASSERT (http_buffer_bytes_left (hb) > 0); - - *n_emissions += hb->type == HTTP_BUFFER_PTR ? HTTP2_SCHED_WEIGHT_DATA_PTR : - HTTP2_SCHED_WEIGHT_DATA_INLINE; - - h2c = http2_conn_ctx_get_w_thread (hc); - - max_write = http_io_ts_max_write (hc, 0); - max_write -= HTTP2_FRAME_HEADER_SIZE; - max_write = clib_min (max_write, (u32) req->peer_window); - max_write = clib_min (max_write, h2c->peer_window); - max_write = clib_min (max_write, h2c->peer_settings.max_frame_size); - - max_read = http_buffer_bytes_left (hb); - - n_read = http_buffer_get_segs (hb, max_write, &app_segs, &n_segs); - if (n_read == 0) - { - HTTP_DBG (1, "no data to deq"); - transport_connection_reschedule (&req->base.connection); - return; - } - - finished = (max_read - n_read) == 0; - flags = finished ? HTTP2_FRAME_FLAG_END_STREAM : 0; - http2_frame_write_data_header (n_read, req->stream_id, flags, fh); - vec_validate (segs, 0); - segs[0].len = HTTP2_FRAME_HEADER_SIZE; - segs[0].data = fh; - vec_append (segs, app_segs); - - n_written = http_io_ts_write_segs (hc, segs, n_segs + 1, 0); - ASSERT (n_written == (HTTP2_FRAME_HEADER_SIZE + n_read)); - vec_free (segs); - http_buffer_drain (hb, n_read); - req->peer_window -= n_read; - h2c->peer_window -= n_read; - - if (finished) - { - /* all done, close stream */ - http_buffer_free (hb); - if (hc->flags & HTTP_CONN_F_IS_SERVER) - http2_stream_close (req, hc); - else - req->stream_state = HTTP2_STREAM_STATE_HALF_CLOSED; - } - else - { - if (req->peer_window == 0) - { - /* mark that we need window update on stream */ - HTTP_DBG (1, "stream window is full"); - req->flags |= HTTP2_REQ_F_NEED_WINDOW_UPDATE; - } - else - { - /* schedule for next round */ - HTTP_DBG (1, "adding to data queue req_index %x", - ((http_req_handle_t) req->base.hr_req_handle).req_index); - http2_req_schedule_data_tx (hc, req); - http_io_as_dequeue_notify (&req->base, n_read); - } - } - - http_io_ts_after_write (hc, finished); -} - static void http2_update_time_callback (f64 now, u8 thread_index) { @@ -755,7 +853,7 @@ http2_update_time_callback (f64 now, u8 thread_index) 1, "sending data req_index %x", ((http_req_handle_t) req->base.hr_req_handle).req_index); clib_llist_remove (wrk->req_pool, sched_list, req); - http2_sched_dispatch_data (req, hc, &n_emissions); + req->dispatch_data_cb (req, hc, &n_emissions); if (ri == old_ti) break; @@ -791,6 +889,7 @@ http2_req_state_wait_transport_method (http_conn_t *hc, http2_req_t *req, http2_conn_ctx_t *h2c; hpack_request_control_data_t control_data; http_msg_t msg; + u8 *p; int rv; http_req_state_t new_state = HTTP_REQ_STATE_WAIT_APP_REPLY; http2_worker_ctx_t *wrk = http2_get_worker (hc->c_thread_index); @@ -816,8 +915,7 @@ http2_req_state_wait_transport_method (http_conn_t *hc, http2_req_t *req, http2_stream_error (hc, req, HTTP2_ERROR_PROTOCOL_ERROR, sp); return HTTP_SM_STOP; } - if (control_data.method == HTTP_REQ_UNKNOWN || - control_data.method == HTTP_REQ_CONNECT) + if (control_data.method == HTTP_REQ_UNKNOWN) { HTTP_DBG (1, "unsupported method"); http2_stream_error (hc, req, HTTP2_ERROR_PROTOCOL_ERROR, sp); @@ -843,13 +941,46 @@ http2_req_state_wait_transport_method (http_conn_t *hc, http2_req_t *req, http2_stream_error (hc, req, HTTP2_ERROR_PROTOCOL_ERROR, sp); return HTTP_SM_STOP; } - if (!(control_data.parsed_bitmap & HPACK_PSEUDO_HEADER_AUTHORITY_PARSED) && - control_data.method != HTTP_REQ_CONNECT) + if (!(control_data.parsed_bitmap & HPACK_PSEUDO_HEADER_AUTHORITY_PARSED)) { - HTTP_DBG (1, ":path pseudo-header missing in request"); + HTTP_DBG (1, ":authority pseudo-header missing in request"); http2_stream_error (hc, req, HTTP2_ERROR_PROTOCOL_ERROR, sp); return HTTP_SM_STOP; } + if (control_data.method == HTTP_REQ_CONNECT) + { + if (control_data.parsed_bitmap & HPACK_PSEUDO_HEADER_SCHEME_PARSED || + control_data.parsed_bitmap & HPACK_PSEUDO_HEADER_PATH_PARSED) + { + HTTP_DBG (1, ":scheme and :path pseudo-header must be omitted for " + "CONNECT method"); + http2_stream_error (hc, req, HTTP2_ERROR_PROTOCOL_ERROR, sp); + return HTTP_SM_STOP; + } + /* quick check if port is present */ + p = control_data.authority + control_data.authority_len; + p--; + if (!isdigit (*p)) + { + HTTP_DBG (1, "port not present in authority"); + http2_stream_error (hc, req, HTTP2_ERROR_PROTOCOL_ERROR, sp); + return HTTP_SM_STOP; + } + p--; + for (; p > control_data.authority; p--) + { + if (!isdigit (*p)) + break; + } + if (*p != ':') + { + HTTP_DBG (1, "port not present in authority"); + http2_stream_error (hc, req, HTTP2_ERROR_PROTOCOL_ERROR, sp); + return HTTP_SM_STOP; + } + req->base.is_tunnel = 1; + http_io_as_add_want_read_ntf (&req->base); + } req->base.control_data_len = control_data.control_data_len; req->base.headers_offset = control_data.headers - wrk->header_list; @@ -868,7 +999,9 @@ http2_req_state_wait_transport_method (http_conn_t *hc, http2_req_t *req, http_io_as_add_want_read_ntf (&req->base); } /* TODO: message framing without content length using END_STREAM flag */ - if (req->base.body_len == 0 && req->stream_state == HTTP2_STREAM_STATE_OPEN) + if (req->base.body_len == 0 && + req->stream_state == HTTP2_STREAM_STATE_OPEN && + control_data.method != HTTP_REQ_CONNECT) { HTTP_DBG (1, "no content-length and DATA frame expected"); *error = HTTP2_ERROR_INTERNAL_ERROR; @@ -876,14 +1009,22 @@ http2_req_state_wait_transport_method (http_conn_t *hc, http2_req_t *req, } req->base.to_recv = req->base.body_len; - req->base.target_path_len = control_data.path_len; - req->base.target_path_offset = control_data.path - wrk->header_list; - /* drop leading slash */ - req->base.target_path_offset++; - req->base.target_path_len--; req->base.target_query_offset = 0; req->base.target_query_len = 0; - http_identify_optional_query (&req->base, wrk->header_list); + if (control_data.parsed_bitmap & HPACK_PSEUDO_HEADER_PATH_PARSED) + { + req->base.target_path_len = control_data.path_len; + req->base.target_path_offset = control_data.path - wrk->header_list; + /* drop leading slash */ + req->base.target_path_offset++; + req->base.target_path_len--; + http_identify_optional_query (&req->base, wrk->header_list); + } + else + { + req->base.target_path_len = 0; + req->base.target_path_offset = 0; + } msg.type = HTTP_MSG_REQUEST; msg.method_type = control_data.method; @@ -954,6 +1095,27 @@ http2_req_state_transport_io_more_data (http_conn_t *hc, http2_req_t *req, return HTTP_SM_STOP; } +static http_sm_result_t +http2_req_state_tunnel_rx (http_conn_t *hc, http2_req_t *req, + transport_send_params_t *sp, http2_error_t *error) +{ + u32 max_enq; + + HTTP_DBG (1, "tunnel received data from client"); + + max_enq = http_io_as_max_write (&req->base); + if (max_enq < req->payload_len) + { + clib_warning ("app's rx fifo full"); + http2_stream_error (hc, req, HTTP2_ERROR_INTERNAL_ERROR, sp); + return HTTP_SM_STOP; + } + http_io_as_write (&req->base, req->payload, req->payload_len); + http_app_worker_rx_notify (&req->base); + + return HTTP_SM_STOP; +} + /*************************************/ /* request state machine handlers TX */ /*************************************/ @@ -977,7 +1139,9 @@ http2_req_state_wait_app_reply (http_conn_t *hc, http2_req_t *req, clib_llist_add_tail (wrk->req_pool, sched_list, req, he); http2_conn_schedule (h2c, hc->c_thread_index); - http_req_state_change (&req->base, HTTP_REQ_STATE_APP_IO_MORE_DATA); + http_req_state_change (&req->base, req->base.is_tunnel ? + HTTP_REQ_STATE_TUNNEL : + HTTP_REQ_STATE_APP_IO_MORE_DATA); http_req_deschedule (&req->base, sp); return HTTP_SM_STOP; @@ -1005,6 +1169,29 @@ http2_req_state_app_io_more_data (http_conn_t *hc, http2_req_t *req, return HTTP_SM_STOP; } +static http_sm_result_t +http2_req_state_tunnel_tx (http_conn_t *hc, http2_req_t *req, + transport_send_params_t *sp, http2_error_t *error) +{ + http2_conn_ctx_t *h2c; + + ASSERT (!clib_llist_elt_is_linked (req, sched_list)); + + HTTP_DBG (1, "tunnel received data from target"); + + /* add data back to stream scheduler */ + HTTP_DBG (1, "adding to data queue req_index %x", + ((http_req_handle_t) req->base.hr_req_handle).req_index); + http2_req_schedule_data_tx (hc, req); + h2c = http2_conn_ctx_get_w_thread (hc); + if (h2c->peer_window > 0) + http2_conn_schedule (h2c, hc->c_thread_index); + + http_req_deschedule (&req->base, sp); + + return HTTP_SM_STOP; +} + /*************************/ /* request state machine */ /*************************/ @@ -1022,7 +1209,7 @@ static http2_sm_handler tx_state_funcs[HTTP_REQ_N_STATES] = { 0, /* wait transport method */ http2_req_state_wait_app_reply, http2_req_state_app_io_more_data, - 0, /* tunnel */ + http2_req_state_tunnel_tx, 0, /* udp tunnel */ }; @@ -1034,7 +1221,7 @@ static http2_sm_handler rx_state_funcs[HTTP_REQ_N_STATES] = { http2_req_state_wait_transport_method, 0, /* wait app reply */ 0, /* app io more data */ - 0, /* tunnel */ + http2_req_state_tunnel_rx, 0, /* udp tunnel */ }; @@ -1268,7 +1455,17 @@ http2_handle_data_frame (http_conn_t *hc, http2_frame_header_t *fh) } if (fh->flags & HTTP2_FRAME_FLAG_END_STREAM) - req->stream_state = HTTP2_STREAM_STATE_HALF_CLOSED; + { + req->stream_state = HTTP2_STREAM_STATE_HALF_CLOSED; + if (req->base.is_tunnel) + { + session_transport_closing_notify (&req->base.connection); + HTTP_DBG (1, "client closed tunnel"); + /* final DATA frame could be empty */ + if (fh->length == 0) + return HTTP2_ERROR_NO_ERROR; + } + } rx_buf = http_get_rx_buf (hc); vec_validate (rx_buf, fh->length - 1); @@ -1687,7 +1884,6 @@ static void http2_app_rx_evt_callback (http_conn_t *hc, u32 req_index, clib_thread_index_t thread_index) { - /* TODO: continue tunnel RX */ http2_req_t *req; u8 *response; u32 increment; @@ -1707,6 +1903,8 @@ http2_app_rx_evt_callback (http_conn_t *hc, u32 req_index, response = http_get_tx_buf (hc); increment = http_io_as_max_write (&req->base) - req->our_window; HTTP_DBG (1, "stream window increment %u", increment); + if (increment == 0) + return; req->our_window += increment; http2_frame_write_window_update (increment, req->stream_id, &response); http_io_ts_write (hc, response, vec_len (response), 0); @@ -1735,6 +1933,34 @@ http2_app_close_callback (http_conn_t *hc, u32 req_index, HTTP_DBG (1, "nothing more to send, confirm close"); session_transport_closed_notify (&req->base.connection); } + else if (req->base.is_tunnel) + { + switch (req->stream_state) + { + case HTTP2_STREAM_STATE_OPEN: + HTTP_DBG (1, "proxy closing connection"); + req->stream_state = HTTP2_STREAM_STATE_HALF_CLOSED; + if (http_io_as_max_read (&req->base)) + { + HTTP_DBG (1, "wait for all data to be written to ts"); + req->flags |= HTTP2_REQ_F_APP_CLOSED; + } + else + { + HTTP_DBG (1, "nothing more to send, closing tunnel"); + http2_tunnel_send_close (hc, req); + } + break; + case HTTP2_STREAM_STATE_HALF_CLOSED: + HTTP_DBG (1, "proxy confirmed close"); + http2_tunnel_send_close (hc, req); + session_transport_closed_notify (&req->base.connection); + break; + default: + session_transport_closed_notify (&req->base.connection); + break; + } + } else { HTTP_DBG (1, "wait for all data to be written to ts");