http: CONNECT method for tunnelling 88/41588/6
authorMatus Fabian <[email protected]>
Fri, 20 Sep 2024 14:34:59 +0000 (16:34 +0200)
committerFlorin Coras <[email protected]>
Fri, 1 Nov 2024 22:25:45 +0000 (22:25 +0000)
Type: improvement

Change-Id: I6af16ddcc6734bb831227ce65cb39e87294fc4cd
Signed-off-by: Matus Fabian <[email protected]>
extras/hs-test/proxy_test.go
src/plugins/hs_apps/proxy.c
src/plugins/hs_apps/proxy.h
src/plugins/http/http.c
src/plugins/http/http.h

index b914242..5ca151f 100644 (file)
@@ -7,7 +7,8 @@ import (
 )
 
 func init() {
-       RegisterVppProxyTests(VppProxyHttpGetTcpTest, VppProxyHttpGetTlsTest, VppProxyHttpPutTcpTest, VppProxyHttpPutTlsTest)
+       RegisterVppProxyTests(VppProxyHttpGetTcpTest, VppProxyHttpGetTlsTest, VppProxyHttpPutTcpTest, VppProxyHttpPutTlsTest,
+               VppConnectProxyGetTest, VppConnectProxyPutTest)
        RegisterEnvoyProxyTests(EnvoyProxyHttpGetTcpTest, EnvoyProxyHttpPutTcpTest)
        RegisterNginxProxyTests(NginxMirroringTest)
        RegisterNginxProxySoloTests(MirrorMultiThreadTest)
@@ -15,14 +16,11 @@ func init() {
 
 func configureVppProxy(s *VppProxySuite, proto string, proxyPort uint16) {
        vppProxy := s.GetContainerByName(VppProxyContainerName).VppInstance
-       output := vppProxy.Vppctl(
-               "test proxy server server-uri %s://%s/%d client-uri tcp://%s/%d",
-               proto,
-               s.VppProxyAddr(),
-               proxyPort,
-               s.NginxAddr(),
-               s.NginxPort(),
-       )
+       cmd := fmt.Sprintf("test proxy server fifo-size 512k server-uri %s://%s/%d", proto, s.VppProxyAddr(), proxyPort)
+       if proto != "http" {
+               cmd += fmt.Sprintf(" client-uri tcp://%s/%d", s.NginxAddr(), s.NginxPort())
+       }
+       output := vppProxy.Vppctl(cmd)
        s.Log("proxy configured: " + output)
 }
 
@@ -83,3 +81,23 @@ func nginxMirroring(s *NginxProxySuite, multiThreadWorkers bool) {
        uri := fmt.Sprintf("http://%s:%d/httpTestFile", s.ProxyAddr(), s.ProxyPort())
        s.CurlDownloadResource(uri)
 }
+
+func VppConnectProxyGetTest(s *VppProxySuite) {
+       var proxyPort uint16 = 8080
+
+       configureVppProxy(s, "http", proxyPort)
+
+       targetUri := fmt.Sprintf("http://%s:%d/httpTestFile", s.NginxAddr(), s.NginxPort())
+       proxyUri := fmt.Sprintf("http://%s:%d", s.VppProxyAddr(), proxyPort)
+       s.CurlDownloadResourceViaTunnel(targetUri, proxyUri)
+}
+
+func VppConnectProxyPutTest(s *VppProxySuite) {
+       var proxyPort uint16 = 8080
+
+       configureVppProxy(s, "http", proxyPort)
+
+       proxyUri := fmt.Sprintf("http://%s:%d", s.VppProxyAddr(), proxyPort)
+       targetUri := fmt.Sprintf("http://%s:%d/upload/testFile", s.NginxAddr(), s.NginxPort())
+       s.CurlUploadResourceViaTunnel(targetUri, proxyUri, CurlContainerTestFile)
+}
index 999f8d1..0d24ebc 100644 (file)
@@ -19,6 +19,8 @@
 #include <vnet/session/application_interface.h>
 #include <hs_apps/proxy.h>
 #include <vnet/tcp/tcp.h>
+#include <http/http.h>
+#include <http/http_header_names.h>
 
 proxy_main_t proxy_main;
 
@@ -49,6 +51,41 @@ proxy_session_side_ctx_get (proxy_worker_t *wrk, u32 ctx_index)
   return pool_elt_at_index (wrk->ctx_pool, ctx_index);
 }
 
+static void
+proxy_send_http_resp (session_t *s, http_status_code_t sc,
+                     http_header_t *resp_headers)
+{
+  http_msg_t msg;
+  int rv;
+  u8 *headers_buf = 0;
+
+  if (vec_len (resp_headers))
+    {
+      headers_buf = http_serialize_headers (resp_headers);
+      msg.data.len = msg.data.headers_len = vec_len (headers_buf);
+    }
+  else
+    msg.data.len = msg.data.headers_len = 0;
+
+  msg.type = HTTP_MSG_REPLY;
+  msg.code = sc;
+  msg.data.type = HTTP_MSG_DATA_INLINE;
+  msg.data.headers_offset = 0;
+  msg.data.body_len = 0;
+  msg.data.body_offset = 0;
+  rv = svm_fifo_enqueue (s->tx_fifo, sizeof (msg), (u8 *) &msg);
+  ASSERT (rv == sizeof (msg));
+  if (msg.data.headers_len)
+    {
+      rv = svm_fifo_enqueue (s->tx_fifo, vec_len (headers_buf), headers_buf);
+      ASSERT (rv == vec_len (headers_buf));
+      vec_free (headers_buf);
+    }
+
+  if (svm_fifo_set_event (s->tx_fifo))
+    session_program_tx_io_evt (s->handle, SESSION_IO_EVT_TX);
+}
+
 static void
 proxy_do_connect (vnet_connect_args_t *a)
 {
@@ -387,6 +424,7 @@ proxy_accept_callback (session_t * s)
   proxy_session_side_ctx_t *sc;
   proxy_session_t *ps;
   proxy_worker_t *wrk;
+  transport_proto_t tp = session_get_transport_proto (s);
 
   wrk = proxy_worker_get (s->thread_index);
   sc = proxy_session_side_ctx_alloc (wrk);
@@ -402,6 +440,7 @@ proxy_accept_callback (session_t * s)
 
   ps->ao.session_handle = SESSION_INVALID_HANDLE;
   sc->ps_index = ps->ps_index;
+  sc->is_http = tp == TRANSPORT_PROTO_HTTP ? 1 : 0;
 
   clib_spinlock_unlock_if_init (&pm->sessions_lock);
 
@@ -450,6 +489,7 @@ proxy_session_start_connect (proxy_session_side_ctx_t *sc, session_t *s)
   proxy_main_t *pm = &proxy_main;
   u32 max_dequeue, ps_index;
   proxy_session_t *ps;
+  transport_proto_t tp = session_get_transport_proto (s);
 
   clib_spinlock_lock_if_init (&pm->sessions_lock);
 
@@ -467,20 +507,79 @@ proxy_session_start_connect (proxy_session_side_ctx_t *sc, session_t *s)
 
   clib_spinlock_unlock_if_init (&pm->sessions_lock);
 
-  max_dequeue = svm_fifo_max_dequeue_cons (s->rx_fifo);
-  if (PREDICT_FALSE (max_dequeue == 0))
-    return;
+  if (tp == TRANSPORT_PROTO_HTTP)
+    {
+      http_msg_t msg;
+      u8 *target_buf = 0;
+      http_uri_t target_uri;
+      http_header_t *resp_headers = 0;
+      session_endpoint_cfg_t target_sep = SESSION_ENDPOINT_CFG_NULL;
+      int rv;
 
-  max_dequeue = clib_min (pm->rcv_buffer_size, max_dequeue);
-  actual_transfer = svm_fifo_peek (s->rx_fifo, 0 /* relative_offset */,
-                                  max_dequeue, pm->rx_buf[s->thread_index]);
+      rv = svm_fifo_dequeue (s->rx_fifo, sizeof (msg), (u8 *) &msg);
+      ASSERT (rv == sizeof (msg));
 
-  /* Expectation is that here actual data just received is parsed and based
-   * on its contents, the destination and parameters of the connect to the
-   * upstream are decided
-   */
+      if (msg.type != HTTP_MSG_REQUEST)
+       {
+         proxy_send_http_resp (s, HTTP_STATUS_INTERNAL_ERROR, 0);
+         return;
+       }
+      if (msg.method_type != HTTP_REQ_CONNECT)
+       {
+         http_add_header (&resp_headers,
+                          http_header_name_token (HTTP_HEADER_ALLOW),
+                          http_token_lit ("CONNECT"));
+         proxy_send_http_resp (s, HTTP_STATUS_METHOD_NOT_ALLOWED,
+                               resp_headers);
+         vec_free (resp_headers);
+         return;
+       }
+
+      if (msg.data.target_form != HTTP_TARGET_AUTHORITY_FORM ||
+         msg.data.target_path_len == 0)
+       {
+         proxy_send_http_resp (s, HTTP_STATUS_BAD_REQUEST, 0);
+         return;
+       }
+
+      /* read target uri */
+      target_buf = vec_new (u8, msg.data.target_path_len);
+      rv = svm_fifo_peek (s->rx_fifo, msg.data.target_path_offset,
+                         msg.data.target_path_len, target_buf);
+      ASSERT (rv == msg.data.target_path_len);
+      svm_fifo_dequeue_drop (s->rx_fifo, msg.data.len);
+      rv = http_parse_authority_form_target (target_buf, &target_uri);
+      vec_free (target_buf);
+      if (rv)
+       {
+         proxy_send_http_resp (s, HTTP_STATUS_BAD_REQUEST, 0);
+         return;
+       }
+      target_sep.is_ip4 = target_uri.is_ip4;
+      target_sep.ip = target_uri.ip;
+      target_sep.port = target_uri.port;
+      target_sep.transport_proto = TRANSPORT_PROTO_TCP;
+      clib_memcpy (&a->sep_ext, &target_sep, sizeof (target_sep));
+    }
+  else
+    {
+      max_dequeue = svm_fifo_max_dequeue_cons (s->rx_fifo);
+      if (PREDICT_FALSE (max_dequeue == 0))
+       return;
+
+      max_dequeue = clib_min (pm->rcv_buffer_size, max_dequeue);
+      actual_transfer =
+       svm_fifo_peek (s->rx_fifo, 0 /* relative_offset */, max_dequeue,
+                      pm->rx_buf[s->thread_index]);
+
+      /* Expectation is that here actual data just received is parsed and based
+       * on its contents, the destination and parameters of the connect to the
+       * upstream are decided
+       */
+
+      clib_memcpy (&a->sep_ext, &pm->client_sep, sizeof (pm->client_sep));
+    }
 
-  clib_memcpy (&a->sep_ext, &pm->client_sep, sizeof (pm->client_sep));
   a->api_context = ps_index;
   a->app_index = pm->active_open_app_index;
 
@@ -664,6 +763,8 @@ active_open_connected_callback (u32 app_index, u32 opaque,
   proxy_session_t *ps;
   proxy_worker_t *wrk;
   proxy_session_side_ctx_t *sc;
+  session_t *po_s;
+  transport_proto_t tp;
 
   /* Connection failed */
   if (err)
@@ -671,6 +772,12 @@ active_open_connected_callback (u32 app_index, u32 opaque,
       clib_spinlock_lock_if_init (&pm->sessions_lock);
 
       ps = proxy_session_get (opaque);
+      po_s = session_get_from_handle (ps->po.session_handle);
+      tp = session_get_transport_proto (po_s);
+      if (tp == TRANSPORT_PROTO_HTTP)
+       {
+         proxy_send_http_resp (po_s, HTTP_STATUS_BAD_GATEWAY, 0);
+       }
       ps->ao_disconnected = 1;
       proxy_session_close_po (ps);
 
@@ -700,6 +807,9 @@ active_open_connected_callback (u32 app_index, u32 opaque,
       return -1;
     }
 
+  po_s = session_get_from_handle (ps->po.session_handle);
+  tp = session_get_transport_proto (po_s);
+
   sc = proxy_session_side_ctx_alloc (wrk);
   sc->pair = ps->po;
   sc->ps_index = ps->ps_index;
@@ -708,13 +818,21 @@ active_open_connected_callback (u32 app_index, u32 opaque,
 
   sc->state = PROXY_SC_S_ESTABLISHED;
   s->opaque = sc->sc_index;
+  sc->is_http = tp == TRANSPORT_PROTO_HTTP ? 1 : 0;
 
-  /*
-   * Send event for active open tx fifo
-   */
-  ASSERT (s->thread_index == vlib_get_thread_index ());
-  if (svm_fifo_set_event (s->tx_fifo))
-    session_program_tx_io_evt (session_handle (s), SESSION_IO_EVT_TX);
+  if (tp == TRANSPORT_PROTO_HTTP)
+    {
+      proxy_send_http_resp (po_s, HTTP_STATUS_OK, 0);
+    }
+  else
+    {
+      /*
+       * Send event for active open tx fifo
+       */
+      ASSERT (s->thread_index == vlib_get_thread_index ());
+      if (svm_fifo_set_event (s->tx_fifo))
+       session_program_tx_io_evt (session_handle (s), SESSION_IO_EVT_TX);
+    }
 
   return 0;
 }
@@ -881,11 +999,21 @@ active_open_tx_callback (session_t * ao_s)
   if (sc->state < PROXY_SC_S_ESTABLISHED)
     return 0;
 
-  /* Force ack on proxy side to update rcv wnd */
-  void *arg = uword_to_pointer (sc->pair.session_handle, void *);
-  session_send_rpc_evt_to_thread (
-    session_thread_from_handle (sc->pair.session_handle), proxy_force_ack,
-    arg);
+  if (sc->is_http)
+    {
+      /* notify HTTP transport */
+      session_t *po = session_get_from_handle (sc->pair.session_handle);
+      session_send_io_evt_to_thread_custom (
+       &po->session_index, po->thread_index, SESSION_IO_EVT_RX);
+    }
+  else
+    {
+      /* Force ack on proxy side to update rcv wnd */
+      void *arg = uword_to_pointer (sc->pair.session_handle, void *);
+      session_send_rpc_evt_to_thread (
+       session_thread_from_handle (sc->pair.session_handle), proxy_force_ack,
+       arg);
+    }
 
   return 0;
 }
@@ -1066,11 +1194,6 @@ proxy_server_create (vlib_main_t * vm)
       clib_warning ("failed to attach server app");
       return -1;
     }
-  if (proxy_server_listen ())
-    {
-      clib_warning ("failed to start listening");
-      return -1;
-    }
   if (active_open_attach ())
     {
       clib_warning ("failed to attach active open app");
@@ -1147,38 +1270,45 @@ proxy_server_create_command_fn (vlib_main_t * vm, unformat_input_t * input,
                    default_server_uri);
       server_uri = format (0, "%s%c", default_server_uri, 0);
     }
-  if (!client_uri)
-    {
-      clib_warning ("No client-uri provided, Using default: %s",
-                   default_client_uri);
-      client_uri = format (0, "%s%c", default_client_uri, 0);
-    }
-
   if (parse_uri ((char *) server_uri, &pm->server_sep))
     {
       error = clib_error_return (0, "Invalid server uri %v", server_uri);
       goto done;
     }
-  if (parse_uri ((char *) client_uri, &pm->client_sep))
+
+  /* http proxy get target within request */
+  if (pm->server_sep.transport_proto != TRANSPORT_PROTO_HTTP)
     {
-      error = clib_error_return (0, "Invalid client uri %v", client_uri);
-      goto done;
+      if (!client_uri)
+       {
+         clib_warning ("No client-uri provided, Using default: %s",
+                       default_client_uri);
+         client_uri = format (0, "%s%c", default_client_uri, 0);
+       }
+      if (parse_uri ((char *) client_uri, &pm->client_sep))
+       {
+         error = clib_error_return (0, "Invalid client uri %v", client_uri);
+         goto done;
+       }
     }
 
-  session_enable_disable_args_t args = { .is_en = 1,
-                                        .rt_engine_type =
-                                          RT_BACKEND_ENGINE_RULE_TABLE };
-  vnet_session_enable_disable (vm, &args);
-
-  rv = proxy_server_create (vm);
-  switch (rv)
+  if (pm->server_app_index == APP_INVALID_INDEX)
     {
-    case 0:
-      break;
-    default:
-      error = clib_error_return (0, "server_create returned %d", rv);
+      session_enable_disable_args_t args = { .is_en = 1,
+                                            .rt_engine_type =
+                                              RT_BACKEND_ENGINE_RULE_TABLE };
+      vnet_session_enable_disable (vm, &args);
+      rv = proxy_server_create (vm);
+      if (rv)
+       {
+         error = clib_error_return (0, "server_create returned %d", rv);
+         goto done;
+       }
     }
 
+  if (proxy_server_listen ())
+    error = clib_error_return (0, "failed to start listening");
+
 done:
   unformat_free (line_input);
   vec_free (client_uri);
@@ -1186,14 +1316,13 @@ done:
   return error;
 }
 
-VLIB_CLI_COMMAND (proxy_create_command, static) =
-{
+VLIB_CLI_COMMAND (proxy_create_command, static) = {
   .path = "test proxy server",
-  .short_help = "test proxy server [server-uri <tcp://ip/port>]"
-      "[client-uri <tcp://ip/port>][fifo-size <nn>[k|m]]"
-      "[max-fifo-size <nn>[k|m]][high-watermark <nn>]"
-      "[low-watermark <nn>][rcv-buf-size <nn>][prealloc-fifos <nn>]"
-      "[private-segment-size <mem>][private-segment-count <nn>]",
+  .short_help = "test proxy server [server-uri <proto://ip/port>]"
+               "[client-uri <tcp://ip/port>][fifo-size <nn>[k|m]]"
+               "[max-fifo-size <nn>[k|m]][high-watermark <nn>]"
+               "[low-watermark <nn>][rcv-buf-size <nn>][prealloc-fifos <nn>]"
+               "[private-segment-size <mem>][private-segment-count <nn>]",
   .function = proxy_server_create_command_fn,
 };
 
@@ -1203,6 +1332,7 @@ proxy_main_init (vlib_main_t * vm)
   proxy_main_t *pm = &proxy_main;
   pm->server_client_index = ~0;
   pm->active_open_client_index = ~0;
+  pm->server_app_index = APP_INVALID_INDEX;
 
   return 0;
 }
index 86db69c..789e561 100644 (file)
@@ -51,6 +51,7 @@ typedef struct proxy_session_side_ctx_
   proxy_session_side_state_t state;
   u32 sc_index;
   u32 ps_index;
+  u8 is_http;
 } proxy_session_side_ctx_t;
 
 typedef struct
