http: Tunneling UDP over HTTP/2 76/43176/8
authorMatus Fabian <[email protected]>
Fri, 13 Jun 2025 15:39:23 +0000 (11:39 -0400)
committerMatus Fabian <[email protected]>
Thu, 10 Jul 2025 08:19:02 +0000 (04:19 -0400)
Type: feature

Change-Id: I94f6af893872ae28669b7b9c30d61e58c0b65422
Signed-off-by: Matus Fabian <[email protected]>
extras/hs-test/h2spec_extras/h2spec_extras.go
extras/hs-test/infra/suite_vpp_proxy.go
extras/hs-test/infra/suite_vpp_udp_proxy.go
extras/hs-test/infra/utils.go
extras/hs-test/proxy_test.go
src/plugins/http/http2/http2.c

index b1d86a0..a48cdac 100644 (file)
@@ -1,10 +1,15 @@
 package h2spec_extras
 
 import (
+       "bytes"
+       "errors"
        "fmt"
        "slices"
        "strconv"
+       "strings"
 
+       "github.com/quic-go/quic-go/http3"
+       "github.com/quic-go/quic-go/quicvarint"
        "github.com/summerwind/h2spec/config"
        "github.com/summerwind/h2spec/spec"
        "golang.org/x/net/http2"
@@ -208,6 +213,24 @@ func ConnectHeaders(c *config.Config) []hpack.HeaderField {
        }
 }
 
+func readTcpTunnel(conn *spec.Conn, streamID uint32) ([]byte, error) {
+       actual, passed := conn.WaitEventByType(spec.EventDataFrame)
+       switch event := actual.(type) {
+       case spec.DataFrameEvent:
+               passed = event.Header().StreamID == streamID
+       default:
+               passed = false
+       }
+       if !passed {
+               return nil, &spec.TestError{
+                       Expected: []string{spec.EventDataFrame.String()},
+                       Actual:   actual.String(),
+               }
+       }
+       df, _ := actual.(spec.DataFrameEvent)
+       return df.Data(), nil
+}
+
 func ConnectMethod() *spec.TestGroup {
        tg := NewTestGroup("2", "CONNECT method")
 
@@ -480,5 +503,436 @@ func ExtendedConnectMethod() *spec.TestGroup {
                        return spec.VerifyStreamError(conn, http2.ErrCodeProtocol)
                },
        })
