vcl: add support for multi-worker apps
[vpp.git] / src / vcl / vcl_event.c
1 /*
2  * Copyright (c) 2019 Cisco and/or its affiliates.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this
5  * You may obtain a copy of the License at:
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15
16 #include <vppinfra/fifo.h>
17 #include <vppinfra/pool.h>
18 #include <vppinfra/hash.h>
19 #include <vnet/api_errno.h>
20
21 #include <vcl/vppcom.h>
22 #include <vcl/vcl_event.h>
23 #include <vcl/vcl_private.h>
24
25 /**
26  * @file
27  * @brief VPP Communications Library (VCL) event handler.
28  *
29  * Definitions for generic event handling in VCL.
30  */
31
32 #define VCL_EVENTS_LOCK() \
33   clib_spinlock_lock (&(vcm->event_thread.events_lockp))
34 #define VCL_EVENTS_UNLOCK() \
35   clib_spinlock_unlock (&(vcm->event_thread.events_lockp))
36 #define VCL_IO_SESSIONS_LOCK() \
37   clib_spinlock_lock (&(vcm->session_io_thread.io_sessions_lockp))
38 #define VCL_IO_SESSIONS_UNLOCK() \
39   clib_spinlock_unlock (&(vcm->session_io_thread.io_sessions_lockp))
40
41 int
42 vce_generate_event (vce_event_thread_t * evt, u32 ev_idx)
43 {
44   int elts, rv = 0;
45   vce_event_t *p;
46
47   pthread_mutex_lock (&(evt->generator_lock));
48
49   /* Check there is event data for this event */
50
51   VCE_EVENTS_LOCK ();
52   p = pool_elt_at_index (evt->vce_events, ev_idx);
53   ASSERT (p);
54
55   elts = (int) clib_fifo_free_elts (evt->event_index_fifo);
56   if (PREDICT_TRUE (elts))
57     {
58       /* Add event to queue */
59       clib_fifo_add1 (evt->event_index_fifo, ev_idx);
60       pthread_cond_signal (&(evt->generator_cond));
61     }
62   else
63     {
64       rv = VNET_API_ERROR_QUEUE_FULL;
65     }
66
67   VCE_EVENTS_UNLOCK ();
68   pthread_mutex_unlock (&(evt->generator_lock));
69
70   return rv;
71 }
72
73 void
74 vce_clear_event (vce_event_thread_t * evt, u32 ev_idx)
75 {
76   VCE_EVENTS_LOCK ();
77   pool_put_index (evt->vce_events, ev_idx);
78   VCE_EVENTS_UNLOCK ();
79 }
80
81 vce_event_t *
82 vce_get_event_from_index (vce_event_thread_t * evt, u32 ev_idx)
83 {
84   vce_event_t *ev = 0;
85   /* Assumes caller has obtained the spinlock (evt->events_lockp) */
86
87   if (!pool_is_free_index (evt->vce_events, ev_idx))
88     ev = pool_elt_at_index (evt->vce_events, ev_idx);
89
90   return ev;
91 }
92
93 vce_event_handler_reg_t *
94 vce_get_event_handler (vce_event_thread_t * evt, vce_event_key_t * evk)
95 {
96   vce_event_handler_reg_t *handler = 0;
97   uword *p;
98
99   VCE_HANDLERS_LOCK ();
100   p = hash_get (evt->handlers_index_by_event_key, evk->as_u64);
101   if (p)
102     handler = pool_elt_at_index (evt->vce_event_handlers, p[0]);
103   VCE_HANDLERS_UNLOCK ();
104
105   return handler;
106 }
107
108 vce_event_handler_reg_t *
109 vce_register_handler (vce_event_thread_t * evt, vce_event_key_t * evk,
110                       vce_event_callback_t cb, void *cb_args)
111 {
112   vce_event_handler_reg_t *handler;
113   vce_event_handler_reg_t *old_handler = 0;
114   uword *p;
115   u32 handler_index;
116
117   /* TODO - multiple handler support. For now we can replace
118    * and re-instate, which is useful for event recycling */
119
120   VCE_HANDLERS_LOCK ();
121
122   p = hash_get (evt->handlers_index_by_event_key, evk->as_u64);
123   if (p)
124     {
125       old_handler = pool_elt_at_index (evt->vce_event_handlers, p[0]);
126       /* If we are just re-registering, ignore and move on
127        * else store the old handler_fn for unregister to re-instate */
128       if (old_handler->handler_fn == cb)
129         {
130
131           VCE_HANDLERS_UNLOCK ();
132
133           /* Signal event thread that a handler exists in case any
134            * recycled events requiring this handler are pending */
135           pthread_mutex_lock (&(evt->generator_lock));
136           pthread_cond_signal (&(evt->generator_cond));
137           pthread_mutex_unlock (&(evt->generator_lock));
138           return old_handler;
139         }
140     }
141
142   pool_get (evt->vce_event_handlers, handler);
143   handler_index = (u32) (handler - evt->vce_event_handlers);
144
145   handler->handler_fn = cb;
146   handler->replaced_handler_idx = (u32) ((p) ? p[0] : ~0);
147   handler->ev_idx = (u32) ~ 0;  //This will be set by the event thread if event happens
148   handler->evk = evk->as_u64;
149   handler->handler_fn_args = cb_args;
150
151   hash_set (evt->handlers_index_by_event_key, evk->as_u64, handler_index);
152
153   pthread_cond_init (&(handler->handler_cond), NULL);
154   pthread_mutex_init (&(handler->handler_lock), NULL);
155
156   VCE_HANDLERS_UNLOCK ();
157
158   /* Signal event thread that a new handler exists in case any
159    * recycled events requiring this handler are pending */
160   pthread_mutex_lock (&(evt->generator_lock));
161   pthread_cond_signal (&(evt->generator_cond));
162   pthread_mutex_unlock (&(evt->generator_lock));
163
164   return handler;
165 }
166
167 int
168 vce_unregister_handler (vce_event_thread_t * evt,
169                         vce_event_handler_reg_t * handler)
170 {
171   uword *p;
172   u64 evk = handler->evk;
173   u8 generate_signal = 0;
174
175   VCE_HANDLERS_LOCK ();
176
177   p = hash_get (evt->handlers_index_by_event_key, evk);
178   if (!p)
179     {
180       VCE_HANDLERS_UNLOCK ();
181       return VNET_API_ERROR_NO_SUCH_ENTRY;
182     }
183
184   handler = pool_elt_at_index (evt->vce_event_handlers, p[0]);
185
186   /* If this handler replaced another handler, re-instate it */
187   if (handler->replaced_handler_idx != ~0)
188     {
189       hash_set (evt->handlers_index_by_event_key, evk,
190                 handler->replaced_handler_idx);
191       generate_signal = 1;
192     }
193   else
194     {
195       hash_unset (evt->handlers_index_by_event_key, evk);
196     }
197
198   pthread_mutex_destroy (&(handler->handler_lock));
199   pthread_cond_destroy (&(handler->handler_cond));
200   pool_put (evt->vce_event_handlers, handler);
201
202   VCE_HANDLERS_UNLOCK ();
203
204   if (generate_signal)
205     {
206       /* Signal event thread that a new handler exists in case any
207        * recycled events requiring this handler are pending */
208       pthread_mutex_lock (&(evt->generator_lock));
209       pthread_cond_signal (&(evt->generator_cond));
210       pthread_mutex_unlock (&(evt->generator_lock));
211     }
212
213   return 0;
214 }
215
216 void *
217 vce_event_thread_fn (void *arg)
218 {
219   vce_event_thread_t *evt = (vce_event_thread_t *) arg;
220   vce_event_t *ev;
221   u32 ev_idx;
222   vce_event_handler_reg_t *handler;
223   uword *p;
224   u32 recycle_count = 0;
225
226   pthread_mutex_lock (&(evt->generator_lock));
227   while (1)
228     {
229       uword fifo_depth = clib_fifo_elts (evt->event_index_fifo);
230       while ((fifo_depth == 0) || (recycle_count == fifo_depth))
231         {
232           recycle_count = 0;
233           pthread_cond_wait (&(evt->generator_cond), &(evt->generator_lock));
234           fifo_depth = clib_fifo_elts (evt->event_index_fifo);
235         }
236
237       /* Remove event */
238       VCE_EVENTS_LOCK ();
239       clib_fifo_sub1 (evt->event_index_fifo, ev_idx);
240       ev = vce_get_event_from_index (evt, ev_idx);
241       ASSERT (ev);
242       if (recycle_count && ev->recycle)
243         {
244           clib_fifo_add1 (evt->event_index_fifo, ev_idx);
245           VCE_EVENTS_UNLOCK ();
246           continue;
247         }
248       VCE_HANDLERS_LOCK ();
249
250       p = hash_get (evt->handlers_index_by_event_key, ev->evk.as_u64);
251       if (!p)
252         {
253           /* If an event falls in the woods, and there is no handler to hear it,
254            * does it make any sound?
255            * I don't know either, so lets biff the event */
256           pool_put (evt->vce_events, ev);
257           VCE_EVENTS_UNLOCK ();
258           VCE_HANDLERS_UNLOCK ();
259           pthread_mutex_unlock (&(evt->generator_lock));
260         }
261       else
262         {
263           u32 evt_recycle = ev->recycle;
264           handler = pool_elt_at_index (evt->vce_event_handlers, p[0]);
265           handler->ev_idx = ev_idx;
266           ev->recycle = 0;
267
268           VCE_EVENTS_UNLOCK ();
269           VCE_HANDLERS_UNLOCK ();
270           pthread_mutex_unlock (&(evt->generator_lock));
271
272           (handler->handler_fn) (handler);
273
274           VCE_EVENTS_LOCK ();
275           ev = vce_get_event_from_index (evt, ev_idx);
276           recycle_count += (!evt_recycle && ev && ev->recycle) ? 1 : 0;
277           VCE_EVENTS_UNLOCK ();
278         }
279
280       pthread_mutex_lock (&(evt->generator_lock));
281     }
282   return NULL;
283 }
284
285 int
286 vce_start_event_thread (vce_event_thread_t * evt, u8 max_events)
287 {
288   clib_fifo_validate (evt->event_index_fifo, max_events);
289   evt->handlers_index_by_event_key = hash_create (0, sizeof (uword));
290
291   pthread_cond_init (&(evt->generator_cond), NULL);
292   pthread_mutex_init (&(evt->generator_lock), NULL);
293
294   clib_spinlock_init (&(evt->events_lockp));
295   clib_spinlock_init (&(evt->handlers_lockp));
296
297   return pthread_create (&(evt->thread), NULL /* attr */ ,
298                          vce_event_thread_fn, evt);
299 }
300
301 static void *
302 vppcom_session_io_thread_fn (void *arg)
303 {
304   vppcom_session_io_thread_t *evt = (vppcom_session_io_thread_t *) arg;
305   u32 *session_indexes = 0, *session_index;
306   int i;
307   u32 bytes = 0;
308   vcl_session_t *session;
309
310   while (1)
311     {
312       vec_reset_length (session_indexes);
313       VCE_IO_SESSIONS_LOCK ();
314       /* *INDENT-OFF* */
315       pool_foreach (session_index, evt->active_session_indexes, ({
316         vec_add1 (session_indexes, *session_index);
317       }));
318       /* *INDENT-ON* */
319       VCE_IO_SESSIONS_UNLOCK ();
320       if (session_indexes)
321         {
322           for (i = 0; i < vec_len (session_indexes); ++i)
323             {
324               session = vcl_session_get (0, session_indexes[i]);
325               if (!session)
326                 return NULL;
327               bytes = svm_fifo_max_dequeue (session->rx_fifo);
328
329               if (bytes)
330                 {
331                   vppcom_ioevent_t *eio;
332                   vce_event_t *ev;
333                   u32 ev_idx;
334
335                   VCL_EVENTS_LOCK ();
336
337                   pool_get (vcm->event_thread.vce_events, ev);
338                   ev_idx = (u32) (ev - vcm->event_thread.vce_events);
339                   eio = vce_get_event_data (ev, sizeof (*eio));
340                   ev->evk.eid = VCL_EVENT_IOEVENT_RX_FIFO;
341                   ev->evk.session_index = session_indexes[i];
342                   eio->bytes = bytes;
343                   eio->session_index = session_indexes[i];
344
345                   VCL_EVENTS_UNLOCK ();
346
347                   vce_generate_event (&vcm->event_thread, ev_idx);
348                 }
349             }
350         }
351       struct timespec ts;
352       ts.tv_sec = 0;
353       ts.tv_nsec = 1000000;     /* 1 millisecond */
354       nanosleep (&ts, NULL);
355     }
356   return NULL;
357 }
358
359 static int
360 vppcom_start_io_event_thread (vppcom_session_io_thread_t * evt,
361                               u8 max_sessions)
362 {
363   pthread_cond_init (&(evt->vce_io_cond), NULL);
364   pthread_mutex_init (&(evt->vce_io_lock), NULL);
365
366   clib_spinlock_init (&(evt->io_sessions_lockp));
367
368   return pthread_create (&(evt->thread), NULL /* attr */ ,
369                          vppcom_session_io_thread_fn, evt);
370 }
371
372 static void
373 vce_registered_ioevent_handler_fn (void *arg)
374 {
375   vce_event_handler_reg_t *reg = (vce_event_handler_reg_t *) arg;
376   vppcom_ioevent_t *eio;
377   vce_event_t *ev;
378   u32 ioevt_ndx = (u64) (reg->handler_fn_args);
379   vppcom_session_ioevent_t *ioevent, ioevent_;
380
381   VCL_EVENTS_LOCK ();
382   ev = vce_get_event_from_index (&vcm->event_thread, reg->ev_idx);
383   eio = vce_get_event_data (ev, sizeof (*eio));
384   VCL_EVENTS_UNLOCK ();
385
386   VCL_IO_SESSIONS_LOCK ();
387   ioevent = pool_elt_at_index (vcm->session_io_thread.ioevents, ioevt_ndx);
388   ioevent_ = *ioevent;
389   VCL_IO_SESSIONS_UNLOCK ();
390   (ioevent_.user_cb) (eio, ioevent_.user_cb_data);
391   vce_clear_event (&vcm->event_thread, reg->ev_idx);
392   return;
393
394   /*TODO - Unregister check in close for this listener */
395
396 }
397
398 void
399 vce_registered_listener_connect_handler_fn (void *arg)
400 {
401   vce_event_handler_reg_t *reg = (vce_event_handler_reg_t *) arg;
402   vce_event_connect_request_t *ecr;
403   vce_event_t *ev;
404   vppcom_endpt_t ep;
405   vcl_session_t *new_session;
406
407   vppcom_session_listener_t *session_listener =
408     (vppcom_session_listener_t *) reg->handler_fn_args;
409
410   VCL_EVENTS_LOCK ();
411   ev = vce_get_event_from_index (&vcm->event_thread, reg->ev_idx);
412   ecr = vce_get_event_data (ev, sizeof (*ecr));
413   VCL_EVENTS_UNLOCK ();
414   new_session = vcl_session_get (0, ecr->accepted_session_index);
415   if (!new_session)
416     return;
417
418   ep.is_ip4 = new_session->transport.is_ip4;
419   ep.port = new_session->transport.rmt_port;
420   if (new_session->transport.is_ip4)
421     clib_memcpy (&ep.ip, &new_session->transport.rmt_ip.ip4,
422                  sizeof (ip4_address_t));
423   else
424     clib_memcpy (&ep.ip, &new_session->transport.rmt_ip.ip6,
425                  sizeof (ip6_address_t));
426
427   vppcom_send_accept_session_reply (new_session->vpp_handle,
428                                     new_session->client_context,
429                                     0 /* retval OK */ );
430
431   (session_listener->user_cb) (ecr->accepted_session_index, &ep,
432                                session_listener->user_cb_data);
433
434   if (vcm->session_io_thread.io_sessions_lockp)
435     {
436       /* Throw this new accepted session index into the rx poll thread pool */
437       VCL_IO_SESSIONS_LOCK ();
438       u32 *active_session_index;
439       pool_get (vcm->session_io_thread.active_session_indexes,
440                 active_session_index);
441       *active_session_index = ecr->accepted_session_index;
442       VCL_IO_SESSIONS_UNLOCK ();
443     }
444
445   /*TODO - Unregister check in close for this listener */
446   return;
447
448   ASSERT (0);                   // If we can't get a lock or accepted session fails, lets blow up.
449 }
450
451 /**
452  * @brief vce_poll_wait_connect_request_handler_fn
453  * - used by vppcom_epoll_xxxx() for listener sessions
454  * - when a vl_api_accept_session_t_handler() generates an event
455  *   this callback is alerted and sets the fields that vppcom_epoll_wait()
456  *   expects to see.
457  *
458  * @param arg - void* to be cast to vce_event_handler_reg_t*
459  */
460 void
461 vce_poll_wait_connect_request_handler_fn (void *arg)
462 {
463   vce_event_handler_reg_t *reg = (vce_event_handler_reg_t *) arg;
464   vce_event_t *ev;
465   /* Retrieve the VCL_EVENT_CONNECT_REQ_ACCEPTED event */
466   ev = vce_get_event_from_index (&vcm->event_thread, reg->ev_idx);
467   vce_event_connect_request_t *ecr = vce_get_event_data (ev, sizeof (*ecr));
468
469   /* Recycling the event. */
470   VCL_EVENTS_LOCK ();
471   ev->recycle = 1;
472   clib_fifo_add1 (vcm->event_thread.event_index_fifo, reg->ev_idx);
473   VCL_EVENTS_UNLOCK ();
474 }
475
476 int
477 vppcom_session_register_ioevent_cb (uint32_t session_index,
478                                     vppcom_session_ioevent_cb cb,
479                                     uint8_t rx, void *ptr)
480 {
481   int rv = VPPCOM_OK;
482   vce_event_key_t evk;
483   vppcom_session_ioevent_t *ioevent;
484
485   if (!vcm->session_io_thread.io_sessions_lockp)
486     rv = vppcom_start_io_event_thread (&vcm->session_io_thread, 100);   /* DAW_TODO: ??? hard-coded value */
487
488   if (rv == VPPCOM_OK)
489     {
490       void *io_evt_ndx;
491
492       /* Register handler for ioevent on session_index */
493       VCL_IO_SESSIONS_LOCK ();
494       pool_get (vcm->session_io_thread.ioevents, ioevent);
495       io_evt_ndx = (void *) (ioevent - vcm->session_io_thread.ioevents);
496       ioevent->user_cb = cb;
497       ioevent->user_cb_data = ptr;
498       VCL_IO_SESSIONS_UNLOCK ();
499
500       evk.session_index = session_index;
501       evk.eid = rx ? VCL_EVENT_IOEVENT_RX_FIFO : VCL_EVENT_IOEVENT_TX_FIFO;
502
503       (void) vce_register_handler (&vcm->event_thread, &evk,
504                                    vce_registered_ioevent_handler_fn,
505                                    io_evt_ndx);
506     }
507   return rv;
508 }
509
510 int
511 vppcom_session_register_listener (uint32_t session_index,
512                                   vppcom_session_listener_cb cb,
513                                   vppcom_session_listener_errcb
514                                   errcb, uint8_t flags, int q_len, void *ptr)
515 {
516   int rv = VPPCOM_OK;
517   vce_event_key_t evk;
518   vppcom_session_listener_t *listener_args;
519
520   if (!vcm->session_io_thread.io_sessions_lockp)
521     rv = vppcom_start_io_event_thread (&vcm->session_io_thread, 100);   /* DAW_TODO: ??? hard-coded value */
522   if (rv)
523     {
524       goto done;
525     }
526   rv = vppcom_session_listen (session_index, q_len);
527   if (rv)
528     {
529       goto done;
530     }
531
532   /* Register handler for connect_request event on listen_session_index */
533   listener_args = clib_mem_alloc (sizeof (vppcom_session_listener_t));  // DAW_TODO: Use a pool instead of thrashing the memory allocator!
534   listener_args->user_cb = cb;
535   listener_args->user_cb_data = ptr;
536   listener_args->user_errcb = errcb;
537
538   evk.session_index = session_index;
539   evk.eid = VCL_EVENT_CONNECT_REQ_ACCEPTED;
540   (void) vce_register_handler (&vcm->event_thread, &evk,
541                                vce_registered_listener_connect_handler_fn,
542                                listener_args);
543
544 done:
545   return rv;
546 }
547
548 /*
549  * fd.io coding-style-patch-verification: ON
550  *
551  * Local Variables:
552  * eval: (c-set-style "gnu")
553  * End:
554  */