index 1a92797..6659de9 100644 (file)
@@ -447,9 +447,9 @@ static const char *http_error_template = "HTTP/1.1 %s\r\n"
  */
 static const char *http_response_template = "HTTP/1.1 %s\r\n"
                                            "Date: %U GMT\r\n"
-                                           "Server: %v\r\n"
-                                           "Content-Length: %llu\r\n"
-                                           "%s";
+                                           "Server: %v\r\n";
+
+static const char *content_len_template = "Content-Length: %llu\r\n";
 
 /**
  * http request boilerplate
@@ -705,6 +705,13 @@ http_parse_request_line (http_conn_t *hc, http_status_code_t *ec)
       hc->method = HTTP_REQ_POST;
       hc->target_path_offset = method_offset + 5;
     }
+  else if (!memcmp (hc->rx_buf + method_offset, "CONNECT ", 8))
+    {
+      HTTP_DBG (0, "CONNECT method");
+      hc->method = HTTP_REQ_CONNECT;
+      hc->target_path_offset = method_offset + 8;
+      hc->is_tunnel = 1;
+    }
   else
     {
       if (hc->rx_buf[method_offset] - 'A' <= 'Z' - 'A')
@@ -930,6 +937,11 @@ http_identify_message_body (http_conn_t *hc, http_status_code_t *ec)
       HTTP_DBG (2, "no header, no message-body");
       return 0;
     }
+  if (hc->is_tunnel)
+    {
+      HTTP_DBG (2, "tunnel, no message-body");
+      return 0;
+    }
 
   /* TODO check for chunked transfer coding */
 
