prom: concurrent connections fix 37/41237/9
authorMatus Fabian <[email protected]>
Fri, 28 Jun 2024 14:11:04 +0000 (16:11 +0200)
committerFlorin Coras <[email protected]>
Tue, 23 Jul 2024 20:37:29 +0000 (20:37 +0000)
Type: fix

Change-Id: I57814edb735e9dac916f2e01de95ccfb739ce655
Signed-off-by: Matus Fabian <[email protected]>
extras/hs-test/http_test.go
extras/hs-test/infra/hst_suite.go
src/plugins/prom/prom.c

index 29bd272..2983ba3 100644 (file)
@@ -9,9 +9,11 @@ import (
        "net/http"
        "net/http/httptrace"
        "os"
+       "sync"
        "time"
 
        . "fd.io/hs-test/infra"
+       . "github.com/onsi/ginkgo/v2"
 )
 
 func init() {
@@ -24,7 +26,7 @@ func init() {
                HttpStaticMacTimeTest, HttpStaticBuildInUrlGetVersionVerboseTest, HttpVersionNotSupportedTest,
                HttpInvalidContentLengthTest, HttpInvalidTargetSyntaxTest, HttpStaticPathTraversalTest, HttpUriDecodeTest,
                HttpHeadersTest, HttpStaticFileHandler)
-       RegisterNoTopoSoloTests(HttpStaticPromTest, HttpTpsTest, HttpTpsInterruptModeTest)
+       RegisterNoTopoSoloTests(HttpStaticPromTest, HttpTpsTest, HttpTpsInterruptModeTest, PromConcurrentConnections)
 }
 
 const wwwRootPath = "/tmp/www_root"
@@ -203,6 +205,37 @@ func HttpStaticPromTest(s *NoTopoSuite) {
        _, err = io.ReadAll(resp.Body)
 }
 
+func promReq(s *NoTopoSuite, url string, wg *sync.WaitGroup) {
+       defer GinkgoRecover()
+       defer wg.Done()
+       client := NewHttpClient()
+       req, err := http.NewRequest("GET", url, nil)
+       s.AssertNil(err, fmt.Sprint(err))
+       resp, err := client.Do(req)
+       s.AssertNil(err, fmt.Sprint(err))
+       defer resp.Body.Close()
+       s.AssertEqual(200, resp.StatusCode)
+       _, err = io.ReadAll(resp.Body)
+}
+
+func PromConcurrentConnections(s *NoTopoSuite) {
+       vpp := s.GetContainerByName("vpp").VppInstance
+       serverAddress := s.GetInterfaceByName(TapInterfaceName).Peer.Ip4AddressString()
+       url := "http://" + serverAddress + ":80/stats.prom"
+
+       s.Log(vpp.Vppctl("http static server uri tcp://" + serverAddress + "/80 url-handlers"))
+       s.Log(vpp.Vppctl("prom enable"))
+       time.Sleep(time.Second * 5)
+
+       var wg sync.WaitGroup
+       for i := 0; i < 20; i++ {
+               wg.Add(1)
+               go promReq(s, url, &wg)
+       }
+       wg.Wait()
+       s.Log(vpp.Vppctl("show session verbose proto http"))
+}
+
 func HttpStaticFileHandler(s *NoTopoSuite) {
        content := "<http><body><p>Hello</p></body></http>"
        content2 := "<http><body><p>Page</p></body></http>"
index 1f1d54b..028ab0b 100644 (file)
@@ -565,6 +565,7 @@ runBenchmark creates Gomega's experiment with the passed-in name and samples the
 passing in suite context, experiment and your data.
 
 You can also instruct runBenchmark to run with multiple concurrent workers.
+Note that if running in parallel Gomega returns from Sample when spins up all samples and does not wait until all finished.
 You can record multiple named measurements (float64 or duration) within passed-in callback.
 runBenchmark then produces report to show statistical distribution of measurements.
 */
index 76899a2..475e98b 100644 (file)
@@ -208,7 +208,7 @@ static uword
 prom_scraper_process (vlib_main_t *vm, vlib_node_runtime_t *rt,
                      vlib_frame_t *f)
 {
-  uword *event_data = 0, event_type;
+  uword *event_data = 0, event_type, *sh_as_uword;
   prom_main_t *pm = &prom_main;
   hss_session_handle_t sh;
   f64 timeout = 10000.0;
@@ -223,12 +223,15 @@ prom_scraper_process (vlib_main_t *vm, vlib_node_runtime_t *rt,
          /* timeout, do nothing */
          break;
        case PROM_SCRAPER_EVT_RUN:
-         sh.as_u64 = event_data[0];
          vec_reset_length (pm->stats);
          pm->stats = scrape_stats_segment (pm->stats, pm->stats_patterns,
                                            pm->used_only);
-         session_send_rpc_evt_to_thread_force (sh.thread_index,
-                                               send_data_to_hss_rpc, &sh);
+         vec_foreach (sh_as_uword, event_data)
+           {
+             sh.as_u64 = (u64) *sh_as_uword;
+             session_send_rpc_evt_to_thread_force (
+               sh.thread_index, send_data_to_hss_rpc, sh_as_uword);
+           }
          pm->last_scrape = vlib_time_now (vm);
          break;
        default: