http: http/2 connect method 59/43059/14
authorMatus Fabian <[email protected]>
Fri, 30 May 2025 14:51:59 +0000 (10:51 -0400)
committerFlorin Coras <[email protected]>
Wed, 11 Jun 2025 15:52:53 +0000 (15:52 +0000)
Type: feature

Change-Id: I7dc27a93388a6d680f2a87ccbd2704bb76a91357
Signed-off-by: Matus Fabian <[email protected]>
extras/hs-test/h2spec_extras/h2spec_extras.go
extras/hs-test/infra/suite_vpp_proxy.go
extras/hs-test/proxy_test.go
src/plugins/hs_apps/proxy.c
src/plugins/http/http2/http2.c

index 3c2b9dd..6957557 100644 (file)
@@ -2,10 +2,12 @@ package h2spec_extras
 
 import (
        "fmt"
+       "slices"
 
        "github.com/summerwind/h2spec/config"
        "github.com/summerwind/h2spec/spec"
        "golang.org/x/net/http2"
+       "golang.org/x/net/http2/hpack"
 )
 
 var key = "extras"
@@ -25,6 +27,7 @@ func Spec() *spec.TestGroup {
        }
 
        tg.AddTestGroup(FlowControl())
+       tg.AddTestGroup(ConnectMethod())
 
        return tg
 }
@@ -53,6 +56,32 @@ func VerifyWindowUpdate(conn *spec.Conn, streamID, expectedIncrement uint32) err
        return nil
 }
 
