vcl: fix listener session close problem
[vpp.git] / src / vcl / vppcom.c
1 /*
2  * Copyright (c) 2017-2019 Cisco and/or its affiliates.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this
5  * You may obtain a copy of the License at:
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15
16 #include <stdio.h>
17 #include <stdlib.h>
18 #include <vcl/vppcom.h>
19 #include <vcl/vcl_debug.h>
20 #include <vcl/vcl_private.h>
21 #include <svm/fifo_segment.h>
22
23 __thread uword __vcl_worker_index = ~0;
24
25 static int
26 vcl_segment_is_not_mounted (vcl_worker_t * wrk, u64 segment_handle)
27 {
28   u32 segment_index;
29
30   if (segment_handle == VCL_INVALID_SEGMENT_HANDLE)
31     return 0;
32
33   segment_index = vcl_segment_table_lookup (segment_handle);
34   if (segment_index != VCL_INVALID_SEGMENT_INDEX)
35     return 0;
36
37   return 1;
38 }
39
40 static inline int
41 vcl_mq_dequeue_batch (vcl_worker_t * wrk, svm_msg_q_t * mq, u32 n_max_msg)
42 {
43   svm_msg_q_msg_t *msg;
44   u32 n_msgs;
45   int i;
46
47   n_msgs = clib_min (svm_msg_q_size (mq), n_max_msg);
48   for (i = 0; i < n_msgs; i++)
49     {
50       vec_add2 (wrk->mq_msg_vector, msg, 1);
51       svm_msg_q_sub_w_lock (mq, msg);
52     }
53   return n_msgs;
54 }
55
56 const char *
57 vppcom_session_state_str (vcl_session_state_t state)
58 {
59   char *st;
60
61   switch (state)
62     {
63     case VCL_STATE_CLOSED:
64       st = "STATE_CLOSED";
65       break;
66     case VCL_STATE_LISTEN:
67       st = "STATE_LISTEN";
68       break;
69     case VCL_STATE_READY:
70       st = "STATE_READY";
71       break;
72     case VCL_STATE_VPP_CLOSING:
73       st = "STATE_VPP_CLOSING";
74       break;
75     case VCL_STATE_DISCONNECT:
76       st = "STATE_DISCONNECT";
77       break;
78     case VCL_STATE_DETACHED:
79       st = "STATE_DETACHED";
80       break;
81     case VCL_STATE_UPDATED:
82       st = "STATE_UPDATED";
83       break;
84     case VCL_STATE_LISTEN_NO_MQ:
85       st = "STATE_LISTEN_NO_MQ";
86       break;
87     default:
88       st = "UNKNOWN_STATE";
89       break;
90     }
91
92   return st;
93 }
94
95 u8 *
96 format_ip4_address (u8 * s, va_list * args)
97 {
98   u8 *a = va_arg (*args, u8 *);
99   return format (s, "%d.%d.%d.%d", a[0], a[1], a[2], a[3]);
100 }
101
102 u8 *
103 format_ip6_address (u8 * s, va_list * args)
104 {
105   ip6_address_t *a = va_arg (*args, ip6_address_t *);
106   u32 i, i_max_n_zero, max_n_zeros, i_first_zero, n_zeros, last_double_colon;
107
108   i_max_n_zero = ARRAY_LEN (a->as_u16);
109   max_n_zeros = 0;
110   i_first_zero = i_max_n_zero;
111   n_zeros = 0;
112   for (i = 0; i < ARRAY_LEN (a->as_u16); i++)
113     {
114       u32 is_zero = a->as_u16[i] == 0;
115       if (is_zero && i_first_zero >= ARRAY_LEN (a->as_u16))
116         {
117           i_first_zero = i;
118           n_zeros = 0;
119         }
120       n_zeros += is_zero;
121       if ((!is_zero && n_zeros > max_n_zeros)
122           || (i + 1 >= ARRAY_LEN (a->as_u16) && n_zeros > max_n_zeros))
123         {
124           i_max_n_zero = i_first_zero;
125           max_n_zeros = n_zeros;
126           i_first_zero = ARRAY_LEN (a->as_u16);
127           n_zeros = 0;
128         }
129     }
130
131   last_double_colon = 0;
132   for (i = 0; i < ARRAY_LEN (a->as_u16); i++)
133     {
134       if (i == i_max_n_zero && max_n_zeros > 1)
135         {
136           s = format (s, "::");
137           i += max_n_zeros - 1;
138           last_double_colon = 1;
139         }
140       else
141         {
142           s = format (s, "%s%x",
143                       (last_double_colon || i == 0) ? "" : ":",
144                       clib_net_to_host_u16 (a->as_u16[i]));
145           last_double_colon = 0;
146         }
147     }
148
149   return s;
150 }
151
152 /* Format an IP46 address. */
153 u8 *
154 format_ip46_address (u8 * s, va_list * args)
155 {
156   ip46_address_t *ip46 = va_arg (*args, ip46_address_t *);
157   ip46_type_t type = va_arg (*args, ip46_type_t);
158   int is_ip4 = 1;
159
160   switch (type)
161     {
162     case IP46_TYPE_ANY:
163       is_ip4 = ip46_address_is_ip4 (ip46);
164       break;
165     case IP46_TYPE_IP4:
166       is_ip4 = 1;
167       break;
168     case IP46_TYPE_IP6:
169       is_ip4 = 0;
170       break;
171     }
172
173   return is_ip4 ?
174     format (s, "%U", format_ip4_address, &ip46->ip4) :
175     format (s, "%U", format_ip6_address, &ip46->ip6);
176 }
177
178 /*
179  * VPPCOM Utility Functions
180  */
181
182 static void
183 vcl_send_session_listen (vcl_worker_t * wrk, vcl_session_t * s)
184 {
185   app_session_evt_t _app_evt, *app_evt = &_app_evt;
186   session_listen_msg_t *mp;
187   svm_msg_q_t *mq;
188
189   mq = vcl_worker_ctrl_mq (wrk);
190   app_alloc_ctrl_evt_to_vpp (mq, app_evt, SESSION_CTRL_EVT_LISTEN);
191   mp = (session_listen_msg_t *) app_evt->evt->data;
192   memset (mp, 0, sizeof (*mp));
193   mp->client_index = wrk->api_client_handle;
194   mp->context = s->session_index;
195   mp->wrk_index = wrk->vpp_wrk_index;
196   mp->is_ip4 = s->transport.is_ip4;
197   clib_memcpy_fast (&mp->ip, &s->transport.lcl_ip, sizeof (mp->ip));
198   mp->port = s->transport.lcl_port;
199   mp->proto = s->session_type;
200   if (s->flags & VCL_SESSION_F_CONNECTED)
201     mp->flags = TRANSPORT_CFG_F_CONNECTED;
202   app_send_ctrl_evt_to_vpp (mq, app_evt);
203 }
204
205 static void
206 vcl_send_session_connect (vcl_worker_t * wrk, vcl_session_t * s)
207 {
208   app_session_evt_t _app_evt, *app_evt = &_app_evt;
209   session_connect_msg_t *mp;
210   svm_msg_q_t *mq;
211
212   mq = vcl_worker_ctrl_mq (wrk);
213   app_alloc_ctrl_evt_to_vpp (mq, app_evt, SESSION_CTRL_EVT_CONNECT);
214   mp = (session_connect_msg_t *) app_evt->evt->data;
215   memset (mp, 0, sizeof (*mp));
216   mp->client_index = wrk->api_client_handle;
217   mp->context = s->session_index;
218   mp->wrk_index = wrk->vpp_wrk_index;
219   mp->is_ip4 = s->transport.is_ip4;
220   mp->parent_handle = s->parent_handle;
221   clib_memcpy_fast (&mp->ip, &s->transport.rmt_ip, sizeof (mp->ip));
222   clib_memcpy_fast (&mp->lcl_ip, &s->transport.lcl_ip, sizeof (mp->lcl_ip));
223   mp->port = s->transport.rmt_port;
224   mp->lcl_port = s->transport.lcl_port;
225   mp->proto = s->session_type;
226   if (s->flags & VCL_SESSION_F_CONNECTED)
227     mp->flags |= TRANSPORT_CFG_F_CONNECTED;
228   app_send_ctrl_evt_to_vpp (mq, app_evt);
229 }
230
231 void
232 vcl_send_session_unlisten (vcl_worker_t * wrk, vcl_session_t * s)
233 {
234   app_session_evt_t _app_evt, *app_evt = &_app_evt;
235   session_unlisten_msg_t *mp;
236   svm_msg_q_t *mq;
237
238   mq = vcl_worker_ctrl_mq (wrk);
239   app_alloc_ctrl_evt_to_vpp (mq, app_evt, SESSION_CTRL_EVT_UNLISTEN);
240   mp = (session_unlisten_msg_t *) app_evt->evt->data;
241   memset (mp, 0, sizeof (*mp));
242   mp->client_index = wrk->api_client_handle;
243   mp->wrk_index = wrk->vpp_wrk_index;
244   mp->handle = s->vpp_handle;
245   mp->context = wrk->wrk_index;
246   app_send_ctrl_evt_to_vpp (mq, app_evt);
247 }
248
249 static void
250 vcl_send_session_disconnect (vcl_worker_t * wrk, vcl_session_t * s)
251 {
252   app_session_evt_t _app_evt, *app_evt = &_app_evt;
253   session_disconnect_msg_t *mp;
254   svm_msg_q_t *mq;
255
256   /* Send to thread that owns the session */
257   mq = s->vpp_evt_q;
258   app_alloc_ctrl_evt_to_vpp (mq, app_evt, SESSION_CTRL_EVT_DISCONNECT);
259   mp = (session_disconnect_msg_t *) app_evt->evt->data;
260   memset (mp, 0, sizeof (*mp));
261   mp->client_index = wrk->api_client_handle;
262   mp->handle = s->vpp_handle;
263   app_send_ctrl_evt_to_vpp (mq, app_evt);
264 }
265
266 static void
267 vcl_send_app_detach (vcl_worker_t * wrk)
268 {
269   app_session_evt_t _app_evt, *app_evt = &_app_evt;
270   session_app_detach_msg_t *mp;
271   svm_msg_q_t *mq;
272
273   mq = vcl_worker_ctrl_mq (wrk);
274   app_alloc_ctrl_evt_to_vpp (mq, app_evt, SESSION_CTRL_EVT_APP_DETACH);
275   mp = (session_app_detach_msg_t *) app_evt->evt->data;
276   memset (mp, 0, sizeof (*mp));
277   mp->client_index = wrk->api_client_handle;
278   app_send_ctrl_evt_to_vpp (mq, app_evt);
279 }
280
281 static void
282 vcl_send_session_accepted_reply (svm_msg_q_t * mq, u32 context,
283                                  session_handle_t handle, int retval)
284 {
285   app_session_evt_t _app_evt, *app_evt = &_app_evt;
286   session_accepted_reply_msg_t *rmp;
287   app_alloc_ctrl_evt_to_vpp (mq, app_evt, SESSION_CTRL_EVT_ACCEPTED_REPLY);
288   rmp = (session_accepted_reply_msg_t *) app_evt->evt->data;
289   rmp->handle = handle;
290   rmp->context = context;
291   rmp->retval = retval;
292   app_send_ctrl_evt_to_vpp (mq, app_evt);
293 }
294
295 static void
296 vcl_send_session_disconnected_reply (vcl_worker_t * wrk, vcl_session_t * s,
297                                      int retval)
298 {
299   app_session_evt_t _app_evt, *app_evt = &_app_evt;
300   session_disconnected_reply_msg_t *rmp;
301   app_alloc_ctrl_evt_to_vpp (s->vpp_evt_q, app_evt,
302                              SESSION_CTRL_EVT_DISCONNECTED_REPLY);
303   rmp = (session_disconnected_reply_msg_t *) app_evt->evt->data;
304   rmp->handle = s->vpp_handle;
305   rmp->context = wrk->api_client_handle;
306   rmp->retval = retval;
307   app_send_ctrl_evt_to_vpp (s->vpp_evt_q, app_evt);
308 }
309
310 static void
311 vcl_send_session_reset_reply (vcl_worker_t * wrk, vcl_session_t * s,
312                               int retval)
313 {
314   app_session_evt_t _app_evt, *app_evt = &_app_evt;
315   session_reset_reply_msg_t *rmp;
316   app_alloc_ctrl_evt_to_vpp (s->vpp_evt_q, app_evt,
317                              SESSION_CTRL_EVT_RESET_REPLY);
318   rmp = (session_reset_reply_msg_t *) app_evt->evt->data;
319   rmp->handle = s->vpp_handle;
320   rmp->context = wrk->api_client_handle;
321   rmp->retval = retval;
322   app_send_ctrl_evt_to_vpp (s->vpp_evt_q, app_evt);
323 }
324
325 void
326 vcl_send_session_worker_update (vcl_worker_t * wrk, vcl_session_t * s,
327                                 u32 wrk_index)
328 {
329   app_session_evt_t _app_evt, *app_evt = &_app_evt;
330   session_worker_update_msg_t *mp;
331
332   app_alloc_ctrl_evt_to_vpp (s->vpp_evt_q, app_evt,
333                              SESSION_CTRL_EVT_WORKER_UPDATE);
334   mp = (session_worker_update_msg_t *) app_evt->evt->data;
335   mp->client_index = wrk->api_client_handle;
336   mp->handle = s->vpp_handle;
337   mp->req_wrk_index = wrk->vpp_wrk_index;
338   mp->wrk_index = wrk_index;
339   app_send_ctrl_evt_to_vpp (s->vpp_evt_q, app_evt);
340 }
341
342 int
343 vcl_send_worker_rpc (u32 dst_wrk_index, void *data, u32 data_len)
344 {
345   app_session_evt_t _app_evt, *app_evt = &_app_evt;
346   session_app_wrk_rpc_msg_t *mp;
347   vcl_worker_t *dst_wrk, *wrk;
348   svm_msg_q_t *mq;
349   int ret = -1;
350
351   if (data_len > sizeof (mp->data))
352     goto done;
353
354   clib_spinlock_lock (&vcm->workers_lock);
355
356   dst_wrk = vcl_worker_get_if_valid (dst_wrk_index);
357   if (!dst_wrk)
358     goto done;
359
360   wrk = vcl_worker_get_current ();
361   mq = vcl_worker_ctrl_mq (wrk);
362   app_alloc_ctrl_evt_to_vpp (mq, app_evt, SESSION_CTRL_EVT_APP_WRK_RPC);
363   mp = (session_app_wrk_rpc_msg_t *) app_evt->evt->data;
364   mp->client_index = wrk->api_client_handle;
365   mp->wrk_index = dst_wrk->vpp_wrk_index;
366   clib_memcpy (mp->data, data, data_len);
367   app_send_ctrl_evt_to_vpp (mq, app_evt);
368   ret = 0;
369
370 done:
371   clib_spinlock_unlock (&vcm->workers_lock);
372   return ret;
373 }
374
375 static u32
376 vcl_session_accepted_handler (vcl_worker_t * wrk, session_accepted_msg_t * mp,
377                               u32 ls_index)
378 {
379   vcl_session_t *session, *listen_session;
380   svm_fifo_t *rx_fifo, *tx_fifo;
381   svm_msg_q_t *evt_q;
382
383   session = vcl_session_alloc (wrk);
384
385   listen_session = vcl_session_get (wrk, ls_index);
386   if (listen_session->vpp_handle != mp->listener_handle)
387     {
388       VDBG (0, "ERROR: listener handle %lu does not match session %u",
389             mp->listener_handle, ls_index);
390       goto error;
391     }
392
393   if (vcl_segment_is_not_mounted (wrk, mp->segment_handle))
394     {
395       VDBG (0, "ERROR: segment for session %u is not mounted!",
396             session->session_index);
397       goto error;
398     }
399
400   rx_fifo = uword_to_pointer (mp->server_rx_fifo, svm_fifo_t *);
401   tx_fifo = uword_to_pointer (mp->server_tx_fifo, svm_fifo_t *);
402   session->vpp_evt_q = uword_to_pointer (mp->vpp_event_queue_address,
403                                          svm_msg_q_t *);
404   rx_fifo->client_session_index = session->session_index;
405   tx_fifo->client_session_index = session->session_index;
406   rx_fifo->client_thread_index = vcl_get_worker_index ();
407   tx_fifo->client_thread_index = vcl_get_worker_index ();
408
409   session->vpp_handle = mp->handle;
410   session->rx_fifo = rx_fifo;
411   session->tx_fifo = tx_fifo;
412
413   session->session_state = VCL_STATE_READY;
414   session->transport.rmt_port = mp->rmt.port;
415   session->transport.is_ip4 = mp->rmt.is_ip4;
416   clib_memcpy_fast (&session->transport.rmt_ip, &mp->rmt.ip,
417                     sizeof (ip46_address_t));
418
419   vcl_session_table_add_vpp_handle (wrk, mp->handle, session->session_index);
420   session->transport.lcl_port = listen_session->transport.lcl_port;
421   session->transport.lcl_ip = listen_session->transport.lcl_ip;
422   session->session_type = listen_session->session_type;
423   session->is_dgram = vcl_proto_is_dgram (session->session_type);
424   session->listener_index = listen_session->session_index;
425   listen_session->n_accepted_sessions++;
426
427   VDBG (1, "session %u [0x%llx]: client accept request from %s address %U"
428         " port %d queue %p!", session->session_index, mp->handle,
429         mp->rmt.is_ip4 ? "IPv4" : "IPv6", format_ip46_address, &mp->rmt.ip,
430         mp->rmt.is_ip4 ? IP46_TYPE_IP4 : IP46_TYPE_IP6,
431         clib_net_to_host_u16 (mp->rmt.port), session->vpp_evt_q);
432   vcl_evt (VCL_EVT_ACCEPT, session, listen_session, session_index);
433
434   vcl_send_session_accepted_reply (session->vpp_evt_q, mp->context,
435                                    session->vpp_handle, 0);
436
437   return session->session_index;
438
439 error:
440   evt_q = uword_to_pointer (mp->vpp_event_queue_address, svm_msg_q_t *);
441   vcl_send_session_accepted_reply (evt_q, mp->context, mp->handle,
442                                    VNET_API_ERROR_INVALID_ARGUMENT);
443   vcl_session_free (wrk, session);
444   return VCL_INVALID_SESSION_INDEX;
445 }
446
447 static u32
448 vcl_session_connected_handler (vcl_worker_t * wrk,
449                                session_connected_msg_t * mp)
450 {
451   svm_fifo_t *rx_fifo, *tx_fifo;
452   vcl_session_t *session = 0;
453   u32 session_index;
454
455   session_index = mp->context;
456   session = vcl_session_get (wrk, session_index);
457   if (!session)
458     {
459       VDBG (0, "ERROR: vpp handle 0x%llx has no session index (%u)!",
460             mp->handle, session_index);
461       return VCL_INVALID_SESSION_INDEX;
462     }
463   if (mp->retval)
464     {
465       VDBG (0, "ERROR: session index %u: connect failed! %U",
466             session_index, format_session_error, mp->retval);
467       session->session_state = VCL_STATE_DETACHED;
468       session->vpp_handle = mp->handle;
469       return session_index;
470     }
471
472   session->vpp_handle = mp->handle;
473   session->vpp_evt_q = uword_to_pointer (mp->vpp_event_queue_address,
474                                          svm_msg_q_t *);
475   rx_fifo = uword_to_pointer (mp->server_rx_fifo, svm_fifo_t *);
476   tx_fifo = uword_to_pointer (mp->server_tx_fifo, svm_fifo_t *);
477   if (vcl_segment_is_not_mounted (wrk, mp->segment_handle))
478     {
479       VDBG (0, "segment for session %u is not mounted!",
480             session->session_index);
481       session->session_state = VCL_STATE_DETACHED;
482       vcl_send_session_disconnect (wrk, session);
483       return session_index;
484     }
485
486   rx_fifo->client_session_index = session_index;
487   tx_fifo->client_session_index = session_index;
488   rx_fifo->client_thread_index = vcl_get_worker_index ();
489   tx_fifo->client_thread_index = vcl_get_worker_index ();
490
491   if (mp->ct_rx_fifo)
492     {
493       session->ct_rx_fifo = uword_to_pointer (mp->ct_rx_fifo, svm_fifo_t *);
494       session->ct_tx_fifo = uword_to_pointer (mp->ct_tx_fifo, svm_fifo_t *);
495       if (vcl_segment_is_not_mounted (wrk, mp->ct_segment_handle))
496         {
497           VDBG (0, "ct segment for session %u is not mounted!",
498                 session->session_index);
499           session->session_state = VCL_STATE_DETACHED;
500           vcl_send_session_disconnect (wrk, session);
501           return session_index;
502         }
503     }
504
505   session->rx_fifo = rx_fifo;
506   session->tx_fifo = tx_fifo;
507   session->transport.is_ip4 = mp->lcl.is_ip4;
508   clib_memcpy_fast (&session->transport.lcl_ip, &mp->lcl.ip,
509                     sizeof (session->transport.lcl_ip));
510   session->transport.lcl_port = mp->lcl.port;
511
512   /* Application closed session before connect reply */
513   if (vcl_session_has_attr (session, VCL_SESS_ATTR_NONBLOCK)
514       && session->session_state == VCL_STATE_CLOSED)
515     vcl_send_session_disconnect (wrk, session);
516   else
517     session->session_state = VCL_STATE_READY;
518
519   /* Add it to lookup table */
520   vcl_session_table_add_vpp_handle (wrk, mp->handle, session_index);
521
522   VDBG (1, "session %u [0x%llx] connected! rx_fifo %p, refcnt %d, tx_fifo %p,"
523         " refcnt %d", session_index, mp->handle, session->rx_fifo,
524         session->rx_fifo->refcnt, session->tx_fifo, session->tx_fifo->refcnt);
525
526   return session_index;
527 }
528
529 static int
530 vcl_flag_accepted_session (vcl_session_t * session, u64 handle, u32 flags)
531 {
532   vcl_session_msg_t *accepted_msg;
533   int i;
534
535   for (i = 0; i < vec_len (session->accept_evts_fifo); i++)
536     {
537       accepted_msg = &session->accept_evts_fifo[i];
538       if (accepted_msg->accepted_msg.handle == handle)
539         {
540           accepted_msg->flags |= flags;
541           return 1;
542         }
543     }
544   return 0;
545 }
546
547 static u32
548 vcl_session_reset_handler (vcl_worker_t * wrk,
549                            session_reset_msg_t * reset_msg)
550 {
551   vcl_session_t *session;
552   u32 sid;
553
554   sid = vcl_session_index_from_vpp_handle (wrk, reset_msg->handle);
555   session = vcl_session_get (wrk, sid);
556   if (!session)
557     {
558       VDBG (0, "request to reset unknown handle 0x%llx", reset_msg->handle);
559       return VCL_INVALID_SESSION_INDEX;
560     }
561
562   /* Caught a reset before actually accepting the session */
563   if (session->session_state == VCL_STATE_LISTEN)
564     {
565
566       if (!vcl_flag_accepted_session (session, reset_msg->handle,
567                                       VCL_ACCEPTED_F_RESET))
568         VDBG (0, "session was not accepted!");
569       return VCL_INVALID_SESSION_INDEX;
570     }
571
572   if (session->session_state != VCL_STATE_CLOSED)
573     session->session_state = VCL_STATE_DISCONNECT;
574   VDBG (0, "reset session %u [0x%llx]", sid, reset_msg->handle);
575   return sid;
576 }
577
578 static u32
579 vcl_session_bound_handler (vcl_worker_t * wrk, session_bound_msg_t * mp)
580 {
581   vcl_session_t *session;
582   u32 sid = mp->context;
583
584   session = vcl_session_get (wrk, sid);
585   if (mp->retval)
586     {
587       VERR ("session %u [0x%llx]: bind failed: %U", sid, mp->handle,
588             format_session_error, mp->retval);
589       if (session)
590         {
591           session->session_state = VCL_STATE_DETACHED;
592           session->vpp_handle = mp->handle;
593           return sid;
594         }
595       else
596         {
597           VDBG (0, "ERROR: session %u [0x%llx]: Invalid session index!",
598                 sid, mp->handle);
599           return VCL_INVALID_SESSION_INDEX;
600         }
601     }
602
603   session->vpp_handle = mp->handle;
604   session->transport.is_ip4 = mp->lcl_is_ip4;
605   clib_memcpy_fast (&session->transport.lcl_ip, mp->lcl_ip,
606                     sizeof (ip46_address_t));
607   session->transport.lcl_port = mp->lcl_port;
608   vcl_session_table_add_listener (wrk, mp->handle, sid);
609   session->session_state = VCL_STATE_LISTEN;
610   session->vpp_evt_q = uword_to_pointer (mp->vpp_evt_q, svm_msg_q_t *);
611
612   if (vcl_session_is_cl (session))
613     {
614       svm_fifo_t *rx_fifo, *tx_fifo;
615       session->vpp_evt_q = uword_to_pointer (mp->vpp_evt_q, svm_msg_q_t *);
616       rx_fifo = uword_to_pointer (mp->rx_fifo, svm_fifo_t *);
617       rx_fifo->client_session_index = sid;
618       tx_fifo = uword_to_pointer (mp->tx_fifo, svm_fifo_t *);
619       tx_fifo->client_session_index = sid;
620       session->rx_fifo = rx_fifo;
621       session->tx_fifo = tx_fifo;
622     }
623
624   VDBG (0, "session %u [0x%llx]: listen succeeded!", sid, mp->handle);
625   return sid;
626 }
627
628 static void
629 vcl_session_unlisten_reply_handler (vcl_worker_t * wrk, void *data)
630 {
631   session_unlisten_reply_msg_t *mp = (session_unlisten_reply_msg_t *) data;
632   vcl_session_t *s;
633
634   s = vcl_session_get_w_vpp_handle (wrk, mp->handle);
635   if (!s)
636     {
637       VDBG (0, "Unlisten reply with wrong handle %llx", mp->handle);
638       return;
639     }
640   if (s->session_state != VCL_STATE_DISCONNECT)
641     {
642       /* Connected udp listener */
643       if (s->session_type == VPPCOM_PROTO_UDP
644           && s->session_state == VCL_STATE_CLOSED)
645         return;
646
647       VDBG (0, "Unlisten session in wrong state %llx", mp->handle);
648       return;
649     }
650
651   if (mp->retval)
652     VDBG (0, "ERROR: session %u [0xllx]: unlisten failed: %U",
653           s->session_index, mp->handle, format_session_error, mp->retval);
654
655   if (mp->context != wrk->wrk_index)
656     VDBG (0, "wrong context");
657
658   vcl_session_table_del_vpp_handle (wrk, mp->handle);
659   vcl_session_free (wrk, s);
660 }
661
662 static void
663 vcl_session_migrated_handler (vcl_worker_t * wrk, void *data)
664 {
665   session_migrated_msg_t *mp = (session_migrated_msg_t *) data;
666   vcl_session_t *s;
667
668   s = vcl_session_get_w_vpp_handle (wrk, mp->handle);
669   if (!s)
670     {
671       VDBG (0, "Migrated notification with wrong handle %llx", mp->handle);
672       return;
673     }
674
675   s->vpp_handle = mp->new_handle;
676   s->vpp_evt_q = uword_to_pointer (mp->vpp_evt_q, svm_msg_q_t *);
677
678   vcl_session_table_del_vpp_handle (wrk, mp->handle);
679   vcl_session_table_add_vpp_handle (wrk, mp->new_handle, s->session_index);
680
681   /* Generate new tx event if we have outstanding data */
682   if (svm_fifo_has_event (s->tx_fifo))
683     app_send_io_evt_to_vpp (s->vpp_evt_q, s->tx_fifo->master_session_index,
684                             SESSION_IO_EVT_TX, SVM_Q_WAIT);
685
686   VDBG (0, "Migrated 0x%lx to thread %u 0x%lx", mp->handle,
687         mp->vpp_thread_index, mp->new_handle);
688 }
689
690 static vcl_session_t *
691 vcl_session_accepted (vcl_worker_t * wrk, session_accepted_msg_t * msg)
692 {
693   vcl_session_msg_t *vcl_msg;
694   vcl_session_t *session;
695
696   session = vcl_session_get_w_vpp_handle (wrk, msg->handle);
697   if (PREDICT_FALSE (session != 0))
698     VWRN ("session overlap handle %lu state %u!", msg->handle,
699           session->session_state);
700
701   session = vcl_session_table_lookup_listener (wrk, msg->listener_handle);
702   if (!session)
703     {
704       VERR ("couldn't find listen session: listener handle %llx",
705             msg->listener_handle);
706       return 0;
707     }
708
709   clib_fifo_add2 (session->accept_evts_fifo, vcl_msg);
710   vcl_msg->flags = 0;
711   vcl_msg->accepted_msg = *msg;
712   /* Session handle points to listener until fully accepted by app */
713   vcl_session_table_add_vpp_handle (wrk, msg->handle, session->session_index);
714
715   return session;
716 }
717
718 static vcl_session_t *
719 vcl_session_disconnected_handler (vcl_worker_t * wrk,
720                                   session_disconnected_msg_t * msg)
721 {
722   vcl_session_t *session;
723
724   session = vcl_session_get_w_vpp_handle (wrk, msg->handle);
725   if (!session)
726     {
727       VDBG (0, "request to disconnect unknown handle 0x%llx", msg->handle);
728       return 0;
729     }
730
731   /* Late disconnect notification on a session that has been closed */
732   if (session->session_state == VCL_STATE_CLOSED)
733     return 0;
734
735   /* Caught a disconnect before actually accepting the session */
736   if (session->session_state == VCL_STATE_LISTEN)
737     {
738       if (!vcl_flag_accepted_session (session, msg->handle,
739                                       VCL_ACCEPTED_F_CLOSED))
740         VDBG (0, "session was not accepted!");
741       return 0;
742     }
743
744   /* If not already reset change state */
745   if (session->session_state != VCL_STATE_DISCONNECT)
746     session->session_state = VCL_STATE_VPP_CLOSING;
747
748   return session;
749 }
750
751 static int
752 vppcom_session_disconnect (u32 session_handle)
753 {
754   vcl_worker_t *wrk = vcl_worker_get_current ();
755   vcl_session_t *session, *listen_session;
756   vcl_session_state_t state;
757   u64 vpp_handle;
758
759   session = vcl_session_get_w_handle (wrk, session_handle);
760   if (!session)
761     return VPPCOM_EBADFD;
762
763   vpp_handle = session->vpp_handle;
764   state = session->session_state;
765
766   VDBG (1, "session %u [0x%llx] state 0x%x (%s)", session->session_index,
767         vpp_handle, state, vppcom_session_state_str (state));
768
769   if (PREDICT_FALSE (state == VCL_STATE_LISTEN))
770     {
771       VDBG (0, "ERROR: Cannot disconnect a listen socket!");
772       return VPPCOM_EBADFD;
773     }
774
775   if (state == VCL_STATE_VPP_CLOSING)
776     {
777       vcl_send_session_disconnected_reply (wrk, session, 0);
778       VDBG (1, "session %u [0x%llx]: sending disconnect REPLY...",
779             session->session_index, vpp_handle);
780     }
781   else
782     {
783       /* Session doesn't have an event queue yet. Probably a non-blocking
784        * connect. Wait for the reply */
785       if (PREDICT_FALSE (!session->vpp_evt_q))
786         return VPPCOM_OK;
787
788       VDBG (1, "session %u [0x%llx]: sending disconnect...",
789             session->session_index, vpp_handle);
790       vcl_send_session_disconnect (wrk, session);
791     }
792
793   if (session->listener_index != VCL_INVALID_SESSION_INDEX)
794     {
795       listen_session = vcl_session_get (wrk, session->listener_index);
796       listen_session->n_accepted_sessions--;
797     }
798
799   return VPPCOM_OK;
800 }
801
802 static void
803 vcl_session_cleanup_handler (vcl_worker_t * wrk, void *data)
804 {
805   session_cleanup_msg_t *msg;
806   vcl_session_t *session;
807
808   msg = (session_cleanup_msg_t *) data;
809   session = vcl_session_get_w_vpp_handle (wrk, msg->handle);
810   if (!session)
811     {
812       VDBG (0, "disconnect confirmed for unknown handle 0x%llx", msg->handle);
813       return;
814     }
815
816   if (msg->type == SESSION_CLEANUP_TRANSPORT)
817     {
818       /* Transport was cleaned up before we confirmed close. Probably the
819        * app is still waiting for some data that cannot be delivered.
820        * Confirm close to make sure everything is cleaned up.
821        * Move to undetermined state to ensure that the session is not
822        * removed before both vpp and the app cleanup.
823        * - If the app closes first, the session is moved to CLOSED state
824        *   and the session cleanup notification from vpp removes the
825        *   session.
826        * - If vpp cleans up the session first, the session is moved to
827        *   DETACHED state lower and subsequently the close from the app
828        *   frees the session
829        */
830       if (session->session_state == VCL_STATE_VPP_CLOSING)
831         {
832           vppcom_session_disconnect (vcl_session_handle (session));
833           session->session_state = VCL_STATE_UPDATED;
834         }
835       else if (session->session_state == VCL_STATE_DISCONNECT)
836         {
837           vcl_send_session_reset_reply (wrk, session, 0);
838           session->session_state = VCL_STATE_UPDATED;
839         }
840       return;
841     }
842
843   vcl_session_table_del_vpp_handle (wrk, msg->handle);
844   /* Should not happen. App did not close the connection so don't free it. */
845   if (session->session_state != VCL_STATE_CLOSED)
846     {
847       VDBG (0, "app did not close session %d", session->session_index);
848       session->session_state = VCL_STATE_DETACHED;
849       session->vpp_handle = VCL_INVALID_SESSION_HANDLE;
850       return;
851     }
852   vcl_session_free (wrk, session);
853 }
854
855 static void
856 vcl_session_req_worker_update_handler (vcl_worker_t * wrk, void *data)
857 {
858   session_req_worker_update_msg_t *msg;
859   vcl_session_t *s;
860
861   msg = (session_req_worker_update_msg_t *) data;
862   s = vcl_session_get_w_vpp_handle (wrk, msg->session_handle);
863   if (!s)
864     return;
865
866   vec_add1 (wrk->pending_session_wrk_updates, s->session_index);
867 }
868
869 static void
870 vcl_session_worker_update_reply_handler (vcl_worker_t * wrk, void *data)
871 {
872   session_worker_update_reply_msg_t *msg;
873   vcl_session_t *s;
874
875   msg = (session_worker_update_reply_msg_t *) data;
876   s = vcl_session_get_w_vpp_handle (wrk, msg->handle);
877   if (!s)
878     {
879       VDBG (0, "unknown handle 0x%llx", msg->handle);
880       return;
881     }
882   if (vcl_segment_is_not_mounted (wrk, msg->segment_handle))
883     {
884       clib_warning ("segment for session %u is not mounted!",
885                     s->session_index);
886       return;
887     }
888
889   if (s->rx_fifo)
890     {
891       s->rx_fifo = uword_to_pointer (msg->rx_fifo, svm_fifo_t *);
892       s->tx_fifo = uword_to_pointer (msg->tx_fifo, svm_fifo_t *);
893       s->rx_fifo->client_session_index = s->session_index;
894       s->tx_fifo->client_session_index = s->session_index;
895       s->rx_fifo->client_thread_index = wrk->wrk_index;
896       s->tx_fifo->client_thread_index = wrk->wrk_index;
897     }
898   s->session_state = VCL_STATE_UPDATED;
899
900   VDBG (0, "session %u[0x%llx] moved to worker %u", s->session_index,
901         s->vpp_handle, wrk->wrk_index);
902 }
903
904 static int
905 vcl_api_recv_fd (vcl_worker_t * wrk, int *fds, int n_fds)
906 {
907
908   if (vcm->cfg.vpp_app_socket_api)
909     return vcl_sapi_recv_fds (wrk, fds, n_fds);
910
911   return vcl_bapi_recv_fds (wrk, fds, n_fds);
912 }
913
914 static void
915 vcl_session_app_add_segment_handler (vcl_worker_t * wrk, void *data)
916 {
917   ssvm_segment_type_t seg_type = SSVM_SEGMENT_SHM;
918   session_app_add_segment_msg_t *msg;
919   u64 segment_handle;
920   int fd = -1;
921
922   msg = (session_app_add_segment_msg_t *) data;
923
924   if (msg->fd_flags)
925     {
926       vcl_api_recv_fd (wrk, &fd, 1);
927       seg_type = SSVM_SEGMENT_MEMFD;
928     }
929
930   segment_handle = msg->segment_handle;
931   if (segment_handle == VCL_INVALID_SEGMENT_HANDLE)
932     {
933       clib_warning ("invalid segment handle");
934       return;
935     }
936
937   if (vcl_segment_attach (segment_handle, (char *) msg->segment_name,
938                           seg_type, fd))
939     {
940       VDBG (0, "vcl_segment_attach ('%s') failed", msg->segment_name);
941       return;
942     }
943
944   VDBG (1, "mapped new segment '%s' size %d", msg->segment_name,
945         msg->segment_size);
946 }
947
948 static void
949 vcl_session_app_del_segment_handler (vcl_worker_t * wrk, void *data)
950 {
951   session_app_del_segment_msg_t *msg = (session_app_del_segment_msg_t *) data;
952   vcl_segment_detach (msg->segment_handle);
953   VDBG (1, "Unmapped segment: %d", msg->segment_handle);
954 }
955
956 static void
957 vcl_worker_rpc_handler (vcl_worker_t * wrk, void *data)
958 {
959   if (!vcm->wrk_rpc_fn)
960     return;
961
962   (vcm->wrk_rpc_fn) (((session_app_wrk_rpc_msg_t *) data)->data);
963 }
964
965 static int
966 vcl_handle_mq_event (vcl_worker_t * wrk, session_event_t * e)
967 {
968   session_disconnected_msg_t *disconnected_msg;
969   session_connected_msg_t *connected_msg;
970   session_reset_msg_t *reset_msg;
971   session_event_t *ecpy;
972   vcl_session_t *s;
973   u32 sid;
974
975   switch (e->event_type)
976     {
977     case SESSION_IO_EVT_RX:
978     case SESSION_IO_EVT_TX:
979       s = vcl_session_get (wrk, e->session_index);
980       if (!s || !vcl_session_is_open (s))
981         break;
982       vec_add1 (wrk->unhandled_evts_vector, *e);
983       break;
984     case SESSION_CTRL_EVT_BOUND:
985       /* We can only wait for only one listen so not postponed */
986       vcl_session_bound_handler (wrk, (session_bound_msg_t *) e->data);
987       break;
988     case SESSION_CTRL_EVT_ACCEPTED:
989       s = vcl_session_accepted (wrk, (session_accepted_msg_t *) e->data);
990       if (vcl_session_has_attr (s, VCL_SESS_ATTR_NONBLOCK))
991         {
992           vec_add2 (wrk->unhandled_evts_vector, ecpy, 1);
993           *ecpy = *e;
994           ecpy->postponed = 1;
995           ecpy->session_index = s->session_index;
996         }
997       break;
998     case SESSION_CTRL_EVT_CONNECTED:
999       connected_msg = (session_connected_msg_t *) e->data;
1000       sid = vcl_session_connected_handler (wrk, connected_msg);
1001       if (!(s = vcl_session_get (wrk, sid)))
1002         break;
1003       if (vcl_session_has_attr (s, VCL_SESS_ATTR_NONBLOCK))
1004         {
1005           vec_add2 (wrk->unhandled_evts_vector, ecpy, 1);
1006           *ecpy = *e;
1007           ecpy->postponed = 1;
1008           ecpy->session_index = s->session_index;
1009         }
1010       break;
1011     case SESSION_CTRL_EVT_DISCONNECTED:
1012       disconnected_msg = (session_disconnected_msg_t *) e->data;
1013       if (!(s = vcl_session_get_w_vpp_handle (wrk, disconnected_msg->handle)))
1014         break;
1015       if (vcl_session_has_attr (s, VCL_SESS_ATTR_NONBLOCK))
1016         {
1017           vec_add1 (wrk->unhandled_evts_vector, *e);
1018           break;
1019         }
1020       if (!(s = vcl_session_disconnected_handler (wrk, disconnected_msg)))
1021         break;
1022       VDBG (0, "disconnected session %u [0x%llx]", s->session_index,
1023             s->vpp_handle);
1024       break;
1025     case SESSION_CTRL_EVT_RESET:
1026       reset_msg = (session_reset_msg_t *) e->data;
1027       if (!(s = vcl_session_get_w_vpp_handle (wrk, reset_msg->handle)))
1028         break;
1029       if (vcl_session_has_attr (s, VCL_SESS_ATTR_NONBLOCK))
1030         {
1031           vec_add1 (wrk->unhandled_evts_vector, *e);
1032           break;
1033         }
1034       vcl_session_reset_handler (wrk, (session_reset_msg_t *) e->data);
1035       break;
1036     case SESSION_CTRL_EVT_UNLISTEN_REPLY:
1037       vcl_session_unlisten_reply_handler (wrk, e->data);
1038       break;
1039     case SESSION_CTRL_EVT_MIGRATED:
1040       vcl_session_migrated_handler (wrk, e->data);
1041       break;
1042     case SESSION_CTRL_EVT_CLEANUP:
1043       vcl_session_cleanup_handler (wrk, e->data);
1044       break;
1045     case SESSION_CTRL_EVT_REQ_WORKER_UPDATE:
1046       vcl_session_req_worker_update_handler (wrk, e->data);
1047       break;
1048     case SESSION_CTRL_EVT_WORKER_UPDATE_REPLY:
1049       vcl_session_worker_update_reply_handler (wrk, e->data);
1050       break;
1051     case SESSION_CTRL_EVT_APP_ADD_SEGMENT:
1052       vcl_session_app_add_segment_handler (wrk, e->data);
1053       break;
1054     case SESSION_CTRL_EVT_APP_DEL_SEGMENT:
1055       vcl_session_app_del_segment_handler (wrk, e->data);
1056       break;
1057     case SESSION_CTRL_EVT_APP_WRK_RPC:
1058       vcl_worker_rpc_handler (wrk, e->data);
1059       break;
1060     default:
1061       clib_warning ("unhandled %u", e->event_type);
1062     }
1063   return VPPCOM_OK;
1064 }
1065
1066 static int
1067 vppcom_wait_for_session_state_change (u32 session_index,
1068                                       vcl_session_state_t state,
1069                                       f64 wait_for_time)
1070 {
1071   vcl_worker_t *wrk = vcl_worker_get_current ();
1072   f64 timeout = clib_time_now (&wrk->clib_time) + wait_for_time;
1073   vcl_session_t *volatile session;
1074   svm_msg_q_msg_t msg;
1075   session_event_t *e;
1076
1077   do
1078     {
1079       session = vcl_session_get (wrk, session_index);
1080       if (PREDICT_FALSE (!session))
1081         {
1082           return VPPCOM_EBADFD;
1083         }
1084       if (session->session_state == state)
1085         {
1086           return VPPCOM_OK;
1087         }
1088       if (session->session_state == VCL_STATE_DETACHED)
1089         {
1090           return VPPCOM_ECONNREFUSED;
1091         }
1092
1093       if (svm_msg_q_sub (wrk->app_event_queue, &msg, SVM_Q_NOWAIT, 0))
1094         {
1095           usleep (100);
1096           continue;
1097         }
1098       e = svm_msg_q_msg_data (wrk->app_event_queue, &msg);
1099       vcl_handle_mq_event (wrk, e);
1100       svm_msg_q_free_msg (wrk->app_event_queue, &msg);
1101     }
1102   while (clib_time_now (&wrk->clib_time) < timeout);
1103
1104   VDBG (0, "timeout waiting for state 0x%x (%s)", state,
1105         vppcom_session_state_str (state));
1106   vcl_evt (VCL_EVT_SESSION_TIMEOUT, session, session_state);
1107
1108   return VPPCOM_ETIMEDOUT;
1109 }
1110
1111 static void
1112 vcl_handle_pending_wrk_updates (vcl_worker_t * wrk)
1113 {
1114   vcl_session_state_t state;
1115   vcl_session_t *s;
1116   u32 *sip;
1117
1118   if (PREDICT_TRUE (vec_len (wrk->pending_session_wrk_updates) == 0))
1119     return;
1120
1121   vec_foreach (sip, wrk->pending_session_wrk_updates)
1122   {
1123     s = vcl_session_get (wrk, *sip);
1124     vcl_send_session_worker_update (wrk, s, wrk->wrk_index);
1125     state = s->session_state;
1126     vppcom_wait_for_session_state_change (s->session_index, VCL_STATE_UPDATED,
1127                                           5);
1128     s->session_state = state;
1129   }
1130   vec_reset_length (wrk->pending_session_wrk_updates);
1131 }
1132
1133 void
1134 vcl_flush_mq_events (void)
1135 {
1136   vcl_worker_t *wrk = vcl_worker_get_current ();
1137   svm_msg_q_msg_t *msg;
1138   session_event_t *e;
1139   svm_msg_q_t *mq;
1140   int i;
1141
1142   mq = wrk->app_event_queue;
1143   svm_msg_q_lock (mq);
1144   vcl_mq_dequeue_batch (wrk, mq, ~0);
1145   svm_msg_q_unlock (mq);
1146
1147   for (i = 0; i < vec_len (wrk->mq_msg_vector); i++)
1148     {
1149       msg = vec_elt_at_index (wrk->mq_msg_vector, i);
1150       e = svm_msg_q_msg_data (mq, msg);
1151       vcl_handle_mq_event (wrk, e);
1152       svm_msg_q_free_msg (mq, msg);
1153     }
1154   vec_reset_length (wrk->mq_msg_vector);
1155   vcl_handle_pending_wrk_updates (wrk);
1156 }
1157
1158 static int
1159 vppcom_session_unbind (u32 session_handle)
1160 {
1161   vcl_worker_t *wrk = vcl_worker_get_current ();
1162   session_accepted_msg_t *accepted_msg;
1163   vcl_session_t *session = 0;
1164   vcl_session_msg_t *evt;
1165
1166   session = vcl_session_get_w_handle (wrk, session_handle);
1167   if (!session)
1168     return VPPCOM_EBADFD;
1169
1170   /* Flush pending accept events, if any */
1171   while (clib_fifo_elts (session->accept_evts_fifo))
1172     {
1173       clib_fifo_sub2 (session->accept_evts_fifo, evt);
1174       accepted_msg = &evt->accepted_msg;
1175       vcl_session_table_del_vpp_handle (wrk, accepted_msg->handle);
1176       vcl_send_session_accepted_reply (session->vpp_evt_q,
1177                                        accepted_msg->context,
1178                                        accepted_msg->handle, -1);
1179     }
1180   clib_fifo_free (session->accept_evts_fifo);
1181
1182   vcl_send_session_unlisten (wrk, session);
1183
1184   VDBG (1, "session %u [0x%llx]: sending unbind!", session->session_index,
1185         session->vpp_handle);
1186   vcl_evt (VCL_EVT_UNBIND, session);
1187
1188   session->vpp_handle = ~0;
1189   session->session_state = VCL_STATE_DISCONNECT;
1190
1191   return VPPCOM_OK;
1192 }
1193
1194 /**
1195  * Handle app exit
1196  *
1197  * Notify vpp of the disconnect and mark the worker as free. If we're the
1198  * last worker, do a full cleanup otherwise, since we're probably a forked
1199  * child, avoid syscalls as much as possible. We might've lost privileges.
1200  */
1201 void
1202 vppcom_app_exit (void)
1203 {
1204   if (!pool_elts (vcm->workers))
1205     return;
1206   vcl_worker_cleanup (vcl_worker_get_current (), 1 /* notify vpp */ );
1207   vcl_set_worker_index (~0);
1208   vcl_elog_stop (vcm);
1209 }
1210
1211 static int
1212 vcl_api_attach (void)
1213 {
1214   if (vcm->cfg.vpp_app_socket_api)
1215     return vcl_sapi_attach ();
1216
1217   return vcl_bapi_attach ();
1218 }
1219
1220 static void
1221 vcl_api_detach (vcl_worker_t * wrk)
1222 {
1223   vcl_send_app_detach (wrk);
1224
1225   if (vcm->cfg.vpp_app_socket_api)
1226     return vcl_sapi_detach (wrk);
1227
1228   return vcl_bapi_disconnect_from_vpp ();
1229 }
1230
1231 /*
1232  * VPPCOM Public API functions
1233  */
1234 int
1235 vppcom_app_create (const char *app_name)
1236 {
1237   vppcom_cfg_t *vcl_cfg = &vcm->cfg;
1238   int rv;
1239
1240   if (vcm->is_init)
1241     {
1242       VDBG (1, "already initialized");
1243       return VPPCOM_EEXIST;
1244     }
1245
1246   vcm->is_init = 1;
1247   vppcom_cfg (&vcm->cfg);
1248   vcl_cfg = &vcm->cfg;
1249
1250   vcm->main_cpu = pthread_self ();
1251   vcm->main_pid = getpid ();
1252   vcm->app_name = format (0, "%s", app_name);
1253   fifo_segment_main_init (&vcm->segment_main, vcl_cfg->segment_baseva,
1254                           20 /* timeout in secs */ );
1255   pool_alloc (vcm->workers, vcl_cfg->max_workers);
1256   clib_spinlock_init (&vcm->workers_lock);
1257   clib_rwlock_init (&vcm->segment_table_lock);
1258   atexit (vppcom_app_exit);
1259   vcl_elog_init (vcm);
1260
1261   /* Allocate default worker */
1262   vcl_worker_alloc_and_init ();
1263
1264   if ((rv = vcl_api_attach ()))
1265     return rv;
1266
1267   VDBG (0, "app_name '%s', my_client_index %d (0x%x)", app_name,
1268         vcm->workers[0].api_client_handle, vcm->workers[0].api_client_handle);
1269
1270   return VPPCOM_OK;
1271 }
1272
1273 void
1274 vppcom_app_destroy (void)
1275 {
1276   vcl_worker_t *wrk, *current_wrk;
1277   void *heap;
1278
1279   if (!pool_elts (vcm->workers))
1280     return;
1281
1282   vcl_evt (VCL_EVT_DETACH, vcm);
1283
1284   current_wrk = vcl_worker_get_current ();
1285
1286   /* *INDENT-OFF* */
1287   pool_foreach (wrk, vcm->workers, ({
1288     if (current_wrk != wrk)
1289       vcl_worker_cleanup (wrk, 0 /* notify vpp */ );
1290   }));
1291   /* *INDENT-ON* */
1292
1293   vcl_api_detach (current_wrk);
1294   vcl_worker_cleanup (current_wrk, 0 /* notify vpp */ );
1295
1296   vcl_elog_stop (vcm);
1297
1298   /*
1299    * Free the heap and fix vcm
1300    */
1301   heap = clib_mem_get_heap ();
1302   munmap (clib_mem_get_heap_base (heap), clib_mem_get_heap_size (heap));
1303
1304   vcm = &_vppcom_main;
1305   vcm->is_init = 0;
1306 }
1307
1308 int
1309 vppcom_session_create (u8 proto, u8 is_nonblocking)
1310 {
1311   vcl_worker_t *wrk = vcl_worker_get_current ();
1312   vcl_session_t *session;
1313
1314   session = vcl_session_alloc (wrk);
1315
1316   session->session_type = proto;
1317   session->session_state = VCL_STATE_CLOSED;
1318   session->vpp_handle = ~0;
1319   session->is_dgram = vcl_proto_is_dgram (proto);
1320
1321   if (is_nonblocking)
1322     vcl_session_set_attr (session, VCL_SESS_ATTR_NONBLOCK);
1323
1324   vcl_evt (VCL_EVT_CREATE, session, session_type, session->session_state,
1325            is_nonblocking, session_index);
1326
1327   VDBG (0, "created session %u", session->session_index);
1328
1329   return vcl_session_handle (session);
1330 }
1331
1332 int
1333 vcl_session_cleanup (vcl_worker_t * wrk, vcl_session_t * s,
1334                      vcl_session_handle_t sh, u8 do_disconnect)
1335 {
1336   int rv = VPPCOM_OK;
1337
1338   VDBG (1, "session %u [0x%llx] closing", s->session_index, s->vpp_handle);
1339
1340   if (s->flags & VCL_SESSION_F_IS_VEP)
1341     {
1342       u32 next_sh = s->vep.next_sh;
1343       while (next_sh != ~0)
1344         {
1345           rv = vppcom_epoll_ctl (sh, EPOLL_CTL_DEL, next_sh, 0);
1346           if (PREDICT_FALSE (rv < 0))
1347             VDBG (0, "vpp handle 0x%llx, sh %u: EPOLL_CTL_DEL vep_idx %u"
1348                   " failed! rv %d (%s)", s->vpp_handle, next_sh,
1349                   s->vep.vep_sh, rv, vppcom_retval_str (rv));
1350           next_sh = s->vep.next_sh;
1351         }
1352       goto free_session;
1353     }
1354
1355   if (s->flags & VCL_SESSION_F_IS_VEP_SESSION)
1356     {
1357       rv = vppcom_epoll_ctl (s->vep.vep_sh, EPOLL_CTL_DEL, sh, 0);
1358       if (rv < 0)
1359         VDBG (0, "session %u [0x%llx]: EPOLL_CTL_DEL vep_idx %u "
1360               "failed! rv %d (%s)", s->session_index, s->vpp_handle,
1361               s->vep.vep_sh, rv, vppcom_retval_str (rv));
1362     }
1363
1364   if (!do_disconnect)
1365     {
1366       VDBG (1, "session %u [0x%llx] disconnect skipped",
1367             s->session_index, s->vpp_handle);
1368       goto cleanup;
1369     }
1370
1371   if (s->session_state == VCL_STATE_LISTEN)
1372     {
1373       rv = vppcom_session_unbind (sh);
1374       if (PREDICT_FALSE (rv < 0))
1375         VDBG (0, "session %u [0x%llx]: listener unbind failed! "
1376               "rv %d (%s)", s->session_index, s->vpp_handle, rv,
1377               vppcom_retval_str (rv));
1378       return rv;
1379     }
1380   else if (vcl_session_is_ready (s)
1381            || (vcl_session_is_connectable_listener (wrk, s)))
1382     {
1383       rv = vppcom_session_disconnect (sh);
1384       if (PREDICT_FALSE (rv < 0))
1385         VDBG (0, "ERROR: session %u [0x%llx]: disconnect failed!"
1386               " rv %d (%s)", s->session_index, s->vpp_handle,
1387               rv, vppcom_retval_str (rv));
1388     }
1389   else if (s->session_state == VCL_STATE_DISCONNECT)
1390     {
1391       vcl_send_session_reset_reply (wrk, s, 0);
1392     }
1393   else if (s->session_state == VCL_STATE_DETACHED)
1394     {
1395       /* Should not happen. VPP cleaned up before app confirmed close */
1396       VDBG (0, "vpp freed session %d before close", s->session_index);
1397       goto free_session;
1398     }
1399
1400   s->session_state = VCL_STATE_CLOSED;
1401
1402   /* Session is removed only after vpp confirms the disconnect */
1403   return rv;
1404
1405 cleanup:
1406   vcl_session_table_del_vpp_handle (wrk, s->vpp_handle);
1407 free_session:
1408   vcl_session_free (wrk, s);
1409   vcl_evt (VCL_EVT_CLOSE, s, rv);
1410
1411   return rv;
1412 }
1413
1414 int
1415 vppcom_session_close (uint32_t session_handle)
1416 {
1417   vcl_worker_t *wrk = vcl_worker_get_current ();
1418   vcl_session_t *session;
1419
1420   session = vcl_session_get_w_handle (wrk, session_handle);
1421   if (!session)
1422     return VPPCOM_EBADFD;
1423   return vcl_session_cleanup (wrk, session, session_handle,
1424                               1 /* do_disconnect */ );
1425 }
1426
1427 int
1428 vppcom_session_bind (uint32_t session_handle, vppcom_endpt_t * ep)
1429 {
1430   vcl_worker_t *wrk = vcl_worker_get_current ();
1431   vcl_session_t *session = 0;
1432
1433   if (!ep || !ep->ip)
1434     return VPPCOM_EINVAL;
1435
1436   session = vcl_session_get_w_handle (wrk, session_handle);
1437   if (!session)
1438     return VPPCOM_EBADFD;
1439
1440   if (session->flags & VCL_SESSION_F_IS_VEP)
1441     {
1442       VDBG (0, "ERROR: cannot bind to epoll session %u!",
1443             session->session_index);
1444       return VPPCOM_EBADFD;
1445     }
1446
1447   session->transport.is_ip4 = ep->is_ip4;
1448   if (ep->is_ip4)
1449     clib_memcpy_fast (&session->transport.lcl_ip.ip4, ep->ip,
1450                       sizeof (ip4_address_t));
1451   else
1452     clib_memcpy_fast (&session->transport.lcl_ip.ip6, ep->ip,
1453                       sizeof (ip6_address_t));
1454   session->transport.lcl_port = ep->port;
1455
1456   VDBG (0, "session %u handle %u: binding to local %s address %U port %u, "
1457         "proto %s", session->session_index, session_handle,
1458         session->transport.is_ip4 ? "IPv4" : "IPv6",
1459         format_ip46_address, &session->transport.lcl_ip,
1460         session->transport.is_ip4 ? IP46_TYPE_IP4 : IP46_TYPE_IP6,
1461         clib_net_to_host_u16 (session->transport.lcl_port),
1462         vppcom_proto_str (session->session_type));
1463   vcl_evt (VCL_EVT_BIND, session);
1464
1465   if (session->session_type == VPPCOM_PROTO_UDP)
1466     vppcom_session_listen (session_handle, 10);
1467
1468   return VPPCOM_OK;
1469 }
1470
1471 int
1472 vppcom_session_listen (uint32_t listen_sh, uint32_t q_len)
1473 {
1474   vcl_worker_t *wrk = vcl_worker_get_current ();
1475   vcl_session_t *listen_session = 0;
1476   u64 listen_vpp_handle;
1477   int rv;
1478
1479   listen_session = vcl_session_get_w_handle (wrk, listen_sh);
1480   if (!listen_session || (listen_session->flags & VCL_SESSION_F_IS_VEP))
1481     return VPPCOM_EBADFD;
1482
1483   if (q_len == 0 || q_len == ~0)
1484     q_len = vcm->cfg.listen_queue_size;
1485
1486   listen_vpp_handle = listen_session->vpp_handle;
1487   if (listen_session->session_state == VCL_STATE_LISTEN)
1488     {
1489       VDBG (0, "session %u [0x%llx]: already in listen state!",
1490             listen_sh, listen_vpp_handle);
1491       return VPPCOM_OK;
1492     }
1493
1494   VDBG (0, "session %u: sending vpp listen request...", listen_sh);
1495
1496   /*
1497    * Send listen request to vpp and wait for reply
1498    */
1499   vcl_send_session_listen (wrk, listen_session);
1500   rv = vppcom_wait_for_session_state_change (listen_session->session_index,
1501                                              VCL_STATE_LISTEN,
1502                                              vcm->cfg.session_timeout);
1503
1504   if (PREDICT_FALSE (rv))
1505     {
1506       listen_session = vcl_session_get_w_handle (wrk, listen_sh);
1507       VDBG (0, "session %u [0x%llx]: listen failed! returning %d (%s)",
1508             listen_sh, listen_session->vpp_handle, rv,
1509             vppcom_retval_str (rv));
1510       return rv;
1511     }
1512
1513   return VPPCOM_OK;
1514 }
1515
1516 static int
1517 validate_args_session_accept_ (vcl_worker_t * wrk, vcl_session_t * ls)
1518 {
1519   if (ls->flags & VCL_SESSION_F_IS_VEP)
1520     {
1521       VDBG (0, "ERROR: cannot accept on epoll session %u!",
1522             ls->session_index);
1523       return VPPCOM_EBADFD;
1524     }
1525
1526   if ((ls->session_state != VCL_STATE_LISTEN)
1527       && (!vcl_session_is_connectable_listener (wrk, ls)))
1528     {
1529       VDBG (0, "ERROR: session [0x%llx]: not in listen state! state 0x%x"
1530             " (%s)", ls->vpp_handle, ls->session_state,
1531             vppcom_session_state_str (ls->session_state));
1532       return VPPCOM_EBADFD;
1533     }
1534   return VPPCOM_OK;
1535 }
1536
1537 int
1538 vppcom_unformat_proto (uint8_t * proto, char *proto_str)
1539 {
1540   if (!strcmp (proto_str, "TCP"))
1541     *proto = VPPCOM_PROTO_TCP;
1542   else if (!strcmp (proto_str, "tcp"))
1543     *proto = VPPCOM_PROTO_TCP;
1544   else if (!strcmp (proto_str, "UDP"))
1545     *proto = VPPCOM_PROTO_UDP;
1546   else if (!strcmp (proto_str, "udp"))
1547     *proto = VPPCOM_PROTO_UDP;
1548   else if (!strcmp (proto_str, "TLS"))
1549     *proto = VPPCOM_PROTO_TLS;
1550   else if (!strcmp (proto_str, "tls"))
1551     *proto = VPPCOM_PROTO_TLS;
1552   else if (!strcmp (proto_str, "QUIC"))
1553     *proto = VPPCOM_PROTO_QUIC;
1554   else if (!strcmp (proto_str, "quic"))
1555     *proto = VPPCOM_PROTO_QUIC;
1556   else
1557     return 1;
1558   return 0;
1559 }
1560
1561 int
1562 vppcom_session_accept (uint32_t listen_session_handle, vppcom_endpt_t * ep,
1563                        uint32_t flags)
1564 {
1565   u32 client_session_index = ~0, listen_session_index, accept_flags = 0;
1566   vcl_worker_t *wrk = vcl_worker_get_current ();
1567   session_accepted_msg_t accepted_msg;
1568   vcl_session_t *listen_session = 0;
1569   vcl_session_t *client_session = 0;
1570   vcl_session_msg_t *evt;
1571   svm_msg_q_msg_t msg;
1572   session_event_t *e;
1573   u8 is_nonblocking;
1574   int rv;
1575
1576   listen_session = vcl_session_get_w_handle (wrk, listen_session_handle);
1577   if (!listen_session)
1578     return VPPCOM_EBADFD;
1579
1580   listen_session_index = listen_session->session_index;
1581   if ((rv = validate_args_session_accept_ (wrk, listen_session)))
1582     return rv;
1583
1584   if (clib_fifo_elts (listen_session->accept_evts_fifo))
1585     {
1586       clib_fifo_sub2 (listen_session->accept_evts_fifo, evt);
1587       accept_flags = evt->flags;
1588       accepted_msg = evt->accepted_msg;
1589       goto handle;
1590     }
1591
1592   is_nonblocking = vcl_session_has_attr (listen_session,
1593                                          VCL_SESS_ATTR_NONBLOCK);
1594   while (1)
1595     {
1596       if (svm_msg_q_is_empty (wrk->app_event_queue) && is_nonblocking)
1597         return VPPCOM_EAGAIN;
1598
1599       if (svm_msg_q_sub (wrk->app_event_queue, &msg, SVM_Q_WAIT, 0))
1600         return VPPCOM_EAGAIN;
1601
1602       e = svm_msg_q_msg_data (wrk->app_event_queue, &msg);
1603       if (e->event_type != SESSION_CTRL_EVT_ACCEPTED)
1604         {
1605           vcl_handle_mq_event (wrk, e);
1606           svm_msg_q_free_msg (wrk->app_event_queue, &msg);
1607           continue;
1608         }
1609       clib_memcpy_fast (&accepted_msg, e->data, sizeof (accepted_msg));
1610       svm_msg_q_free_msg (wrk->app_event_queue, &msg);
1611       break;
1612     }
1613
1614 handle:
1615
1616   client_session_index = vcl_session_accepted_handler (wrk, &accepted_msg,
1617                                                        listen_session_index);
1618   if (client_session_index == VCL_INVALID_SESSION_INDEX)
1619     return VPPCOM_ECONNABORTED;
1620
1621   listen_session = vcl_session_get (wrk, listen_session_index);
1622   client_session = vcl_session_get (wrk, client_session_index);
1623
1624   if (flags & O_NONBLOCK)
1625     vcl_session_set_attr (client_session, VCL_SESS_ATTR_NONBLOCK);
1626
1627   VDBG (1, "listener %u [0x%llx]: Got a connect request! session %u [0x%llx],"
1628         " flags %d, is_nonblocking %u", listen_session->session_index,
1629         listen_session->vpp_handle, client_session_index,
1630         client_session->vpp_handle, flags,
1631         vcl_session_has_attr (client_session, VCL_SESS_ATTR_NONBLOCK));
1632
1633   if (ep)
1634     {
1635       ep->is_ip4 = client_session->transport.is_ip4;
1636       ep->port = client_session->transport.rmt_port;
1637       if (client_session->transport.is_ip4)
1638         clib_memcpy_fast (ep->ip, &client_session->transport.rmt_ip.ip4,
1639                           sizeof (ip4_address_t));
1640       else
1641         clib_memcpy_fast (ep->ip, &client_session->transport.rmt_ip.ip6,
1642                           sizeof (ip6_address_t));
1643     }
1644
1645   VDBG (0, "listener %u [0x%llx] accepted %u [0x%llx] peer: %U:%u "
1646         "local: %U:%u", listen_session_handle, listen_session->vpp_handle,
1647         client_session_index, client_session->vpp_handle,
1648         format_ip46_address, &client_session->transport.rmt_ip,
1649         client_session->transport.is_ip4 ? IP46_TYPE_IP4 : IP46_TYPE_IP6,
1650         clib_net_to_host_u16 (client_session->transport.rmt_port),
1651         format_ip46_address, &client_session->transport.lcl_ip,
1652         client_session->transport.is_ip4 ? IP46_TYPE_IP4 : IP46_TYPE_IP6,
1653         clib_net_to_host_u16 (client_session->transport.lcl_port));
1654   vcl_evt (VCL_EVT_ACCEPT, client_session, listen_session,
1655            client_session_index);
1656
1657   /*
1658    * Session might have been closed already
1659    */
1660   if (accept_flags)
1661     {
1662       if (accept_flags & VCL_ACCEPTED_F_CLOSED)
1663         client_session->session_state = VCL_STATE_VPP_CLOSING;
1664       else if (accept_flags & VCL_ACCEPTED_F_RESET)
1665         client_session->session_state = VCL_STATE_DISCONNECT;
1666     }
1667   return vcl_session_handle (client_session);
1668 }
1669
1670 int
1671 vppcom_session_connect (uint32_t session_handle, vppcom_endpt_t * server_ep)
1672 {
1673   vcl_worker_t *wrk = vcl_worker_get_current ();
1674   vcl_session_t *session = 0;
1675   u32 session_index;
1676   int rv;
1677
1678   session = vcl_session_get_w_handle (wrk, session_handle);
1679   if (!session)
1680     return VPPCOM_EBADFD;
1681   session_index = session->session_index;
1682
1683   if (PREDICT_FALSE (session->flags & VCL_SESSION_F_IS_VEP))
1684     {
1685       VDBG (0, "ERROR: cannot connect epoll session %u!",
1686             session->session_index);
1687       return VPPCOM_EBADFD;
1688     }
1689
1690   if (PREDICT_FALSE (vcl_session_is_ready (session)))
1691     {
1692       VDBG (0, "session handle %u [0x%llx]: session already "
1693             "connected to %s %U port %d proto %s, state 0x%x (%s)",
1694             session_handle, session->vpp_handle,
1695             session->transport.is_ip4 ? "IPv4" : "IPv6", format_ip46_address,
1696             &session->transport.rmt_ip, session->transport.is_ip4 ?
1697             IP46_TYPE_IP4 : IP46_TYPE_IP6,
1698             clib_net_to_host_u16 (session->transport.rmt_port),
1699             vppcom_proto_str (session->session_type), session->session_state,
1700             vppcom_session_state_str (session->session_state));
1701       return VPPCOM_OK;
1702     }
1703
1704   /* Attempt to connect a connectionless listener */
1705   if (PREDICT_FALSE (session->session_state == VCL_STATE_LISTEN))
1706     {
1707       if (session->session_type != VPPCOM_PROTO_UDP)
1708         return VPPCOM_EINVAL;
1709       vcl_send_session_unlisten (wrk, session);
1710       session->session_state = VCL_STATE_CLOSED;
1711     }
1712
1713   session->transport.is_ip4 = server_ep->is_ip4;
1714   vcl_ip_copy_from_ep (&session->transport.rmt_ip, server_ep);
1715   session->transport.rmt_port = server_ep->port;
1716   session->parent_handle = VCL_INVALID_SESSION_HANDLE;
1717   session->flags |= VCL_SESSION_F_CONNECTED;
1718
1719   VDBG (0, "session handle %u (%s): connecting to peer %s %U "
1720         "port %d proto %s", session_handle,
1721         vppcom_session_state_str (session->session_state),
1722         session->transport.is_ip4 ? "IPv4" : "IPv6",
1723         format_ip46_address,
1724         &session->transport.rmt_ip, session->transport.is_ip4 ?
1725         IP46_TYPE_IP4 : IP46_TYPE_IP6,
1726         clib_net_to_host_u16 (session->transport.rmt_port),
1727         vppcom_proto_str (session->session_type));
1728
1729   vcl_send_session_connect (wrk, session);
1730
1731   if (vcl_session_has_attr (session, VCL_SESS_ATTR_NONBLOCK))
1732     {
1733       /* State set to STATE_UPDATED to ensure the session is not assumed
1734        * to be ready and to also allow the app to close it prior to vpp's
1735        * connected reply. */
1736       session->session_state = VCL_STATE_UPDATED;
1737       return VPPCOM_EINPROGRESS;
1738     }
1739
1740   /*
1741    * Wait for reply from vpp if blocking
1742    */
1743   rv = vppcom_wait_for_session_state_change (session_index, VCL_STATE_READY,
1744                                              vcm->cfg.session_timeout);
1745
1746   session = vcl_session_get (wrk, session_index);
1747   VDBG (0, "session %u [0x%llx]: connect %s!", session->session_index,
1748         session->vpp_handle, rv ? "failed" : "succeeded");
1749
1750   return rv;
1751 }
1752
1753 int
1754 vppcom_session_stream_connect (uint32_t session_handle,
1755                                uint32_t parent_session_handle)
1756 {
1757   vcl_worker_t *wrk = vcl_worker_get_current ();
1758   vcl_session_t *session, *parent_session;
1759   u32 session_index, parent_session_index;
1760   int rv;
1761
1762   session = vcl_session_get_w_handle (wrk, session_handle);
1763   if (!session)
1764     return VPPCOM_EBADFD;
1765   parent_session = vcl_session_get_w_handle (wrk, parent_session_handle);
1766   if (!parent_session)
1767     return VPPCOM_EBADFD;
1768
1769   session_index = session->session_index;
1770   parent_session_index = parent_session->session_index;
1771   if (PREDICT_FALSE (session->flags & VCL_SESSION_F_IS_VEP))
1772     {
1773       VDBG (0, "ERROR: cannot connect epoll session %u!",
1774             session->session_index);
1775       return VPPCOM_EBADFD;
1776     }
1777
1778   if (PREDICT_FALSE (vcl_session_is_ready (session)))
1779     {
1780       VDBG (0, "session handle %u [0x%llx]: session already "
1781             "connected to session %u [0x%llx] proto %s, state 0x%x (%s)",
1782             session_handle, session->vpp_handle,
1783             parent_session_handle, parent_session->vpp_handle,
1784             vppcom_proto_str (session->session_type), session->session_state,
1785             vppcom_session_state_str (session->session_state));
1786       return VPPCOM_OK;
1787     }
1788
1789   /* Connect to quic session specifics */
1790   session->transport.is_ip4 = parent_session->transport.is_ip4;
1791   session->transport.rmt_ip.ip4.as_u32 = (uint32_t) 1;
1792   session->transport.rmt_port = 0;
1793   session->parent_handle = parent_session->vpp_handle;
1794
1795   VDBG (0, "session handle %u: connecting to session %u [0x%llx]",
1796         session_handle, parent_session_handle, parent_session->vpp_handle);
1797
1798   /*
1799    * Send connect request and wait for reply from vpp
1800    */
1801   vcl_send_session_connect (wrk, session);
1802   rv = vppcom_wait_for_session_state_change (session_index, VCL_STATE_READY,
1803                                              vcm->cfg.session_timeout);
1804
1805   session->listener_index = parent_session_index;
1806   parent_session = vcl_session_get_w_handle (wrk, parent_session_handle);
1807   if (parent_session)
1808     parent_session->n_accepted_sessions++;
1809
1810   session = vcl_session_get (wrk, session_index);
1811   VDBG (0, "session %u [0x%llx]: connect %s!", session->session_index,
1812         session->vpp_handle, rv ? "failed" : "succeeded");
1813
1814   return rv;
1815 }
1816
1817 static u8
1818 vcl_is_rx_evt_for_session (session_event_t * e, u32 sid, u8 is_ct)
1819 {
1820   return (e->event_type == SESSION_IO_EVT_RX && e->session_index == sid);
1821 }
1822
1823 static inline int
1824 vppcom_session_read_internal (uint32_t session_handle, void *buf, int n,
1825                               u8 peek)
1826 {
1827   vcl_worker_t *wrk = vcl_worker_get_current ();
1828   int rv, n_read = 0, is_nonblocking;
1829   vcl_session_t *s = 0;
1830   svm_fifo_t *rx_fifo;
1831   svm_msg_q_msg_t msg;
1832   session_event_t *e;
1833   svm_msg_q_t *mq;
1834   u8 is_ct;
1835
1836   if (PREDICT_FALSE (!buf))
1837     return VPPCOM_EINVAL;
1838
1839   s = vcl_session_get_w_handle (wrk, session_handle);
1840   if (PREDICT_FALSE (!s || (s->flags & VCL_SESSION_F_IS_VEP)))
1841     return VPPCOM_EBADFD;
1842
1843   if (PREDICT_FALSE (!vcl_session_is_open (s)))
1844     {
1845       VDBG (0, "session %u[0x%llx] is not open! state 0x%x (%s)",
1846             s->session_index, s->vpp_handle, s->session_state,
1847             vppcom_session_state_str (s->session_state));
1848       return vcl_session_closed_error (s);
1849     }
1850
1851   is_nonblocking = vcl_session_has_attr (s, VCL_SESS_ATTR_NONBLOCK);
1852   is_ct = vcl_session_is_ct (s);
1853   mq = wrk->app_event_queue;
1854   rx_fifo = is_ct ? s->ct_rx_fifo : s->rx_fifo;
1855   s->flags &= ~VCL_SESSION_F_HAS_RX_EVT;
1856
1857   if (svm_fifo_is_empty_cons (rx_fifo))
1858     {
1859       if (is_nonblocking)
1860         {
1861           if (vcl_session_is_closing (s))
1862             return vcl_session_closing_error (s);
1863           if (is_ct)
1864             svm_fifo_unset_event (s->rx_fifo);
1865           svm_fifo_unset_event (rx_fifo);
1866           return VPPCOM_EWOULDBLOCK;
1867         }
1868       while (svm_fifo_is_empty_cons (rx_fifo))
1869         {
1870           if (vcl_session_is_closing (s))
1871             return vcl_session_closing_error (s);
1872
1873           if (is_ct)
1874             svm_fifo_unset_event (s->rx_fifo);
1875           svm_fifo_unset_event (rx_fifo);
1876           svm_msg_q_lock (mq);
1877           if (svm_msg_q_is_empty (mq))
1878             svm_msg_q_wait (mq);
1879
1880           svm_msg_q_sub_w_lock (mq, &msg);
1881           e = svm_msg_q_msg_data (mq, &msg);
1882           svm_msg_q_unlock (mq);
1883           if (!vcl_is_rx_evt_for_session (e, s->session_index, is_ct))
1884             vcl_handle_mq_event (wrk, e);
1885           svm_msg_q_free_msg (mq, &msg);
1886         }
1887     }
1888
1889 read_again:
1890
1891   if (s->is_dgram)
1892     rv = app_recv_dgram_raw (rx_fifo, buf, n, &s->transport, 0, peek);
1893   else
1894     rv = app_recv_stream_raw (rx_fifo, buf, n, 0, peek);
1895
1896   ASSERT (rv >= 0);
1897   n_read += rv;
1898
1899   if (svm_fifo_is_empty_cons (rx_fifo))
1900     {
1901       if (is_ct)
1902         svm_fifo_unset_event (s->rx_fifo);
1903       svm_fifo_unset_event (rx_fifo);
1904       if (!svm_fifo_is_empty_cons (rx_fifo)
1905           && svm_fifo_set_event (rx_fifo) && is_nonblocking)
1906         {
1907           vec_add2 (wrk->unhandled_evts_vector, e, 1);
1908           e->event_type = SESSION_IO_EVT_RX;
1909           e->session_index = s->session_index;
1910         }
1911     }
1912   else if (PREDICT_FALSE (rv < n && !s->is_dgram))
1913     {
1914       /* More data enqueued while reading. Try to drain it
1915        * or fill the buffer. Avoid doing that for dgrams */
1916       buf += rv;
1917       n -= rv;
1918       goto read_again;
1919     }
1920
1921   if (PREDICT_FALSE (svm_fifo_needs_deq_ntf (rx_fifo, n_read)))
1922     {
1923       svm_fifo_clear_deq_ntf (rx_fifo);
1924       app_send_io_evt_to_vpp (s->vpp_evt_q, s->rx_fifo->master_session_index,
1925                               SESSION_IO_EVT_RX, SVM_Q_WAIT);
1926     }
1927
1928   VDBG (2, "session %u[0x%llx]: read %d bytes from (%p)", s->session_index,
1929         s->vpp_handle, n_read, rx_fifo);
1930
1931   return n_read;
1932 }
1933
1934 int
1935 vppcom_session_read (uint32_t session_handle, void *buf, size_t n)
1936 {
1937   return (vppcom_session_read_internal (session_handle, buf, n, 0));
1938 }
1939
1940 static int
1941 vppcom_session_peek (uint32_t session_handle, void *buf, int n)
1942 {
1943   return (vppcom_session_read_internal (session_handle, buf, n, 1));
1944 }
1945
1946 int
1947 vppcom_session_read_segments (uint32_t session_handle,
1948                               vppcom_data_segment_t * ds, uint32_t n_segments,
1949                               uint32_t max_bytes)
1950 {
1951   vcl_worker_t *wrk = vcl_worker_get_current ();
1952   int n_read = 0, is_nonblocking;
1953   vcl_session_t *s = 0;
1954   svm_fifo_t *rx_fifo;
1955   svm_msg_q_msg_t msg;
1956   session_event_t *e;
1957   svm_msg_q_t *mq;
1958   u8 is_ct;
1959
1960   s = vcl_session_get_w_handle (wrk, session_handle);
1961   if (PREDICT_FALSE (!s || (s->flags & VCL_SESSION_F_IS_VEP)))
1962     return VPPCOM_EBADFD;
1963
1964   if (PREDICT_FALSE (!vcl_session_is_open (s)))
1965     return vcl_session_closed_error (s);
1966
1967   is_nonblocking = vcl_session_has_attr (s, VCL_SESS_ATTR_NONBLOCK);
1968   is_ct = vcl_session_is_ct (s);
1969   mq = wrk->app_event_queue;
1970   rx_fifo = is_ct ? s->ct_rx_fifo : s->rx_fifo;
1971   s->flags &= ~VCL_SESSION_F_HAS_RX_EVT;
1972
1973   if (svm_fifo_is_empty_cons (rx_fifo))
1974     {
1975       if (is_nonblocking)
1976         {
1977           if (is_ct)
1978             svm_fifo_unset_event (s->rx_fifo);
1979           svm_fifo_unset_event (rx_fifo);
1980           return VPPCOM_EWOULDBLOCK;
1981         }
1982       while (svm_fifo_is_empty_cons (rx_fifo))
1983         {
1984           if (vcl_session_is_closing (s))
1985             return vcl_session_closing_error (s);
1986
1987           if (is_ct)
1988             svm_fifo_unset_event (s->rx_fifo);
1989           svm_fifo_unset_event (rx_fifo);
1990           svm_msg_q_lock (mq);
1991           if (svm_msg_q_is_empty (mq))
1992             svm_msg_q_wait (mq);
1993
1994           svm_msg_q_sub_w_lock (mq, &msg);
1995           e = svm_msg_q_msg_data (mq, &msg);
1996           svm_msg_q_unlock (mq);
1997           if (!vcl_is_rx_evt_for_session (e, s->session_index, is_ct))
1998             vcl_handle_mq_event (wrk, e);
1999           svm_msg_q_free_msg (mq, &msg);
2000         }
2001     }
2002
2003   n_read = svm_fifo_segments (rx_fifo, s->rx_bytes_pending,
2004                               (svm_fifo_seg_t *) ds, n_segments, max_bytes);
2005   if (n_read < 0)
2006     return VPPCOM_EAGAIN;
2007
2008   if (svm_fifo_max_dequeue_cons (rx_fifo) == n_read)
2009     {
2010       if (is_ct)
2011         svm_fifo_unset_event (s->rx_fifo);
2012       svm_fifo_unset_event (rx_fifo);
2013       if (svm_fifo_max_dequeue_cons (rx_fifo) != n_read
2014           && svm_fifo_set_event (rx_fifo)
2015           && vcl_session_has_attr (s, VCL_SESS_ATTR_NONBLOCK))
2016         {
2017           session_event_t *e;
2018           vec_add2 (wrk->unhandled_evts_vector, e, 1);
2019           e->event_type = SESSION_IO_EVT_RX;
2020           e->session_index = s->session_index;
2021         }
2022     }
2023
2024   s->rx_bytes_pending += n_read;
2025   return n_read;
2026 }
2027
2028 void
2029 vppcom_session_free_segments (uint32_t session_handle, uint32_t n_bytes)
2030 {
2031   vcl_worker_t *wrk = vcl_worker_get_current ();
2032   vcl_session_t *s;
2033   u8 is_ct;
2034
2035   s = vcl_session_get_w_handle (wrk, session_handle);
2036   if (PREDICT_FALSE (!s || (s->flags & VCL_SESSION_F_IS_VEP)))
2037     return;
2038
2039   is_ct = vcl_session_is_ct (s);
2040   svm_fifo_dequeue_drop (is_ct ? s->ct_rx_fifo : s->rx_fifo, n_bytes);
2041
2042   ASSERT (s->rx_bytes_pending < n_bytes);
2043   s->rx_bytes_pending -= n_bytes;
2044 }
2045
2046 static u8
2047 vcl_is_tx_evt_for_session (session_event_t * e, u32 sid, u8 is_ct)
2048 {
2049   return (e->event_type == SESSION_IO_EVT_TX && e->session_index == sid);
2050 }
2051
2052 always_inline u8
2053 vcl_fifo_is_writeable (svm_fifo_t * f, u32 len, u8 is_dgram)
2054 {
2055   u32 max_enq = svm_fifo_max_enqueue_prod (f);
2056   if (is_dgram)
2057     return max_enq >= (sizeof (session_dgram_hdr_t) + len);
2058   else
2059     return max_enq > 0;
2060 }
2061
2062 always_inline int
2063 vppcom_session_write_inline (vcl_worker_t * wrk, vcl_session_t * s, void *buf,
2064                              size_t n, u8 is_flush, u8 is_dgram)
2065 {
2066   int n_write, is_nonblocking;
2067   session_evt_type_t et;
2068   svm_msg_q_msg_t msg;
2069   svm_fifo_t *tx_fifo;
2070   session_event_t *e;
2071   svm_msg_q_t *mq;
2072   u8 is_ct;
2073
2074   if (PREDICT_FALSE (!buf || n == 0))
2075     return VPPCOM_EINVAL;
2076
2077   if (PREDICT_FALSE (s->flags & VCL_SESSION_F_IS_VEP))
2078     {
2079       VDBG (0, "ERROR: session %u [0x%llx]: cannot write to an epoll"
2080             " session!", s->session_index, s->vpp_handle);
2081       return VPPCOM_EBADFD;
2082     }
2083
2084   if (PREDICT_FALSE (!vcl_session_is_open (s)))
2085     {
2086       VDBG (1, "session %u [0x%llx]: is not open! state 0x%x (%s)",
2087             s->session_index, s->vpp_handle, s->session_state,
2088             vppcom_session_state_str (s->session_state));
2089       return vcl_session_closed_error (s);;
2090     }
2091
2092   is_ct = vcl_session_is_ct (s);
2093   tx_fifo = is_ct ? s->ct_tx_fifo : s->tx_fifo;
2094   is_nonblocking = vcl_session_has_attr (s, VCL_SESS_ATTR_NONBLOCK);
2095
2096   mq = wrk->app_event_queue;
2097   if (!vcl_fifo_is_writeable (tx_fifo, n, is_dgram))
2098     {
2099       if (is_nonblocking)
2100         {
2101           return VPPCOM_EWOULDBLOCK;
2102         }
2103       while (!vcl_fifo_is_writeable (tx_fifo, n, is_dgram))
2104         {
2105           svm_fifo_add_want_deq_ntf (tx_fifo, SVM_FIFO_WANT_DEQ_NOTIF);
2106           if (vcl_session_is_closing (s))
2107             return vcl_session_closing_error (s);
2108           svm_msg_q_lock (mq);
2109           if (svm_msg_q_is_empty (mq))
2110             svm_msg_q_wait (mq);
2111
2112           svm_msg_q_sub_w_lock (mq, &msg);
2113           e = svm_msg_q_msg_data (mq, &msg);
2114           svm_msg_q_unlock (mq);
2115
2116           if (!vcl_is_tx_evt_for_session (e, s->session_index, is_ct))
2117             vcl_handle_mq_event (wrk, e);
2118           svm_msg_q_free_msg (mq, &msg);
2119         }
2120     }
2121
2122   et = SESSION_IO_EVT_TX;
2123   if (is_flush && !is_ct)
2124     et = SESSION_IO_EVT_TX_FLUSH;
2125
2126   if (is_dgram)
2127     n_write = app_send_dgram_raw (tx_fifo, &s->transport,
2128                                   s->vpp_evt_q, buf, n, et,
2129                                   0 /* do_evt */ , SVM_Q_WAIT);
2130   else
2131     n_write = app_send_stream_raw (tx_fifo, s->vpp_evt_q, buf, n, et,
2132                                    0 /* do_evt */ , SVM_Q_WAIT);
2133
2134   if (svm_fifo_set_event (s->tx_fifo))
2135     app_send_io_evt_to_vpp (s->vpp_evt_q, s->tx_fifo->master_session_index,
2136                             et, SVM_Q_WAIT);
2137
2138   /* The underlying fifo segment can run out of memory */
2139   if (PREDICT_FALSE (n_write < 0))
2140     return VPPCOM_EAGAIN;
2141
2142   VDBG (2, "session %u [0x%llx]: wrote %d bytes", s->session_index,
2143         s->vpp_handle, n_write);
2144
2145   return n_write;
2146 }
2147
2148 int
2149 vppcom_session_write (uint32_t session_handle, void *buf, size_t n)
2150 {
2151   vcl_worker_t *wrk = vcl_worker_get_current ();
2152   vcl_session_t *s;
2153
2154   s = vcl_session_get_w_handle (wrk, session_handle);
2155   if (PREDICT_FALSE (!s))
2156     return VPPCOM_EBADFD;
2157
2158   return vppcom_session_write_inline (wrk, s, buf, n,
2159                                       0 /* is_flush */ , s->is_dgram ? 1 : 0);
2160 }
2161
2162 int
2163 vppcom_session_write_msg (uint32_t session_handle, void *buf, size_t n)
2164 {
2165   vcl_worker_t *wrk = vcl_worker_get_current ();
2166   vcl_session_t *s;
2167
2168   s = vcl_session_get_w_handle (wrk, session_handle);
2169   if (PREDICT_FALSE (!s))
2170     return VPPCOM_EBADFD;
2171
2172   return vppcom_session_write_inline (wrk, s, buf, n,
2173                                       1 /* is_flush */ , s->is_dgram ? 1 : 0);
2174 }
2175
2176 #define vcl_fifo_rx_evt_valid_or_break(_s)                              \
2177 if (PREDICT_FALSE (!_s->rx_fifo))                                       \
2178   break;                                                                \
2179 if (PREDICT_FALSE (svm_fifo_is_empty (_s->rx_fifo)))                    \
2180   {                                                                     \
2181     if (!vcl_session_is_ct (_s))                                        \
2182       {                                                                 \
2183         svm_fifo_unset_event (_s->rx_fifo);                             \
2184         if (svm_fifo_is_empty (_s->rx_fifo))                            \
2185           break;                                                        \
2186       }                                                                 \
2187     else if (svm_fifo_is_empty (_s->ct_rx_fifo))                        \
2188       {                                                                 \
2189         svm_fifo_unset_event (_s->rx_fifo); /* rx evts on actual fifo*/ \
2190         if (svm_fifo_is_empty (_s->ct_rx_fifo))                         \
2191           break;                                                        \
2192       }                                                                 \
2193   }                                                                     \
2194
2195 static void
2196 vcl_select_handle_mq_event (vcl_worker_t * wrk, session_event_t * e,
2197                             unsigned long n_bits, unsigned long *read_map,
2198                             unsigned long *write_map,
2199                             unsigned long *except_map, u32 * bits_set)
2200 {
2201   session_disconnected_msg_t *disconnected_msg;
2202   session_connected_msg_t *connected_msg;
2203   vcl_session_t *s;
2204   u32 sid;
2205
2206   switch (e->event_type)
2207     {
2208     case SESSION_IO_EVT_RX:
2209       sid = e->session_index;
2210       s = vcl_session_get (wrk, sid);
2211       if (!s || !vcl_session_is_open (s))
2212         break;
2213       vcl_fifo_rx_evt_valid_or_break (s);
2214       if (sid < n_bits && read_map)
2215         {
2216           clib_bitmap_set_no_check ((uword *) read_map, sid, 1);
2217           *bits_set += 1;
2218         }
2219       break;
2220     case SESSION_IO_EVT_TX:
2221       sid = e->session_index;
2222       s = vcl_session_get (wrk, sid);
2223       if (!s || !vcl_session_is_open (s))
2224         break;
2225       if (sid < n_bits && write_map)
2226         {
2227           clib_bitmap_set_no_check ((uword *) write_map, sid, 1);
2228           *bits_set += 1;
2229         }
2230       break;
2231     case SESSION_CTRL_EVT_ACCEPTED:
2232       if (!e->postponed)
2233         s = vcl_session_accepted (wrk, (session_accepted_msg_t *) e->data);
2234       else
2235         s = vcl_session_get (wrk, e->session_index);
2236       if (!s)
2237         break;
2238       sid = s->session_index;
2239       if (sid < n_bits && read_map)
2240         {
2241           clib_bitmap_set_no_check ((uword *) read_map, sid, 1);
2242           *bits_set += 1;
2243         }
2244       break;
2245     case SESSION_CTRL_EVT_CONNECTED:
2246       if (!e->postponed)
2247         {
2248           connected_msg = (session_connected_msg_t *) e->data;
2249           sid = vcl_session_connected_handler (wrk, connected_msg);
2250         }
2251       else
2252         sid = e->session_index;
2253       if (sid == VCL_INVALID_SESSION_INDEX)
2254         break;
2255       if (sid < n_bits && write_map)
2256         {
2257           clib_bitmap_set_no_check ((uword *) write_map, sid, 1);
2258           *bits_set += 1;
2259         }
2260       break;
2261     case SESSION_CTRL_EVT_DISCONNECTED:
2262       disconnected_msg = (session_disconnected_msg_t *) e->data;
2263       s = vcl_session_disconnected_handler (wrk, disconnected_msg);
2264       if (!s)
2265         break;
2266       sid = s->session_index;
2267       if (sid < n_bits && except_map)
2268         {
2269           clib_bitmap_set_no_check ((uword *) except_map, sid, 1);
2270           *bits_set += 1;
2271         }
2272       break;
2273     case SESSION_CTRL_EVT_RESET:
2274       sid = vcl_session_reset_handler (wrk, (session_reset_msg_t *) e->data);
2275       if (sid < n_bits && except_map)
2276         {
2277           clib_bitmap_set_no_check ((uword *) except_map, sid, 1);
2278           *bits_set += 1;
2279         }
2280       break;
2281     case SESSION_CTRL_EVT_UNLISTEN_REPLY:
2282       vcl_session_unlisten_reply_handler (wrk, e->data);
2283       break;
2284     case SESSION_CTRL_EVT_MIGRATED:
2285       vcl_session_migrated_handler (wrk, e->data);
2286       break;
2287     case SESSION_CTRL_EVT_CLEANUP:
2288       vcl_session_cleanup_handler (wrk, e->data);
2289       break;
2290     case SESSION_CTRL_EVT_WORKER_UPDATE_REPLY:
2291       vcl_session_worker_update_reply_handler (wrk, e->data);
2292       break;
2293     case SESSION_CTRL_EVT_REQ_WORKER_UPDATE:
2294       vcl_session_req_worker_update_handler (wrk, e->data);
2295       break;
2296     case SESSION_CTRL_EVT_APP_ADD_SEGMENT:
2297       vcl_session_app_add_segment_handler (wrk, e->data);
2298       break;
2299     case SESSION_CTRL_EVT_APP_DEL_SEGMENT:
2300       vcl_session_app_del_segment_handler (wrk, e->data);
2301       break;
2302     case SESSION_CTRL_EVT_APP_WRK_RPC:
2303       vcl_worker_rpc_handler (wrk, e->data);
2304       break;
2305     default:
2306       clib_warning ("unhandled: %u", e->event_type);
2307       break;
2308     }
2309 }
2310
2311 static int
2312 vcl_select_handle_mq (vcl_worker_t * wrk, svm_msg_q_t * mq,
2313                       unsigned long n_bits, unsigned long *read_map,
2314                       unsigned long *write_map, unsigned long *except_map,
2315                       double time_to_wait, u32 * bits_set)
2316 {
2317   svm_msg_q_msg_t *msg;
2318   session_event_t *e;
2319   u32 i;
2320
2321   svm_msg_q_lock (mq);
2322   if (svm_msg_q_is_empty (mq))
2323     {
2324       if (*bits_set)
2325         {
2326           svm_msg_q_unlock (mq);
2327           return 0;
2328         }
2329
2330       if (!time_to_wait)
2331         {
2332           svm_msg_q_unlock (mq);
2333           return 0;
2334         }
2335       else if (time_to_wait < 0)
2336         {
2337           svm_msg_q_wait (mq);
2338         }
2339       else
2340         {
2341           if (svm_msg_q_timedwait (mq, time_to_wait))
2342             {
2343               svm_msg_q_unlock (mq);
2344               return 0;
2345             }
2346         }
2347     }
2348   vcl_mq_dequeue_batch (wrk, mq, ~0);
2349   svm_msg_q_unlock (mq);
2350
2351   for (i = 0; i < vec_len (wrk->mq_msg_vector); i++)
2352     {
2353       msg = vec_elt_at_index (wrk->mq_msg_vector, i);
2354       e = svm_msg_q_msg_data (mq, msg);
2355       vcl_select_handle_mq_event (wrk, e, n_bits, read_map, write_map,
2356                                   except_map, bits_set);
2357       svm_msg_q_free_msg (mq, msg);
2358     }
2359   vec_reset_length (wrk->mq_msg_vector);
2360   vcl_handle_pending_wrk_updates (wrk);
2361   return *bits_set;
2362 }
2363
2364 static int
2365 vppcom_select_condvar (vcl_worker_t * wrk, int n_bits,
2366                        vcl_si_set * read_map, vcl_si_set * write_map,
2367                        vcl_si_set * except_map, double time_to_wait,
2368                        u32 * bits_set)
2369 {
2370   double wait = 0, start = 0;
2371
2372   if (!*bits_set)
2373     {
2374       wait = time_to_wait;
2375       start = clib_time_now (&wrk->clib_time);
2376     }
2377
2378   do
2379     {
2380       vcl_select_handle_mq (wrk, wrk->app_event_queue, n_bits, read_map,
2381                             write_map, except_map, wait, bits_set);
2382       if (*bits_set)
2383         return *bits_set;
2384       if (wait == -1)
2385         continue;
2386
2387       wait = wait - (clib_time_now (&wrk->clib_time) - start);
2388     }
2389   while (wait > 0);
2390
2391   return 0;
2392 }
2393
2394 static int
2395 vppcom_select_eventfd (vcl_worker_t * wrk, int n_bits,
2396                        vcl_si_set * read_map, vcl_si_set * write_map,
2397                        vcl_si_set * except_map, double time_to_wait,
2398                        u32 * bits_set)
2399 {
2400   vcl_mq_evt_conn_t *mqc;
2401   int __clib_unused n_read;
2402   int n_mq_evts, i;
2403   u64 buf;
2404
2405   vec_validate (wrk->mq_events, pool_elts (wrk->mq_evt_conns));
2406   n_mq_evts = epoll_wait (wrk->mqs_epfd, wrk->mq_events,
2407                           vec_len (wrk->mq_events), time_to_wait);
2408   for (i = 0; i < n_mq_evts; i++)
2409     {
2410       mqc = vcl_mq_evt_conn_get (wrk, wrk->mq_events[i].data.u32);
2411       n_read = read (mqc->mq_fd, &buf, sizeof (buf));
2412       vcl_select_handle_mq (wrk, mqc->mq, n_bits, read_map, write_map,
2413                             except_map, 0, bits_set);
2414     }
2415
2416   return (n_mq_evts > 0 ? (int) *bits_set : 0);
2417 }
2418
2419 int
2420 vppcom_select (int n_bits, vcl_si_set * read_map, vcl_si_set * write_map,
2421                vcl_si_set * except_map, double time_to_wait)
2422 {
2423   u32 sid, minbits = clib_max (n_bits, BITS (uword)), bits_set = 0;
2424   vcl_worker_t *wrk = vcl_worker_get_current ();
2425   vcl_session_t *session = 0;
2426   int i;
2427
2428   if (n_bits && read_map)
2429     {
2430       clib_bitmap_validate (wrk->rd_bitmap, minbits);
2431       clib_memcpy_fast (wrk->rd_bitmap, read_map,
2432                         vec_len (wrk->rd_bitmap) * sizeof (vcl_si_set));
2433       memset (read_map, 0, vec_len (wrk->rd_bitmap) * sizeof (vcl_si_set));
2434     }
2435   if (n_bits && write_map)
2436     {
2437       clib_bitmap_validate (wrk->wr_bitmap, minbits);
2438       clib_memcpy_fast (wrk->wr_bitmap, write_map,
2439                         vec_len (wrk->wr_bitmap) * sizeof (vcl_si_set));
2440       memset (write_map, 0, vec_len (wrk->wr_bitmap) * sizeof (vcl_si_set));
2441     }
2442   if (n_bits && except_map)
2443     {
2444       clib_bitmap_validate (wrk->ex_bitmap, minbits);
2445       clib_memcpy_fast (wrk->ex_bitmap, except_map,
2446                         vec_len (wrk->ex_bitmap) * sizeof (vcl_si_set));
2447       memset (except_map, 0, vec_len (wrk->ex_bitmap) * sizeof (vcl_si_set));
2448     }
2449
2450   if (!n_bits)
2451     return 0;
2452
2453   if (!write_map)
2454     goto check_rd;
2455
2456   /* *INDENT-OFF* */
2457   clib_bitmap_foreach (sid, wrk->wr_bitmap, ({
2458     if (!(session = vcl_session_get (wrk, sid)))
2459       {
2460         clib_bitmap_set_no_check ((uword*)write_map, sid, 1);
2461         bits_set++;
2462         continue;
2463       }
2464
2465     if (vcl_session_write_ready (session))
2466       {
2467         clib_bitmap_set_no_check ((uword*)write_map, sid, 1);
2468         bits_set++;
2469       }
2470     else
2471       svm_fifo_add_want_deq_ntf (session->tx_fifo, SVM_FIFO_WANT_DEQ_NOTIF);
2472   }));
2473
2474 check_rd:
2475   if (!read_map)
2476     goto check_mq;
2477
2478   clib_bitmap_foreach (sid, wrk->rd_bitmap, ({
2479     if (!(session = vcl_session_get (wrk, sid)))
2480       {
2481         clib_bitmap_set_no_check ((uword*)read_map, sid, 1);
2482         bits_set++;
2483         continue;
2484       }
2485
2486     if (vcl_session_read_ready (session))
2487       {
2488         clib_bitmap_set_no_check ((uword*)read_map, sid, 1);
2489         bits_set++;
2490       }
2491   }));
2492   /* *INDENT-ON* */
2493
2494 check_mq:
2495
2496   for (i = 0; i < vec_len (wrk->unhandled_evts_vector); i++)
2497     {
2498       vcl_select_handle_mq_event (wrk, &wrk->unhandled_evts_vector[i], n_bits,
2499                                   read_map, write_map, except_map, &bits_set);
2500     }
2501   vec_reset_length (wrk->unhandled_evts_vector);
2502
2503   if (vcm->cfg.use_mq_eventfd)
2504     vppcom_select_eventfd (wrk, n_bits, read_map, write_map, except_map,
2505                            time_to_wait, &bits_set);
2506   else
2507     vppcom_select_condvar (wrk, n_bits, read_map, write_map, except_map,
2508                            time_to_wait, &bits_set);
2509
2510   return (bits_set);
2511 }
2512
2513 static inline void
2514 vep_verify_epoll_chain (vcl_worker_t * wrk, u32 vep_handle)
2515 {
2516   vppcom_epoll_t *vep;
2517   u32 sh = vep_handle;
2518   vcl_session_t *s;
2519
2520   if (VPPCOM_DEBUG <= 2)
2521     return;
2522
2523   s = vcl_session_get_w_handle (wrk, vep_handle);
2524   if (PREDICT_FALSE (!s))
2525     {
2526       VDBG (0, "ERROR: Invalid vep_sh (%u)!", vep_handle);
2527       goto done;
2528     }
2529   if (PREDICT_FALSE (!(s->flags & VCL_SESSION_F_IS_VEP)))
2530     {
2531       VDBG (0, "ERROR: vep_sh (%u) is not a vep!", vep_handle);
2532       goto done;
2533     }
2534   vep = &s->vep;
2535   VDBG (0, "vep_sh (%u): Dumping epoll chain\n"
2536         "{\n"
2537         "   is_vep         = %u\n"
2538         "   is_vep_session = %u\n"
2539         "   next_sh        = 0x%x (%u)\n"
2540         "}\n", vep_handle, s->flags & VCL_SESSION_F_IS_VEP,
2541         s->flags & VCL_SESSION_F_IS_VEP_SESSION, vep->next_sh, vep->next_sh);
2542
2543   for (sh = vep->next_sh; sh != ~0; sh = vep->next_sh)
2544     {
2545       s = vcl_session_get_w_handle (wrk, sh);
2546       if (PREDICT_FALSE (!s))
2547         {
2548           VDBG (0, "ERROR: Invalid sh (%u)!", sh);
2549           goto done;
2550         }
2551       if (PREDICT_FALSE (s->flags & VCL_SESSION_F_IS_VEP))
2552         {
2553           VDBG (0, "ERROR: sh (%u) is a vep!", vep_handle);
2554         }
2555       else if (PREDICT_FALSE (!(s->flags & VCL_SESSION_F_IS_VEP_SESSION)))
2556         {
2557           VDBG (0, "ERROR: sh (%u) is not a vep session handle!", sh);
2558           goto done;
2559         }
2560       vep = &s->vep;
2561       if (PREDICT_FALSE (vep->vep_sh != vep_handle))
2562         VDBG (0, "ERROR: session (%u) vep_sh (%u) != vep_sh (%u)!",
2563               sh, s->vep.vep_sh, vep_handle);
2564       if (s->flags & VCL_SESSION_F_IS_VEP_SESSION)
2565         {
2566           VDBG (0, "vep_sh[%u]: sh 0x%x (%u)\n"
2567                 "{\n"
2568                 "   next_sh        = 0x%x (%u)\n"
2569                 "   prev_sh        = 0x%x (%u)\n"
2570                 "   vep_sh         = 0x%x (%u)\n"
2571                 "   ev.events      = 0x%x\n"
2572                 "   ev.data.u64    = 0x%llx\n"
2573                 "   et_mask        = 0x%x\n"
2574                 "}\n",
2575                 vep_handle, sh, sh, vep->next_sh, vep->next_sh, vep->prev_sh,
2576                 vep->prev_sh, vep->vep_sh, vep->vep_sh, vep->ev.events,
2577                 vep->ev.data.u64, vep->et_mask);
2578         }
2579     }
2580
2581 done:
2582   VDBG (0, "vep_sh (%u): Dump complete!\n", vep_handle);
2583 }
2584
2585 int
2586 vppcom_epoll_create (void)
2587 {
2588   vcl_worker_t *wrk = vcl_worker_get_current ();
2589   vcl_session_t *vep_session;
2590
2591   vep_session = vcl_session_alloc (wrk);
2592
2593   vep_session->flags |= VCL_SESSION_F_IS_VEP;
2594   vep_session->vep.vep_sh = ~0;
2595   vep_session->vep.next_sh = ~0;
2596   vep_session->vep.prev_sh = ~0;
2597   vep_session->vpp_handle = ~0;
2598
2599   vcl_evt (VCL_EVT_EPOLL_CREATE, vep_session, vep_session->session_index);
2600   VDBG (0, "Created vep_idx %u", vep_session->session_index);
2601
2602   return vcl_session_handle (vep_session);
2603 }
2604
2605 int
2606 vppcom_epoll_ctl (uint32_t vep_handle, int op, uint32_t session_handle,
2607                   struct epoll_event *event)
2608 {
2609   vcl_worker_t *wrk = vcl_worker_get_current ();
2610   vcl_session_t *vep_session;
2611   int rv = VPPCOM_OK;
2612   vcl_session_t *s;
2613   svm_fifo_t *txf;
2614
2615   if (vep_handle == session_handle)
2616     {
2617       VDBG (0, "vep_sh == session handle (%u)!", vep_handle);
2618       return VPPCOM_EINVAL;
2619     }
2620
2621   vep_session = vcl_session_get_w_handle (wrk, vep_handle);
2622   if (PREDICT_FALSE (!vep_session))
2623     {
2624       VDBG (0, "Invalid vep_sh (%u)!", vep_handle);
2625       return VPPCOM_EBADFD;
2626     }
2627   if (PREDICT_FALSE (!(vep_session->flags & VCL_SESSION_F_IS_VEP)))
2628     {
2629       VDBG (0, "vep_sh (%u) is not a vep!", vep_handle);
2630       return VPPCOM_EINVAL;
2631     }
2632
2633   ASSERT (vep_session->vep.vep_sh == ~0);
2634   ASSERT (vep_session->vep.prev_sh == ~0);
2635
2636   s = vcl_session_get_w_handle (wrk, session_handle);
2637   if (PREDICT_FALSE (!s))
2638     {
2639       VDBG (0, "Invalid session_handle (%u)!", session_handle);
2640       return VPPCOM_EBADFD;
2641     }
2642   if (PREDICT_FALSE (s->flags & VCL_SESSION_F_IS_VEP))
2643     {
2644       VDBG (0, "session_handle (%u) is a vep!", vep_handle);
2645       return VPPCOM_EINVAL;
2646     }
2647
2648   switch (op)
2649     {
2650     case EPOLL_CTL_ADD:
2651       if (PREDICT_FALSE (!event))
2652         {
2653           VDBG (0, "EPOLL_CTL_ADD: NULL pointer to epoll_event structure!");
2654           return VPPCOM_EINVAL;
2655         }
2656       if (vep_session->vep.next_sh != ~0)
2657         {
2658           vcl_session_t *next_session;
2659           next_session = vcl_session_get_w_handle (wrk,
2660                                                    vep_session->vep.next_sh);
2661           if (PREDICT_FALSE (!next_session))
2662             {
2663               VDBG (0, "EPOLL_CTL_ADD: Invalid vep.next_sh (%u) on "
2664                     "vep_idx (%u)!", vep_session->vep.next_sh, vep_handle);
2665               return VPPCOM_EBADFD;
2666             }
2667           ASSERT (next_session->vep.prev_sh == vep_handle);
2668           next_session->vep.prev_sh = session_handle;
2669         }
2670       s->vep.next_sh = vep_session->vep.next_sh;
2671       s->vep.prev_sh = vep_handle;
2672       s->vep.vep_sh = vep_handle;
2673       s->vep.et_mask = VEP_DEFAULT_ET_MASK;
2674       s->vep.ev = *event;
2675       s->flags &= ~VCL_SESSION_F_IS_VEP;
2676       s->flags |= VCL_SESSION_F_IS_VEP_SESSION;
2677       vep_session->vep.next_sh = session_handle;
2678
2679       txf = vcl_session_is_ct (s) ? s->ct_tx_fifo : s->tx_fifo;
2680       if (txf && (event->events & EPOLLOUT))
2681         svm_fifo_add_want_deq_ntf (txf, SVM_FIFO_WANT_DEQ_NOTIF_IF_FULL);
2682
2683       /* Generate EPOLLOUT if tx fifo not full */
2684       if ((event->events & EPOLLOUT) && (vcl_session_write_ready (s) > 0))
2685         {
2686           session_event_t e = { 0 };
2687           e.event_type = SESSION_IO_EVT_TX;
2688           e.session_index = s->session_index;
2689           vec_add1 (wrk->unhandled_evts_vector, e);
2690         }
2691       /* Generate EPOLLIN if rx fifo has data */
2692       if ((event->events & EPOLLIN) && (vcl_session_read_ready (s) > 0))
2693         {
2694           session_event_t e = { 0 };
2695           e.event_type = SESSION_IO_EVT_RX;
2696           e.session_index = s->session_index;
2697           vec_add1 (wrk->unhandled_evts_vector, e);
2698         }
2699       VDBG (1, "EPOLL_CTL_ADD: vep_sh %u, sh %u, events 0x%x, data 0x%llx!",
2700             vep_handle, session_handle, event->events, event->data.u64);
2701       vcl_evt (VCL_EVT_EPOLL_CTLADD, s, event->events, event->data.u64);
2702       break;
2703
2704     case EPOLL_CTL_MOD:
2705       if (PREDICT_FALSE (!event))
2706         {
2707           VDBG (0, "EPOLL_CTL_MOD: NULL pointer to epoll_event structure!");
2708           rv = VPPCOM_EINVAL;
2709           goto done;
2710         }
2711       else if (PREDICT_FALSE (!(s->flags & VCL_SESSION_F_IS_VEP_SESSION)))
2712         {
2713           VDBG (0, "sh %u EPOLL_CTL_MOD: not a vep session!", session_handle);
2714           rv = VPPCOM_EINVAL;
2715           goto done;
2716         }
2717       else if (PREDICT_FALSE (s->vep.vep_sh != vep_handle))
2718         {
2719           VDBG (0, "EPOLL_CTL_MOD: sh %u vep_sh (%u) != vep_sh (%u)!",
2720                 session_handle, s->vep.vep_sh, vep_handle);
2721           rv = VPPCOM_EINVAL;
2722           goto done;
2723         }
2724
2725       /* Generate EPOLLOUT when tx_fifo/ct_tx_fifo not full */
2726       if ((event->events & EPOLLOUT) &&
2727           !(s->vep.ev.events & EPOLLOUT) && (vcl_session_write_ready (s) > 0))
2728         {
2729           session_event_t e = { 0 };
2730           e.event_type = SESSION_IO_EVT_TX;
2731           e.session_index = s->session_index;
2732           vec_add1 (wrk->unhandled_evts_vector, e);
2733         }
2734       s->vep.et_mask = VEP_DEFAULT_ET_MASK;
2735       s->vep.ev = *event;
2736       txf = vcl_session_is_ct (s) ? s->ct_tx_fifo : s->tx_fifo;
2737       if (event->events & EPOLLOUT)
2738         svm_fifo_add_want_deq_ntf (txf, SVM_FIFO_WANT_DEQ_NOTIF_IF_FULL);
2739       else
2740         svm_fifo_del_want_deq_ntf (txf, SVM_FIFO_WANT_DEQ_NOTIF_IF_FULL);
2741       VDBG (1, "EPOLL_CTL_MOD: vep_sh %u, sh %u, events 0x%x, data 0x%llx!",
2742             vep_handle, session_handle, event->events, event->data.u64);
2743       break;
2744
2745     case EPOLL_CTL_DEL:
2746       if (PREDICT_FALSE (!(s->flags & VCL_SESSION_F_IS_VEP_SESSION)))
2747         {
2748           VDBG (0, "EPOLL_CTL_DEL: %u not a vep session!", session_handle);
2749           rv = VPPCOM_EINVAL;
2750           goto done;
2751         }
2752       else if (PREDICT_FALSE (s->vep.vep_sh != vep_handle))
2753         {
2754           VDBG (0, "EPOLL_CTL_DEL: sh %u vep_sh (%u) != vep_sh (%u)!",
2755                 session_handle, s->vep.vep_sh, vep_handle);
2756           rv = VPPCOM_EINVAL;
2757           goto done;
2758         }
2759
2760       if (s->vep.prev_sh == vep_handle)
2761         vep_session->vep.next_sh = s->vep.next_sh;
2762       else
2763         {
2764           vcl_session_t *prev_session;
2765           prev_session = vcl_session_get_w_handle (wrk, s->vep.prev_sh);
2766           if (PREDICT_FALSE (!prev_session))
2767             {
2768               VDBG (0, "EPOLL_CTL_DEL: Invalid prev_sh (%u) on sh (%u)!",
2769                     s->vep.prev_sh, session_handle);
2770               return VPPCOM_EBADFD;
2771             }
2772           ASSERT (prev_session->vep.next_sh == session_handle);
2773           prev_session->vep.next_sh = s->vep.next_sh;
2774         }
2775       if (s->vep.next_sh != ~0)
2776         {
2777           vcl_session_t *next_session;
2778           next_session = vcl_session_get_w_handle (wrk, s->vep.next_sh);
2779           if (PREDICT_FALSE (!next_session))
2780             {
2781               VDBG (0, "EPOLL_CTL_DEL: Invalid next_sh (%u) on sh (%u)!",
2782                     s->vep.next_sh, session_handle);
2783               return VPPCOM_EBADFD;
2784             }
2785           ASSERT (next_session->vep.prev_sh == session_handle);
2786           next_session->vep.prev_sh = s->vep.prev_sh;
2787         }
2788
2789       memset (&s->vep, 0, sizeof (s->vep));
2790       s->vep.next_sh = ~0;
2791       s->vep.prev_sh = ~0;
2792       s->vep.vep_sh = ~0;
2793       s->flags &= ~VCL_SESSION_F_IS_VEP_SESSION;
2794
2795       txf = vcl_session_is_ct (s) ? s->ct_tx_fifo : s->tx_fifo;
2796       if (txf)
2797         svm_fifo_del_want_deq_ntf (txf, SVM_FIFO_WANT_DEQ_NOTIF_IF_FULL);
2798
2799       VDBG (1, "EPOLL_CTL_DEL: vep_idx %u, sh %u!", vep_handle,
2800             session_handle);
2801       vcl_evt (VCL_EVT_EPOLL_CTLDEL, s, vep_sh);
2802       break;
2803
2804     default:
2805       VDBG (0, "Invalid operation (%d)!", op);
2806       rv = VPPCOM_EINVAL;
2807     }
2808
2809   vep_verify_epoll_chain (wrk, vep_handle);
2810
2811 done:
2812   return rv;
2813 }
2814
2815 static inline void
2816 vcl_epoll_wait_handle_mq_event (vcl_worker_t * wrk, session_event_t * e,
2817                                 struct epoll_event *events, u32 * num_ev)
2818 {
2819   session_disconnected_msg_t *disconnected_msg;
2820   session_connected_msg_t *connected_msg;
2821   u32 sid = ~0, session_events;
2822   u64 session_evt_data = ~0;
2823   vcl_session_t *s;
2824   u8 add_event = 0;
2825
2826   switch (e->event_type)
2827     {
2828     case SESSION_IO_EVT_RX:
2829       sid = e->session_index;
2830       s = vcl_session_get (wrk, sid);
2831       if (vcl_session_is_closed (s))
2832         break;
2833       vcl_fifo_rx_evt_valid_or_break (s);
2834       session_events = s->vep.ev.events;
2835       if (!(EPOLLIN & s->vep.ev.events)
2836           || (s->flags & VCL_SESSION_F_HAS_RX_EVT))
2837         break;
2838       add_event = 1;
2839       events[*num_ev].events |= EPOLLIN;
2840       session_evt_data = s->vep.ev.data.u64;
2841       s->flags |= VCL_SESSION_F_HAS_RX_EVT;
2842       break;
2843     case SESSION_IO_EVT_TX:
2844       sid = e->session_index;
2845       s = vcl_session_get (wrk, sid);
2846       if (vcl_session_is_closed (s))
2847         break;
2848       session_events = s->vep.ev.events;
2849       if (!(EPOLLOUT & session_events))
2850         break;
2851       add_event = 1;
2852       events[*num_ev].events |= EPOLLOUT;
2853       session_evt_data = s->vep.ev.data.u64;
2854       svm_fifo_reset_has_deq_ntf (vcl_session_is_ct (s) ?
2855                                   s->ct_tx_fifo : s->tx_fifo);
2856       break;
2857     case SESSION_CTRL_EVT_ACCEPTED:
2858       if (!e->postponed)
2859         s = vcl_session_accepted (wrk, (session_accepted_msg_t *) e->data);
2860       else
2861         s = vcl_session_get (wrk, e->session_index);
2862       if (!s)
2863         break;
2864       session_events = s->vep.ev.events;
2865       sid = s->session_index;
2866       if (!(EPOLLIN & session_events))
2867         break;
2868       add_event = 1;
2869       events[*num_ev].events |= EPOLLIN;
2870       session_evt_data = s->vep.ev.data.u64;
2871       break;
2872     case SESSION_CTRL_EVT_CONNECTED:
2873       if (!e->postponed)
2874         {
2875           connected_msg = (session_connected_msg_t *) e->data;
2876           sid = vcl_session_connected_handler (wrk, connected_msg);
2877         }
2878       else
2879         sid = e->session_index;
2880       s = vcl_session_get (wrk, sid);
2881       if (vcl_session_is_closed (s))
2882         break;
2883       session_events = s->vep.ev.events;
2884       /* Generate EPOLLOUT because there's no connected event */
2885       if (!(EPOLLOUT & session_events))
2886         break;
2887       add_event = 1;
2888       events[*num_ev].events |= EPOLLOUT;
2889       session_evt_data = s->vep.ev.data.u64;
2890       if (s->session_state == VCL_STATE_DETACHED)
2891         events[*num_ev].events |= EPOLLHUP;
2892       break;
2893     case SESSION_CTRL_EVT_DISCONNECTED:
2894       disconnected_msg = (session_disconnected_msg_t *) e->data;
2895       s = vcl_session_disconnected_handler (wrk, disconnected_msg);
2896       if (vcl_session_is_closed (s))
2897         break;
2898       sid = s->session_index;
2899       session_events = s->vep.ev.events;
2900       add_event = 1;
2901       events[*num_ev].events |= EPOLLHUP | EPOLLRDHUP;
2902       session_evt_data = s->vep.ev.data.u64;
2903       break;
2904     case SESSION_CTRL_EVT_RESET:
2905       sid = vcl_session_reset_handler (wrk, (session_reset_msg_t *) e->data);
2906       s = vcl_session_get (wrk, sid);
2907       if (vcl_session_is_closed (s))
2908         break;
2909       session_events = s->vep.ev.events;
2910       add_event = 1;
2911       events[*num_ev].events |= EPOLLHUP | EPOLLRDHUP;
2912       session_evt_data = s->vep.ev.data.u64;
2913       break;
2914     case SESSION_CTRL_EVT_UNLISTEN_REPLY:
2915       vcl_session_unlisten_reply_handler (wrk, e->data);
2916       break;
2917     case SESSION_CTRL_EVT_MIGRATED:
2918       vcl_session_migrated_handler (wrk, e->data);
2919       break;
2920     case SESSION_CTRL_EVT_CLEANUP:
2921       vcl_session_cleanup_handler (wrk, e->data);
2922       break;
2923     case SESSION_CTRL_EVT_REQ_WORKER_UPDATE:
2924       vcl_session_req_worker_update_handler (wrk, e->data);
2925       break;
2926     case SESSION_CTRL_EVT_WORKER_UPDATE_REPLY:
2927       vcl_session_worker_update_reply_handler (wrk, e->data);
2928       break;
2929     case SESSION_CTRL_EVT_APP_ADD_SEGMENT:
2930       vcl_session_app_add_segment_handler (wrk, e->data);
2931       break;
2932     case SESSION_CTRL_EVT_APP_DEL_SEGMENT:
2933       vcl_session_app_del_segment_handler (wrk, e->data);
2934       break;
2935     case SESSION_CTRL_EVT_APP_WRK_RPC:
2936       vcl_worker_rpc_handler (wrk, e->data);
2937       break;
2938     default:
2939       VDBG (0, "unhandled: %u", e->event_type);
2940       break;
2941     }
2942
2943   if (add_event)
2944     {
2945       events[*num_ev].data.u64 = session_evt_data;
2946       if (EPOLLONESHOT & session_events)
2947         {
2948           s = vcl_session_get (wrk, sid);
2949           s->vep.ev.events = 0;
2950         }
2951       *num_ev += 1;
2952     }
2953 }
2954
2955 static int
2956 vcl_epoll_wait_handle_mq (vcl_worker_t * wrk, svm_msg_q_t * mq,
2957                           struct epoll_event *events, u32 maxevents,
2958                           double wait_for_time, u32 * num_ev)
2959 {
2960   svm_msg_q_msg_t *msg;
2961   session_event_t *e;
2962   int i;
2963
2964   if (vec_len (wrk->mq_msg_vector) && svm_msg_q_is_empty (mq))
2965     goto handle_dequeued;
2966
2967   svm_msg_q_lock (mq);
2968   if (svm_msg_q_is_empty (mq))
2969     {
2970       if (!wait_for_time)
2971         {
2972           svm_msg_q_unlock (mq);
2973           return 0;
2974         }
2975       else if (wait_for_time < 0)
2976         {
2977           svm_msg_q_wait (mq);
2978         }
2979       else
2980         {
2981           if (svm_msg_q_timedwait (mq, wait_for_time / 1e3))
2982             {
2983               svm_msg_q_unlock (mq);
2984               return 0;
2985             }
2986         }
2987     }
2988   ASSERT (maxevents > *num_ev);
2989   vcl_mq_dequeue_batch (wrk, mq, ~0);
2990   svm_msg_q_unlock (mq);
2991
2992 handle_dequeued:
2993   for (i = 0; i < vec_len (wrk->mq_msg_vector); i++)
2994     {
2995       msg = vec_elt_at_index (wrk->mq_msg_vector, i);
2996       e = svm_msg_q_msg_data (mq, msg);
2997       if (*num_ev < maxevents)
2998         vcl_epoll_wait_handle_mq_event (wrk, e, events, num_ev);
2999       else
3000         vcl_handle_mq_event (wrk, e);
3001       svm_msg_q_free_msg (mq, msg);
3002     }
3003   vec_reset_length (wrk->mq_msg_vector);
3004   vcl_handle_pending_wrk_updates (wrk);
3005   return *num_ev;
3006 }
3007
3008 static int
3009 vppcom_epoll_wait_condvar (vcl_worker_t * wrk, struct epoll_event *events,
3010                            int maxevents, u32 n_evts, double wait_for_time)
3011 {
3012   double wait = 0, start = 0, now;
3013
3014   if (!n_evts)
3015     {
3016       wait = wait_for_time;
3017       start = clib_time_now (&wrk->clib_time);
3018     }
3019
3020   do
3021     {
3022       vcl_epoll_wait_handle_mq (wrk, wrk->app_event_queue, events, maxevents,
3023                                 wait, &n_evts);
3024       if (n_evts)
3025         return n_evts;
3026       if (wait == -1)
3027         continue;
3028
3029       now = clib_time_now (&wrk->clib_time);
3030       wait -= (now - start) * 1e3;
3031       start = now;
3032     }
3033   while (wait > 0);
3034
3035   return 0;
3036 }
3037
3038 static int
3039 vppcom_epoll_wait_eventfd (vcl_worker_t * wrk, struct epoll_event *events,
3040                            int maxevents, u32 n_evts, double wait_for_time)
3041 {
3042   vcl_mq_evt_conn_t *mqc;
3043   int __clib_unused n_read;
3044   int n_mq_evts, i;
3045   u64 buf;
3046
3047   vec_validate (wrk->mq_events, pool_elts (wrk->mq_evt_conns));
3048 again:
3049   n_mq_evts = epoll_wait (wrk->mqs_epfd, wrk->mq_events,
3050                           vec_len (wrk->mq_events), wait_for_time);
3051   for (i = 0; i < n_mq_evts; i++)
3052     {
3053       mqc = vcl_mq_evt_conn_get (wrk, wrk->mq_events[i].data.u32);
3054       n_read = read (mqc->mq_fd, &buf, sizeof (buf));
3055       vcl_epoll_wait_handle_mq (wrk, mqc->mq, events, maxevents, 0, &n_evts);
3056     }
3057   if (!n_evts && n_mq_evts > 0)
3058     goto again;
3059
3060   return (int) n_evts;
3061 }
3062
3063 int
3064 vppcom_epoll_wait (uint32_t vep_handle, struct epoll_event *events,
3065                    int maxevents, double wait_for_time)
3066 {
3067   vcl_worker_t *wrk = vcl_worker_get_current ();
3068   vcl_session_t *vep_session;
3069   u32 n_evts = 0;
3070   int i;
3071
3072   if (PREDICT_FALSE (maxevents <= 0))
3073     {
3074       VDBG (0, "ERROR: Invalid maxevents (%d)!", maxevents);
3075       return VPPCOM_EINVAL;
3076     }
3077
3078   vep_session = vcl_session_get_w_handle (wrk, vep_handle);
3079   if (!vep_session)
3080     return VPPCOM_EBADFD;
3081
3082   if (PREDICT_FALSE (!(vep_session->flags & VCL_SESSION_F_IS_VEP)))
3083     {
3084       VDBG (0, "ERROR: vep_idx (%u) is not a vep!", vep_handle);
3085       return VPPCOM_EINVAL;
3086     }
3087
3088   memset (events, 0, sizeof (*events) * maxevents);
3089
3090   if (vec_len (wrk->unhandled_evts_vector))
3091     {
3092       for (i = 0; i < vec_len (wrk->unhandled_evts_vector); i++)
3093         {
3094           vcl_epoll_wait_handle_mq_event (wrk, &wrk->unhandled_evts_vector[i],
3095                                           events, &n_evts);
3096           if (n_evts == maxevents)
3097             {
3098               vec_delete (wrk->unhandled_evts_vector, i + 1, 0);
3099               return n_evts;
3100             }
3101         }
3102       vec_reset_length (wrk->unhandled_evts_vector);
3103     }
3104
3105   if (vcm->cfg.use_mq_eventfd)
3106     return vppcom_epoll_wait_eventfd (wrk, events, maxevents, n_evts,
3107                                       wait_for_time);
3108
3109   return vppcom_epoll_wait_condvar (wrk, events, maxevents, n_evts,
3110                                     wait_for_time);
3111 }
3112
3113 int
3114 vppcom_session_attr (uint32_t session_handle, uint32_t op,
3115                      void *buffer, uint32_t * buflen)
3116 {
3117   vcl_worker_t *wrk = vcl_worker_get_current ();
3118   vcl_session_t *session;
3119   int rv = VPPCOM_OK;
3120   u32 *flags = buffer, tmp_flags = 0;
3121   vppcom_endpt_t *ep = buffer;
3122
3123   session = vcl_session_get_w_handle (wrk, session_handle);
3124   if (!session)
3125     return VPPCOM_EBADFD;
3126
3127   switch (op)
3128     {
3129     case VPPCOM_ATTR_GET_NREAD:
3130       rv = vcl_session_read_ready (session);
3131       VDBG (2, "VPPCOM_ATTR_GET_NREAD: sh %u, nread = %d", session_handle,
3132             rv);
3133       break;
3134
3135     case VPPCOM_ATTR_GET_NWRITE:
3136       rv = vcl_session_write_ready (session);
3137       VDBG (2, "VPPCOM_ATTR_GET_NWRITE: sh %u, nwrite = %d", session_handle,
3138             rv);
3139       break;
3140
3141     case VPPCOM_ATTR_GET_FLAGS:
3142       if (PREDICT_TRUE (buffer && buflen && (*buflen >= sizeof (*flags))))
3143         {
3144           *flags =
3145             O_RDWR |
3146             (vcl_session_has_attr (session, VCL_SESS_ATTR_NONBLOCK) ?
3147              O_NONBLOCK : 0);
3148           *buflen = sizeof (*flags);
3149           VDBG (2, "VPPCOM_ATTR_GET_FLAGS: sh %u, flags = 0x%08x, "
3150                 "is_nonblocking = %u", session_handle, *flags,
3151                 vcl_session_has_attr (session, VCL_SESS_ATTR_NONBLOCK));
3152         }
3153       else
3154         rv = VPPCOM_EINVAL;
3155       break;
3156
3157     case VPPCOM_ATTR_SET_FLAGS:
3158       if (PREDICT_TRUE (buffer && buflen && (*buflen == sizeof (*flags))))
3159         {
3160           if (*flags & O_NONBLOCK)
3161             vcl_session_set_attr (session, VCL_SESS_ATTR_NONBLOCK);
3162           else
3163             vcl_session_clear_attr (session, VCL_SESS_ATTR_NONBLOCK);
3164
3165           VDBG (2, "VPPCOM_ATTR_SET_FLAGS: sh %u, flags = 0x%08x,"
3166                 " is_nonblocking = %u", session_handle, *flags,
3167                 vcl_session_has_attr (session, VCL_SESS_ATTR_NONBLOCK));
3168         }
3169       else
3170         rv = VPPCOM_EINVAL;
3171       break;
3172
3173     case VPPCOM_ATTR_GET_PEER_ADDR:
3174       if (PREDICT_TRUE (buffer && buflen &&
3175                         (*buflen >= sizeof (*ep)) && ep->ip))
3176         {
3177           ep->is_ip4 = session->transport.is_ip4;
3178           ep->port = session->transport.rmt_port;
3179           if (session->transport.is_ip4)
3180             clib_memcpy_fast (ep->ip, &session->transport.rmt_ip.ip4,
3181                               sizeof (ip4_address_t));
3182           else
3183             clib_memcpy_fast (ep->ip, &session->transport.rmt_ip.ip6,
3184                               sizeof (ip6_address_t));
3185           *buflen = sizeof (*ep);
3186           VDBG (1, "VPPCOM_ATTR_GET_PEER_ADDR: sh %u, is_ip4 = %u, "
3187                 "addr = %U, port %u", session_handle, ep->is_ip4,
3188                 format_ip46_address, &session->transport.rmt_ip,
3189                 ep->is_ip4 ? IP46_TYPE_IP4 : IP46_TYPE_IP6,
3190                 clib_net_to_host_u16 (ep->port));
3191         }
3192       else
3193         rv = VPPCOM_EINVAL;
3194       break;
3195
3196     case VPPCOM_ATTR_GET_LCL_ADDR:
3197       if (PREDICT_TRUE (buffer && buflen &&
3198                         (*buflen >= sizeof (*ep)) && ep->ip))
3199         {
3200           ep->is_ip4 = session->transport.is_ip4;
3201           ep->port = session->transport.lcl_port;
3202           if (session->transport.is_ip4)
3203             clib_memcpy_fast (ep->ip, &session->transport.lcl_ip.ip4,
3204                               sizeof (ip4_address_t));
3205           else
3206             clib_memcpy_fast (ep->ip, &session->transport.lcl_ip.ip6,
3207                               sizeof (ip6_address_t));
3208           *buflen = sizeof (*ep);
3209           VDBG (1, "VPPCOM_ATTR_GET_LCL_ADDR: sh %u, is_ip4 = %u, addr = %U"
3210                 " port %d", session_handle, ep->is_ip4, format_ip46_address,
3211                 &session->transport.lcl_ip,
3212                 ep->is_ip4 ? IP46_TYPE_IP4 : IP46_TYPE_IP6,
3213                 clib_net_to_host_u16 (ep->port));
3214         }
3215       else
3216         rv = VPPCOM_EINVAL;
3217       break;
3218
3219     case VPPCOM_ATTR_SET_LCL_ADDR:
3220       if (PREDICT_TRUE (buffer && buflen &&
3221                         (*buflen >= sizeof (*ep)) && ep->ip))
3222         {
3223           session->transport.is_ip4 = ep->is_ip4;
3224           session->transport.lcl_port = ep->port;
3225           vcl_ip_copy_from_ep (&session->transport.lcl_ip, ep);
3226           *buflen = sizeof (*ep);
3227           VDBG (1, "VPPCOM_ATTR_SET_LCL_ADDR: sh %u, is_ip4 = %u, addr = %U"
3228                 " port %d", session_handle, ep->is_ip4, format_ip46_address,
3229                 &session->transport.lcl_ip,
3230                 ep->is_ip4 ? IP46_TYPE_IP4 : IP46_TYPE_IP6,
3231                 clib_net_to_host_u16 (ep->port));
3232         }
3233       else
3234         rv = VPPCOM_EINVAL;
3235       break;
3236
3237     case VPPCOM_ATTR_GET_LIBC_EPFD:
3238       rv = session->libc_epfd;
3239       VDBG (2, "VPPCOM_ATTR_GET_LIBC_EPFD: libc_epfd %d", rv);
3240       break;
3241
3242     case VPPCOM_ATTR_SET_LIBC_EPFD:
3243       if (PREDICT_TRUE (buffer && buflen &&
3244                         (*buflen == sizeof (session->libc_epfd))))
3245         {
3246           session->libc_epfd = *(int *) buffer;
3247           *buflen = sizeof (session->libc_epfd);
3248
3249           VDBG (2, "VPPCOM_ATTR_SET_LIBC_EPFD: libc_epfd %d, buflen %d",
3250                 session->libc_epfd, *buflen);
3251         }
3252       else
3253         rv = VPPCOM_EINVAL;
3254       break;
3255
3256     case VPPCOM_ATTR_GET_PROTOCOL:
3257       if (buffer && buflen && (*buflen >= sizeof (int)))
3258         {
3259           *(int *) buffer = session->session_type;
3260           *buflen = sizeof (int);
3261
3262           VDBG (2, "VPPCOM_ATTR_GET_PROTOCOL: %d (%s), buflen %d",
3263                 *(int *) buffer, *(int *) buffer ? "UDP" : "TCP", *buflen);
3264         }
3265       else
3266         rv = VPPCOM_EINVAL;
3267       break;
3268
3269     case VPPCOM_ATTR_GET_LISTEN:
3270       if (buffer && buflen && (*buflen >= sizeof (int)))
3271         {
3272           *(int *) buffer = vcl_session_has_attr (session,
3273                                                   VCL_SESS_ATTR_LISTEN);
3274           *buflen = sizeof (int);
3275
3276           VDBG (2, "VPPCOM_ATTR_GET_LISTEN: %d, buflen %d", *(int *) buffer,
3277                 *buflen);
3278         }
3279       else
3280         rv = VPPCOM_EINVAL;
3281       break;
3282
3283     case VPPCOM_ATTR_GET_ERROR:
3284       if (buffer && buflen && (*buflen >= sizeof (int)))
3285         {
3286           *(int *) buffer = 0;
3287           *buflen = sizeof (int);
3288
3289           VDBG (2, "VPPCOM_ATTR_GET_ERROR: %d, buflen %d, #VPP-TBD#",
3290                 *(int *) buffer, *buflen);
3291         }
3292       else
3293         rv = VPPCOM_EINVAL;
3294       break;
3295
3296     case VPPCOM_ATTR_GET_TX_FIFO_LEN:
3297       if (buffer && buflen && (*buflen >= sizeof (u32)))
3298         {
3299
3300           /* VPP-TBD */
3301           *(size_t *) buffer = (session->sndbuf_size ? session->sndbuf_size :
3302                                 session->tx_fifo ?
3303                                 svm_fifo_size (session->tx_fifo) :
3304                                 vcm->cfg.tx_fifo_size);
3305           *buflen = sizeof (u32);
3306
3307           VDBG (2, "VPPCOM_ATTR_GET_TX_FIFO_LEN: %u (0x%x), buflen %d,"
3308                 " #VPP-TBD#", *(size_t *) buffer, *(size_t *) buffer,
3309                 *buflen);
3310         }
3311       else
3312         rv = VPPCOM_EINVAL;
3313       break;
3314
3315     case VPPCOM_ATTR_SET_TX_FIFO_LEN:
3316       if (buffer && buflen && (*buflen == sizeof (u32)))
3317         {
3318           /* VPP-TBD */
3319           session->sndbuf_size = *(u32 *) buffer;
3320           VDBG (2, "VPPCOM_ATTR_SET_TX_FIFO_LEN: %u (0x%x), buflen %d,"
3321                 " #VPP-TBD#", session->sndbuf_size, session->sndbuf_size,
3322                 *buflen);
3323         }
3324       else
3325         rv = VPPCOM_EINVAL;
3326       break;
3327
3328     case VPPCOM_ATTR_GET_RX_FIFO_LEN:
3329       if (buffer && buflen && (*buflen >= sizeof (u32)))
3330         {
3331
3332           /* VPP-TBD */
3333           *(size_t *) buffer = (session->rcvbuf_size ? session->rcvbuf_size :
3334                                 session->rx_fifo ?
3335                                 svm_fifo_size (session->rx_fifo) :
3336                                 vcm->cfg.rx_fifo_size);
3337           *buflen = sizeof (u32);
3338
3339           VDBG (2, "VPPCOM_ATTR_GET_RX_FIFO_LEN: %u (0x%x), buflen %d, "
3340                 "#VPP-TBD#", *(size_t *) buffer, *(size_t *) buffer, *buflen);
3341         }
3342       else
3343         rv = VPPCOM_EINVAL;
3344       break;
3345
3346     case VPPCOM_ATTR_SET_RX_FIFO_LEN:
3347       if (buffer && buflen && (*buflen == sizeof (u32)))
3348         {
3349           /* VPP-TBD */
3350           session->rcvbuf_size = *(u32 *) buffer;
3351           VDBG (2, "VPPCOM_ATTR_SET_RX_FIFO_LEN: %u (0x%x), buflen %d,"
3352                 " #VPP-TBD#", session->sndbuf_size, session->sndbuf_size,
3353                 *buflen);
3354         }
3355       else
3356         rv = VPPCOM_EINVAL;
3357       break;
3358
3359     case VPPCOM_ATTR_GET_REUSEADDR:
3360       if (buffer && buflen && (*buflen >= sizeof (int)))
3361         {
3362           /* VPP-TBD */
3363           *(int *) buffer = vcl_session_has_attr (session,
3364                                                   VCL_SESS_ATTR_REUSEADDR);
3365           *buflen = sizeof (int);
3366
3367           VDBG (2, "VPPCOM_ATTR_GET_REUSEADDR: %d, buflen %d, #VPP-TBD#",
3368                 *(int *) buffer, *buflen);
3369         }
3370       else
3371         rv = VPPCOM_EINVAL;
3372       break;
3373
3374     case VPPCOM_ATTR_SET_REUSEADDR:
3375       if (buffer && buflen && (*buflen == sizeof (int)) &&
3376           !vcl_session_has_attr (session, VCL_SESS_ATTR_LISTEN))
3377         {
3378           /* VPP-TBD */
3379           if (*(int *) buffer)
3380             vcl_session_set_attr (session, VCL_SESS_ATTR_REUSEADDR);
3381           else
3382             vcl_session_clear_attr (session, VCL_SESS_ATTR_REUSEADDR);
3383
3384           VDBG (2, "VPPCOM_ATTR_SET_REUSEADDR: %d, buflen %d, #VPP-TBD#",
3385                 vcl_session_has_attr (session, VCL_SESS_ATTR_REUSEADDR),
3386                 *buflen);
3387         }
3388       else
3389         rv = VPPCOM_EINVAL;
3390       break;
3391
3392     case VPPCOM_ATTR_GET_REUSEPORT:
3393       if (buffer && buflen && (*buflen >= sizeof (int)))
3394         {
3395           /* VPP-TBD */
3396           *(int *) buffer = vcl_session_has_attr (session,
3397                                                   VCL_SESS_ATTR_REUSEPORT);
3398           *buflen = sizeof (int);
3399
3400           VDBG (2, "VPPCOM_ATTR_GET_REUSEPORT: %d, buflen %d, #VPP-TBD#",
3401                 *(int *) buffer, *buflen);
3402         }
3403       else
3404         rv = VPPCOM_EINVAL;
3405       break;
3406
3407     case VPPCOM_ATTR_SET_REUSEPORT:
3408       if (buffer && buflen && (*buflen == sizeof (int)) &&
3409           !vcl_session_has_attr (session, VCL_SESS_ATTR_LISTEN))
3410         {
3411           /* VPP-TBD */
3412           if (*(int *) buffer)
3413             vcl_session_set_attr (session, VCL_SESS_ATTR_REUSEPORT);
3414           else
3415             vcl_session_clear_attr (session, VCL_SESS_ATTR_REUSEPORT);
3416
3417           VDBG (2, "VPPCOM_ATTR_SET_REUSEPORT: %d, buflen %d, #VPP-TBD#",
3418                 vcl_session_has_attr (session, VCL_SESS_ATTR_REUSEPORT),
3419                 *buflen);
3420         }
3421       else
3422         rv = VPPCOM_EINVAL;
3423       break;
3424
3425     case VPPCOM_ATTR_GET_BROADCAST:
3426       if (buffer && buflen && (*buflen >= sizeof (int)))
3427         {
3428           /* VPP-TBD */
3429           *(int *) buffer = vcl_session_has_attr (session,
3430                                                   VCL_SESS_ATTR_BROADCAST);
3431           *buflen = sizeof (int);
3432
3433           VDBG (2, "VPPCOM_ATTR_GET_BROADCAST: %d, buflen %d, #VPP-TBD#",
3434                 *(int *) buffer, *buflen);
3435         }
3436       else
3437         rv = VPPCOM_EINVAL;
3438       break;
3439
3440     case VPPCOM_ATTR_SET_BROADCAST:
3441       if (buffer && buflen && (*buflen == sizeof (int)))
3442         {
3443           /* VPP-TBD */
3444           if (*(int *) buffer)
3445             vcl_session_set_attr (session, VCL_SESS_ATTR_BROADCAST);
3446           else
3447             vcl_session_clear_attr (session, VCL_SESS_ATTR_BROADCAST);
3448
3449           VDBG (2, "VPPCOM_ATTR_SET_BROADCAST: %d, buflen %d, #VPP-TBD#",
3450                 vcl_session_has_attr (session, VCL_SESS_ATTR_BROADCAST),
3451                 *buflen);
3452         }
3453       else
3454         rv = VPPCOM_EINVAL;
3455       break;
3456
3457     case VPPCOM_ATTR_GET_V6ONLY:
3458       if (buffer && buflen && (*buflen >= sizeof (int)))
3459         {
3460           /* VPP-TBD */
3461           *(int *) buffer = vcl_session_has_attr (session,
3462                                                   VCL_SESS_ATTR_V6ONLY);
3463           *buflen = sizeof (int);
3464
3465           VDBG (2, "VPPCOM_ATTR_GET_V6ONLY: %d, buflen %d, #VPP-TBD#",
3466                 *(int *) buffer, *buflen);
3467         }
3468       else
3469         rv = VPPCOM_EINVAL;
3470       break;
3471
3472     case VPPCOM_ATTR_SET_V6ONLY:
3473       if (buffer && buflen && (*buflen == sizeof (int)))
3474         {
3475           /* VPP-TBD */
3476           if (*(int *) buffer)
3477             vcl_session_set_attr (session, VCL_SESS_ATTR_V6ONLY);
3478           else
3479             vcl_session_clear_attr (session, VCL_SESS_ATTR_V6ONLY);
3480
3481           VDBG (2, "VPPCOM_ATTR_SET_V6ONLY: %d, buflen %d, #VPP-TBD#",
3482                 vcl_session_has_attr (session, VCL_SESS_ATTR_V6ONLY),
3483                 *buflen);
3484         }
3485       else
3486         rv = VPPCOM_EINVAL;
3487       break;
3488
3489     case VPPCOM_ATTR_GET_KEEPALIVE:
3490       if (buffer && buflen && (*buflen >= sizeof (int)))
3491         {
3492           /* VPP-TBD */
3493           *(int *) buffer = vcl_session_has_attr (session,
3494                                                   VCL_SESS_ATTR_KEEPALIVE);
3495           *buflen = sizeof (int);
3496
3497           VDBG (2, "VPPCOM_ATTR_GET_KEEPALIVE: %d, buflen %d, #VPP-TBD#",
3498                 *(int *) buffer, *buflen);
3499         }
3500       else
3501         rv = VPPCOM_EINVAL;
3502       break;
3503
3504     case VPPCOM_ATTR_SET_KEEPALIVE:
3505       if (buffer && buflen && (*buflen == sizeof (int)))
3506         {
3507           /* VPP-TBD */
3508           if (*(int *) buffer)
3509             vcl_session_set_attr (session, VCL_SESS_ATTR_KEEPALIVE);
3510           else
3511             vcl_session_clear_attr (session, VCL_SESS_ATTR_KEEPALIVE);
3512
3513           VDBG (2, "VPPCOM_ATTR_SET_KEEPALIVE: %d, buflen %d, #VPP-TBD#",
3514                 vcl_session_has_attr (session, VCL_SESS_ATTR_KEEPALIVE),
3515                 *buflen);
3516         }
3517       else
3518         rv = VPPCOM_EINVAL;
3519       break;
3520
3521     case VPPCOM_ATTR_GET_TCP_NODELAY:
3522       if (buffer && buflen && (*buflen >= sizeof (int)))
3523         {
3524           /* VPP-TBD */
3525           *(int *) buffer = vcl_session_has_attr (session,
3526                                                   VCL_SESS_ATTR_TCP_NODELAY);
3527           *buflen = sizeof (int);
3528
3529           VDBG (2, "VPPCOM_ATTR_GET_TCP_NODELAY: %d, buflen %d, #VPP-TBD#",
3530                 *(int *) buffer, *buflen);
3531         }
3532       else
3533         rv = VPPCOM_EINVAL;
3534       break;
3535
3536     case VPPCOM_ATTR_SET_TCP_NODELAY:
3537       if (buffer && buflen && (*buflen == sizeof (int)))
3538         {
3539           /* VPP-TBD */
3540           if (*(int *) buffer)
3541             vcl_session_set_attr (session, VCL_SESS_ATTR_TCP_NODELAY);
3542           else
3543             vcl_session_clear_attr (session, VCL_SESS_ATTR_TCP_NODELAY);
3544
3545           VDBG (2, "VPPCOM_ATTR_SET_TCP_NODELAY: %d, buflen %d, #VPP-TBD#",
3546                 vcl_session_has_attr (session, VCL_SESS_ATTR_TCP_NODELAY),
3547                 *buflen);
3548         }
3549       else
3550         rv = VPPCOM_EINVAL;
3551       break;
3552
3553     case VPPCOM_ATTR_GET_TCP_KEEPIDLE:
3554       if (buffer && buflen && (*buflen >= sizeof (int)))
3555         {
3556           /* VPP-TBD */
3557           *(int *) buffer = vcl_session_has_attr (session,
3558                                                   VCL_SESS_ATTR_TCP_KEEPIDLE);
3559           *buflen = sizeof (int);
3560
3561           VDBG (2, "VPPCOM_ATTR_GET_TCP_KEEPIDLE: %d, buflen %d, #VPP-TBD#",
3562                 *(int *) buffer, *buflen);
3563         }
3564       else
3565         rv = VPPCOM_EINVAL;
3566       break;
3567
3568     case VPPCOM_ATTR_SET_TCP_KEEPIDLE:
3569       if (buffer && buflen && (*buflen == sizeof (int)))
3570         {
3571           /* VPP-TBD */
3572           if (*(int *) buffer)
3573             vcl_session_set_attr (session, VCL_SESS_ATTR_TCP_KEEPIDLE);
3574           else
3575             vcl_session_clear_attr (session, VCL_SESS_ATTR_TCP_KEEPIDLE);
3576
3577           VDBG (2, "VPPCOM_ATTR_SET_TCP_KEEPIDLE: %d, buflen %d, #VPP-TBD#",
3578                 vcl_session_has_attr (session,
3579                                       VCL_SESS_ATTR_TCP_KEEPIDLE), *buflen);
3580         }
3581       else
3582         rv = VPPCOM_EINVAL;
3583       break;
3584
3585     case VPPCOM_ATTR_GET_TCP_KEEPINTVL:
3586       if (buffer && buflen && (*buflen >= sizeof (int)))
3587         {
3588           /* VPP-TBD */
3589           *(int *) buffer = vcl_session_has_attr (session,
3590                                                   VCL_SESS_ATTR_TCP_KEEPINTVL);
3591           *buflen = sizeof (int);
3592
3593           VDBG (2, "VPPCOM_ATTR_GET_TCP_KEEPINTVL: %d, buflen %d, #VPP-TBD#",
3594                 *(int *) buffer, *buflen);
3595         }
3596       else
3597         rv = VPPCOM_EINVAL;
3598       break;
3599
3600     case VPPCOM_ATTR_SET_TCP_KEEPINTVL:
3601       if (buffer && buflen && (*buflen == sizeof (int)))
3602         {
3603           /* VPP-TBD */
3604           if (*(int *) buffer)
3605             vcl_session_set_attr (session, VCL_SESS_ATTR_TCP_KEEPINTVL);
3606           else
3607             vcl_session_clear_attr (session, VCL_SESS_ATTR_TCP_KEEPINTVL);
3608
3609           VDBG (2, "VPPCOM_ATTR_SET_TCP_KEEPINTVL: %d, buflen %d, #VPP-TBD#",
3610                 vcl_session_has_attr (session,
3611                                       VCL_SESS_ATTR_TCP_KEEPINTVL), *buflen);
3612         }
3613       else
3614         rv = VPPCOM_EINVAL;
3615       break;
3616
3617     case VPPCOM_ATTR_GET_TCP_USER_MSS:
3618       if (buffer && buflen && (*buflen >= sizeof (u32)))
3619         {
3620           /* VPP-TBD */
3621           *(u32 *) buffer = session->user_mss;
3622           *buflen = sizeof (int);
3623
3624           VDBG (2, "VPPCOM_ATTR_GET_TCP_USER_MSS: %d, buflen %d, #VPP-TBD#",
3625                 *(int *) buffer, *buflen);
3626         }
3627       else
3628         rv = VPPCOM_EINVAL;
3629       break;
3630
3631     case VPPCOM_ATTR_SET_TCP_USER_MSS:
3632       if (buffer && buflen && (*buflen == sizeof (u32)))
3633         {
3634           /* VPP-TBD */
3635           session->user_mss = *(u32 *) buffer;
3636
3637           VDBG (2, "VPPCOM_ATTR_SET_TCP_USER_MSS: %u, buflen %d, #VPP-TBD#",
3638                 session->user_mss, *buflen);
3639         }
3640       else
3641         rv = VPPCOM_EINVAL;
3642       break;
3643
3644     case VPPCOM_ATTR_SET_SHUT:
3645       if (*flags == SHUT_RD || *flags == SHUT_RDWR)
3646         vcl_session_set_attr (session, VCL_SESS_ATTR_SHUT_RD);
3647       if (*flags == SHUT_WR || *flags == SHUT_RDWR)
3648         vcl_session_set_attr (session, VCL_SESS_ATTR_SHUT_WR);
3649       break;
3650
3651     case VPPCOM_ATTR_GET_SHUT:
3652       if (vcl_session_has_attr (session, VCL_SESS_ATTR_SHUT_RD))
3653         tmp_flags = 1;
3654       if (vcl_session_has_attr (session, VCL_SESS_ATTR_SHUT_WR))
3655         tmp_flags |= 2;
3656       if (tmp_flags == 1)
3657         *(int *) buffer = SHUT_RD;
3658       else if (tmp_flags == 2)
3659         *(int *) buffer = SHUT_WR;
3660       else if (tmp_flags == 3)
3661         *(int *) buffer = SHUT_RDWR;
3662       *buflen = sizeof (int);
3663       break;
3664
3665     case VPPCOM_ATTR_SET_CONNECTED:
3666       session->flags |= VCL_SESSION_F_CONNECTED;
3667       break;
3668
3669     default:
3670       rv = VPPCOM_EINVAL;
3671       break;
3672     }
3673
3674   return rv;
3675 }
3676
3677 int
3678 vppcom_session_recvfrom (uint32_t session_handle, void *buffer,
3679                          uint32_t buflen, int flags, vppcom_endpt_t * ep)
3680 {
3681   vcl_worker_t *wrk = vcl_worker_get_current ();
3682   vcl_session_t *session;
3683   int rv = VPPCOM_OK;
3684
3685   if (flags == 0)
3686     rv = vppcom_session_read (session_handle, buffer, buflen);
3687   else if (flags & MSG_PEEK)
3688     rv = vppcom_session_peek (session_handle, buffer, buflen);
3689   else
3690     {
3691       VDBG (0, "Unsupport flags for recvfrom %d", flags);
3692       return VPPCOM_EAFNOSUPPORT;
3693     }
3694
3695   if (ep && rv > 0)
3696     {
3697       session = vcl_session_get_w_handle (wrk, session_handle);
3698       if (session->transport.is_ip4)
3699         clib_memcpy_fast (ep->ip, &session->transport.rmt_ip.ip4,
3700                           sizeof (ip4_address_t));
3701       else
3702         clib_memcpy_fast (ep->ip, &session->transport.rmt_ip.ip6,
3703                           sizeof (ip6_address_t));
3704       ep->is_ip4 = session->transport.is_ip4;
3705       ep->port = session->transport.rmt_port;
3706     }
3707
3708   return rv;
3709 }
3710
3711 int
3712 vppcom_session_sendto (uint32_t session_handle, void *buffer,
3713                        uint32_t buflen, int flags, vppcom_endpt_t * ep)
3714 {
3715   vcl_worker_t *wrk = vcl_worker_get_current ();
3716   vcl_session_t *s;
3717
3718   s = vcl_session_get_w_handle (wrk, session_handle);
3719   if (!s)
3720     return VPPCOM_EBADFD;
3721
3722   if (!buffer)
3723     return VPPCOM_EINVAL;
3724
3725   if (ep)
3726     {
3727       if (!vcl_session_is_cl (s))
3728         return VPPCOM_EINVAL;
3729
3730       /* Session not connected/bound in vpp. Create it by 'connecting' it */
3731       if (PREDICT_FALSE (s->session_state == VCL_STATE_CLOSED))
3732         {
3733           u32 session_index = s->session_index;
3734           f64 timeout = vcm->cfg.session_timeout;
3735           int rv;
3736
3737           vcl_send_session_connect (wrk, s);
3738           rv = vppcom_wait_for_session_state_change (session_index,
3739                                                      VCL_STATE_READY,
3740                                                      timeout);
3741           if (rv < 0)
3742             return rv;
3743           s = vcl_session_get (wrk, session_index);
3744         }
3745
3746       s->transport.is_ip4 = ep->is_ip4;
3747       s->transport.rmt_port = ep->port;
3748       vcl_ip_copy_from_ep (&s->transport.rmt_ip, ep);
3749     }
3750
3751   if (flags)
3752     {
3753       // TBD check the flags and do the right thing
3754       VDBG (2, "handling flags 0x%u (%d) not implemented yet.", flags, flags);
3755     }
3756
3757   return (vppcom_session_write_inline (wrk, s, buffer, buflen, 1,
3758                                        s->is_dgram ? 1 : 0));
3759 }
3760
3761 int
3762 vppcom_poll (vcl_poll_t * vp, uint32_t n_sids, double wait_for_time)
3763 {
3764   vcl_worker_t *wrk = vcl_worker_get_current ();
3765   f64 timeout = clib_time_now (&wrk->clib_time) + wait_for_time;
3766   u32 i, keep_trying = 1;
3767   svm_msg_q_msg_t msg;
3768   session_event_t *e;
3769   int rv, num_ev = 0;
3770
3771   VDBG (3, "vp %p, nsids %u, wait_for_time %f", vp, n_sids, wait_for_time);
3772
3773   if (!vp)
3774     return VPPCOM_EFAULT;
3775
3776   do
3777     {
3778       vcl_session_t *session;
3779
3780       /* Dequeue all events and drop all unhandled io events */
3781       while (svm_msg_q_sub (wrk->app_event_queue, &msg, SVM_Q_NOWAIT, 0) == 0)
3782         {
3783           e = svm_msg_q_msg_data (wrk->app_event_queue, &msg);
3784           vcl_handle_mq_event (wrk, e);
3785           svm_msg_q_free_msg (wrk->app_event_queue, &msg);
3786         }
3787       vec_reset_length (wrk->unhandled_evts_vector);
3788
3789       for (i = 0; i < n_sids; i++)
3790         {
3791           session = vcl_session_get (wrk, vp[i].sh);
3792           if (!session)
3793             {
3794               vp[i].revents = POLLHUP;
3795               num_ev++;
3796               continue;
3797             }
3798
3799           vp[i].revents = 0;
3800
3801           if (POLLIN & vp[i].events)
3802             {
3803               rv = vcl_session_read_ready (session);
3804               if (rv > 0)
3805                 {
3806                   vp[i].revents |= POLLIN;
3807                   num_ev++;
3808                 }
3809               else if (rv < 0)
3810                 {
3811                   switch (rv)
3812                     {
3813                     case VPPCOM_ECONNRESET:
3814                       vp[i].revents = POLLHUP;
3815                       break;
3816
3817                     default:
3818                       vp[i].revents = POLLERR;
3819                       break;
3820                     }
3821                   num_ev++;
3822                 }
3823             }
3824
3825           if (POLLOUT & vp[i].events)
3826             {
3827               rv = vcl_session_write_ready (session);
3828               if (rv > 0)
3829                 {
3830                   vp[i].revents |= POLLOUT;
3831                   num_ev++;
3832                 }
3833               else if (rv < 0)
3834                 {
3835                   switch (rv)
3836                     {
3837                     case VPPCOM_ECONNRESET:
3838                       vp[i].revents = POLLHUP;
3839                       break;
3840
3841                     default:
3842                       vp[i].revents = POLLERR;
3843                       break;
3844                     }
3845                   num_ev++;
3846                 }
3847             }
3848
3849           if (0)                // Note "done:" label used by VCL_SESSION_LOCK_AND_GET()
3850             {
3851               vp[i].revents = POLLNVAL;
3852               num_ev++;
3853             }
3854         }
3855       if (wait_for_time != -1)
3856         keep_trying = (clib_time_now (&wrk->clib_time) <= timeout) ? 1 : 0;
3857     }
3858   while ((num_ev == 0) && keep_trying);
3859
3860   return num_ev;
3861 }
3862
3863 int
3864 vppcom_mq_epoll_fd (void)
3865 {
3866   vcl_worker_t *wrk = vcl_worker_get_current ();
3867   return wrk->mqs_epfd;
3868 }
3869
3870 int
3871 vppcom_session_index (vcl_session_handle_t session_handle)
3872 {
3873   return session_handle & 0xFFFFFF;
3874 }
3875
3876 int
3877 vppcom_session_worker (vcl_session_handle_t session_handle)
3878 {
3879   return session_handle >> 24;
3880 }
3881
3882 int
3883 vppcom_worker_register (void)
3884 {
3885   if (!vcl_worker_alloc_and_init ())
3886     return VPPCOM_EEXIST;
3887
3888   if (vcl_worker_register_with_vpp ())
3889     return VPPCOM_EEXIST;
3890
3891   return VPPCOM_OK;
3892 }
3893
3894 void
3895 vppcom_worker_unregister (void)
3896 {
3897   vcl_worker_cleanup (vcl_worker_get_current (), 1 /* notify vpp */ );
3898   vcl_set_worker_index (~0);
3899 }
3900
3901 void
3902 vppcom_worker_index_set (int index)
3903 {
3904   vcl_set_worker_index (index);
3905 }
3906
3907 int
3908 vppcom_worker_index (void)
3909 {
3910   return vcl_get_worker_index ();
3911 }
3912
3913 int
3914 vppcom_worker_mqs_epfd (void)
3915 {
3916   vcl_worker_t *wrk = vcl_worker_get_current ();
3917   if (!vcm->cfg.use_mq_eventfd)
3918     return -1;
3919   return wrk->mqs_epfd;
3920 }
3921
3922 int
3923 vppcom_session_is_connectable_listener (uint32_t session_handle)
3924 {
3925   vcl_session_t *session;
3926   vcl_worker_t *wrk = vcl_worker_get_current ();
3927   session = vcl_session_get_w_handle (wrk, session_handle);
3928   if (!session)
3929     return VPPCOM_EBADFD;
3930   return vcl_session_is_connectable_listener (wrk, session);
3931 }
3932
3933 int
3934 vppcom_session_listener (uint32_t session_handle)
3935 {
3936   vcl_worker_t *wrk = vcl_worker_get_current ();
3937   vcl_session_t *listen_session, *session;
3938   session = vcl_session_get_w_handle (wrk, session_handle);
3939   if (!session)
3940     return VPPCOM_EBADFD;
3941   if (session->listener_index == VCL_INVALID_SESSION_INDEX)
3942     return VPPCOM_EBADFD;
3943   listen_session = vcl_session_get_w_handle (wrk, session->listener_index);
3944   if (!listen_session)
3945     return VPPCOM_EBADFD;
3946   return vcl_session_handle (listen_session);
3947 }
3948
3949 int
3950 vppcom_session_n_accepted (uint32_t session_handle)
3951 {
3952   vcl_worker_t *wrk = vcl_worker_get_current ();
3953   vcl_session_t *session = vcl_session_get_w_handle (wrk, session_handle);
3954   if (!session)
3955     return VPPCOM_EBADFD;
3956   return session->n_accepted_sessions;
3957 }
3958
3959 const char *
3960 vppcom_proto_str (vppcom_proto_t proto)
3961 {
3962   char const *proto_str;
3963
3964   switch (proto)
3965     {
3966     case VPPCOM_PROTO_TCP:
3967       proto_str = "TCP";
3968       break;
3969     case VPPCOM_PROTO_UDP:
3970       proto_str = "UDP";
3971       break;
3972     case VPPCOM_PROTO_TLS:
3973       proto_str = "TLS";
3974       break;
3975     case VPPCOM_PROTO_QUIC:
3976       proto_str = "QUIC";
3977       break;
3978     default:
3979       proto_str = "UNKNOWN";
3980       break;
3981     }
3982   return proto_str;
3983 }
3984
3985 const char *
3986 vppcom_retval_str (int retval)
3987 {
3988   char const *st;
3989
3990   switch (retval)
3991     {
3992     case VPPCOM_OK:
3993       st = "VPPCOM_OK";
3994       break;
3995
3996     case VPPCOM_EAGAIN:
3997       st = "VPPCOM_EAGAIN";
3998       break;
3999
4000     case VPPCOM_EFAULT:
4001       st = "VPPCOM_EFAULT";
4002       break;
4003
4004     case VPPCOM_ENOMEM:
4005       st = "VPPCOM_ENOMEM";
4006       break;
4007
4008     case VPPCOM_EINVAL:
4009       st = "VPPCOM_EINVAL";
4010       break;
4011
4012     case VPPCOM_EBADFD:
4013       st = "VPPCOM_EBADFD";
4014       break;
4015
4016     case VPPCOM_EAFNOSUPPORT:
4017       st = "VPPCOM_EAFNOSUPPORT";
4018       break;
4019
4020     case VPPCOM_ECONNABORTED:
4021       st = "VPPCOM_ECONNABORTED";
4022       break;
4023
4024     case VPPCOM_ECONNRESET:
4025       st = "VPPCOM_ECONNRESET";
4026       break;
4027
4028     case VPPCOM_ENOTCONN:
4029       st = "VPPCOM_ENOTCONN";
4030       break;
4031
4032     case VPPCOM_ECONNREFUSED:
4033       st = "VPPCOM_ECONNREFUSED";
4034       break;
4035
4036     case VPPCOM_ETIMEDOUT:
4037       st = "VPPCOM_ETIMEDOUT";
4038       break;
4039
4040     default:
4041       st = "UNKNOWN_STATE";
4042       break;
4043     }
4044
4045   return st;
4046 }
4047
4048 /*
4049  * fd.io coding-style-patch-verification: ON
4050  *
4051  * Local Variables:
4052  * eval: (c-set-style "gnu")
4053  * End:
4054  */