@@ -1271,11 +1283,21 @@ http_state_wait_app_reply (http_conn_t *hc, transport_send_params_t *sp)
                     /* Date */
                     format_clib_timebase_time, now,
                     /* Server */
-                    hc->app_name,
-                    /* Length */
-                    msg.data.body_len,
-                    /* Any headers from app? */
-                    msg.data.headers_len ? "" : "\r\n");
+                    hc->app_name);
+
+  /* RFC9110 9.3.6: A server MUST NOT send Content-Length header field in a
+   * 2xx (Successful) response to CONNECT. */
+  if (hc->is_tunnel && http_status_code_str[msg.code][0] == '2')
+    {
+      ASSERT (msg.data.body_len == 0);
+      hc->state = HTTP_CONN_STATE_TUNNEL;
+      /* cleanup some stuff we don't need anymore in tunnel mode */
+      http_conn_timer_stop (hc);
+      vec_free (hc->rx_buf);
+      http_buffer_free (&hc->tx_buf);
+    }
+  else
+    response = format (response, content_len_template, msg.data.body_len);
 
   /* Add headers from app (if any) */
   if (msg.data.headers_len)
@@ -1298,6 +1320,11 @@ http_state_wait_app_reply (http_conn_t *hc, transport_send_params_t *sp)
          ASSERT (rv == msg.data.headers_len);
        }
     }
