import (
"fmt"
+ "slices"
"github.com/summerwind/h2spec/config"
"github.com/summerwind/h2spec/spec"
"golang.org/x/net/http2"
+ "golang.org/x/net/http2/hpack"
)
var key = "extras"
}
tg.AddTestGroup(FlowControl())
+ tg.AddTestGroup(ConnectMethod())
return tg
}
return nil
}
+func VerifyTunnelClosed(conn *spec.Conn) error {
+ var streamClosed = false
+ var lastEvent spec.Event
+ for !conn.Closed {
+ ev := conn.WaitEvent()
+ lastEvent = ev
+ switch event := ev.(type) {
+ case spec.DataFrameEvent:
+ if event.StreamEnded() {
+ streamClosed = true
+ goto done
+ }
+ case spec.TimeoutEvent:
+ goto done
+ }
+ }
+done:
+ if !streamClosed {
+ return &spec.TestError{
+ Expected: []string{spec.ExpectedStreamClosed},
+ Actual: lastEvent.String(),
+ }
+ }
+ return nil
+}
+
func FlowControl() *spec.TestGroup {
tg := NewTestGroup("1", "Flow control")
tg.AddTestCase(&spec.TestCase{
})
return tg
}
+
+func ConnectHeaders(c *config.Config) []hpack.HeaderField {
+
+ return []hpack.HeaderField{
+ spec.HeaderField(":method", "CONNECT"),
+ spec.HeaderField(":authority", c.Path),
+ }
+}
+
+func ConnectMethod() *spec.TestGroup {
+ tg := NewTestGroup("2", "CONNECT method")
+
+ tg.AddTestCase(&spec.TestCase{
+ Desc: "Tunnel closed by target",
+ Requirement: "A proxy that receives a TCP segment with the FIN bit set sends a DATA frame with the END_STREAM flag set.",
+ Run: func(c *config.Config, conn *spec.Conn) error {
+ var streamID uint32 = 1
+
+ err := conn.Handshake()
+ if err != nil {
+ return err
+ }
+
+ headers := ConnectHeaders(c)
+ hp := http2.HeadersFrameParam{
+ StreamID: streamID,
+ EndStream: false,
+ EndHeaders: true,
+ BlockFragment: conn.EncodeHeaders(headers),
+ }
+ conn.WriteHeaders(hp)
+ err = spec.VerifyHeadersFrame(conn, streamID)
+ if err != nil {
+ return err
+ }
+
+ // send http/1.0 so target will close connection when send response
+ conn.WriteData(streamID, false, []byte("GET /index.html HTTP/1.0\r\n\r\n"))
+
+ // wait for DATA frame with END_STREAM flag set
+ err = VerifyTunnelClosed(conn)
+ if err != nil {
+ return err
+ }
+
+ // client is expected to send DATA frame with the END_STREAM flag set
+ conn.WriteData(streamID, true, []byte(""))
+
+ return nil
+ },
+ })
+
+ tg.AddTestCase(&spec.TestCase{
+ Desc: "Tunnel closed by client (with attached data)",
+ Requirement: "A proxy that receives a DATA frame with the END_STREAM flag set sends the attached data with the FIN bit set on the last TCP segment.",
+ Run: func(c *config.Config, conn *spec.Conn) error {
+ var streamID uint32 = 1
+
+ err := conn.Handshake()
+ if err != nil {
+ return err
+ }
+
+ headers := ConnectHeaders(c)
+ hp := http2.HeadersFrameParam{
+ StreamID: streamID,
+ EndStream: false,
+ EndHeaders: true,
+ BlockFragment: conn.EncodeHeaders(headers),
+ }
+ conn.WriteHeaders(hp)
+ err = spec.VerifyHeadersFrame(conn, streamID)
+ if err != nil {
+ return err
+ }
+
+ // close tunnel
+ conn.WriteData(streamID, true, []byte("HEAD /index.html HTTP/1.1\r\nHost: example.com\r\n\r\n"))
+
+ // wait for DATA frame with END_STREAM flag set
+ err = VerifyTunnelClosed(conn)
+ if err != nil {
+ return err
+ }
+
+ return nil
+ },
+ })
+
+ tg.AddTestCase(&spec.TestCase{
+ Desc: "Tunnel closed by client (empty DATA frame)",
+ Requirement: "The final DATA frame could be empty.",
+ Run: func(c *config.Config, conn *spec.Conn) error {
+ var streamID uint32 = 1
+
+ err := conn.Handshake()
+ if err != nil {
+ return err
+ }
+
+ headers := ConnectHeaders(c)
+ hp := http2.HeadersFrameParam{
+ StreamID: streamID,
+ EndStream: false,
+ EndHeaders: true,
+ BlockFragment: conn.EncodeHeaders(headers),
+ }
+ conn.WriteHeaders(hp)
+ err = spec.VerifyHeadersFrame(conn, streamID)
+ if err != nil {
+ return err
+ }
+
+ conn.WriteData(streamID, false, []byte("HEAD /index.html HTTP/1.1\r\nHost: example.com\r\n\r\n"))
+
+ // verify reception of response DATA frame
+ err = spec.VerifyEventType(conn, spec.EventDataFrame)
+ if err != nil {
+ return err
+ }
+
+ // close tunnel
+ conn.WriteData(streamID, true, []byte(""))
+
+ // wait for DATA frame with END_STREAM flag set
+ err = VerifyTunnelClosed(conn)
+ if err != nil {
+ return err
+ }
+
+ return nil
+ },
+ })
+
+ tg.AddTestCase(&spec.TestCase{
+ Desc: "Multiple tunnels",
+ Requirement: "In HTTP/2, the CONNECT method establishes a tunnel over a single HTTP/2 stream to a remote host, rather than converting the entire connection to a tunnel.",
+ Run: func(c *config.Config, conn *spec.Conn) error {
+ var streamID uint32 = 1
+
+ err := conn.Handshake()
+ if err != nil {
+ return err
+ }
+
+ maxStreams, ok := conn.Settings[http2.SettingMaxConcurrentStreams]
+ if !ok {
+ return spec.ErrSkipped
+ }
+
+ for i := 0; i < int(maxStreams); i++ {
+ headers := ConnectHeaders(c)
+ hp := http2.HeadersFrameParam{
+ StreamID: streamID,
+ EndStream: false,
+ EndHeaders: true,
+ BlockFragment: conn.EncodeHeaders(headers),
+ }
+ conn.WriteHeaders(hp)
+ err = spec.VerifyHeadersFrame(conn, streamID)
+ if err != nil {
+ return err
+ }
+
+ streamID += 2
+ }
+
+ streamID = 1
+ for i := 0; i < int(maxStreams); i++ {
+ conn.WriteData(streamID, false, []byte("HEAD /index.html HTTP/1.1\r\nHost: example.com\r\n\r\n"))
+ streamID += 2
+ }
+
+ var receivedResp []uint32
+ for i := 0; i < int(maxStreams); i++ {
+ actual, passed := conn.WaitEventByType(spec.EventDataFrame)
+ switch event := actual.(type) {
+ case spec.DataFrameEvent:
+ passed = !slices.Contains(receivedResp, event.StreamID)
+ default:
+ passed = false
+ }
+ if !passed {
+ expected := []string{
+ "Receive one response per stream (tunnel)",
+ }
+
+ return &spec.TestError{
+ Expected: expected,
+ Actual: actual.String(),
+ }
+ }
+ }
+
+ return nil
+ },
+ })
+ return tg
+}
package hst
import (
+ "bytes"
"fmt"
+ "io"
"net"
+ "os"
"reflect"
"runtime"
"strconv"
"strings"
+ "time"
+ "fd.io/hs-test/h2spec_extras"
. "fd.io/hs-test/infra/common"
. "github.com/onsi/ginkgo/v2"
+ "github.com/summerwind/h2spec/config"
)
const (
s.AssertNil(s.Containers.NginxServerTransient.Start())
}
+func (s *VppProxySuite) ConfigureVppProxy(proto string, proxyPort uint16) {
+ vppProxy := s.Containers.VppProxy.VppInstance
+ cmd := fmt.Sprintf("test proxy server fifo-size 512k server-uri %s://%s:%d", proto, s.VppProxyAddr(), proxyPort)
+ if proto != "http" && proto != "https" && proto != "udp" {
+ proto = "tcp"
+ }
+ if proto != "http" && proto != "https" {
+ cmd += fmt.Sprintf(" client-uri %s://%s:%d", proto, s.ServerAddr(), s.Ports.Server)
+ }
+
+ output := vppProxy.Vppctl(cmd)
+ s.Log("proxy configured: " + output)
+}
+
func (s *VppProxySuite) ServerAddr() string {
return s.Interfaces.Server.Ip4AddressString()
}
s.AssertNotContains(log, "Operation timed out")
}
-func (s *VppProxySuite) CurlDownloadResourceViaTunnel(uri string, proxyUri string) {
- args := fmt.Sprintf("-w @/tmp/write_out_download_connect --max-time %d --insecure --proxy-insecure -p -x %s --remote-name --output-dir /tmp %s", s.maxTimeout, proxyUri, uri)
+func (s *VppProxySuite) CurlDownloadResourceViaTunnel(uri string, proxyUri string, extraArgs ...string) (string, string) {
+ extras := ""
+ if len(extraArgs) > 0 {
+ extras = strings.Join(extraArgs, " ")
+ extras += " "
+ }
+ args := fmt.Sprintf("%s-w @/tmp/write_out_download_connect --max-time %d --insecure --proxy-insecure -p -x %s --remote-name --output-dir /tmp %s", extras, s.maxTimeout, proxyUri, uri)
writeOut, log := s.RunCurlContainer(s.Containers.Curl, args)
- s.AssertContains(writeOut, "CONNECT response code: 200")
+ if strings.Contains(extras, "proxy-http2") {
+ // in case of h2 connect response code is 000 in write out
+ s.AssertContains(log, "CONNECT tunnel established, response 200")
+ } else {
+ s.AssertContains(writeOut, "CONNECT response code: 200")
+ }
s.AssertContains(writeOut, "GET response code: 200")
s.AssertNotContains(log, "bytes remaining to read")
s.AssertNotContains(log, "Operation timed out")
s.AssertNotContains(log, "Upgrade:")
+ return writeOut, log
}
-func (s *VppProxySuite) CurlUploadResourceViaTunnel(uri, proxyUri, file string) {
- args := fmt.Sprintf("-w @/tmp/write_out_upload_connect --max-time %d --insecure --proxy-insecure -p -x %s -T %s %s", s.maxTimeout, proxyUri, file, uri)
+func (s *VppProxySuite) CurlUploadResourceViaTunnel(uri, proxyUri, file string, extraArgs ...string) (string, string) {
+ extras := ""
+ if len(extraArgs) > 0 {
+ extras = strings.Join(extraArgs, " ")
+ extras += " "
+ }
+ args := fmt.Sprintf("%s-w @/tmp/write_out_upload_connect --max-time %d --insecure --proxy-insecure -p -x %s -T %s %s", extras, s.maxTimeout, proxyUri, file, uri)
writeOut, log := s.RunCurlContainer(s.Containers.Curl, args)
- s.AssertContains(writeOut, "CONNECT response code: 200")
+ if strings.Contains(extras, "proxy-http2") {
+ // in case of h2 connect response code is 000 in write out
+ s.AssertContains(log, "CONNECT tunnel established, response 200")
+ } else {
+ s.AssertContains(writeOut, "CONNECT response code: 200")
+ }
s.AssertContains(writeOut, "PUT response code: 201")
s.AssertNotContains(log, "Operation timed out")
s.AssertNotContains(log, "Upgrade:")
+ return writeOut, log
}
func handleConn(conn net.Conn) {
}
}
})
+
+var _ = Describe("H2SpecProxySuite", Ordered, ContinueOnFailure, func() {
+ var s VppProxySuite
+ BeforeAll(func() {
+ s.SetupSuite()
+ })
+ BeforeEach(func() {
+ s.SetupTest()
+ })
+ AfterAll(func() {
+ s.TeardownSuite()
+ })
+ AfterEach(func() {
+ s.TeardownTest()
+ })
+
+ testCases := []struct {
+ desc string
+ }{
+ {desc: "extras/2/1"},
+ {desc: "extras/2/2"},
+ {desc: "extras/2/3"},
+ {desc: "extras/2/4"},
+ }
+
+ for _, test := range testCases {
+ test := test
+ testName := "proxy_test.go/h2spec_" + strings.ReplaceAll(test.desc, "/", "_")
+ It(testName, func(ctx SpecContext) {
+ s.Log(testName + ": BEGIN")
+ s.SetupNginxServer()
+ s.ConfigureVppProxy("https", s.Ports.Proxy)
+ path := fmt.Sprintf("%s:%d", s.ServerAddr(), s.Ports.Server)
+ conf := &config.Config{
+ Host: s.VppProxyAddr(),
+ Port: int(s.Ports.Proxy),
+ Path: path,
+ Timeout: time.Second * time.Duration(s.maxTimeout),
+ MaxHeaderLen: 4096,
+ TLS: true,
+ Insecure: true,
+ Sections: []string{test.desc},
+ Verbose: true,
+ }
+ // capture h2spec output so it will be in log
+ oldStdout := os.Stdout
+ r, w, _ := os.Pipe()
+ os.Stdout = w
+
+ tg := h2spec_extras.Spec()
+ tg.Test(conf)
+
+ oChan := make(chan string)
+ go func() {
+ var buf bytes.Buffer
+ io.Copy(&buf, r)
+ oChan <- buf.String()
+ }()
+
+ // restore to normal state
+ w.Close()
+ os.Stdout = oldStdout
+ o := <-oChan
+ s.Log(o)
+ s.AssertEqual(0, tg.FailedCount)
+ }, SpecTimeout(TestTimeout))
+ }
+
+})
func init() {
RegisterVppProxyTests(VppProxyHttpGetTcpTest, VppProxyHttpGetTlsTest, VppProxyHttpPutTcpTest, VppProxyHttpPutTlsTest,
- VppConnectProxyGetTest, VppConnectProxyPutTest, VppHttpsConnectProxyGetTest)
+ VppConnectProxyGetTest, VppConnectProxyPutTest, VppHttpsConnectProxyGetTest, VppH2ConnectProxyGetTest,
+ VppH2ConnectProxyPutTest)
RegisterVppProxySoloTests(VppProxyHttpGetTcpMTTest, VppProxyHttpPutTcpMTTest, VppProxyTcpIperfMTTest,
VppProxyUdpIperfMTTest, VppConnectProxyStressTest, VppConnectProxyStressMTTest, VppConnectProxyConnectionFailedMTTest)
RegisterVppUdpProxyTests(VppProxyUdpTest, VppConnectUdpProxyTest, VppConnectUdpInvalidCapsuleTest,
RegisterNginxProxySoloTests(NginxMirroringTest, MirrorMultiThreadTest)
}
-func configureVppProxy(s *VppProxySuite, proto string, proxyPort uint16) {
- vppProxy := s.Containers.VppProxy.VppInstance
- cmd := fmt.Sprintf("test proxy server fifo-size 512k server-uri %s://%s:%d", proto, s.VppProxyAddr(), proxyPort)
- if proto != "http" && proto != "https" && proto != "udp" {
- proto = "tcp"
- }
- if proto != "http" && proto != "https" {
- cmd += fmt.Sprintf(" client-uri %s://%s:%d", proto, s.ServerAddr(), s.Ports.Server)
- }
-
- output := vppProxy.Vppctl(cmd)
- s.Log("proxy configured: " + output)
-}
-
func VppProxyHttpGetTcpMTTest(s *VppProxySuite) {
VppProxyHttpGetTcpTest(s)
}
s.AssertNil(vppProxy.DeleteTap(s.Interfaces.Client))
s.AssertNil(vppProxy.CreateTap(s.Interfaces.Client, false, 2, uint32(s.Interfaces.Client.Peer.Index), Consistent_qp))
- configureVppProxy(s, "tcp", s.Ports.Proxy)
+ s.ConfigureVppProxy("tcp", s.Ports.Proxy)
if proto == "udp" {
- configureVppProxy(s, "udp", s.Ports.Proxy)
+ s.ConfigureVppProxy("udp", s.Ports.Proxy)
proto = "-u"
} else {
proto = ""
func VppProxyHttpGetTcpTest(s *VppProxySuite) {
s.SetupNginxServer()
- configureVppProxy(s, "tcp", s.Ports.Proxy)
+ s.ConfigureVppProxy("tcp", s.Ports.Proxy)
uri := fmt.Sprintf("http://%s:%d/httpTestFile", s.VppProxyAddr(), s.Ports.Proxy)
s.CurlDownloadResource(uri)
}
func VppProxyHttpGetTlsTest(s *VppProxySuite) {
s.SetupNginxServer()
- configureVppProxy(s, "tls", s.Ports.Proxy)
+ s.ConfigureVppProxy("tls", s.Ports.Proxy)
uri := fmt.Sprintf("https://%s:%d/httpTestFile", s.VppProxyAddr(), s.Ports.Proxy)
s.CurlDownloadResource(uri)
}
func VppProxyHttpPutTcpTest(s *VppProxySuite) {
s.SetupNginxServer()
- configureVppProxy(s, "tcp", s.Ports.Proxy)
+ s.ConfigureVppProxy("tcp", s.Ports.Proxy)
uri := fmt.Sprintf("http://%s:%d/upload/testFile", s.VppProxyAddr(), s.Ports.Proxy)
s.CurlUploadResource(uri, CurlContainerTestFile)
}
func VppProxyHttpPutTlsTest(s *VppProxySuite) {
s.SetupNginxServer()
- configureVppProxy(s, "tls", s.Ports.Proxy)
+ s.ConfigureVppProxy("tls", s.Ports.Proxy)
uri := fmt.Sprintf("https://%s:%d/upload/testFile", s.VppProxyAddr(), s.Ports.Proxy)
s.CurlUploadResource(uri, CurlContainerTestFile)
}
func VppConnectProxyGetTest(s *VppProxySuite) {
s.SetupNginxServer()
- configureVppProxy(s, "http", s.Ports.Proxy)
+ s.ConfigureVppProxy("http", s.Ports.Proxy)
targetUri := fmt.Sprintf("http://%s:%d/httpTestFile", s.ServerAddr(), s.Ports.Server)
proxyUri := fmt.Sprintf("http://%s:%d", s.VppProxyAddr(), s.Ports.Proxy)
func VppHttpsConnectProxyGetTest(s *VppProxySuite) {
s.SetupNginxServer()
- configureVppProxy(s, "https", s.Ports.Proxy)
+ s.ConfigureVppProxy("https", s.Ports.Proxy)
targetUri := fmt.Sprintf("http://%s:%d/httpTestFile", s.ServerAddr(), s.Ports.Server)
proxyUri := fmt.Sprintf("https://%s:%d", s.VppProxyAddr(), s.Ports.Proxy)
s.CurlDownloadResourceViaTunnel(targetUri, proxyUri)
}
+func VppH2ConnectProxyGetTest(s *VppProxySuite) {
+ s.SetupNginxServer()
+ s.ConfigureVppProxy("https", s.Ports.Proxy)
+
+ targetUri := fmt.Sprintf("http://%s:%d/httpTestFile", s.ServerAddr(), s.Ports.Server)
+ proxyUri := fmt.Sprintf("https://%s:%d", s.VppProxyAddr(), s.Ports.Proxy)
+ _, log := s.CurlDownloadResourceViaTunnel(targetUri, proxyUri, "--proxy-http2")
+ // ALPN result check
+ s.AssertContains(log, "CONNECT tunnel: HTTP/2 negotiated")
+}
+
func VppConnectProxyConnectionFailedMTTest(s *VppProxySuite) {
s.SetupNginxServer()
- configureVppProxy(s, "http", s.Ports.Proxy)
+ s.ConfigureVppProxy("http", s.Ports.Proxy)
targetUri := fmt.Sprintf("http://%s:%d/httpTestFile", s.ServerAddr(), s.Ports.Server+1)
proxyUri := fmt.Sprintf("http://%s:%d", s.VppProxyAddr(), s.Ports.Proxy)
func VppConnectProxyPutTest(s *VppProxySuite) {
s.SetupNginxServer()
- configureVppProxy(s, "http", s.Ports.Proxy)
+ s.ConfigureVppProxy("http", s.Ports.Proxy)
proxyUri := fmt.Sprintf("http://%s:%d", s.VppProxyAddr(), s.Ports.Proxy)
targetUri := fmt.Sprintf("http://%s:%d/upload/testFile", s.ServerAddr(), s.Ports.Server)
s.CurlUploadResourceViaTunnel(targetUri, proxyUri, CurlContainerTestFile)
}
+func VppH2ConnectProxyPutTest(s *VppProxySuite) {
+ s.SetupNginxServer()
+ s.ConfigureVppProxy("https", s.Ports.Proxy)
+
+ proxyUri := fmt.Sprintf("https://%s:%d", s.VppProxyAddr(), s.Ports.Proxy)
+ targetUri := fmt.Sprintf("http://%s:%d/upload/testFile", s.ServerAddr(), s.Ports.Server)
+ _, log := s.CurlUploadResourceViaTunnel(targetUri, proxyUri, CurlContainerTestFile, "--proxy-http2")
+ // ALPN result check
+ s.AssertContains(log, "CONNECT tunnel: HTTP/2 negotiated")
+}
+
func vppConnectProxyStressLoad(s *VppProxySuite, proxyPort string) {
var (
connectError, timeout, readError, writeError, invalidData, total atomic.Uint32
remoteServerConn := s.StartEchoServer()
defer remoteServerConn.Close()
- configureVppProxy(s, "http", s.Ports.Proxy)
+ s.ConfigureVppProxy("http", s.Ports.Proxy)
// no goVPP less noise
s.Containers.VppProxy.VppInstance.Disconnect()
s.AssertNil(vppProxy.DeleteTap(s.Interfaces.Client))
s.AssertNil(vppProxy.CreateTap(s.Interfaces.Client, false, 2, uint32(s.Interfaces.Client.Peer.Index), Consistent_qp))
- configureVppProxy(s, "http", s.Ports.Proxy)
+ s.ConfigureVppProxy("http", s.Ports.Proxy)
// no goVPP less noise
vppProxy.Disconnect()
#include <http/http2/frame.h>
#include <http/http_private.h>
#include <http/http_timer.h>
+#include <http/http_status_codes.h>
#define HTTP2_WIN_SIZE_MAX 0x7FFFFFFF
#define HTTP2_INITIAL_WIN_SIZE 65535
clib_llist_anchor_t sched_list;
void (*dispatch_headers_cb) (struct http2_req_ *req, http_conn_t *hc,
u8 *n_emissions, clib_llist_index_t *next_ri);
+ void (*dispatch_data_cb) (struct http2_req_ *req, http_conn_t *hc,
+ u8 *n_emissions);
} http2_req_t;
#define foreach_http2_conn_flags \
http_io_ts_after_write (hc, 1);
}
+always_inline void
+http2_tunnel_send_close (http_conn_t *hc, http2_req_t *req)
+{
+ u8 *response;
+
+ response = http_get_tx_buf (hc);
+ http2_frame_write_data_header (0, req->stream_id,
+ HTTP2_FRAME_FLAG_END_STREAM, response);
+ http_io_ts_write (hc, response, HTTP2_FRAME_HEADER_SIZE, 0);
+ http_io_ts_after_write (hc, 1);
+}
+
/* send RST_STREAM frame and notify app */
always_inline void
http2_stream_error (http_conn_t *hc, http2_req_t *req, http2_error_t error,
/* stream TX scheduler */
/***********************/
+static void
+http2_sched_dispatch_data (http2_req_t *req, http_conn_t *hc, u8 *n_emissions)
+{
+ u32 max_write, max_read, n_segs, n_read, n_written = 0;
+ svm_fifo_seg_t *app_segs, *segs = 0;
+ http_buffer_t *hb = &req->base.tx_buf;
+ u8 fh[HTTP2_FRAME_HEADER_SIZE];
+ u8 finished = 0, flags = 0;
+ http2_conn_ctx_t *h2c;
+
+ ASSERT (http_buffer_bytes_left (hb) > 0);
+
+ *n_emissions += hb->type == HTTP_BUFFER_PTR ? HTTP2_SCHED_WEIGHT_DATA_PTR :
+ HTTP2_SCHED_WEIGHT_DATA_INLINE;
+
+ h2c = http2_conn_ctx_get_w_thread (hc);
+
+ max_write = http_io_ts_max_write (hc, 0);
+ max_write -= HTTP2_FRAME_HEADER_SIZE;
+ max_write = clib_min (max_write, (u32) req->peer_window);
+ max_write = clib_min (max_write, h2c->peer_window);
+ max_write = clib_min (max_write, h2c->peer_settings.max_frame_size);
+
+ max_read = http_buffer_bytes_left (hb);
+
+ n_read = http_buffer_get_segs (hb, max_write, &app_segs, &n_segs);
+ if (n_read == 0)
+ {
+ HTTP_DBG (1, "no data to deq");
+ transport_connection_reschedule (&req->base.connection);
+ return;
+ }
+
+ finished = (max_read - n_read) == 0;
+ flags = finished ? HTTP2_FRAME_FLAG_END_STREAM : 0;
+ http2_frame_write_data_header (n_read, req->stream_id, flags, fh);
+ vec_validate (segs, 0);
+ segs[0].len = HTTP2_FRAME_HEADER_SIZE;
+ segs[0].data = fh;
+ vec_append (segs, app_segs);
+
+ n_written = http_io_ts_write_segs (hc, segs, n_segs + 1, 0);
+ n_written -= HTTP2_FRAME_HEADER_SIZE;
+ vec_free (segs);
+ http_buffer_drain (hb, n_written);
+ req->peer_window -= n_written;
+ h2c->peer_window -= n_written;
+
+ if (finished)
+ {
+ /* all done, close stream */
+ http_buffer_free (hb);
+ if (hc->flags & HTTP_CONN_F_IS_SERVER)
+ http2_stream_close (req, hc);
+ else
+ req->stream_state = HTTP2_STREAM_STATE_HALF_CLOSED;
+ }
+ else
+ {
+ if (req->peer_window == 0)
+ {
+ /* mark that we need window update on stream */
+ HTTP_DBG (1, "stream window is full");
+ req->flags |= HTTP2_REQ_F_NEED_WINDOW_UPDATE;
+ }
+ else
+ {
+ /* schedule for next round */
+ HTTP_DBG (1, "adding to data queue req_index %x",
+ ((http_req_handle_t) req->base.hr_req_handle).req_index);
+ http2_req_schedule_data_tx (hc, req);
+ http_io_as_dequeue_notify (&req->base, n_written);
+ }
+ }
+
+ http_io_ts_after_write (hc, finished);
+}
+
+static void
+http2_sched_dispatch_tunnel (http2_req_t *req, http_conn_t *hc,
+ u8 *n_emissions)
+{
+ u32 max_write, max_read, n_segs = 2, n_read, n_written = 0;
+ svm_fifo_seg_t segs[n_segs + 1];
+ u8 fh[HTTP2_FRAME_HEADER_SIZE];
+ u8 flags = 0;
+ http2_conn_ctx_t *h2c;
+
+ *n_emissions += HTTP2_SCHED_WEIGHT_DATA_INLINE;
+
+ h2c = http2_conn_ctx_get_w_thread (hc);
+
+ max_read = http_io_as_max_read (&req->base);
+ if (max_read == 0)
+ {
+ HTTP_DBG (2, "max_read == 0");
+ transport_connection_reschedule (&req->base.connection);
+ return;
+ }
+ max_write = http_io_ts_max_write (hc, 0);
+ max_write -= HTTP2_FRAME_HEADER_SIZE;
+ max_write = clib_min (max_write, (u32) req->peer_window);
+ max_write = clib_min (max_write, h2c->peer_window);
+ max_write = clib_min (max_write, h2c->peer_settings.max_frame_size);
+ n_read = clib_min (max_write, max_read);
+
+ if (req->stream_state == HTTP2_STREAM_STATE_HALF_CLOSED &&
+ max_write >= max_read)
+ {
+ HTTP_DBG (1, "closing tunnel");
+ session_transport_closed_notify (&req->base.connection);
+ flags = HTTP2_FRAME_FLAG_END_STREAM;
+ }
+ http2_frame_write_data_header (n_read, req->stream_id, flags, fh);
+ segs[0].len = HTTP2_FRAME_HEADER_SIZE;
+ segs[0].data = fh;
+
+ http_io_as_read_segs (&req->base, segs + 1, &n_segs, n_read);
+
+ n_written = http_io_ts_write_segs (hc, segs, n_segs + 1, 0);
+ n_written -= HTTP2_FRAME_HEADER_SIZE;
+ http_io_as_drain (&req->base, n_written);
+ req->peer_window -= n_written;
+ h2c->peer_window -= n_written;
+
+ if (req->peer_window == 0)
+ {
+ /* mark that we need window update on stream */
+ HTTP_DBG (1, "stream window is full");
+ req->flags |= HTTP2_REQ_F_NEED_WINDOW_UPDATE;
+ }
+ else if (max_read - n_written)
+ {
+ /* schedule for next round if we have more data */
+ HTTP_DBG (1, "adding to data queue req_index %x",
+ ((http_req_handle_t) req->base.hr_req_handle).req_index);
+ http2_req_schedule_data_tx (hc, req);
+ }
+ else
+ transport_connection_reschedule (&req->base.connection);
+
+ http_io_ts_after_write (hc, 0);
+}
+
static void
http2_sched_dispatch_continuation (http2_req_t *req, http_conn_t *hc,
u8 *n_emissions,
if (http_buffer_bytes_left (&req->base.tx_buf))
{
/* start sending the actual data */
+ req->dispatch_data_cb = http2_sched_dispatch_data;
HTTP_DBG (1, "adding to data queue req_index %x",
((http_req_handle_t) req->base.hr_req_handle).req_index);
http2_req_schedule_data_tx (hc, req);
stream_id = req->stream_id;
+ /* tunnel established if 2xx (Successful) response to CONNECT */
+ req->base.is_tunnel =
+ (req->base.is_tunnel && http_status_code_str[msg.code][0] == '2');
+
/* END_STREAM flag need to be set in HEADERS frame */
if (msg.data.body_len)
{
+ ASSERT (req->base.is_tunnel == 0);
http_req_tx_buffer_init (&req->base, &msg);
http_io_as_dequeue_notify (&req->base, n_deq);
}
else
- flags |= HTTP2_FRAME_FLAG_END_STREAM;
+ flags |= req->base.is_tunnel ? 0 : HTTP2_FRAME_FLAG_END_STREAM;
if (headers_len <= max_write)
{
if (msg.data.body_len)
{
/* start sending the actual data */
+ req->dispatch_data_cb = http2_sched_dispatch_data;
HTTP_DBG (1, "adding to data queue req_index %x",
((http_req_handle_t) req->base.hr_req_handle).req_index);
http2_req_schedule_data_tx (hc, req);
}
+ else if (req->base.is_tunnel)
+ {
+ req->dispatch_data_cb = http2_sched_dispatch_tunnel;
+ transport_connection_reschedule (&req->base.connection);
+ /* cleanup some stuff we don't need anymore in tunnel mode */
+ vec_free (req->base.headers);
+ }
else
- http2_stream_close (req, hc);
+ {
+ /* otherwise we are done */
+ http2_stream_close (req, hc);
+ }
}
else
{
http_io_ts_after_write (hc, 0);
}
-static void
-http2_sched_dispatch_data (http2_req_t *req, http_conn_t *hc, u8 *n_emissions)
-{
- u32 max_write, max_read, n_segs, n_read, n_written = 0;
- svm_fifo_seg_t *app_segs, *segs = 0;
- http_buffer_t *hb = &req->base.tx_buf;
- u8 fh[HTTP2_FRAME_HEADER_SIZE];
- u8 finished = 0, flags = 0;
- http2_conn_ctx_t *h2c;
-
- ASSERT (http_buffer_bytes_left (hb) > 0);
-
- *n_emissions += hb->type == HTTP_BUFFER_PTR ? HTTP2_SCHED_WEIGHT_DATA_PTR :
- HTTP2_SCHED_WEIGHT_DATA_INLINE;
-
- h2c = http2_conn_ctx_get_w_thread (hc);
-
- max_write = http_io_ts_max_write (hc, 0);
- max_write -= HTTP2_FRAME_HEADER_SIZE;
- max_write = clib_min (max_write, (u32) req->peer_window);
- max_write = clib_min (max_write, h2c->peer_window);
- max_write = clib_min (max_write, h2c->peer_settings.max_frame_size);
-
- max_read = http_buffer_bytes_left (hb);
-
- n_read = http_buffer_get_segs (hb, max_write, &app_segs, &n_segs);
- if (n_read == 0)
- {
- HTTP_DBG (1, "no data to deq");
- transport_connection_reschedule (&req->base.connection);
- return;
- }
-
- finished = (max_read - n_read) == 0;
- flags = finished ? HTTP2_FRAME_FLAG_END_STREAM : 0;
- http2_frame_write_data_header (n_read, req->stream_id, flags, fh);
- vec_validate (segs, 0);
- segs[0].len = HTTP2_FRAME_HEADER_SIZE;
- segs[0].data = fh;
- vec_append (segs, app_segs);
-
- n_written = http_io_ts_write_segs (hc, segs, n_segs + 1, 0);
- ASSERT (n_written == (HTTP2_FRAME_HEADER_SIZE + n_read));
- vec_free (segs);
- http_buffer_drain (hb, n_read);
- req->peer_window -= n_read;
- h2c->peer_window -= n_read;
-
- if (finished)
- {
- /* all done, close stream */
- http_buffer_free (hb);
- if (hc->flags & HTTP_CONN_F_IS_SERVER)
- http2_stream_close (req, hc);
- else
- req->stream_state = HTTP2_STREAM_STATE_HALF_CLOSED;
- }
- else
- {
- if (req->peer_window == 0)
- {
- /* mark that we need window update on stream */
- HTTP_DBG (1, "stream window is full");
- req->flags |= HTTP2_REQ_F_NEED_WINDOW_UPDATE;
- }
- else
- {
- /* schedule for next round */
- HTTP_DBG (1, "adding to data queue req_index %x",
- ((http_req_handle_t) req->base.hr_req_handle).req_index);
- http2_req_schedule_data_tx (hc, req);
- http_io_as_dequeue_notify (&req->base, n_read);
- }
- }
-
- http_io_ts_after_write (hc, finished);
-}
-
static void
http2_update_time_callback (f64 now, u8 thread_index)
{
1, "sending data req_index %x",
((http_req_handle_t) req->base.hr_req_handle).req_index);
clib_llist_remove (wrk->req_pool, sched_list, req);
- http2_sched_dispatch_data (req, hc, &n_emissions);
+ req->dispatch_data_cb (req, hc, &n_emissions);
if (ri == old_ti)
break;
http2_conn_ctx_t *h2c;
hpack_request_control_data_t control_data;
http_msg_t msg;
+ u8 *p;
int rv;
http_req_state_t new_state = HTTP_REQ_STATE_WAIT_APP_REPLY;
http2_worker_ctx_t *wrk = http2_get_worker (hc->c_thread_index);
http2_stream_error (hc, req, HTTP2_ERROR_PROTOCOL_ERROR, sp);
return HTTP_SM_STOP;
}
- if (control_data.method == HTTP_REQ_UNKNOWN ||
- control_data.method == HTTP_REQ_CONNECT)
+ if (control_data.method == HTTP_REQ_UNKNOWN)
{
HTTP_DBG (1, "unsupported method");
http2_stream_error (hc, req, HTTP2_ERROR_PROTOCOL_ERROR, sp);
http2_stream_error (hc, req, HTTP2_ERROR_PROTOCOL_ERROR, sp);
return HTTP_SM_STOP;
}
- if (!(control_data.parsed_bitmap & HPACK_PSEUDO_HEADER_AUTHORITY_PARSED) &&
- control_data.method != HTTP_REQ_CONNECT)
+ if (!(control_data.parsed_bitmap & HPACK_PSEUDO_HEADER_AUTHORITY_PARSED))
{
- HTTP_DBG (1, ":path pseudo-header missing in request");
+ HTTP_DBG (1, ":authority pseudo-header missing in request");
http2_stream_error (hc, req, HTTP2_ERROR_PROTOCOL_ERROR, sp);
return HTTP_SM_STOP;
}
+ if (control_data.method == HTTP_REQ_CONNECT)
+ {
+ if (control_data.parsed_bitmap & HPACK_PSEUDO_HEADER_SCHEME_PARSED ||
+ control_data.parsed_bitmap & HPACK_PSEUDO_HEADER_PATH_PARSED)
+ {
+ HTTP_DBG (1, ":scheme and :path pseudo-header must be omitted for "
+ "CONNECT method");
+ http2_stream_error (hc, req, HTTP2_ERROR_PROTOCOL_ERROR, sp);
+ return HTTP_SM_STOP;
+ }
+ /* quick check if port is present */
+ p = control_data.authority + control_data.authority_len;
+ p--;
+ if (!isdigit (*p))
+ {
+ HTTP_DBG (1, "port not present in authority");
+ http2_stream_error (hc, req, HTTP2_ERROR_PROTOCOL_ERROR, sp);
+ return HTTP_SM_STOP;
+ }
+ p--;
+ for (; p > control_data.authority; p--)
+ {
+ if (!isdigit (*p))
+ break;
+ }
+ if (*p != ':')
+ {
+ HTTP_DBG (1, "port not present in authority");
+ http2_stream_error (hc, req, HTTP2_ERROR_PROTOCOL_ERROR, sp);
+ return HTTP_SM_STOP;
+ }
+ req->base.is_tunnel = 1;
+ http_io_as_add_want_read_ntf (&req->base);
+ }
req->base.control_data_len = control_data.control_data_len;
req->base.headers_offset = control_data.headers - wrk->header_list;
http_io_as_add_want_read_ntf (&req->base);
}
/* TODO: message framing without content length using END_STREAM flag */
- if (req->base.body_len == 0 && req->stream_state == HTTP2_STREAM_STATE_OPEN)
+ if (req->base.body_len == 0 &&
+ req->stream_state == HTTP2_STREAM_STATE_OPEN &&
+ control_data.method != HTTP_REQ_CONNECT)
{
HTTP_DBG (1, "no content-length and DATA frame expected");
*error = HTTP2_ERROR_INTERNAL_ERROR;
}
req->base.to_recv = req->base.body_len;
- req->base.target_path_len = control_data.path_len;
- req->base.target_path_offset = control_data.path - wrk->header_list;
- /* drop leading slash */
- req->base.target_path_offset++;
- req->base.target_path_len--;
req->base.target_query_offset = 0;
req->base.target_query_len = 0;
- http_identify_optional_query (&req->base, wrk->header_list);
+ if (control_data.parsed_bitmap & HPACK_PSEUDO_HEADER_PATH_PARSED)
+ {
+ req->base.target_path_len = control_data.path_len;
+ req->base.target_path_offset = control_data.path - wrk->header_list;
+ /* drop leading slash */
+ req->base.target_path_offset++;
+ req->base.target_path_len--;
+ http_identify_optional_query (&req->base, wrk->header_list);
+ }
+ else
+ {
+ req->base.target_path_len = 0;
+ req->base.target_path_offset = 0;
+ }
msg.type = HTTP_MSG_REQUEST;
msg.method_type = control_data.method;
return HTTP_SM_STOP;
}
+static http_sm_result_t
+http2_req_state_tunnel_rx (http_conn_t *hc, http2_req_t *req,
+ transport_send_params_t *sp, http2_error_t *error)
+{
+ u32 max_enq;
+
+ HTTP_DBG (1, "tunnel received data from client");
+
+ max_enq = http_io_as_max_write (&req->base);
+ if (max_enq < req->payload_len)
+ {
+ clib_warning ("app's rx fifo full");
+ http2_stream_error (hc, req, HTTP2_ERROR_INTERNAL_ERROR, sp);
+ return HTTP_SM_STOP;
+ }
+ http_io_as_write (&req->base, req->payload, req->payload_len);
+ http_app_worker_rx_notify (&req->base);
+
+ return HTTP_SM_STOP;
+}
+
/*************************************/
/* request state machine handlers TX */
/*************************************/
clib_llist_add_tail (wrk->req_pool, sched_list, req, he);
http2_conn_schedule (h2c, hc->c_thread_index);
- http_req_state_change (&req->base, HTTP_REQ_STATE_APP_IO_MORE_DATA);
+ http_req_state_change (&req->base, req->base.is_tunnel ?
+ HTTP_REQ_STATE_TUNNEL :
+ HTTP_REQ_STATE_APP_IO_MORE_DATA);
http_req_deschedule (&req->base, sp);
return HTTP_SM_STOP;
return HTTP_SM_STOP;
}
+static http_sm_result_t
+http2_req_state_tunnel_tx (http_conn_t *hc, http2_req_t *req,
+ transport_send_params_t *sp, http2_error_t *error)
+{
+ http2_conn_ctx_t *h2c;
+
+ ASSERT (!clib_llist_elt_is_linked (req, sched_list));
+
+ HTTP_DBG (1, "tunnel received data from target");
+
+ /* add data back to stream scheduler */
+ HTTP_DBG (1, "adding to data queue req_index %x",
+ ((http_req_handle_t) req->base.hr_req_handle).req_index);
+ http2_req_schedule_data_tx (hc, req);
+ h2c = http2_conn_ctx_get_w_thread (hc);
+ if (h2c->peer_window > 0)
+ http2_conn_schedule (h2c, hc->c_thread_index);
+
+ http_req_deschedule (&req->base, sp);
+
+ return HTTP_SM_STOP;
+}
+
/*************************/
/* request state machine */
/*************************/
0, /* wait transport method */
http2_req_state_wait_app_reply,
http2_req_state_app_io_more_data,
- 0, /* tunnel */
+ http2_req_state_tunnel_tx,
0, /* udp tunnel */
};
http2_req_state_wait_transport_method,
0, /* wait app reply */
0, /* app io more data */
- 0, /* tunnel */
+ http2_req_state_tunnel_rx,
0, /* udp tunnel */
};
}
if (fh->flags & HTTP2_FRAME_FLAG_END_STREAM)
- req->stream_state = HTTP2_STREAM_STATE_HALF_CLOSED;
+ {
+ req->stream_state = HTTP2_STREAM_STATE_HALF_CLOSED;
+ if (req->base.is_tunnel)
+ {
+ session_transport_closing_notify (&req->base.connection);
+ HTTP_DBG (1, "client closed tunnel");
+ /* final DATA frame could be empty */
+ if (fh->length == 0)
+ return HTTP2_ERROR_NO_ERROR;
+ }
+ }
rx_buf = http_get_rx_buf (hc);
vec_validate (rx_buf, fh->length - 1);
http2_app_rx_evt_callback (http_conn_t *hc, u32 req_index,
clib_thread_index_t thread_index)
{
- /* TODO: continue tunnel RX */
http2_req_t *req;
u8 *response;
u32 increment;
response = http_get_tx_buf (hc);
increment = http_io_as_max_write (&req->base) - req->our_window;
HTTP_DBG (1, "stream window increment %u", increment);
+ if (increment == 0)
+ return;
req->our_window += increment;
http2_frame_write_window_update (increment, req->stream_id, &response);
http_io_ts_write (hc, response, vec_len (response), 0);
HTTP_DBG (1, "nothing more to send, confirm close");
session_transport_closed_notify (&req->base.connection);
}
+ else if (req->base.is_tunnel)
+ {
+ switch (req->stream_state)
+ {
+ case HTTP2_STREAM_STATE_OPEN:
+ HTTP_DBG (1, "proxy closing connection");
+ req->stream_state = HTTP2_STREAM_STATE_HALF_CLOSED;
+ if (http_io_as_max_read (&req->base))
+ {
+ HTTP_DBG (1, "wait for all data to be written to ts");
+ req->flags |= HTTP2_REQ_F_APP_CLOSED;
+ }
+ else
+ {
+ HTTP_DBG (1, "nothing more to send, closing tunnel");
+ http2_tunnel_send_close (hc, req);
+ }
+ break;
+ case HTTP2_STREAM_STATE_HALF_CLOSED:
+ HTTP_DBG (1, "proxy confirmed close");
+ http2_tunnel_send_close (hc, req);
+ session_transport_closed_notify (&req->base.connection);
+ break;
+ default:
+ session_transport_closed_notify (&req->base.connection);
+ break;
+ }
+ }
else
{
HTTP_DBG (1, "wait for all data to be written to ts");