vcl: support inter worker rpc
[vpp.git] / src / vcl / vppcom.c
1 /*
2  * Copyright (c) 2017-2019 Cisco and/or its affiliates.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this
5  * You may obtain a copy of the License at:
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15
16 #include <stdio.h>
17 #include <stdlib.h>
18 #include <vcl/vppcom.h>
19 #include <vcl/vcl_debug.h>
20 #include <vcl/vcl_private.h>
21 #include <svm/fifo_segment.h>
22
23 __thread uword __vcl_worker_index = ~0;
24
25 static int
26 vcl_segment_is_not_mounted (vcl_worker_t * wrk, u64 segment_handle)
27 {
28   u32 segment_index;
29
30   if (segment_handle == VCL_INVALID_SEGMENT_HANDLE)
31     return 0;
32
33   segment_index = vcl_segment_table_lookup (segment_handle);
34   if (segment_index != VCL_INVALID_SEGMENT_INDEX)
35     return 0;
36
37   return 1;
38 }
39
40 static inline int
41 vcl_mq_dequeue_batch (vcl_worker_t * wrk, svm_msg_q_t * mq, u32 n_max_msg)
42 {
43   svm_msg_q_msg_t *msg;
44   u32 n_msgs;
45   int i;
46
47   n_msgs = clib_min (svm_msg_q_size (mq), n_max_msg);
48   for (i = 0; i < n_msgs; i++)
49     {
50       vec_add2 (wrk->mq_msg_vector, msg, 1);
51       svm_msg_q_sub_w_lock (mq, msg);
52     }
53   return n_msgs;
54 }
55
56 const char *
57 vppcom_session_state_str (vcl_session_state_t state)
58 {
59   char *st;
60
61   switch (state)
62     {
63     case STATE_CLOSED:
64       st = "STATE_CLOSED";
65       break;
66
67     case STATE_CONNECT:
68       st = "STATE_CONNECT";
69       break;
70
71     case STATE_LISTEN:
72       st = "STATE_LISTEN";
73       break;
74
75     case STATE_ACCEPT:
76       st = "STATE_ACCEPT";
77       break;
78
79     case STATE_VPP_CLOSING:
80       st = "STATE_VPP_CLOSING";
81       break;
82
83     case STATE_DISCONNECT:
84       st = "STATE_DISCONNECT";
85       break;
86
87     case STATE_DETACHED:
88       st = "STATE_DETACHED";
89       break;
90
91     case STATE_UPDATED:
92       st = "STATE_UPDATED";
93       break;
94
95     case STATE_LISTEN_NO_MQ:
96       st = "STATE_LISTEN_NO_MQ";
97       break;
98
99     default:
100       st = "UNKNOWN_STATE";
101       break;
102     }
103
104   return st;
105 }
106
107 u8 *
108 format_ip4_address (u8 * s, va_list * args)
109 {
110   u8 *a = va_arg (*args, u8 *);
111   return format (s, "%d.%d.%d.%d", a[0], a[1], a[2], a[3]);
112 }
113
114 u8 *
115 format_ip6_address (u8 * s, va_list * args)
116 {
117   ip6_address_t *a = va_arg (*args, ip6_address_t *);
118   u32 i, i_max_n_zero, max_n_zeros, i_first_zero, n_zeros, last_double_colon;
119
120   i_max_n_zero = ARRAY_LEN (a->as_u16);
121   max_n_zeros = 0;
122   i_first_zero = i_max_n_zero;
123   n_zeros = 0;
124   for (i = 0; i < ARRAY_LEN (a->as_u16); i++)
125     {
126       u32 is_zero = a->as_u16[i] == 0;
127       if (is_zero && i_first_zero >= ARRAY_LEN (a->as_u16))
128         {
129           i_first_zero = i;
130           n_zeros = 0;
131         }
132       n_zeros += is_zero;
133       if ((!is_zero && n_zeros > max_n_zeros)
134           || (i + 1 >= ARRAY_LEN (a->as_u16) && n_zeros > max_n_zeros))
135         {
136           i_max_n_zero = i_first_zero;
137           max_n_zeros = n_zeros;
138           i_first_zero = ARRAY_LEN (a->as_u16);
139           n_zeros = 0;
140         }
141     }
142
143   last_double_colon = 0;
144   for (i = 0; i < ARRAY_LEN (a->as_u16); i++)
145     {
146       if (i == i_max_n_zero && max_n_zeros > 1)
147         {
148           s = format (s, "::");
149           i += max_n_zeros - 1;
150           last_double_colon = 1;
151         }
152       else
153         {
154           s = format (s, "%s%x",
155                       (last_double_colon || i == 0) ? "" : ":",
156                       clib_net_to_host_u16 (a->as_u16[i]));
157           last_double_colon = 0;
158         }
159     }
160
161   return s;
162 }
163
164 /* Format an IP46 address. */
165 u8 *
166 format_ip46_address (u8 * s, va_list * args)
167 {
168   ip46_address_t *ip46 = va_arg (*args, ip46_address_t *);
169   ip46_type_t type = va_arg (*args, ip46_type_t);
170   int is_ip4 = 1;
171
172   switch (type)
173     {
174     case IP46_TYPE_ANY:
175       is_ip4 = ip46_address_is_ip4 (ip46);
176       break;
177     case IP46_TYPE_IP4:
178       is_ip4 = 1;
179       break;
180     case IP46_TYPE_IP6:
181       is_ip4 = 0;
182       break;
183     }
184
185   return is_ip4 ?
186     format (s, "%U", format_ip4_address, &ip46->ip4) :
187     format (s, "%U", format_ip6_address, &ip46->ip6);
188 }
189
190 /*
191  * VPPCOM Utility Functions
192  */
193
194 static void
195 vcl_send_session_listen (vcl_worker_t * wrk, vcl_session_t * s)
196 {
197   app_session_evt_t _app_evt, *app_evt = &_app_evt;
198   session_listen_msg_t *mp;
199   svm_msg_q_t *mq;
200
201   mq = vcl_worker_ctrl_mq (wrk);
202   app_alloc_ctrl_evt_to_vpp (mq, app_evt, SESSION_CTRL_EVT_LISTEN);
203   mp = (session_listen_msg_t *) app_evt->evt->data;
204   memset (mp, 0, sizeof (*mp));
205   mp->client_index = wrk->my_client_index;
206   mp->context = s->session_index;
207   mp->wrk_index = wrk->vpp_wrk_index;
208   mp->is_ip4 = s->transport.is_ip4;
209   clib_memcpy_fast (&mp->ip, &s->transport.lcl_ip, sizeof (mp->ip));
210   mp->port = s->transport.lcl_port;
211   mp->proto = s->session_type;
212   if (s->flags & VCL_SESSION_F_CONNECTED)
213     mp->flags = TRANSPORT_CFG_F_CONNECTED;
214   app_send_ctrl_evt_to_vpp (mq, app_evt);
215 }
216
217 static void
218 vcl_send_session_connect (vcl_worker_t * wrk, vcl_session_t * s)
219 {
220   app_session_evt_t _app_evt, *app_evt = &_app_evt;
221   session_connect_msg_t *mp;
222   svm_msg_q_t *mq;
223
224   mq = vcl_worker_ctrl_mq (wrk);
225   app_alloc_ctrl_evt_to_vpp (mq, app_evt, SESSION_CTRL_EVT_CONNECT);
226   mp = (session_connect_msg_t *) app_evt->evt->data;
227   memset (mp, 0, sizeof (*mp));
228   mp->client_index = wrk->my_client_index;
229   mp->context = s->session_index;
230   mp->wrk_index = wrk->vpp_wrk_index;
231   mp->is_ip4 = s->transport.is_ip4;
232   mp->parent_handle = s->parent_handle;
233   clib_memcpy_fast (&mp->ip, &s->transport.rmt_ip, sizeof (mp->ip));
234   clib_memcpy_fast (&mp->lcl_ip, &s->transport.lcl_ip, sizeof (mp->lcl_ip));
235   mp->port = s->transport.rmt_port;
236   mp->lcl_port = s->transport.lcl_port;
237   mp->proto = s->session_type;
238   if (s->flags & VCL_SESSION_F_CONNECTED)
239     mp->flags |= TRANSPORT_CFG_F_CONNECTED;
240   app_send_ctrl_evt_to_vpp (mq, app_evt);
241 }
242
243 void
244 vcl_send_session_unlisten (vcl_worker_t * wrk, vcl_session_t * s)
245 {
246   app_session_evt_t _app_evt, *app_evt = &_app_evt;
247   session_unlisten_msg_t *mp;
248   svm_msg_q_t *mq;
249
250   mq = vcl_worker_ctrl_mq (wrk);
251   app_alloc_ctrl_evt_to_vpp (mq, app_evt, SESSION_CTRL_EVT_UNLISTEN);
252   mp = (session_unlisten_msg_t *) app_evt->evt->data;
253   memset (mp, 0, sizeof (*mp));
254   mp->client_index = wrk->my_client_index;
255   mp->wrk_index = wrk->vpp_wrk_index;
256   mp->handle = s->vpp_handle;
257   mp->context = wrk->wrk_index;
258   app_send_ctrl_evt_to_vpp (mq, app_evt);
259 }
260
261 static void
262 vcl_send_session_disconnect (vcl_worker_t * wrk, vcl_session_t * s)
263 {
264   app_session_evt_t _app_evt, *app_evt = &_app_evt;
265   session_disconnect_msg_t *mp;
266   svm_msg_q_t *mq;
267
268   /* Send to thread that owns the session */
269   mq = s->vpp_evt_q;
270   app_alloc_ctrl_evt_to_vpp (mq, app_evt, SESSION_CTRL_EVT_DISCONNECT);
271   mp = (session_disconnect_msg_t *) app_evt->evt->data;
272   memset (mp, 0, sizeof (*mp));
273   mp->client_index = wrk->my_client_index;
274   mp->handle = s->vpp_handle;
275   app_send_ctrl_evt_to_vpp (mq, app_evt);
276 }
277
278 static void
279 vcl_send_app_detach (vcl_worker_t * wrk)
280 {
281   app_session_evt_t _app_evt, *app_evt = &_app_evt;
282   session_app_detach_msg_t *mp;
283   svm_msg_q_t *mq;
284
285   mq = vcl_worker_ctrl_mq (wrk);
286   app_alloc_ctrl_evt_to_vpp (mq, app_evt, SESSION_CTRL_EVT_APP_DETACH);
287   mp = (session_app_detach_msg_t *) app_evt->evt->data;
288   memset (mp, 0, sizeof (*mp));
289   mp->client_index = wrk->my_client_index;
290   app_send_ctrl_evt_to_vpp (mq, app_evt);
291 }
292
293 static void
294 vcl_send_session_accepted_reply (svm_msg_q_t * mq, u32 context,
295                                  session_handle_t handle, int retval)
296 {
297   app_session_evt_t _app_evt, *app_evt = &_app_evt;
298   session_accepted_reply_msg_t *rmp;
299   app_alloc_ctrl_evt_to_vpp (mq, app_evt, SESSION_CTRL_EVT_ACCEPTED_REPLY);
300   rmp = (session_accepted_reply_msg_t *) app_evt->evt->data;
301   rmp->handle = handle;
302   rmp->context = context;
303   rmp->retval = retval;
304   app_send_ctrl_evt_to_vpp (mq, app_evt);
305 }
306
307 static void
308 vcl_send_session_disconnected_reply (svm_msg_q_t * mq, u32 context,
309                                      session_handle_t handle, int retval)
310 {
311   app_session_evt_t _app_evt, *app_evt = &_app_evt;
312   session_disconnected_reply_msg_t *rmp;
313   app_alloc_ctrl_evt_to_vpp (mq, app_evt,
314                              SESSION_CTRL_EVT_DISCONNECTED_REPLY);
315   rmp = (session_disconnected_reply_msg_t *) app_evt->evt->data;
316   rmp->handle = handle;
317   rmp->context = context;
318   rmp->retval = retval;
319   app_send_ctrl_evt_to_vpp (mq, app_evt);
320 }
321
322 static void
323 vcl_send_session_reset_reply (svm_msg_q_t * mq, u32 context,
324                               session_handle_t handle, int retval)
325 {
326   app_session_evt_t _app_evt, *app_evt = &_app_evt;
327   session_reset_reply_msg_t *rmp;
328   app_alloc_ctrl_evt_to_vpp (mq, app_evt, SESSION_CTRL_EVT_RESET_REPLY);
329   rmp = (session_reset_reply_msg_t *) app_evt->evt->data;
330   rmp->handle = handle;
331   rmp->context = context;
332   rmp->retval = retval;
333   app_send_ctrl_evt_to_vpp (mq, app_evt);
334 }
335
336 void
337 vcl_send_session_worker_update (vcl_worker_t * wrk, vcl_session_t * s,
338                                 u32 wrk_index)
339 {
340   app_session_evt_t _app_evt, *app_evt = &_app_evt;
341   session_worker_update_msg_t *mp;
342   svm_msg_q_t *mq;
343
344   mq = vcl_session_vpp_evt_q (wrk, s);
345   app_alloc_ctrl_evt_to_vpp (mq, app_evt, SESSION_CTRL_EVT_WORKER_UPDATE);
346   mp = (session_worker_update_msg_t *) app_evt->evt->data;
347   mp->client_index = wrk->my_client_index;
348   mp->handle = s->vpp_handle;
349   mp->req_wrk_index = wrk->vpp_wrk_index;
350   mp->wrk_index = wrk_index;
351   app_send_ctrl_evt_to_vpp (mq, app_evt);
352 }
353
354 void
355 vcl_send_worker_rpc (u32 dst_wrk_index, void *data, u32 data_len)
356 {
357   app_session_evt_t _app_evt, *app_evt = &_app_evt;
358   session_app_wrk_rpc_msg_t *mp;
359   vcl_worker_t *dst_wrk, *wrk;
360   svm_msg_q_t *mq;
361
362   if (data_len > sizeof (mp->data))
363     goto done;
364
365   clib_spinlock_lock (&vcm->workers_lock);
366
367   dst_wrk = vcl_worker_get_if_valid (dst_wrk_index);
368   if (!dst_wrk)
369     goto done;
370
371   wrk = vcl_worker_get_current ();
372   mq = vcl_worker_ctrl_mq (wrk);
373   app_alloc_ctrl_evt_to_vpp (mq, app_evt, SESSION_CTRL_EVT_APP_WRK_RPC);
374   mp = (session_app_wrk_rpc_msg_t *) app_evt->evt->data;
375   mp->client_index = wrk->my_client_index;
376   mp->wrk_index = dst_wrk->vpp_wrk_index;
377   clib_memcpy (mp->data, data, data_len);
378   app_send_ctrl_evt_to_vpp (mq, app_evt);
379
380 done:
381   clib_spinlock_unlock (&vcm->workers_lock);
382 }
383
384 static u32
385 vcl_session_accepted_handler (vcl_worker_t * wrk, session_accepted_msg_t * mp,
386                               u32 ls_index)
387 {
388   vcl_session_t *session, *listen_session;
389   svm_fifo_t *rx_fifo, *tx_fifo;
390   u32 vpp_wrk_index;
391   svm_msg_q_t *evt_q;
392
393   session = vcl_session_alloc (wrk);
394
395   listen_session = vcl_session_get (wrk, ls_index);
396   if (listen_session->vpp_handle != mp->listener_handle)
397     {
398       VDBG (0, "ERROR: listener handle %lu does not match session %u",
399             mp->listener_handle, ls_index);
400       goto error;
401     }
402
403   if (vcl_segment_is_not_mounted (wrk, mp->segment_handle))
404     {
405       VDBG (0, "ERROR: segment for session %u is not mounted!",
406             session->session_index);
407       goto error;
408     }
409
410   rx_fifo = uword_to_pointer (mp->server_rx_fifo, svm_fifo_t *);
411   tx_fifo = uword_to_pointer (mp->server_tx_fifo, svm_fifo_t *);
412   session->vpp_evt_q = uword_to_pointer (mp->vpp_event_queue_address,
413                                          svm_msg_q_t *);
414   rx_fifo->client_session_index = session->session_index;
415   tx_fifo->client_session_index = session->session_index;
416   rx_fifo->client_thread_index = vcl_get_worker_index ();
417   tx_fifo->client_thread_index = vcl_get_worker_index ();
418   vpp_wrk_index = tx_fifo->master_thread_index;
419   vec_validate (wrk->vpp_event_queues, vpp_wrk_index);
420   wrk->vpp_event_queues[vpp_wrk_index] = session->vpp_evt_q;
421
422   session->vpp_handle = mp->handle;
423   session->vpp_thread_index = rx_fifo->master_thread_index;
424   session->rx_fifo = rx_fifo;
425   session->tx_fifo = tx_fifo;
426
427   session->session_state = STATE_ACCEPT;
428   session->transport.rmt_port = mp->rmt.port;
429   session->transport.is_ip4 = mp->rmt.is_ip4;
430   clib_memcpy_fast (&session->transport.rmt_ip, &mp->rmt.ip,
431                     sizeof (ip46_address_t));
432
433   vcl_session_table_add_vpp_handle (wrk, mp->handle, session->session_index);
434   session->transport.lcl_port = listen_session->transport.lcl_port;
435   session->transport.lcl_ip = listen_session->transport.lcl_ip;
436   session->session_type = listen_session->session_type;
437   session->is_dgram = vcl_proto_is_dgram (session->session_type);
438   session->listener_index = listen_session->session_index;
439   listen_session->n_accepted_sessions++;
440
441   VDBG (1, "session %u [0x%llx]: client accept request from %s address %U"
442         " port %d queue %p!", session->session_index, mp->handle,
443         mp->rmt.is_ip4 ? "IPv4" : "IPv6", format_ip46_address, &mp->rmt.ip,
444         mp->rmt.is_ip4 ? IP46_TYPE_IP4 : IP46_TYPE_IP6,
445         clib_net_to_host_u16 (mp->rmt.port), session->vpp_evt_q);
446   vcl_evt (VCL_EVT_ACCEPT, session, listen_session, session_index);
447
448   vcl_send_session_accepted_reply (session->vpp_evt_q, mp->context,
449                                    session->vpp_handle, 0);
450
451   return session->session_index;
452
453 error:
454   evt_q = uword_to_pointer (mp->vpp_event_queue_address, svm_msg_q_t *);
455   vcl_send_session_accepted_reply (evt_q, mp->context, mp->handle,
456                                    VNET_API_ERROR_INVALID_ARGUMENT);
457   vcl_session_free (wrk, session);
458   return VCL_INVALID_SESSION_INDEX;
459 }
460
461 static u32
462 vcl_session_connected_handler (vcl_worker_t * wrk,
463                                session_connected_msg_t * mp)
464 {
465   u32 session_index, vpp_wrk_index;
466   svm_fifo_t *rx_fifo, *tx_fifo;
467   vcl_session_t *session = 0;
468
469   session_index = mp->context;
470   session = vcl_session_get (wrk, session_index);
471   if (!session)
472     {
473       VDBG (0, "ERROR: vpp handle 0x%llx has no session index (%u)!",
474             mp->handle, session_index);
475       return VCL_INVALID_SESSION_INDEX;
476     }
477   if (mp->retval)
478     {
479       VDBG (0, "ERROR: session index %u: connect failed! %U",
480             session_index, format_session_error, mp->retval);
481       session->session_state = STATE_DETACHED | STATE_DISCONNECT;
482       session->vpp_handle = mp->handle;
483       return session_index;
484     }
485
486   session->vpp_handle = mp->handle;
487   session->vpp_evt_q = uword_to_pointer (mp->vpp_event_queue_address,
488                                          svm_msg_q_t *);
489   rx_fifo = uword_to_pointer (mp->server_rx_fifo, svm_fifo_t *);
490   tx_fifo = uword_to_pointer (mp->server_tx_fifo, svm_fifo_t *);
491   if (vcl_segment_is_not_mounted (wrk, mp->segment_handle))
492     {
493       VDBG (0, "segment for session %u is not mounted!",
494             session->session_index);
495       session->session_state = STATE_DETACHED | STATE_DISCONNECT;
496       vcl_send_session_disconnect (wrk, session);
497       return session_index;
498     }
499
500   rx_fifo->client_session_index = session_index;
501   tx_fifo->client_session_index = session_index;
502   rx_fifo->client_thread_index = vcl_get_worker_index ();
503   tx_fifo->client_thread_index = vcl_get_worker_index ();
504
505   vpp_wrk_index = tx_fifo->master_thread_index;
506   vec_validate (wrk->vpp_event_queues, vpp_wrk_index);
507   wrk->vpp_event_queues[vpp_wrk_index] = session->vpp_evt_q;
508
509   if (mp->ct_rx_fifo)
510     {
511       session->ct_rx_fifo = uword_to_pointer (mp->ct_rx_fifo, svm_fifo_t *);
512       session->ct_tx_fifo = uword_to_pointer (mp->ct_tx_fifo, svm_fifo_t *);
513       if (vcl_segment_is_not_mounted (wrk, mp->ct_segment_handle))
514         {
515           VDBG (0, "ct segment for session %u is not mounted!",
516                 session->session_index);
517           session->session_state = STATE_DETACHED | STATE_DISCONNECT;
518           vcl_send_session_disconnect (wrk, session);
519           return session_index;
520         }
521     }
522
523   session->rx_fifo = rx_fifo;
524   session->tx_fifo = tx_fifo;
525   session->vpp_thread_index = rx_fifo->master_thread_index;
526   session->transport.is_ip4 = mp->lcl.is_ip4;
527   clib_memcpy_fast (&session->transport.lcl_ip, &mp->lcl.ip,
528                     sizeof (session->transport.lcl_ip));
529   session->transport.lcl_port = mp->lcl.port;
530   session->session_state = STATE_CONNECT;
531
532   /* Add it to lookup table */
533   vcl_session_table_add_vpp_handle (wrk, mp->handle, session_index);
534
535   VDBG (1, "session %u [0x%llx] connected! rx_fifo %p, refcnt %d, tx_fifo %p,"
536         " refcnt %d", session_index, mp->handle, session->rx_fifo,
537         session->rx_fifo->refcnt, session->tx_fifo, session->tx_fifo->refcnt);
538
539   return session_index;
540 }
541
542 static int
543 vcl_flag_accepted_session (vcl_session_t * session, u64 handle, u32 flags)
544 {
545   vcl_session_msg_t *accepted_msg;
546   int i;
547
548   for (i = 0; i < vec_len (session->accept_evts_fifo); i++)
549     {
550       accepted_msg = &session->accept_evts_fifo[i];
551       if (accepted_msg->accepted_msg.handle == handle)
552         {
553           accepted_msg->flags |= flags;
554           return 1;
555         }
556     }
557   return 0;
558 }
559
560 static u32
561 vcl_session_reset_handler (vcl_worker_t * wrk,
562                            session_reset_msg_t * reset_msg)
563 {
564   vcl_session_t *session;
565   u32 sid;
566
567   sid = vcl_session_index_from_vpp_handle (wrk, reset_msg->handle);
568   session = vcl_session_get (wrk, sid);
569   if (!session)
570     {
571       VDBG (0, "request to reset unknown handle 0x%llx", reset_msg->handle);
572       return VCL_INVALID_SESSION_INDEX;
573     }
574
575   /* Caught a reset before actually accepting the session */
576   if (session->session_state == STATE_LISTEN)
577     {
578
579       if (!vcl_flag_accepted_session (session, reset_msg->handle,
580                                       VCL_ACCEPTED_F_RESET))
581         VDBG (0, "session was not accepted!");
582       return VCL_INVALID_SESSION_INDEX;
583     }
584
585   if (session->session_state != STATE_CLOSED)
586     session->session_state = STATE_DISCONNECT;
587   VDBG (0, "reset session %u [0x%llx]", sid, reset_msg->handle);
588   return sid;
589 }
590
591 static u32
592 vcl_session_bound_handler (vcl_worker_t * wrk, session_bound_msg_t * mp)
593 {
594   vcl_session_t *session;
595   u32 sid = mp->context;
596
597   session = vcl_session_get (wrk, sid);
598   if (mp->retval)
599     {
600       VERR ("session %u [0x%llx]: bind failed: %U", sid, mp->handle,
601             format_session_error, mp->retval);
602       if (session)
603         {
604           session->session_state = STATE_DETACHED;
605           session->vpp_handle = mp->handle;
606           return sid;
607         }
608       else
609         {
610           VDBG (0, "ERROR: session %u [0x%llx]: Invalid session index!",
611                 sid, mp->handle);
612           return VCL_INVALID_SESSION_INDEX;
613         }
614     }
615
616   session->vpp_handle = mp->handle;
617   session->transport.is_ip4 = mp->lcl_is_ip4;
618   clib_memcpy_fast (&session->transport.lcl_ip, mp->lcl_ip,
619                     sizeof (ip46_address_t));
620   session->transport.lcl_port = mp->lcl_port;
621   vcl_session_table_add_listener (wrk, mp->handle, sid);
622   session->session_state = STATE_LISTEN;
623
624   session->vpp_evt_q = uword_to_pointer (mp->vpp_evt_q, svm_msg_q_t *);
625   vec_validate (wrk->vpp_event_queues, 0);
626   wrk->vpp_event_queues[0] = session->vpp_evt_q;
627
628   if (vcl_session_is_cl (session))
629     {
630       svm_fifo_t *rx_fifo, *tx_fifo;
631       session->vpp_evt_q = uword_to_pointer (mp->vpp_evt_q, svm_msg_q_t *);
632       rx_fifo = uword_to_pointer (mp->rx_fifo, svm_fifo_t *);
633       rx_fifo->client_session_index = sid;
634       tx_fifo = uword_to_pointer (mp->tx_fifo, svm_fifo_t *);
635       tx_fifo->client_session_index = sid;
636       session->rx_fifo = rx_fifo;
637       session->tx_fifo = tx_fifo;
638     }
639
640   VDBG (0, "session %u [0x%llx]: listen succeeded!", sid, mp->handle);
641   return sid;
642 }
643
644 static void
645 vcl_session_unlisten_reply_handler (vcl_worker_t * wrk, void *data)
646 {
647   session_unlisten_reply_msg_t *mp = (session_unlisten_reply_msg_t *) data;
648   vcl_session_t *s;
649
650   s = vcl_session_get_w_vpp_handle (wrk, mp->handle);
651   if (!s)
652     {
653       VDBG (0, "Unlisten reply with wrong handle %llx", mp->handle);
654       return;
655     }
656   if (s->session_state != STATE_DISCONNECT)
657     {
658       /* Connected udp listener */
659       if (s->session_type == VPPCOM_PROTO_UDP
660           && s->session_state == STATE_CLOSED)
661         return;
662
663       VDBG (0, "Unlisten session in wrong state %llx", mp->handle);
664       return;
665     }
666
667   if (mp->retval)
668     VDBG (0, "ERROR: session %u [0xllx]: unlisten failed: %U",
669           s->session_index, mp->handle, format_session_error, mp->retval);
670
671   if (mp->context != wrk->wrk_index)
672     VDBG (0, "wrong context");
673
674   vcl_session_table_del_vpp_handle (wrk, mp->handle);
675   vcl_session_free (wrk, s);
676 }
677
678 static void
679 vcl_session_migrated_handler (vcl_worker_t * wrk, void *data)
680 {
681   session_migrated_msg_t *mp = (session_migrated_msg_t *) data;
682   vcl_session_t *s;
683
684   s = vcl_session_get_w_vpp_handle (wrk, mp->handle);
685   if (!s)
686     {
687       VDBG (0, "Migrated notification with wrong handle %llx", mp->handle);
688       return;
689     }
690
691   s->vpp_thread_index = mp->vpp_thread_index;
692   s->vpp_handle = mp->new_handle;
693   s->vpp_evt_q = uword_to_pointer (mp->vpp_evt_q, svm_msg_q_t *);
694
695   vec_validate (wrk->vpp_event_queues, s->vpp_thread_index);
696   wrk->vpp_event_queues[s->vpp_thread_index] = s->vpp_evt_q;
697
698   vcl_session_table_del_vpp_handle (wrk, mp->handle);
699   vcl_session_table_add_vpp_handle (wrk, mp->new_handle, s->session_index);
700
701   /* Generate new tx event if we have outstanding data */
702   if (svm_fifo_has_event (s->tx_fifo))
703     app_send_io_evt_to_vpp (s->vpp_evt_q, s->tx_fifo->master_session_index,
704                             SESSION_IO_EVT_TX, SVM_Q_WAIT);
705
706   VDBG (0, "Migrated 0x%lx to thread %u 0x%lx", mp->handle,
707         s->vpp_thread_index, mp->new_handle);
708 }
709
710 static vcl_session_t *
711 vcl_session_accepted (vcl_worker_t * wrk, session_accepted_msg_t * msg)
712 {
713   vcl_session_msg_t *vcl_msg;
714   vcl_session_t *session;
715
716   session = vcl_session_get_w_vpp_handle (wrk, msg->handle);
717   if (PREDICT_FALSE (session != 0))
718     VWRN ("session overlap handle %lu state %u!", msg->handle,
719           session->session_state);
720
721   session = vcl_session_table_lookup_listener (wrk, msg->listener_handle);
722   if (!session)
723     {
724       VERR ("couldn't find listen session: listener handle %llx",
725             msg->listener_handle);
726       return 0;
727     }
728
729   clib_fifo_add2 (session->accept_evts_fifo, vcl_msg);
730   vcl_msg->flags = 0;
731   vcl_msg->accepted_msg = *msg;
732   /* Session handle points to listener until fully accepted by app */
733   vcl_session_table_add_vpp_handle (wrk, msg->handle, session->session_index);
734
735   return session;
736 }
737
738 static vcl_session_t *
739 vcl_session_disconnected_handler (vcl_worker_t * wrk,
740                                   session_disconnected_msg_t * msg)
741 {
742   vcl_session_t *session;
743
744   session = vcl_session_get_w_vpp_handle (wrk, msg->handle);
745   if (!session)
746     {
747       VDBG (0, "request to disconnect unknown handle 0x%llx", msg->handle);
748       return 0;
749     }
750
751   /* Late disconnect notification on a session that has been closed */
752   if (session->session_state == STATE_CLOSED)
753     return 0;
754
755   /* Caught a disconnect before actually accepting the session */
756   if (session->session_state == STATE_LISTEN)
757     {
758       if (!vcl_flag_accepted_session (session, msg->handle,
759                                       VCL_ACCEPTED_F_CLOSED))
760         VDBG (0, "session was not accepted!");
761       return 0;
762     }
763
764   /* If not already reset change state */
765   if (session->session_state != STATE_DISCONNECT)
766     session->session_state = STATE_VPP_CLOSING;
767
768   return session;
769 }
770
771 static void
772 vcl_session_cleanup_handler (vcl_worker_t * wrk, void *data)
773 {
774   session_cleanup_msg_t *msg;
775   vcl_session_t *session;
776
777   msg = (session_cleanup_msg_t *) data;
778   session = vcl_session_get_w_vpp_handle (wrk, msg->handle);
779   if (!session)
780     {
781       VDBG (0, "disconnect confirmed for unknown handle 0x%llx", msg->handle);
782       return;
783     }
784
785   if (msg->type == SESSION_CLEANUP_TRANSPORT)
786     {
787       /* Transport was cleaned up before we confirmed close. Probably the
788        * app is still waiting for some data that cannot be delivered.
789        * Confirm close to make sure everything is cleaned up */
790       if (session->session_state == STATE_VPP_CLOSING)
791         vcl_session_cleanup (wrk, session, vcl_session_handle (session),
792                              1 /* do_disconnect */ );
793       return;
794     }
795
796   vcl_session_table_del_vpp_handle (wrk, msg->handle);
797   /* Should not happen. App did not close the connection so don't free it. */
798   if (session->session_state != STATE_CLOSED)
799     {
800       VDBG (0, "app did not close session %d", session->session_index);
801       session->session_state = STATE_DETACHED;
802       session->vpp_handle = VCL_INVALID_SESSION_HANDLE;
803       return;
804     }
805   vcl_session_free (wrk, session);
806 }
807
808 static void
809 vcl_session_req_worker_update_handler (vcl_worker_t * wrk, void *data)
810 {
811   session_req_worker_update_msg_t *msg;
812   vcl_session_t *s;
813
814   msg = (session_req_worker_update_msg_t *) data;
815   s = vcl_session_get_w_vpp_handle (wrk, msg->session_handle);
816   if (!s)
817     return;
818
819   vec_add1 (wrk->pending_session_wrk_updates, s->session_index);
820 }
821
822 static void
823 vcl_session_worker_update_reply_handler (vcl_worker_t * wrk, void *data)
824 {
825   session_worker_update_reply_msg_t *msg;
826   vcl_session_t *s;
827
828   msg = (session_worker_update_reply_msg_t *) data;
829   s = vcl_session_get_w_vpp_handle (wrk, msg->handle);
830   if (!s)
831     {
832       VDBG (0, "unknown handle 0x%llx", msg->handle);
833       return;
834     }
835   if (vcl_segment_is_not_mounted (wrk, msg->segment_handle))
836     {
837       clib_warning ("segment for session %u is not mounted!",
838                     s->session_index);
839       return;
840     }
841
842   if (s->rx_fifo)
843     {
844       s->rx_fifo = uword_to_pointer (msg->rx_fifo, svm_fifo_t *);
845       s->tx_fifo = uword_to_pointer (msg->tx_fifo, svm_fifo_t *);
846       s->rx_fifo->client_session_index = s->session_index;
847       s->tx_fifo->client_session_index = s->session_index;
848       s->rx_fifo->client_thread_index = wrk->wrk_index;
849       s->tx_fifo->client_thread_index = wrk->wrk_index;
850     }
851   s->session_state = STATE_UPDATED;
852
853   VDBG (0, "session %u[0x%llx] moved to worker %u", s->session_index,
854         s->vpp_handle, wrk->wrk_index);
855 }
856
857 static void
858 vcl_session_app_add_segment_handler (vcl_worker_t * wrk, void *data)
859 {
860   ssvm_segment_type_t seg_type = SSVM_SEGMENT_SHM;
861   session_app_add_segment_msg_t *msg;
862   u64 segment_handle;
863   int fd = -1;
864
865   msg = (session_app_add_segment_msg_t *) data;
866
867   if (msg->fd_flags)
868     {
869       vl_socket_client_recv_fd_msg2 (&wrk->bapi_sock_ctx, &fd, 1, 5);
870       seg_type = SSVM_SEGMENT_MEMFD;
871     }
872
873   segment_handle = msg->segment_handle;
874   if (segment_handle == VCL_INVALID_SEGMENT_HANDLE)
875     {
876       clib_warning ("invalid segment handle");
877       return;
878     }
879
880   if (vcl_segment_attach (segment_handle, (char *) msg->segment_name,
881                           seg_type, fd))
882     {
883       VDBG (0, "vcl_segment_attach ('%s') failed", msg->segment_name);
884       return;
885     }
886
887   VDBG (1, "mapped new segment '%s' size %d", msg->segment_name,
888         msg->segment_size);
889 }
890
891 static void
892 vcl_session_app_del_segment_handler (vcl_worker_t * wrk, void *data)
893 {
894   session_app_del_segment_msg_t *msg = (session_app_del_segment_msg_t *) data;
895   vcl_segment_detach (msg->segment_handle);
896   VDBG (1, "Unmapped segment: %d", msg->segment_handle);
897 }
898
899 static void
900 vcl_worker_rpc_handler (vcl_worker_t * wrk, void *data)
901 {
902   if (!vcm->wrk_rpc_fn)
903     return;
904
905   (vcm->wrk_rpc_fn) (data);
906 }
907
908 static int
909 vcl_handle_mq_event (vcl_worker_t * wrk, session_event_t * e)
910 {
911   session_disconnected_msg_t *disconnected_msg;
912   vcl_session_t *session;
913
914   switch (e->event_type)
915     {
916     case SESSION_IO_EVT_RX:
917     case SESSION_IO_EVT_TX:
918       session = vcl_session_get (wrk, e->session_index);
919       if (!session || !(session->session_state & STATE_OPEN))
920         break;
921       vec_add1 (wrk->unhandled_evts_vector, *e);
922       break;
923     case SESSION_CTRL_EVT_ACCEPTED:
924       vcl_session_accepted (wrk, (session_accepted_msg_t *) e->data);
925       break;
926     case SESSION_CTRL_EVT_CONNECTED:
927       vcl_session_connected_handler (wrk,
928                                      (session_connected_msg_t *) e->data);
929       break;
930     case SESSION_CTRL_EVT_DISCONNECTED:
931       disconnected_msg = (session_disconnected_msg_t *) e->data;
932       session = vcl_session_disconnected_handler (wrk, disconnected_msg);
933       if (!session)
934         break;
935       VDBG (0, "disconnected session %u [0x%llx]", session->session_index,
936             session->vpp_handle);
937       break;
938     case SESSION_CTRL_EVT_RESET:
939       vcl_session_reset_handler (wrk, (session_reset_msg_t *) e->data);
940       break;
941     case SESSION_CTRL_EVT_BOUND:
942       vcl_session_bound_handler (wrk, (session_bound_msg_t *) e->data);
943       break;
944     case SESSION_CTRL_EVT_UNLISTEN_REPLY:
945       vcl_session_unlisten_reply_handler (wrk, e->data);
946       break;
947     case SESSION_CTRL_EVT_MIGRATED:
948       vcl_session_migrated_handler (wrk, e->data);
949       break;
950     case SESSION_CTRL_EVT_CLEANUP:
951       vcl_session_cleanup_handler (wrk, e->data);
952       break;
953     case SESSION_CTRL_EVT_REQ_WORKER_UPDATE:
954       vcl_session_req_worker_update_handler (wrk, e->data);
955       break;
956     case SESSION_CTRL_EVT_WORKER_UPDATE_REPLY:
957       vcl_session_worker_update_reply_handler (wrk, e->data);
958       break;
959     case SESSION_CTRL_EVT_APP_ADD_SEGMENT:
960       vcl_session_app_add_segment_handler (wrk, e->data);
961       break;
962     case SESSION_CTRL_EVT_APP_DEL_SEGMENT:
963       vcl_session_app_del_segment_handler (wrk, e->data);
964       break;
965     case SESSION_CTRL_EVT_RPC:
966       vcl_worker_rpc_handler (wrk, e->data);
967       break;
968     default:
969       clib_warning ("unhandled %u", e->event_type);
970     }
971   return VPPCOM_OK;
972 }
973
974 static int
975 vppcom_wait_for_session_state_change (u32 session_index,
976                                       vcl_session_state_t state,
977                                       f64 wait_for_time)
978 {
979   vcl_worker_t *wrk = vcl_worker_get_current ();
980   f64 timeout = clib_time_now (&wrk->clib_time) + wait_for_time;
981   vcl_session_t *volatile session;
982   svm_msg_q_msg_t msg;
983   session_event_t *e;
984
985   do
986     {
987       session = vcl_session_get (wrk, session_index);
988       if (PREDICT_FALSE (!session))
989         {
990           return VPPCOM_EBADFD;
991         }
992       if (session->session_state & state)
993         {
994           return VPPCOM_OK;
995         }
996       if (session->session_state & STATE_DETACHED)
997         {
998           return VPPCOM_ECONNREFUSED;
999         }
1000
1001       if (svm_msg_q_sub (wrk->app_event_queue, &msg, SVM_Q_NOWAIT, 0))
1002         {
1003           usleep (100);
1004           continue;
1005         }
1006       e = svm_msg_q_msg_data (wrk->app_event_queue, &msg);
1007       vcl_handle_mq_event (wrk, e);
1008       svm_msg_q_free_msg (wrk->app_event_queue, &msg);
1009     }
1010   while (clib_time_now (&wrk->clib_time) < timeout);
1011
1012   VDBG (0, "timeout waiting for state 0x%x (%s)", state,
1013         vppcom_session_state_str (state));
1014   vcl_evt (VCL_EVT_SESSION_TIMEOUT, session, session_state);
1015
1016   return VPPCOM_ETIMEDOUT;
1017 }
1018
1019 static void
1020 vcl_handle_pending_wrk_updates (vcl_worker_t * wrk)
1021 {
1022   vcl_session_state_t state;
1023   vcl_session_t *s;
1024   u32 *sip;
1025
1026   if (PREDICT_TRUE (vec_len (wrk->pending_session_wrk_updates) == 0))
1027     return;
1028
1029   vec_foreach (sip, wrk->pending_session_wrk_updates)
1030   {
1031     s = vcl_session_get (wrk, *sip);
1032     vcl_send_session_worker_update (wrk, s, wrk->wrk_index);
1033     state = s->session_state;
1034     vppcom_wait_for_session_state_change (s->session_index, STATE_UPDATED, 5);
1035     s->session_state = state;
1036   }
1037   vec_reset_length (wrk->pending_session_wrk_updates);
1038 }
1039
1040 void
1041 vcl_flush_mq_events (void)
1042 {
1043   vcl_worker_t *wrk = vcl_worker_get_current ();
1044   svm_msg_q_msg_t *msg;
1045   session_event_t *e;
1046   svm_msg_q_t *mq;
1047   int i;
1048
1049   mq = wrk->app_event_queue;
1050   svm_msg_q_lock (mq);
1051   vcl_mq_dequeue_batch (wrk, mq, ~0);
1052   svm_msg_q_unlock (mq);
1053
1054   for (i = 0; i < vec_len (wrk->mq_msg_vector); i++)
1055     {
1056       msg = vec_elt_at_index (wrk->mq_msg_vector, i);
1057       e = svm_msg_q_msg_data (mq, msg);
1058       vcl_handle_mq_event (wrk, e);
1059       svm_msg_q_free_msg (mq, msg);
1060     }
1061   vec_reset_length (wrk->mq_msg_vector);
1062   vcl_handle_pending_wrk_updates (wrk);
1063 }
1064
1065 static int
1066 vppcom_app_session_enable (void)
1067 {
1068   int rv;
1069
1070   if (vcm->app_state != STATE_APP_ENABLED)
1071     {
1072       vppcom_send_session_enable_disable (1 /* is_enabled == TRUE */ );
1073       rv = vcl_wait_for_app_state_change (STATE_APP_ENABLED);
1074       if (PREDICT_FALSE (rv))
1075         {
1076           VDBG (0, "application session enable timed out! returning %d (%s)",
1077                 rv, vppcom_retval_str (rv));
1078           return rv;
1079         }
1080     }
1081   return VPPCOM_OK;
1082 }
1083
1084 static int
1085 vppcom_app_attach (void)
1086 {
1087   int rv;
1088
1089   vppcom_app_send_attach ();
1090   rv = vcl_wait_for_app_state_change (STATE_APP_ATTACHED);
1091   if (PREDICT_FALSE (rv))
1092     {
1093       VDBG (0, "application attach timed out! returning %d (%s)", rv,
1094             vppcom_retval_str (rv));
1095       return rv;
1096     }
1097
1098   return VPPCOM_OK;
1099 }
1100
1101 static int
1102 vppcom_session_unbind (u32 session_handle)
1103 {
1104   vcl_worker_t *wrk = vcl_worker_get_current ();
1105   session_accepted_msg_t *accepted_msg;
1106   vcl_session_t *session = 0;
1107   vcl_session_msg_t *evt;
1108
1109   session = vcl_session_get_w_handle (wrk, session_handle);
1110   if (!session)
1111     return VPPCOM_EBADFD;
1112
1113   /* Flush pending accept events, if any */
1114   while (clib_fifo_elts (session->accept_evts_fifo))
1115     {
1116       clib_fifo_sub2 (session->accept_evts_fifo, evt);
1117       accepted_msg = &evt->accepted_msg;
1118       vcl_session_table_del_vpp_handle (wrk, accepted_msg->handle);
1119       vcl_send_session_accepted_reply (session->vpp_evt_q,
1120                                        accepted_msg->context,
1121                                        session->vpp_handle, -1);
1122     }
1123   clib_fifo_free (session->accept_evts_fifo);
1124
1125   vcl_send_session_unlisten (wrk, session);
1126
1127   VDBG (1, "session %u [0x%llx]: sending unbind!", session->session_index,
1128         session->vpp_handle);
1129   vcl_evt (VCL_EVT_UNBIND, session);
1130
1131   session->vpp_handle = ~0;
1132   session->session_state = STATE_DISCONNECT;
1133
1134   return VPPCOM_OK;
1135 }
1136
1137 static int
1138 vppcom_session_disconnect (u32 session_handle)
1139 {
1140   vcl_worker_t *wrk = vcl_worker_get_current ();
1141   svm_msg_q_t *vpp_evt_q;
1142   vcl_session_t *session, *listen_session;
1143   vcl_session_state_t state;
1144   u64 vpp_handle;
1145
1146   session = vcl_session_get_w_handle (wrk, session_handle);
1147   if (!session)
1148     return VPPCOM_EBADFD;
1149
1150   vpp_handle = session->vpp_handle;
1151   state = session->session_state;
1152
1153   VDBG (1, "session %u [0x%llx] state 0x%x (%s)", session->session_index,
1154         vpp_handle, state, vppcom_session_state_str (state));
1155
1156   if (PREDICT_FALSE (state & STATE_LISTEN))
1157     {
1158       VDBG (0, "ERROR: Cannot disconnect a listen socket!");
1159       return VPPCOM_EBADFD;
1160     }
1161
1162   if (state & STATE_VPP_CLOSING)
1163     {
1164       vpp_evt_q = vcl_session_vpp_evt_q (wrk, session);
1165       vcl_send_session_disconnected_reply (vpp_evt_q, wrk->my_client_index,
1166                                            vpp_handle, 0);
1167       VDBG (1, "session %u [0x%llx]: sending disconnect REPLY...",
1168             session->session_index, vpp_handle);
1169     }
1170   else
1171     {
1172       VDBG (1, "session %u [0x%llx]: sending disconnect...",
1173             session->session_index, vpp_handle);
1174       vcl_send_session_disconnect (wrk, session);
1175     }
1176
1177   if (session->listener_index != VCL_INVALID_SESSION_INDEX)
1178     {
1179       listen_session = vcl_session_get (wrk, session->listener_index);
1180       listen_session->n_accepted_sessions--;
1181     }
1182
1183   return VPPCOM_OK;
1184 }
1185
1186 /**
1187  * Handle app exit
1188  *
1189  * Notify vpp of the disconnect and mark the worker as free. If we're the
1190  * last worker, do a full cleanup otherwise, since we're probably a forked
1191  * child, avoid syscalls as much as possible. We might've lost privileges.
1192  */
1193 void
1194 vppcom_app_exit (void)
1195 {
1196   if (!pool_elts (vcm->workers))
1197     return;
1198   vcl_worker_cleanup (vcl_worker_get_current (), 1 /* notify vpp */ );
1199   vcl_set_worker_index (~0);
1200   vcl_elog_stop (vcm);
1201 }
1202
1203 /*
1204  * VPPCOM Public API functions
1205  */
1206 int
1207 vppcom_app_create (const char *app_name)
1208 {
1209   vppcom_cfg_t *vcl_cfg = &vcm->cfg;
1210   int rv;
1211
1212   if (vcm->is_init)
1213     {
1214       VDBG (1, "already initialized");
1215       return VPPCOM_EEXIST;
1216     }
1217
1218   vcm->is_init = 1;
1219   vppcom_cfg (&vcm->cfg);
1220   vcl_cfg = &vcm->cfg;
1221
1222   vcm->main_cpu = pthread_self ();
1223   vcm->main_pid = getpid ();
1224   vcm->app_name = format (0, "%s", app_name);
1225   vppcom_init_error_string_table ();
1226   fifo_segment_main_init (&vcm->segment_main, vcl_cfg->segment_baseva,
1227                           20 /* timeout in secs */ );
1228   pool_alloc (vcm->workers, vcl_cfg->max_workers);
1229   clib_spinlock_init (&vcm->workers_lock);
1230   clib_rwlock_init (&vcm->segment_table_lock);
1231   atexit (vppcom_app_exit);
1232
1233   /* Allocate default worker */
1234   vcl_worker_alloc_and_init ();
1235
1236   /* API hookup and connect to VPP */
1237   vcl_elog_init (vcm);
1238   vcm->app_state = STATE_APP_START;
1239   rv = vppcom_connect_to_vpp (app_name);
1240   if (rv)
1241     {
1242       VERR ("couldn't connect to VPP!");
1243       return rv;
1244     }
1245   VDBG (0, "sending session enable");
1246   rv = vppcom_app_session_enable ();
1247   if (rv)
1248     {
1249       VERR ("vppcom_app_session_enable() failed!");
1250       return rv;
1251     }
1252
1253   VDBG (0, "sending app attach");
1254   rv = vppcom_app_attach ();
1255   if (rv)
1256     {
1257       VERR ("vppcom_app_attach() failed!");
1258       return rv;
1259     }
1260
1261   VDBG (0, "app_name '%s', my_client_index %d (0x%x)", app_name,
1262         vcm->workers[0].my_client_index, vcm->workers[0].my_client_index);
1263
1264   return VPPCOM_OK;
1265 }
1266
1267 void
1268 vppcom_app_destroy (void)
1269 {
1270   vcl_worker_t *wrk, *current_wrk;
1271   struct dlmallinfo mi;
1272   mspace heap;
1273
1274   if (!pool_elts (vcm->workers))
1275     return;
1276
1277   vcl_evt (VCL_EVT_DETACH, vcm);
1278
1279   current_wrk = vcl_worker_get_current ();
1280
1281   /* *INDENT-OFF* */
1282   pool_foreach (wrk, vcm->workers, ({
1283     if (current_wrk != wrk)
1284       vcl_worker_cleanup (wrk, 0 /* notify vpp */ );
1285   }));
1286   /* *INDENT-ON* */
1287
1288   vcl_send_app_detach (current_wrk);
1289   vppcom_disconnect_from_vpp ();
1290   vcl_worker_cleanup (current_wrk, 0 /* notify vpp */ );
1291
1292   vcl_elog_stop (vcm);
1293
1294   /*
1295    * Free the heap and fix vcm
1296    */
1297   heap = clib_mem_get_heap ();
1298   mi = mspace_mallinfo (heap);
1299   munmap (mspace_least_addr (heap), mi.arena);
1300
1301   vcm = &_vppcom_main;
1302   vcm->is_init = 0;
1303 }
1304
1305 int
1306 vppcom_session_create (u8 proto, u8 is_nonblocking)
1307 {
1308   vcl_worker_t *wrk = vcl_worker_get_current ();
1309   vcl_session_t *session;
1310
1311   session = vcl_session_alloc (wrk);
1312
1313   session->session_type = proto;
1314   session->session_state = STATE_CLOSED;
1315   session->vpp_handle = ~0;
1316   session->is_dgram = vcl_proto_is_dgram (proto);
1317
1318   if (is_nonblocking)
1319     VCL_SESS_ATTR_SET (session->attr, VCL_SESS_ATTR_NONBLOCK);
1320
1321   vcl_evt (VCL_EVT_CREATE, session, session_type, session->session_state,
1322            is_nonblocking, session_index);
1323
1324   VDBG (0, "created session %u", session->session_index);
1325
1326   return vcl_session_handle (session);
1327 }
1328
1329 int
1330 vcl_session_cleanup (vcl_worker_t * wrk, vcl_session_t * session,
1331                      vcl_session_handle_t sh, u8 do_disconnect)
1332 {
1333   vcl_session_state_t state;
1334   u32 next_sh, vep_sh;
1335   int rv = VPPCOM_OK;
1336   u64 vpp_handle;
1337   u8 is_vep;
1338
1339   is_vep = session->is_vep;
1340   next_sh = session->vep.next_sh;
1341   vep_sh = session->vep.vep_sh;
1342   state = session->session_state;
1343   vpp_handle = session->vpp_handle;
1344
1345   VDBG (1, "session %u [0x%llx] closing", session->session_index, vpp_handle);
1346
1347   if (is_vep)
1348     {
1349       while (next_sh != ~0)
1350         {
1351           rv = vppcom_epoll_ctl (sh, EPOLL_CTL_DEL, next_sh, 0);
1352           if (PREDICT_FALSE (rv < 0))
1353             VDBG (0, "vpp handle 0x%llx, sh %u: EPOLL_CTL_DEL vep_idx %u"
1354                   " failed! rv %d (%s)", vpp_handle, next_sh, vep_sh, rv,
1355                   vppcom_retval_str (rv));
1356
1357           next_sh = session->vep.next_sh;
1358         }
1359       goto cleanup;
1360     }
1361
1362   if (session->is_vep_session)
1363     {
1364       rv = vppcom_epoll_ctl (vep_sh, EPOLL_CTL_DEL, sh, 0);
1365       if (rv < 0)
1366         VDBG (0, "session %u [0x%llx]: EPOLL_CTL_DEL vep_idx %u "
1367               "failed! rv %d (%s)", session->session_index, vpp_handle,
1368               vep_sh, rv, vppcom_retval_str (rv));
1369     }
1370
1371   if (!do_disconnect)
1372     {
1373       VDBG (1, "session %u [0x%llx] disconnect skipped",
1374             session->session_index, vpp_handle);
1375       goto cleanup;
1376     }
1377
1378   if (state & STATE_LISTEN)
1379     {
1380       rv = vppcom_session_unbind (sh);
1381       if (PREDICT_FALSE (rv < 0))
1382         VDBG (0, "session %u [0x%llx]: listener unbind failed! "
1383               "rv %d (%s)", session->session_index, vpp_handle, rv,
1384               vppcom_retval_str (rv));
1385       return rv;
1386     }
1387   else if ((state & STATE_OPEN)
1388            || (vcl_session_is_connectable_listener (wrk, session)))
1389     {
1390       rv = vppcom_session_disconnect (sh);
1391       if (PREDICT_FALSE (rv < 0))
1392         VDBG (0, "ERROR: session %u [0x%llx]: disconnect failed!"
1393               " rv %d (%s)", session->session_index, vpp_handle,
1394               rv, vppcom_retval_str (rv));
1395     }
1396   else if (state == STATE_DISCONNECT)
1397     {
1398       svm_msg_q_t *mq = vcl_session_vpp_evt_q (wrk, session);
1399       vcl_send_session_reset_reply (mq, wrk->my_client_index,
1400                                     session->vpp_handle, 0);
1401     }
1402   else if (state == STATE_DETACHED)
1403     {
1404       /* Should not happen. VPP cleaned up before app confirmed close */
1405       VDBG (0, "vpp freed session %d before close", session->session_index);
1406       goto free_session;
1407     }
1408
1409   session->session_state = STATE_CLOSED;
1410
1411   /* Session is removed only after vpp confirms the disconnect */
1412   return rv;
1413
1414 cleanup:
1415   vcl_session_table_del_vpp_handle (wrk, vpp_handle);
1416 free_session:
1417   vcl_session_free (wrk, session);
1418   vcl_evt (VCL_EVT_CLOSE, session, rv);
1419
1420   return rv;
1421 }
1422
1423 int
1424 vppcom_session_close (uint32_t session_handle)
1425 {
1426   vcl_worker_t *wrk = vcl_worker_get_current ();
1427   vcl_session_t *session;
1428
1429   session = vcl_session_get_w_handle (wrk, session_handle);
1430   if (!session)
1431     return VPPCOM_EBADFD;
1432   return vcl_session_cleanup (wrk, session, session_handle,
1433                               1 /* do_disconnect */ );
1434 }
1435
1436 int
1437 vppcom_session_bind (uint32_t session_handle, vppcom_endpt_t * ep)
1438 {
1439   vcl_worker_t *wrk = vcl_worker_get_current ();
1440   vcl_session_t *session = 0;
1441
1442   if (!ep || !ep->ip)
1443     return VPPCOM_EINVAL;
1444
1445   session = vcl_session_get_w_handle (wrk, session_handle);
1446   if (!session)
1447     return VPPCOM_EBADFD;
1448
1449   if (session->is_vep)
1450     {
1451       VDBG (0, "ERROR: cannot bind to epoll session %u!",
1452             session->session_index);
1453       return VPPCOM_EBADFD;
1454     }
1455
1456   session->transport.is_ip4 = ep->is_ip4;
1457   if (ep->is_ip4)
1458     clib_memcpy_fast (&session->transport.lcl_ip.ip4, ep->ip,
1459                       sizeof (ip4_address_t));
1460   else
1461     clib_memcpy_fast (&session->transport.lcl_ip.ip6, ep->ip,
1462                       sizeof (ip6_address_t));
1463   session->transport.lcl_port = ep->port;
1464
1465   VDBG (0, "session %u handle %u: binding to local %s address %U port %u, "
1466         "proto %s", session->session_index, session_handle,
1467         session->transport.is_ip4 ? "IPv4" : "IPv6",
1468         format_ip46_address, &session->transport.lcl_ip,
1469         session->transport.is_ip4 ? IP46_TYPE_IP4 : IP46_TYPE_IP6,
1470         clib_net_to_host_u16 (session->transport.lcl_port),
1471         vppcom_proto_str (session->session_type));
1472   vcl_evt (VCL_EVT_BIND, session);
1473
1474   if (session->session_type == VPPCOM_PROTO_UDP)
1475     vppcom_session_listen (session_handle, 10);
1476
1477   return VPPCOM_OK;
1478 }
1479
1480 int
1481 vppcom_session_listen (uint32_t listen_sh, uint32_t q_len)
1482 {
1483   vcl_worker_t *wrk = vcl_worker_get_current ();
1484   vcl_session_t *listen_session = 0;
1485   u64 listen_vpp_handle;
1486   int rv;
1487
1488   listen_session = vcl_session_get_w_handle (wrk, listen_sh);
1489   if (!listen_session || listen_session->is_vep)
1490     return VPPCOM_EBADFD;
1491
1492   if (q_len == 0 || q_len == ~0)
1493     q_len = vcm->cfg.listen_queue_size;
1494
1495   listen_vpp_handle = listen_session->vpp_handle;
1496   if (listen_session->session_state & STATE_LISTEN)
1497     {
1498       VDBG (0, "session %u [0x%llx]: already in listen state!",
1499             listen_sh, listen_vpp_handle);
1500       return VPPCOM_OK;
1501     }
1502
1503   VDBG (0, "session %u: sending vpp listen request...", listen_sh);
1504
1505   /*
1506    * Send listen request to vpp and wait for reply
1507    */
1508   vcl_send_session_listen (wrk, listen_session);
1509   rv = vppcom_wait_for_session_state_change (listen_session->session_index,
1510                                              STATE_LISTEN,
1511                                              vcm->cfg.session_timeout);
1512
1513   if (PREDICT_FALSE (rv))
1514     {
1515       listen_session = vcl_session_get_w_handle (wrk, listen_sh);
1516       VDBG (0, "session %u [0x%llx]: listen failed! returning %d (%s)",
1517             listen_sh, listen_session->vpp_handle, rv,
1518             vppcom_retval_str (rv));
1519       return rv;
1520     }
1521
1522   return VPPCOM_OK;
1523 }
1524
1525 int
1526 vppcom_session_tls_add_cert (uint32_t session_handle, char *cert,
1527                              uint32_t cert_len)
1528 {
1529
1530   vcl_worker_t *wrk = vcl_worker_get_current ();
1531   vcl_session_t *session = 0;
1532
1533   session = vcl_session_get_w_handle (wrk, session_handle);
1534   if (!session)
1535     return VPPCOM_EBADFD;
1536
1537   if (cert_len == 0 || cert_len == ~0)
1538     return VPPCOM_EBADFD;
1539
1540   /*
1541    * Send listen request to vpp and wait for reply
1542    */
1543   vppcom_send_application_tls_cert_add (session, cert, cert_len);
1544   vcm->app_state = STATE_APP_ADDING_TLS_DATA;
1545   vcl_wait_for_app_state_change (STATE_APP_READY);
1546   return VPPCOM_OK;
1547
1548 }
1549
1550 int
1551 vppcom_session_tls_add_key (uint32_t session_handle, char *key,
1552                             uint32_t key_len)
1553 {
1554
1555   vcl_worker_t *wrk = vcl_worker_get_current ();
1556   vcl_session_t *session = 0;
1557
1558   session = vcl_session_get_w_handle (wrk, session_handle);
1559   if (!session)
1560     return VPPCOM_EBADFD;
1561
1562   if (key_len == 0 || key_len == ~0)
1563     return VPPCOM_EBADFD;
1564
1565   vppcom_send_application_tls_key_add (session, key, key_len);
1566   vcm->app_state = STATE_APP_ADDING_TLS_DATA;
1567   vcl_wait_for_app_state_change (STATE_APP_READY);
1568   return VPPCOM_OK;
1569 }
1570
1571 static int
1572 validate_args_session_accept_ (vcl_worker_t * wrk, vcl_session_t * ls)
1573 {
1574   if (ls->is_vep)
1575     {
1576       VDBG (0, "ERROR: cannot accept on epoll session %u!",
1577             ls->session_index);
1578       return VPPCOM_EBADFD;
1579     }
1580
1581   if ((ls->session_state != STATE_LISTEN)
1582       && (!vcl_session_is_connectable_listener (wrk, ls)))
1583     {
1584       VDBG (0,
1585             "ERROR: session [0x%llx]: not in listen state! state 0x%x"
1586             " (%s)", ls->vpp_handle, ls->session_state,
1587             vppcom_session_state_str (ls->session_state));
1588       return VPPCOM_EBADFD;
1589     }
1590   return VPPCOM_OK;
1591 }
1592
1593 int
1594 vppcom_unformat_proto (uint8_t * proto, char *proto_str)
1595 {
1596   if (!strcmp (proto_str, "TCP"))
1597     *proto = VPPCOM_PROTO_TCP;
1598   else if (!strcmp (proto_str, "tcp"))
1599     *proto = VPPCOM_PROTO_TCP;
1600   else if (!strcmp (proto_str, "UDP"))
1601     *proto = VPPCOM_PROTO_UDP;
1602   else if (!strcmp (proto_str, "udp"))
1603     *proto = VPPCOM_PROTO_UDP;
1604   else if (!strcmp (proto_str, "TLS"))
1605     *proto = VPPCOM_PROTO_TLS;
1606   else if (!strcmp (proto_str, "tls"))
1607     *proto = VPPCOM_PROTO_TLS;
1608   else if (!strcmp (proto_str, "QUIC"))
1609     *proto = VPPCOM_PROTO_QUIC;
1610   else if (!strcmp (proto_str, "quic"))
1611     *proto = VPPCOM_PROTO_QUIC;
1612   else
1613     return 1;
1614   return 0;
1615 }
1616
1617 int
1618 vppcom_session_accept (uint32_t listen_session_handle, vppcom_endpt_t * ep,
1619                        uint32_t flags)
1620 {
1621   u32 client_session_index = ~0, listen_session_index, accept_flags = 0;
1622   vcl_worker_t *wrk = vcl_worker_get_current ();
1623   session_accepted_msg_t accepted_msg;
1624   vcl_session_t *listen_session = 0;
1625   vcl_session_t *client_session = 0;
1626   vcl_session_msg_t *evt;
1627   svm_msg_q_msg_t msg;
1628   session_event_t *e;
1629   u8 is_nonblocking;
1630   int rv;
1631
1632   listen_session = vcl_session_get_w_handle (wrk, listen_session_handle);
1633   if (!listen_session)
1634     return VPPCOM_EBADFD;
1635
1636   listen_session_index = listen_session->session_index;
1637   if ((rv = validate_args_session_accept_ (wrk, listen_session)))
1638     return rv;
1639
1640   if (clib_fifo_elts (listen_session->accept_evts_fifo))
1641     {
1642       clib_fifo_sub2 (listen_session->accept_evts_fifo, evt);
1643       accept_flags = evt->flags;
1644       accepted_msg = evt->accepted_msg;
1645       goto handle;
1646     }
1647
1648   is_nonblocking = VCL_SESS_ATTR_TEST (listen_session->attr,
1649                                        VCL_SESS_ATTR_NONBLOCK);
1650   while (1)
1651     {
1652       if (svm_msg_q_is_empty (wrk->app_event_queue) && is_nonblocking)
1653         return VPPCOM_EAGAIN;
1654
1655       if (svm_msg_q_sub (wrk->app_event_queue, &msg, SVM_Q_WAIT, 0))
1656         return VPPCOM_EAGAIN;
1657
1658       e = svm_msg_q_msg_data (wrk->app_event_queue, &msg);
1659       if (e->event_type != SESSION_CTRL_EVT_ACCEPTED)
1660         {
1661           vcl_handle_mq_event (wrk, e);
1662           svm_msg_q_free_msg (wrk->app_event_queue, &msg);
1663           continue;
1664         }
1665       clib_memcpy_fast (&accepted_msg, e->data, sizeof (accepted_msg));
1666       svm_msg_q_free_msg (wrk->app_event_queue, &msg);
1667       break;
1668     }
1669
1670 handle:
1671
1672   client_session_index = vcl_session_accepted_handler (wrk, &accepted_msg,
1673                                                        listen_session_index);
1674   if (client_session_index == VCL_INVALID_SESSION_INDEX)
1675     return VPPCOM_ECONNABORTED;
1676
1677   listen_session = vcl_session_get (wrk, listen_session_index);
1678   client_session = vcl_session_get (wrk, client_session_index);
1679
1680   if (flags & O_NONBLOCK)
1681     VCL_SESS_ATTR_SET (client_session->attr, VCL_SESS_ATTR_NONBLOCK);
1682
1683   VDBG (1, "listener %u [0x%llx]: Got a connect request! session %u [0x%llx],"
1684         " flags %d, is_nonblocking %u", listen_session->session_index,
1685         listen_session->vpp_handle, client_session_index,
1686         client_session->vpp_handle, flags,
1687         VCL_SESS_ATTR_TEST (client_session->attr, VCL_SESS_ATTR_NONBLOCK));
1688
1689   if (ep)
1690     {
1691       ep->is_ip4 = client_session->transport.is_ip4;
1692       ep->port = client_session->transport.rmt_port;
1693       if (client_session->transport.is_ip4)
1694         clib_memcpy_fast (ep->ip, &client_session->transport.rmt_ip.ip4,
1695                           sizeof (ip4_address_t));
1696       else
1697         clib_memcpy_fast (ep->ip, &client_session->transport.rmt_ip.ip6,
1698                           sizeof (ip6_address_t));
1699     }
1700
1701   VDBG (0, "listener %u [0x%llx] accepted %u [0x%llx] peer: %U:%u "
1702         "local: %U:%u", listen_session_handle, listen_session->vpp_handle,
1703         client_session_index, client_session->vpp_handle,
1704         format_ip46_address, &client_session->transport.rmt_ip,
1705         client_session->transport.is_ip4 ? IP46_TYPE_IP4 : IP46_TYPE_IP6,
1706         clib_net_to_host_u16 (client_session->transport.rmt_port),
1707         format_ip46_address, &client_session->transport.lcl_ip,
1708         client_session->transport.is_ip4 ? IP46_TYPE_IP4 : IP46_TYPE_IP6,
1709         clib_net_to_host_u16 (client_session->transport.lcl_port));
1710   vcl_evt (VCL_EVT_ACCEPT, client_session, listen_session,
1711            client_session_index);
1712
1713   /*
1714    * Session might have been closed already
1715    */
1716   if (accept_flags)
1717     {
1718       if (accept_flags & VCL_ACCEPTED_F_CLOSED)
1719         client_session->session_state = STATE_VPP_CLOSING;
1720       else if (accept_flags & VCL_ACCEPTED_F_RESET)
1721         client_session->session_state = STATE_DISCONNECT;
1722     }
1723   return vcl_session_handle (client_session);
1724 }
1725
1726 int
1727 vppcom_session_connect (uint32_t session_handle, vppcom_endpt_t * server_ep)
1728 {
1729   vcl_worker_t *wrk = vcl_worker_get_current ();
1730   vcl_session_t *session = 0;
1731   u32 session_index;
1732   int rv;
1733
1734   session = vcl_session_get_w_handle (wrk, session_handle);
1735   if (!session)
1736     return VPPCOM_EBADFD;
1737   session_index = session->session_index;
1738
1739   if (PREDICT_FALSE (session->is_vep))
1740     {
1741       VDBG (0, "ERROR: cannot connect epoll session %u!",
1742             session->session_index);
1743       return VPPCOM_EBADFD;
1744     }
1745
1746   if (PREDICT_FALSE (session->session_state & CLIENT_STATE_OPEN))
1747     {
1748       VDBG (0, "session handle %u [0x%llx]: session already "
1749             "connected to %s %U port %d proto %s, state 0x%x (%s)",
1750             session_handle, session->vpp_handle,
1751             session->transport.is_ip4 ? "IPv4" : "IPv6", format_ip46_address,
1752             &session->transport.rmt_ip, session->transport.is_ip4 ?
1753             IP46_TYPE_IP4 : IP46_TYPE_IP6,
1754             clib_net_to_host_u16 (session->transport.rmt_port),
1755             vppcom_proto_str (session->session_type), session->session_state,
1756             vppcom_session_state_str (session->session_state));
1757       return VPPCOM_OK;
1758     }
1759
1760   /* Attempt to connect a connectionless listener */
1761   if (PREDICT_FALSE (session->session_state & STATE_LISTEN))
1762     {
1763       if (session->session_type != VPPCOM_PROTO_UDP)
1764         return VPPCOM_EINVAL;
1765       vcl_send_session_unlisten (wrk, session);
1766       session->session_state = STATE_CLOSED;
1767     }
1768
1769   session->transport.is_ip4 = server_ep->is_ip4;
1770   vcl_ip_copy_from_ep (&session->transport.rmt_ip, server_ep);
1771   session->transport.rmt_port = server_ep->port;
1772   session->parent_handle = VCL_INVALID_SESSION_HANDLE;
1773   session->flags |= VCL_SESSION_F_CONNECTED;
1774
1775   VDBG (0, "session handle %u (%s): connecting to peer %s %U "
1776         "port %d proto %s", session_handle,
1777         vppcom_session_state_str (session->session_state),
1778         session->transport.is_ip4 ? "IPv4" : "IPv6",
1779         format_ip46_address,
1780         &session->transport.rmt_ip, session->transport.is_ip4 ?
1781         IP46_TYPE_IP4 : IP46_TYPE_IP6,
1782         clib_net_to_host_u16 (session->transport.rmt_port),
1783         vppcom_proto_str (session->session_type));
1784
1785   vcl_send_session_connect (wrk, session);
1786
1787   if (VCL_SESS_ATTR_TEST (session->attr, VCL_SESS_ATTR_NONBLOCK))
1788     return VPPCOM_EINPROGRESS;
1789
1790   /*
1791    * Wait for reply from vpp if blocking
1792    */
1793   rv = vppcom_wait_for_session_state_change (session_index, STATE_CONNECT,
1794                                              vcm->cfg.session_timeout);
1795
1796   session = vcl_session_get (wrk, session_index);
1797   VDBG (0, "session %u [0x%llx]: connect %s!", session->session_index,
1798         session->vpp_handle, rv ? "failed" : "succeeded");
1799
1800   return rv;
1801 }
1802
1803 int
1804 vppcom_session_stream_connect (uint32_t session_handle,
1805                                uint32_t parent_session_handle)
1806 {
1807   vcl_worker_t *wrk = vcl_worker_get_current ();
1808   vcl_session_t *session, *parent_session;
1809   u32 session_index, parent_session_index;
1810   int rv;
1811
1812   session = vcl_session_get_w_handle (wrk, session_handle);
1813   if (!session)
1814     return VPPCOM_EBADFD;
1815   parent_session = vcl_session_get_w_handle (wrk, parent_session_handle);
1816   if (!parent_session)
1817     return VPPCOM_EBADFD;
1818
1819   session_index = session->session_index;
1820   parent_session_index = parent_session->session_index;
1821   if (PREDICT_FALSE (session->is_vep))
1822     {
1823       VDBG (0, "ERROR: cannot connect epoll session %u!",
1824             session->session_index);
1825       return VPPCOM_EBADFD;
1826     }
1827
1828   if (PREDICT_FALSE (session->session_state & CLIENT_STATE_OPEN))
1829     {
1830       VDBG (0, "session handle %u [0x%llx]: session already "
1831             "connected to session %u [0x%llx] proto %s, state 0x%x (%s)",
1832             session_handle, session->vpp_handle,
1833             parent_session_handle, parent_session->vpp_handle,
1834             vppcom_proto_str (session->session_type), session->session_state,
1835             vppcom_session_state_str (session->session_state));
1836       return VPPCOM_OK;
1837     }
1838
1839   /* Connect to quic session specifics */
1840   session->transport.is_ip4 = parent_session->transport.is_ip4;
1841   session->transport.rmt_ip.ip4.as_u32 = (uint32_t) 1;
1842   session->transport.rmt_port = 0;
1843   session->parent_handle = parent_session->vpp_handle;
1844
1845   VDBG (0, "session handle %u: connecting to session %u [0x%llx]",
1846         session_handle, parent_session_handle, parent_session->vpp_handle);
1847
1848   /*
1849    * Send connect request and wait for reply from vpp
1850    */
1851   vcl_send_session_connect (wrk, session);
1852   rv = vppcom_wait_for_session_state_change (session_index, STATE_CONNECT,
1853                                              vcm->cfg.session_timeout);
1854
1855   session->listener_index = parent_session_index;
1856   parent_session = vcl_session_get_w_handle (wrk, parent_session_handle);
1857   if (parent_session)
1858     parent_session->n_accepted_sessions++;
1859
1860   session = vcl_session_get (wrk, session_index);
1861   VDBG (0, "session %u [0x%llx]: connect %s!", session->session_index,
1862         session->vpp_handle, rv ? "failed" : "succeeded");
1863
1864   return rv;
1865 }
1866
1867 static u8
1868 vcl_is_rx_evt_for_session (session_event_t * e, u32 sid, u8 is_ct)
1869 {
1870   return (e->event_type == SESSION_IO_EVT_RX && e->session_index == sid);
1871 }
1872
1873 static inline int
1874 vppcom_session_read_internal (uint32_t session_handle, void *buf, int n,
1875                               u8 peek)
1876 {
1877   vcl_worker_t *wrk = vcl_worker_get_current ();
1878   int n_read = 0, is_nonblocking;
1879   vcl_session_t *s = 0;
1880   svm_fifo_t *rx_fifo;
1881   svm_msg_q_msg_t msg;
1882   session_event_t *e;
1883   svm_msg_q_t *mq;
1884   u8 is_ct;
1885
1886   if (PREDICT_FALSE (!buf))
1887     return VPPCOM_EINVAL;
1888
1889   s = vcl_session_get_w_handle (wrk, session_handle);
1890   if (PREDICT_FALSE (!s || s->is_vep))
1891     return VPPCOM_EBADFD;
1892
1893   if (PREDICT_FALSE (!vcl_session_is_open (s)))
1894     {
1895       VDBG (0, "session %u[0x%llx] is not open! state 0x%x (%s)",
1896             s->session_index, s->vpp_handle, s->session_state,
1897             vppcom_session_state_str (s->session_state));
1898       return vcl_session_closed_error (s);
1899     }
1900
1901   is_nonblocking = VCL_SESS_ATTR_TEST (s->attr, VCL_SESS_ATTR_NONBLOCK);
1902   is_ct = vcl_session_is_ct (s);
1903   mq = wrk->app_event_queue;
1904   rx_fifo = is_ct ? s->ct_rx_fifo : s->rx_fifo;
1905   s->has_rx_evt = 0;
1906
1907   if (svm_fifo_is_empty_cons (rx_fifo))
1908     {
1909       if (is_nonblocking)
1910         {
1911           if (vcl_session_is_closing (s))
1912             return vcl_session_closing_error (s);
1913           svm_fifo_unset_event (s->rx_fifo);
1914           return VPPCOM_EWOULDBLOCK;
1915         }
1916       while (svm_fifo_is_empty_cons (rx_fifo))
1917         {
1918           if (vcl_session_is_closing (s))
1919             return vcl_session_closing_error (s);
1920
1921           svm_fifo_unset_event (s->rx_fifo);
1922           svm_msg_q_lock (mq);
1923           if (svm_msg_q_is_empty (mq))
1924             svm_msg_q_wait (mq);
1925
1926           svm_msg_q_sub_w_lock (mq, &msg);
1927           e = svm_msg_q_msg_data (mq, &msg);
1928           svm_msg_q_unlock (mq);
1929           if (!vcl_is_rx_evt_for_session (e, s->session_index, is_ct))
1930             vcl_handle_mq_event (wrk, e);
1931           svm_msg_q_free_msg (mq, &msg);
1932         }
1933     }
1934
1935   if (s->is_dgram)
1936     n_read = app_recv_dgram_raw (rx_fifo, buf, n, &s->transport, 0, peek);
1937   else
1938     n_read = app_recv_stream_raw (rx_fifo, buf, n, 0, peek);
1939
1940   if (svm_fifo_is_empty_cons (rx_fifo))
1941     svm_fifo_unset_event (s->rx_fifo);
1942
1943   /* Cut-through sessions might request tx notifications on rx fifos */
1944   if (PREDICT_FALSE (rx_fifo->want_deq_ntf))
1945     {
1946       app_send_io_evt_to_vpp (s->vpp_evt_q, s->rx_fifo->master_session_index,
1947                               SESSION_IO_EVT_RX, SVM_Q_WAIT);
1948       svm_fifo_reset_has_deq_ntf (s->rx_fifo);
1949     }
1950
1951   VDBG (2, "session %u[0x%llx]: read %d bytes from (%p)", s->session_index,
1952         s->vpp_handle, n_read, rx_fifo);
1953
1954   return n_read;
1955 }
1956
1957 int
1958 vppcom_session_read (uint32_t session_handle, void *buf, size_t n)
1959 {
1960   return (vppcom_session_read_internal (session_handle, buf, n, 0));
1961 }
1962
1963 static int
1964 vppcom_session_peek (uint32_t session_handle, void *buf, int n)
1965 {
1966   return (vppcom_session_read_internal (session_handle, buf, n, 1));
1967 }
1968
1969 int
1970 vppcom_session_read_segments (uint32_t session_handle,
1971                               vppcom_data_segments_t ds)
1972 {
1973   vcl_worker_t *wrk = vcl_worker_get_current ();
1974   int n_read = 0, is_nonblocking;
1975   vcl_session_t *s = 0;
1976   svm_fifo_t *rx_fifo;
1977   svm_msg_q_msg_t msg;
1978   session_event_t *e;
1979   svm_msg_q_t *mq;
1980   u8 is_ct;
1981
1982   s = vcl_session_get_w_handle (wrk, session_handle);
1983   if (PREDICT_FALSE (!s || s->is_vep))
1984     return VPPCOM_EBADFD;
1985
1986   if (PREDICT_FALSE (!vcl_session_is_open (s)))
1987     return vcl_session_closed_error (s);
1988
1989   is_nonblocking = VCL_SESS_ATTR_TEST (s->attr, VCL_SESS_ATTR_NONBLOCK);
1990   is_ct = vcl_session_is_ct (s);
1991   mq = is_ct ? s->our_evt_q : wrk->app_event_queue;
1992   rx_fifo = s->rx_fifo;
1993   s->has_rx_evt = 0;
1994
1995   if (is_ct)
1996     svm_fifo_unset_event (s->rx_fifo);
1997
1998   if (svm_fifo_is_empty_cons (rx_fifo))
1999     {
2000       if (is_nonblocking)
2001         {
2002           svm_fifo_unset_event (rx_fifo);
2003           return VPPCOM_EWOULDBLOCK;
2004         }
2005       while (svm_fifo_is_empty_cons (rx_fifo))
2006         {
2007           if (vcl_session_is_closing (s))
2008             return vcl_session_closing_error (s);
2009
2010           svm_fifo_unset_event (rx_fifo);
2011           svm_msg_q_lock (mq);
2012           if (svm_msg_q_is_empty (mq))
2013             svm_msg_q_wait (mq);
2014
2015           svm_msg_q_sub_w_lock (mq, &msg);
2016           e = svm_msg_q_msg_data (mq, &msg);
2017           svm_msg_q_unlock (mq);
2018           if (!vcl_is_rx_evt_for_session (e, s->session_index, is_ct))
2019             vcl_handle_mq_event (wrk, e);
2020           svm_msg_q_free_msg (mq, &msg);
2021         }
2022     }
2023
2024   n_read = svm_fifo_segments (rx_fifo, (svm_fifo_seg_t *) ds);
2025   svm_fifo_unset_event (rx_fifo);
2026
2027   return n_read;
2028 }
2029
2030 void
2031 vppcom_session_free_segments (uint32_t session_handle,
2032                               vppcom_data_segments_t ds)
2033 {
2034   vcl_worker_t *wrk = vcl_worker_get_current ();
2035   vcl_session_t *s;
2036
2037   s = vcl_session_get_w_handle (wrk, session_handle);
2038   if (PREDICT_FALSE (!s || s->is_vep))
2039     return;
2040
2041   svm_fifo_segments_free (s->rx_fifo, (svm_fifo_seg_t *) ds);
2042 }
2043
2044 int
2045 vppcom_data_segment_copy (void *buf, vppcom_data_segments_t ds, u32 max_bytes)
2046 {
2047   u32 first_copy = clib_min (ds[0].len, max_bytes);
2048   clib_memcpy_fast (buf, ds[0].data, first_copy);
2049   if (first_copy < max_bytes)
2050     {
2051       clib_memcpy_fast (buf + first_copy, ds[1].data,
2052                         clib_min (ds[1].len, max_bytes - first_copy));
2053     }
2054   return 0;
2055 }
2056
2057 static u8
2058 vcl_is_tx_evt_for_session (session_event_t * e, u32 sid, u8 is_ct)
2059 {
2060   return (e->event_type == SESSION_IO_EVT_TX && e->session_index == sid);
2061 }
2062
2063 always_inline u8
2064 vcl_fifo_is_writeable (svm_fifo_t * f, u32 len, u8 is_dgram)
2065 {
2066   u32 max_enq = svm_fifo_max_enqueue_prod (f);
2067   if (is_dgram)
2068     return max_enq >= (sizeof (session_dgram_hdr_t) + len);
2069   else
2070     return max_enq > 0;
2071 }
2072
2073 always_inline int
2074 vppcom_session_write_inline (vcl_worker_t * wrk, vcl_session_t * s, void *buf,
2075                              size_t n, u8 is_flush, u8 is_dgram)
2076 {
2077   int n_write, is_nonblocking;
2078   session_evt_type_t et;
2079   svm_msg_q_msg_t msg;
2080   svm_fifo_t *tx_fifo;
2081   session_event_t *e;
2082   svm_msg_q_t *mq;
2083   u8 is_ct;
2084
2085   if (PREDICT_FALSE (!buf || n == 0))
2086     return VPPCOM_EINVAL;
2087
2088   if (PREDICT_FALSE (s->is_vep))
2089     {
2090       VDBG (0, "ERROR: session %u [0x%llx]: cannot write to an epoll"
2091             " session!", s->session_index, s->vpp_handle);
2092       return VPPCOM_EBADFD;
2093     }
2094
2095   if (PREDICT_FALSE (!vcl_session_is_open (s)))
2096     {
2097       VDBG (1, "session %u [0x%llx]: is not open! state 0x%x (%s)",
2098             s->session_index, s->vpp_handle, s->session_state,
2099             vppcom_session_state_str (s->session_state));
2100       return vcl_session_closed_error (s);;
2101     }
2102
2103   is_ct = vcl_session_is_ct (s);
2104   tx_fifo = is_ct ? s->ct_tx_fifo : s->tx_fifo;
2105   is_nonblocking = VCL_SESS_ATTR_TEST (s->attr, VCL_SESS_ATTR_NONBLOCK);
2106
2107   mq = wrk->app_event_queue;
2108   if (!vcl_fifo_is_writeable (tx_fifo, n, is_dgram))
2109     {
2110       if (is_nonblocking)
2111         {
2112           return VPPCOM_EWOULDBLOCK;
2113         }
2114       while (!vcl_fifo_is_writeable (tx_fifo, n, is_dgram))
2115         {
2116           svm_fifo_add_want_deq_ntf (tx_fifo, SVM_FIFO_WANT_DEQ_NOTIF);
2117           if (vcl_session_is_closing (s))
2118             return vcl_session_closing_error (s);
2119           svm_msg_q_lock (mq);
2120           if (svm_msg_q_is_empty (mq))
2121             svm_msg_q_wait (mq);
2122
2123           svm_msg_q_sub_w_lock (mq, &msg);
2124           e = svm_msg_q_msg_data (mq, &msg);
2125           svm_msg_q_unlock (mq);
2126
2127           if (!vcl_is_tx_evt_for_session (e, s->session_index, is_ct))
2128             vcl_handle_mq_event (wrk, e);
2129           svm_msg_q_free_msg (mq, &msg);
2130         }
2131     }
2132
2133   et = SESSION_IO_EVT_TX;
2134   if (is_flush && !is_ct)
2135     et = SESSION_IO_EVT_TX_FLUSH;
2136
2137   if (is_dgram)
2138     n_write = app_send_dgram_raw (tx_fifo, &s->transport,
2139                                   s->vpp_evt_q, buf, n, et,
2140                                   0 /* do_evt */ , SVM_Q_WAIT);
2141   else
2142     n_write = app_send_stream_raw (tx_fifo, s->vpp_evt_q, buf, n, et,
2143                                    0 /* do_evt */ , SVM_Q_WAIT);
2144
2145   if (svm_fifo_set_event (s->tx_fifo))
2146     app_send_io_evt_to_vpp (s->vpp_evt_q, s->tx_fifo->master_session_index,
2147                             et, SVM_Q_WAIT);
2148
2149   ASSERT (n_write > 0);
2150
2151   VDBG (2, "session %u [0x%llx]: wrote %d bytes", s->session_index,
2152         s->vpp_handle, n_write);
2153
2154   return n_write;
2155 }
2156
2157 int
2158 vppcom_session_write (uint32_t session_handle, void *buf, size_t n)
2159 {
2160   vcl_worker_t *wrk = vcl_worker_get_current ();
2161   vcl_session_t *s;
2162
2163   s = vcl_session_get_w_handle (wrk, session_handle);
2164   if (PREDICT_FALSE (!s))
2165     return VPPCOM_EBADFD;
2166
2167   return vppcom_session_write_inline (wrk, s, buf, n,
2168                                       0 /* is_flush */ , s->is_dgram ? 1 : 0);
2169 }
2170
2171 int
2172 vppcom_session_write_msg (uint32_t session_handle, void *buf, size_t n)
2173 {
2174   vcl_worker_t *wrk = vcl_worker_get_current ();
2175   vcl_session_t *s;
2176
2177   s = vcl_session_get_w_handle (wrk, session_handle);
2178   if (PREDICT_FALSE (!s))
2179     return VPPCOM_EBADFD;
2180
2181   return vppcom_session_write_inline (wrk, s, buf, n,
2182                                       1 /* is_flush */ , s->is_dgram ? 1 : 0);
2183 }
2184
2185 #define vcl_fifo_rx_evt_valid_or_break(_s)                              \
2186 if (PREDICT_FALSE (!_s->rx_fifo))                                       \
2187   break;                                                                \
2188 if (PREDICT_FALSE (svm_fifo_is_empty (_s->rx_fifo)))                    \
2189   {                                                                     \
2190     if (!vcl_session_is_ct (_s))                                        \
2191       {                                                                 \
2192         svm_fifo_unset_event (_s->rx_fifo);                             \
2193         if (svm_fifo_is_empty (_s->rx_fifo))                            \
2194           break;                                                        \
2195       }                                                                 \
2196     else if (svm_fifo_is_empty (_s->ct_rx_fifo))                        \
2197       {                                                                 \
2198         svm_fifo_unset_event (_s->ct_rx_fifo);                          \
2199         if (svm_fifo_is_empty (_s->ct_rx_fifo))                         \
2200           break;                                                        \
2201       }                                                                 \
2202   }                                                                     \
2203
2204 static void
2205 vcl_select_handle_mq_event (vcl_worker_t * wrk, session_event_t * e,
2206                             unsigned long n_bits, unsigned long *read_map,
2207                             unsigned long *write_map,
2208                             unsigned long *except_map, u32 * bits_set)
2209 {
2210   session_disconnected_msg_t *disconnected_msg;
2211   session_connected_msg_t *connected_msg;
2212   vcl_session_t *session;
2213   u32 sid;
2214
2215   switch (e->event_type)
2216     {
2217     case SESSION_IO_EVT_RX:
2218       sid = e->session_index;
2219       session = vcl_session_get (wrk, sid);
2220       if (!session || !vcl_session_is_open (session))
2221         break;
2222       vcl_fifo_rx_evt_valid_or_break (session);
2223       if (sid < n_bits && read_map)
2224         {
2225           clib_bitmap_set_no_check ((uword *) read_map, sid, 1);
2226           *bits_set += 1;
2227         }
2228       break;
2229     case SESSION_IO_EVT_TX:
2230       sid = e->session_index;
2231       session = vcl_session_get (wrk, sid);
2232       if (!session || !vcl_session_is_open (session))
2233         break;
2234       if (sid < n_bits && write_map)
2235         {
2236           clib_bitmap_set_no_check ((uword *) write_map, sid, 1);
2237           *bits_set += 1;
2238         }
2239       break;
2240     case SESSION_CTRL_EVT_ACCEPTED:
2241       session = vcl_session_accepted (wrk,
2242                                       (session_accepted_msg_t *) e->data);
2243       if (!session)
2244         break;
2245       sid = session->session_index;
2246       if (sid < n_bits && read_map)
2247         {
2248           clib_bitmap_set_no_check ((uword *) read_map, sid, 1);
2249           *bits_set += 1;
2250         }
2251       break;
2252     case SESSION_CTRL_EVT_CONNECTED:
2253       connected_msg = (session_connected_msg_t *) e->data;
2254       sid = vcl_session_connected_handler (wrk, connected_msg);
2255       if (sid == VCL_INVALID_SESSION_INDEX)
2256         break;
2257       if (sid < n_bits && write_map)
2258         {
2259           clib_bitmap_set_no_check ((uword *) write_map, sid, 1);
2260           *bits_set += 1;
2261         }
2262       break;
2263     case SESSION_CTRL_EVT_DISCONNECTED:
2264       disconnected_msg = (session_disconnected_msg_t *) e->data;
2265       session = vcl_session_disconnected_handler (wrk, disconnected_msg);
2266       if (!session)
2267         break;
2268       sid = session->session_index;
2269       if (sid < n_bits && except_map)
2270         {
2271           clib_bitmap_set_no_check ((uword *) except_map, sid, 1);
2272           *bits_set += 1;
2273         }
2274       break;
2275     case SESSION_CTRL_EVT_RESET:
2276       sid = vcl_session_reset_handler (wrk, (session_reset_msg_t *) e->data);
2277       if (sid < n_bits && except_map)
2278         {
2279           clib_bitmap_set_no_check ((uword *) except_map, sid, 1);
2280           *bits_set += 1;
2281         }
2282       break;
2283     case SESSION_CTRL_EVT_UNLISTEN_REPLY:
2284       vcl_session_unlisten_reply_handler (wrk, e->data);
2285       break;
2286     case SESSION_CTRL_EVT_MIGRATED:
2287       vcl_session_migrated_handler (wrk, e->data);
2288       break;
2289     case SESSION_CTRL_EVT_CLEANUP:
2290       vcl_session_cleanup_handler (wrk, e->data);
2291       break;
2292     case SESSION_CTRL_EVT_WORKER_UPDATE_REPLY:
2293       vcl_session_worker_update_reply_handler (wrk, e->data);
2294       break;
2295     case SESSION_CTRL_EVT_REQ_WORKER_UPDATE:
2296       vcl_session_req_worker_update_handler (wrk, e->data);
2297       break;
2298     case SESSION_CTRL_EVT_APP_ADD_SEGMENT:
2299       vcl_session_app_add_segment_handler (wrk, e->data);
2300       break;
2301     case SESSION_CTRL_EVT_APP_DEL_SEGMENT:
2302       vcl_session_app_del_segment_handler (wrk, e->data);
2303       break;
2304     case SESSION_CTRL_EVT_RPC:
2305       vcl_worker_rpc_handler (wrk, e->data);
2306       break;
2307     default:
2308       clib_warning ("unhandled: %u", e->event_type);
2309       break;
2310     }
2311 }
2312
2313 static int
2314 vcl_select_handle_mq (vcl_worker_t * wrk, svm_msg_q_t * mq,
2315                       unsigned long n_bits, unsigned long *read_map,
2316                       unsigned long *write_map, unsigned long *except_map,
2317                       double time_to_wait, u32 * bits_set)
2318 {
2319   svm_msg_q_msg_t *msg;
2320   session_event_t *e;
2321   u32 i;
2322
2323   svm_msg_q_lock (mq);
2324   if (svm_msg_q_is_empty (mq))
2325     {
2326       if (*bits_set)
2327         {
2328           svm_msg_q_unlock (mq);
2329           return 0;
2330         }
2331
2332       if (!time_to_wait)
2333         {
2334           svm_msg_q_unlock (mq);
2335           return 0;
2336         }
2337       else if (time_to_wait < 0)
2338         {
2339           svm_msg_q_wait (mq);
2340         }
2341       else
2342         {
2343           if (svm_msg_q_timedwait (mq, time_to_wait))
2344             {
2345               svm_msg_q_unlock (mq);
2346               return 0;
2347             }
2348         }
2349     }
2350   vcl_mq_dequeue_batch (wrk, mq, ~0);
2351   svm_msg_q_unlock (mq);
2352
2353   for (i = 0; i < vec_len (wrk->mq_msg_vector); i++)
2354     {
2355       msg = vec_elt_at_index (wrk->mq_msg_vector, i);
2356       e = svm_msg_q_msg_data (mq, msg);
2357       vcl_select_handle_mq_event (wrk, e, n_bits, read_map, write_map,
2358                                   except_map, bits_set);
2359       svm_msg_q_free_msg (mq, msg);
2360     }
2361   vec_reset_length (wrk->mq_msg_vector);
2362   vcl_handle_pending_wrk_updates (wrk);
2363   return *bits_set;
2364 }
2365
2366 static int
2367 vppcom_select_condvar (vcl_worker_t * wrk, int n_bits,
2368                        vcl_si_set * read_map, vcl_si_set * write_map,
2369                        vcl_si_set * except_map, double time_to_wait,
2370                        u32 * bits_set)
2371 {
2372   double wait = 0, start = 0;
2373
2374   if (!*bits_set)
2375     {
2376       wait = time_to_wait;
2377       start = clib_time_now (&wrk->clib_time);
2378     }
2379
2380   do
2381     {
2382       vcl_select_handle_mq (wrk, wrk->app_event_queue, n_bits, read_map,
2383                             write_map, except_map, wait, bits_set);
2384       if (*bits_set)
2385         return *bits_set;
2386       if (wait == -1)
2387         continue;
2388
2389       wait = wait - (clib_time_now (&wrk->clib_time) - start);
2390     }
2391   while (wait > 0);
2392
2393   return 0;
2394 }
2395
2396 static int
2397 vppcom_select_eventfd (vcl_worker_t * wrk, int n_bits,
2398                        vcl_si_set * read_map, vcl_si_set * write_map,
2399                        vcl_si_set * except_map, double time_to_wait,
2400                        u32 * bits_set)
2401 {
2402   vcl_mq_evt_conn_t *mqc;
2403   int __clib_unused n_read;
2404   int n_mq_evts, i;
2405   u64 buf;
2406
2407   vec_validate (wrk->mq_events, pool_elts (wrk->mq_evt_conns));
2408   n_mq_evts = epoll_wait (wrk->mqs_epfd, wrk->mq_events,
2409                           vec_len (wrk->mq_events), time_to_wait);
2410   for (i = 0; i < n_mq_evts; i++)
2411     {
2412       mqc = vcl_mq_evt_conn_get (wrk, wrk->mq_events[i].data.u32);
2413       n_read = read (mqc->mq_fd, &buf, sizeof (buf));
2414       vcl_select_handle_mq (wrk, mqc->mq, n_bits, read_map, write_map,
2415                             except_map, 0, bits_set);
2416     }
2417
2418   return (n_mq_evts > 0 ? (int) *bits_set : 0);
2419 }
2420
2421 int
2422 vppcom_select (int n_bits, vcl_si_set * read_map, vcl_si_set * write_map,
2423                vcl_si_set * except_map, double time_to_wait)
2424 {
2425   u32 sid, minbits = clib_max (n_bits, BITS (uword)), bits_set = 0;
2426   vcl_worker_t *wrk = vcl_worker_get_current ();
2427   vcl_session_t *session = 0;
2428   int i;
2429
2430   if (n_bits && read_map)
2431     {
2432       clib_bitmap_validate (wrk->rd_bitmap, minbits);
2433       clib_memcpy_fast (wrk->rd_bitmap, read_map,
2434                         vec_len (wrk->rd_bitmap) * sizeof (vcl_si_set));
2435       memset (read_map, 0, vec_len (wrk->rd_bitmap) * sizeof (vcl_si_set));
2436     }
2437   if (n_bits && write_map)
2438     {
2439       clib_bitmap_validate (wrk->wr_bitmap, minbits);
2440       clib_memcpy_fast (wrk->wr_bitmap, write_map,
2441                         vec_len (wrk->wr_bitmap) * sizeof (vcl_si_set));
2442       memset (write_map, 0, vec_len (wrk->wr_bitmap) * sizeof (vcl_si_set));
2443     }
2444   if (n_bits && except_map)
2445     {
2446       clib_bitmap_validate (wrk->ex_bitmap, minbits);
2447       clib_memcpy_fast (wrk->ex_bitmap, except_map,
2448                         vec_len (wrk->ex_bitmap) * sizeof (vcl_si_set));
2449       memset (except_map, 0, vec_len (wrk->ex_bitmap) * sizeof (vcl_si_set));
2450     }
2451
2452   if (!n_bits)
2453     return 0;
2454
2455   if (!write_map)
2456     goto check_rd;
2457
2458   /* *INDENT-OFF* */
2459   clib_bitmap_foreach (sid, wrk->wr_bitmap, ({
2460     if (!(session = vcl_session_get (wrk, sid)))
2461       {
2462         clib_bitmap_set_no_check ((uword*)write_map, sid, 1);
2463         bits_set++;
2464         continue;
2465       }
2466
2467     if (vcl_session_write_ready (session))
2468       {
2469         clib_bitmap_set_no_check ((uword*)write_map, sid, 1);
2470         bits_set++;
2471       }
2472     else
2473       svm_fifo_add_want_deq_ntf (session->tx_fifo, SVM_FIFO_WANT_DEQ_NOTIF);
2474   }));
2475
2476 check_rd:
2477   if (!read_map)
2478     goto check_mq;
2479
2480   clib_bitmap_foreach (sid, wrk->rd_bitmap, ({
2481     if (!(session = vcl_session_get (wrk, sid)))
2482       {
2483         clib_bitmap_set_no_check ((uword*)read_map, sid, 1);
2484         bits_set++;
2485         continue;
2486       }
2487
2488     if (vcl_session_read_ready (session))
2489       {
2490         clib_bitmap_set_no_check ((uword*)read_map, sid, 1);
2491         bits_set++;
2492       }
2493   }));
2494   /* *INDENT-ON* */
2495
2496 check_mq:
2497
2498   for (i = 0; i < vec_len (wrk->unhandled_evts_vector); i++)
2499     {
2500       vcl_select_handle_mq_event (wrk, &wrk->unhandled_evts_vector[i], n_bits,
2501                                   read_map, write_map, except_map, &bits_set);
2502     }
2503   vec_reset_length (wrk->unhandled_evts_vector);
2504
2505   if (vcm->cfg.use_mq_eventfd)
2506     vppcom_select_eventfd (wrk, n_bits, read_map, write_map, except_map,
2507                            time_to_wait, &bits_set);
2508   else
2509     vppcom_select_condvar (wrk, n_bits, read_map, write_map, except_map,
2510                            time_to_wait, &bits_set);
2511
2512   return (bits_set);
2513 }
2514
2515 static inline void
2516 vep_verify_epoll_chain (vcl_worker_t * wrk, u32 vep_handle)
2517 {
2518   vcl_session_t *session;
2519   vppcom_epoll_t *vep;
2520   u32 sh = vep_handle;
2521
2522   if (VPPCOM_DEBUG <= 2)
2523     return;
2524
2525   session = vcl_session_get_w_handle (wrk, vep_handle);
2526   if (PREDICT_FALSE (!session))
2527     {
2528       VDBG (0, "ERROR: Invalid vep_sh (%u)!", vep_handle);
2529       goto done;
2530     }
2531   if (PREDICT_FALSE (!session->is_vep))
2532     {
2533       VDBG (0, "ERROR: vep_sh (%u) is not a vep!", vep_handle);
2534       goto done;
2535     }
2536   vep = &session->vep;
2537   VDBG (0, "vep_sh (%u): Dumping epoll chain\n"
2538         "{\n"
2539         "   is_vep         = %u\n"
2540         "   is_vep_session = %u\n"
2541         "   next_sh        = 0x%x (%u)\n"
2542         "}\n", vep_handle, session->is_vep, session->is_vep_session,
2543         vep->next_sh, vep->next_sh);
2544
2545   for (sh = vep->next_sh; sh != ~0; sh = vep->next_sh)
2546     {
2547       session = vcl_session_get_w_handle (wrk, sh);
2548       if (PREDICT_FALSE (!session))
2549         {
2550           VDBG (0, "ERROR: Invalid sh (%u)!", sh);
2551           goto done;
2552         }
2553       if (PREDICT_FALSE (session->is_vep))
2554         {
2555           VDBG (0, "ERROR: sh (%u) is a vep!", vep_handle);
2556         }
2557       else if (PREDICT_FALSE (!session->is_vep_session))
2558         {
2559           VDBG (0, "ERROR: sh (%u) is not a vep session handle!", sh);
2560           goto done;
2561         }
2562       vep = &session->vep;
2563       if (PREDICT_FALSE (vep->vep_sh != vep_handle))
2564         VDBG (0, "ERROR: session (%u) vep_sh (%u) != vep_sh (%u)!",
2565               sh, session->vep.vep_sh, vep_handle);
2566       if (session->is_vep_session)
2567         {
2568           VDBG (0, "vep_sh[%u]: sh 0x%x (%u)\n"
2569                 "{\n"
2570                 "   next_sh        = 0x%x (%u)\n"
2571                 "   prev_sh        = 0x%x (%u)\n"
2572                 "   vep_sh         = 0x%x (%u)\n"
2573                 "   ev.events      = 0x%x\n"
2574                 "   ev.data.u64    = 0x%llx\n"
2575                 "   et_mask        = 0x%x\n"
2576                 "}\n",
2577                 vep_handle, sh, sh, vep->next_sh, vep->next_sh, vep->prev_sh,
2578                 vep->prev_sh, vep->vep_sh, vep->vep_sh, vep->ev.events,
2579                 vep->ev.data.u64, vep->et_mask);
2580         }
2581     }
2582
2583 done:
2584   VDBG (0, "vep_sh (%u): Dump complete!\n", vep_handle);
2585 }
2586
2587 int
2588 vppcom_epoll_create (void)
2589 {
2590   vcl_worker_t *wrk = vcl_worker_get_current ();
2591   vcl_session_t *vep_session;
2592
2593   vep_session = vcl_session_alloc (wrk);
2594
2595   vep_session->is_vep = 1;
2596   vep_session->vep.vep_sh = ~0;
2597   vep_session->vep.next_sh = ~0;
2598   vep_session->vep.prev_sh = ~0;
2599   vep_session->vpp_handle = ~0;
2600
2601   vcl_evt (VCL_EVT_EPOLL_CREATE, vep_session, vep_session->session_index);
2602   VDBG (0, "Created vep_idx %u", vep_session->session_index);
2603
2604   return vcl_session_handle (vep_session);
2605 }
2606
2607 int
2608 vppcom_epoll_ctl (uint32_t vep_handle, int op, uint32_t session_handle,
2609                   struct epoll_event *event)
2610 {
2611   vcl_worker_t *wrk = vcl_worker_get_current ();
2612   vcl_session_t *vep_session;
2613   vcl_session_t *session;
2614   int rv = VPPCOM_OK;
2615
2616   if (vep_handle == session_handle)
2617     {
2618       VDBG (0, "vep_sh == session handle (%u)!", vep_handle);
2619       return VPPCOM_EINVAL;
2620     }
2621
2622   vep_session = vcl_session_get_w_handle (wrk, vep_handle);
2623   if (PREDICT_FALSE (!vep_session))
2624     {
2625       VDBG (0, "Invalid vep_sh (%u)!", vep_handle);
2626       return VPPCOM_EBADFD;
2627     }
2628   if (PREDICT_FALSE (!vep_session->is_vep))
2629     {
2630       VDBG (0, "vep_sh (%u) is not a vep!", vep_handle);
2631       return VPPCOM_EINVAL;
2632     }
2633
2634   ASSERT (vep_session->vep.vep_sh == ~0);
2635   ASSERT (vep_session->vep.prev_sh == ~0);
2636
2637   session = vcl_session_get_w_handle (wrk, session_handle);
2638   if (PREDICT_FALSE (!session))
2639     {
2640       VDBG (0, "Invalid session_handle (%u)!", session_handle);
2641       return VPPCOM_EBADFD;
2642     }
2643   if (PREDICT_FALSE (session->is_vep))
2644     {
2645       VDBG (0, "session_handle (%u) is a vep!", vep_handle);
2646       return VPPCOM_EINVAL;
2647     }
2648
2649   switch (op)
2650     {
2651     case EPOLL_CTL_ADD:
2652       if (PREDICT_FALSE (!event))
2653         {
2654           VDBG (0, "EPOLL_CTL_ADD: NULL pointer to epoll_event structure!");
2655           return VPPCOM_EINVAL;
2656         }
2657       if (vep_session->vep.next_sh != ~0)
2658         {
2659           vcl_session_t *next_session;
2660           next_session = vcl_session_get_w_handle (wrk,
2661                                                    vep_session->vep.next_sh);
2662           if (PREDICT_FALSE (!next_session))
2663             {
2664               VDBG (0, "EPOLL_CTL_ADD: Invalid vep.next_sh (%u) on "
2665                     "vep_idx (%u)!", vep_session->vep.next_sh, vep_handle);
2666               return VPPCOM_EBADFD;
2667             }
2668           ASSERT (next_session->vep.prev_sh == vep_handle);
2669           next_session->vep.prev_sh = session_handle;
2670         }
2671       session->vep.next_sh = vep_session->vep.next_sh;
2672       session->vep.prev_sh = vep_handle;
2673       session->vep.vep_sh = vep_handle;
2674       session->vep.et_mask = VEP_DEFAULT_ET_MASK;
2675       session->vep.ev = *event;
2676       session->is_vep = 0;
2677       session->is_vep_session = 1;
2678       vep_session->vep.next_sh = session_handle;
2679
2680       if (session->tx_fifo)
2681         svm_fifo_add_want_deq_ntf (session->tx_fifo,
2682                                    SVM_FIFO_WANT_DEQ_NOTIF_IF_FULL);
2683
2684       /* Generate EPOLLOUT if tx fifo not full */
2685       if ((event->events & EPOLLOUT) &&
2686           (vcl_session_write_ready (session) > 0))
2687         {
2688           session_event_t e = { 0 };
2689           e.event_type = SESSION_IO_EVT_TX;
2690           e.session_index = session->session_index;
2691           vec_add1 (wrk->unhandled_evts_vector, e);
2692         }
2693       /* Generate EPOLLIN if rx fifo has data */
2694       if ((event->events & EPOLLIN) && (vcl_session_read_ready (session) > 0))
2695         {
2696           session_event_t e = { 0 };
2697           e.event_type = SESSION_IO_EVT_RX;
2698           e.session_index = session->session_index;
2699           vec_add1 (wrk->unhandled_evts_vector, e);
2700         }
2701       VDBG (1, "EPOLL_CTL_ADD: vep_sh %u, sh %u, events 0x%x, data 0x%llx!",
2702             vep_handle, session_handle, event->events, event->data.u64);
2703       vcl_evt (VCL_EVT_EPOLL_CTLADD, session, event->events, event->data.u64);
2704       break;
2705
2706     case EPOLL_CTL_MOD:
2707       if (PREDICT_FALSE (!event))
2708         {
2709           VDBG (0, "EPOLL_CTL_MOD: NULL pointer to epoll_event structure!");
2710           rv = VPPCOM_EINVAL;
2711           goto done;
2712         }
2713       else if (PREDICT_FALSE (!session->is_vep_session))
2714         {
2715           VDBG (0, "sh %u EPOLL_CTL_MOD: not a vep session!", session_handle);
2716           rv = VPPCOM_EINVAL;
2717           goto done;
2718         }
2719       else if (PREDICT_FALSE (session->vep.vep_sh != vep_handle))
2720         {
2721           VDBG (0, "EPOLL_CTL_MOD: sh %u vep_sh (%u) != vep_sh (%u)!",
2722                 session_handle, session->vep.vep_sh, vep_handle);
2723           rv = VPPCOM_EINVAL;
2724           goto done;
2725         }
2726
2727       /* Generate EPOLLOUT when tx_fifo/ct_tx_fifo not full */
2728       if ((event->events & EPOLLOUT) &&
2729           !(session->vep.ev.events & EPOLLOUT) &&
2730           (vcl_session_write_ready (session) > 0))
2731         {
2732           session_event_t e = { 0 };
2733           e.event_type = SESSION_IO_EVT_TX;
2734           e.session_index = session->session_index;
2735           vec_add1 (wrk->unhandled_evts_vector, e);
2736         }
2737       session->vep.et_mask = VEP_DEFAULT_ET_MASK;
2738       session->vep.ev = *event;
2739       VDBG (1, "EPOLL_CTL_MOD: vep_sh %u, sh %u, events 0x%x, data 0x%llx!",
2740             vep_handle, session_handle, event->events, event->data.u64);
2741       break;
2742
2743     case EPOLL_CTL_DEL:
2744       if (PREDICT_FALSE (!session->is_vep_session))
2745         {
2746           VDBG (0, "EPOLL_CTL_DEL: %u not a vep session!", session_handle);
2747           rv = VPPCOM_EINVAL;
2748           goto done;
2749         }
2750       else if (PREDICT_FALSE (session->vep.vep_sh != vep_handle))
2751         {
2752           VDBG (0, "EPOLL_CTL_DEL: sh %u vep_sh (%u) != vep_sh (%u)!",
2753                 session_handle, session->vep.vep_sh, vep_handle);
2754           rv = VPPCOM_EINVAL;
2755           goto done;
2756         }
2757
2758       if (session->vep.prev_sh == vep_handle)
2759         vep_session->vep.next_sh = session->vep.next_sh;
2760       else
2761         {
2762           vcl_session_t *prev_session;
2763           prev_session = vcl_session_get_w_handle (wrk, session->vep.prev_sh);
2764           if (PREDICT_FALSE (!prev_session))
2765             {
2766               VDBG (0, "EPOLL_CTL_DEL: Invalid prev_sh (%u) on sh (%u)!",
2767                     session->vep.prev_sh, session_handle);
2768               return VPPCOM_EBADFD;
2769             }
2770           ASSERT (prev_session->vep.next_sh == session_handle);
2771           prev_session->vep.next_sh = session->vep.next_sh;
2772         }
2773       if (session->vep.next_sh != ~0)
2774         {
2775           vcl_session_t *next_session;
2776           next_session = vcl_session_get_w_handle (wrk, session->vep.next_sh);
2777           if (PREDICT_FALSE (!next_session))
2778             {
2779               VDBG (0, "EPOLL_CTL_DEL: Invalid next_sh (%u) on sh (%u)!",
2780                     session->vep.next_sh, session_handle);
2781               return VPPCOM_EBADFD;
2782             }
2783           ASSERT (next_session->vep.prev_sh == session_handle);
2784           next_session->vep.prev_sh = session->vep.prev_sh;
2785         }
2786
2787       memset (&session->vep, 0, sizeof (session->vep));
2788       session->vep.next_sh = ~0;
2789       session->vep.prev_sh = ~0;
2790       session->vep.vep_sh = ~0;
2791       session->is_vep_session = 0;
2792
2793       if (session->tx_fifo)
2794         svm_fifo_del_want_deq_ntf (session->tx_fifo, SVM_FIFO_NO_DEQ_NOTIF);
2795
2796       VDBG (1, "EPOLL_CTL_DEL: vep_idx %u, sh %u!", vep_handle,
2797             session_handle);
2798       vcl_evt (VCL_EVT_EPOLL_CTLDEL, session, vep_sh);
2799       break;
2800
2801     default:
2802       VDBG (0, "Invalid operation (%d)!", op);
2803       rv = VPPCOM_EINVAL;
2804     }
2805
2806   vep_verify_epoll_chain (wrk, vep_handle);
2807
2808 done:
2809   return rv;
2810 }
2811
2812 static inline void
2813 vcl_epoll_wait_handle_mq_event (vcl_worker_t * wrk, session_event_t * e,
2814                                 struct epoll_event *events, u32 * num_ev)
2815 {
2816   session_disconnected_msg_t *disconnected_msg;
2817   session_connected_msg_t *connected_msg;
2818   u32 sid = ~0, session_events;
2819   u64 session_evt_data = ~0;
2820   vcl_session_t *session;
2821   u8 add_event = 0;
2822
2823   switch (e->event_type)
2824     {
2825     case SESSION_IO_EVT_RX:
2826       sid = e->session_index;
2827       session = vcl_session_get (wrk, sid);
2828       if (vcl_session_is_closed (session))
2829         break;
2830       vcl_fifo_rx_evt_valid_or_break (session);
2831       session_events = session->vep.ev.events;
2832       if (!(EPOLLIN & session->vep.ev.events) || session->has_rx_evt)
2833         break;
2834       add_event = 1;
2835       events[*num_ev].events |= EPOLLIN;
2836       session_evt_data = session->vep.ev.data.u64;
2837       session->has_rx_evt = 1;
2838       break;
2839     case SESSION_IO_EVT_TX:
2840       sid = e->session_index;
2841       session = vcl_session_get (wrk, sid);
2842       if (vcl_session_is_closed (session))
2843         break;
2844       session_events = session->vep.ev.events;
2845       if (!(EPOLLOUT & session_events))
2846         break;
2847       add_event = 1;
2848       events[*num_ev].events |= EPOLLOUT;
2849       session_evt_data = session->vep.ev.data.u64;
2850       svm_fifo_reset_has_deq_ntf (session->tx_fifo);
2851       break;
2852     case SESSION_CTRL_EVT_ACCEPTED:
2853       session = vcl_session_accepted (wrk,
2854                                       (session_accepted_msg_t *) e->data);
2855       if (!session)
2856         break;
2857
2858       session_events = session->vep.ev.events;
2859       if (!(EPOLLIN & session_events))
2860         break;
2861
2862       add_event = 1;
2863       events[*num_ev].events |= EPOLLIN;
2864       session_evt_data = session->vep.ev.data.u64;
2865       break;
2866     case SESSION_CTRL_EVT_CONNECTED:
2867       connected_msg = (session_connected_msg_t *) e->data;
2868       sid = vcl_session_connected_handler (wrk, connected_msg);
2869       /* Generate EPOLLOUT because there's no connected event */
2870       session = vcl_session_get (wrk, sid);
2871       if (vcl_session_is_closed (session))
2872         break;
2873       session_events = session->vep.ev.events;
2874       if (!(EPOLLOUT & session_events))
2875         break;
2876       add_event = 1;
2877       events[*num_ev].events |= EPOLLOUT;
2878       session_evt_data = session->vep.ev.data.u64;
2879       if (session->session_state & STATE_DETACHED)
2880         events[*num_ev].events |= EPOLLHUP;
2881       break;
2882     case SESSION_CTRL_EVT_DISCONNECTED:
2883       disconnected_msg = (session_disconnected_msg_t *) e->data;
2884       session = vcl_session_disconnected_handler (wrk, disconnected_msg);
2885       if (vcl_session_is_closed (session))
2886         break;
2887       session_events = session->vep.ev.events;
2888       add_event = 1;
2889       events[*num_ev].events |= EPOLLHUP | EPOLLRDHUP;
2890       session_evt_data = session->vep.ev.data.u64;
2891       break;
2892     case SESSION_CTRL_EVT_RESET:
2893       sid = vcl_session_reset_handler (wrk, (session_reset_msg_t *) e->data);
2894       session = vcl_session_get (wrk, sid);
2895       if (vcl_session_is_closed (session))
2896         break;
2897       session_events = session->vep.ev.events;
2898       add_event = 1;
2899       events[*num_ev].events |= EPOLLHUP | EPOLLRDHUP;
2900       session_evt_data = session->vep.ev.data.u64;
2901       break;
2902     case SESSION_CTRL_EVT_UNLISTEN_REPLY:
2903       vcl_session_unlisten_reply_handler (wrk, e->data);
2904       break;
2905     case SESSION_CTRL_EVT_MIGRATED:
2906       vcl_session_migrated_handler (wrk, e->data);
2907       break;
2908     case SESSION_CTRL_EVT_CLEANUP:
2909       vcl_session_cleanup_handler (wrk, e->data);
2910       break;
2911     case SESSION_CTRL_EVT_REQ_WORKER_UPDATE:
2912       vcl_session_req_worker_update_handler (wrk, e->data);
2913       break;
2914     case SESSION_CTRL_EVT_WORKER_UPDATE_REPLY:
2915       vcl_session_worker_update_reply_handler (wrk, e->data);
2916       break;
2917     case SESSION_CTRL_EVT_APP_ADD_SEGMENT:
2918       vcl_session_app_add_segment_handler (wrk, e->data);
2919       break;
2920     case SESSION_CTRL_EVT_APP_DEL_SEGMENT:
2921       vcl_session_app_del_segment_handler (wrk, e->data);
2922       break;
2923     case SESSION_CTRL_EVT_RPC:
2924       vcl_worker_rpc_handler (wrk, e->data);
2925       break;
2926     default:
2927       VDBG (0, "unhandled: %u", e->event_type);
2928       break;
2929     }
2930
2931   if (add_event)
2932     {
2933       events[*num_ev].data.u64 = session_evt_data;
2934       if (EPOLLONESHOT & session_events)
2935         {
2936           session = vcl_session_get (wrk, sid);
2937           session->vep.ev.events = 0;
2938         }
2939       *num_ev += 1;
2940     }
2941 }
2942
2943 static int
2944 vcl_epoll_wait_handle_mq (vcl_worker_t * wrk, svm_msg_q_t * mq,
2945                           struct epoll_event *events, u32 maxevents,
2946                           double wait_for_time, u32 * num_ev)
2947 {
2948   svm_msg_q_msg_t *msg;
2949   session_event_t *e;
2950   int i;
2951
2952   if (vec_len (wrk->mq_msg_vector) && svm_msg_q_is_empty (mq))
2953     goto handle_dequeued;
2954
2955   svm_msg_q_lock (mq);
2956   if (svm_msg_q_is_empty (mq))
2957     {
2958       if (!wait_for_time)
2959         {
2960           svm_msg_q_unlock (mq);
2961           return 0;
2962         }
2963       else if (wait_for_time < 0)
2964         {
2965           svm_msg_q_wait (mq);
2966         }
2967       else
2968         {
2969           if (svm_msg_q_timedwait (mq, wait_for_time / 1e3))
2970             {
2971               svm_msg_q_unlock (mq);
2972               return 0;
2973             }
2974         }
2975     }
2976   ASSERT (maxevents > *num_ev);
2977   vcl_mq_dequeue_batch (wrk, mq, ~0);
2978   svm_msg_q_unlock (mq);
2979
2980 handle_dequeued:
2981   for (i = 0; i < vec_len (wrk->mq_msg_vector); i++)
2982     {
2983       msg = vec_elt_at_index (wrk->mq_msg_vector, i);
2984       e = svm_msg_q_msg_data (mq, msg);
2985       if (*num_ev < maxevents)
2986         vcl_epoll_wait_handle_mq_event (wrk, e, events, num_ev);
2987       else
2988         vcl_handle_mq_event (wrk, e);
2989       svm_msg_q_free_msg (mq, msg);
2990     }
2991   vec_reset_length (wrk->mq_msg_vector);
2992   vcl_handle_pending_wrk_updates (wrk);
2993   return *num_ev;
2994 }
2995
2996 static int
2997 vppcom_epoll_wait_condvar (vcl_worker_t * wrk, struct epoll_event *events,
2998                            int maxevents, u32 n_evts, double wait_for_time)
2999 {
3000   double wait = 0, start = 0, now;
3001
3002   if (!n_evts)
3003     {
3004       wait = wait_for_time;
3005       start = clib_time_now (&wrk->clib_time);
3006     }
3007
3008   do
3009     {
3010       vcl_epoll_wait_handle_mq (wrk, wrk->app_event_queue, events, maxevents,
3011                                 wait, &n_evts);
3012       if (n_evts)
3013         return n_evts;
3014       if (wait == -1)
3015         continue;
3016
3017       now = clib_time_now (&wrk->clib_time);
3018       wait -= (now - start) * 1e3;
3019       start = now;
3020     }
3021   while (wait > 0);
3022
3023   return 0;
3024 }
3025
3026 static int
3027 vppcom_epoll_wait_eventfd (vcl_worker_t * wrk, struct epoll_event *events,
3028                            int maxevents, u32 n_evts, double wait_for_time)
3029 {
3030   vcl_mq_evt_conn_t *mqc;
3031   int __clib_unused n_read;
3032   int n_mq_evts, i;
3033   u64 buf;
3034
3035   vec_validate (wrk->mq_events, pool_elts (wrk->mq_evt_conns));
3036 again:
3037   n_mq_evts = epoll_wait (wrk->mqs_epfd, wrk->mq_events,
3038                           vec_len (wrk->mq_events), wait_for_time);
3039   for (i = 0; i < n_mq_evts; i++)
3040     {
3041       mqc = vcl_mq_evt_conn_get (wrk, wrk->mq_events[i].data.u32);
3042       n_read = read (mqc->mq_fd, &buf, sizeof (buf));
3043       vcl_epoll_wait_handle_mq (wrk, mqc->mq, events, maxevents, 0, &n_evts);
3044     }
3045   if (!n_evts && n_mq_evts > 0)
3046     goto again;
3047
3048   return (int) n_evts;
3049 }
3050
3051 int
3052 vppcom_epoll_wait (uint32_t vep_handle, struct epoll_event *events,
3053                    int maxevents, double wait_for_time)
3054 {
3055   vcl_worker_t *wrk = vcl_worker_get_current ();
3056   vcl_session_t *vep_session;
3057   u32 n_evts = 0;
3058   int i;
3059
3060   if (PREDICT_FALSE (maxevents <= 0))
3061     {
3062       VDBG (0, "ERROR: Invalid maxevents (%d)!", maxevents);
3063       return VPPCOM_EINVAL;
3064     }
3065
3066   vep_session = vcl_session_get_w_handle (wrk, vep_handle);
3067   if (!vep_session)
3068     return VPPCOM_EBADFD;
3069
3070   if (PREDICT_FALSE (!vep_session->is_vep))
3071     {
3072       VDBG (0, "ERROR: vep_idx (%u) is not a vep!", vep_handle);
3073       return VPPCOM_EINVAL;
3074     }
3075
3076   memset (events, 0, sizeof (*events) * maxevents);
3077
3078   if (vec_len (wrk->unhandled_evts_vector))
3079     {
3080       for (i = 0; i < vec_len (wrk->unhandled_evts_vector); i++)
3081         {
3082           vcl_epoll_wait_handle_mq_event (wrk, &wrk->unhandled_evts_vector[i],
3083                                           events, &n_evts);
3084           if (n_evts == maxevents)
3085             {
3086               vec_delete (wrk->unhandled_evts_vector, i + 1, 0);
3087               return n_evts;
3088             }
3089         }
3090       vec_reset_length (wrk->unhandled_evts_vector);
3091     }
3092
3093   if (vcm->cfg.use_mq_eventfd)
3094     return vppcom_epoll_wait_eventfd (wrk, events, maxevents, n_evts,
3095                                       wait_for_time);
3096
3097   return vppcom_epoll_wait_condvar (wrk, events, maxevents, n_evts,
3098                                     wait_for_time);
3099 }
3100
3101 int
3102 vppcom_session_attr (uint32_t session_handle, uint32_t op,
3103                      void *buffer, uint32_t * buflen)
3104 {
3105   vcl_worker_t *wrk = vcl_worker_get_current ();
3106   vcl_session_t *session;
3107   int rv = VPPCOM_OK;
3108   u32 *flags = buffer, tmp_flags = 0;
3109   vppcom_endpt_t *ep = buffer;
3110
3111   session = vcl_session_get_w_handle (wrk, session_handle);
3112   if (!session)
3113     return VPPCOM_EBADFD;
3114
3115   switch (op)
3116     {
3117     case VPPCOM_ATTR_GET_NREAD:
3118       rv = vcl_session_read_ready (session);
3119       VDBG (2, "VPPCOM_ATTR_GET_NREAD: sh %u, nread = %d", session_handle,
3120             rv);
3121       break;
3122
3123     case VPPCOM_ATTR_GET_NWRITE:
3124       rv = vcl_session_write_ready (session);
3125       VDBG (2, "VPPCOM_ATTR_GET_NWRITE: sh %u, nwrite = %d", session_handle,
3126             rv);
3127       break;
3128
3129     case VPPCOM_ATTR_GET_FLAGS:
3130       if (PREDICT_TRUE (buffer && buflen && (*buflen >= sizeof (*flags))))
3131         {
3132           *flags =
3133             O_RDWR |
3134             (VCL_SESS_ATTR_TEST (session->attr, VCL_SESS_ATTR_NONBLOCK) ?
3135              O_NONBLOCK : 0);
3136           *buflen = sizeof (*flags);
3137           VDBG (2, "VPPCOM_ATTR_GET_FLAGS: sh %u, flags = 0x%08x, "
3138                 "is_nonblocking = %u", session_handle, *flags,
3139                 VCL_SESS_ATTR_TEST (session->attr, VCL_SESS_ATTR_NONBLOCK));
3140         }
3141       else
3142         rv = VPPCOM_EINVAL;
3143       break;
3144
3145     case VPPCOM_ATTR_SET_FLAGS:
3146       if (PREDICT_TRUE (buffer && buflen && (*buflen == sizeof (*flags))))
3147         {
3148           if (*flags & O_NONBLOCK)
3149             VCL_SESS_ATTR_SET (session->attr, VCL_SESS_ATTR_NONBLOCK);
3150           else
3151             VCL_SESS_ATTR_CLR (session->attr, VCL_SESS_ATTR_NONBLOCK);
3152
3153           VDBG (2, "VPPCOM_ATTR_SET_FLAGS: sh %u, flags = 0x%08x,"
3154                 " is_nonblocking = %u", session_handle, *flags,
3155                 VCL_SESS_ATTR_TEST (session->attr, VCL_SESS_ATTR_NONBLOCK));
3156         }
3157       else
3158         rv = VPPCOM_EINVAL;
3159       break;
3160
3161     case VPPCOM_ATTR_GET_PEER_ADDR:
3162       if (PREDICT_TRUE (buffer && buflen &&
3163                         (*buflen >= sizeof (*ep)) && ep->ip))
3164         {
3165           ep->is_ip4 = session->transport.is_ip4;
3166           ep->port = session->transport.rmt_port;
3167           if (session->transport.is_ip4)
3168             clib_memcpy_fast (ep->ip, &session->transport.rmt_ip.ip4,
3169                               sizeof (ip4_address_t));
3170           else
3171             clib_memcpy_fast (ep->ip, &session->transport.rmt_ip.ip6,
3172                               sizeof (ip6_address_t));
3173           *buflen = sizeof (*ep);
3174           VDBG (1, "VPPCOM_ATTR_GET_PEER_ADDR: sh %u, is_ip4 = %u, "
3175                 "addr = %U, port %u", session_handle, ep->is_ip4,
3176                 format_ip46_address, &session->transport.rmt_ip,
3177                 ep->is_ip4 ? IP46_TYPE_IP4 : IP46_TYPE_IP6,
3178                 clib_net_to_host_u16 (ep->port));
3179         }
3180       else
3181         rv = VPPCOM_EINVAL;
3182       break;
3183
3184     case VPPCOM_ATTR_GET_LCL_ADDR:
3185       if (PREDICT_TRUE (buffer && buflen &&
3186                         (*buflen >= sizeof (*ep)) && ep->ip))
3187         {
3188           ep->is_ip4 = session->transport.is_ip4;
3189           ep->port = session->transport.lcl_port;
3190           if (session->transport.is_ip4)
3191             clib_memcpy_fast (ep->ip, &session->transport.lcl_ip.ip4,
3192                               sizeof (ip4_address_t));
3193           else
3194             clib_memcpy_fast (ep->ip, &session->transport.lcl_ip.ip6,
3195                               sizeof (ip6_address_t));
3196           *buflen = sizeof (*ep);
3197           VDBG (1, "VPPCOM_ATTR_GET_LCL_ADDR: sh %u, is_ip4 = %u, addr = %U"
3198                 " port %d", session_handle, ep->is_ip4, format_ip46_address,
3199                 &session->transport.lcl_ip,
3200                 ep->is_ip4 ? IP46_TYPE_IP4 : IP46_TYPE_IP6,
3201                 clib_net_to_host_u16 (ep->port));
3202         }
3203       else
3204         rv = VPPCOM_EINVAL;
3205       break;
3206
3207     case VPPCOM_ATTR_SET_LCL_ADDR:
3208       if (PREDICT_TRUE (buffer && buflen &&
3209                         (*buflen >= sizeof (*ep)) && ep->ip))
3210         {
3211           session->transport.is_ip4 = ep->is_ip4;
3212           session->transport.lcl_port = ep->port;
3213           vcl_ip_copy_from_ep (&session->transport.lcl_ip, ep);
3214           *buflen = sizeof (*ep);
3215           VDBG (1, "VPPCOM_ATTR_SET_LCL_ADDR: sh %u, is_ip4 = %u, addr = %U"
3216                 " port %d", session_handle, ep->is_ip4, format_ip46_address,
3217                 &session->transport.lcl_ip,
3218                 ep->is_ip4 ? IP46_TYPE_IP4 : IP46_TYPE_IP6,
3219                 clib_net_to_host_u16 (ep->port));
3220         }
3221       else
3222         rv = VPPCOM_EINVAL;
3223       break;
3224
3225     case VPPCOM_ATTR_GET_LIBC_EPFD:
3226       rv = session->libc_epfd;
3227       VDBG (2, "VPPCOM_ATTR_GET_LIBC_EPFD: libc_epfd %d", rv);
3228       break;
3229
3230     case VPPCOM_ATTR_SET_LIBC_EPFD:
3231       if (PREDICT_TRUE (buffer && buflen &&
3232                         (*buflen == sizeof (session->libc_epfd))))
3233         {
3234           session->libc_epfd = *(int *) buffer;
3235           *buflen = sizeof (session->libc_epfd);
3236
3237           VDBG (2, "VPPCOM_ATTR_SET_LIBC_EPFD: libc_epfd %d, buflen %d",
3238                 session->libc_epfd, *buflen);
3239         }
3240       else
3241         rv = VPPCOM_EINVAL;
3242       break;
3243
3244     case VPPCOM_ATTR_GET_PROTOCOL:
3245       if (buffer && buflen && (*buflen >= sizeof (int)))
3246         {
3247           *(int *) buffer = session->session_type;
3248           *buflen = sizeof (int);
3249
3250           VDBG (2, "VPPCOM_ATTR_GET_PROTOCOL: %d (%s), buflen %d",
3251                 *(int *) buffer, *(int *) buffer ? "UDP" : "TCP", *buflen);
3252         }
3253       else
3254         rv = VPPCOM_EINVAL;
3255       break;
3256
3257     case VPPCOM_ATTR_GET_LISTEN:
3258       if (buffer && buflen && (*buflen >= sizeof (int)))
3259         {
3260           *(int *) buffer = VCL_SESS_ATTR_TEST (session->attr,
3261                                                 VCL_SESS_ATTR_LISTEN);
3262           *buflen = sizeof (int);
3263
3264           VDBG (2, "VPPCOM_ATTR_GET_LISTEN: %d, buflen %d", *(int *) buffer,
3265                 *buflen);
3266         }
3267       else
3268         rv = VPPCOM_EINVAL;
3269       break;
3270
3271     case VPPCOM_ATTR_GET_ERROR:
3272       if (buffer && buflen && (*buflen >= sizeof (int)))
3273         {
3274           *(int *) buffer = 0;
3275           *buflen = sizeof (int);
3276
3277           VDBG (2, "VPPCOM_ATTR_GET_ERROR: %d, buflen %d, #VPP-TBD#",
3278                 *(int *) buffer, *buflen);
3279         }
3280       else
3281         rv = VPPCOM_EINVAL;
3282       break;
3283
3284     case VPPCOM_ATTR_GET_TX_FIFO_LEN:
3285       if (buffer && buflen && (*buflen >= sizeof (u32)))
3286         {
3287
3288           /* VPP-TBD */
3289           *(size_t *) buffer = (session->sndbuf_size ? session->sndbuf_size :
3290                                 session->tx_fifo ?
3291                                 svm_fifo_size (session->tx_fifo) :
3292                                 vcm->cfg.tx_fifo_size);
3293           *buflen = sizeof (u32);
3294
3295           VDBG (2, "VPPCOM_ATTR_GET_TX_FIFO_LEN: %u (0x%x), buflen %d,"
3296                 " #VPP-TBD#", *(size_t *) buffer, *(size_t *) buffer,
3297                 *buflen);
3298         }
3299       else
3300         rv = VPPCOM_EINVAL;
3301       break;
3302
3303     case VPPCOM_ATTR_SET_TX_FIFO_LEN:
3304       if (buffer && buflen && (*buflen == sizeof (u32)))
3305         {
3306           /* VPP-TBD */
3307           session->sndbuf_size = *(u32 *) buffer;
3308           VDBG (2, "VPPCOM_ATTR_SET_TX_FIFO_LEN: %u (0x%x), buflen %d,"
3309                 " #VPP-TBD#", session->sndbuf_size, session->sndbuf_size,
3310                 *buflen);
3311         }
3312       else
3313         rv = VPPCOM_EINVAL;
3314       break;
3315
3316     case VPPCOM_ATTR_GET_RX_FIFO_LEN:
3317       if (buffer && buflen && (*buflen >= sizeof (u32)))
3318         {
3319
3320           /* VPP-TBD */
3321           *(size_t *) buffer = (session->rcvbuf_size ? session->rcvbuf_size :
3322                                 session->rx_fifo ?
3323                                 svm_fifo_size (session->rx_fifo) :
3324                                 vcm->cfg.rx_fifo_size);
3325           *buflen = sizeof (u32);
3326
3327           VDBG (2, "VPPCOM_ATTR_GET_RX_FIFO_LEN: %u (0x%x), buflen %d, "
3328                 "#VPP-TBD#", *(size_t *) buffer, *(size_t *) buffer, *buflen);
3329         }
3330       else
3331         rv = VPPCOM_EINVAL;
3332       break;
3333
3334     case VPPCOM_ATTR_SET_RX_FIFO_LEN:
3335       if (buffer && buflen && (*buflen == sizeof (u32)))
3336         {
3337           /* VPP-TBD */
3338           session->rcvbuf_size = *(u32 *) buffer;
3339           VDBG (2, "VPPCOM_ATTR_SET_RX_FIFO_LEN: %u (0x%x), buflen %d,"
3340                 " #VPP-TBD#", session->sndbuf_size, session->sndbuf_size,
3341                 *buflen);
3342         }
3343       else
3344         rv = VPPCOM_EINVAL;
3345       break;
3346
3347     case VPPCOM_ATTR_GET_REUSEADDR:
3348       if (buffer && buflen && (*buflen >= sizeof (int)))
3349         {
3350           /* VPP-TBD */
3351           *(int *) buffer = VCL_SESS_ATTR_TEST (session->attr,
3352                                                 VCL_SESS_ATTR_REUSEADDR);
3353           *buflen = sizeof (int);
3354
3355           VDBG (2, "VPPCOM_ATTR_GET_REUSEADDR: %d, buflen %d, #VPP-TBD#",
3356                 *(int *) buffer, *buflen);
3357         }
3358       else
3359         rv = VPPCOM_EINVAL;
3360       break;
3361
3362     case VPPCOM_ATTR_SET_REUSEADDR:
3363       if (buffer && buflen && (*buflen == sizeof (int)) &&
3364           !VCL_SESS_ATTR_TEST (session->attr, VCL_SESS_ATTR_LISTEN))
3365         {
3366           /* VPP-TBD */
3367           if (*(int *) buffer)
3368             VCL_SESS_ATTR_SET (session->attr, VCL_SESS_ATTR_REUSEADDR);
3369           else
3370             VCL_SESS_ATTR_CLR (session->attr, VCL_SESS_ATTR_REUSEADDR);
3371
3372           VDBG (2, "VPPCOM_ATTR_SET_REUSEADDR: %d, buflen %d, #VPP-TBD#",
3373                 VCL_SESS_ATTR_TEST (session->attr, VCL_SESS_ATTR_REUSEADDR),
3374                 *buflen);
3375         }
3376       else
3377         rv = VPPCOM_EINVAL;
3378       break;
3379
3380     case VPPCOM_ATTR_GET_REUSEPORT:
3381       if (buffer && buflen && (*buflen >= sizeof (int)))
3382         {
3383           /* VPP-TBD */
3384           *(int *) buffer = VCL_SESS_ATTR_TEST (session->attr,
3385                                                 VCL_SESS_ATTR_REUSEPORT);
3386           *buflen = sizeof (int);
3387
3388           VDBG (2, "VPPCOM_ATTR_GET_REUSEPORT: %d, buflen %d, #VPP-TBD#",
3389                 *(int *) buffer, *buflen);
3390         }
3391       else
3392         rv = VPPCOM_EINVAL;
3393       break;
3394
3395     case VPPCOM_ATTR_SET_REUSEPORT:
3396       if (buffer && buflen && (*buflen == sizeof (int)) &&
3397           !VCL_SESS_ATTR_TEST (session->attr, VCL_SESS_ATTR_LISTEN))
3398         {
3399           /* VPP-TBD */
3400           if (*(int *) buffer)
3401             VCL_SESS_ATTR_SET (session->attr, VCL_SESS_ATTR_REUSEPORT);
3402           else
3403             VCL_SESS_ATTR_CLR (session->attr, VCL_SESS_ATTR_REUSEPORT);
3404
3405           VDBG (2, "VPPCOM_ATTR_SET_REUSEPORT: %d, buflen %d, #VPP-TBD#",
3406                 VCL_SESS_ATTR_TEST (session->attr, VCL_SESS_ATTR_REUSEPORT),
3407                 *buflen);
3408         }
3409       else
3410         rv = VPPCOM_EINVAL;
3411       break;
3412
3413     case VPPCOM_ATTR_GET_BROADCAST:
3414       if (buffer && buflen && (*buflen >= sizeof (int)))
3415         {
3416           /* VPP-TBD */
3417           *(int *) buffer = VCL_SESS_ATTR_TEST (session->attr,
3418                                                 VCL_SESS_ATTR_BROADCAST);
3419           *buflen = sizeof (int);
3420
3421           VDBG (2, "VPPCOM_ATTR_GET_BROADCAST: %d, buflen %d, #VPP-TBD#",
3422                 *(int *) buffer, *buflen);
3423         }
3424       else
3425         rv = VPPCOM_EINVAL;
3426       break;
3427
3428     case VPPCOM_ATTR_SET_BROADCAST:
3429       if (buffer && buflen && (*buflen == sizeof (int)))
3430         {
3431           /* VPP-TBD */
3432           if (*(int *) buffer)
3433             VCL_SESS_ATTR_SET (session->attr, VCL_SESS_ATTR_BROADCAST);
3434           else
3435             VCL_SESS_ATTR_CLR (session->attr, VCL_SESS_ATTR_BROADCAST);
3436
3437           VDBG (2, "VPPCOM_ATTR_SET_BROADCAST: %d, buflen %d, #VPP-TBD#",
3438                 VCL_SESS_ATTR_TEST (session->attr, VCL_SESS_ATTR_BROADCAST),
3439                 *buflen);
3440         }
3441       else
3442         rv = VPPCOM_EINVAL;
3443       break;
3444
3445     case VPPCOM_ATTR_GET_V6ONLY:
3446       if (buffer && buflen && (*buflen >= sizeof (int)))
3447         {
3448           /* VPP-TBD */
3449           *(int *) buffer = VCL_SESS_ATTR_TEST (session->attr,
3450                                                 VCL_SESS_ATTR_V6ONLY);
3451           *buflen = sizeof (int);
3452
3453           VDBG (2, "VPPCOM_ATTR_GET_V6ONLY: %d, buflen %d, #VPP-TBD#",
3454                 *(int *) buffer, *buflen);
3455         }
3456       else
3457         rv = VPPCOM_EINVAL;
3458       break;
3459
3460     case VPPCOM_ATTR_SET_V6ONLY:
3461       if (buffer && buflen && (*buflen == sizeof (int)))
3462         {
3463           /* VPP-TBD */
3464           if (*(int *) buffer)
3465             VCL_SESS_ATTR_SET (session->attr, VCL_SESS_ATTR_V6ONLY);
3466           else
3467             VCL_SESS_ATTR_CLR (session->attr, VCL_SESS_ATTR_V6ONLY);
3468
3469           VDBG (2, "VPPCOM_ATTR_SET_V6ONLY: %d, buflen %d, #VPP-TBD#",
3470                 VCL_SESS_ATTR_TEST (session->attr, VCL_SESS_ATTR_V6ONLY),
3471                 *buflen);
3472         }
3473       else
3474         rv = VPPCOM_EINVAL;
3475       break;
3476
3477     case VPPCOM_ATTR_GET_KEEPALIVE:
3478       if (buffer && buflen && (*buflen >= sizeof (int)))
3479         {
3480           /* VPP-TBD */
3481           *(int *) buffer = VCL_SESS_ATTR_TEST (session->attr,
3482                                                 VCL_SESS_ATTR_KEEPALIVE);
3483           *buflen = sizeof (int);
3484
3485           VDBG (2, "VPPCOM_ATTR_GET_KEEPALIVE: %d, buflen %d, #VPP-TBD#",
3486                 *(int *) buffer, *buflen);
3487         }
3488       else
3489         rv = VPPCOM_EINVAL;
3490       break;
3491
3492     case VPPCOM_ATTR_SET_KEEPALIVE:
3493       if (buffer && buflen && (*buflen == sizeof (int)))
3494         {
3495           /* VPP-TBD */
3496           if (*(int *) buffer)
3497             VCL_SESS_ATTR_SET (session->attr, VCL_SESS_ATTR_KEEPALIVE);
3498           else
3499             VCL_SESS_ATTR_CLR (session->attr, VCL_SESS_ATTR_KEEPALIVE);
3500
3501           VDBG (2, "VPPCOM_ATTR_SET_KEEPALIVE: %d, buflen %d, #VPP-TBD#",
3502                 VCL_SESS_ATTR_TEST (session->attr, VCL_SESS_ATTR_KEEPALIVE),
3503                 *buflen);
3504         }
3505       else
3506         rv = VPPCOM_EINVAL;
3507       break;
3508
3509     case VPPCOM_ATTR_GET_TCP_NODELAY:
3510       if (buffer && buflen && (*buflen >= sizeof (int)))
3511         {
3512           /* VPP-TBD */
3513           *(int *) buffer = VCL_SESS_ATTR_TEST (session->attr,
3514                                                 VCL_SESS_ATTR_TCP_NODELAY);
3515           *buflen = sizeof (int);
3516
3517           VDBG (2, "VPPCOM_ATTR_GET_TCP_NODELAY: %d, buflen %d, #VPP-TBD#",
3518                 *(int *) buffer, *buflen);
3519         }
3520       else
3521         rv = VPPCOM_EINVAL;
3522       break;
3523
3524     case VPPCOM_ATTR_SET_TCP_NODELAY:
3525       if (buffer && buflen && (*buflen == sizeof (int)))
3526         {
3527           /* VPP-TBD */
3528           if (*(int *) buffer)
3529             VCL_SESS_ATTR_SET (session->attr, VCL_SESS_ATTR_TCP_NODELAY);
3530           else
3531             VCL_SESS_ATTR_CLR (session->attr, VCL_SESS_ATTR_TCP_NODELAY);
3532
3533           VDBG (2, "VPPCOM_ATTR_SET_TCP_NODELAY: %d, buflen %d, #VPP-TBD#",
3534                 VCL_SESS_ATTR_TEST (session->attr, VCL_SESS_ATTR_TCP_NODELAY),
3535                 *buflen);
3536         }
3537       else
3538         rv = VPPCOM_EINVAL;
3539       break;
3540
3541     case VPPCOM_ATTR_GET_TCP_KEEPIDLE:
3542       if (buffer && buflen && (*buflen >= sizeof (int)))
3543         {
3544           /* VPP-TBD */
3545           *(int *) buffer = VCL_SESS_ATTR_TEST (session->attr,
3546                                                 VCL_SESS_ATTR_TCP_KEEPIDLE);
3547           *buflen = sizeof (int);
3548
3549           VDBG (2, "VPPCOM_ATTR_GET_TCP_KEEPIDLE: %d, buflen %d, #VPP-TBD#",
3550                 *(int *) buffer, *buflen);
3551         }
3552       else
3553         rv = VPPCOM_EINVAL;
3554       break;
3555
3556     case VPPCOM_ATTR_SET_TCP_KEEPIDLE:
3557       if (buffer && buflen && (*buflen == sizeof (int)))
3558         {
3559           /* VPP-TBD */
3560           if (*(int *) buffer)
3561             VCL_SESS_ATTR_SET (session->attr, VCL_SESS_ATTR_TCP_KEEPIDLE);
3562           else
3563             VCL_SESS_ATTR_CLR (session->attr, VCL_SESS_ATTR_TCP_KEEPIDLE);
3564
3565           VDBG (2, "VPPCOM_ATTR_SET_TCP_KEEPIDLE: %d, buflen %d, #VPP-TBD#",
3566                 VCL_SESS_ATTR_TEST (session->attr,
3567                                     VCL_SESS_ATTR_TCP_KEEPIDLE), *buflen);
3568         }
3569       else
3570         rv = VPPCOM_EINVAL;
3571       break;
3572
3573     case VPPCOM_ATTR_GET_TCP_KEEPINTVL:
3574       if (buffer && buflen && (*buflen >= sizeof (int)))
3575         {
3576           /* VPP-TBD */
3577           *(int *) buffer = VCL_SESS_ATTR_TEST (session->attr,
3578                                                 VCL_SESS_ATTR_TCP_KEEPINTVL);
3579           *buflen = sizeof (int);
3580
3581           VDBG (2, "VPPCOM_ATTR_GET_TCP_KEEPINTVL: %d, buflen %d, #VPP-TBD#",
3582                 *(int *) buffer, *buflen);
3583         }
3584       else
3585         rv = VPPCOM_EINVAL;
3586       break;
3587
3588     case VPPCOM_ATTR_SET_TCP_KEEPINTVL:
3589       if (buffer && buflen && (*buflen == sizeof (int)))
3590         {
3591           /* VPP-TBD */
3592           if (*(int *) buffer)
3593             VCL_SESS_ATTR_SET (session->attr, VCL_SESS_ATTR_TCP_KEEPINTVL);
3594           else
3595             VCL_SESS_ATTR_CLR (session->attr, VCL_SESS_ATTR_TCP_KEEPINTVL);
3596
3597           VDBG (2, "VPPCOM_ATTR_SET_TCP_KEEPINTVL: %d, buflen %d, #VPP-TBD#",
3598                 VCL_SESS_ATTR_TEST (session->attr,
3599                                     VCL_SESS_ATTR_TCP_KEEPINTVL), *buflen);
3600         }
3601       else
3602         rv = VPPCOM_EINVAL;
3603       break;
3604
3605     case VPPCOM_ATTR_GET_TCP_USER_MSS:
3606       if (buffer && buflen && (*buflen >= sizeof (u32)))
3607         {
3608           /* VPP-TBD */
3609           *(u32 *) buffer = session->user_mss;
3610           *buflen = sizeof (int);
3611
3612           VDBG (2, "VPPCOM_ATTR_GET_TCP_USER_MSS: %d, buflen %d, #VPP-TBD#",
3613                 *(int *) buffer, *buflen);
3614         }
3615       else
3616         rv = VPPCOM_EINVAL;
3617       break;
3618
3619     case VPPCOM_ATTR_SET_TCP_USER_MSS:
3620       if (buffer && buflen && (*buflen == sizeof (u32)))
3621         {
3622           /* VPP-TBD */
3623           session->user_mss = *(u32 *) buffer;
3624
3625           VDBG (2, "VPPCOM_ATTR_SET_TCP_USER_MSS: %u, buflen %d, #VPP-TBD#",
3626                 session->user_mss, *buflen);
3627         }
3628       else
3629         rv = VPPCOM_EINVAL;
3630       break;
3631
3632     case VPPCOM_ATTR_SET_SHUT:
3633       if (*flags == SHUT_RD || *flags == SHUT_RDWR)
3634         VCL_SESS_ATTR_SET (session->attr, VCL_SESS_ATTR_SHUT_RD);
3635       if (*flags == SHUT_WR || *flags == SHUT_RDWR)
3636         VCL_SESS_ATTR_SET (session->attr, VCL_SESS_ATTR_SHUT_WR);
3637       break;
3638
3639     case VPPCOM_ATTR_GET_SHUT:
3640       if (VCL_SESS_ATTR_TEST (session->attr, VCL_SESS_ATTR_SHUT_RD))
3641         tmp_flags = 1;
3642       if (VCL_SESS_ATTR_TEST (session->attr, VCL_SESS_ATTR_SHUT_WR))
3643         tmp_flags |= 2;
3644       if (tmp_flags == 1)
3645         *(int *) buffer = SHUT_RD;
3646       else if (tmp_flags == 2)
3647         *(int *) buffer = SHUT_WR;
3648       else if (tmp_flags == 3)
3649         *(int *) buffer = SHUT_RDWR;
3650       *buflen = sizeof (int);
3651       break;
3652
3653     case VPPCOM_ATTR_SET_CONNECTED:
3654       session->flags |= VCL_SESSION_F_CONNECTED;
3655       break;
3656
3657     default:
3658       rv = VPPCOM_EINVAL;
3659       break;
3660     }
3661
3662   return rv;
3663 }
3664
3665 int
3666 vppcom_session_recvfrom (uint32_t session_handle, void *buffer,
3667                          uint32_t buflen, int flags, vppcom_endpt_t * ep)
3668 {
3669   vcl_worker_t *wrk = vcl_worker_get_current ();
3670   vcl_session_t *session;
3671   int rv = VPPCOM_OK;
3672
3673   if (flags == 0)
3674     rv = vppcom_session_read (session_handle, buffer, buflen);
3675   else if (flags & MSG_PEEK)
3676     rv = vppcom_session_peek (session_handle, buffer, buflen);
3677   else
3678     {
3679       VDBG (0, "Unsupport flags for recvfrom %d", flags);
3680       return VPPCOM_EAFNOSUPPORT;
3681     }
3682
3683   if (ep && rv > 0)
3684     {
3685       session = vcl_session_get_w_handle (wrk, session_handle);
3686       if (session->transport.is_ip4)
3687         clib_memcpy_fast (ep->ip, &session->transport.rmt_ip.ip4,
3688                           sizeof (ip4_address_t));
3689       else
3690         clib_memcpy_fast (ep->ip, &session->transport.rmt_ip.ip6,
3691                           sizeof (ip6_address_t));
3692       ep->is_ip4 = session->transport.is_ip4;
3693       ep->port = session->transport.rmt_port;
3694     }
3695
3696   return rv;
3697 }
3698
3699 int
3700 vppcom_session_sendto (uint32_t session_handle, void *buffer,
3701                        uint32_t buflen, int flags, vppcom_endpt_t * ep)
3702 {
3703   vcl_worker_t *wrk = vcl_worker_get_current ();
3704   vcl_session_t *s;
3705
3706   s = vcl_session_get_w_handle (wrk, session_handle);
3707   if (!s)
3708     return VPPCOM_EBADFD;
3709
3710   if (!buffer)
3711     return VPPCOM_EINVAL;
3712
3713   if (ep)
3714     {
3715       if (s->session_type != VPPCOM_PROTO_UDP
3716           || (s->flags & VCL_SESSION_F_CONNECTED))
3717         return VPPCOM_EINVAL;
3718
3719       /* Session not connected/bound in vpp. Create it by 'connecting' it */
3720       if (PREDICT_FALSE (s->session_state == STATE_CLOSED))
3721         {
3722           vcl_send_session_connect (wrk, s);
3723         }
3724       else
3725         {
3726           s->transport.is_ip4 = ep->is_ip4;
3727           s->transport.rmt_port = ep->port;
3728           vcl_ip_copy_from_ep (&s->transport.rmt_ip, ep);
3729         }
3730     }
3731
3732   if (flags)
3733     {
3734       // TBD check the flags and do the right thing
3735       VDBG (2, "handling flags 0x%u (%d) not implemented yet.", flags, flags);
3736     }
3737
3738   return (vppcom_session_write_inline (wrk, s, buffer, buflen, 1,
3739                                        s->is_dgram ? 1 : 0));
3740 }
3741
3742 int
3743 vppcom_poll (vcl_poll_t * vp, uint32_t n_sids, double wait_for_time)
3744 {
3745   vcl_worker_t *wrk = vcl_worker_get_current ();
3746   f64 timeout = clib_time_now (&wrk->clib_time) + wait_for_time;
3747   u32 i, keep_trying = 1;
3748   svm_msg_q_msg_t msg;
3749   session_event_t *e;
3750   int rv, num_ev = 0;
3751
3752   VDBG (3, "vp %p, nsids %u, wait_for_time %f", vp, n_sids, wait_for_time);
3753
3754   if (!vp)
3755     return VPPCOM_EFAULT;
3756
3757   do
3758     {
3759       vcl_session_t *session;
3760
3761       /* Dequeue all events and drop all unhandled io events */
3762       while (svm_msg_q_sub (wrk->app_event_queue, &msg, SVM_Q_NOWAIT, 0) == 0)
3763         {
3764           e = svm_msg_q_msg_data (wrk->app_event_queue, &msg);
3765           vcl_handle_mq_event (wrk, e);
3766           svm_msg_q_free_msg (wrk->app_event_queue, &msg);
3767         }
3768       vec_reset_length (wrk->unhandled_evts_vector);
3769
3770       for (i = 0; i < n_sids; i++)
3771         {
3772           session = vcl_session_get (wrk, vp[i].sh);
3773           if (!session)
3774             {
3775               vp[i].revents = POLLHUP;
3776               num_ev++;
3777               continue;
3778             }
3779
3780           vp[i].revents = 0;
3781
3782           if (POLLIN & vp[i].events)
3783             {
3784               rv = vcl_session_read_ready (session);
3785               if (rv > 0)
3786                 {
3787                   vp[i].revents |= POLLIN;
3788                   num_ev++;
3789                 }
3790               else if (rv < 0)
3791                 {
3792                   switch (rv)
3793                     {
3794                     case VPPCOM_ECONNRESET:
3795                       vp[i].revents = POLLHUP;
3796                       break;
3797
3798                     default:
3799                       vp[i].revents = POLLERR;
3800                       break;
3801                     }
3802                   num_ev++;
3803                 }
3804             }
3805
3806           if (POLLOUT & vp[i].events)
3807             {
3808               rv = vcl_session_write_ready (session);
3809               if (rv > 0)
3810                 {
3811                   vp[i].revents |= POLLOUT;
3812                   num_ev++;
3813                 }
3814               else if (rv < 0)
3815                 {
3816                   switch (rv)
3817                     {
3818                     case VPPCOM_ECONNRESET:
3819                       vp[i].revents = POLLHUP;
3820                       break;
3821
3822                     default:
3823                       vp[i].revents = POLLERR;
3824                       break;
3825                     }
3826                   num_ev++;
3827                 }
3828             }
3829
3830           if (0)                // Note "done:" label used by VCL_SESSION_LOCK_AND_GET()
3831             {
3832               vp[i].revents = POLLNVAL;
3833               num_ev++;
3834             }
3835         }
3836       if (wait_for_time != -1)
3837         keep_trying = (clib_time_now (&wrk->clib_time) <= timeout) ? 1 : 0;
3838     }
3839   while ((num_ev == 0) && keep_trying);
3840
3841   return num_ev;
3842 }
3843
3844 int
3845 vppcom_mq_epoll_fd (void)
3846 {
3847   vcl_worker_t *wrk = vcl_worker_get_current ();
3848   return wrk->mqs_epfd;
3849 }
3850
3851 int
3852 vppcom_session_index (vcl_session_handle_t session_handle)
3853 {
3854   return session_handle & 0xFFFFFF;
3855 }
3856
3857 int
3858 vppcom_session_worker (vcl_session_handle_t session_handle)
3859 {
3860   return session_handle >> 24;
3861 }
3862
3863 int
3864 vppcom_worker_register (void)
3865 {
3866   vcl_worker_t *wrk;
3867   u8 *wrk_name = 0;
3868   int rv;
3869
3870   if (!vcl_worker_alloc_and_init ())
3871     return VPPCOM_EEXIST;
3872
3873   wrk = vcl_worker_get_current ();
3874   wrk_name = format (0, "%s-wrk-%u", vcm->app_name, wrk->wrk_index);
3875
3876   rv = vppcom_connect_to_vpp ((char *) wrk_name);
3877   vec_free (wrk_name);
3878
3879   if (rv)
3880     return VPPCOM_EFAULT;
3881
3882   if (vcl_worker_register_with_vpp ())
3883     return VPPCOM_EEXIST;
3884
3885   return VPPCOM_OK;
3886 }
3887
3888 void
3889 vppcom_worker_unregister (void)
3890 {
3891   vcl_worker_cleanup (vcl_worker_get_current (), 1 /* notify vpp */ );
3892   vcl_set_worker_index (~0);
3893 }
3894
3895 void
3896 vppcom_worker_index_set (int index)
3897 {
3898   vcl_set_worker_index (index);
3899 }
3900
3901 int
3902 vppcom_worker_index (void)
3903 {
3904   return vcl_get_worker_index ();
3905 }
3906
3907 int
3908 vppcom_worker_mqs_epfd (void)
3909 {
3910   vcl_worker_t *wrk = vcl_worker_get_current ();
3911   if (!vcm->cfg.use_mq_eventfd)
3912     return -1;
3913   return wrk->mqs_epfd;
3914 }
3915
3916 int
3917 vppcom_session_is_connectable_listener (uint32_t session_handle)
3918 {
3919   vcl_session_t *session;
3920   vcl_worker_t *wrk = vcl_worker_get_current ();
3921   session = vcl_session_get_w_handle (wrk, session_handle);
3922   if (!session)
3923     return VPPCOM_EBADFD;
3924   return vcl_session_is_connectable_listener (wrk, session);
3925 }
3926
3927 int
3928 vppcom_session_listener (uint32_t session_handle)
3929 {
3930   vcl_worker_t *wrk = vcl_worker_get_current ();
3931   vcl_session_t *listen_session, *session;
3932   session = vcl_session_get_w_handle (wrk, session_handle);
3933   if (!session)
3934     return VPPCOM_EBADFD;
3935   if (session->listener_index == VCL_INVALID_SESSION_INDEX)
3936     return VPPCOM_EBADFD;
3937   listen_session = vcl_session_get_w_handle (wrk, session->listener_index);
3938   if (!listen_session)
3939     return VPPCOM_EBADFD;
3940   return vcl_session_handle (listen_session);
3941 }
3942
3943 int
3944 vppcom_session_n_accepted (uint32_t session_handle)
3945 {
3946   vcl_worker_t *wrk = vcl_worker_get_current ();
3947   vcl_session_t *session = vcl_session_get_w_handle (wrk, session_handle);
3948   if (!session)
3949     return VPPCOM_EBADFD;
3950   return session->n_accepted_sessions;
3951 }
3952
3953 const char *
3954 vppcom_proto_str (vppcom_proto_t proto)
3955 {
3956   char const *proto_str;
3957
3958   switch (proto)
3959     {
3960     case VPPCOM_PROTO_TCP:
3961       proto_str = "TCP";
3962       break;
3963     case VPPCOM_PROTO_UDP:
3964       proto_str = "UDP";
3965       break;
3966     case VPPCOM_PROTO_TLS:
3967       proto_str = "TLS";
3968       break;
3969     case VPPCOM_PROTO_QUIC:
3970       proto_str = "QUIC";
3971       break;
3972     default:
3973       proto_str = "UNKNOWN";
3974       break;
3975     }
3976   return proto_str;
3977 }
3978
3979 const char *
3980 vppcom_retval_str (int retval)
3981 {
3982   char const *st;
3983
3984   switch (retval)
3985     {
3986     case VPPCOM_OK:
3987       st = "VPPCOM_OK";
3988       break;
3989
3990     case VPPCOM_EAGAIN:
3991       st = "VPPCOM_EAGAIN";
3992       break;
3993
3994     case VPPCOM_EFAULT:
3995       st = "VPPCOM_EFAULT";
3996       break;
3997
3998     case VPPCOM_ENOMEM:
3999       st = "VPPCOM_ENOMEM";
4000       break;
4001
4002     case VPPCOM_EINVAL:
4003       st = "VPPCOM_EINVAL";
4004       break;
4005
4006     case VPPCOM_EBADFD:
4007       st = "VPPCOM_EBADFD";
4008       break;
4009
4010     case VPPCOM_EAFNOSUPPORT:
4011       st = "VPPCOM_EAFNOSUPPORT";
4012       break;
4013
4014     case VPPCOM_ECONNABORTED:
4015       st = "VPPCOM_ECONNABORTED";
4016       break;
4017
4018     case VPPCOM_ECONNRESET:
4019       st = "VPPCOM_ECONNRESET";
4020       break;
4021
4022     case VPPCOM_ENOTCONN:
4023       st = "VPPCOM_ENOTCONN";
4024       break;
4025
4026     case VPPCOM_ECONNREFUSED:
4027       st = "VPPCOM_ECONNREFUSED";
4028       break;
4029
4030     case VPPCOM_ETIMEDOUT:
4031       st = "VPPCOM_ETIMEDOUT";
4032       break;
4033
4034     default:
4035       st = "UNKNOWN_STATE";
4036       break;
4037     }
4038
4039   return st;
4040 }
4041
4042 /*
4043  * fd.io coding-style-patch-verification: ON
4044  *
4045  * Local Variables:
4046  * eval: (c-set-style "gnu")
4047  * End:
4048  */