+func VerifyTunnelClosed(conn *spec.Conn) error {
+       var streamClosed = false
+       var lastEvent spec.Event
+       for !conn.Closed {
+               ev := conn.WaitEvent()
+               lastEvent = ev
+               switch event := ev.(type) {
+               case spec.DataFrameEvent:
+                       if event.StreamEnded() {
+                               streamClosed = true
+                               goto done
+                       }
+               case spec.TimeoutEvent:
+                       goto done
+               }
+       }
+done:
+       if !streamClosed {
+               return &spec.TestError{
+                       Expected: []string{spec.ExpectedStreamClosed},
+                       Actual:   lastEvent.String(),
+               }
+       }
+       return nil
+}
+
 func FlowControl() *spec.TestGroup {
        tg := NewTestGroup("1", "Flow control")
        tg.AddTestCase(&spec.TestCase{
@@ -168,3 +197,202 @@ func FlowControl() *spec.TestGroup {
        })
        return tg
 }
+
+func ConnectHeaders(c *config.Config) []hpack.HeaderField {
+
+       return []hpack.HeaderField{
+               spec.HeaderField(":method", "CONNECT"),
+               spec.HeaderField(":authority", c.Path),
+       }
+}
+
+func ConnectMethod() *spec.TestGroup {
+       tg := NewTestGroup("2", "CONNECT method")
+
+       tg.AddTestCase(&spec.TestCase{
+               Desc:        "Tunnel closed by target",
+               Requirement: "A proxy that receives a TCP segment with the FIN bit set sends a DATA frame with the END_STREAM flag set.",
+               Run: func(c *config.Config, conn *spec.Conn) error {
+                       var streamID uint32 = 1
+
+                       err := conn.Handshake()
+                       if err != nil {
+                               return err
+                       }
+
+                       headers := ConnectHeaders(c)
+                       hp := http2.HeadersFrameParam{
+                               StreamID:      streamID,
+                               EndStream:     false,
+                               EndHeaders:    true,
+                               BlockFragment: conn.EncodeHeaders(headers),
+                       }
+                       conn.WriteHeaders(hp)
+                       err = spec.VerifyHeadersFrame(conn, streamID)
+                       if err != nil {
+                               return err
+                       }
+
+                       // send http/1.0 so target will close connection when send response
+                       conn.WriteData(streamID, false, []byte("GET /index.html HTTP/1.0\r\n\r\n"))
+
+                       // wait for DATA frame with END_STREAM flag set
+                       err = VerifyTunnelClosed(conn)
+                       if err != nil {
+                               return err
+                       }
+
+                       // client is expected to send DATA frame with the END_STREAM flag set
+                       conn.WriteData(streamID, true, []byte(""))
+
+                       return nil
+               },
+       })
+
+       tg.AddTestCase(&spec.TestCase{
+               Desc:        "Tunnel closed by client (with attached data)",
+               Requirement: "A proxy that receives a DATA frame with the END_STREAM flag set sends the attached data with the FIN bit set on the last TCP segment.",
+               Run: func(c *config.Config, conn *spec.Conn) error {
+                       var streamID uint32 = 1
+
+                       err := conn.Handshake()
+                       if err != nil {
+                               return err
+                       }
+
+                       headers := ConnectHeaders(c)
+                       hp := http2.HeadersFrameParam{
+                               StreamID:      streamID,
+                               EndStream:     false,
+                               EndHeaders:    true,
+                               BlockFragment: conn.EncodeHeaders(headers),
+                       }
+                       conn.WriteHeaders(hp)
+                       err = spec.VerifyHeadersFrame(conn, streamID)
+                       if err != nil {
+                               return err
+                       }
+
+                       // close tunnel
+                       conn.WriteData(streamID, true, []byte("HEAD /index.html HTTP/1.1\r\nHost: example.com\r\n\r\n"))
+
+                       // wait for DATA frame with END_STREAM flag set
+                       err = VerifyTunnelClosed(conn)
+                       if err != nil {
+                               return err
+                       }
+
+                       return nil
+               },
+       })
+
+       tg.AddTestCase(&spec.TestCase{
+               Desc:        "Tunnel closed by client (empty DATA frame)",
+               Requirement: "The final DATA frame could be empty.",
+               Run: func(c *config.Config, conn *spec.Conn) error {
+                       var streamID uint32 = 1
+
+                       err := conn.Handshake()
+                       if err != nil {
+                               return err
+                       }
+
+                       headers := ConnectHeaders(c)
+                       hp := http2.HeadersFrameParam{
+                               StreamID:      streamID,
+                               EndStream:     false,
+                               EndHeaders:    true,
+                               BlockFragment: conn.EncodeHeaders(headers),
+                       }
+                       conn.WriteHeaders(hp)
+                       err = spec.VerifyHeadersFrame(conn, streamID)
+                       if err != nil {
+                               return err
+                       }
+
+                       conn.WriteData(streamID, false, []byte("HEAD /index.html HTTP/1.1\r\nHost: example.com\r\n\r\n"))
+
+                       // verify reception of response DATA frame
+                       err = spec.VerifyEventType(conn, spec.EventDataFrame)
+                       if err != nil {
+                               return err
+                       }
+
+                       // close tunnel
+                       conn.WriteData(streamID, true, []byte(""))
+
+                       // wait for DATA frame with END_STREAM flag set
+                       err = VerifyTunnelClosed(conn)
+                       if err != nil {
+                               return err
+                       }
+
+                       return nil
+               },
+       })
+
+       tg.AddTestCase(&spec.TestCase{
+               Desc:        "Multiple tunnels",
+               Requirement: "In HTTP/2, the CONNECT method establishes a tunnel over a single HTTP/2 stream to a remote host, rather than converting the entire connection to a tunnel.",
+               Run: func(c *config.Config, conn *spec.Conn) error {
+                       var streamID uint32 = 1
+
+                       err := conn.Handshake()
+                       if err != nil {
+                               return err
+                       }
+
+                       maxStreams, ok := conn.Settings[http2.SettingMaxConcurrentStreams]
+                       if !ok {
+                               return spec.ErrSkipped
+                       }
+
+                       for i := 0; i < int(maxStreams); i++ {
+                               headers := ConnectHeaders(c)
+                               hp := http2.HeadersFrameParam{
+                                       StreamID:      streamID,
+                                       EndStream:     false,
+                                       EndHeaders:    true,
+                                       BlockFragment: conn.EncodeHeaders(headers),
+                               }
+                               conn.WriteHeaders(hp)
+                               err = spec.VerifyHeadersFrame(conn, streamID)
+                               if err != nil {
+                                       return err
+                               }
+
+                               streamID += 2
+                       }
+
+                       streamID = 1
+                       for i := 0; i < int(maxStreams); i++ {
+                               conn.WriteData(streamID, false, []byte("HEAD /index.html HTTP/1.1\r\nHost: example.com\r\n\r\n"))
+                               streamID += 2
+                       }
+
+                       var receivedResp []uint32
+                       for i := 0; i < int(maxStreams); i++ {
+                               actual, passed := conn.WaitEventByType(spec.EventDataFrame)
+                               switch event := actual.(type) {
+                               case spec.DataFrameEvent:
+                                       passed = !slices.Contains(receivedResp, event.StreamID)
+                               default:
+                                       passed = false
+                               }
+                               if !passed {
+                                       expected := []string{
+                                               "Receive one response per stream (tunnel)",
+                                       }
+
+                                       return &spec.TestError{
+                                               Expected: expected,
+                                               Actual:   actual.String(),
+                                       }
+                               }
+                       }
+
+                       return nil
+               },
+       })
+       return tg
+}
index 2226358..44cc6bb 100644 (file)
@@ -6,15 +6,21 @@
 package hst
 
 import (
+       "bytes"
        "fmt"
+       "io"
        "net"
+       "os"
        "reflect"
        "runtime"
        "strconv"
        "strings"
+       "time"
 
+       "fd.io/hs-test/h2spec_extras"
        . "fd.io/hs-test/infra/common"
        . "github.com/onsi/ginkgo/v2"
+       "github.com/summerwind/h2spec/config"
 )
 
 const (
@@ -124,6 +130,20 @@ func (s *VppProxySuite) SetupNginxServer() {
        s.AssertNil(s.Containers.NginxServerTransient.Start())
 }
 
+func (s *VppProxySuite) ConfigureVppProxy(proto string, proxyPort uint16) {
+       vppProxy := s.Containers.VppProxy.VppInstance
+       cmd := fmt.Sprintf("test proxy server fifo-size 512k server-uri %s://%s:%d", proto, s.VppProxyAddr(), proxyPort)
+       if proto != "http" && proto != "https" && proto != "udp" {
+               proto = "tcp"
+       }
+       if proto != "http" && proto != "https" {
+               cmd += fmt.Sprintf(" client-uri %s://%s:%d", proto, s.ServerAddr(), s.Ports.Server)
+       }
+
+       output := vppProxy.Vppctl(cmd)
+       s.Log("proxy configured: " + output)
+}
+
 func (s *VppProxySuite) ServerAddr() string {
        return s.Interfaces.Server.Ip4AddressString()
 }
@@ -163,23 +183,45 @@ func (s *VppProxySuite) CurlUploadResource(uri, file string) {
        s.AssertNotContains(log, "Operation timed out")
 }
 
-func (s *VppProxySuite) CurlDownloadResourceViaTunnel(uri string, proxyUri string) {
-       args := fmt.Sprintf("-w @/tmp/write_out_download_connect --max-time %d --insecure --proxy-insecure -p -x %s --remote-name --output-dir /tmp %s", s.maxTimeout, proxyUri, uri)
+func (s *VppProxySuite) CurlDownloadResourceViaTunnel(uri string, proxyUri string, extraArgs ...string) (string, string) {
+       extras := ""
+       if len(extraArgs) > 0 {
+               extras = strings.Join(extraArgs, " ")
+               extras += " "
+       }
+       args := fmt.Sprintf("%s-w @/tmp/write_out_download_connect --max-time %d --insecure --proxy-insecure -p -x %s --remote-name --output-dir /tmp %s", extras, s.maxTimeout, proxyUri, uri)
        writeOut, log := s.RunCurlContainer(s.Containers.Curl, args)
-       s.AssertContains(writeOut, "CONNECT response code: 200")
+       if strings.Contains(extras, "proxy-http2") {
+               // in case of h2 connect response code is 000 in write out
+               s.AssertContains(log, "CONNECT tunnel established, response 200")
+       } else {
+               s.AssertContains(writeOut, "CONNECT response code: 200")
+       }
        s.AssertContains(writeOut, "GET response code: 200")
        s.AssertNotContains(log, "bytes remaining to read")
        s.AssertNotContains(log, "Operation timed out")
        s.AssertNotContains(log, "Upgrade:")
+       return writeOut, log
 }
 
-func (s *VppProxySuite) CurlUploadResourceViaTunnel(uri, proxyUri, file string) {
-       args := fmt.Sprintf("-w @/tmp/write_out_upload_connect --max-time %d --insecure --proxy-insecure -p -x %s -T %s %s", s.maxTimeout, proxyUri, file, uri)
+func (s *VppProxySuite) CurlUploadResourceViaTunnel(uri, proxyUri, file string, extraArgs ...string) (string, string) {
+       extras := ""
+       if len(extraArgs) > 0 {
+               extras = strings.Join(extraArgs, " ")
+               extras += " "
+       }
+       args := fmt.Sprintf("%s-w @/tmp/write_out_upload_connect --max-time %d --insecure --proxy-insecure -p -x %s -T %s %s", extras, s.maxTimeout, proxyUri, file, uri)
        writeOut, log := s.RunCurlContainer(s.Containers.Curl, args)
-       s.AssertContains(writeOut, "CONNECT response code: 200")
+       if strings.Contains(extras, "proxy-http2") {
+               // in case of h2 connect response code is 000 in write out
+               s.AssertContains(log, "CONNECT tunnel established, response 200")
+       } else {
+               s.AssertContains(writeOut, "CONNECT response code: 200")
+       }
        s.AssertContains(writeOut, "PUT response code: 201")
        s.AssertNotContains(log, "Operation timed out")
        s.AssertNotContains(log, "Upgrade:")
+       return writeOut, log
 }
 
 func handleConn(conn net.Conn) {
@@ -270,3 +312,72 @@ var _ = Describe("VppProxySuiteSolo", Ordered, ContinueOnFailure, Serial, func()
                }
        }
 })
+
+var _ = Describe("H2SpecProxySuite", Ordered, ContinueOnFailure, func() {
+       var s VppProxySuite
+       BeforeAll(func() {
+               s.SetupSuite()
+       })
+       BeforeEach(func() {
+               s.SetupTest()
+       })
+       AfterAll(func() {
+               s.TeardownSuite()
+       })
+       AfterEach(func() {
+               s.TeardownTest()
+       })
+
+       testCases := []struct {
+               desc string
+       }{
+               {desc: "extras/2/1"},
+               {desc: "extras/2/2"},
+               {desc: "extras/2/3"},
+               {desc: "extras/2/4"},
+       }
+
+       for _, test := range testCases {
+               test := test
+               testName := "proxy_test.go/h2spec_" + strings.ReplaceAll(test.desc, "/", "_")
+               It(testName, func(ctx SpecContext) {
+                       s.Log(testName + ": BEGIN")
+                       s.SetupNginxServer()
+                       s.ConfigureVppProxy("https", s.Ports.Proxy)
+                       path := fmt.Sprintf("%s:%d", s.ServerAddr(), s.Ports.Server)
+                       conf := &config.Config{
+                               Host:         s.VppProxyAddr(),
+                               Port:         int(s.Ports.Proxy),
+                               Path:         path,
+                               Timeout:      time.Second * time.Duration(s.maxTimeout),
+                               MaxHeaderLen: 4096,
+                               TLS:          true,
+                               Insecure:     true,
+                               Sections:     []string{test.desc},
+                               Verbose:      true,
+                       }
+                       // capture h2spec output so it will be in log
+                       oldStdout := os.Stdout
+                       r, w, _ := os.Pipe()
+                       os.Stdout = w
+
+                       tg := h2spec_extras.Spec()
+                       tg.Test(conf)
+
+                       oChan := make(chan string)
+                       go func() {
+                               var buf bytes.Buffer
+                               io.Copy(&buf, r)
+                               oChan <- buf.String()
+                       }()
+
+                       // restore to normal state
+                       w.Close()
+                       os.Stdout = oldStdout
+                       o := <-oChan
+                       s.Log(o)
+                       s.AssertEqual(0, tg.FailedCount)
+               }, SpecTimeout(TestTimeout))
+       }
+
+})
index 875d79f..ef7748b 100644 (file)
@@ -22,7 +22,8 @@ import (
 
 func init() {
        RegisterVppProxyTests(VppProxyHttpGetTcpTest, VppProxyHttpGetTlsTest, VppProxyHttpPutTcpTest, VppProxyHttpPutTlsTest,
-               VppConnectProxyGetTest, VppConnectProxyPutTest, VppHttpsConnectProxyGetTest)
+               VppConnectProxyGetTest, VppConnectProxyPutTest, VppHttpsConnectProxyGetTest, VppH2ConnectProxyGetTest,
+               VppH2ConnectProxyPutTest)
        RegisterVppProxySoloTests(VppProxyHttpGetTcpMTTest, VppProxyHttpPutTcpMTTest, VppProxyTcpIperfMTTest,
                VppProxyUdpIperfMTTest, VppConnectProxyStressTest, VppConnectProxyStressMTTest, VppConnectProxyConnectionFailedMTTest)
        RegisterVppUdpProxyTests(VppProxyUdpTest, VppConnectUdpProxyTest, VppConnectUdpInvalidCapsuleTest,
@@ -32,20 +33,6 @@ func init() {
        RegisterNginxProxySoloTests(NginxMirroringTest, MirrorMultiThreadTest)
 }
 
-func configureVppProxy(s *VppProxySuite, proto string, proxyPort uint16) {
-       vppProxy := s.Containers.VppProxy.VppInstance
-       cmd := fmt.Sprintf("test proxy server fifo-size 512k server-uri %s://%s:%d", proto, s.VppProxyAddr(), proxyPort)
-       if proto != "http" && proto != "https" && proto != "udp" {
-               proto = "tcp"
-       }
-       if proto != "http" && proto != "https" {
-               cmd += fmt.Sprintf(" client-uri %s://%s:%d", proto, s.ServerAddr(), s.Ports.Server)
-       }
-
-       output := vppProxy.Vppctl(cmd)
-       s.Log("proxy configured: " + output)
-}
-
 func VppProxyHttpGetTcpMTTest(s *VppProxySuite) {
        VppProxyHttpGetTcpTest(s)
 }
@@ -71,9 +58,9 @@ func vppProxyIperfMTTest(s *VppProxySuite, proto string) {
        s.AssertNil(vppProxy.DeleteTap(s.Interfaces.Client))
        s.AssertNil(vppProxy.CreateTap(s.Interfaces.Client, false, 2, uint32(s.Interfaces.Client.Peer.Index), Consistent_qp))
 
-       configureVppProxy(s, "tcp", s.Ports.Proxy)
+       s.ConfigureVppProxy("tcp", s.Ports.Proxy)
        if proto == "udp" {
-               configureVppProxy(s, "udp", s.Ports.Proxy)
+               s.ConfigureVppProxy("udp", s.Ports.Proxy)
                proto = "-u"
        } else {
                proto = ""
@@ -111,14 +98,14 @@ func vppProxyIperfMTTest(s *VppProxySuite, proto string) {
 
 func VppProxyHttpGetTcpTest(s *VppProxySuite) {
        s.SetupNginxServer()
-       configureVppProxy(s, "tcp", s.Ports.Proxy)
+       s.ConfigureVppProxy("tcp", s.Ports.Proxy)
        uri := fmt.Sprintf("http://%s:%d/httpTestFile", s.VppProxyAddr(), s.Ports.Proxy)
        s.CurlDownloadResource(uri)
 }
 
 func VppProxyHttpGetTlsTest(s *VppProxySuite) {
        s.SetupNginxServer()
-       configureVppProxy(s, "tls", s.Ports.Proxy)
+       s.ConfigureVppProxy("tls", s.Ports.Proxy)
        uri := fmt.Sprintf("https://%s:%d/httpTestFile", s.VppProxyAddr(), s.Ports.Proxy)
        s.CurlDownloadResource(uri)
 }
@@ -129,14 +116,14 @@ func VppProxyHttpPutTcpMTTest(s *VppProxySuite) {
 
 func VppProxyHttpPutTcpTest(s *VppProxySuite) {
        s.SetupNginxServer()
-       configureVppProxy(s, "tcp", s.Ports.Proxy)
+       s.ConfigureVppProxy("tcp", s.Ports.Proxy)
        uri := fmt.Sprintf("http://%s:%d/upload/testFile", s.VppProxyAddr(), s.Ports.Proxy)
        s.CurlUploadResource(uri, CurlContainerTestFile)
 }
 
 func VppProxyHttpPutTlsTest(s *VppProxySuite) {
        s.SetupNginxServer()
-       configureVppProxy(s, "tls", s.Ports.Proxy)
+       s.ConfigureVppProxy("tls", s.Ports.Proxy)
        uri := fmt.Sprintf("https://%s:%d/upload/testFile", s.VppProxyAddr(), s.Ports.Proxy)
        s.CurlUploadResource(uri, CurlContainerTestFile)
 }
@@ -173,7 +160,7 @@ func nginxMirroring(s *NginxProxySuite, multiThreadWorkers bool) {
 
 func VppConnectProxyGetTest(s *VppProxySuite) {
        s.SetupNginxServer()
-       configureVppProxy(s, "http", s.Ports.Proxy)
+       s.ConfigureVppProxy("http", s.Ports.Proxy)
 
        targetUri := fmt.Sprintf("http://%s:%d/httpTestFile", s.ServerAddr(), s.Ports.Server)
        proxyUri := fmt.Sprintf("http://%s:%d", s.VppProxyAddr(), s.Ports.Proxy)
@@ -182,16 +169,27 @@ func VppConnectProxyGetTest(s *VppProxySuite) {
 
 func VppHttpsConnectProxyGetTest(s *VppProxySuite) {
        s.SetupNginxServer()
-       configureVppProxy(s, "https", s.Ports.Proxy)
+       s.ConfigureVppProxy("https", s.Ports.Proxy)
 
        targetUri := fmt.Sprintf("http://%s:%d/httpTestFile", s.ServerAddr(), s.Ports.Server)
        proxyUri := fmt.Sprintf("https://%s:%d", s.VppProxyAddr(), s.Ports.Proxy)
        s.CurlDownloadResourceViaTunnel(targetUri, proxyUri)
 }
 
+func VppH2ConnectProxyGetTest(s *VppProxySuite) {
+       s.SetupNginxServer()
+       s.ConfigureVppProxy("https", s.Ports.Proxy)
+
+       targetUri := fmt.Sprintf("http://%s:%d/httpTestFile", s.ServerAddr(), s.Ports.Server)
+       proxyUri := fmt.Sprintf("https://%s:%d", s.VppProxyAddr(), s.Ports.Proxy)
+       _, log := s.CurlDownloadResourceViaTunnel(targetUri, proxyUri, "--proxy-http2")
+       // ALPN result check
+       s.AssertContains(log, "CONNECT tunnel: HTTP/2 negotiated")
+}
+
 func VppConnectProxyConnectionFailedMTTest(s *VppProxySuite) {
        s.SetupNginxServer()
-       configureVppProxy(s, "http", s.Ports.Proxy)
+       s.ConfigureVppProxy("http", s.Ports.Proxy)
 
        targetUri := fmt.Sprintf("http://%s:%d/httpTestFile", s.ServerAddr(), s.Ports.Server+1)
        proxyUri := fmt.Sprintf("http://%s:%d", s.VppProxyAddr(), s.Ports.Proxy)
@@ -201,13 +199,24 @@ func VppConnectProxyConnectionFailedMTTest(s *VppProxySuite) {
 
 func VppConnectProxyPutTest(s *VppProxySuite) {
        s.SetupNginxServer()
-       configureVppProxy(s, "http", s.Ports.Proxy)
+       s.ConfigureVppProxy("http", s.Ports.Proxy)
 
        proxyUri := fmt.Sprintf("http://%s:%d", s.VppProxyAddr(), s.Ports.Proxy)
        targetUri := fmt.Sprintf("http://%s:%d/upload/testFile", s.ServerAddr(), s.Ports.Server)
        s.CurlUploadResourceViaTunnel(targetUri, proxyUri, CurlContainerTestFile)
 }
 
+func VppH2ConnectProxyPutTest(s *VppProxySuite) {
+       s.SetupNginxServer()
+       s.ConfigureVppProxy("https", s.Ports.Proxy)
+
+       proxyUri := fmt.Sprintf("https://%s:%d", s.VppProxyAddr(), s.Ports.Proxy)
+       targetUri := fmt.Sprintf("http://%s:%d/upload/testFile", s.ServerAddr(), s.Ports.Server)
+       _, log := s.CurlUploadResourceViaTunnel(targetUri, proxyUri, CurlContainerTestFile, "--proxy-http2")
+       // ALPN result check
+       s.AssertContains(log, "CONNECT tunnel: HTTP/2 negotiated")
+}
+
 func vppConnectProxyStressLoad(s *VppProxySuite, proxyPort string) {
        var (
                connectError, timeout, readError, writeError, invalidData, total atomic.Uint32
@@ -320,7 +329,7 @@ func VppConnectProxyStressTest(s *VppProxySuite) {
        remoteServerConn := s.StartEchoServer()
        defer remoteServerConn.Close()
 
-       configureVppProxy(s, "http", s.Ports.Proxy)
+       s.ConfigureVppProxy("http", s.Ports.Proxy)
 
        // no goVPP less noise
        s.Containers.VppProxy.VppInstance.Disconnect()
@@ -340,7 +349,7 @@ func VppConnectProxyStressMTTest(s *VppProxySuite) {
        s.AssertNil(vppProxy.DeleteTap(s.Interfaces.Client))
        s.AssertNil(vppProxy.CreateTap(s.Interfaces.Client, false, 2, uint32(s.Interfaces.Client.Peer.Index), Consistent_qp))
 
-       configureVppProxy(s, "http", s.Ports.Proxy)
+       s.ConfigureVppProxy("http", s.Ports.Proxy)
 
        // no goVPP less noise
        vppProxy.Disconnect()
index c8bdc73..445235f 100644 (file)
@@ -1282,8 +1282,6 @@ proxy_server_listen ()
            &a->sep_ext, TRANSPORT_ENDPT_EXT_CFG_CRYPTO,
            sizeof (transport_endpt_crypto_cfg_t));
          ext_cfg->crypto.ckpair_index = pm->ckpair_index;
-         /* TODO: remove when http/2 connect done */
-         ext_cfg->crypto.alpn_protos[0] = TLS_ALPN_PROTO_HTTP_1_1;
        }
     }
   else
index f9f281f..9cf8181 100644 (file)
@@ -7,6 +7,7 @@
 #include <http/http2/frame.h>
 #include <http/http_private.h>
 #include <http/http_timer.h>
+#include <http/http_status_codes.h>
 
 #define HTTP2_WIN_SIZE_MAX     0x7FFFFFFF
 #define HTTP2_INITIAL_WIN_SIZE 65535
@@ -58,6 +59,8 @@ typedef struct http2_req_
   clib_llist_anchor_t sched_list;
   void (*dispatch_headers_cb) (struct http2_req_ *req, http_conn_t *hc,
                               u8 *n_emissions, clib_llist_index_t *next_ri);
+  void (*dispatch_data_cb) (struct http2_req_ *req, http_conn_t *hc,
+                           u8 *n_emissions);
 } http2_req_t;
 
 #define foreach_http2_conn_flags                                              \
@@ -381,6 +384,18 @@ http2_send_stream_error (http_conn_t *hc, u32 stream_id, http2_error_t error,
   http_io_ts_after_write (hc, 1);
 }
 
+always_inline void
+http2_tunnel_send_close (http_conn_t *hc, http2_req_t *req)
+{
+  u8 *response;
+
+  response = http_get_tx_buf (hc);
+  http2_frame_write_data_header (0, req->stream_id,
+                                HTTP2_FRAME_FLAG_END_STREAM, response);
+  http_io_ts_write (hc, response, HTTP2_FRAME_HEADER_SIZE, 0);
+  http_io_ts_after_write (hc, 1);
+}
+
 /* send RST_STREAM frame and notify app */
 always_inline void
 http2_stream_error (http_conn_t *hc, http2_req_t *req, http2_error_t error,
@@ -458,6 +473,150 @@ http2_send_server_preface (http_conn_t *hc)
 /* stream TX scheduler */
 /***********************/
 
+static void
+http2_sched_dispatch_data (http2_req_t *req, http_conn_t *hc, u8 *n_emissions)
+{
+  u32 max_write, max_read, n_segs, n_read, n_written = 0;
+  svm_fifo_seg_t *app_segs, *segs = 0;
+  http_buffer_t *hb = &req->base.tx_buf;
+  u8 fh[HTTP2_FRAME_HEADER_SIZE];
+  u8 finished = 0, flags = 0;
+  http2_conn_ctx_t *h2c;
+
+  ASSERT (http_buffer_bytes_left (hb) > 0);
+
+  *n_emissions += hb->type == HTTP_BUFFER_PTR ? HTTP2_SCHED_WEIGHT_DATA_PTR :
+                                               HTTP2_SCHED_WEIGHT_DATA_INLINE;
+
+  h2c = http2_conn_ctx_get_w_thread (hc);
+
+  max_write = http_io_ts_max_write (hc, 0);
+  max_write -= HTTP2_FRAME_HEADER_SIZE;
+  max_write = clib_min (max_write, (u32) req->peer_window);
+  max_write = clib_min (max_write, h2c->peer_window);
+  max_write = clib_min (max_write, h2c->peer_settings.max_frame_size);
+
+  max_read = http_buffer_bytes_left (hb);
+
+  n_read = http_buffer_get_segs (hb, max_write, &app_segs, &n_segs);
+  if (n_read == 0)
+    {
+      HTTP_DBG (1, "no data to deq");
+      transport_connection_reschedule (&req->base.connection);
+      return;
+    }
+
+  finished = (max_read - n_read) == 0;
+  flags = finished ? HTTP2_FRAME_FLAG_END_STREAM : 0;
+  http2_frame_write_data_header (n_read, req->stream_id, flags, fh);
+  vec_validate (segs, 0);
+  segs[0].len = HTTP2_FRAME_HEADER_SIZE;
+  segs[0].data = fh;
+  vec_append (segs, app_segs);
+
+  n_written = http_io_ts_write_segs (hc, segs, n_segs + 1, 0);
+  n_written -= HTTP2_FRAME_HEADER_SIZE;
+  vec_free (segs);
+  http_buffer_drain (hb, n_written);
+  req->peer_window -= n_written;
+  h2c->peer_window -= n_written;
+
+  if (finished)
+    {
+      /* all done, close stream */
+      http_buffer_free (hb);
+      if (hc->flags & HTTP_CONN_F_IS_SERVER)
+       http2_stream_close (req, hc);
+      else
+       req->stream_state = HTTP2_STREAM_STATE_HALF_CLOSED;
+    }
+  else
+    {
+      if (req->peer_window == 0)
+       {
+         /* mark that we need window update on stream */
+         HTTP_DBG (1, "stream window is full");
+         req->flags |= HTTP2_REQ_F_NEED_WINDOW_UPDATE;
+       }
+      else
+       {
+         /* schedule for next round */
+         HTTP_DBG (1, "adding to data queue req_index %x",
+                   ((http_req_handle_t) req->base.hr_req_handle).req_index);
+         http2_req_schedule_data_tx (hc, req);
+         http_io_as_dequeue_notify (&req->base, n_written);
+       }
+    }
+
+  http_io_ts_after_write (hc, finished);
+}
+
+static void
+http2_sched_dispatch_tunnel (http2_req_t *req, http_conn_t *hc,
+                            u8 *n_emissions)
+{
+  u32 max_write, max_read, n_segs = 2, n_read, n_written = 0;
+  svm_fifo_seg_t segs[n_segs + 1];
+  u8 fh[HTTP2_FRAME_HEADER_SIZE];
+  u8 flags = 0;
+  http2_conn_ctx_t *h2c;
+
+  *n_emissions += HTTP2_SCHED_WEIGHT_DATA_INLINE;
+
+  h2c = http2_conn_ctx_get_w_thread (hc);
+
+  max_read = http_io_as_max_read (&req->base);
+  if (max_read == 0)
+    {
+      HTTP_DBG (2, "max_read == 0");
+      transport_connection_reschedule (&req->base.connection);
+      return;
+    }
+  max_write = http_io_ts_max_write (hc, 0);
+  max_write -= HTTP2_FRAME_HEADER_SIZE;
+  max_write = clib_min (max_write, (u32) req->peer_window);
+  max_write = clib_min (max_write, h2c->peer_window);
+  max_write = clib_min (max_write, h2c->peer_settings.max_frame_size);
+  n_read = clib_min (max_write, max_read);
+
+  if (req->stream_state == HTTP2_STREAM_STATE_HALF_CLOSED &&
+      max_write >= max_read)
+    {
+      HTTP_DBG (1, "closing tunnel");
+      session_transport_closed_notify (&req->base.connection);
+      flags = HTTP2_FRAME_FLAG_END_STREAM;
+    }
+  http2_frame_write_data_header (n_read, req->stream_id, flags, fh);
+  segs[0].len = HTTP2_FRAME_HEADER_SIZE;
+  segs[0].data = fh;
+
+  http_io_as_read_segs (&req->base, segs + 1, &n_segs, n_read);
+
+  n_written = http_io_ts_write_segs (hc, segs, n_segs + 1, 0);
+  n_written -= HTTP2_FRAME_HEADER_SIZE;
+  http_io_as_drain (&req->base, n_written);
+  req->peer_window -= n_written;
+  h2c->peer_window -= n_written;
+
+  if (req->peer_window == 0)
+    {
+      /* mark that we need window update on stream */
+      HTTP_DBG (1, "stream window is full");
+      req->flags |= HTTP2_REQ_F_NEED_WINDOW_UPDATE;
+    }
+  else if (max_read - n_written)
+    {
+      /* schedule for next round if we have more data */
+      HTTP_DBG (1, "adding to data queue req_index %x",
+               ((http_req_handle_t) req->base.hr_req_handle).req_index);
+      http2_req_schedule_data_tx (hc, req);
+    }
+  else
+    transport_connection_reschedule (&req->base.connection);
+
+  http_io_ts_after_write (hc, 0);
+}
+
 static void
 http2_sched_dispatch_continuation (http2_req_t *req, http_conn_t *hc,
                                   u8 *n_emissions,
@@ -502,6 +661,7 @@ http2_sched_dispatch_continuation (http2_req_t *req, http_conn_t *hc,
       if (http_buffer_bytes_left (&req->base.tx_buf))
        {
          /* start sending the actual data */
+         req->dispatch_data_cb = http2_sched_dispatch_data;
          HTTP_DBG (1, "adding to data queue req_index %x",
                    ((http_req_handle_t) req->base.hr_req_handle).req_index);
          http2_req_schedule_data_tx (hc, req);
@@ -567,14 +727,19 @@ http2_sched_dispatch_headers (http2_req_t *req, http_conn_t *hc,
 
   stream_id = req->stream_id;
 
+  /* tunnel established if 2xx (Successful) response to CONNECT */
+  req->base.is_tunnel =
+    (req->base.is_tunnel && http_status_code_str[msg.code][0] == '2');
+
   /* END_STREAM flag need to be set in HEADERS frame */
   if (msg.data.body_len)
     {
+      ASSERT (req->base.is_tunnel == 0);
       http_req_tx_buffer_init (&req->base, &msg);
       http_io_as_dequeue_notify (&req->base, n_deq);
     }
   else
-    flags |= HTTP2_FRAME_FLAG_END_STREAM;
+    flags |= req->base.is_tunnel ? 0 : HTTP2_FRAME_FLAG_END_STREAM;
 
   if (headers_len <= max_write)
     {
@@ -584,12 +749,23 @@ http2_sched_dispatch_headers (http2_req_t *req, http_conn_t *hc,
       if (msg.data.body_len)
        {
          /* start sending the actual data */
+         req->dispatch_data_cb = http2_sched_dispatch_data;
          HTTP_DBG (1, "adding to data queue req_index %x",
                    ((http_req_handle_t) req->base.hr_req_handle).req_index);
          http2_req_schedule_data_tx (hc, req);
        }
+      else if (req->base.is_tunnel)
+       {
+         req->dispatch_data_cb = http2_sched_dispatch_tunnel;
+         transport_connection_reschedule (&req->base.connection);
+         /* cleanup some stuff we don't need anymore in tunnel mode */
+         vec_free (req->base.headers);
+       }
       else
-       http2_stream_close (req, hc);
+       {
+         /* otherwise we are done */
+         http2_stream_close (req, hc);
+       }
     }
   else
     {
@@ -616,84 +792,6 @@ http2_sched_dispatch_headers (http2_req_t *req, http_conn_t *hc,
   http_io_ts_after_write (hc, 0);
 }
 
-static void
-http2_sched_dispatch_data (http2_req_t *req, http_conn_t *hc, u8 *n_emissions)
-{
-  u32 max_write, max_read, n_segs, n_read, n_written = 0;
-  svm_fifo_seg_t *app_segs, *segs = 0;
-  http_buffer_t *hb = &req->base.tx_buf;
-  u8 fh[HTTP2_FRAME_HEADER_SIZE];
-  u8 finished = 0, flags = 0;
-  http2_conn_ctx_t *h2c;
-
-  ASSERT (http_buffer_bytes_left (hb) > 0);
-
-  *n_emissions += hb->type == HTTP_BUFFER_PTR ? HTTP2_SCHED_WEIGHT_DATA_PTR :
-                                               HTTP2_SCHED_WEIGHT_DATA_INLINE;
-
-  h2c = http2_conn_ctx_get_w_thread (hc);
-
-  max_write = http_io_ts_max_write (hc, 0);
-  max_write -= HTTP2_FRAME_HEADER_SIZE;
-  max_write = clib_min (max_write, (u32) req->peer_window);
-  max_write = clib_min (max_write, h2c->peer_window);
-  max_write = clib_min (max_write, h2c->peer_settings.max_frame_size);
-
-  max_read = http_buffer_bytes_left (hb);
-
-  n_read = http_buffer_get_segs (hb, max_write, &app_segs, &n_segs);
-  if (n_read == 0)
-    {
-      HTTP_DBG (1, "no data to deq");
-      transport_connection_reschedule (&req->base.connection);
-      return;
-    }
-
-  finished = (max_read - n_read) == 0;
-  flags = finished ? HTTP2_FRAME_FLAG_END_STREAM : 0;
-  http2_frame_write_data_header (n_read, req->stream_id, flags, fh);
-  vec_validate (segs, 0);
-  segs[0].len = HTTP2_FRAME_HEADER_SIZE;
-  segs[0].data = fh;
-  vec_append (segs, app_segs);
-
-  n_written = http_io_ts_write_segs (hc, segs, n_segs + 1, 0);
-  ASSERT (n_written == (HTTP2_FRAME_HEADER_SIZE + n_read));
-  vec_free (segs);
-  http_buffer_drain (hb, n_read);
-  req->peer_window -= n_read;
-  h2c->peer_window -= n_read;
-
-  if (finished)
-    {
-      /* all done, close stream */
-      http_buffer_free (hb);
-      if (hc->flags & HTTP_CONN_F_IS_SERVER)
-       http2_stream_close (req, hc);
-      else
-       req->stream_state = HTTP2_STREAM_STATE_HALF_CLOSED;
-    }
-  else
-    {
-      if (req->peer_window == 0)
-       {
-         /* mark that we need window update on stream */
-         HTTP_DBG (1, "stream window is full");
-         req->flags |= HTTP2_REQ_F_NEED_WINDOW_UPDATE;
-       }
-      else
-       {
-         /* schedule for next round */
-         HTTP_DBG (1, "adding to data queue req_index %x",
-                   ((http_req_handle_t) req->base.hr_req_handle).req_index);
-         http2_req_schedule_data_tx (hc, req);
-         http_io_as_dequeue_notify (&req->base, n_read);
-       }
-    }
-
-  http_io_ts_after_write (hc, finished);
-}
-
 static void
 http2_update_time_callback (f64 now, u8 thread_index)
 {
@@ -755,7 +853,7 @@ http2_update_time_callback (f64 now, u8 thread_index)
                1, "sending data req_index %x",
                ((http_req_handle_t) req->base.hr_req_handle).req_index);
              clib_llist_remove (wrk->req_pool, sched_list, req);
-             http2_sched_dispatch_data (req, hc, &n_emissions);
+             req->dispatch_data_cb (req, hc, &n_emissions);
              if (ri == old_ti)
                break;
 
@@ -791,6 +889,7 @@ http2_req_state_wait_transport_method (http_conn_t *hc, http2_req_t *req,
   http2_conn_ctx_t *h2c;
   hpack_request_control_data_t control_data;
   http_msg_t msg;
+  u8 *p;
   int rv;
   http_req_state_t new_state = HTTP_REQ_STATE_WAIT_APP_REPLY;
   http2_worker_ctx_t *wrk = http2_get_worker (hc->c_thread_index);
@@ -816,8 +915,7 @@ http2_req_state_wait_transport_method (http_conn_t *hc, http2_req_t *req,
       http2_stream_error (hc, req, HTTP2_ERROR_PROTOCOL_ERROR, sp);
       return HTTP_SM_STOP;
     }
-  if (control_data.method == HTTP_REQ_UNKNOWN ||
-      control_data.method == HTTP_REQ_CONNECT)
+  if (control_data.method == HTTP_REQ_UNKNOWN)
     {
       HTTP_DBG (1, "unsupported method");
       http2_stream_error (hc, req, HTTP2_ERROR_PROTOCOL_ERROR, sp);
@@ -843,13 +941,46 @@ http2_req_state_wait_transport_method (http_conn_t *hc, http2_req_t *req,
       http2_stream_error (hc, req, HTTP2_ERROR_PROTOCOL_ERROR, sp);
       return HTTP_SM_STOP;
     }
-  if (!(control_data.parsed_bitmap & HPACK_PSEUDO_HEADER_AUTHORITY_PARSED) &&
-      control_data.method != HTTP_REQ_CONNECT)
+  if (!(control_data.parsed_bitmap & HPACK_PSEUDO_HEADER_AUTHORITY_PARSED))
     {
-      HTTP_DBG (1, ":path pseudo-header missing in request");
+      HTTP_DBG (1, ":authority pseudo-header missing in request");
       http2_stream_error (hc, req, HTTP2_ERROR_PROTOCOL_ERROR, sp);
       return HTTP_SM_STOP;
     }
+  if (control_data.method == HTTP_REQ_CONNECT)
+    {
+      if (control_data.parsed_bitmap & HPACK_PSEUDO_HEADER_SCHEME_PARSED ||
+         control_data.parsed_bitmap & HPACK_PSEUDO_HEADER_PATH_PARSED)
+       {
+         HTTP_DBG (1, ":scheme and :path pseudo-header must be omitted for "
+                      "CONNECT method");
+         http2_stream_error (hc, req, HTTP2_ERROR_PROTOCOL_ERROR, sp);
+         return HTTP_SM_STOP;
+       }
+      /* quick check if port is present */
+      p = control_data.authority + control_data.authority_len;
+      p--;
+      if (!isdigit (*p))
+       {
+         HTTP_DBG (1, "port not present in authority");
+         http2_stream_error (hc, req, HTTP2_ERROR_PROTOCOL_ERROR, sp);
+         return HTTP_SM_STOP;
+       }
+      p--;
+      for (; p > control_data.authority; p--)
+       {
+         if (!isdigit (*p))
+           break;
+       }
+      if (*p != ':')
+       {
+         HTTP_DBG (1, "port not present in authority");
+         http2_stream_error (hc, req, HTTP2_ERROR_PROTOCOL_ERROR, sp);
+         return HTTP_SM_STOP;
+       }
+      req->base.is_tunnel = 1;
+      http_io_as_add_want_read_ntf (&req->base);
+    }
 
   req->base.control_data_len = control_data.control_data_len;
   req->base.headers_offset = control_data.headers - wrk->header_list;
@@ -868,7 +999,9 @@ http2_req_state_wait_transport_method (http_conn_t *hc, http2_req_t *req,
       http_io_as_add_want_read_ntf (&req->base);
     }
   /* TODO: message framing without content length using END_STREAM flag */
-  if (req->base.body_len == 0 && req->stream_state == HTTP2_STREAM_STATE_OPEN)
+  if (req->base.body_len == 0 &&
+      req->stream_state == HTTP2_STREAM_STATE_OPEN &&
+      control_data.method != HTTP_REQ_CONNECT)
     {
       HTTP_DBG (1, "no content-length and DATA frame expected");
       *error = HTTP2_ERROR_INTERNAL_ERROR;
@@ -876,14 +1009,22 @@ http2_req_state_wait_transport_method (http_conn_t *hc, http2_req_t *req,
     }
   req->base.to_recv = req->base.body_len;
 
-  req->base.target_path_len = control_data.path_len;
-  req->base.target_path_offset = control_data.path - wrk->header_list;
-  /* drop leading slash */
-  req->base.target_path_offset++;
-  req->base.target_path_len--;
   req->base.target_query_offset = 0;
   req->base.target_query_len = 0;
-  http_identify_optional_query (&req->base, wrk->header_list);
+  if (control_data.parsed_bitmap & HPACK_PSEUDO_HEADER_PATH_PARSED)
+    {
+      req->base.target_path_len = control_data.path_len;
+      req->base.target_path_offset = control_data.path - wrk->header_list;
+      /* drop leading slash */
+      req->base.target_path_offset++;
+      req->base.target_path_len--;
+      http_identify_optional_query (&req->base, wrk->header_list);
+    }
+  else
+    {
+      req->base.target_path_len = 0;
+      req->base.target_path_offset = 0;
+    }
 
   msg.type = HTTP_MSG_REQUEST;
   msg.method_type = control_data.method;
@@ -954,6 +1095,27 @@ http2_req_state_transport_io_more_data (http_conn_t *hc, http2_req_t *req,
   return HTTP_SM_STOP;
 }
 
+static http_sm_result_t
+http2_req_state_tunnel_rx (http_conn_t *hc, http2_req_t *req,
+                          transport_send_params_t *sp, http2_error_t *error)
+{
+  u32 max_enq;
+
+  HTTP_DBG (1, "tunnel received data from client");
+
+  max_enq = http_io_as_max_write (&req->base);
+  if (max_enq < req->payload_len)
+    {
+      clib_warning ("app's rx fifo full");
+      http2_stream_error (hc, req, HTTP2_ERROR_INTERNAL_ERROR, sp);
+      return HTTP_SM_STOP;
+    }
+  http_io_as_write (&req->base, req->payload, req->payload_len);
+  http_app_worker_rx_notify (&req->base);
+
+  return HTTP_SM_STOP;
+}
+
 /*************************************/
 /* request state machine handlers TX */
 /*************************************/
@@ -977,7 +1139,9 @@ http2_req_state_wait_app_reply (http_conn_t *hc, http2_req_t *req,
   clib_llist_add_tail (wrk->req_pool, sched_list, req, he);
   http2_conn_schedule (h2c, hc->c_thread_index);
 
-  http_req_state_change (&req->base, HTTP_REQ_STATE_APP_IO_MORE_DATA);
+  http_req_state_change (&req->base, req->base.is_tunnel ?
+                                      HTTP_REQ_STATE_TUNNEL :
+                                      HTTP_REQ_STATE_APP_IO_MORE_DATA);
   http_req_deschedule (&req->base, sp);
 
   return HTTP_SM_STOP;
@@ -1005,6 +1169,29 @@ http2_req_state_app_io_more_data (http_conn_t *hc, http2_req_t *req,
   return HTTP_SM_STOP;
 }
 
+static http_sm_result_t
+http2_req_state_tunnel_tx (http_conn_t *hc, http2_req_t *req,
+                          transport_send_params_t *sp, http2_error_t *error)
+{
+  http2_conn_ctx_t *h2c;
+
+  ASSERT (!clib_llist_elt_is_linked (req, sched_list));
+
+  HTTP_DBG (1, "tunnel received data from target");
+
+  /* add data back to stream scheduler */
+  HTTP_DBG (1, "adding to data queue req_index %x",
+           ((http_req_handle_t) req->base.hr_req_handle).req_index);
+  http2_req_schedule_data_tx (hc, req);
+  h2c = http2_conn_ctx_get_w_thread (hc);
+  if (h2c->peer_window > 0)
+    http2_conn_schedule (h2c, hc->c_thread_index);
+
+  http_req_deschedule (&req->base, sp);
+
+  return HTTP_SM_STOP;
+}
+
 /*************************/
 /* request state machine */
 /*************************/
@@ -1022,7 +1209,7 @@ static http2_sm_handler tx_state_funcs[HTTP_REQ_N_STATES] = {
   0, /* wait transport method */
   http2_req_state_wait_app_reply,
   http2_req_state_app_io_more_data,
-  0, /* tunnel */
+  http2_req_state_tunnel_tx,
   0, /* udp tunnel */
 };
 
@@ -1034,7 +1221,7 @@ static http2_sm_handler rx_state_funcs[HTTP_REQ_N_STATES] = {
   http2_req_state_wait_transport_method,
   0, /* wait app reply */
   0, /* app io more data */
-  0, /* tunnel */
+  http2_req_state_tunnel_rx,
   0, /* udp tunnel */
 };
 
@@ -1268,7 +1455,17 @@ http2_handle_data_frame (http_conn_t *hc, http2_frame_header_t *fh)
     }
 
   if (fh->flags & HTTP2_FRAME_FLAG_END_STREAM)
-    req->stream_state = HTTP2_STREAM_STATE_HALF_CLOSED;
+    {
+      req->stream_state = HTTP2_STREAM_STATE_HALF_CLOSED;
+      if (req->base.is_tunnel)
+       {
+         session_transport_closing_notify (&req->base.connection);
+         HTTP_DBG (1, "client closed tunnel");
+         /* final DATA frame could be empty */
+         if (fh->length == 0)
+           return HTTP2_ERROR_NO_ERROR;
+       }
+    }
 
   rx_buf = http_get_rx_buf (hc);
   vec_validate (rx_buf, fh->length - 1);
@@ -1687,7 +1884,6 @@ static void
 http2_app_rx_evt_callback (http_conn_t *hc, u32 req_index,
                           clib_thread_index_t thread_index)
 {
-  /* TODO: continue tunnel RX */
   http2_req_t *req;
   u8 *response;
   u32 increment;
@@ -1707,6 +1903,8 @@ http2_app_rx_evt_callback (http_conn_t *hc, u32 req_index,
       response = http_get_tx_buf (hc);
       increment = http_io_as_max_write (&req->base) - req->our_window;
       HTTP_DBG (1, "stream window increment %u", increment);
+      if (increment == 0)
+       return;
       req->our_window += increment;
       http2_frame_write_window_update (increment, req->stream_id, &response);
       http_io_ts_write (hc, response, vec_len (response), 0);
@@ -1735,6 +1933,34 @@ http2_app_close_callback (http_conn_t *hc, u32 req_index,
       HTTP_DBG (1, "nothing more to send, confirm close");
       session_transport_closed_notify (&req->base.connection);
     }
+  else if (req->base.is_tunnel)
+    {
+      switch (req->stream_state)
+       {
+       case HTTP2_STREAM_STATE_OPEN:
+         HTTP_DBG (1, "proxy closing connection");
+         req->stream_state = HTTP2_STREAM_STATE_HALF_CLOSED;
+         if (http_io_as_max_read (&req->base))
+           {
+             HTTP_DBG (1, "wait for all data to be written to ts");
+             req->flags |= HTTP2_REQ_F_APP_CLOSED;
+           }
+         else
+           {
+             HTTP_DBG (1, "nothing more to send, closing tunnel");
+             http2_tunnel_send_close (hc, req);
+           }
+         break;
+       case HTTP2_STREAM_STATE_HALF_CLOSED:
+         HTTP_DBG (1, "proxy confirmed close");
+         http2_tunnel_send_close (hc, req);
+         session_transport_closed_notify (&req->base.connection);
+         break;
+       default:
+         session_transport_closed_notify (&req->base.connection);
+         break;
+       }
+    }
   else
     {
       HTTP_DBG (1, "wait for all data to be written to ts");