package h2spec_extras
import (
+ "bytes"
+ "errors"
"fmt"
"slices"
"strconv"
+ "strings"
+ "github.com/quic-go/quic-go/http3"
+ "github.com/quic-go/quic-go/quicvarint"
"github.com/summerwind/h2spec/config"
"github.com/summerwind/h2spec/spec"
"golang.org/x/net/http2"
)
}
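+// readTcpTunnel waits for a DATA frame on streamID and returns its payload
+// (raw TCP tunnel data).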
+func readTcpTunnel(conn *spec.Conn, streamID uint32) ([]byte, error) {
+ actual, passed := conn.WaitEventByType(spec.EventDataFrame)
+ switch event := actual.(type) {
+ case spec.DataFrameEvent:
+ passed = event.Header().StreamID == streamID
+ default:
+ passed = false
+ }
+ if !passed {
+ return nil, &spec.TestError{
+ Expected: []string{spec.EventDataFrame.String()},
+ Actual: actual.String(),
+ }
+ }
+ df, _ := actual.(spec.DataFrameEvent)
+ return df.Data(), nil
+}
+
func ConnectMethod() *spec.TestGroup {
tg := NewTestGroup("2", "CONNECT method")
return spec.VerifyStreamError(conn, http2.ErrCodeProtocol)
},
})
+
+ tg.AddTestGroup(ConnectUdp())
+
+ return tg
+}
+
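+// ConnectUdpHeaders builds an RFC 8441 extended CONNECT request for the
+// "connect-udp" protocol (RFC 9298), advertising the capsule protocol.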
+func ConnectUdpHeaders(c *config.Config) []hpack.HeaderField {
+ headers := spec.CommonHeaders(c)
+ headers[0].Value = "CONNECT"
+ headers = append(headers, spec.HeaderField(":protocol", "connect-udp"))
+ headers = append(headers, spec.HeaderField("capsule-protocol", "?1"))
+ return headers
+}
+
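+// writeCapsule wraps payload in an RFC 9297 DATAGRAM capsule (type 0) whose
+// value is the RFC 9298 context ID (varint 0) followed by the UDP payload,
+// then sends the capsule in a single DATA frame. For payload "hi" the capsule
+// bytes are: 0x00 (type) 0x03 (length) 0x00 (context ID) 'h' 'i'.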
+func writeCapsule(conn *spec.Conn, streamID uint32, endStream bool, payload []byte) error {
+ b := make([]byte, 0)
+ b = quicvarint.Append(b, 0)
+ b = append(b, payload...)
+ var capsule bytes.Buffer
+ err := http3.WriteCapsule(&capsule, 0, b)
+ if err != nil {
+ return err
+ }
+
+ return conn.WriteData(streamID, endStream, capsule.Bytes())
+}
+
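+// readCapsule waits for a DATA frame on streamID, parses its payload as a
+// single DATAGRAM capsule and returns the UDP payload with the context ID
+// stripped.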
+func readCapsule(conn *spec.Conn, streamID uint32) ([]byte, error) {
+ actual, passed := conn.WaitEventByType(spec.EventDataFrame)
+ switch event := actual.(type) {
+ case spec.DataFrameEvent:
+ passed = event.Header().StreamID == streamID
+ default:
+ passed = false
+ }
+ if !passed {
+ return nil, &spec.TestError{
+ Expected: []string{spec.EventDataFrame.String()},
+ Actual: actual.String(),
+ }
+ }
+ df, _ := actual.(spec.DataFrameEvent)
+ r := bytes.NewReader(df.Data())
+ capsuleType, payloadReader, err := http3.ParseCapsule(r)
+ if err != nil {
+ return nil, err
+ }
+ if capsuleType != 0 {
+ return nil, errors.New("capsule type should be 0")
+ }
+	b, err := io.ReadAll(payloadReader)
+	if err != nil {
+		return nil, err
+	}
+	if len(b) < 2 {
+		// need at least the context ID and one payload byte
+		return nil, errors.New("response payload too short")
+	}
+ if b[0] != 0 {
+ return nil, errors.New("context id should be 0")
+ }
+	return b[1:], nil
+}
+
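+// ConnectUdp returns the "Proxying UDP in HTTP" (RFC 9298) test group.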
+func ConnectUdp() *spec.TestGroup {
+ tg := NewTestGroup("3.1", "Proxying UDP in HTTP")
+
+ tg.AddTestCase(&spec.TestCase{
+ Desc: "Tunneling UDP over HTTP/2",
+ Requirement: "To initiate a UDP tunnel associated with a single HTTP stream, a client issues a request containing the \"connect-udp\" upgrade token. The target of the tunnel is indicated by the client to the UDP proxy via the \"target_host\" and \"target_port\" variables of the URI Template",
+ Run: func(c *config.Config, conn *spec.Conn) error {
+ var streamID uint32 = 1
+
+ err := conn.Handshake()
+ if err != nil {
+ return err
+ }
+
+ headers := ConnectUdpHeaders(c)
+ hp := http2.HeadersFrameParam{
+ StreamID: streamID,
+ EndStream: false,
+ EndHeaders: true,
+ BlockFragment: conn.EncodeHeaders(headers),
+ }
+ conn.WriteHeaders(hp)
+ // verify response headers
+ actual, passed := conn.WaitEventByType(spec.EventHeadersFrame)
+ switch event := actual.(type) {
+ case spec.HeadersFrameEvent:
+ passed = event.Header().StreamID == streamID
+ default:
+ passed = false
+ }
+ if !passed {
+				expected := []string{
+					fmt.Sprintf("HEADERS Frame (stream_id:%d)", streamID),
+				}
+
+ return &spec.TestError{
+ Expected: expected,
+ Actual: actual.String(),
+ }
+ }
+ hf, _ := actual.(spec.HeadersFrameEvent)
+ respHeaders := make([]hpack.HeaderField, 0, 256)
+ decoder := hpack.NewDecoder(4096, func(f hpack.HeaderField) { respHeaders = append(respHeaders, f) })
+ _, err = decoder.Write(hf.HeaderBlockFragment())
+ if err != nil {
+ return err
+ }
+ if !slices.Contains(respHeaders, spec.HeaderField("capsule-protocol", "?1")) {
+ hs := ""
+ for _, h := range respHeaders {
+ hs += h.String() + "\n"
+ }
+ return &spec.TestError{
+ Expected: []string{"\"capsule-protocol: ?1\" header received"},
+ Actual: hs,
+ }
+ }
+ if !slices.Contains(respHeaders, spec.HeaderField(":status", "200")) {
+ hs := ""
+ for _, h := range respHeaders {
+ hs += h.String() + "\n"
+ }
+ return &spec.TestError{
+ Expected: []string{"\":status: 200\" header received"},
+ Actual: hs,
+ }
+ }
+ for _, h := range respHeaders {
+ if h.Name == "content-length" {
+ return &spec.TestError{
+ Expected: []string{"\"content-length\" header must not be used"},
+ Actual: h.String(),
+ }
+ }
+ }
+
+ // send and receive data over tunnel
+ data := []byte("hello")
+ err = writeCapsule(conn, streamID, false, data)
+ if err != nil {
+ return err
+ }
+ resp, err := readCapsule(conn, streamID)
+ if err != nil {
+ return err
+ }
+ if !bytes.Equal(data, resp) {
+ return &spec.TestError{
+ Expected: []string{"capsule payload: " + string(data)},
+					Actual:   "capsule payload: " + string(resp),
+ }
+ }
+ // try again
+ err = writeCapsule(conn, streamID, false, data)
+ if err != nil {
+ return err
+ }
+ resp, err = readCapsule(conn, streamID)
+ if err != nil {
+ return err
+ }
+ if !bytes.Equal(data, resp) {
+ return &spec.TestError{
+ Expected: []string{"capsule payload: " + string(data)},
+					Actual:   "capsule payload: " + string(resp),
+ }
+ }
+ return nil
+ },
+ })
+
+ tg.AddTestCase(&spec.TestCase{
+ Desc: "Multiple tunnels",
+ Requirement: "In HTTP/2, the data stream of a given HTTP request consists of all bytes sent in DATA frames with the corresponding stream ID.",
+ Run: func(c *config.Config, conn *spec.Conn) error {
+ var streamID uint32 = 1
+
+ err := conn.Handshake()
+ if err != nil {
+ return err
+ }
+
+ maxStreams, ok := conn.Settings[http2.SettingMaxConcurrentStreams]
+ if !ok {
+ return spec.ErrSkipped
+ }
+
+ for i := 0; i < int(maxStreams); i++ {
+ headers := ConnectUdpHeaders(c)
+ hp := http2.HeadersFrameParam{
+ StreamID: streamID,
+ EndStream: false,
+ EndHeaders: true,
+ BlockFragment: conn.EncodeHeaders(headers),
+ }
+ conn.WriteHeaders(hp)
+ err = spec.VerifyHeadersFrame(conn, streamID)
+ if err != nil {
+ return err
+ }
+
+ streamID += 2
+ }
+
+ streamID = 1
+ data := []byte("hello")
+			for i := 0; i < int(maxStreams); i++ {
+				err = writeCapsule(conn, streamID, false, data)
+				if err != nil {
+					return err
+				}
+				streamID += 2
+			}
+
+			streamID = 1
+			for i := 0; i < int(maxStreams); i++ {
+				resp, err := readCapsule(conn, streamID)
+				if err != nil {
+					return err
+				}
+				if !bytes.Equal(data, resp) {
+					return &spec.TestError{
+						Expected: []string{"capsule payload: " + string(data)},
+						Actual:   "capsule payload: " + string(resp),
+					}
+				}
+				streamID += 2
+			}
+
+ return nil
+ },
+ })
+
+ tg.AddTestCase(&spec.TestCase{
+ Desc: "Tunnel closed by client (with attached data)",
+ Requirement: "A proxy that receives a DATA frame with the END_STREAM flag set sends the attached data and close UDP connection.",
+ Run: func(c *config.Config, conn *spec.Conn) error {
+ var streamID uint32 = 1
+
+ err := conn.Handshake()
+ if err != nil {
+ return err
+ }
+
+ headers := ConnectUdpHeaders(c)
+ hp := http2.HeadersFrameParam{
+ StreamID: streamID,
+ EndStream: false,
+ EndHeaders: true,
+ BlockFragment: conn.EncodeHeaders(headers),
+ }
+ conn.WriteHeaders(hp)
+ err = spec.VerifyHeadersFrame(conn, streamID)
+ if err != nil {
+ return err
+ }
+
+ // close tunnel
+ data := []byte("hello")
+ err = writeCapsule(conn, streamID, true, data)
+ if err != nil {
+ return err
+ }
+
+ // wait for DATA frame with END_STREAM flag set
+ err = VerifyTunnelClosed(conn)
+ if err != nil {
+ return err
+ }
+
+ return nil
+ },
+ })
+
+ tg.AddTestCase(&spec.TestCase{
+ Desc: "Tunnel closed by client (empty DATA frame)",
+ Requirement: "The final DATA frame could be empty.",
+ Run: func(c *config.Config, conn *spec.Conn) error {
+ var streamID uint32 = 1
+
+ err := conn.Handshake()
+ if err != nil {
+ return err
+ }
+
+ headers := ConnectUdpHeaders(c)
+ hp := http2.HeadersFrameParam{
+ StreamID: streamID,
+ EndStream: false,
+ EndHeaders: true,
+ BlockFragment: conn.EncodeHeaders(headers),
+ }
+ conn.WriteHeaders(hp)
+ err = spec.VerifyHeadersFrame(conn, streamID)
+ if err != nil {
+ return err
+ }
+
+ // send and receive data over tunnel
+ data := []byte("hello")
+ err = writeCapsule(conn, streamID, false, data)
+ if err != nil {
+ return err
+ }
+ resp, err := readCapsule(conn, streamID)
+ if err != nil {
+ return err
+ }
+ if !bytes.Equal(data, resp) {
+ return &spec.TestError{
+ Expected: []string{"capsule payload: " + string(data)},
+					Actual:   "capsule payload: " + string(resp),
+ }
+ }
+
+ // close tunnel
+ conn.WriteData(streamID, true, []byte(""))
+
+ // wait for DATA frame with END_STREAM flag set
+ err = VerifyTunnelClosed(conn)
+ if err != nil {
+ return err
+ }
+
+ return nil
+ },
+ })
+
+ tg.AddTestCase(&spec.TestCase{
+ Desc: "CONNECT and CONNECT-UDP on single connection",
+ Requirement: "One stream establish TCP tunnel and second UDP tunnel.",
+ Run: func(c *config.Config, conn *spec.Conn) error {
+ err := conn.Handshake()
+ if err != nil {
+ return err
+ }
+
+ var udpTunnelStreamID uint32 = 1
+ var tcpTunnelStreamID uint32 = 3
+
+ headers := ConnectUdpHeaders(c)
+ hp := http2.HeadersFrameParam{
+ StreamID: udpTunnelStreamID,
+ EndStream: false,
+ EndHeaders: true,
+ BlockFragment: conn.EncodeHeaders(headers),
+ }
+ conn.WriteHeaders(hp)
+ err = spec.VerifyHeadersFrame(conn, udpTunnelStreamID)
+ if err != nil {
+ return err
+ }
+
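+			// derive the TCP CONNECT target from the masque URI template path
+			// ("/.well-known/masque/udp/{target_host}/{target_port}/")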
+ pathSplit := strings.Split(c.Path, "/")
+ path := fmt.Sprintf("%s:%s", pathSplit[4], pathSplit[5])
+ headers = []hpack.HeaderField{
+ spec.HeaderField(":method", "CONNECT"),
+ spec.HeaderField(":authority", path),
+ }
+ hp = http2.HeadersFrameParam{
+ StreamID: tcpTunnelStreamID,
+ EndStream: false,
+ EndHeaders: true,
+ BlockFragment: conn.EncodeHeaders(headers),
+ }
+ conn.WriteHeaders(hp)
+ err = spec.VerifyHeadersFrame(conn, tcpTunnelStreamID)
+ if err != nil {
+ return err
+ }
+
+ // send and receive data over UDP tunnel
+ udpData := []byte("hello UDP")
+ err = writeCapsule(conn, udpTunnelStreamID, false, udpData)
+ if err != nil {
+ return err
+ }
+ udpResp, err := readCapsule(conn, udpTunnelStreamID)
+ if err != nil {
+ return err
+ }
+ if !bytes.Equal(udpData, udpResp) {
+ return &spec.TestError{
+ Expected: []string{"capsule payload: " + string(udpData)},
+					Actual:   "capsule payload: " + string(udpResp),
+ }
+ }
+
+ // send and receive data over TCP tunnel
+ tcpData := []byte("hello TCP")
+ conn.WriteData(tcpTunnelStreamID, false, tcpData)
+			tcpResp, err := readTcpTunnel(conn, tcpTunnelStreamID)
+			if err != nil {
+				return err
+			}
+			if !bytes.Equal(tcpData, tcpResp) {
+				return &spec.TestError{
+					Expected: []string{"payload: " + string(tcpData)},
+					Actual:   "payload: " + string(tcpResp),
+				}
+			}
+
+ // send and receive data over TCP tunnel
+ conn.WriteData(tcpTunnelStreamID, false, tcpData)
+			tcpResp, err = readTcpTunnel(conn, tcpTunnelStreamID)
+			if err != nil {
+				return err
+			}
+			if !bytes.Equal(tcpData, tcpResp) {
+				return &spec.TestError{
+					Expected: []string{"payload: " + string(tcpData)},
+					Actual:   "payload: " + string(tcpResp),
+				}
+			}
+
+ // send and receive data over UDP tunnel
+ err = writeCapsule(conn, udpTunnelStreamID, false, udpData)
+ if err != nil {
+ return err
+ }
+ udpResp, err = readCapsule(conn, udpTunnelStreamID)
+ if err != nil {
+ return err
+ }
+ if !bytes.Equal(udpData, udpResp) {
+ return &spec.TestError{
+ Expected: []string{"capsule payload: " + string(udpData)},
+					Actual:   "capsule payload: " + string(udpResp),
+ }
+ }
+
+ return nil
+ },
+ })
+
return tg
}
"os"
"reflect"
"runtime"
- "strconv"
"strings"
"time"
}
}
-func (s *VppProxySuite) StartEchoServer() *net.TCPListener {
- listener, err := net.ListenTCP("tcp", &net.TCPAddr{IP: net.ParseIP(s.ServerAddr()), Port: int(s.Ports.Server)})
- s.AssertNil(err, fmt.Sprint(err))
- go func() {
- for {
- conn, err := listener.Accept()
- if err != nil {
- continue
- }
- go handleConn(conn)
- }
- }()
- s.Log("* started tcp echo server " + s.ServerAddr() + ":" + strconv.Itoa(int(s.Ports.Server)))
- return listener
-}
-
var _ = Describe("VppProxySuite", Ordered, ContinueOnFailure, func() {
var s VppProxySuite
BeforeAll(func() {
"os"
"reflect"
"runtime"
- "strconv"
"strings"
"time"
return s.Interfaces.Client.Ip4AddressString()
}
-func (s *VppUdpProxySuite) StartEchoServer() *net.UDPConn {
- conn, err := net.ListenUDP("udp", &net.UDPAddr{IP: net.ParseIP(s.ServerAddr()), Port: s.Ports.Server})
- s.AssertNil(err, fmt.Sprint(err))
- go func() {
- for {
- b := make([]byte, 1500)
- n, addr, err := conn.ReadFrom(b)
- if err != nil {
- return
- }
- if _, err := conn.WriteTo(b[:n], addr); err != nil {
- return
- }
- }
- }()
- s.Log("* started udp echo server " + s.ServerAddr() + ":" + strconv.Itoa(s.Ports.Server))
- return conn
-}
-
func (s *VppUdpProxySuite) ClientSendReceive(toSend []byte, rcvBuffer []byte) (int, error) {
proxiedConn, err := net.DialUDP("udp",
&net.UDPAddr{IP: net.ParseIP(s.ClientAddr()), Port: 0},
}{
{desc: "extras/3/1"},
{desc: "extras/3/2"},
+ {desc: "extras/3.1/1"},
+ {desc: "extras/3.1/2"},
+ {desc: "extras/3.1/3"},
+ {desc: "extras/3.1/4"},
+ {desc: "extras/3.1/5"},
}
for _, test := range testCases {
It(testName, func(ctx SpecContext) {
s.Log(testName + ": BEGIN")
vppProxy := s.Containers.VppProxy.VppInstance
- remoteServerConn := s.StartEchoServer()
+ remoteServerConn := s.StartUdpEchoServer(s.ServerAddr(), s.Ports.Server)
defer remoteServerConn.Close()
+			// extras/3.1/5 also opens a TCP tunnel
+ if strings.Contains(test.desc, "extras/3.1/5") {
+ remoteTcpServerConn := s.StartTcpEchoServer(s.ServerAddr(), s.Ports.Server)
+ defer remoteTcpServerConn.Close()
+ }
cmd := fmt.Sprintf("test proxy server fifo-size 512k server-uri https://%s/%d", s.VppProxyAddr(), s.Ports.Proxy)
s.Log(vppProxy.Vppctl(cmd))
path := fmt.Sprintf("/.well-known/masque/udp/%s/%d/", s.ServerAddr(), s.Ports.Server)
Host: s.VppProxyAddr(),
Port: s.Ports.Proxy,
Path: path,
- Timeout: time.Second * s.MaxTimeout,
+ Timeout: s.MaxTimeout,
MaxHeaderLen: 4096,
TLS: true,
Insecure: true,
"net/http/httputil"
"os"
"os/exec"
+ "strconv"
"strings"
"time"
}
return "", false
}
+
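+// StartTcpEchoServer starts a TCP listener on addr:port; each accepted
+// connection is served by handleConn until the listener is closed.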
+func (s *HstSuite) StartTcpEchoServer(addr string, port int) *net.TCPListener {
+ listener, err := net.ListenTCP("tcp", &net.TCPAddr{IP: net.ParseIP(addr), Port: port})
+ s.AssertNil(err, fmt.Sprint(err))
+ go func() {
+ for {
+			conn, err := listener.Accept()
+			if err != nil {
+				// listener closed, stop accepting
+				return
+			}
+ }
+ go handleConn(conn)
+ }
+ }()
+ s.Log("* started tcp echo server " + addr + ":" + strconv.Itoa(port))
+ return listener
+}
+
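+// StartUdpEchoServer starts a UDP socket on addr:port that echoes every
+// datagram back to its sender until the socket is closed.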
+func (s *HstSuite) StartUdpEchoServer(addr string, port int) *net.UDPConn {
+ conn, err := net.ListenUDP("udp", &net.UDPAddr{IP: net.ParseIP(addr), Port: port})
+ s.AssertNil(err, fmt.Sprint(err))
+ go func() {
+ for {
+ b := make([]byte, 1500)
+ n, addr, err := conn.ReadFrom(b)
+ if err != nil {
+ return
+ }
+ if _, err := conn.WriteTo(b[:n], addr); err != nil {
+ return
+ }
+ }
+ }()
+ s.Log("* started udp echo server " + addr + ":" + strconv.Itoa(port))
+ return conn
+}
}
func VppConnectProxyStressTest(s *VppProxySuite) {
- remoteServerConn := s.StartEchoServer()
+ remoteServerConn := s.StartTcpEchoServer(s.ServerAddr(), int(s.Ports.Server))
defer remoteServerConn.Close()
s.ConfigureVppProxy("http", s.Ports.Proxy)
func VppConnectProxyStressMWTest(s *VppProxySuite) {
s.CpusPerVppContainer = 3
s.SetupTest()
- remoteServerConn := s.StartEchoServer()
+ remoteServerConn := s.StartTcpEchoServer(s.ServerAddr(), int(s.Ports.Server))
defer remoteServerConn.Close()
vppProxy := s.Containers.VppProxy.VppInstance
}
func VppProxyUdpTest(s *VppUdpProxySuite) {
- remoteServerConn := s.StartEchoServer()
+ remoteServerConn := s.StartUdpEchoServer(s.ServerAddr(), s.Ports.Server)
defer remoteServerConn.Close()
vppProxy := s.Containers.VppProxy.VppInstance
func VppProxyUdpMigrationMWTest(s *VppUdpProxySuite) {
s.CpusPerVppContainer = 3
s.SetupTest()
- remoteServerConn := s.StartEchoServer()
+ remoteServerConn := s.StartUdpEchoServer(s.ServerAddr(), s.Ports.Server)
defer remoteServerConn.Close()
vppProxy := s.Containers.VppProxy.VppInstance
}
func VppConnectUdpProxyTest(s *VppUdpProxySuite) {
- remoteServerConn := s.StartEchoServer()
+ remoteServerConn := s.StartUdpEchoServer(s.ServerAddr(), s.Ports.Server)
defer remoteServerConn.Close()
vppProxy := s.Containers.VppProxy.VppInstance
}
func VppConnectUdpInvalidCapsuleTest(s *VppUdpProxySuite) {
- remoteServerConn := s.StartEchoServer()
+ remoteServerConn := s.StartUdpEchoServer(s.ServerAddr(), s.Ports.Server)
defer remoteServerConn.Close()
vppProxy := s.Containers.VppProxy.VppInstance
}
func VppConnectUdpUnknownCapsuleTest(s *VppUdpProxySuite) {
- remoteServerConn := s.StartEchoServer()
+ remoteServerConn := s.StartUdpEchoServer(s.ServerAddr(), s.Ports.Server)
defer remoteServerConn.Close()
vppProxy := s.Containers.VppProxy.VppInstance
}
func VppConnectUdpClientCloseTest(s *VppUdpProxySuite) {
- remoteServerConn := s.StartEchoServer()
+ remoteServerConn := s.StartUdpEchoServer(s.ServerAddr(), s.Ports.Server)
defer remoteServerConn.Close()
vppProxy := s.Containers.VppProxy.VppInstance
}
func VppConnectUdpStressTest(s *VppUdpProxySuite) {
- remoteServerConn := s.StartEchoServer()
+ remoteServerConn := s.StartUdpEchoServer(s.ServerAddr(), s.Ports.Server)
defer remoteServerConn.Close()
vppProxy := s.Containers.VppProxy.VppInstance
func VppConnectUdpStressMWTest(s *VppUdpProxySuite) {
s.CpusPerVppContainer = 3
s.SetupTest()
- remoteServerConn := s.StartEchoServer()
+ remoteServerConn := s.StartUdpEchoServer(s.ServerAddr(), s.Ports.Server)
defer remoteServerConn.Close()
vppProxy := s.Containers.VppProxy.VppInstance
u8 *payload;
u32 payload_len;
clib_llist_anchor_t sched_list;
+ http_req_state_t app_reply_next_state;
void (*dispatch_headers_cb) (struct http2_req_ *req, http_conn_t *hc,
u8 *n_emissions, clib_llist_index_t *next_ri);
void (*dispatch_data_cb) (struct http2_req_ *req, http_conn_t *hc,
http_io_ts_after_write (hc, 0);
}
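+/* TX dispatch for CONNECT-UDP tunnels in datagram mode: read one datagram
+ * (session header plus payload) from the app fifo, wrap the payload in a
+ * DATAGRAM capsule and emit it as a single HTTP/2 DATA frame */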
+static void
+http2_sched_dispatch_udp_tunnel (http2_req_t *req, http_conn_t *hc,
+ u8 *n_emissions)
+{
+ http2_conn_ctx_t *h2c;
+ u32 max_write, max_read, dgram_size, capsule_size, n_written;
+ session_dgram_hdr_t hdr;
+ u8 fh[HTTP2_FRAME_HEADER_SIZE];
+ u8 *buf, *payload;
+
+ *n_emissions += HTTP2_SCHED_WEIGHT_DATA_INLINE;
+
+ max_read = http_io_as_max_read (&req->base);
+ if (max_read == 0)
+ {
+ HTTP_DBG (2, "max_read == 0");
+ transport_connection_reschedule (&req->base.connection);
+ return;
+ }
+ /* read datagram header */
+ http_io_as_read (&req->base, (u8 *) &hdr, sizeof (hdr), 1);
+ ASSERT (hdr.data_length <= HTTP_UDP_PAYLOAD_MAX_LEN);
+ dgram_size = hdr.data_length + SESSION_CONN_HDR_LEN;
+ ASSERT (max_read >= dgram_size);
+
+ h2c = http2_conn_ctx_get_w_thread (hc);
+
+ if (PREDICT_FALSE (
+ (hdr.data_length + HTTP_UDP_PROXY_DATAGRAM_CAPSULE_OVERHEAD) >
+ h2c->peer_settings.max_frame_size))
+ {
+      /* drop the datagram if it does not fit into a single frame */
+ HTTP_DBG (1, "datagram too large, dropped");
+ http_io_as_drain (&req->base, dgram_size);
+ return;
+ }
+
+ if (req->peer_window <
+ (hdr.data_length + HTTP_UDP_PROXY_DATAGRAM_CAPSULE_OVERHEAD))
+ {
+ /* mark that we need window update on stream */
+ HTTP_DBG (1, "not enough space in stream window for capsule");
+ req->flags |= HTTP2_REQ_F_NEED_WINDOW_UPDATE;
+ }
+
+ max_write = http_io_ts_max_write (hc, 0);
+ max_write -= HTTP2_FRAME_HEADER_SIZE;
+ max_write -= HTTP_UDP_PROXY_DATAGRAM_CAPSULE_OVERHEAD;
+ max_write = clib_min (max_write, h2c->peer_window);
+ if (PREDICT_FALSE (max_write < hdr.data_length))
+ {
+ /* we should have at least 16kB free space in underlying transport,
+ * maybe peer is doing small connection window updates */
+ HTTP_DBG (1, "datagram dropped");
+ http_io_as_drain (&req->base, dgram_size);
+ return;
+ }
+
+ buf = http_get_tx_buf (hc);
+ /* create capsule header */
+ payload = http_encap_udp_payload_datagram (buf, hdr.data_length);
+ capsule_size = (payload - buf) + hdr.data_length;
+ /* read payload */
+ http_io_as_read (&req->base, payload, hdr.data_length, 1);
+ http_io_as_drain (&req->base, dgram_size);
+
+ req->peer_window -= capsule_size;
+ h2c->peer_window -= capsule_size;
+
+ http2_frame_write_data_header (capsule_size, req->stream_id, 0, fh);
+
+ svm_fifo_seg_t segs[2] = { { fh, HTTP2_FRAME_HEADER_SIZE },
+ { buf, capsule_size } };
+ n_written = http_io_ts_write_segs (hc, segs, 2, 0);
+ ASSERT (n_written == (HTTP2_FRAME_HEADER_SIZE + capsule_size));
+
+  if (max_read > dgram_size)
+ {
+ /* schedule for next round if we have more data */
+ HTTP_DBG (1, "adding to data queue req_index %x",
+ ((http_req_handle_t) req->base.hr_req_handle).req_index);
+ http2_req_schedule_data_tx (hc, req);
+ }
+ else
+ transport_connection_reschedule (&req->base.connection);
+
+ http_io_ts_after_write (hc, 0);
+}
+
static void
http2_sched_dispatch_continuation (http2_req_t *req, http_conn_t *hc,
u8 *n_emissions,
response = http_get_tx_buf (hc);
date = format (0, "%U", format_http_time_now, hc);
- control_data.sc = msg.code;
control_data.content_len = msg.data.body_len;
control_data.server_name = hc->app_name;
control_data.server_name_len = vec_len (hc->app_name);
control_data.date = date;
control_data.date_len = vec_len (date);
+ if (req->base.is_tunnel)
+ {
+ switch (msg.code)
+ {
+ case HTTP_STATUS_SWITCHING_PROTOCOLS:
+ /* remap status code for extended connect response */
+ msg.code = HTTP_STATUS_OK;
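+	  /* fall through - treat as a 2xx response */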
+ case HTTP_STATUS_OK:
+ case HTTP_STATUS_CREATED:
+ case HTTP_STATUS_ACCEPTED:
+ /* tunnel established if 2xx (Successful) response to CONNECT */
+ control_data.content_len = HPACK_ENCODER_SKIP_CONTENT_LEN;
+ break;
+ default:
+ /* tunnel not established */
+ req->base.is_tunnel = 0;
+ break;
+ }
+ }
+ control_data.sc = msg.code;
+
if (msg.data.headers_len)
{
n_deq += msg.data.type == HTTP_MSG_DATA_PTR ? sizeof (uword) :
stream_id = req->stream_id;
- /* tunnel established if 2xx (Successful) response to CONNECT */
- req->base.is_tunnel =
- (req->base.is_tunnel && http_status_code_str[msg.code][0] == '2');
-
/* END_STREAM flag need to be set in HEADERS frame */
if (msg.data.body_len)
{
}
else if (req->base.is_tunnel)
{
- req->dispatch_data_cb = http2_sched_dispatch_tunnel;
+ if (req->base.upgrade_proto == HTTP_UPGRADE_PROTO_CONNECT_UDP &&
+ hc->udp_tunnel_mode == HTTP_UDP_TUNNEL_DGRAM)
+ req->dispatch_data_cb = http2_sched_dispatch_udp_tunnel;
+ else
+ req->dispatch_data_cb = http2_sched_dispatch_tunnel;
transport_connection_reschedule (&req->base.connection);
/* cleanup some stuff we don't need anymore in tunnel mode */
vec_free (req->base.headers);
req->base.headers_offset = control_data.headers - wrk->header_list;
req->base.headers_len = control_data.headers_len;
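+  /* default next state after app reply; overridden below for CONNECT */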
+ req->app_reply_next_state = HTTP_REQ_STATE_APP_IO_MORE_DATA;
+
if (!(control_data.parsed_bitmap & HPACK_PSEUDO_HEADER_METHOD_PARSED))
{
HTTP_DBG (1, ":method pseudo-header missing in request");
}
if (control_data.method == HTTP_REQ_CONNECT)
{
+ req->app_reply_next_state = HTTP_REQ_STATE_TUNNEL;
if (control_data.parsed_bitmap & HPACK_PSEUDO_HEADER_PROTOCOL_PARSED)
{
/* extended CONNECT (RFC8441) */
http2_stream_error (hc, req, HTTP2_ERROR_INTERNAL_ERROR, sp);
return HTTP_SM_STOP;
}
+ if (req->base.upgrade_proto == HTTP_UPGRADE_PROTO_CONNECT_UDP &&
+ hc->udp_tunnel_mode == HTTP_UDP_TUNNEL_DGRAM)
+ req->app_reply_next_state = HTTP_REQ_STATE_UDP_TUNNEL;
}
else
{
http_io_as_write (&req->base, req->payload, req->payload_len);
http_app_worker_rx_notify (&req->base);
+ switch (req->stream_state)
+ {
+ case HTTP2_STREAM_STATE_HALF_CLOSED:
+ HTTP_DBG (1, "client want to close tunnel");
+ session_transport_closing_notify (&req->base.connection);
+ break;
+ case HTTP2_STREAM_STATE_CLOSED:
+ HTTP_DBG (1, "client closed tunnel");
+ http2_stream_close (req, hc);
+ break;
+ default:
+ break;
+ }
+
return HTTP_SM_STOP;
}
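+/* RX handler for CONNECT-UDP tunnel streams in datagram mode: decapsulate
+ * the DATAGRAM capsule carried in the DATA frame payload and hand it to the
+ * app as a datagram (session header plus payload) */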
+static http_sm_result_t
+http2_req_state_udp_tunnel_rx (http_conn_t *hc, http2_req_t *req,
+ transport_send_params_t *sp,
+ http2_error_t *error)
+{
+ int rv;
+ u8 payload_offset;
+ u64 payload_len;
+ session_dgram_hdr_t hdr;
+
+ HTTP_DBG (1, "udp tunnel received data from client");
+
+ rv = http_decap_udp_payload_datagram (req->payload, req->payload_len,
+ &payload_offset, &payload_len);
+ HTTP_DBG (1, "rv=%d, payload_offset=%u, payload_len=%llu", rv,
+ payload_offset, payload_len);
+ if (PREDICT_FALSE (rv != 0))
+ {
+ if (rv < 0)
+ {
+ /* capsule datagram is invalid (stream need to be aborted) */
+ http2_stream_error (hc, req, HTTP2_ERROR_PROTOCOL_ERROR, sp);
+ return HTTP_SM_STOP;
+ }
+ else
+ {
+ /* unknown capsule should be skipped */
+ return HTTP_SM_STOP;
+ }
+ }
+ /* check if we have the full capsule */
+ if (PREDICT_FALSE (req->payload_len != (payload_offset + payload_len)))
+ {
+ HTTP_DBG (1, "capsule not complete");
+ http2_stream_error (hc, req, HTTP2_ERROR_PROTOCOL_ERROR, sp);
+ return HTTP_SM_STOP;
+ }
+ if (http_io_as_max_write (&req->base) < (sizeof (hdr) + payload_len))
+ {
+ clib_warning ("app's rx fifo full");
+ http2_stream_error (hc, req, HTTP2_ERROR_INTERNAL_ERROR, sp);
+ return HTTP_SM_STOP;
+ }
+
+ hdr.data_length = payload_len;
+ hdr.data_offset = 0;
+
+ /* send datagram header and payload */
+ svm_fifo_seg_t segs[2] = { { (u8 *) &hdr, sizeof (hdr) },
+ { req->payload + payload_offset, payload_len } };
+ http_io_as_write_segs (&req->base, segs, 2);
+ http_app_worker_rx_notify (&req->base);
+
+ if (req->stream_state == HTTP2_STREAM_STATE_HALF_CLOSED)
+ {
+ HTTP_DBG (1, "client want to close tunnel");
+ session_transport_closing_notify (&req->base.connection);
+ }
+
+ return HTTP_SM_STOP;
+}
/*************************************/
/* request state machine handlers TX */
/*************************************/
clib_llist_add_tail (wrk->req_pool, sched_list, req, he);
http2_conn_schedule (h2c, hc->c_thread_index);
- http_req_state_change (&req->base, req->base.is_tunnel ?
- HTTP_REQ_STATE_TUNNEL :
- HTTP_REQ_STATE_APP_IO_MORE_DATA);
+ http_req_state_change (&req->base, req->app_reply_next_state);
http_req_deschedule (&req->base, sp);
return HTTP_SM_STOP;
0, /* wait transport method */
http2_req_state_wait_app_reply,
http2_req_state_app_io_more_data,
+  /* tunnel and udp tunnel share the TX handler; only the scheduler's data
+   * dispatch callback differs */
+ http2_req_state_tunnel_tx,
http2_req_state_tunnel_tx,
- 0, /* udp tunnel */
};
static http2_sm_handler rx_state_funcs[HTTP_REQ_N_STATES] = {
0, /* wait app reply */
0, /* app io more data */
http2_req_state_tunnel_rx,
- 0, /* udp tunnel */
+ http2_req_state_udp_tunnel_rx,
};
static_always_inline int
/* bogus state */
if (hc->flags & HTTP_CONN_F_IS_SERVER &&
- req->stream_state != HTTP2_STREAM_STATE_OPEN)
+ req->stream_state != HTTP2_STREAM_STATE_OPEN && !req->base.is_tunnel)
{
HTTP_DBG (1, "error: stream already half-closed");
http2_stream_error (hc, req, HTTP2_ERROR_STREAM_CLOSED, 0);
if (fh->flags & HTTP2_FRAME_FLAG_END_STREAM)
{
- req->stream_state = HTTP2_STREAM_STATE_HALF_CLOSED;
+ HTTP_DBG (1, "END_STREAM flag set");
if (req->base.is_tunnel)
{
- session_transport_closing_notify (&req->base.connection);
- HTTP_DBG (1, "client closed tunnel");
+ /* client can initiate or confirm tunnel close */
+ req->stream_state =
+ req->stream_state == HTTP2_STREAM_STATE_HALF_CLOSED ?
+ HTTP2_STREAM_STATE_CLOSED :
+ HTTP2_STREAM_STATE_HALF_CLOSED;
/* final DATA frame could be empty */
if (fh->length == 0)
- return HTTP2_ERROR_NO_ERROR;
+ {
+ if (req->stream_state == HTTP2_STREAM_STATE_CLOSED)
+ {
+ HTTP_DBG (1, "client closed tunnel");
+ http2_stream_close (req, hc);
+ }
+ else
+ {
+ HTTP_DBG (1, "client want to close tunnel");
+ session_transport_closing_notify (&req->base.connection);
+ }
+ return HTTP2_ERROR_NO_ERROR;
+ }
}
+ else
+ req->stream_state = HTTP2_STREAM_STATE_HALF_CLOSED;
}
rx_buf = http_get_rx_buf (hc);
case HTTP2_STREAM_STATE_HALF_CLOSED:
HTTP_DBG (1, "proxy confirmed close");
http2_tunnel_send_close (hc, req);
- session_transport_closed_notify (&req->base.connection);
+ http2_stream_close (req, hc);
break;
default:
session_transport_closed_notify (&req->base.connection);