hsa: make http cli client thread safe
[vpp.git] / src / plugins / hs_apps / http_client_cli.c
1 /*
2  * Copyright (c) 2022 Cisco and/or its affiliates.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at:
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15
16 #include <vnet/session/application.h>
17 #include <vnet/session/application_interface.h>
18 #include <vnet/session/session.h>
19 #include <http/http.h>
20 #include <hs_apps/http_cli.h>
21
22 typedef struct
23 {
24   CLIB_CACHE_LINE_ALIGN_MARK (cacheline0);
25   u32 session_index;
26   u32 thread_index;
27   u32 rx_offset;
28   u32 vpp_session_index;
29   u32 to_recv;
30   u8 is_closed;
31 } hcc_session_t;
32
33 typedef struct
34 {
35   hcc_session_t *sessions;
36   u8 *rx_buf;
37   u32 thread_index;
38 } hcc_worker_t;
39
40 typedef struct
41 {
42   hcc_worker_t *wrk;
43   u32 app_index;
44
45   u32 prealloc_fifos;
46   u32 private_segment_size;
47   u32 fifo_size;
48   u8 *uri;
49   u8 *http_query;
50   session_endpoint_cfg_t connect_sep;
51
52   u8 test_client_attached;
53   vlib_main_t *vlib_main;
54   u32 cli_node_index;
55   u8 *http_response;
56 } hcc_main_t;
57
58 typedef enum
59 {
60   HCC_REPLY_RECEIVED = 100,
61 } hcc_cli_signal_t;
62
63 static hcc_main_t hcc_main;
64
65 static hcc_worker_t *
66 hcc_worker_get (u32 thread_index)
67 {
68   return vec_elt_at_index (hcc_main.wrk, thread_index);
69 }
70
71 static hcc_session_t *
72 hcc_session_alloc (hcc_worker_t *wrk)
73 {
74   hcc_session_t *hs;
75   pool_get_zero (wrk->sessions, hs);
76   hs->session_index = hs - wrk->sessions;
77   hs->thread_index = wrk->thread_index;
78   return hs;
79 }
80
81 static hcc_session_t *
82 hcc_session_get (u32 hs_index, u32 thread_index)
83 {
84   hcc_worker_t *wrk = hcc_worker_get (thread_index);
85   return pool_elt_at_index (wrk->sessions, hs_index);
86 }
87
88 static void
89 hcc_session_free (u32 thread_index, hcc_session_t *hs)
90 {
91   hcc_worker_t *wrk = hcc_worker_get (thread_index);
92   pool_put (wrk->sessions, hs);
93 }
94
95 static int
96 hcc_ts_accept_callback (session_t *ts)
97 {
98   clib_warning ("bug");
99   return -1;
100 }
101
102 static void
103 hcc_ts_disconnect_callback (session_t *s)
104 {
105   hcc_main_t *hcm = &hcc_main;
106   vnet_disconnect_args_t _a = { 0 }, *a = &_a;
107
108   a->handle = session_handle (s);
109   a->app_index = hcm->app_index;
110   vnet_disconnect_session (a);
111 }
112
113 static int
114 hcc_ts_connected_callback (u32 app_index, u32 hc_index, session_t *as,
115                            session_error_t err)
116 {
117   hcc_main_t *hcm = &hcc_main;
118   hcc_session_t *hs, *new_hs;
119   hcc_worker_t *wrk;
120   http_msg_t msg;
121   int rv;
122
123   if (err)
124     {
125       clib_warning ("connected error: hc_index(%d): %U", hc_index,
126                     format_session_error, err);
127       return -1;
128     }
129
130   /* TODO delete half open session once the support is added in http layer */
131   hs = hcc_session_get (hc_index, 0);
132   wrk = hcc_worker_get (as->thread_index);
133   new_hs = hcc_session_alloc (wrk);
134   clib_memcpy_fast (new_hs, hs, sizeof (*hs));
135
136   hs->vpp_session_index = as->session_index;
137
138   msg.type = HTTP_MSG_REQUEST;
139   msg.method_type = HTTP_REQ_GET;
140   msg.content_type = HTTP_CONTENT_TEXT_HTML;
141   msg.data.type = HTTP_MSG_DATA_INLINE;
142   msg.data.len = vec_len (hcm->http_query);
143
144   svm_fifo_seg_t segs[2] = { { (u8 *) &msg, sizeof (msg) },
145                              { hcm->http_query, vec_len (hcm->http_query) } };
146
147   rv = svm_fifo_enqueue_segments (as->tx_fifo, segs, 2, 0 /* allow partial */);
148   if (rv < 0 || rv != sizeof (msg) + vec_len (hcm->http_query))
149     {
150       clib_warning ("failed app enqueue");
151       return -1;
152     }
153
154   if (svm_fifo_set_event (as->tx_fifo))
155     session_send_io_evt_to_thread (as->tx_fifo, SESSION_IO_EVT_TX);
156
157   return 0;
158 }
159
160 static void
161 hcc_ts_reset_callback (session_t *s)
162 {
163   hcc_main_t *hcm = &hcc_main;
164   hcc_session_t *hs;
165   vnet_disconnect_args_t _a = { 0 }, *a = &_a;
166
167   hs = hcc_session_get (s->opaque, s->thread_index);
168   hs->is_closed = 1;
169
170   a->handle = session_handle (s);
171   a->app_index = hcm->app_index;
172   vnet_disconnect_session (a);
173 }
174
175 static int
176 hcc_ts_tx_callback (session_t *ts)
177 {
178   clib_warning ("bug");
179   return -1;
180 }
181
182 static void
183 hcc_session_disconnect (session_t *s)
184 {
185   hcc_main_t *hcm = &hcc_main;
186   vnet_disconnect_args_t _a = { 0 }, *a = &_a;
187   a->handle = session_handle (s);
188   a->app_index = hcm->app_index;
189   vnet_disconnect_session (a);
190 }
191
192 static int
193 hcc_ts_rx_callback (session_t *ts)
194 {
195   hcc_main_t *hcm = &hcc_main;
196   hcc_session_t *hs;
197   http_msg_t msg;
198   int rv;
199
200   hs = hcc_session_get (ts->opaque, ts->thread_index);
201
202   if (hs->is_closed)
203     {
204       clib_warning ("session is closed");
205       return 0;
206     }
207
208   if (!hs->to_recv)
209     {
210       rv = svm_fifo_dequeue (ts->rx_fifo, sizeof (msg), (u8 *) &msg);
211       ASSERT (rv == sizeof (msg));
212
213       if (msg.type != HTTP_MSG_REPLY || msg.code != HTTP_STATUS_OK)
214         {
215           clib_warning ("unexpected msg type %d", msg.type);
216           return 0;
217         }
218       vec_validate (hcm->http_response, msg.data.len - 1);
219       vec_reset_length (hcm->http_response);
220       hs->to_recv = msg.data.len;
221     }
222
223   u32 max_deq = svm_fifo_max_dequeue (ts->rx_fifo);
224
225   u32 n_deq = clib_min (hs->to_recv, max_deq);
226   u32 curr = vec_len (hcm->http_response);
227   rv = svm_fifo_dequeue (ts->rx_fifo, n_deq, hcm->http_response + curr);
228   if (rv < 0)
229     {
230       clib_warning ("app dequeue failed");
231       return -1;
232     }
233
234   if (rv != n_deq)
235     return -1;
236
237   vec_set_len (hcm->http_response, curr + n_deq);
238   ASSERT (hs->to_recv >= rv);
239   hs->to_recv -= rv;
240
241   if (hs->to_recv == 0)
242     {
243       hcc_session_disconnect (ts);
244       vlib_process_signal_event_mt (hcm->vlib_main, hcm->cli_node_index,
245                                     HCC_REPLY_RECEIVED, 0);
246     }
247
248   return 0;
249 }
250
251 static void
252 hcc_ts_cleanup_callback (session_t *s, session_cleanup_ntf_t ntf)
253 {
254   hcc_session_t *hs;
255
256   hs = hcc_session_get (s->thread_index, s->opaque);
257   if (!hs)
258     return;
259
260   hcc_session_free (s->thread_index, hs);
261 }
262
263 static session_cb_vft_t hcc_session_cb_vft = {
264   .session_accept_callback = hcc_ts_accept_callback,
265   .session_disconnect_callback = hcc_ts_disconnect_callback,
266   .session_connected_callback = hcc_ts_connected_callback,
267   .builtin_app_rx_callback = hcc_ts_rx_callback,
268   .builtin_app_tx_callback = hcc_ts_tx_callback,
269   .session_reset_callback = hcc_ts_reset_callback,
270   .session_cleanup_callback = hcc_ts_cleanup_callback,
271 };
272
273 static clib_error_t *
274 hcc_attach ()
275 {
276   hcc_main_t *hcm = &hcc_main;
277   vnet_app_attach_args_t _a, *a = &_a;
278   u64 options[18];
279   u32 segment_size = 128 << 20;
280   int rv;
281
282   if (hcm->private_segment_size)
283     segment_size = hcm->private_segment_size;
284
285   clib_memset (a, 0, sizeof (*a));
286   clib_memset (options, 0, sizeof (options));
287
288   a->api_client_index = ~0;
289   a->name = format (0, "http_cli_client");
290   a->session_cb_vft = &hcc_session_cb_vft;
291   a->options = options;
292   a->options[APP_OPTIONS_SEGMENT_SIZE] = segment_size;
293   a->options[APP_OPTIONS_ADD_SEGMENT_SIZE] = segment_size;
294   a->options[APP_OPTIONS_RX_FIFO_SIZE] =
295     hcm->fifo_size ? hcm->fifo_size : 8 << 10;
296   a->options[APP_OPTIONS_TX_FIFO_SIZE] =
297     hcm->fifo_size ? hcm->fifo_size : 32 << 10;
298   a->options[APP_OPTIONS_FLAGS] = APP_OPTIONS_FLAGS_IS_BUILTIN;
299   a->options[APP_OPTIONS_PREALLOC_FIFO_PAIRS] = hcm->prealloc_fifos;
300
301   if ((rv = vnet_application_attach (a)))
302     return clib_error_return (0, "attach returned %d", rv);
303
304   hcm->app_index = a->app_index;
305   vec_free (a->name);
306   hcm->test_client_attached = 1;
307   return 0;
308 }
309
310 static int
311 hcc_connect_rpc (void *rpc_args)
312 {
313   vnet_connect_args_t *a = rpc_args;
314   int rv;
315
316   rv = vnet_connect (a);
317   if (rv)
318     clib_warning (0, "connect returned: %U", format_session_error, rv);
319
320   vec_free (a);
321   return rv;
322 }
323
324 static void
325 hcc_program_connect (vnet_connect_args_t *a)
326 {
327   session_send_rpc_evt_to_thread_force (transport_cl_thread (),
328                                         hcc_connect_rpc, a);
329 }
330
331 static clib_error_t *
332 hcc_connect ()
333 {
334   vnet_connect_args_t *a = 0;
335   hcc_main_t *hcm = &hcc_main;
336   hcc_worker_t *wrk;
337   hcc_session_t *hs;
338
339   vec_validate (a, 0);
340   clib_memset (a, 0, sizeof (a[0]));
341
342   clib_memcpy (&a->sep_ext, &hcm->connect_sep, sizeof (hcm->connect_sep));
343   a->app_index = hcm->app_index;
344
345   /* allocate http session on main thread */
346   wrk = hcc_worker_get (0);
347   hs = hcc_session_alloc (wrk);
348   a->api_context = hs->session_index;
349
350   hcc_program_connect (a);
351   return 0;
352 }
353
354 static clib_error_t *
355 hcc_run (vlib_main_t *vm)
356 {
357   vlib_thread_main_t *vtm = vlib_get_thread_main ();
358   hcc_main_t *hcm = &hcc_main;
359   uword event_type, *event_data = 0;
360   u32 num_threads;
361   clib_error_t *err = 0;
362   hcc_worker_t *wrk;
363
364   num_threads = 1 /* main thread */ + vtm->n_threads;
365   vec_validate (hcm->wrk, num_threads);
366   vec_foreach (wrk, hcm->wrk)
367     {
368       wrk->thread_index = wrk - hcm->wrk;
369     }
370
371   if ((err = hcc_attach ()))
372     {
373       return clib_error_return (0, "http client attach: %U", format_clib_error,
374                                 err);
375     }
376
377   if ((err = hcc_connect ()))
378     {
379       return clib_error_return (0, "http client connect: %U",
380                                 format_clib_error, err);
381     }
382
383   vlib_process_wait_for_event_or_clock (vm, 10);
384   event_type = vlib_process_get_events (vm, &event_data);
385   switch (event_type)
386     {
387     case ~0:
388       err = clib_error_return (0, "timeout");
389       goto cleanup;
390
391     case HCC_REPLY_RECEIVED:
392       vlib_cli_output (vm, "%v", hcm->http_response);
393       vec_free (hcm->http_response);
394       break;
395     default:
396       clib_error_return (0, "unexpected event %d", event_type);
397       break;
398     }
399
400 cleanup:
401   vec_free (event_data);
402   return err;
403 }
404
405 static int
406 hcc_detach ()
407 {
408   hcc_main_t *hcm = &hcc_main;
409   vnet_app_detach_args_t _da, *da = &_da;
410   int rv;
411
412   if (!hcm->test_client_attached)
413     return 0;
414
415   da->app_index = hcm->app_index;
416   da->api_client_index = ~0;
417   rv = vnet_application_detach (da);
418   hcm->test_client_attached = 0;
419   hcm->app_index = ~0;
420
421   return rv;
422 }
423
424 static clib_error_t *
425 hcc_command_fn (vlib_main_t *vm, unformat_input_t *input,
426                 vlib_cli_command_t *cmd)
427 {
428   unformat_input_t _line_input, *line_input = &_line_input;
429   hcc_main_t *hcm = &hcc_main;
430   u64 seg_size;
431   clib_error_t *err = 0;
432   int rv;
433
434   hcm->prealloc_fifos = 0;
435   hcm->private_segment_size = 0;
436   hcm->fifo_size = 0;
437
438   if (hcm->test_client_attached)
439     return clib_error_return (0, "failed: already running!");
440
441   /* Get a line of input. */
442   if (!unformat_user (input, unformat_line_input, line_input))
443     return clib_error_return (0, "expected URI");
444
445   while (unformat_check_input (line_input) != UNFORMAT_END_OF_INPUT)
446     {
447       if (unformat (line_input, "prealloc-fifos %d", &hcm->prealloc_fifos))
448         ;
449       else if (unformat (line_input, "private-segment-size %U",
450                          unformat_memory_size, &seg_size))
451         hcm->private_segment_size = seg_size;
452       else if (unformat (line_input, "fifo-size %d", &hcm->fifo_size))
453         hcm->fifo_size <<= 10;
454       else if (unformat (line_input, "uri %s", &hcm->uri))
455         ;
456       else if (unformat (line_input, "query %s", &hcm->http_query))
457         ;
458       else
459         {
460           err = clib_error_return (0, "unknown input `%U'",
461                                    format_unformat_error, line_input);
462           goto done;
463         }
464     }
465
466   hcm->cli_node_index = vlib_get_current_process (vm)->node_runtime.node_index;
467
468   if (!hcm->uri)
469     {
470       err = clib_error_return (0, "URI not defined");
471       goto done;
472     }
473
474   if ((rv = parse_uri ((char *) hcm->uri, &hcm->connect_sep)))
475     {
476       err = clib_error_return (0, "Uri parse error: %d", rv);
477       goto done;
478     }
479
480   vlib_worker_thread_barrier_sync (vm);
481   vnet_session_enable_disable (vm, 1 /* turn on TCP, etc. */);
482   vlib_worker_thread_barrier_release (vm);
483
484   err = hcc_run (vm);
485
486   if (hcc_detach ())
487     {
488       /* don't override last error */
489       if (!err)
490         err = clib_error_return (0, "failed: app detach");
491       clib_warning ("WARNING: app detach failed...");
492     }
493
494 done:
495   vec_free (hcm->uri);
496   vec_free (hcm->http_query);
497   unformat_free (line_input);
498   return err;
499 }
500
501 VLIB_CLI_COMMAND (hcc_command, static) = {
502   .path = "http cli client",
503   .short_help = "uri http://<ip-addr> query <query-string>",
504   .function = hcc_command_fn,
505   .is_mp_safe = 1,
506 };
507
508 static clib_error_t *
509 hcc_main_init (vlib_main_t *vm)
510 {
511   hcc_main_t *hcm = &hcc_main;
512
513   hcm->app_index = ~0;
514   hcm->vlib_main = vm;
515   return 0;
516 }
517
518 VLIB_INIT_FUNCTION (hcc_main_init);
519
520 /*
521  * fd.io coding-style-patch-verification: ON
522  *
523  * Local Variables:
524  * eval: (c-set-style "gnu")
525  * End:
526  */