+  else
+    {
+      /* No headers from app */
+      response = format (response, "\r\n");
+    }
   HTTP_DBG (3, "%v", response);
 
   sent = http_send_data (hc, response, vec_len (response));
@@ -1649,6 +1676,47 @@ http_req_run_state_machine (http_conn_t *hc, transport_send_params_t *sp)
   http_conn_timer_update (hc);
 }
 
+static int
+http_tunnel_rx (session_t *ts, http_conn_t *hc)
+{
+  u32 max_deq, max_enq, max_read, n_segs = 2;
+  svm_fifo_seg_t segs[n_segs];
+  int n_written = 0;
+  session_t *as;
+  app_worker_t *app_wrk;
+
+  HTTP_DBG (1, "tunnel received data from client");
+
+  as = session_get_from_handle (hc->h_pa_session_handle);
+
+  max_deq = svm_fifo_max_dequeue (ts->rx_fifo);
+  if (PREDICT_FALSE (max_deq == 0))
+    {
+      HTTP_DBG (1, "max_deq == 0");
+      return 0;
+    }
+  max_enq = svm_fifo_max_enqueue (as->rx_fifo);
+  if (max_enq == 0)
+    {
+      HTTP_DBG (1, "app's rx fifo full");
+      svm_fifo_add_want_deq_ntf (as->rx_fifo, SVM_FIFO_WANT_DEQ_NOTIF);
+      return 0;
+    }
+  max_read = clib_min (max_enq, max_deq);
+  svm_fifo_segments (ts->rx_fifo, 0, segs, &n_segs, max_read);
+  n_written = svm_fifo_enqueue_segments (as->rx_fifo, segs, n_segs, 0);
+  ASSERT (n_written > 0);
+  HTTP_DBG (1, "transfered %u bytes", n_written);
+  svm_fifo_dequeue_drop (ts->rx_fifo, n_written);
+  app_wrk = app_worker_get_if_valid (as->app_wrk_index);
+  if (app_wrk)
+    app_worker_rx_notify (app_wrk, as);
+  if (svm_fifo_max_dequeue_cons (ts->rx_fifo))
+    session_program_rx_io_evt (session_handle (ts));
+
+  return 0;
+}
+
 static int
 http_ts_rx_callback (session_t *ts)
 {
@@ -1665,6 +1733,9 @@ http_ts_rx_callback (session_t *ts)
       return 0;
     }
 
+  if (hc->state == HTTP_CONN_STATE_TUNNEL)
+    return http_tunnel_rx (ts, hc);
+
   if (!http_state_is_rx_valid (hc))
     {
       if (hc->state != HTTP_CONN_STATE_CLOSED)
@@ -1691,6 +1762,7 @@ http_ts_builtin_tx_callback (session_t *ts)
   http_conn_t *hc;
 
   hc = http_conn_get_w_thread (ts->opaque, ts->thread_index);
+  HTTP_DBG (1, "transport connection reschedule");
   transport_connection_reschedule (&hc->connection);
 
   return 0;
@@ -2017,6 +2089,54 @@ http_transport_get_listener (u32 listener_index)
   return &lhc->connection;
 }
 
+static int
+http_tunnel_tx (http_conn_t *hc, session_t *as, transport_send_params_t *sp)
+{
+  u32 max_deq, max_enq, max_read, n_segs = 2;
+  svm_fifo_seg_t segs[n_segs];
+  session_t *ts;
+  int n_written = 0;
+
+  HTTP_DBG (1, "tunnel received data from target");
+
+  ts = session_get_from_handle (hc->h_tc_session_handle);
+
+  max_deq = svm_fifo_max_dequeue_cons (as->tx_fifo);
+  if (PREDICT_FALSE (max_deq == 0))
+    {
+      HTTP_DBG (1, "max_deq == 0");
+      goto check_fifo;
+    }
+  max_enq = svm_fifo_max_enqueue_prod (ts->tx_fifo);
+  if (max_enq == 0)
+    {
+      HTTP_DBG (1, "ts tx fifo full");
+      goto check_fifo;
+    }
+  max_read = clib_min (max_enq, max_deq);
+  max_read = clib_min (max_read, sp->max_burst_size);
+  svm_fifo_segments (as->tx_fifo, 0, segs, &n_segs, max_read);
+  n_written = svm_fifo_enqueue_segments (ts->tx_fifo, segs, n_segs, 0);
+  ASSERT (n_written > 0);
+  HTTP_DBG (1, "transfered %u bytes", n_written);
+  sp->bytes_dequeued += n_written;
+  sp->max_burst_size -= n_written;
+  svm_fifo_dequeue_drop (as->tx_fifo, n_written);
+  if (svm_fifo_set_event (ts->tx_fifo))
+    session_program_tx_io_evt (ts->handle, SESSION_IO_EVT_TX);
+
+check_fifo:
+  /* Deschedule and wait for deq notification if ts fifo is almost full */
+  if (svm_fifo_max_enqueue (ts->tx_fifo) < HTTP_FIFO_THRESH)
+    {
+      svm_fifo_add_want_deq_ntf (ts->tx_fifo, SVM_FIFO_WANT_DEQ_NOTIF);
+      transport_connection_deschedule (&hc->connection);
+      sp->flags |= TRANSPORT_SND_F_DESCHED;
+    }
+
+  return n_written > 0 ? clib_max (n_written / TRANSPORT_PACER_MIN_MSS, 1) : 0;
+}
+
 static int
 http_app_tx_callback (void *session, transport_send_params_t *sp)
 {
@@ -2027,6 +2147,13 @@ http_app_tx_callback (void *session, transport_send_params_t *sp)
   HTTP_DBG (1, "hc [%u]%x", as->thread_index, as->connection_index);
 
   hc = http_conn_get_w_thread (as->connection_index, as->thread_index);
+
+  max_burst_sz = sp->max_burst_size * TRANSPORT_PACER_MIN_MSS;
+  sp->max_burst_size = max_burst_sz;
+
+  if (hc->state == HTTP_CONN_STATE_TUNNEL)
+    return http_tunnel_tx (hc, as, sp);
+
   if (!http_state_is_tx_valid (hc))
     {
       if (hc->state != HTTP_CONN_STATE_CLOSED)
@@ -2040,9 +2167,6 @@ http_app_tx_callback (void *session, transport_send_params_t *sp)
       return 0;
     }
 
-  max_burst_sz = sp->max_burst_size * TRANSPORT_PACER_MIN_MSS;
-  sp->max_burst_size = max_burst_sz;
-
   HTTP_DBG (1, "run state machine");
   http_req_run_state_machine (hc, sp);
 
@@ -2057,6 +2181,19 @@ http_app_tx_callback (void *session, transport_send_params_t *sp)
   return sent > 0 ? clib_max (sent / TRANSPORT_PACER_MIN_MSS, 1) : 0;
 }
 
+static int
+http_app_rx_evt_cb (transport_connection_t *tc)
+{
+  http_conn_t *hc = (http_conn_t *) tc;
+  HTTP_DBG (1, "hc [%u]%x", vlib_get_thread_index (), hc->h_hc_index);
+  session_t *ts = session_get_from_handle (hc->h_tc_session_handle);
+
+  if (hc->state == HTTP_CONN_STATE_TUNNEL)
+    return http_tunnel_rx (ts, hc);
+
+  return 0;
+}
+
 static void
 http_transport_get_endpoint (u32 hc_index, u32 thread_index,
                             transport_endpoint_t *tep, u8 is_lcl)
@@ -2114,6 +2251,9 @@ format_http_conn_state (u8 *s, va_list *args)
     case HTTP_CONN_STATE_ESTABLISHED:
       s = format (s, "ESTABLISHED");
       break;
+    case HTTP_CONN_STATE_TUNNEL:
+      s = format (s, "TUNNEL");
+      break;
     case HTTP_CONN_STATE_TRANSPORT_CLOSED:
       s = format (s, "TRANSPORT_CLOSED");
       break;
@@ -2212,6 +2352,7 @@ static const transport_proto_vft_t http_proto = {
   .close = http_transport_close,
   .cleanup_ho = http_transport_cleanup_ho,
   .custom_tx = http_app_tx_callback,
+  .app_rx_evt = http_app_rx_evt_cb,
   .get_connection = http_transport_get_connection,
   .get_listener = http_transport_get_listener,
   .get_half_open = http_transport_get_ho,
index 04c53d1..a117f37 100644 (file)
@@ -64,6 +64,7 @@ typedef enum http_conn_state_
   HTTP_CONN_STATE_LISTEN,
   HTTP_CONN_STATE_CONNECTING,
   HTTP_CONN_STATE_ESTABLISHED,
+  HTTP_CONN_STATE_TUNNEL,
   HTTP_CONN_STATE_TRANSPORT_CLOSED,
   HTTP_CONN_STATE_APP_CLOSED,
   HTTP_CONN_STATE_CLOSED
@@ -85,6 +86,7 @@ typedef enum http_req_method_
 {
   HTTP_REQ_GET = 0,
   HTTP_REQ_POST,
+  HTTP_REQ_CONNECT,
 } http_req_method_t;
 
 typedef enum http_msg_type_
@@ -415,6 +417,7 @@ typedef struct http_tc_
   u32 body_offset;
   u64 body_len;
   u16 status_code;
+  u8 is_tunnel;
 } http_conn_t;
 
 typedef struct http_worker_