+
+       tg.AddTestGroup(ConnectUdp())
+
+       return tg
+}
+
+func ConnectUdpHeaders(c *config.Config) []hpack.HeaderField {
+
+       headers := spec.CommonHeaders(c)
+       headers[0].Value = "CONNECT"
+       headers = append(headers, spec.HeaderField(":protocol", "connect-udp"))
+       headers = append(headers, spec.HeaderField("capsule-protocol", "?1"))
+       return headers
+}
+
+func writeCapsule(conn *spec.Conn, streamID uint32, endStream bool, payload []byte) error {
+       b := make([]byte, 0)
+       b = quicvarint.Append(b, 0)
+       b = append(b, payload...)
+       var capsule bytes.Buffer
+       err := http3.WriteCapsule(&capsule, 0, b)
+       if err != nil {
+               return err
+       }
+
+       return conn.WriteData(streamID, endStream, capsule.Bytes())
+}
+
+func readCapsule(conn *spec.Conn, streamID uint32) ([]byte, error) {
+       actual, passed := conn.WaitEventByType(spec.EventDataFrame)
+       switch event := actual.(type) {
+       case spec.DataFrameEvent:
+               passed = event.Header().StreamID == streamID
+       default:
+               passed = false
+       }
+       if !passed {
+               return nil, &spec.TestError{
+                       Expected: []string{spec.EventDataFrame.String()},
+                       Actual:   actual.String(),
+               }
+       }
+       df, _ := actual.(spec.DataFrameEvent)
+       r := bytes.NewReader(df.Data())
+       capsuleType, payloadReader, err := http3.ParseCapsule(r)
+       if err != nil {
+               return nil, err
+       }
+       if capsuleType != 0 {
+               return nil, errors.New("capsule type should be 0")
+       }
+       b := make([]byte, 1024)
+       n, err := payloadReader.Read(b)
+       if err != nil {
+               return nil, err
+       }
+       if n < 3 {
+               return nil, errors.New("response payload too short")
+       }
+       if b[0] != 0 {
+               return nil, errors.New("context id should be 0")
+       }
+       return b[1:n], nil
+}
+
+func ConnectUdp() *spec.TestGroup {
+       tg := NewTestGroup("3.1", "Proxying UDP in HTTP")
+
+       tg.AddTestCase(&spec.TestCase{
+               Desc:        "Tunneling UDP over HTTP/2",
+               Requirement: "To initiate a UDP tunnel associated with a single HTTP stream, a client issues a request containing the \"connect-udp\" upgrade token. The target of the tunnel is indicated by the client to the UDP proxy via the \"target_host\" and \"target_port\" variables of the URI Template",
+               Run: func(c *config.Config, conn *spec.Conn) error {
+                       var streamID uint32 = 1
+
+                       err := conn.Handshake()
+                       if err != nil {
+                               return err
+                       }
+
+                       headers := ConnectUdpHeaders(c)
+                       hp := http2.HeadersFrameParam{
+                               StreamID:      streamID,
+                               EndStream:     false,
+                               EndHeaders:    true,
+                               BlockFragment: conn.EncodeHeaders(headers),
+                       }
+                       conn.WriteHeaders(hp)
+                       // verify response headers
+                       actual, passed := conn.WaitEventByType(spec.EventHeadersFrame)
+                       switch event := actual.(type) {
+                       case spec.HeadersFrameEvent:
+                               passed = event.Header().StreamID == streamID
+                       default:
+                               passed = false
+                       }
+                       if !passed {
+                               expected := []string{
+                                       fmt.Sprintf("DATA Frame (length:1, flags:0x00, stream_id:%d)", streamID),
+                               }
+
+                               return &spec.TestError{
+                                       Expected: expected,
+                                       Actual:   actual.String(),
+                               }
+                       }
+                       hf, _ := actual.(spec.HeadersFrameEvent)
+                       respHeaders := make([]hpack.HeaderField, 0, 256)
+                       decoder := hpack.NewDecoder(4096, func(f hpack.HeaderField) { respHeaders = append(respHeaders, f) })
+                       _, err = decoder.Write(hf.HeaderBlockFragment())
+                       if err != nil {
+                               return err
+                       }
+                       if !slices.Contains(respHeaders, spec.HeaderField("capsule-protocol", "?1")) {
+                               hs := ""
+                               for _, h := range respHeaders {
+                                       hs += h.String() + "\n"
+                               }
+                               return &spec.TestError{
+                                       Expected: []string{"\"capsule-protocol: ?1\" header received"},
+                                       Actual:   hs,
+                               }
+                       }
+                       if !slices.Contains(respHeaders, spec.HeaderField(":status", "200")) {
+                               hs := ""
+                               for _, h := range respHeaders {
+                                       hs += h.String() + "\n"
+                               }
+                               return &spec.TestError{
+                                       Expected: []string{"\":status: 200\" header received"},
+                                       Actual:   hs,
+                               }
+                       }
+                       for _, h := range respHeaders {
+                               if h.Name == "content-length" {
+                                       return &spec.TestError{
+                                               Expected: []string{"\"content-length\" header must not be used"},
+                                               Actual:   h.String(),
+                                       }
+                               }
+                       }
+
+                       // send and receive data over tunnel
+                       data := []byte("hello")
+                       err = writeCapsule(conn, streamID, false, data)
+                       if err != nil {
+                               return err
+                       }
+                       resp, err := readCapsule(conn, streamID)
+                       if err != nil {
+                               return err
+                       }
+                       if !bytes.Equal(data, resp) {
+                               return &spec.TestError{
+                                       Expected: []string{"capsule payload: " + string(data)},
+                                       Actual:   "capsule payload:" + string(resp),
+                               }
+                       }
+                       // try again
+                       err = writeCapsule(conn, streamID, false, data)
+                       if err != nil {
+                               return err
+                       }
+                       resp, err = readCapsule(conn, streamID)
+                       if err != nil {
+                               return err
+                       }
+                       if !bytes.Equal(data, resp) {
+                               return &spec.TestError{
+                                       Expected: []string{"capsule payload: " + string(data)},
+                                       Actual:   "capsule payload:" + string(resp),
+                               }
+                       }
+                       return nil
+               },
+       })
+
+       tg.AddTestCase(&spec.TestCase{
+               Desc:        "Multiple tunnels",
+               Requirement: "In HTTP/2, the data stream of a given HTTP request consists of all bytes sent in DATA frames with the corresponding stream ID.",
+               Run: func(c *config.Config, conn *spec.Conn) error {
+                       var streamID uint32 = 1
+
+                       err := conn.Handshake()
+                       if err != nil {
+                               return err
+                       }
+
+                       maxStreams, ok := conn.Settings[http2.SettingMaxConcurrentStreams]
+                       if !ok {
+                               return spec.ErrSkipped
+                       }
+
+                       for i := 0; i < int(maxStreams); i++ {
+                               headers := ConnectUdpHeaders(c)
+                               hp := http2.HeadersFrameParam{
+                                       StreamID:      streamID,
+                                       EndStream:     false,
+                                       EndHeaders:    true,
+                                       BlockFragment: conn.EncodeHeaders(headers),
+                               }
+                               conn.WriteHeaders(hp)
+                               err = spec.VerifyHeadersFrame(conn, streamID)
+                               if err != nil {
+                                       return err
+                               }
+
+                               streamID += 2
+                       }
+
+                       streamID = 1
+                       data := []byte("hello")
+                       for i := 0; i < int(maxStreams); i++ {
+                               err = writeCapsule(conn, streamID, false, data)
+                               if err != nil {
+                                       return err
+                               }
+                       }
+
+                       for i := 0; i < int(maxStreams); i++ {
+                               resp, err := readCapsule(conn, streamID)
+                               if err != nil {
+                                       return err
+                               }
+                               if !bytes.Equal(data, resp) {
+                                       return &spec.TestError{
+                                               Expected: []string{"capsule payload: " + string(data)},
+                                               Actual:   "capsule payload:" + string(resp),
+                                       }
+                               }
+                       }
+
+                       return nil
+               },
+       })
+
+       tg.AddTestCase(&spec.TestCase{
+               Desc:        "Tunnel closed by client (with attached data)",
+               Requirement: "A proxy that receives a DATA frame with the END_STREAM flag set sends the attached data and close UDP connection.",
+               Run: func(c *config.Config, conn *spec.Conn) error {
+                       var streamID uint32 = 1
+
+                       err := conn.Handshake()
+                       if err != nil {
+                               return err
+                       }
+
+                       headers := ConnectUdpHeaders(c)
+                       hp := http2.HeadersFrameParam{
+                               StreamID:      streamID,
+                               EndStream:     false,
+                               EndHeaders:    true,
+                               BlockFragment: conn.EncodeHeaders(headers),
+                       }
+                       conn.WriteHeaders(hp)
+                       err = spec.VerifyHeadersFrame(conn, streamID)
+                       if err != nil {
+                               return err
+                       }
+
+                       // close tunnel
+                       data := []byte("hello")
+                       err = writeCapsule(conn, streamID, true, data)
+                       if err != nil {
+                               return err
+                       }
+
+                       // wait for DATA frame with END_STREAM flag set
+                       err = VerifyTunnelClosed(conn)
+                       if err != nil {
+                               return err
+                       }
+
+                       return nil
+               },
+       })
+
+       tg.AddTestCase(&spec.TestCase{
+               Desc:        "Tunnel closed by client (empty DATA frame)",
+               Requirement: "The final DATA frame could be empty.",
+               Run: func(c *config.Config, conn *spec.Conn) error {
+                       var streamID uint32 = 1
+
+                       err := conn.Handshake()
+                       if err != nil {
+                               return err
+                       }
+
+                       headers := ConnectUdpHeaders(c)
+                       hp := http2.HeadersFrameParam{
+                               StreamID:      streamID,
+                               EndStream:     false,
+                               EndHeaders:    true,
+                               BlockFragment: conn.EncodeHeaders(headers),
+                       }
+                       conn.WriteHeaders(hp)
+                       err = spec.VerifyHeadersFrame(conn, streamID)
+                       if err != nil {
+                               return err
+                       }
+
+                       // send and receive data over tunnel
+                       data := []byte("hello")
+                       err = writeCapsule(conn, streamID, false, data)
+                       if err != nil {
+                               return err
+                       }
+                       resp, err := readCapsule(conn, streamID)
+                       if err != nil {
+                               return err
+                       }
+                       if !bytes.Equal(data, resp) {
+                               return &spec.TestError{
+                                       Expected: []string{"capsule payload: " + string(data)},
+                                       Actual:   "capsule payload:" + string(resp),
+                               }
+                       }
+
+                       // close tunnel
+                       conn.WriteData(streamID, true, []byte(""))
+
+                       // wait for DATA frame with END_STREAM flag set
+                       err = VerifyTunnelClosed(conn)
+                       if err != nil {
+                               return err
+                       }
+
+                       return nil
+               },
+       })
+
+       tg.AddTestCase(&spec.TestCase{
+               Desc:        "CONNECT and CONNECT-UDP on single connection",
+               Requirement: "One stream establish TCP tunnel and second UDP tunnel.",
+               Run: func(c *config.Config, conn *spec.Conn) error {
+                       err := conn.Handshake()
+                       if err != nil {
+                               return err
+                       }
+
+                       var udpTunnelStreamID uint32 = 1
+                       var tcpTunnelStreamID uint32 = 3
+
+                       headers := ConnectUdpHeaders(c)
+                       hp := http2.HeadersFrameParam{
+                               StreamID:      udpTunnelStreamID,
+                               EndStream:     false,
+                               EndHeaders:    true,
+                               BlockFragment: conn.EncodeHeaders(headers),
+                       }
+                       conn.WriteHeaders(hp)
+                       err = spec.VerifyHeadersFrame(conn, udpTunnelStreamID)
+                       if err != nil {
+                               return err
+                       }
+
+                       pathSplit := strings.Split(c.Path, "/")
+                       path := fmt.Sprintf("%s:%s", pathSplit[4], pathSplit[5])
+                       headers = []hpack.HeaderField{
+                               spec.HeaderField(":method", "CONNECT"),
+                               spec.HeaderField(":authority", path),
+                       }
+                       hp = http2.HeadersFrameParam{
+                               StreamID:      tcpTunnelStreamID,
+                               EndStream:     false,
+                               EndHeaders:    true,
+                               BlockFragment: conn.EncodeHeaders(headers),
+                       }
+                       conn.WriteHeaders(hp)
+                       err = spec.VerifyHeadersFrame(conn, tcpTunnelStreamID)
+                       if err != nil {
+                               return err
+                       }
+
+                       // send and receive data over UDP tunnel
+                       udpData := []byte("hello UDP")
+                       err = writeCapsule(conn, udpTunnelStreamID, false, udpData)
+                       if err != nil {
+                               return err
+                       }
+                       udpResp, err := readCapsule(conn, udpTunnelStreamID)
+                       if err != nil {
+                               return err
+                       }
+                       if !bytes.Equal(udpData, udpResp) {
+                               return &spec.TestError{
+                                       Expected: []string{"capsule payload: " + string(udpData)},
+                                       Actual:   "capsule payload:" + string(udpResp),
+                               }
+                       }
+
+                       // send and receive data over TCP tunnel
+                       tcpData := []byte("hello TCP")
+                       conn.WriteData(tcpTunnelStreamID, false, tcpData)
+                       tcpResp, err := readTcpTunnel(conn, tcpTunnelStreamID)
+                       if !bytes.Equal(tcpData, tcpResp) {
+                               return &spec.TestError{
+                                       Expected: []string{"payload: " + string(tcpData)},
+                                       Actual:   "payload:" + string(tcpResp),
+                               }
+                       }
+
+                       // send and receive data over TCP tunnel
+                       conn.WriteData(tcpTunnelStreamID, false, tcpData)
+                       tcpResp, err = readTcpTunnel(conn, tcpTunnelStreamID)
+                       if !bytes.Equal(tcpData, tcpResp) {
+                               return &spec.TestError{
+                                       Expected: []string{"payload: " + string(tcpData)},
+                                       Actual:   "payload:" + string(tcpResp),
+                               }
+                       }
+
+                       // send and receive data over UDP tunnel
+                       err = writeCapsule(conn, udpTunnelStreamID, false, udpData)
+                       if err != nil {
+                               return err
+                       }
+                       udpResp, err = readCapsule(conn, udpTunnelStreamID)
+                       if err != nil {
+                               return err
+                       }
+                       if !bytes.Equal(udpData, udpResp) {
+                               return &spec.TestError{
+                                       Expected: []string{"capsule payload: " + string(udpData)},
+                                       Actual:   "capsule payload:" + string(udpResp),
+                               }
+                       }
+
+                       return nil
+               },
+       })
+
        return tg
 }
index d1c125e..d43a588 100644 (file)
@@ -13,7 +13,6 @@ import (
        "os"
        "reflect"
        "runtime"
-       "strconv"
        "strings"
        "time"
 
@@ -242,22 +241,6 @@ func handleConn(conn net.Conn) {
        }
 }
 
-func (s *VppProxySuite) StartEchoServer() *net.TCPListener {
-       listener, err := net.ListenTCP("tcp", &net.TCPAddr{IP: net.ParseIP(s.ServerAddr()), Port: int(s.Ports.Server)})
-       s.AssertNil(err, fmt.Sprint(err))
-       go func() {
-               for {
-                       conn, err := listener.Accept()
-                       if err != nil {
-                               continue
-                       }
-                       go handleConn(conn)
-               }
-       }()
-       s.Log("* started tcp echo server " + s.ServerAddr() + ":" + strconv.Itoa(int(s.Ports.Server)))
-       return listener
-}
-
 var _ = Describe("VppProxySuite", Ordered, ContinueOnFailure, func() {
        var s VppProxySuite
        BeforeAll(func() {
index c424790..912ab64 100644 (file)
@@ -8,7 +8,6 @@ import (
        "os"
        "reflect"
        "runtime"
-       "strconv"
        "strings"
        "time"
 
@@ -118,25 +117,6 @@ func (s *VppUdpProxySuite) ClientAddr() string {
        return s.Interfaces.Client.Ip4AddressString()
 }
 
-func (s *VppUdpProxySuite) StartEchoServer() *net.UDPConn {
-       conn, err := net.ListenUDP("udp", &net.UDPAddr{IP: net.ParseIP(s.ServerAddr()), Port: s.Ports.Server})
-       s.AssertNil(err, fmt.Sprint(err))
-       go func() {
-               for {
-                       b := make([]byte, 1500)
-                       n, addr, err := conn.ReadFrom(b)
-                       if err != nil {
-                               return
-                       }
-                       if _, err := conn.WriteTo(b[:n], addr); err != nil {
-                               return
-                       }
-               }
-       }()
-       s.Log("* started udp echo server " + s.ServerAddr() + ":" + strconv.Itoa(s.Ports.Server))
-       return conn
-}
-
 func (s *VppUdpProxySuite) ClientSendReceive(toSend []byte, rcvBuffer []byte) (int, error) {
        proxiedConn, err := net.DialUDP("udp",
                &net.UDPAddr{IP: net.ParseIP(s.ClientAddr()), Port: 0},
@@ -270,6 +250,11 @@ var _ = Describe("H2SpecUdpProxySuite", Ordered, ContinueOnFailure, func() {
        }{
                {desc: "extras/3/1"},
                {desc: "extras/3/2"},
+               {desc: "extras/3.1/1"},
+               {desc: "extras/3.1/2"},
+               {desc: "extras/3.1/3"},
+               {desc: "extras/3.1/4"},
+               {desc: "extras/3.1/5"},
        }
 
        for _, test := range testCases {
@@ -278,8 +263,13 @@ var _ = Describe("H2SpecUdpProxySuite", Ordered, ContinueOnFailure, func() {
                It(testName, func(ctx SpecContext) {
                        s.Log(testName + ": BEGIN")
                        vppProxy := s.Containers.VppProxy.VppInstance
-                       remoteServerConn := s.StartEchoServer()
+                       remoteServerConn := s.StartUdpEchoServer(s.ServerAddr(), s.Ports.Server)
                        defer remoteServerConn.Close()
+                       // this one will open TCP tunnel too
+                       if strings.Contains(test.desc, "extras/3.1/5") {
+                               remoteTcpServerConn := s.StartTcpEchoServer(s.ServerAddr(), s.Ports.Server)
+                               defer remoteTcpServerConn.Close()
+                       }
                        cmd := fmt.Sprintf("test proxy server fifo-size 512k server-uri https://%s/%d", s.VppProxyAddr(), s.Ports.Proxy)
                        s.Log(vppProxy.Vppctl(cmd))
                        path := fmt.Sprintf("/.well-known/masque/udp/%s/%d/", s.ServerAddr(), s.Ports.Server)
@@ -287,7 +277,7 @@ var _ = Describe("H2SpecUdpProxySuite", Ordered, ContinueOnFailure, func() {
                                Host:         s.VppProxyAddr(),
                                Port:         s.Ports.Proxy,
                                Path:         path,
-                               Timeout:      time.Second * s.MaxTimeout,
+                               Timeout:      s.MaxTimeout,
                                MaxHeaderLen: 4096,
                                TLS:          true,
                                Insecure:     true,
index e320345..14d0840 100644 (file)
@@ -10,6 +10,7 @@ import (
        "net/http/httputil"
        "os"
        "os/exec"
+       "strconv"
        "strings"
        "time"
 
@@ -327,3 +328,38 @@ func (s *HstSuite) GetCoreProcessName(file string) (string, bool) {
        }
        return "", false
 }
+
+func (s *HstSuite) StartTcpEchoServer(addr string, port int) *net.TCPListener {
+       listener, err := net.ListenTCP("tcp", &net.TCPAddr{IP: net.ParseIP(addr), Port: port})
+       s.AssertNil(err, fmt.Sprint(err))
+       go func() {
+               for {
+                       conn, err := listener.Accept()
+                       if err != nil {
+                               continue
+                       }
+                       go handleConn(conn)
+               }
+       }()
+       s.Log("* started tcp echo server " + addr + ":" + strconv.Itoa(port))
+       return listener
+}
+
+func (s *HstSuite) StartUdpEchoServer(addr string, port int) *net.UDPConn {
+       conn, err := net.ListenUDP("udp", &net.UDPAddr{IP: net.ParseIP(addr), Port: port})
+       s.AssertNil(err, fmt.Sprint(err))
+       go func() {
+               for {
+                       b := make([]byte, 1500)
+                       n, addr, err := conn.ReadFrom(b)
+                       if err != nil {
+                               return
+                       }
+                       if _, err := conn.WriteTo(b[:n], addr); err != nil {
+                               return
+                       }
+               }
+       }()
+       s.Log("* started udp echo server " + addr + ":" + strconv.Itoa(port))
+       return conn
+}
index 40f3558..183cca7 100644 (file)
@@ -327,7 +327,7 @@ func vppConnectProxyStressLoad(s *VppProxySuite, proxyPort string) {
 }
 
 func VppConnectProxyStressTest(s *VppProxySuite) {
-       remoteServerConn := s.StartEchoServer()
+       remoteServerConn := s.StartTcpEchoServer(s.ServerAddr(), int(s.Ports.Server))
        defer remoteServerConn.Close()
 
        s.ConfigureVppProxy("http", s.Ports.Proxy)
@@ -341,7 +341,7 @@ func VppConnectProxyStressTest(s *VppProxySuite) {
 func VppConnectProxyStressMWTest(s *VppProxySuite) {
        s.CpusPerVppContainer = 3
        s.SetupTest()
-       remoteServerConn := s.StartEchoServer()
+       remoteServerConn := s.StartTcpEchoServer(s.ServerAddr(), int(s.Ports.Server))
        defer remoteServerConn.Close()
 
        vppProxy := s.Containers.VppProxy.VppInstance
@@ -355,7 +355,7 @@ func VppConnectProxyStressMWTest(s *VppProxySuite) {
 }
 
 func VppProxyUdpTest(s *VppUdpProxySuite) {
-       remoteServerConn := s.StartEchoServer()
+       remoteServerConn := s.StartUdpEchoServer(s.ServerAddr(), s.Ports.Server)
        defer remoteServerConn.Close()
 
        vppProxy := s.Containers.VppProxy.VppInstance
@@ -372,7 +372,7 @@ func VppProxyUdpTest(s *VppUdpProxySuite) {
 func VppProxyUdpMigrationMWTest(s *VppUdpProxySuite) {
        s.CpusPerVppContainer = 3
        s.SetupTest()
-       remoteServerConn := s.StartEchoServer()
+       remoteServerConn := s.StartUdpEchoServer(s.ServerAddr(), s.Ports.Server)
        defer remoteServerConn.Close()
 
        vppProxy := s.Containers.VppProxy.VppInstance
@@ -394,7 +394,7 @@ func VppProxyUdpMigrationMWTest(s *VppUdpProxySuite) {
 }
 
 func VppConnectUdpProxyTest(s *VppUdpProxySuite) {
-       remoteServerConn := s.StartEchoServer()
+       remoteServerConn := s.StartUdpEchoServer(s.ServerAddr(), s.Ports.Server)
        defer remoteServerConn.Close()
 
        vppProxy := s.Containers.VppProxy.VppInstance
@@ -441,7 +441,7 @@ func VppConnectUdpInvalidTargetTest(s *VppUdpProxySuite) {
 }
 
 func VppConnectUdpInvalidCapsuleTest(s *VppUdpProxySuite) {
-       remoteServerConn := s.StartEchoServer()
+       remoteServerConn := s.StartUdpEchoServer(s.ServerAddr(), s.Ports.Server)
        defer remoteServerConn.Close()
 
        vppProxy := s.Containers.VppProxy.VppInstance
@@ -471,7 +471,7 @@ func VppConnectUdpInvalidCapsuleTest(s *VppUdpProxySuite) {
 }
 
 func VppConnectUdpUnknownCapsuleTest(s *VppUdpProxySuite) {
-       remoteServerConn := s.StartEchoServer()
+       remoteServerConn := s.StartUdpEchoServer(s.ServerAddr(), s.Ports.Server)
        defer remoteServerConn.Close()
 
        vppProxy := s.Containers.VppProxy.VppInstance
@@ -500,7 +500,7 @@ func VppConnectUdpUnknownCapsuleTest(s *VppUdpProxySuite) {
 }
 
 func VppConnectUdpClientCloseTest(s *VppUdpProxySuite) {
-       remoteServerConn := s.StartEchoServer()
+       remoteServerConn := s.StartUdpEchoServer(s.ServerAddr(), s.Ports.Server)
        defer remoteServerConn.Close()
 
        vppProxy := s.Containers.VppProxy.VppInstance
@@ -631,7 +631,7 @@ func vppConnectUdpStressLoad(s *VppUdpProxySuite) {
 }
 
 func VppConnectUdpStressTest(s *VppUdpProxySuite) {
-       remoteServerConn := s.StartEchoServer()
+       remoteServerConn := s.StartUdpEchoServer(s.ServerAddr(), s.Ports.Server)
        defer remoteServerConn.Close()
 
        vppProxy := s.Containers.VppProxy.VppInstance
@@ -647,7 +647,7 @@ func VppConnectUdpStressTest(s *VppUdpProxySuite) {
 func VppConnectUdpStressMWTest(s *VppUdpProxySuite) {
        s.CpusPerVppContainer = 3
        s.SetupTest()
-       remoteServerConn := s.StartEchoServer()
+       remoteServerConn := s.StartUdpEchoServer(s.ServerAddr(), s.Ports.Server)
        defer remoteServerConn.Close()
 
        vppProxy := s.Containers.VppProxy.VppInstance
index 5888733..8e9e6d9 100644 (file)
@@ -57,6 +57,7 @@ typedef struct http2_req_
   u8 *payload;
   u32 payload_len;
   clib_llist_anchor_t sched_list;
+  http_req_state_t app_reply_next_state;
   void (*dispatch_headers_cb) (struct http2_req_ *req, http_conn_t *hc,
                               u8 *n_emissions, clib_llist_index_t *next_ri);
   void (*dispatch_data_cb) (struct http2_req_ *req, http_conn_t *hc,
@@ -618,6 +619,95 @@ http2_sched_dispatch_tunnel (http2_req_t *req, http_conn_t *hc,
   http_io_ts_after_write (hc, 0);
 }
 
+static void
+http2_sched_dispatch_udp_tunnel (http2_req_t *req, http_conn_t *hc,
+                                u8 *n_emissions)
+{
+  http2_conn_ctx_t *h2c;
+  u32 max_write, max_read, dgram_size, capsule_size, n_written;
+  session_dgram_hdr_t hdr;
+  u8 fh[HTTP2_FRAME_HEADER_SIZE];
+  u8 *buf, *payload;
+
+  *n_emissions += HTTP2_SCHED_WEIGHT_DATA_INLINE;
+
+  max_read = http_io_as_max_read (&req->base);
+  if (max_read == 0)
+    {
+      HTTP_DBG (2, "max_read == 0");
+      transport_connection_reschedule (&req->base.connection);
+      return;
+    }
+  /* read datagram header */
+  http_io_as_read (&req->base, (u8 *) &hdr, sizeof (hdr), 1);
+  ASSERT (hdr.data_length <= HTTP_UDP_PAYLOAD_MAX_LEN);
+  dgram_size = hdr.data_length + SESSION_CONN_HDR_LEN;
+  ASSERT (max_read >= dgram_size);
+
+  h2c = http2_conn_ctx_get_w_thread (hc);
+
+  if (PREDICT_FALSE (
+       (hdr.data_length + HTTP_UDP_PROXY_DATAGRAM_CAPSULE_OVERHEAD) >
+       h2c->peer_settings.max_frame_size))
+    {
+      /* drop datagram if not fit into frame */
+      HTTP_DBG (1, "datagram too large, dropped");
+      http_io_as_drain (&req->base, dgram_size);
+      return;
+    }
+
+  if (req->peer_window <
+      (hdr.data_length + HTTP_UDP_PROXY_DATAGRAM_CAPSULE_OVERHEAD))
+    {
+      /* mark that we need window update on stream */
+      HTTP_DBG (1, "not enough space in stream window for capsule");
+      req->flags |= HTTP2_REQ_F_NEED_WINDOW_UPDATE;
+    }
+
+  max_write = http_io_ts_max_write (hc, 0);
+  max_write -= HTTP2_FRAME_HEADER_SIZE;
+  max_write -= HTTP_UDP_PROXY_DATAGRAM_CAPSULE_OVERHEAD;
+  max_write = clib_min (max_write, h2c->peer_window);
+  if (PREDICT_FALSE (max_write < hdr.data_length))
+    {
+      /* we should have at least 16kB free space in underlying transport,
+       * maybe peer is doing small connection window updates */
+      HTTP_DBG (1, "datagram dropped");
+      http_io_as_drain (&req->base, dgram_size);
+      return;
+    }
+
+  buf = http_get_tx_buf (hc);
+  /* create capsule header */
+  payload = http_encap_udp_payload_datagram (buf, hdr.data_length);
+  capsule_size = (payload - buf) + hdr.data_length;
+  /* read payload */
+  http_io_as_read (&req->base, payload, hdr.data_length, 1);
+  http_io_as_drain (&req->base, dgram_size);
+
+  req->peer_window -= capsule_size;
+  h2c->peer_window -= capsule_size;
+
+  http2_frame_write_data_header (capsule_size, req->stream_id, 0, fh);
+
+  svm_fifo_seg_t segs[2] = { { fh, HTTP2_FRAME_HEADER_SIZE },
+                            { buf, capsule_size } };
+  n_written = http_io_ts_write_segs (hc, segs, 2, 0);
+  ASSERT (n_written == (HTTP2_FRAME_HEADER_SIZE + capsule_size));
+
+  if (max_read - dgram_size)
+    {
+      /* schedule for next round if we have more data */
+      HTTP_DBG (1, "adding to data queue req_index %x",
+               ((http_req_handle_t) req->base.hr_req_handle).req_index);
+      http2_req_schedule_data_tx (hc, req);
+    }
+  else
+    transport_connection_reschedule (&req->base.connection);
+
+  http_io_ts_after_write (hc, 0);
+}
+
 static void
 http2_sched_dispatch_continuation (http2_req_t *req, http_conn_t *hc,
                                   u8 *n_emissions,
@@ -701,13 +791,33 @@ http2_sched_dispatch_headers (http2_req_t *req, http_conn_t *hc,
   response = http_get_tx_buf (hc);
   date = format (0, "%U", format_http_time_now, hc);
 
-  control_data.sc = msg.code;
   control_data.content_len = msg.data.body_len;
   control_data.server_name = hc->app_name;
   control_data.server_name_len = vec_len (hc->app_name);
   control_data.date = date;
   control_data.date_len = vec_len (date);
 
+  if (req->base.is_tunnel)
+    {
+      switch (msg.code)
+       {
+       case HTTP_STATUS_SWITCHING_PROTOCOLS:
+         /* remap status code for extended connect response */
+         msg.code = HTTP_STATUS_OK;
+       case HTTP_STATUS_OK:
+       case HTTP_STATUS_CREATED:
+       case HTTP_STATUS_ACCEPTED:
+         /* tunnel established if 2xx (Successful) response to CONNECT */
+         control_data.content_len = HPACK_ENCODER_SKIP_CONTENT_LEN;
+         break;
+       default:
+         /* tunnel not established */
+         req->base.is_tunnel = 0;
+         break;
+       }
+    }
+  control_data.sc = msg.code;
+
   if (msg.data.headers_len)
     {
       n_deq += msg.data.type == HTTP_MSG_DATA_PTR ? sizeof (uword) :
@@ -728,10 +838,6 @@ http2_sched_dispatch_headers (http2_req_t *req, http_conn_t *hc,
 
   stream_id = req->stream_id;
 
-  /* tunnel established if 2xx (Successful) response to CONNECT */
-  req->base.is_tunnel =
-    (req->base.is_tunnel && http_status_code_str[msg.code][0] == '2');
-
   /* END_STREAM flag need to be set in HEADERS frame */
   if (msg.data.body_len)
     {
@@ -757,7 +863,11 @@ http2_sched_dispatch_headers (http2_req_t *req, http_conn_t *hc,
        }
       else if (req->base.is_tunnel)
        {
-         req->dispatch_data_cb = http2_sched_dispatch_tunnel;
+         if (req->base.upgrade_proto == HTTP_UPGRADE_PROTO_CONNECT_UDP &&
+             hc->udp_tunnel_mode == HTTP_UDP_TUNNEL_DGRAM)
+           req->dispatch_data_cb = http2_sched_dispatch_udp_tunnel;
+         else
+           req->dispatch_data_cb = http2_sched_dispatch_tunnel;
          transport_connection_reschedule (&req->base.connection);
          /* cleanup some stuff we don't need anymore in tunnel mode */
          vec_free (req->base.headers);
@@ -914,6 +1024,8 @@ http2_req_state_wait_transport_method (http_conn_t *hc, http2_req_t *req,
   req->base.headers_offset = control_data.headers - wrk->header_list;
   req->base.headers_len = control_data.headers_len;
 
+  req->app_reply_next_state = HTTP_REQ_STATE_APP_IO_MORE_DATA;
+
   if (!(control_data.parsed_bitmap & HPACK_PSEUDO_HEADER_METHOD_PARSED))
     {
       HTTP_DBG (1, ":method pseudo-header missing in request");
@@ -954,6 +1066,7 @@ http2_req_state_wait_transport_method (http_conn_t *hc, http2_req_t *req,
     }
   if (control_data.method == HTTP_REQ_CONNECT)
     {
+      req->app_reply_next_state = HTTP_REQ_STATE_TUNNEL;
       if (control_data.parsed_bitmap & HPACK_PSEUDO_HEADER_PROTOCOL_PARSED)
        {
          /* extended CONNECT (RFC8441) */
@@ -985,6 +1098,9 @@ http2_req_state_wait_transport_method (http_conn_t *hc, http2_req_t *req,
            http2_stream_error (hc, req, HTTP2_ERROR_INTERNAL_ERROR, sp);
            return HTTP_SM_STOP;
          }
+         if (req->base.upgrade_proto == HTTP_UPGRADE_PROTO_CONNECT_UDP &&
+             hc->udp_tunnel_mode == HTTP_UDP_TUNNEL_DGRAM)
+           req->app_reply_next_state = HTTP_REQ_STATE_UDP_TUNNEL;
        }
       else
        {
@@ -1154,9 +1270,84 @@ http2_req_state_tunnel_rx (http_conn_t *hc, http2_req_t *req,
   http_io_as_write (&req->base, req->payload, req->payload_len);
   http_app_worker_rx_notify (&req->base);
 
+  switch (req->stream_state)
+    {
+    case HTTP2_STREAM_STATE_HALF_CLOSED:
+      HTTP_DBG (1, "client want to close tunnel");
+      session_transport_closing_notify (&req->base.connection);
+      break;
+    case HTTP2_STREAM_STATE_CLOSED:
+      HTTP_DBG (1, "client closed tunnel");
+      http2_stream_close (req, hc);
+      break;
+    default:
+      break;
+    }
+
   return HTTP_SM_STOP;
 }
 
+static http_sm_result_t
+http2_req_state_udp_tunnel_rx (http_conn_t *hc, http2_req_t *req,
+                              transport_send_params_t *sp,
+                              http2_error_t *error)
+{
+  int rv;
+  u8 payload_offset;
+  u64 payload_len;
+  session_dgram_hdr_t hdr;
+
+  HTTP_DBG (1, "udp tunnel received data from client");
+
+  rv = http_decap_udp_payload_datagram (req->payload, req->payload_len,
+                                       &payload_offset, &payload_len);
+  HTTP_DBG (1, "rv=%d, payload_offset=%u, payload_len=%llu", rv,
+           payload_offset, payload_len);
+  if (PREDICT_FALSE (rv != 0))
+    {
+      if (rv < 0)
+       {
+         /* capsule datagram is invalid (stream need to be aborted) */
+         http2_stream_error (hc, req, HTTP2_ERROR_PROTOCOL_ERROR, sp);
+         return HTTP_SM_STOP;
+       }
+      else
+       {
+         /* unknown capsule should be skipped */
+         return HTTP_SM_STOP;
+       }
+    }
+  /* check if we have the full capsule */
+  if (PREDICT_FALSE (req->payload_len != (payload_offset + payload_len)))
+    {
+      HTTP_DBG (1, "capsule not complete");
+      http2_stream_error (hc, req, HTTP2_ERROR_PROTOCOL_ERROR, sp);
+      return HTTP_SM_STOP;
+    }
+  if (http_io_as_max_write (&req->base) < (sizeof (hdr) + payload_len))
+    {
+      clib_warning ("app's rx fifo full");
+      http2_stream_error (hc, req, HTTP2_ERROR_INTERNAL_ERROR, sp);
+      return HTTP_SM_STOP;
+    }
+
+  hdr.data_length = payload_len;
+  hdr.data_offset = 0;
+
+  /* send datagram header and payload */
+  svm_fifo_seg_t segs[2] = { { (u8 *) &hdr, sizeof (hdr) },
+                            { req->payload + payload_offset, payload_len } };
+  http_io_as_write_segs (&req->base, segs, 2);
+  http_app_worker_rx_notify (&req->base);
+
+  if (req->stream_state == HTTP2_STREAM_STATE_HALF_CLOSED)
+    {
+      HTTP_DBG (1, "client want to close tunnel");
+      session_transport_closing_notify (&req->base.connection);
+    }
+
+  return HTTP_SM_STOP;
+}
 /*************************************/
 /* request state machine handlers TX */
 /*************************************/
@@ -1180,9 +1371,7 @@ http2_req_state_wait_app_reply (http_conn_t *hc, http2_req_t *req,
   clib_llist_add_tail (wrk->req_pool, sched_list, req, he);
   http2_conn_schedule (h2c, hc->c_thread_index);
 
-  http_req_state_change (&req->base, req->base.is_tunnel ?
-                                      HTTP_REQ_STATE_TUNNEL :
-                                      HTTP_REQ_STATE_APP_IO_MORE_DATA);
+  http_req_state_change (&req->base, req->app_reply_next_state);
   http_req_deschedule (&req->base, sp);
 
   return HTTP_SM_STOP;
@@ -1250,8 +1439,9 @@ static http2_sm_handler tx_state_funcs[HTTP_REQ_N_STATES] = {
   0, /* wait transport method */
   http2_req_state_wait_app_reply,
   http2_req_state_app_io_more_data,
+  /* both can be same, we use different scheduler data dispatch cb */
+  http2_req_state_tunnel_tx,
   http2_req_state_tunnel_tx,
-  0, /* udp tunnel */
 };
 
 static http2_sm_handler rx_state_funcs[HTTP_REQ_N_STATES] = {
@@ -1263,7 +1453,7 @@ static http2_sm_handler rx_state_funcs[HTTP_REQ_N_STATES] = {
   0, /* wait app reply */
   0, /* app io more data */
   http2_req_state_tunnel_rx,
-  0, /* udp tunnel */
+  http2_req_state_udp_tunnel_rx,
 };
 
 static_always_inline int
@@ -1476,7 +1666,7 @@ http2_handle_data_frame (http_conn_t *hc, http2_frame_header_t *fh)
 
   /* bogus state */
   if (hc->flags & HTTP_CONN_F_IS_SERVER &&
-      req->stream_state != HTTP2_STREAM_STATE_OPEN)
+      req->stream_state != HTTP2_STREAM_STATE_OPEN && !req->base.is_tunnel)
     {
       HTTP_DBG (1, "error: stream already half-closed");
       http2_stream_error (hc, req, HTTP2_ERROR_STREAM_CLOSED, 0);
@@ -1497,15 +1687,32 @@ http2_handle_data_frame (http_conn_t *hc, http2_frame_header_t *fh)
 
   if (fh->flags & HTTP2_FRAME_FLAG_END_STREAM)
     {
-      req->stream_state = HTTP2_STREAM_STATE_HALF_CLOSED;
+      HTTP_DBG (1, "END_STREAM flag set");
       if (req->base.is_tunnel)
        {
-         session_transport_closing_notify (&req->base.connection);
-         HTTP_DBG (1, "client closed tunnel");
+         /* client can initiate or confirm tunnel close */
+         req->stream_state =
+           req->stream_state == HTTP2_STREAM_STATE_HALF_CLOSED ?
+             HTTP2_STREAM_STATE_CLOSED :
+             HTTP2_STREAM_STATE_HALF_CLOSED;
          /* final DATA frame could be empty */
          if (fh->length == 0)
-           return HTTP2_ERROR_NO_ERROR;
+           {
+             if (req->stream_state == HTTP2_STREAM_STATE_CLOSED)
+               {
+                 HTTP_DBG (1, "client closed tunnel");
+                 http2_stream_close (req, hc);
+               }
+             else
+               {
+                 HTTP_DBG (1, "client want to close tunnel");
+                 session_transport_closing_notify (&req->base.connection);
+               }
+             return HTTP2_ERROR_NO_ERROR;
+           }
        }
+      else
+       req->stream_state = HTTP2_STREAM_STATE_HALF_CLOSED;
     }
 
   rx_buf = http_get_rx_buf (hc);
@@ -1995,7 +2202,7 @@ http2_app_close_callback (http_conn_t *hc, u32 req_index,
        case HTTP2_STREAM_STATE_HALF_CLOSED:
          HTTP_DBG (1, "proxy confirmed close");
          http2_tunnel_send_close (hc, req);
-         session_transport_closed_notify (&req->base.connection);
+         http2_stream_close (req, hc);
          break;
        default:
          session_transport_closed_notify (&req->base.connection);