e08f5f33adc3b9aaa59a65306786a8d1292f7b37
[vpp.git] / src / vcl / vcl_event.c
1 /*
2  * Copyright (c) 2019 Cisco and/or its affiliates.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this
5  * You may obtain a copy of the License at:
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15
16 #include <vppinfra/fifo.h>
17 #include <vppinfra/pool.h>
18 #include <vppinfra/hash.h>
19 #include <vnet/api_errno.h>
20
21 #include <vcl/vppcom.h>
22 #include <vcl/vcl_event.h>
23 #include <vcl/vcl_private.h>
24
25 /**
26  * @file
27  * @brief VPP Communications Library (VCL) event handler.
28  *
29  * Definitions for generic event handling in VCL.
30  */
31
32 int
33 vce_generate_event (vce_event_thread_t * evt, u32 ev_idx)
34 {
35   int elts, rv = 0;
36   vce_event_t *p;
37
38   pthread_mutex_lock (&(evt->generator_lock));
39
40   /* Check there is event data for this event */
41
42   VCE_EVENTS_LOCK ();
43   p = pool_elt_at_index (evt->vce_events, ev_idx);
44   ASSERT (p);
45
46   elts = (int) clib_fifo_free_elts (evt->event_index_fifo);
47   if (PREDICT_TRUE (elts))
48     {
49       /* Add event to queue */
50       clib_fifo_add1 (evt->event_index_fifo, ev_idx);
51       pthread_cond_signal (&(evt->generator_cond));
52     }
53   else
54     {
55       rv = VNET_API_ERROR_QUEUE_FULL;
56     }
57
58   VCE_EVENTS_UNLOCK ();
59   pthread_mutex_unlock (&(evt->generator_lock));
60
61   return rv;
62 }
63
64 void
65 vce_clear_event (vce_event_thread_t * evt, u32 ev_idx)
66 {
67   VCE_EVENTS_LOCK ();
68   pool_put_index (evt->vce_events, ev_idx);
69   VCE_EVENTS_UNLOCK ();
70 }
71
72 vce_event_t *
73 vce_get_event_from_index (vce_event_thread_t * evt, u32 ev_idx)
74 {
75   vce_event_t *ev = 0;
76   /* Assumes caller has obtained the spinlock (evt->events_lockp) */
77
78   if (!pool_is_free_index (evt->vce_events, ev_idx))
79     ev = pool_elt_at_index (evt->vce_events, ev_idx);
80
81   return ev;
82 }
83
84 vce_event_handler_reg_t *
85 vce_get_event_handler (vce_event_thread_t * evt, vce_event_key_t * evk)
86 {
87   vce_event_handler_reg_t *handler = 0;
88   uword *p;
89
90   VCE_HANDLERS_LOCK ();
91   p = hash_get (evt->handlers_index_by_event_key, evk->as_u64);
92   if (p)
93     handler = pool_elt_at_index (evt->vce_event_handlers, p[0]);
94   VCE_HANDLERS_UNLOCK ();
95
96   return handler;
97 }
98
99 vce_event_handler_reg_t *
100 vce_register_handler (vce_event_thread_t * evt, vce_event_key_t * evk,
101                       vce_event_callback_t cb, void *cb_args)
102 {
103   vce_event_handler_reg_t *handler;
104   vce_event_handler_reg_t *old_handler = 0;
105   uword *p;
106   u32 handler_index;
107
108   /* TODO - multiple handler support. For now we can replace
109    * and re-instate, which is useful for event recycling */
110
111   VCE_HANDLERS_LOCK ();
112
113   p = hash_get (evt->handlers_index_by_event_key, evk->as_u64);
114   if (p)
115     {
116       old_handler = pool_elt_at_index (evt->vce_event_handlers, p[0]);
117       /* If we are just re-registering, ignore and move on
118        * else store the old handler_fn for unregister to re-instate */
119       if (old_handler->handler_fn == cb)
120         {
121
122           VCE_HANDLERS_UNLOCK ();
123
124           /* Signal event thread that a handler exists in case any
125            * recycled events requiring this handler are pending */
126           pthread_mutex_lock (&(evt->generator_lock));
127           pthread_cond_signal (&(evt->generator_cond));
128           pthread_mutex_unlock (&(evt->generator_lock));
129           return old_handler;
130         }
131     }
132
133   pool_get (evt->vce_event_handlers, handler);
134   handler_index = (u32) (handler - evt->vce_event_handlers);
135
136   handler->handler_fn = cb;
137   handler->replaced_handler_idx = (u32) ((p) ? p[0] : ~0);
138   handler->ev_idx = (u32) ~ 0;  //This will be set by the event thread if event happens
139   handler->evk = evk->as_u64;
140   handler->handler_fn_args = cb_args;
141
142   hash_set (evt->handlers_index_by_event_key, evk->as_u64, handler_index);
143
144   pthread_cond_init (&(handler->handler_cond), NULL);
145   pthread_mutex_init (&(handler->handler_lock), NULL);
146
147   VCE_HANDLERS_UNLOCK ();
148
149   /* Signal event thread that a new handler exists in case any
150    * recycled events requiring this handler are pending */
151   pthread_mutex_lock (&(evt->generator_lock));
152   pthread_cond_signal (&(evt->generator_cond));
153   pthread_mutex_unlock (&(evt->generator_lock));
154
155   return handler;
156 }
157
158 int
159 vce_unregister_handler (vce_event_thread_t * evt,
160                         vce_event_handler_reg_t * handler)
161 {
162   uword *p;
163   u64 evk = handler->evk;
164   u8 generate_signal = 0;
165
166   VCE_HANDLERS_LOCK ();
167
168   p = hash_get (evt->handlers_index_by_event_key, evk);
169   if (!p)
170     {
171       VCE_HANDLERS_UNLOCK ();
172       return VNET_API_ERROR_NO_SUCH_ENTRY;
173     }
174
175   handler = pool_elt_at_index (evt->vce_event_handlers, p[0]);
176
177   /* If this handler replaced another handler, re-instate it */
178   if (handler->replaced_handler_idx != ~0)
179     {
180       hash_set (evt->handlers_index_by_event_key, evk,
181                 handler->replaced_handler_idx);
182       generate_signal = 1;
183     }
184   else
185     {
186       hash_unset (evt->handlers_index_by_event_key, evk);
187     }
188
189   pthread_mutex_destroy (&(handler->handler_lock));
190   pthread_cond_destroy (&(handler->handler_cond));
191   pool_put (evt->vce_event_handlers, handler);
192
193   VCE_HANDLERS_UNLOCK ();
194
195   if (generate_signal)
196     {
197       /* Signal event thread that a new handler exists in case any
198        * recycled events requiring this handler are pending */
199       pthread_mutex_lock (&(evt->generator_lock));
200       pthread_cond_signal (&(evt->generator_cond));
201       pthread_mutex_unlock (&(evt->generator_lock));
202     }
203
204   return 0;
205 }
206
207 void *
208 vce_event_thread_fn (void *arg)
209 {
210   vce_event_thread_t *evt = (vce_event_thread_t *) arg;
211   vce_event_t *ev;
212   u32 ev_idx;
213   vce_event_handler_reg_t *handler;
214   uword *p;
215   u32 recycle_count = 0;
216
217   pthread_mutex_lock (&(evt->generator_lock));
218   while (1)
219     {
220       uword fifo_depth = clib_fifo_elts (evt->event_index_fifo);
221       while ((fifo_depth == 0) || (recycle_count == fifo_depth))
222         {
223           recycle_count = 0;
224           pthread_cond_wait (&(evt->generator_cond), &(evt->generator_lock));
225           fifo_depth = clib_fifo_elts (evt->event_index_fifo);
226         }
227
228       /* Remove event */
229       VCE_EVENTS_LOCK ();
230       clib_fifo_sub1 (evt->event_index_fifo, ev_idx);
231       ev = vce_get_event_from_index (evt, ev_idx);
232       ASSERT (ev);
233       if (recycle_count && ev->recycle)
234         {
235           clib_fifo_add1 (evt->event_index_fifo, ev_idx);
236           VCE_EVENTS_UNLOCK ();
237           continue;
238         }
239       VCE_HANDLERS_LOCK ();
240
241       p = hash_get (evt->handlers_index_by_event_key, ev->evk.as_u64);
242       if (!p)
243         {
244           /* If an event falls in the woods, and there is no handler to hear it,
245            * does it make any sound?
246            * I don't know either, so lets biff the event */
247           pool_put (evt->vce_events, ev);
248           VCE_EVENTS_UNLOCK ();
249           VCE_HANDLERS_UNLOCK ();
250           pthread_mutex_unlock (&(evt->generator_lock));
251         }
252       else
253         {
254           u32 evt_recycle = ev->recycle;
255           handler = pool_elt_at_index (evt->vce_event_handlers, p[0]);
256           handler->ev_idx = ev_idx;
257           ev->recycle = 0;
258
259           VCE_EVENTS_UNLOCK ();
260           VCE_HANDLERS_UNLOCK ();
261           pthread_mutex_unlock (&(evt->generator_lock));
262
263           (handler->handler_fn) (handler);
264
265           VCE_EVENTS_LOCK ();
266           ev = vce_get_event_from_index (evt, ev_idx);
267           recycle_count += (!evt_recycle && ev && ev->recycle) ? 1 : 0;
268           VCE_EVENTS_UNLOCK ();
269         }
270
271       pthread_mutex_lock (&(evt->generator_lock));
272     }
273   return NULL;
274 }
275
276 int
277 vce_start_event_thread (vce_event_thread_t * evt, u8 max_events)
278 {
279   clib_fifo_validate (evt->event_index_fifo, max_events);
280   evt->handlers_index_by_event_key = hash_create (0, sizeof (uword));
281
282   pthread_cond_init (&(evt->generator_cond), NULL);
283   pthread_mutex_init (&(evt->generator_lock), NULL);
284
285   clib_spinlock_init (&(evt->events_lockp));
286   clib_spinlock_init (&(evt->handlers_lockp));
287
288   return pthread_create (&(evt->thread), NULL /* attr */ ,
289                          vce_event_thread_fn, evt);
290 }
291
292 static void *
293 vppcom_session_io_thread_fn (void *arg)
294 {
295   vppcom_session_io_thread_t *evt = (vppcom_session_io_thread_t *) arg;
296   u32 *session_indexes = 0, *session_index;
297   int i, rv;
298   u32 bytes = 0;
299   vcl_session_t *session;
300
301   while (1)
302     {
303       vec_reset_length (session_indexes);
304       VCE_IO_SESSIONS_LOCK ();
305       /* *INDENT-OFF* */
306       pool_foreach (session_index, evt->active_session_indexes, ({
307         vec_add1 (session_indexes, *session_index);
308       }));
309       /* *INDENT-ON* */
310       VCE_IO_SESSIONS_UNLOCK ();
311       if (session_indexes)
312         {
313           for (i = 0; i < vec_len (session_indexes); ++i)
314             {
315               VCL_SESSION_LOCK_AND_GET (session_indexes[i], &session);
316               bytes = svm_fifo_max_dequeue (session->rx_fifo);
317               VCL_SESSION_UNLOCK ();
318
319               if (bytes)
320                 {
321                   vppcom_ioevent_t *eio;
322                   vce_event_t *ev;
323                   u32 ev_idx;
324
325                   VCL_EVENTS_LOCK ();
326
327                   pool_get (vcm->event_thread.vce_events, ev);
328                   ev_idx = (u32) (ev - vcm->event_thread.vce_events);
329                   eio = vce_get_event_data (ev, sizeof (*eio));
330                   ev->evk.eid = VCL_EVENT_IOEVENT_RX_FIFO;
331                   ev->evk.session_index = session_indexes[i];
332                   eio->bytes = bytes;
333                   eio->session_index = session_indexes[i];
334
335                   VCL_EVENTS_UNLOCK ();
336
337                   rv = vce_generate_event (&vcm->event_thread, ev_idx);
338                 }
339             }
340         }
341       struct timespec ts;
342       ts.tv_sec = 0;
343       ts.tv_nsec = 1000000;     /* 1 millisecond */
344       nanosleep (&ts, NULL);
345     }
346 done:
347   VCL_SESSION_UNLOCK ();
348   return NULL;
349 }
350
351 static int
352 vppcom_start_io_event_thread (vppcom_session_io_thread_t * evt,
353                               u8 max_sessions)
354 {
355   pthread_cond_init (&(evt->vce_io_cond), NULL);
356   pthread_mutex_init (&(evt->vce_io_lock), NULL);
357
358   clib_spinlock_init (&(evt->io_sessions_lockp));
359
360   return pthread_create (&(evt->thread), NULL /* attr */ ,
361                          vppcom_session_io_thread_fn, evt);
362 }
363
364 static void
365 vce_registered_ioevent_handler_fn (void *arg)
366 {
367   vce_event_handler_reg_t *reg = (vce_event_handler_reg_t *) arg;
368   vppcom_ioevent_t *eio;
369   vce_event_t *ev;
370   u32 ioevt_ndx = (u64) (reg->handler_fn_args);
371   vppcom_session_ioevent_t *ioevent, ioevent_;
372
373   VCL_EVENTS_LOCK ();
374   ev = vce_get_event_from_index (&vcm->event_thread, reg->ev_idx);
375   eio = vce_get_event_data (ev, sizeof (*eio));
376   VCL_EVENTS_UNLOCK ();
377
378   VCL_IO_SESSIONS_LOCK ();
379   ioevent = pool_elt_at_index (vcm->session_io_thread.ioevents, ioevt_ndx);
380   ioevent_ = *ioevent;
381   VCL_IO_SESSIONS_UNLOCK ();
382   (ioevent_.user_cb) (eio, ioevent_.user_cb_data);
383   vce_clear_event (&vcm->event_thread, reg->ev_idx);
384   return;
385
386   /*TODO - Unregister check in close for this listener */
387
388 }
389
390 void
391 vce_registered_listener_connect_handler_fn (void *arg)
392 {
393   vce_event_handler_reg_t *reg = (vce_event_handler_reg_t *) arg;
394   vce_event_connect_request_t *ecr;
395   vce_event_t *ev;
396   vppcom_endpt_t ep;
397
398   vcl_session_t *new_session;
399   int rv;
400
401   vppcom_session_listener_t *session_listener =
402     (vppcom_session_listener_t *) reg->handler_fn_args;
403
404   VCL_EVENTS_LOCK ();
405   ev = vce_get_event_from_index (&vcm->event_thread, reg->ev_idx);
406   ecr = vce_get_event_data (ev, sizeof (*ecr));
407   VCL_EVENTS_UNLOCK ();
408   VCL_SESSION_LOCK_AND_GET (ecr->accepted_session_index, &new_session);
409
410   ep.is_ip4 = new_session->transport.is_ip4;
411   ep.port = new_session->transport.rmt_port;
412   if (new_session->transport.is_ip4)
413     clib_memcpy (&ep.ip, &new_session->transport.rmt_ip.ip4,
414                  sizeof (ip4_address_t));
415   else
416     clib_memcpy (&ep.ip, &new_session->transport.rmt_ip.ip6,
417                  sizeof (ip6_address_t));
418
419   vppcom_send_accept_session_reply (new_session->vpp_handle,
420                                     new_session->client_context,
421                                     0 /* retval OK */ );
422   VCL_SESSION_UNLOCK ();
423
424   (session_listener->user_cb) (ecr->accepted_session_index, &ep,
425                                session_listener->user_cb_data);
426
427   if (vcm->session_io_thread.io_sessions_lockp)
428     {
429       /* Throw this new accepted session index into the rx poll thread pool */
430       VCL_IO_SESSIONS_LOCK ();
431       u32 *active_session_index;
432       pool_get (vcm->session_io_thread.active_session_indexes,
433                 active_session_index);
434       *active_session_index = ecr->accepted_session_index;
435       VCL_IO_SESSIONS_UNLOCK ();
436     }
437
438   /*TODO - Unregister check in close for this listener */
439   return;
440
441 done:
442   ASSERT (0);                   // If we can't get a lock or accepted session fails, lets blow up.
443 }
444
445 /**
446  * @brief vce_poll_wait_connect_request_handler_fn
447  * - used by vppcom_epoll_xxxx() for listener sessions
448  * - when a vl_api_accept_session_t_handler() generates an event
449  *   this callback is alerted and sets the fields that vppcom_epoll_wait()
450  *   expects to see.
451  *
452  * @param arg - void* to be cast to vce_event_handler_reg_t*
453  */
454 void
455 vce_poll_wait_connect_request_handler_fn (void *arg)
456 {
457   vce_event_handler_reg_t *reg = (vce_event_handler_reg_t *) arg;
458   vce_event_t *ev;
459   /* Retrieve the VCL_EVENT_CONNECT_REQ_ACCEPTED event */
460   ev = vce_get_event_from_index (&vcm->event_thread, reg->ev_idx);
461   vce_event_connect_request_t *ecr = vce_get_event_data (ev, sizeof (*ecr));
462
463   /* Add the accepted_session_index to the FIFO */
464   VCL_ACCEPT_FIFO_LOCK ();
465   clib_fifo_add1 (vcm->client_session_index_fifo,
466                   ecr->accepted_session_index);
467   VCL_ACCEPT_FIFO_UNLOCK ();
468
469   /* Recycling the event. */
470   VCL_EVENTS_LOCK ();
471   ev->recycle = 1;
472   clib_fifo_add1 (vcm->event_thread.event_index_fifo, reg->ev_idx);
473   VCL_EVENTS_UNLOCK ();
474 }
475
476 int
477 vppcom_session_register_ioevent_cb (uint32_t session_index,
478                                     vppcom_session_ioevent_cb cb,
479                                     uint8_t rx, void *ptr)
480 {
481   int rv = VPPCOM_OK;
482   vce_event_key_t evk;
483   vppcom_session_ioevent_t *ioevent;
484
485   if (!vcm->session_io_thread.io_sessions_lockp)
486     rv = vppcom_start_io_event_thread (&vcm->session_io_thread, 100);   /* DAW_TODO: ??? hard-coded value */
487
488   if (rv == VPPCOM_OK)
489     {
490       void *io_evt_ndx;
491
492       /* Register handler for ioevent on session_index */
493       VCL_IO_SESSIONS_LOCK ();
494       pool_get (vcm->session_io_thread.ioevents, ioevent);
495       io_evt_ndx = (void *) (ioevent - vcm->session_io_thread.ioevents);
496       ioevent->user_cb = cb;
497       ioevent->user_cb_data = ptr;
498       VCL_IO_SESSIONS_UNLOCK ();
499
500       evk.session_index = session_index;
501       evk.eid = rx ? VCL_EVENT_IOEVENT_RX_FIFO : VCL_EVENT_IOEVENT_TX_FIFO;
502
503       (void) vce_register_handler (&vcm->event_thread, &evk,
504                                    vce_registered_ioevent_handler_fn,
505                                    io_evt_ndx);
506     }
507   return rv;
508 }
509
510 int
511 vppcom_session_register_listener (uint32_t session_index,
512                                   vppcom_session_listener_cb cb,
513                                   vppcom_session_listener_errcb
514                                   errcb, uint8_t flags, int q_len, void *ptr)
515 {
516   int rv = VPPCOM_OK;
517   vce_event_key_t evk;
518   vppcom_session_listener_t *listener_args;
519
520   if (!vcm->session_io_thread.io_sessions_lockp)
521     rv = vppcom_start_io_event_thread (&vcm->session_io_thread, 100);   /* DAW_TODO: ??? hard-coded value */
522   if (rv)
523     {
524       goto done;
525     }
526   rv = vppcom_session_listen (session_index, q_len);
527   if (rv)
528     {
529       goto done;
530     }
531
532   /* Register handler for connect_request event on listen_session_index */
533   listener_args = clib_mem_alloc (sizeof (vppcom_session_listener_t));  // DAW_TODO: Use a pool instead of thrashing the memory allocator!
534   listener_args->user_cb = cb;
535   listener_args->user_cb_data = ptr;
536   listener_args->user_errcb = errcb;
537
538   evk.session_index = session_index;
539   evk.eid = VCL_EVENT_CONNECT_REQ_ACCEPTED;
540   (void) vce_register_handler (&vcm->event_thread, &evk,
541                                vce_registered_listener_connect_handler_fn,
542                                listener_args);
543
544 done:
545   return rv;
546 }
547
548 /*
549  * fd.io coding-style-patch-verification: ON
550  *
551  * Local Variables:
552  * eval: (c-set-style "gnu")
553  * End:
554  */