session: async rx event notifications
[vpp.git] / src / vnet / session / session_input.c
1 /* SPDX-License-Identifier: Apache-2.0
2  * Copyright(c) 2023 Cisco Systems, Inc.
3  */
4
5 #include <vnet/session/session.h>
6 #include <vnet/session/application.h>
7
8 static inline int
9 mq_try_lock (svm_msg_q_t *mq)
10 {
11   int rv, n_try = 0;
12
13   while (n_try < 100)
14     {
15       rv = svm_msg_q_try_lock (mq);
16       if (!rv)
17         return 0;
18       n_try += 1;
19       usleep (1);
20     }
21
22   return -1;
23 }
24
25 always_inline u8
26 mq_event_ring_index (session_evt_type_t et)
27 {
28   return (et >= SESSION_CTRL_EVT_RPC ? SESSION_MQ_CTRL_EVT_RING :
29                                              SESSION_MQ_IO_EVT_RING);
30 }
31
32 void
33 app_worker_del_all_events (app_worker_t *app_wrk)
34 {
35   session_worker_t *wrk;
36   session_event_t *evt;
37   u32 thread_index;
38   session_t *s;
39
40   for (thread_index = 0; thread_index < vec_len (app_wrk->wrk_evts);
41        thread_index++)
42     {
43       while (clib_fifo_elts (app_wrk->wrk_evts[thread_index]))
44         {
45           clib_fifo_sub2 (app_wrk->wrk_evts[thread_index], evt);
46           switch (evt->event_type)
47             {
48             case SESSION_CTRL_EVT_MIGRATED:
49               s = session_get (evt->session_index, thread_index);
50               transport_cleanup (session_get_transport_proto (s),
51                                  s->connection_index, s->thread_index);
52               session_free (s);
53               break;
54             case SESSION_CTRL_EVT_CLEANUP:
55               s = session_get (evt->as_u64[0] & 0xffffffff, thread_index);
56               if (evt->as_u64[0] >> 32 != SESSION_CLEANUP_SESSION)
57                 break;
58               uword_to_pointer (evt->as_u64[1], void (*) (session_t * s)) (s);
59               break;
60             case SESSION_CTRL_EVT_HALF_CLEANUP:
61               s = ho_session_get (evt->session_index);
62               pool_put_index (app_wrk->half_open_table, s->ho_index);
63               session_free (s);
64               break;
65             default:
66               break;
67             }
68         }
69       wrk = session_main_get_worker (thread_index);
70       clib_bitmap_set (wrk->app_wrks_pending_ntf, app_wrk->wrk_index, 0);
71     }
72 }
73
74 always_inline int
75 app_worker_flush_events_inline (app_worker_t *app_wrk, u32 thread_index,
76                                 u8 is_builtin)
77 {
78   application_t *app = application_get (app_wrk->app_index);
79   svm_msg_q_t *mq = app_wrk->event_queue;
80   session_event_t *evt;
81   u32 n_evts = 128, i;
82   u8 ring_index, mq_is_cong;
83   session_t *s;
84
85   n_evts = clib_min (n_evts, clib_fifo_elts (app_wrk->wrk_evts[thread_index]));
86
87   if (!is_builtin)
88     {
89       mq_is_cong = app_worker_mq_is_congested (app_wrk);
90       if (mq_try_lock (mq))
91         {
92           app_worker_set_mq_wrk_congested (app_wrk, thread_index);
93           return 0;
94         }
95     }
96
97   for (i = 0; i < n_evts; i++)
98     {
99       evt = clib_fifo_head (app_wrk->wrk_evts[thread_index]);
100       if (!is_builtin)
101         {
102           ring_index = mq_event_ring_index (evt->event_type);
103           if (svm_msg_q_or_ring_is_full (mq, ring_index))
104             {
105               app_worker_set_mq_wrk_congested (app_wrk, thread_index);
106               break;
107             }
108         }
109
110       switch (evt->event_type)
111         {
112         case SESSION_IO_EVT_RX:
113           s = session_get (evt->session_index, thread_index);
114           s->flags &= ~SESSION_F_RX_EVT;
115           /* Application didn't confirm accept yet */
116           if (PREDICT_FALSE (s->session_state == SESSION_STATE_ACCEPTING))
117             break;
118           app->cb_fns.builtin_app_rx_callback (s);
119           break;
120         /* Handle sessions that might not be on current thread */
121         case SESSION_IO_EVT_BUILTIN_RX:
122           s = session_get_from_handle_if_valid (evt->session_handle);
123           if (!s || s->session_state == SESSION_STATE_ACCEPTING)
124             break;
125           app->cb_fns.builtin_app_rx_callback (s);
126           break;
127         case SESSION_IO_EVT_TX:
128           s = session_get (evt->session_index, thread_index);
129           app->cb_fns.builtin_app_tx_callback (s);
130           break;
131         case SESSION_IO_EVT_TX_MAIN:
132           s = session_get_from_handle_if_valid (evt->session_handle);
133           if (!s)
134             break;
135           app->cb_fns.builtin_app_tx_callback (s);
136           break;
137         case SESSION_CTRL_EVT_BOUND:
138           /* No app cb function currently */
139           if (is_builtin)
140             break;
141           mq_send_session_bound_cb (app_wrk->wrk_index, evt->as_u64[1] >> 32,
142                                     evt->session_handle,
143                                     evt->as_u64[1] & 0xffffffff);
144           break;
145         case SESSION_CTRL_EVT_ACCEPTED:
146           s = session_get (evt->session_index, thread_index);
147           app->cb_fns.session_accept_callback (s);
148           break;
149         case SESSION_CTRL_EVT_CONNECTED:
150           if (!(evt->as_u64[1] & 0xffffffff))
151             s = session_get (evt->session_index, thread_index);
152           else
153             s = 0;
154           app->cb_fns.session_connected_callback (app_wrk->wrk_index,
155                                                   evt->as_u64[1] >> 32, s,
156                                                   evt->as_u64[1] & 0xffffffff);
157           break;
158         case SESSION_CTRL_EVT_DISCONNECTED:
159           s = session_get (evt->session_index, thread_index);
160           app->cb_fns.session_disconnect_callback (s);
161           break;
162         case SESSION_CTRL_EVT_RESET:
163           s = session_get (evt->session_index, thread_index);
164           app->cb_fns.session_reset_callback (s);
165           break;
166         case SESSION_CTRL_EVT_UNLISTEN_REPLY:
167           if (is_builtin)
168             break;
169           mq_send_unlisten_reply (app_wrk, evt->session_handle,
170                                   evt->as_u64[1] >> 32,
171                                   evt->as_u64[1] & 0xffffffff);
172           break;
173         case SESSION_CTRL_EVT_MIGRATED:
174           s = session_get (evt->session_index, thread_index);
175           app->cb_fns.session_migrate_callback (s, evt->as_u64[1]);
176           transport_cleanup (session_get_transport_proto (s),
177                              s->connection_index, s->thread_index);
178           session_free (s);
179           /* Notify app that it has data on the new session */
180           s = session_get_from_handle (evt->as_u64[1]);
181           session_send_io_evt_to_thread (s->rx_fifo,
182                                          SESSION_IO_EVT_BUILTIN_RX);
183           break;
184         case SESSION_CTRL_EVT_TRANSPORT_CLOSED:
185           s = session_get (evt->session_index, thread_index);
186           if (app->cb_fns.session_transport_closed_callback)
187             app->cb_fns.session_transport_closed_callback (s);
188           break;
189         case SESSION_CTRL_EVT_CLEANUP:
190           s = session_get (evt->as_u64[0] & 0xffffffff, thread_index);
191           if (app->cb_fns.session_cleanup_callback)
192             app->cb_fns.session_cleanup_callback (s, evt->as_u64[0] >> 32);
193           if (evt->as_u64[0] >> 32 != SESSION_CLEANUP_SESSION)
194             break;
195           uword_to_pointer (evt->as_u64[1], void (*) (session_t * s)) (s);
196           break;
197         case SESSION_CTRL_EVT_HALF_CLEANUP:
198           s = ho_session_get (evt->session_index);
199           ASSERT (session_vlib_thread_is_cl_thread ());
200           if (app->cb_fns.half_open_cleanup_callback)
201             app->cb_fns.half_open_cleanup_callback (s);
202           pool_put_index (app_wrk->half_open_table, s->ho_index);
203           session_free (s);
204           break;
205         case SESSION_CTRL_EVT_APP_ADD_SEGMENT:
206           app->cb_fns.add_segment_callback (app_wrk->wrk_index,
207                                             evt->as_u64[1]);
208           break;
209         case SESSION_CTRL_EVT_APP_DEL_SEGMENT:
210           app->cb_fns.del_segment_callback (app_wrk->wrk_index,
211                                             evt->as_u64[1]);
212           break;
213         default:
214           clib_warning ("unexpected event: %u", evt->event_type);
215           ASSERT (0);
216           break;
217         }
218       clib_fifo_advance_head (app_wrk->wrk_evts[thread_index], 1);
219     }
220
221   if (!is_builtin)
222     {
223       svm_msg_q_unlock (mq);
224       if (mq_is_cong && i == n_evts)
225         app_worker_unset_wrk_mq_congested (app_wrk, thread_index);
226     }
227
228   return 0;
229 }
230
231 int
232 app_wrk_flush_wrk_events (app_worker_t *app_wrk, u32 thread_index)
233 {
234   if (app_worker_application_is_builtin (app_wrk))
235     return app_worker_flush_events_inline (app_wrk, thread_index,
236                                            1 /* is_builtin */);
237   else
238     return app_worker_flush_events_inline (app_wrk, thread_index,
239                                            0 /* is_builtin */);
240 }
241
242 static inline int
243 session_wrk_flush_events (session_worker_t *wrk)
244 {
245   app_worker_t *app_wrk;
246   uword app_wrk_index;
247   u32 thread_index;
248
249   thread_index = wrk->vm->thread_index;
250   app_wrk_index = clib_bitmap_first_set (wrk->app_wrks_pending_ntf);
251
252   while (app_wrk_index != ~0)
253     {
254       app_wrk = app_worker_get_if_valid (app_wrk_index);
255       /* app_wrk events are flushed on free, so should be valid here */
256       ASSERT (app_wrk != 0);
257       app_wrk_flush_wrk_events (app_wrk, thread_index);
258
259       if (!clib_fifo_elts (app_wrk->wrk_evts[thread_index]))
260         clib_bitmap_set (wrk->app_wrks_pending_ntf, app_wrk->wrk_index, 0);
261
262       app_wrk_index =
263         clib_bitmap_next_set (wrk->app_wrks_pending_ntf, app_wrk_index + 1);
264     }
265
266   if (!clib_bitmap_is_zero (wrk->app_wrks_pending_ntf))
267     vlib_node_set_interrupt_pending (wrk->vm, session_input_node.index);
268
269   return 0;
270 }
271
272 VLIB_NODE_FN (session_input_node)
273 (vlib_main_t *vm, vlib_node_runtime_t *node, vlib_frame_t *frame)
274 {
275   u32 thread_index = vm->thread_index;
276   session_worker_t *wrk;
277
278   wrk = session_main_get_worker (thread_index);
279   session_wrk_flush_events (wrk);
280
281   return 0;
282 }
283
284 VLIB_REGISTER_NODE (session_input_node) = {
285   .name = "session-input",
286   .type = VLIB_NODE_TYPE_INPUT,
287   .state = VLIB_NODE_STATE_DISABLED,
288 };
289
290 /*
291  * fd.io coding-style-patch-verification: ON
292  *
293  * Local Variables:
294  * eval: (c-set-style "gnu")
295  * End:
296  */