From: Matus Fabian Date: Fri, 13 Jun 2025 15:39:23 +0000 (-0400) Subject: http: Tunneling UDP over HTTP/2 X-Git-Tag: v26.02-rc0~183 X-Git-Url: https://gerrit.fd.io/r/gitweb?a=commitdiff_plain;h=refs%2Fchanges%2F76%2F43176%2F8;p=vpp.git http: Tunneling UDP over HTTP/2 Type: feature Change-Id: I94f6af893872ae28669b7b9c30d61e58c0b65422 Signed-off-by: Matus Fabian --- diff --git a/extras/hs-test/h2spec_extras/h2spec_extras.go b/extras/hs-test/h2spec_extras/h2spec_extras.go index b1d86a0b3ad..a48cdac137a 100644 --- a/extras/hs-test/h2spec_extras/h2spec_extras.go +++ b/extras/hs-test/h2spec_extras/h2spec_extras.go @@ -1,10 +1,15 @@ package h2spec_extras import ( + "bytes" + "errors" "fmt" "slices" "strconv" + "strings" + "github.com/quic-go/quic-go/http3" + "github.com/quic-go/quic-go/quicvarint" "github.com/summerwind/h2spec/config" "github.com/summerwind/h2spec/spec" "golang.org/x/net/http2" @@ -208,6 +213,24 @@ func ConnectHeaders(c *config.Config) []hpack.HeaderField { } } +func readTcpTunnel(conn *spec.Conn, streamID uint32) ([]byte, error) { + actual, passed := conn.WaitEventByType(spec.EventDataFrame) + switch event := actual.(type) { + case spec.DataFrameEvent: + passed = event.Header().StreamID == streamID + default: + passed = false + } + if !passed { + return nil, &spec.TestError{ + Expected: []string{spec.EventDataFrame.String()}, + Actual: actual.String(), + } + } + df, _ := actual.(spec.DataFrameEvent) + return df.Data(), nil +} + func ConnectMethod() *spec.TestGroup { tg := NewTestGroup("2", "CONNECT method") @@ -480,5 +503,436 @@ func ExtendedConnectMethod() *spec.TestGroup { return spec.VerifyStreamError(conn, http2.ErrCodeProtocol) }, }) + + tg.AddTestGroup(ConnectUdp()) + + return tg +} + +func ConnectUdpHeaders(c *config.Config) []hpack.HeaderField { + + headers := spec.CommonHeaders(c) + headers[0].Value = "CONNECT" + headers = append(headers, spec.HeaderField(":protocol", "connect-udp")) + headers = append(headers, spec.HeaderField("capsule-protocol", "?1")) + return headers +} + +func writeCapsule(conn *spec.Conn, streamID uint32, endStream bool, payload []byte) error { + b := make([]byte, 0) + b = quicvarint.Append(b, 0) + b = append(b, payload...) + var capsule bytes.Buffer + err := http3.WriteCapsule(&capsule, 0, b) + if err != nil { + return err + } + + return conn.WriteData(streamID, endStream, capsule.Bytes()) +} + +func readCapsule(conn *spec.Conn, streamID uint32) ([]byte, error) { + actual, passed := conn.WaitEventByType(spec.EventDataFrame) + switch event := actual.(type) { + case spec.DataFrameEvent: + passed = event.Header().StreamID == streamID + default: + passed = false + } + if !passed { + return nil, &spec.TestError{ + Expected: []string{spec.EventDataFrame.String()}, + Actual: actual.String(), + } + } + df, _ := actual.(spec.DataFrameEvent) + r := bytes.NewReader(df.Data()) + capsuleType, payloadReader, err := http3.ParseCapsule(r) + if err != nil { + return nil, err + } + if capsuleType != 0 { + return nil, errors.New("capsule type should be 0") + } + b := make([]byte, 1024) + n, err := payloadReader.Read(b) + if err != nil { + return nil, err + } + if n < 3 { + return nil, errors.New("response payload too short") + } + if b[0] != 0 { + return nil, errors.New("context id should be 0") + } + return b[1:n], nil +} + +func ConnectUdp() *spec.TestGroup { + tg := NewTestGroup("3.1", "Proxying UDP in HTTP") + + tg.AddTestCase(&spec.TestCase{ + Desc: "Tunneling UDP over HTTP/2", + Requirement: "To initiate a UDP tunnel associated with a single HTTP stream, a client issues a request containing the \"connect-udp\" upgrade token. The target of the tunnel is indicated by the client to the UDP proxy via the \"target_host\" and \"target_port\" variables of the URI Template", + Run: func(c *config.Config, conn *spec.Conn) error { + var streamID uint32 = 1 + + err := conn.Handshake() + if err != nil { + return err + } + + headers := ConnectUdpHeaders(c) + hp := http2.HeadersFrameParam{ + StreamID: streamID, + EndStream: false, + EndHeaders: true, + BlockFragment: conn.EncodeHeaders(headers), + } + conn.WriteHeaders(hp) + // verify response headers + actual, passed := conn.WaitEventByType(spec.EventHeadersFrame) + switch event := actual.(type) { + case spec.HeadersFrameEvent: + passed = event.Header().StreamID == streamID + default: + passed = false + } + if !passed { + expected := []string{ + fmt.Sprintf("DATA Frame (length:1, flags:0x00, stream_id:%d)", streamID), + } + + return &spec.TestError{ + Expected: expected, + Actual: actual.String(), + } + } + hf, _ := actual.(spec.HeadersFrameEvent) + respHeaders := make([]hpack.HeaderField, 0, 256) + decoder := hpack.NewDecoder(4096, func(f hpack.HeaderField) { respHeaders = append(respHeaders, f) }) + _, err = decoder.Write(hf.HeaderBlockFragment()) + if err != nil { + return err + } + if !slices.Contains(respHeaders, spec.HeaderField("capsule-protocol", "?1")) { + hs := "" + for _, h := range respHeaders { + hs += h.String() + "\n" + } + return &spec.TestError{ + Expected: []string{"\"capsule-protocol: ?1\" header received"}, + Actual: hs, + } + } + if !slices.Contains(respHeaders, spec.HeaderField(":status", "200")) { + hs := "" + for _, h := range respHeaders { + hs += h.String() + "\n" + } + return &spec.TestError{ + Expected: []string{"\":status: 200\" header received"}, + Actual: hs, + } + } + for _, h := range respHeaders { + if h.Name == "content-length" { + return &spec.TestError{ + Expected: []string{"\"content-length\" header must not be used"}, + Actual: h.String(), + } + } + } + + // send and receive data over tunnel + data := []byte("hello") + err = writeCapsule(conn, streamID, false, data) + if err != nil { + return err + } + resp, err := readCapsule(conn, streamID) + if err != nil { + return err + } + if !bytes.Equal(data, resp) { + return &spec.TestError{ + Expected: []string{"capsule payload: " + string(data)}, + Actual: "capsule payload:" + string(resp), + } + } + // try again + err = writeCapsule(conn, streamID, false, data) + if err != nil { + return err + } + resp, err = readCapsule(conn, streamID) + if err != nil { + return err + } + if !bytes.Equal(data, resp) { + return &spec.TestError{ + Expected: []string{"capsule payload: " + string(data)}, + Actual: "capsule payload:" + string(resp), + } + } + return nil + }, + }) + + tg.AddTestCase(&spec.TestCase{ + Desc: "Multiple tunnels", + Requirement: "In HTTP/2, the data stream of a given HTTP request consists of all bytes sent in DATA frames with the corresponding stream ID.", + Run: func(c *config.Config, conn *spec.Conn) error { + var streamID uint32 = 1 + + err := conn.Handshake() + if err != nil { + return err + } + + maxStreams, ok := conn.Settings[http2.SettingMaxConcurrentStreams] + if !ok { + return spec.ErrSkipped + } + + for i := 0; i < int(maxStreams); i++ { + headers := ConnectUdpHeaders(c) + hp := http2.HeadersFrameParam{ + StreamID: streamID, + EndStream: false, + EndHeaders: true, + BlockFragment: conn.EncodeHeaders(headers), + } + conn.WriteHeaders(hp) + err = spec.VerifyHeadersFrame(conn, streamID) + if err != nil { + return err + } + + streamID += 2 + } + + streamID = 1 + data := []byte("hello") + for i := 0; i < int(maxStreams); i++ { + err = writeCapsule(conn, streamID, false, data) + if err != nil { + return err + } + } + + for i := 0; i < int(maxStreams); i++ { + resp, err := readCapsule(conn, streamID) + if err != nil { + return err + } + if !bytes.Equal(data, resp) { + return &spec.TestError{ + Expected: []string{"capsule payload: " + string(data)}, + Actual: "capsule payload:" + string(resp), + } + } + } + + return nil + }, + }) + + tg.AddTestCase(&spec.TestCase{ + Desc: "Tunnel closed by client (with attached data)", + Requirement: "A proxy that receives a DATA frame with the END_STREAM flag set sends the attached data and close UDP connection.", + Run: func(c *config.Config, conn *spec.Conn) error { + var streamID uint32 = 1 + + err := conn.Handshake() + if err != nil { + return err + } + + headers := ConnectUdpHeaders(c) + hp := http2.HeadersFrameParam{ + StreamID: streamID, + EndStream: false, + EndHeaders: true, + BlockFragment: conn.EncodeHeaders(headers), + } + conn.WriteHeaders(hp) + err = spec.VerifyHeadersFrame(conn, streamID) + if err != nil { + return err + } + + // close tunnel + data := []byte("hello") + err = writeCapsule(conn, streamID, true, data) + if err != nil { + return err + } + + // wait for DATA frame with END_STREAM flag set + err = VerifyTunnelClosed(conn) + if err != nil { + return err + } + + return nil + }, + }) + + tg.AddTestCase(&spec.TestCase{ + Desc: "Tunnel closed by client (empty DATA frame)", + Requirement: "The final DATA frame could be empty.", + Run: func(c *config.Config, conn *spec.Conn) error { + var streamID uint32 = 1 + + err := conn.Handshake() + if err != nil { + return err + } + + headers := ConnectUdpHeaders(c) + hp := http2.HeadersFrameParam{ + StreamID: streamID, + EndStream: false, + EndHeaders: true, + BlockFragment: conn.EncodeHeaders(headers), + } + conn.WriteHeaders(hp) + err = spec.VerifyHeadersFrame(conn, streamID) + if err != nil { + return err + } + + // send and receive data over tunnel + data := []byte("hello") + err = writeCapsule(conn, streamID, false, data) + if err != nil { + return err + } + resp, err := readCapsule(conn, streamID) + if err != nil { + return err + } + if !bytes.Equal(data, resp) { + return &spec.TestError{ + Expected: []string{"capsule payload: " + string(data)}, + Actual: "capsule payload:" + string(resp), + } + } + + // close tunnel + conn.WriteData(streamID, true, []byte("")) + + // wait for DATA frame with END_STREAM flag set + err = VerifyTunnelClosed(conn) + if err != nil { + return err + } + + return nil + }, + }) + + tg.AddTestCase(&spec.TestCase{ + Desc: "CONNECT and CONNECT-UDP on single connection", + Requirement: "One stream establish TCP tunnel and second UDP tunnel.", + Run: func(c *config.Config, conn *spec.Conn) error { + err := conn.Handshake() + if err != nil { + return err + } + + var udpTunnelStreamID uint32 = 1 + var tcpTunnelStreamID uint32 = 3 + + headers := ConnectUdpHeaders(c) + hp := http2.HeadersFrameParam{ + StreamID: udpTunnelStreamID, + EndStream: false, + EndHeaders: true, + BlockFragment: conn.EncodeHeaders(headers), + } + conn.WriteHeaders(hp) + err = spec.VerifyHeadersFrame(conn, udpTunnelStreamID) + if err != nil { + return err + } + + pathSplit := strings.Split(c.Path, "/") + path := fmt.Sprintf("%s:%s", pathSplit[4], pathSplit[5]) + headers = []hpack.HeaderField{ + spec.HeaderField(":method", "CONNECT"), + spec.HeaderField(":authority", path), + } + hp = http2.HeadersFrameParam{ + StreamID: tcpTunnelStreamID, + EndStream: false, + EndHeaders: true, + BlockFragment: conn.EncodeHeaders(headers), + } + conn.WriteHeaders(hp) + err = spec.VerifyHeadersFrame(conn, tcpTunnelStreamID) + if err != nil { + return err + } + + // send and receive data over UDP tunnel + udpData := []byte("hello UDP") + err = writeCapsule(conn, udpTunnelStreamID, false, udpData) + if err != nil { + return err + } + udpResp, err := readCapsule(conn, udpTunnelStreamID) + if err != nil { + return err + } + if !bytes.Equal(udpData, udpResp) { + return &spec.TestError{ + Expected: []string{"capsule payload: " + string(udpData)}, + Actual: "capsule payload:" + string(udpResp), + } + } + + // send and receive data over TCP tunnel + tcpData := []byte("hello TCP") + conn.WriteData(tcpTunnelStreamID, false, tcpData) + tcpResp, err := readTcpTunnel(conn, tcpTunnelStreamID) + if !bytes.Equal(tcpData, tcpResp) { + return &spec.TestError{ + Expected: []string{"payload: " + string(tcpData)}, + Actual: "payload:" + string(tcpResp), + } + } + + // send and receive data over TCP tunnel + conn.WriteData(tcpTunnelStreamID, false, tcpData) + tcpResp, err = readTcpTunnel(conn, tcpTunnelStreamID) + if !bytes.Equal(tcpData, tcpResp) { + return &spec.TestError{ + Expected: []string{"payload: " + string(tcpData)}, + Actual: "payload:" + string(tcpResp), + } + } + + // send and receive data over UDP tunnel + err = writeCapsule(conn, udpTunnelStreamID, false, udpData) + if err != nil { + return err + } + udpResp, err = readCapsule(conn, udpTunnelStreamID) + if err != nil { + return err + } + if !bytes.Equal(udpData, udpResp) { + return &spec.TestError{ + Expected: []string{"capsule payload: " + string(udpData)}, + Actual: "capsule payload:" + string(udpResp), + } + } + + return nil + }, + }) + return tg } diff --git a/extras/hs-test/infra/suite_vpp_proxy.go b/extras/hs-test/infra/suite_vpp_proxy.go index d1c125e3a5a..d43a588fe45 100644 --- a/extras/hs-test/infra/suite_vpp_proxy.go +++ b/extras/hs-test/infra/suite_vpp_proxy.go @@ -13,7 +13,6 @@ import ( "os" "reflect" "runtime" - "strconv" "strings" "time" @@ -242,22 +241,6 @@ func handleConn(conn net.Conn) { } } -func (s *VppProxySuite) StartEchoServer() *net.TCPListener { - listener, err := net.ListenTCP("tcp", &net.TCPAddr{IP: net.ParseIP(s.ServerAddr()), Port: int(s.Ports.Server)}) - s.AssertNil(err, fmt.Sprint(err)) - go func() { - for { - conn, err := listener.Accept() - if err != nil { - continue - } - go handleConn(conn) - } - }() - s.Log("* started tcp echo server " + s.ServerAddr() + ":" + strconv.Itoa(int(s.Ports.Server))) - return listener -} - var _ = Describe("VppProxySuite", Ordered, ContinueOnFailure, func() { var s VppProxySuite BeforeAll(func() { diff --git a/extras/hs-test/infra/suite_vpp_udp_proxy.go b/extras/hs-test/infra/suite_vpp_udp_proxy.go index c424790db70..912ab64eaa3 100644 --- a/extras/hs-test/infra/suite_vpp_udp_proxy.go +++ b/extras/hs-test/infra/suite_vpp_udp_proxy.go @@ -8,7 +8,6 @@ import ( "os" "reflect" "runtime" - "strconv" "strings" "time" @@ -118,25 +117,6 @@ func (s *VppUdpProxySuite) ClientAddr() string { return s.Interfaces.Client.Ip4AddressString() } -func (s *VppUdpProxySuite) StartEchoServer() *net.UDPConn { - conn, err := net.ListenUDP("udp", &net.UDPAddr{IP: net.ParseIP(s.ServerAddr()), Port: s.Ports.Server}) - s.AssertNil(err, fmt.Sprint(err)) - go func() { - for { - b := make([]byte, 1500) - n, addr, err := conn.ReadFrom(b) - if err != nil { - return - } - if _, err := conn.WriteTo(b[:n], addr); err != nil { - return - } - } - }() - s.Log("* started udp echo server " + s.ServerAddr() + ":" + strconv.Itoa(s.Ports.Server)) - return conn -} - func (s *VppUdpProxySuite) ClientSendReceive(toSend []byte, rcvBuffer []byte) (int, error) { proxiedConn, err := net.DialUDP("udp", &net.UDPAddr{IP: net.ParseIP(s.ClientAddr()), Port: 0}, @@ -270,6 +250,11 @@ var _ = Describe("H2SpecUdpProxySuite", Ordered, ContinueOnFailure, func() { }{ {desc: "extras/3/1"}, {desc: "extras/3/2"}, + {desc: "extras/3.1/1"}, + {desc: "extras/3.1/2"}, + {desc: "extras/3.1/3"}, + {desc: "extras/3.1/4"}, + {desc: "extras/3.1/5"}, } for _, test := range testCases { @@ -278,8 +263,13 @@ var _ = Describe("H2SpecUdpProxySuite", Ordered, ContinueOnFailure, func() { It(testName, func(ctx SpecContext) { s.Log(testName + ": BEGIN") vppProxy := s.Containers.VppProxy.VppInstance - remoteServerConn := s.StartEchoServer() + remoteServerConn := s.StartUdpEchoServer(s.ServerAddr(), s.Ports.Server) defer remoteServerConn.Close() + // this one will open TCP tunnel too + if strings.Contains(test.desc, "extras/3.1/5") { + remoteTcpServerConn := s.StartTcpEchoServer(s.ServerAddr(), s.Ports.Server) + defer remoteTcpServerConn.Close() + } cmd := fmt.Sprintf("test proxy server fifo-size 512k server-uri https://%s/%d", s.VppProxyAddr(), s.Ports.Proxy) s.Log(vppProxy.Vppctl(cmd)) path := fmt.Sprintf("/.well-known/masque/udp/%s/%d/", s.ServerAddr(), s.Ports.Server) @@ -287,7 +277,7 @@ var _ = Describe("H2SpecUdpProxySuite", Ordered, ContinueOnFailure, func() { Host: s.VppProxyAddr(), Port: s.Ports.Proxy, Path: path, - Timeout: time.Second * s.MaxTimeout, + Timeout: s.MaxTimeout, MaxHeaderLen: 4096, TLS: true, Insecure: true, diff --git a/extras/hs-test/infra/utils.go b/extras/hs-test/infra/utils.go index e320345d5ab..14d084014bc 100644 --- a/extras/hs-test/infra/utils.go +++ b/extras/hs-test/infra/utils.go @@ -10,6 +10,7 @@ import ( "net/http/httputil" "os" "os/exec" + "strconv" "strings" "time" @@ -327,3 +328,38 @@ func (s *HstSuite) GetCoreProcessName(file string) (string, bool) { } return "", false } + +func (s *HstSuite) StartTcpEchoServer(addr string, port int) *net.TCPListener { + listener, err := net.ListenTCP("tcp", &net.TCPAddr{IP: net.ParseIP(addr), Port: port}) + s.AssertNil(err, fmt.Sprint(err)) + go func() { + for { + conn, err := listener.Accept() + if err != nil { + continue + } + go handleConn(conn) + } + }() + s.Log("* started tcp echo server " + addr + ":" + strconv.Itoa(port)) + return listener +} + +func (s *HstSuite) StartUdpEchoServer(addr string, port int) *net.UDPConn { + conn, err := net.ListenUDP("udp", &net.UDPAddr{IP: net.ParseIP(addr), Port: port}) + s.AssertNil(err, fmt.Sprint(err)) + go func() { + for { + b := make([]byte, 1500) + n, addr, err := conn.ReadFrom(b) + if err != nil { + return + } + if _, err := conn.WriteTo(b[:n], addr); err != nil { + return + } + } + }() + s.Log("* started udp echo server " + addr + ":" + strconv.Itoa(port)) + return conn +} diff --git a/extras/hs-test/proxy_test.go b/extras/hs-test/proxy_test.go index 40f3558b4ca..183cca72523 100644 --- a/extras/hs-test/proxy_test.go +++ b/extras/hs-test/proxy_test.go @@ -327,7 +327,7 @@ func vppConnectProxyStressLoad(s *VppProxySuite, proxyPort string) { } func VppConnectProxyStressTest(s *VppProxySuite) { - remoteServerConn := s.StartEchoServer() + remoteServerConn := s.StartTcpEchoServer(s.ServerAddr(), int(s.Ports.Server)) defer remoteServerConn.Close() s.ConfigureVppProxy("http", s.Ports.Proxy) @@ -341,7 +341,7 @@ func VppConnectProxyStressTest(s *VppProxySuite) { func VppConnectProxyStressMWTest(s *VppProxySuite) { s.CpusPerVppContainer = 3 s.SetupTest() - remoteServerConn := s.StartEchoServer() + remoteServerConn := s.StartTcpEchoServer(s.ServerAddr(), int(s.Ports.Server)) defer remoteServerConn.Close() vppProxy := s.Containers.VppProxy.VppInstance @@ -355,7 +355,7 @@ func VppConnectProxyStressMWTest(s *VppProxySuite) { } func VppProxyUdpTest(s *VppUdpProxySuite) { - remoteServerConn := s.StartEchoServer() + remoteServerConn := s.StartUdpEchoServer(s.ServerAddr(), s.Ports.Server) defer remoteServerConn.Close() vppProxy := s.Containers.VppProxy.VppInstance @@ -372,7 +372,7 @@ func VppProxyUdpTest(s *VppUdpProxySuite) { func VppProxyUdpMigrationMWTest(s *VppUdpProxySuite) { s.CpusPerVppContainer = 3 s.SetupTest() - remoteServerConn := s.StartEchoServer() + remoteServerConn := s.StartUdpEchoServer(s.ServerAddr(), s.Ports.Server) defer remoteServerConn.Close() vppProxy := s.Containers.VppProxy.VppInstance @@ -394,7 +394,7 @@ func VppProxyUdpMigrationMWTest(s *VppUdpProxySuite) { } func VppConnectUdpProxyTest(s *VppUdpProxySuite) { - remoteServerConn := s.StartEchoServer() + remoteServerConn := s.StartUdpEchoServer(s.ServerAddr(), s.Ports.Server) defer remoteServerConn.Close() vppProxy := s.Containers.VppProxy.VppInstance @@ -441,7 +441,7 @@ func VppConnectUdpInvalidTargetTest(s *VppUdpProxySuite) { } func VppConnectUdpInvalidCapsuleTest(s *VppUdpProxySuite) { - remoteServerConn := s.StartEchoServer() + remoteServerConn := s.StartUdpEchoServer(s.ServerAddr(), s.Ports.Server) defer remoteServerConn.Close() vppProxy := s.Containers.VppProxy.VppInstance @@ -471,7 +471,7 @@ func VppConnectUdpInvalidCapsuleTest(s *VppUdpProxySuite) { } func VppConnectUdpUnknownCapsuleTest(s *VppUdpProxySuite) { - remoteServerConn := s.StartEchoServer() + remoteServerConn := s.StartUdpEchoServer(s.ServerAddr(), s.Ports.Server) defer remoteServerConn.Close() vppProxy := s.Containers.VppProxy.VppInstance @@ -500,7 +500,7 @@ func VppConnectUdpUnknownCapsuleTest(s *VppUdpProxySuite) { } func VppConnectUdpClientCloseTest(s *VppUdpProxySuite) { - remoteServerConn := s.StartEchoServer() + remoteServerConn := s.StartUdpEchoServer(s.ServerAddr(), s.Ports.Server) defer remoteServerConn.Close() vppProxy := s.Containers.VppProxy.VppInstance @@ -631,7 +631,7 @@ func vppConnectUdpStressLoad(s *VppUdpProxySuite) { } func VppConnectUdpStressTest(s *VppUdpProxySuite) { - remoteServerConn := s.StartEchoServer() + remoteServerConn := s.StartUdpEchoServer(s.ServerAddr(), s.Ports.Server) defer remoteServerConn.Close() vppProxy := s.Containers.VppProxy.VppInstance @@ -647,7 +647,7 @@ func VppConnectUdpStressTest(s *VppUdpProxySuite) { func VppConnectUdpStressMWTest(s *VppUdpProxySuite) { s.CpusPerVppContainer = 3 s.SetupTest() - remoteServerConn := s.StartEchoServer() + remoteServerConn := s.StartUdpEchoServer(s.ServerAddr(), s.Ports.Server) defer remoteServerConn.Close() vppProxy := s.Containers.VppProxy.VppInstance diff --git a/src/plugins/http/http2/http2.c b/src/plugins/http/http2/http2.c index 58887334909..8e9e6d923a8 100644 --- a/src/plugins/http/http2/http2.c +++ b/src/plugins/http/http2/http2.c @@ -57,6 +57,7 @@ typedef struct http2_req_ u8 *payload; u32 payload_len; clib_llist_anchor_t sched_list; + http_req_state_t app_reply_next_state; void (*dispatch_headers_cb) (struct http2_req_ *req, http_conn_t *hc, u8 *n_emissions, clib_llist_index_t *next_ri); void (*dispatch_data_cb) (struct http2_req_ *req, http_conn_t *hc, @@ -618,6 +619,95 @@ http2_sched_dispatch_tunnel (http2_req_t *req, http_conn_t *hc, http_io_ts_after_write (hc, 0); } +static void +http2_sched_dispatch_udp_tunnel (http2_req_t *req, http_conn_t *hc, + u8 *n_emissions) +{ + http2_conn_ctx_t *h2c; + u32 max_write, max_read, dgram_size, capsule_size, n_written; + session_dgram_hdr_t hdr; + u8 fh[HTTP2_FRAME_HEADER_SIZE]; + u8 *buf, *payload; + + *n_emissions += HTTP2_SCHED_WEIGHT_DATA_INLINE; + + max_read = http_io_as_max_read (&req->base); + if (max_read == 0) + { + HTTP_DBG (2, "max_read == 0"); + transport_connection_reschedule (&req->base.connection); + return; + } + /* read datagram header */ + http_io_as_read (&req->base, (u8 *) &hdr, sizeof (hdr), 1); + ASSERT (hdr.data_length <= HTTP_UDP_PAYLOAD_MAX_LEN); + dgram_size = hdr.data_length + SESSION_CONN_HDR_LEN; + ASSERT (max_read >= dgram_size); + + h2c = http2_conn_ctx_get_w_thread (hc); + + if (PREDICT_FALSE ( + (hdr.data_length + HTTP_UDP_PROXY_DATAGRAM_CAPSULE_OVERHEAD) > + h2c->peer_settings.max_frame_size)) + { + /* drop datagram if not fit into frame */ + HTTP_DBG (1, "datagram too large, dropped"); + http_io_as_drain (&req->base, dgram_size); + return; + } + + if (req->peer_window < + (hdr.data_length + HTTP_UDP_PROXY_DATAGRAM_CAPSULE_OVERHEAD)) + { + /* mark that we need window update on stream */ + HTTP_DBG (1, "not enough space in stream window for capsule"); + req->flags |= HTTP2_REQ_F_NEED_WINDOW_UPDATE; + } + + max_write = http_io_ts_max_write (hc, 0); + max_write -= HTTP2_FRAME_HEADER_SIZE; + max_write -= HTTP_UDP_PROXY_DATAGRAM_CAPSULE_OVERHEAD; + max_write = clib_min (max_write, h2c->peer_window); + if (PREDICT_FALSE (max_write < hdr.data_length)) + { + /* we should have at least 16kB free space in underlying transport, + * maybe peer is doing small connection window updates */ + HTTP_DBG (1, "datagram dropped"); + http_io_as_drain (&req->base, dgram_size); + return; + } + + buf = http_get_tx_buf (hc); + /* create capsule header */ + payload = http_encap_udp_payload_datagram (buf, hdr.data_length); + capsule_size = (payload - buf) + hdr.data_length; + /* read payload */ + http_io_as_read (&req->base, payload, hdr.data_length, 1); + http_io_as_drain (&req->base, dgram_size); + + req->peer_window -= capsule_size; + h2c->peer_window -= capsule_size; + + http2_frame_write_data_header (capsule_size, req->stream_id, 0, fh); + + svm_fifo_seg_t segs[2] = { { fh, HTTP2_FRAME_HEADER_SIZE }, + { buf, capsule_size } }; + n_written = http_io_ts_write_segs (hc, segs, 2, 0); + ASSERT (n_written == (HTTP2_FRAME_HEADER_SIZE + capsule_size)); + + if (max_read - dgram_size) + { + /* schedule for next round if we have more data */ + HTTP_DBG (1, "adding to data queue req_index %x", + ((http_req_handle_t) req->base.hr_req_handle).req_index); + http2_req_schedule_data_tx (hc, req); + } + else + transport_connection_reschedule (&req->base.connection); + + http_io_ts_after_write (hc, 0); +} + static void http2_sched_dispatch_continuation (http2_req_t *req, http_conn_t *hc, u8 *n_emissions, @@ -701,13 +791,33 @@ http2_sched_dispatch_headers (http2_req_t *req, http_conn_t *hc, response = http_get_tx_buf (hc); date = format (0, "%U", format_http_time_now, hc); - control_data.sc = msg.code; control_data.content_len = msg.data.body_len; control_data.server_name = hc->app_name; control_data.server_name_len = vec_len (hc->app_name); control_data.date = date; control_data.date_len = vec_len (date); + if (req->base.is_tunnel) + { + switch (msg.code) + { + case HTTP_STATUS_SWITCHING_PROTOCOLS: + /* remap status code for extended connect response */ + msg.code = HTTP_STATUS_OK; + case HTTP_STATUS_OK: + case HTTP_STATUS_CREATED: + case HTTP_STATUS_ACCEPTED: + /* tunnel established if 2xx (Successful) response to CONNECT */ + control_data.content_len = HPACK_ENCODER_SKIP_CONTENT_LEN; + break; + default: + /* tunnel not established */ + req->base.is_tunnel = 0; + break; + } + } + control_data.sc = msg.code; + if (msg.data.headers_len) { n_deq += msg.data.type == HTTP_MSG_DATA_PTR ? sizeof (uword) : @@ -728,10 +838,6 @@ http2_sched_dispatch_headers (http2_req_t *req, http_conn_t *hc, stream_id = req->stream_id; - /* tunnel established if 2xx (Successful) response to CONNECT */ - req->base.is_tunnel = - (req->base.is_tunnel && http_status_code_str[msg.code][0] == '2'); - /* END_STREAM flag need to be set in HEADERS frame */ if (msg.data.body_len) { @@ -757,7 +863,11 @@ http2_sched_dispatch_headers (http2_req_t *req, http_conn_t *hc, } else if (req->base.is_tunnel) { - req->dispatch_data_cb = http2_sched_dispatch_tunnel; + if (req->base.upgrade_proto == HTTP_UPGRADE_PROTO_CONNECT_UDP && + hc->udp_tunnel_mode == HTTP_UDP_TUNNEL_DGRAM) + req->dispatch_data_cb = http2_sched_dispatch_udp_tunnel; + else + req->dispatch_data_cb = http2_sched_dispatch_tunnel; transport_connection_reschedule (&req->base.connection); /* cleanup some stuff we don't need anymore in tunnel mode */ vec_free (req->base.headers); @@ -914,6 +1024,8 @@ http2_req_state_wait_transport_method (http_conn_t *hc, http2_req_t *req, req->base.headers_offset = control_data.headers - wrk->header_list; req->base.headers_len = control_data.headers_len; + req->app_reply_next_state = HTTP_REQ_STATE_APP_IO_MORE_DATA; + if (!(control_data.parsed_bitmap & HPACK_PSEUDO_HEADER_METHOD_PARSED)) { HTTP_DBG (1, ":method pseudo-header missing in request"); @@ -954,6 +1066,7 @@ http2_req_state_wait_transport_method (http_conn_t *hc, http2_req_t *req, } if (control_data.method == HTTP_REQ_CONNECT) { + req->app_reply_next_state = HTTP_REQ_STATE_TUNNEL; if (control_data.parsed_bitmap & HPACK_PSEUDO_HEADER_PROTOCOL_PARSED) { /* extended CONNECT (RFC8441) */ @@ -985,6 +1098,9 @@ http2_req_state_wait_transport_method (http_conn_t *hc, http2_req_t *req, http2_stream_error (hc, req, HTTP2_ERROR_INTERNAL_ERROR, sp); return HTTP_SM_STOP; } + if (req->base.upgrade_proto == HTTP_UPGRADE_PROTO_CONNECT_UDP && + hc->udp_tunnel_mode == HTTP_UDP_TUNNEL_DGRAM) + req->app_reply_next_state = HTTP_REQ_STATE_UDP_TUNNEL; } else { @@ -1154,9 +1270,84 @@ http2_req_state_tunnel_rx (http_conn_t *hc, http2_req_t *req, http_io_as_write (&req->base, req->payload, req->payload_len); http_app_worker_rx_notify (&req->base); + switch (req->stream_state) + { + case HTTP2_STREAM_STATE_HALF_CLOSED: + HTTP_DBG (1, "client want to close tunnel"); + session_transport_closing_notify (&req->base.connection); + break; + case HTTP2_STREAM_STATE_CLOSED: + HTTP_DBG (1, "client closed tunnel"); + http2_stream_close (req, hc); + break; + default: + break; + } + return HTTP_SM_STOP; } +static http_sm_result_t +http2_req_state_udp_tunnel_rx (http_conn_t *hc, http2_req_t *req, + transport_send_params_t *sp, + http2_error_t *error) +{ + int rv; + u8 payload_offset; + u64 payload_len; + session_dgram_hdr_t hdr; + + HTTP_DBG (1, "udp tunnel received data from client"); + + rv = http_decap_udp_payload_datagram (req->payload, req->payload_len, + &payload_offset, &payload_len); + HTTP_DBG (1, "rv=%d, payload_offset=%u, payload_len=%llu", rv, + payload_offset, payload_len); + if (PREDICT_FALSE (rv != 0)) + { + if (rv < 0) + { + /* capsule datagram is invalid (stream need to be aborted) */ + http2_stream_error (hc, req, HTTP2_ERROR_PROTOCOL_ERROR, sp); + return HTTP_SM_STOP; + } + else + { + /* unknown capsule should be skipped */ + return HTTP_SM_STOP; + } + } + /* check if we have the full capsule */ + if (PREDICT_FALSE (req->payload_len != (payload_offset + payload_len))) + { + HTTP_DBG (1, "capsule not complete"); + http2_stream_error (hc, req, HTTP2_ERROR_PROTOCOL_ERROR, sp); + return HTTP_SM_STOP; + } + if (http_io_as_max_write (&req->base) < (sizeof (hdr) + payload_len)) + { + clib_warning ("app's rx fifo full"); + http2_stream_error (hc, req, HTTP2_ERROR_INTERNAL_ERROR, sp); + return HTTP_SM_STOP; + } + + hdr.data_length = payload_len; + hdr.data_offset = 0; + + /* send datagram header and payload */ + svm_fifo_seg_t segs[2] = { { (u8 *) &hdr, sizeof (hdr) }, + { req->payload + payload_offset, payload_len } }; + http_io_as_write_segs (&req->base, segs, 2); + http_app_worker_rx_notify (&req->base); + + if (req->stream_state == HTTP2_STREAM_STATE_HALF_CLOSED) + { + HTTP_DBG (1, "client want to close tunnel"); + session_transport_closing_notify (&req->base.connection); + } + + return HTTP_SM_STOP; +} /*************************************/ /* request state machine handlers TX */ /*************************************/ @@ -1180,9 +1371,7 @@ http2_req_state_wait_app_reply (http_conn_t *hc, http2_req_t *req, clib_llist_add_tail (wrk->req_pool, sched_list, req, he); http2_conn_schedule (h2c, hc->c_thread_index); - http_req_state_change (&req->base, req->base.is_tunnel ? - HTTP_REQ_STATE_TUNNEL : - HTTP_REQ_STATE_APP_IO_MORE_DATA); + http_req_state_change (&req->base, req->app_reply_next_state); http_req_deschedule (&req->base, sp); return HTTP_SM_STOP; @@ -1250,8 +1439,9 @@ static http2_sm_handler tx_state_funcs[HTTP_REQ_N_STATES] = { 0, /* wait transport method */ http2_req_state_wait_app_reply, http2_req_state_app_io_more_data, + /* both can be same, we use different scheduler data dispatch cb */ + http2_req_state_tunnel_tx, http2_req_state_tunnel_tx, - 0, /* udp tunnel */ }; static http2_sm_handler rx_state_funcs[HTTP_REQ_N_STATES] = { @@ -1263,7 +1453,7 @@ static http2_sm_handler rx_state_funcs[HTTP_REQ_N_STATES] = { 0, /* wait app reply */ 0, /* app io more data */ http2_req_state_tunnel_rx, - 0, /* udp tunnel */ + http2_req_state_udp_tunnel_rx, }; static_always_inline int @@ -1476,7 +1666,7 @@ http2_handle_data_frame (http_conn_t *hc, http2_frame_header_t *fh) /* bogus state */ if (hc->flags & HTTP_CONN_F_IS_SERVER && - req->stream_state != HTTP2_STREAM_STATE_OPEN) + req->stream_state != HTTP2_STREAM_STATE_OPEN && !req->base.is_tunnel) { HTTP_DBG (1, "error: stream already half-closed"); http2_stream_error (hc, req, HTTP2_ERROR_STREAM_CLOSED, 0); @@ -1497,15 +1687,32 @@ http2_handle_data_frame (http_conn_t *hc, http2_frame_header_t *fh) if (fh->flags & HTTP2_FRAME_FLAG_END_STREAM) { - req->stream_state = HTTP2_STREAM_STATE_HALF_CLOSED; + HTTP_DBG (1, "END_STREAM flag set"); if (req->base.is_tunnel) { - session_transport_closing_notify (&req->base.connection); - HTTP_DBG (1, "client closed tunnel"); + /* client can initiate or confirm tunnel close */ + req->stream_state = + req->stream_state == HTTP2_STREAM_STATE_HALF_CLOSED ? + HTTP2_STREAM_STATE_CLOSED : + HTTP2_STREAM_STATE_HALF_CLOSED; /* final DATA frame could be empty */ if (fh->length == 0) - return HTTP2_ERROR_NO_ERROR; + { + if (req->stream_state == HTTP2_STREAM_STATE_CLOSED) + { + HTTP_DBG (1, "client closed tunnel"); + http2_stream_close (req, hc); + } + else + { + HTTP_DBG (1, "client want to close tunnel"); + session_transport_closing_notify (&req->base.connection); + } + return HTTP2_ERROR_NO_ERROR; + } } + else + req->stream_state = HTTP2_STREAM_STATE_HALF_CLOSED; } rx_buf = http_get_rx_buf (hc); @@ -1995,7 +2202,7 @@ http2_app_close_callback (http_conn_t *hc, u32 req_index, case HTTP2_STREAM_STATE_HALF_CLOSED: HTTP_DBG (1, "proxy confirmed close"); http2_tunnel_send_close (hc, req); - session_transport_closed_notify (&req->base.connection); + http2_stream_close (req, hc); break; default: session_transport_closed_notify (&req->base.connection);