vcl: confirm reset on transport cleanup
[vpp.git] / src / vcl / vppcom.c
1 /*
2  * Copyright (c) 2017-2019 Cisco and/or its affiliates.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this
5  * You may obtain a copy of the License at:
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15
16 #include <stdio.h>
17 #include <stdlib.h>
18 #include <vcl/vppcom.h>
19 #include <vcl/vcl_debug.h>
20 #include <vcl/vcl_private.h>
21 #include <svm/fifo_segment.h>
22
23 __thread uword __vcl_worker_index = ~0;
24
25 static int
26 vcl_segment_is_not_mounted (vcl_worker_t * wrk, u64 segment_handle)
27 {
28   u32 segment_index;
29
30   if (segment_handle == VCL_INVALID_SEGMENT_HANDLE)
31     return 0;
32
33   segment_index = vcl_segment_table_lookup (segment_handle);
34   if (segment_index != VCL_INVALID_SEGMENT_INDEX)
35     return 0;
36
37   return 1;
38 }
39
40 static inline int
41 vcl_mq_dequeue_batch (vcl_worker_t * wrk, svm_msg_q_t * mq, u32 n_max_msg)
42 {
43   svm_msg_q_msg_t *msg;
44   u32 n_msgs;
45   int i;
46
47   n_msgs = clib_min (svm_msg_q_size (mq), n_max_msg);
48   for (i = 0; i < n_msgs; i++)
49     {
50       vec_add2 (wrk->mq_msg_vector, msg, 1);
51       svm_msg_q_sub_w_lock (mq, msg);
52     }
53   return n_msgs;
54 }
55
56 const char *
57 vppcom_session_state_str (vcl_session_state_t state)
58 {
59   char *st;
60
61   switch (state)
62     {
63     case VCL_STATE_CLOSED:
64       st = "STATE_CLOSED";
65       break;
66     case VCL_STATE_LISTEN:
67       st = "STATE_LISTEN";
68       break;
69     case VCL_STATE_READY:
70       st = "STATE_READY";
71       break;
72     case VCL_STATE_VPP_CLOSING:
73       st = "STATE_VPP_CLOSING";
74       break;
75     case VCL_STATE_DISCONNECT:
76       st = "STATE_DISCONNECT";
77       break;
78     case VCL_STATE_DETACHED:
79       st = "STATE_DETACHED";
80       break;
81     case VCL_STATE_UPDATED:
82       st = "STATE_UPDATED";
83       break;
84     case VCL_STATE_LISTEN_NO_MQ:
85       st = "STATE_LISTEN_NO_MQ";
86       break;
87     default:
88       st = "UNKNOWN_STATE";
89       break;
90     }
91
92   return st;
93 }
94
95 u8 *
96 format_ip4_address (u8 * s, va_list * args)
97 {
98   u8 *a = va_arg (*args, u8 *);
99   return format (s, "%d.%d.%d.%d", a[0], a[1], a[2], a[3]);
100 }
101
102 u8 *
103 format_ip6_address (u8 * s, va_list * args)
104 {
105   ip6_address_t *a = va_arg (*args, ip6_address_t *);
106   u32 i, i_max_n_zero, max_n_zeros, i_first_zero, n_zeros, last_double_colon;
107
108   i_max_n_zero = ARRAY_LEN (a->as_u16);
109   max_n_zeros = 0;
110   i_first_zero = i_max_n_zero;
111   n_zeros = 0;
112   for (i = 0; i < ARRAY_LEN (a->as_u16); i++)
113     {
114       u32 is_zero = a->as_u16[i] == 0;
115       if (is_zero && i_first_zero >= ARRAY_LEN (a->as_u16))
116         {
117           i_first_zero = i;
118           n_zeros = 0;
119         }
120       n_zeros += is_zero;
121       if ((!is_zero && n_zeros > max_n_zeros)
122           || (i + 1 >= ARRAY_LEN (a->as_u16) && n_zeros > max_n_zeros))
123         {
124           i_max_n_zero = i_first_zero;
125           max_n_zeros = n_zeros;
126           i_first_zero = ARRAY_LEN (a->as_u16);
127           n_zeros = 0;
128         }
129     }
130
131   last_double_colon = 0;
132   for (i = 0; i < ARRAY_LEN (a->as_u16); i++)
133     {
134       if (i == i_max_n_zero && max_n_zeros > 1)
135         {
136           s = format (s, "::");
137           i += max_n_zeros - 1;
138           last_double_colon = 1;
139         }
140       else
141         {
142           s = format (s, "%s%x",
143                       (last_double_colon || i == 0) ? "" : ":",
144                       clib_net_to_host_u16 (a->as_u16[i]));
145           last_double_colon = 0;
146         }
147     }
148
149   return s;
150 }
151
152 /* Format an IP46 address. */
153 u8 *
154 format_ip46_address (u8 * s, va_list * args)
155 {
156   ip46_address_t *ip46 = va_arg (*args, ip46_address_t *);
157   ip46_type_t type = va_arg (*args, ip46_type_t);
158   int is_ip4 = 1;
159
160   switch (type)
161     {
162     case IP46_TYPE_ANY:
163       is_ip4 = ip46_address_is_ip4 (ip46);
164       break;
165     case IP46_TYPE_IP4:
166       is_ip4 = 1;
167       break;
168     case IP46_TYPE_IP6:
169       is_ip4 = 0;
170       break;
171     }
172
173   return is_ip4 ?
174     format (s, "%U", format_ip4_address, &ip46->ip4) :
175     format (s, "%U", format_ip6_address, &ip46->ip6);
176 }
177
178 /*
179  * VPPCOM Utility Functions
180  */
181
182 static void
183 vcl_send_session_listen (vcl_worker_t * wrk, vcl_session_t * s)
184 {
185   app_session_evt_t _app_evt, *app_evt = &_app_evt;
186   session_listen_msg_t *mp;
187   svm_msg_q_t *mq;
188
189   mq = vcl_worker_ctrl_mq (wrk);
190   app_alloc_ctrl_evt_to_vpp (mq, app_evt, SESSION_CTRL_EVT_LISTEN);
191   mp = (session_listen_msg_t *) app_evt->evt->data;
192   memset (mp, 0, sizeof (*mp));
193   mp->client_index = wrk->api_client_handle;
194   mp->context = s->session_index;
195   mp->wrk_index = wrk->vpp_wrk_index;
196   mp->is_ip4 = s->transport.is_ip4;
197   clib_memcpy_fast (&mp->ip, &s->transport.lcl_ip, sizeof (mp->ip));
198   mp->port = s->transport.lcl_port;
199   mp->proto = s->session_type;
200   if (s->flags & VCL_SESSION_F_CONNECTED)
201     mp->flags = TRANSPORT_CFG_F_CONNECTED;
202   app_send_ctrl_evt_to_vpp (mq, app_evt);
203 }
204
205 static void
206 vcl_send_session_connect (vcl_worker_t * wrk, vcl_session_t * s)
207 {
208   app_session_evt_t _app_evt, *app_evt = &_app_evt;
209   session_connect_msg_t *mp;
210   svm_msg_q_t *mq;
211
212   mq = vcl_worker_ctrl_mq (wrk);
213   app_alloc_ctrl_evt_to_vpp (mq, app_evt, SESSION_CTRL_EVT_CONNECT);
214   mp = (session_connect_msg_t *) app_evt->evt->data;
215   memset (mp, 0, sizeof (*mp));
216   mp->client_index = wrk->api_client_handle;
217   mp->context = s->session_index;
218   mp->wrk_index = wrk->vpp_wrk_index;
219   mp->is_ip4 = s->transport.is_ip4;
220   mp->parent_handle = s->parent_handle;
221   clib_memcpy_fast (&mp->ip, &s->transport.rmt_ip, sizeof (mp->ip));
222   clib_memcpy_fast (&mp->lcl_ip, &s->transport.lcl_ip, sizeof (mp->lcl_ip));
223   mp->port = s->transport.rmt_port;
224   mp->lcl_port = s->transport.lcl_port;
225   mp->proto = s->session_type;
226   if (s->flags & VCL_SESSION_F_CONNECTED)
227     mp->flags |= TRANSPORT_CFG_F_CONNECTED;
228   app_send_ctrl_evt_to_vpp (mq, app_evt);
229 }
230
231 void
232 vcl_send_session_unlisten (vcl_worker_t * wrk, vcl_session_t * s)
233 {
234   app_session_evt_t _app_evt, *app_evt = &_app_evt;
235   session_unlisten_msg_t *mp;
236   svm_msg_q_t *mq;
237
238   mq = vcl_worker_ctrl_mq (wrk);
239   app_alloc_ctrl_evt_to_vpp (mq, app_evt, SESSION_CTRL_EVT_UNLISTEN);
240   mp = (session_unlisten_msg_t *) app_evt->evt->data;
241   memset (mp, 0, sizeof (*mp));
242   mp->client_index = wrk->api_client_handle;
243   mp->wrk_index = wrk->vpp_wrk_index;
244   mp->handle = s->vpp_handle;
245   mp->context = wrk->wrk_index;
246   app_send_ctrl_evt_to_vpp (mq, app_evt);
247 }
248
249 static void
250 vcl_send_session_disconnect (vcl_worker_t * wrk, vcl_session_t * s)
251 {
252   app_session_evt_t _app_evt, *app_evt = &_app_evt;
253   session_disconnect_msg_t *mp;
254   svm_msg_q_t *mq;
255
256   /* Send to thread that owns the session */
257   mq = s->vpp_evt_q;
258   app_alloc_ctrl_evt_to_vpp (mq, app_evt, SESSION_CTRL_EVT_DISCONNECT);
259   mp = (session_disconnect_msg_t *) app_evt->evt->data;
260   memset (mp, 0, sizeof (*mp));
261   mp->client_index = wrk->api_client_handle;
262   mp->handle = s->vpp_handle;
263   app_send_ctrl_evt_to_vpp (mq, app_evt);
264 }
265
266 static void
267 vcl_send_app_detach (vcl_worker_t * wrk)
268 {
269   app_session_evt_t _app_evt, *app_evt = &_app_evt;
270   session_app_detach_msg_t *mp;
271   svm_msg_q_t *mq;
272
273   mq = vcl_worker_ctrl_mq (wrk);
274   app_alloc_ctrl_evt_to_vpp (mq, app_evt, SESSION_CTRL_EVT_APP_DETACH);
275   mp = (session_app_detach_msg_t *) app_evt->evt->data;
276   memset (mp, 0, sizeof (*mp));
277   mp->client_index = wrk->api_client_handle;
278   app_send_ctrl_evt_to_vpp (mq, app_evt);
279 }
280
281 static void
282 vcl_send_session_accepted_reply (svm_msg_q_t * mq, u32 context,
283                                  session_handle_t handle, int retval)
284 {
285   app_session_evt_t _app_evt, *app_evt = &_app_evt;
286   session_accepted_reply_msg_t *rmp;
287   app_alloc_ctrl_evt_to_vpp (mq, app_evt, SESSION_CTRL_EVT_ACCEPTED_REPLY);
288   rmp = (session_accepted_reply_msg_t *) app_evt->evt->data;
289   rmp->handle = handle;
290   rmp->context = context;
291   rmp->retval = retval;
292   app_send_ctrl_evt_to_vpp (mq, app_evt);
293 }
294
295 static void
296 vcl_send_session_disconnected_reply (svm_msg_q_t * mq, u32 context,
297                                      session_handle_t handle, int retval)
298 {
299   app_session_evt_t _app_evt, *app_evt = &_app_evt;
300   session_disconnected_reply_msg_t *rmp;
301   app_alloc_ctrl_evt_to_vpp (mq, app_evt,
302                              SESSION_CTRL_EVT_DISCONNECTED_REPLY);
303   rmp = (session_disconnected_reply_msg_t *) app_evt->evt->data;
304   rmp->handle = handle;
305   rmp->context = context;
306   rmp->retval = retval;
307   app_send_ctrl_evt_to_vpp (mq, app_evt);
308 }
309
310 static void
311 vcl_send_session_reset_reply (svm_msg_q_t * mq, u32 context,
312                               session_handle_t handle, int retval)
313 {
314   app_session_evt_t _app_evt, *app_evt = &_app_evt;
315   session_reset_reply_msg_t *rmp;
316   app_alloc_ctrl_evt_to_vpp (mq, app_evt, SESSION_CTRL_EVT_RESET_REPLY);
317   rmp = (session_reset_reply_msg_t *) app_evt->evt->data;
318   rmp->handle = handle;
319   rmp->context = context;
320   rmp->retval = retval;
321   app_send_ctrl_evt_to_vpp (mq, app_evt);
322 }
323
324 void
325 vcl_send_session_worker_update (vcl_worker_t * wrk, vcl_session_t * s,
326                                 u32 wrk_index)
327 {
328   app_session_evt_t _app_evt, *app_evt = &_app_evt;
329   session_worker_update_msg_t *mp;
330   svm_msg_q_t *mq;
331
332   mq = vcl_session_vpp_evt_q (wrk, s);
333   app_alloc_ctrl_evt_to_vpp (mq, app_evt, SESSION_CTRL_EVT_WORKER_UPDATE);
334   mp = (session_worker_update_msg_t *) app_evt->evt->data;
335   mp->client_index = wrk->api_client_handle;
336   mp->handle = s->vpp_handle;
337   mp->req_wrk_index = wrk->vpp_wrk_index;
338   mp->wrk_index = wrk_index;
339   app_send_ctrl_evt_to_vpp (mq, app_evt);
340 }
341
342 int
343 vcl_send_worker_rpc (u32 dst_wrk_index, void *data, u32 data_len)
344 {
345   app_session_evt_t _app_evt, *app_evt = &_app_evt;
346   session_app_wrk_rpc_msg_t *mp;
347   vcl_worker_t *dst_wrk, *wrk;
348   svm_msg_q_t *mq;
349   int ret = -1;
350
351   if (data_len > sizeof (mp->data))
352     goto done;
353
354   clib_spinlock_lock (&vcm->workers_lock);
355
356   dst_wrk = vcl_worker_get_if_valid (dst_wrk_index);
357   if (!dst_wrk)
358     goto done;
359
360   wrk = vcl_worker_get_current ();
361   mq = vcl_worker_ctrl_mq (wrk);
362   app_alloc_ctrl_evt_to_vpp (mq, app_evt, SESSION_CTRL_EVT_APP_WRK_RPC);
363   mp = (session_app_wrk_rpc_msg_t *) app_evt->evt->data;
364   mp->client_index = wrk->api_client_handle;
365   mp->wrk_index = dst_wrk->vpp_wrk_index;
366   clib_memcpy (mp->data, data, data_len);
367   app_send_ctrl_evt_to_vpp (mq, app_evt);
368   ret = 0;
369
370 done:
371   clib_spinlock_unlock (&vcm->workers_lock);
372   return ret;
373 }
374
375 static u32
376 vcl_session_accepted_handler (vcl_worker_t * wrk, session_accepted_msg_t * mp,
377                               u32 ls_index)
378 {
379   vcl_session_t *session, *listen_session;
380   svm_fifo_t *rx_fifo, *tx_fifo;
381   u32 vpp_wrk_index;
382   svm_msg_q_t *evt_q;
383
384   session = vcl_session_alloc (wrk);
385
386   listen_session = vcl_session_get (wrk, ls_index);
387   if (listen_session->vpp_handle != mp->listener_handle)
388     {
389       VDBG (0, "ERROR: listener handle %lu does not match session %u",
390             mp->listener_handle, ls_index);
391       goto error;
392     }
393
394   if (vcl_segment_is_not_mounted (wrk, mp->segment_handle))
395     {
396       VDBG (0, "ERROR: segment for session %u is not mounted!",
397             session->session_index);
398       goto error;
399     }
400
401   rx_fifo = uword_to_pointer (mp->server_rx_fifo, svm_fifo_t *);
402   tx_fifo = uword_to_pointer (mp->server_tx_fifo, svm_fifo_t *);
403   session->vpp_evt_q = uword_to_pointer (mp->vpp_event_queue_address,
404                                          svm_msg_q_t *);
405   rx_fifo->client_session_index = session->session_index;
406   tx_fifo->client_session_index = session->session_index;
407   rx_fifo->client_thread_index = vcl_get_worker_index ();
408   tx_fifo->client_thread_index = vcl_get_worker_index ();
409   vpp_wrk_index = tx_fifo->master_thread_index;
410   vec_validate (wrk->vpp_event_queues, vpp_wrk_index);
411   wrk->vpp_event_queues[vpp_wrk_index] = session->vpp_evt_q;
412
413   session->vpp_handle = mp->handle;
414   session->vpp_thread_index = rx_fifo->master_thread_index;
415   session->rx_fifo = rx_fifo;
416   session->tx_fifo = tx_fifo;
417
418   session->session_state = VCL_STATE_READY;
419   session->transport.rmt_port = mp->rmt.port;
420   session->transport.is_ip4 = mp->rmt.is_ip4;
421   clib_memcpy_fast (&session->transport.rmt_ip, &mp->rmt.ip,
422                     sizeof (ip46_address_t));
423
424   vcl_session_table_add_vpp_handle (wrk, mp->handle, session->session_index);
425   session->transport.lcl_port = listen_session->transport.lcl_port;
426   session->transport.lcl_ip = listen_session->transport.lcl_ip;
427   session->session_type = listen_session->session_type;
428   session->is_dgram = vcl_proto_is_dgram (session->session_type);
429   session->listener_index = listen_session->session_index;
430   listen_session->n_accepted_sessions++;
431
432   VDBG (1, "session %u [0x%llx]: client accept request from %s address %U"
433         " port %d queue %p!", session->session_index, mp->handle,
434         mp->rmt.is_ip4 ? "IPv4" : "IPv6", format_ip46_address, &mp->rmt.ip,
435         mp->rmt.is_ip4 ? IP46_TYPE_IP4 : IP46_TYPE_IP6,
436         clib_net_to_host_u16 (mp->rmt.port), session->vpp_evt_q);
437   vcl_evt (VCL_EVT_ACCEPT, session, listen_session, session_index);
438
439   vcl_send_session_accepted_reply (session->vpp_evt_q, mp->context,
440                                    session->vpp_handle, 0);
441
442   return session->session_index;
443
444 error:
445   evt_q = uword_to_pointer (mp->vpp_event_queue_address, svm_msg_q_t *);
446   vcl_send_session_accepted_reply (evt_q, mp->context, mp->handle,
447                                    VNET_API_ERROR_INVALID_ARGUMENT);
448   vcl_session_free (wrk, session);
449   return VCL_INVALID_SESSION_INDEX;
450 }
451
452 static u32
453 vcl_session_connected_handler (vcl_worker_t * wrk,
454                                session_connected_msg_t * mp)
455 {
456   u32 session_index, vpp_wrk_index;
457   svm_fifo_t *rx_fifo, *tx_fifo;
458   vcl_session_t *session = 0;
459
460   session_index = mp->context;
461   session = vcl_session_get (wrk, session_index);
462   if (!session)
463     {
464       VDBG (0, "ERROR: vpp handle 0x%llx has no session index (%u)!",
465             mp->handle, session_index);
466       return VCL_INVALID_SESSION_INDEX;
467     }
468   if (mp->retval)
469     {
470       VDBG (0, "ERROR: session index %u: connect failed! %U",
471             session_index, format_session_error, mp->retval);
472       session->session_state = VCL_STATE_DETACHED;
473       session->vpp_handle = mp->handle;
474       return session_index;
475     }
476
477   session->vpp_handle = mp->handle;
478   session->vpp_evt_q = uword_to_pointer (mp->vpp_event_queue_address,
479                                          svm_msg_q_t *);
480   rx_fifo = uword_to_pointer (mp->server_rx_fifo, svm_fifo_t *);
481   tx_fifo = uword_to_pointer (mp->server_tx_fifo, svm_fifo_t *);
482   if (vcl_segment_is_not_mounted (wrk, mp->segment_handle))
483     {
484       VDBG (0, "segment for session %u is not mounted!",
485             session->session_index);
486       session->session_state = VCL_STATE_DETACHED;
487       vcl_send_session_disconnect (wrk, session);
488       return session_index;
489     }
490
491   rx_fifo->client_session_index = session_index;
492   tx_fifo->client_session_index = session_index;
493   rx_fifo->client_thread_index = vcl_get_worker_index ();
494   tx_fifo->client_thread_index = vcl_get_worker_index ();
495
496   vpp_wrk_index = tx_fifo->master_thread_index;
497   vec_validate (wrk->vpp_event_queues, vpp_wrk_index);
498   wrk->vpp_event_queues[vpp_wrk_index] = session->vpp_evt_q;
499
500   if (mp->ct_rx_fifo)
501     {
502       session->ct_rx_fifo = uword_to_pointer (mp->ct_rx_fifo, svm_fifo_t *);
503       session->ct_tx_fifo = uword_to_pointer (mp->ct_tx_fifo, svm_fifo_t *);
504       if (vcl_segment_is_not_mounted (wrk, mp->ct_segment_handle))
505         {
506           VDBG (0, "ct segment for session %u is not mounted!",
507                 session->session_index);
508           session->session_state = VCL_STATE_DETACHED;
509           vcl_send_session_disconnect (wrk, session);
510           return session_index;
511         }
512     }
513
514   session->rx_fifo = rx_fifo;
515   session->tx_fifo = tx_fifo;
516   session->vpp_thread_index = rx_fifo->master_thread_index;
517   session->transport.is_ip4 = mp->lcl.is_ip4;
518   clib_memcpy_fast (&session->transport.lcl_ip, &mp->lcl.ip,
519                     sizeof (session->transport.lcl_ip));
520   session->transport.lcl_port = mp->lcl.port;
521
522   /* Application closed session before connect reply */
523   if (vcl_session_has_attr (session, VCL_SESS_ATTR_NONBLOCK)
524       && session->session_state == VCL_STATE_CLOSED)
525     vcl_send_session_disconnect (wrk, session);
526   else
527     session->session_state = VCL_STATE_READY;
528
529   /* Add it to lookup table */
530   vcl_session_table_add_vpp_handle (wrk, mp->handle, session_index);
531
532   VDBG (1, "session %u [0x%llx] connected! rx_fifo %p, refcnt %d, tx_fifo %p,"
533         " refcnt %d", session_index, mp->handle, session->rx_fifo,
534         session->rx_fifo->refcnt, session->tx_fifo, session->tx_fifo->refcnt);
535
536   return session_index;
537 }
538
539 static int
540 vcl_flag_accepted_session (vcl_session_t * session, u64 handle, u32 flags)
541 {
542   vcl_session_msg_t *accepted_msg;
543   int i;
544
545   for (i = 0; i < vec_len (session->accept_evts_fifo); i++)
546     {
547       accepted_msg = &session->accept_evts_fifo[i];
548       if (accepted_msg->accepted_msg.handle == handle)
549         {
550           accepted_msg->flags |= flags;
551           return 1;
552         }
553     }
554   return 0;
555 }
556
557 static u32
558 vcl_session_reset_handler (vcl_worker_t * wrk,
559                            session_reset_msg_t * reset_msg)
560 {
561   vcl_session_t *session;
562   u32 sid;
563
564   sid = vcl_session_index_from_vpp_handle (wrk, reset_msg->handle);
565   session = vcl_session_get (wrk, sid);
566   if (!session)
567     {
568       VDBG (0, "request to reset unknown handle 0x%llx", reset_msg->handle);
569       return VCL_INVALID_SESSION_INDEX;
570     }
571
572   /* Caught a reset before actually accepting the session */
573   if (session->session_state == VCL_STATE_LISTEN)
574     {
575
576       if (!vcl_flag_accepted_session (session, reset_msg->handle,
577                                       VCL_ACCEPTED_F_RESET))
578         VDBG (0, "session was not accepted!");
579       return VCL_INVALID_SESSION_INDEX;
580     }
581
582   if (session->session_state != VCL_STATE_CLOSED)
583     session->session_state = VCL_STATE_DISCONNECT;
584   VDBG (0, "reset session %u [0x%llx]", sid, reset_msg->handle);
585   return sid;
586 }
587
588 static u32
589 vcl_session_bound_handler (vcl_worker_t * wrk, session_bound_msg_t * mp)
590 {
591   vcl_session_t *session;
592   u32 sid = mp->context;
593
594   session = vcl_session_get (wrk, sid);
595   if (mp->retval)
596     {
597       VERR ("session %u [0x%llx]: bind failed: %U", sid, mp->handle,
598             format_session_error, mp->retval);
599       if (session)
600         {
601           session->session_state = VCL_STATE_DETACHED;
602           session->vpp_handle = mp->handle;
603           return sid;
604         }
605       else
606         {
607           VDBG (0, "ERROR: session %u [0x%llx]: Invalid session index!",
608                 sid, mp->handle);
609           return VCL_INVALID_SESSION_INDEX;
610         }
611     }
612
613   session->vpp_handle = mp->handle;
614   session->transport.is_ip4 = mp->lcl_is_ip4;
615   clib_memcpy_fast (&session->transport.lcl_ip, mp->lcl_ip,
616                     sizeof (ip46_address_t));
617   session->transport.lcl_port = mp->lcl_port;
618   vcl_session_table_add_listener (wrk, mp->handle, sid);
619   session->session_state = VCL_STATE_LISTEN;
620
621   session->vpp_evt_q = uword_to_pointer (mp->vpp_evt_q, svm_msg_q_t *);
622   vec_validate (wrk->vpp_event_queues, 0);
623   wrk->vpp_event_queues[0] = session->vpp_evt_q;
624
625   if (vcl_session_is_cl (session))
626     {
627       svm_fifo_t *rx_fifo, *tx_fifo;
628       session->vpp_evt_q = uword_to_pointer (mp->vpp_evt_q, svm_msg_q_t *);
629       rx_fifo = uword_to_pointer (mp->rx_fifo, svm_fifo_t *);
630       rx_fifo->client_session_index = sid;
631       tx_fifo = uword_to_pointer (mp->tx_fifo, svm_fifo_t *);
632       tx_fifo->client_session_index = sid;
633       session->rx_fifo = rx_fifo;
634       session->tx_fifo = tx_fifo;
635     }
636
637   VDBG (0, "session %u [0x%llx]: listen succeeded!", sid, mp->handle);
638   return sid;
639 }
640
641 static void
642 vcl_session_unlisten_reply_handler (vcl_worker_t * wrk, void *data)
643 {
644   session_unlisten_reply_msg_t *mp = (session_unlisten_reply_msg_t *) data;
645   vcl_session_t *s;
646
647   s = vcl_session_get_w_vpp_handle (wrk, mp->handle);
648   if (!s)
649     {
650       VDBG (0, "Unlisten reply with wrong handle %llx", mp->handle);
651       return;
652     }
653   if (s->session_state != VCL_STATE_DISCONNECT)
654     {
655       /* Connected udp listener */
656       if (s->session_type == VPPCOM_PROTO_UDP
657           && s->session_state == VCL_STATE_CLOSED)
658         return;
659
660       VDBG (0, "Unlisten session in wrong state %llx", mp->handle);
661       return;
662     }
663
664   if (mp->retval)
665     VDBG (0, "ERROR: session %u [0xllx]: unlisten failed: %U",
666           s->session_index, mp->handle, format_session_error, mp->retval);
667
668   if (mp->context != wrk->wrk_index)
669     VDBG (0, "wrong context");
670
671   vcl_session_table_del_vpp_handle (wrk, mp->handle);
672   vcl_session_free (wrk, s);
673 }
674
675 static void
676 vcl_session_migrated_handler (vcl_worker_t * wrk, void *data)
677 {
678   session_migrated_msg_t *mp = (session_migrated_msg_t *) data;
679   vcl_session_t *s;
680
681   s = vcl_session_get_w_vpp_handle (wrk, mp->handle);
682   if (!s)
683     {
684       VDBG (0, "Migrated notification with wrong handle %llx", mp->handle);
685       return;
686     }
687
688   s->vpp_thread_index = mp->vpp_thread_index;
689   s->vpp_handle = mp->new_handle;
690   s->vpp_evt_q = uword_to_pointer (mp->vpp_evt_q, svm_msg_q_t *);
691
692   vec_validate (wrk->vpp_event_queues, s->vpp_thread_index);
693   wrk->vpp_event_queues[s->vpp_thread_index] = s->vpp_evt_q;
694
695   vcl_session_table_del_vpp_handle (wrk, mp->handle);
696   vcl_session_table_add_vpp_handle (wrk, mp->new_handle, s->session_index);
697
698   /* Generate new tx event if we have outstanding data */
699   if (svm_fifo_has_event (s->tx_fifo))
700     app_send_io_evt_to_vpp (s->vpp_evt_q, s->tx_fifo->master_session_index,
701                             SESSION_IO_EVT_TX, SVM_Q_WAIT);
702
703   VDBG (0, "Migrated 0x%lx to thread %u 0x%lx", mp->handle,
704         s->vpp_thread_index, mp->new_handle);
705 }
706
707 static vcl_session_t *
708 vcl_session_accepted (vcl_worker_t * wrk, session_accepted_msg_t * msg)
709 {
710   vcl_session_msg_t *vcl_msg;
711   vcl_session_t *session;
712
713   session = vcl_session_get_w_vpp_handle (wrk, msg->handle);
714   if (PREDICT_FALSE (session != 0))
715     VWRN ("session overlap handle %lu state %u!", msg->handle,
716           session->session_state);
717
718   session = vcl_session_table_lookup_listener (wrk, msg->listener_handle);
719   if (!session)
720     {
721       VERR ("couldn't find listen session: listener handle %llx",
722             msg->listener_handle);
723       return 0;
724     }
725
726   clib_fifo_add2 (session->accept_evts_fifo, vcl_msg);
727   vcl_msg->flags = 0;
728   vcl_msg->accepted_msg = *msg;
729   /* Session handle points to listener until fully accepted by app */
730   vcl_session_table_add_vpp_handle (wrk, msg->handle, session->session_index);
731
732   return session;
733 }
734
735 static vcl_session_t *
736 vcl_session_disconnected_handler (vcl_worker_t * wrk,
737                                   session_disconnected_msg_t * msg)
738 {
739   vcl_session_t *session;
740
741   session = vcl_session_get_w_vpp_handle (wrk, msg->handle);
742   if (!session)
743     {
744       VDBG (0, "request to disconnect unknown handle 0x%llx", msg->handle);
745       return 0;
746     }
747
748   /* Late disconnect notification on a session that has been closed */
749   if (session->session_state == VCL_STATE_CLOSED)
750     return 0;
751
752   /* Caught a disconnect before actually accepting the session */
753   if (session->session_state == VCL_STATE_LISTEN)
754     {
755       if (!vcl_flag_accepted_session (session, msg->handle,
756                                       VCL_ACCEPTED_F_CLOSED))
757         VDBG (0, "session was not accepted!");
758       return 0;
759     }
760
761   /* If not already reset change state */
762   if (session->session_state != VCL_STATE_DISCONNECT)
763     session->session_state = VCL_STATE_VPP_CLOSING;
764
765   return session;
766 }
767
768 static void
769 vcl_session_cleanup_handler (vcl_worker_t * wrk, void *data)
770 {
771   session_cleanup_msg_t *msg;
772   vcl_session_t *session;
773
774   msg = (session_cleanup_msg_t *) data;
775   session = vcl_session_get_w_vpp_handle (wrk, msg->handle);
776   if (!session)
777     {
778       VDBG (0, "disconnect confirmed for unknown handle 0x%llx", msg->handle);
779       return;
780     }
781
782   if (msg->type == SESSION_CLEANUP_TRANSPORT)
783     {
784       /* Transport was cleaned up before we confirmed close. Probably the
785        * app is still waiting for some data that cannot be delivered.
786        * Confirm close to make sure everything is cleaned up */
787       if (session->session_state == VCL_STATE_VPP_CLOSING
788           || session->session_state == VCL_STATE_DISCONNECT)
789         {
790           vcl_session_cleanup (wrk, session, vcl_session_handle (session),
791                                1 /* do_disconnect */ );
792           /* Move to undetermined state to ensure that the session is not
793            * removed before both vpp and the app cleanup.
794            * - If the app closes first, the session is moved to CLOSED state
795            *   and the session cleanup notification from vpp removes the
796            *   session.
797            * - If vpp cleans up the session first, the session is moved to
798            *   DETACHED state lower and subsequently the close from the app
799            *   frees the session
800            */
801           session->session_state = VCL_STATE_UPDATED;
802         }
803       return;
804     }
805
806   vcl_session_table_del_vpp_handle (wrk, msg->handle);
807   /* Should not happen. App did not close the connection so don't free it. */
808   if (session->session_state != VCL_STATE_CLOSED)
809     {
810       VDBG (0, "app did not close session %d", session->session_index);
811       session->session_state = VCL_STATE_DETACHED;
812       session->vpp_handle = VCL_INVALID_SESSION_HANDLE;
813       return;
814     }
815   vcl_session_free (wrk, session);
816 }
817
818 static void
819 vcl_session_req_worker_update_handler (vcl_worker_t * wrk, void *data)
820 {
821   session_req_worker_update_msg_t *msg;
822   vcl_session_t *s;
823
824   msg = (session_req_worker_update_msg_t *) data;
825   s = vcl_session_get_w_vpp_handle (wrk, msg->session_handle);
826   if (!s)
827     return;
828
829   vec_add1 (wrk->pending_session_wrk_updates, s->session_index);
830 }
831
832 static void
833 vcl_session_worker_update_reply_handler (vcl_worker_t * wrk, void *data)
834 {
835   session_worker_update_reply_msg_t *msg;
836   vcl_session_t *s;
837
838   msg = (session_worker_update_reply_msg_t *) data;
839   s = vcl_session_get_w_vpp_handle (wrk, msg->handle);
840   if (!s)
841     {
842       VDBG (0, "unknown handle 0x%llx", msg->handle);
843       return;
844     }
845   if (vcl_segment_is_not_mounted (wrk, msg->segment_handle))
846     {
847       clib_warning ("segment for session %u is not mounted!",
848                     s->session_index);
849       return;
850     }
851
852   if (s->rx_fifo)
853     {
854       s->rx_fifo = uword_to_pointer (msg->rx_fifo, svm_fifo_t *);
855       s->tx_fifo = uword_to_pointer (msg->tx_fifo, svm_fifo_t *);
856       s->rx_fifo->client_session_index = s->session_index;
857       s->tx_fifo->client_session_index = s->session_index;
858       s->rx_fifo->client_thread_index = wrk->wrk_index;
859       s->tx_fifo->client_thread_index = wrk->wrk_index;
860     }
861   s->session_state = VCL_STATE_UPDATED;
862
863   VDBG (0, "session %u[0x%llx] moved to worker %u", s->session_index,
864         s->vpp_handle, wrk->wrk_index);
865 }
866
867 static int
868 vcl_api_recv_fd (vcl_worker_t * wrk, int *fds, int n_fds)
869 {
870
871   if (vcm->cfg.vpp_app_socket_api)
872     return vcl_sapi_recv_fds (wrk, fds, n_fds);
873
874   return vcl_bapi_recv_fds (wrk, fds, n_fds);
875 }
876
877 static void
878 vcl_session_app_add_segment_handler (vcl_worker_t * wrk, void *data)
879 {
880   ssvm_segment_type_t seg_type = SSVM_SEGMENT_SHM;
881   session_app_add_segment_msg_t *msg;
882   u64 segment_handle;
883   int fd = -1;
884
885   msg = (session_app_add_segment_msg_t *) data;
886
887   if (msg->fd_flags)
888     {
889       vcl_api_recv_fd (wrk, &fd, 1);
890       seg_type = SSVM_SEGMENT_MEMFD;
891     }
892
893   segment_handle = msg->segment_handle;
894   if (segment_handle == VCL_INVALID_SEGMENT_HANDLE)
895     {
896       clib_warning ("invalid segment handle");
897       return;
898     }
899
900   if (vcl_segment_attach (segment_handle, (char *) msg->segment_name,
901                           seg_type, fd))
902     {
903       VDBG (0, "vcl_segment_attach ('%s') failed", msg->segment_name);
904       return;
905     }
906
907   VDBG (1, "mapped new segment '%s' size %d", msg->segment_name,
908         msg->segment_size);
909 }
910
911 static void
912 vcl_session_app_del_segment_handler (vcl_worker_t * wrk, void *data)
913 {
914   session_app_del_segment_msg_t *msg = (session_app_del_segment_msg_t *) data;
915   vcl_segment_detach (msg->segment_handle);
916   VDBG (1, "Unmapped segment: %d", msg->segment_handle);
917 }
918
919 static void
920 vcl_worker_rpc_handler (vcl_worker_t * wrk, void *data)
921 {
922   if (!vcm->wrk_rpc_fn)
923     return;
924
925   (vcm->wrk_rpc_fn) (((session_app_wrk_rpc_msg_t *) data)->data);
926 }
927
928 static int
929 vcl_handle_mq_event (vcl_worker_t * wrk, session_event_t * e)
930 {
931   session_disconnected_msg_t *disconnected_msg;
932   vcl_session_t *s;
933
934   switch (e->event_type)
935     {
936     case SESSION_IO_EVT_RX:
937     case SESSION_IO_EVT_TX:
938       s = vcl_session_get (wrk, e->session_index);
939       if (!s || !vcl_session_is_open (s))
940         break;
941       vec_add1 (wrk->unhandled_evts_vector, *e);
942       break;
943     case SESSION_CTRL_EVT_ACCEPTED:
944       vcl_session_accepted (wrk, (session_accepted_msg_t *) e->data);
945       break;
946     case SESSION_CTRL_EVT_CONNECTED:
947       vcl_session_connected_handler (wrk,
948                                      (session_connected_msg_t *) e->data);
949       break;
950     case SESSION_CTRL_EVT_DISCONNECTED:
951       disconnected_msg = (session_disconnected_msg_t *) e->data;
952       s = vcl_session_disconnected_handler (wrk, disconnected_msg);
953       if (!s)
954         break;
955       VDBG (0, "disconnected session %u [0x%llx]", s->session_index,
956             s->vpp_handle);
957       break;
958     case SESSION_CTRL_EVT_RESET:
959       vcl_session_reset_handler (wrk, (session_reset_msg_t *) e->data);
960       break;
961     case SESSION_CTRL_EVT_BOUND:
962       vcl_session_bound_handler (wrk, (session_bound_msg_t *) e->data);
963       break;
964     case SESSION_CTRL_EVT_UNLISTEN_REPLY:
965       vcl_session_unlisten_reply_handler (wrk, e->data);
966       break;
967     case SESSION_CTRL_EVT_MIGRATED:
968       vcl_session_migrated_handler (wrk, e->data);
969       break;
970     case SESSION_CTRL_EVT_CLEANUP:
971       vcl_session_cleanup_handler (wrk, e->data);
972       break;
973     case SESSION_CTRL_EVT_REQ_WORKER_UPDATE:
974       vcl_session_req_worker_update_handler (wrk, e->data);
975       break;
976     case SESSION_CTRL_EVT_WORKER_UPDATE_REPLY:
977       vcl_session_worker_update_reply_handler (wrk, e->data);
978       break;
979     case SESSION_CTRL_EVT_APP_ADD_SEGMENT:
980       vcl_session_app_add_segment_handler (wrk, e->data);
981       break;
982     case SESSION_CTRL_EVT_APP_DEL_SEGMENT:
983       vcl_session_app_del_segment_handler (wrk, e->data);
984       break;
985     case SESSION_CTRL_EVT_APP_WRK_RPC:
986       vcl_worker_rpc_handler (wrk, e->data);
987       break;
988     default:
989       clib_warning ("unhandled %u", e->event_type);
990     }
991   return VPPCOM_OK;
992 }
993
994 static int
995 vppcom_wait_for_session_state_change (u32 session_index,
996                                       vcl_session_state_t state,
997                                       f64 wait_for_time)
998 {
999   vcl_worker_t *wrk = vcl_worker_get_current ();
1000   f64 timeout = clib_time_now (&wrk->clib_time) + wait_for_time;
1001   vcl_session_t *volatile session;
1002   svm_msg_q_msg_t msg;
1003   session_event_t *e;
1004
1005   do
1006     {
1007       session = vcl_session_get (wrk, session_index);
1008       if (PREDICT_FALSE (!session))
1009         {
1010           return VPPCOM_EBADFD;
1011         }
1012       if (session->session_state == state)
1013         {
1014           return VPPCOM_OK;
1015         }
1016       if (session->session_state == VCL_STATE_DETACHED)
1017         {
1018           return VPPCOM_ECONNREFUSED;
1019         }
1020
1021       if (svm_msg_q_sub (wrk->app_event_queue, &msg, SVM_Q_NOWAIT, 0))
1022         {
1023           usleep (100);
1024           continue;
1025         }
1026       e = svm_msg_q_msg_data (wrk->app_event_queue, &msg);
1027       vcl_handle_mq_event (wrk, e);
1028       svm_msg_q_free_msg (wrk->app_event_queue, &msg);
1029     }
1030   while (clib_time_now (&wrk->clib_time) < timeout);
1031
1032   VDBG (0, "timeout waiting for state 0x%x (%s)", state,
1033         vppcom_session_state_str (state));
1034   vcl_evt (VCL_EVT_SESSION_TIMEOUT, session, session_state);
1035
1036   return VPPCOM_ETIMEDOUT;
1037 }
1038
1039 static void
1040 vcl_handle_pending_wrk_updates (vcl_worker_t * wrk)
1041 {
1042   vcl_session_state_t state;
1043   vcl_session_t *s;
1044   u32 *sip;
1045
1046   if (PREDICT_TRUE (vec_len (wrk->pending_session_wrk_updates) == 0))
1047     return;
1048
1049   vec_foreach (sip, wrk->pending_session_wrk_updates)
1050   {
1051     s = vcl_session_get (wrk, *sip);
1052     vcl_send_session_worker_update (wrk, s, wrk->wrk_index);
1053     state = s->session_state;
1054     vppcom_wait_for_session_state_change (s->session_index, VCL_STATE_UPDATED,
1055                                           5);
1056     s->session_state = state;
1057   }
1058   vec_reset_length (wrk->pending_session_wrk_updates);
1059 }
1060
1061 void
1062 vcl_flush_mq_events (void)
1063 {
1064   vcl_worker_t *wrk = vcl_worker_get_current ();
1065   svm_msg_q_msg_t *msg;
1066   session_event_t *e;
1067   svm_msg_q_t *mq;
1068   int i;
1069
1070   mq = wrk->app_event_queue;
1071   svm_msg_q_lock (mq);
1072   vcl_mq_dequeue_batch (wrk, mq, ~0);
1073   svm_msg_q_unlock (mq);
1074
1075   for (i = 0; i < vec_len (wrk->mq_msg_vector); i++)
1076     {
1077       msg = vec_elt_at_index (wrk->mq_msg_vector, i);
1078       e = svm_msg_q_msg_data (mq, msg);
1079       vcl_handle_mq_event (wrk, e);
1080       svm_msg_q_free_msg (mq, msg);
1081     }
1082   vec_reset_length (wrk->mq_msg_vector);
1083   vcl_handle_pending_wrk_updates (wrk);
1084 }
1085
1086 static int
1087 vppcom_session_unbind (u32 session_handle)
1088 {
1089   vcl_worker_t *wrk = vcl_worker_get_current ();
1090   session_accepted_msg_t *accepted_msg;
1091   vcl_session_t *session = 0;
1092   vcl_session_msg_t *evt;
1093
1094   session = vcl_session_get_w_handle (wrk, session_handle);
1095   if (!session)
1096     return VPPCOM_EBADFD;
1097
1098   /* Flush pending accept events, if any */
1099   while (clib_fifo_elts (session->accept_evts_fifo))
1100     {
1101       clib_fifo_sub2 (session->accept_evts_fifo, evt);
1102       accepted_msg = &evt->accepted_msg;
1103       vcl_session_table_del_vpp_handle (wrk, accepted_msg->handle);
1104       vcl_send_session_accepted_reply (session->vpp_evt_q,
1105                                        accepted_msg->context,
1106                                        session->vpp_handle, -1);
1107     }
1108   clib_fifo_free (session->accept_evts_fifo);
1109
1110   vcl_send_session_unlisten (wrk, session);
1111
1112   VDBG (1, "session %u [0x%llx]: sending unbind!", session->session_index,
1113         session->vpp_handle);
1114   vcl_evt (VCL_EVT_UNBIND, session);
1115
1116   session->vpp_handle = ~0;
1117   session->session_state = VCL_STATE_DISCONNECT;
1118
1119   return VPPCOM_OK;
1120 }
1121
1122 static int
1123 vppcom_session_disconnect (u32 session_handle)
1124 {
1125   vcl_worker_t *wrk = vcl_worker_get_current ();
1126   svm_msg_q_t *vpp_evt_q;
1127   vcl_session_t *session, *listen_session;
1128   vcl_session_state_t state;
1129   u64 vpp_handle;
1130
1131   session = vcl_session_get_w_handle (wrk, session_handle);
1132   if (!session)
1133     return VPPCOM_EBADFD;
1134
1135   vpp_handle = session->vpp_handle;
1136   state = session->session_state;
1137
1138   VDBG (1, "session %u [0x%llx] state 0x%x (%s)", session->session_index,
1139         vpp_handle, state, vppcom_session_state_str (state));
1140
1141   if (PREDICT_FALSE (state == VCL_STATE_LISTEN))
1142     {
1143       VDBG (0, "ERROR: Cannot disconnect a listen socket!");
1144       return VPPCOM_EBADFD;
1145     }
1146
1147   if (state == VCL_STATE_VPP_CLOSING)
1148     {
1149       vpp_evt_q = vcl_session_vpp_evt_q (wrk, session);
1150       vcl_send_session_disconnected_reply (vpp_evt_q, wrk->api_client_handle,
1151                                            vpp_handle, 0);
1152       VDBG (1, "session %u [0x%llx]: sending disconnect REPLY...",
1153             session->session_index, vpp_handle);
1154     }
1155   else
1156     {
1157       /* Session doesn't have an event queue yet. Probably a non-blocking
1158        * connect. Wait for the reply */
1159       if (PREDICT_FALSE (!session->vpp_evt_q))
1160         return VPPCOM_OK;
1161
1162       VDBG (1, "session %u [0x%llx]: sending disconnect...",
1163             session->session_index, vpp_handle);
1164       vcl_send_session_disconnect (wrk, session);
1165     }
1166
1167   if (session->listener_index != VCL_INVALID_SESSION_INDEX)
1168     {
1169       listen_session = vcl_session_get (wrk, session->listener_index);
1170       listen_session->n_accepted_sessions--;
1171     }
1172
1173   return VPPCOM_OK;
1174 }
1175
1176 /**
1177  * Handle app exit
1178  *
1179  * Notify vpp of the disconnect and mark the worker as free. If we're the
1180  * last worker, do a full cleanup otherwise, since we're probably a forked
1181  * child, avoid syscalls as much as possible. We might've lost privileges.
1182  */
1183 void
1184 vppcom_app_exit (void)
1185 {
1186   if (!pool_elts (vcm->workers))
1187     return;
1188   vcl_worker_cleanup (vcl_worker_get_current (), 1 /* notify vpp */ );
1189   vcl_set_worker_index (~0);
1190   vcl_elog_stop (vcm);
1191 }
1192
1193 static int
1194 vcl_api_attach (void)
1195 {
1196   if (vcm->cfg.vpp_app_socket_api)
1197     return vcl_sapi_attach ();
1198
1199   return vcl_bapi_attach ();
1200 }
1201
1202 static void
1203 vcl_api_detach (vcl_worker_t * wrk)
1204 {
1205   vcl_send_app_detach (wrk);
1206
1207   if (vcm->cfg.vpp_app_socket_api)
1208     return vcl_sapi_detach (wrk);
1209
1210   return vcl_bapi_disconnect_from_vpp ();
1211 }
1212
1213 /*
1214  * VPPCOM Public API functions
1215  */
1216 int
1217 vppcom_app_create (const char *app_name)
1218 {
1219   vppcom_cfg_t *vcl_cfg = &vcm->cfg;
1220   int rv;
1221
1222   if (vcm->is_init)
1223     {
1224       VDBG (1, "already initialized");
1225       return VPPCOM_EEXIST;
1226     }
1227
1228   vcm->is_init = 1;
1229   vppcom_cfg (&vcm->cfg);
1230   vcl_cfg = &vcm->cfg;
1231
1232   vcm->main_cpu = pthread_self ();
1233   vcm->main_pid = getpid ();
1234   vcm->app_name = format (0, "%s", app_name);
1235   fifo_segment_main_init (&vcm->segment_main, vcl_cfg->segment_baseva,
1236                           20 /* timeout in secs */ );
1237   pool_alloc (vcm->workers, vcl_cfg->max_workers);
1238   clib_spinlock_init (&vcm->workers_lock);
1239   clib_rwlock_init (&vcm->segment_table_lock);
1240   atexit (vppcom_app_exit);
1241   vcl_elog_init (vcm);
1242
1243   /* Allocate default worker */
1244   vcl_worker_alloc_and_init ();
1245
1246   if ((rv = vcl_api_attach ()))
1247     return rv;
1248
1249   VDBG (0, "app_name '%s', my_client_index %d (0x%x)", app_name,
1250         vcm->workers[0].api_client_handle, vcm->workers[0].api_client_handle);
1251
1252   return VPPCOM_OK;
1253 }
1254
1255 void
1256 vppcom_app_destroy (void)
1257 {
1258   vcl_worker_t *wrk, *current_wrk;
1259   void *heap;
1260
1261   if (!pool_elts (vcm->workers))
1262     return;
1263
1264   vcl_evt (VCL_EVT_DETACH, vcm);
1265
1266   current_wrk = vcl_worker_get_current ();
1267
1268   /* *INDENT-OFF* */
1269   pool_foreach (wrk, vcm->workers, ({
1270     if (current_wrk != wrk)
1271       vcl_worker_cleanup (wrk, 0 /* notify vpp */ );
1272   }));
1273   /* *INDENT-ON* */
1274
1275   vcl_api_detach (current_wrk);
1276   vcl_worker_cleanup (current_wrk, 0 /* notify vpp */ );
1277
1278   vcl_elog_stop (vcm);
1279
1280   /*
1281    * Free the heap and fix vcm
1282    */
1283   heap = clib_mem_get_heap ();
1284   munmap (clib_mem_get_heap_base (heap), clib_mem_get_heap_size (heap));
1285
1286   vcm = &_vppcom_main;
1287   vcm->is_init = 0;
1288 }
1289
1290 int
1291 vppcom_session_create (u8 proto, u8 is_nonblocking)
1292 {
1293   vcl_worker_t *wrk = vcl_worker_get_current ();
1294   vcl_session_t *session;
1295
1296   session = vcl_session_alloc (wrk);
1297
1298   session->session_type = proto;
1299   session->session_state = VCL_STATE_CLOSED;
1300   session->vpp_handle = ~0;
1301   session->is_dgram = vcl_proto_is_dgram (proto);
1302
1303   if (is_nonblocking)
1304     vcl_session_set_attr (session, VCL_SESS_ATTR_NONBLOCK);
1305
1306   vcl_evt (VCL_EVT_CREATE, session, session_type, session->session_state,
1307            is_nonblocking, session_index);
1308
1309   VDBG (0, "created session %u", session->session_index);
1310
1311   return vcl_session_handle (session);
1312 }
1313
1314 int
1315 vcl_session_cleanup (vcl_worker_t * wrk, vcl_session_t * s,
1316                      vcl_session_handle_t sh, u8 do_disconnect)
1317 {
1318   int rv = VPPCOM_OK;
1319
1320   VDBG (1, "session %u [0x%llx] closing", s->session_index, s->vpp_handle);
1321
1322   if (s->flags & VCL_SESSION_F_IS_VEP)
1323     {
1324       u32 next_sh = s->vep.next_sh;
1325       while (next_sh != ~0)
1326         {
1327           rv = vppcom_epoll_ctl (sh, EPOLL_CTL_DEL, next_sh, 0);
1328           if (PREDICT_FALSE (rv < 0))
1329             VDBG (0, "vpp handle 0x%llx, sh %u: EPOLL_CTL_DEL vep_idx %u"
1330                   " failed! rv %d (%s)", s->vpp_handle, next_sh,
1331                   s->vep.vep_sh, rv, vppcom_retval_str (rv));
1332           next_sh = s->vep.next_sh;
1333         }
1334       goto cleanup;
1335     }
1336
1337   if (s->flags & VCL_SESSION_F_IS_VEP_SESSION)
1338     {
1339       rv = vppcom_epoll_ctl (s->vep.vep_sh, EPOLL_CTL_DEL, sh, 0);
1340       if (rv < 0)
1341         VDBG (0, "session %u [0x%llx]: EPOLL_CTL_DEL vep_idx %u "
1342               "failed! rv %d (%s)", s->session_index, s->vpp_handle,
1343               s->vep.vep_sh, rv, vppcom_retval_str (rv));
1344     }
1345
1346   if (!do_disconnect)
1347     {
1348       VDBG (1, "session %u [0x%llx] disconnect skipped",
1349             s->session_index, s->vpp_handle);
1350       goto cleanup;
1351     }
1352
1353   if (s->session_state == VCL_STATE_LISTEN)
1354     {
1355       rv = vppcom_session_unbind (sh);
1356       if (PREDICT_FALSE (rv < 0))
1357         VDBG (0, "session %u [0x%llx]: listener unbind failed! "
1358               "rv %d (%s)", s->session_index, s->vpp_handle, rv,
1359               vppcom_retval_str (rv));
1360       return rv;
1361     }
1362   else if (vcl_session_is_ready (s)
1363            || (vcl_session_is_connectable_listener (wrk, s)))
1364     {
1365       rv = vppcom_session_disconnect (sh);
1366       if (PREDICT_FALSE (rv < 0))
1367         VDBG (0, "ERROR: session %u [0x%llx]: disconnect failed!"
1368               " rv %d (%s)", s->session_index, s->vpp_handle,
1369               rv, vppcom_retval_str (rv));
1370     }
1371   else if (s->session_state == VCL_STATE_DISCONNECT)
1372     {
1373       svm_msg_q_t *mq = vcl_session_vpp_evt_q (wrk, s);
1374       vcl_send_session_reset_reply (mq, wrk->api_client_handle,
1375                                     s->vpp_handle, 0);
1376     }
1377   else if (s->session_state == VCL_STATE_DETACHED)
1378     {
1379       /* Should not happen. VPP cleaned up before app confirmed close */
1380       VDBG (0, "vpp freed session %d before close", s->session_index);
1381       goto free_session;
1382     }
1383
1384   s->session_state = VCL_STATE_CLOSED;
1385
1386   /* Session is removed only after vpp confirms the disconnect */
1387   return rv;
1388
1389 cleanup:
1390   vcl_session_table_del_vpp_handle (wrk, s->vpp_handle);
1391 free_session:
1392   vcl_session_free (wrk, s);
1393   vcl_evt (VCL_EVT_CLOSE, s, rv);
1394
1395   return rv;
1396 }
1397
1398 int
1399 vppcom_session_close (uint32_t session_handle)
1400 {
1401   vcl_worker_t *wrk = vcl_worker_get_current ();
1402   vcl_session_t *session;
1403
1404   session = vcl_session_get_w_handle (wrk, session_handle);
1405   if (!session)
1406     return VPPCOM_EBADFD;
1407   return vcl_session_cleanup (wrk, session, session_handle,
1408                               1 /* do_disconnect */ );
1409 }
1410
1411 int
1412 vppcom_session_bind (uint32_t session_handle, vppcom_endpt_t * ep)
1413 {
1414   vcl_worker_t *wrk = vcl_worker_get_current ();
1415   vcl_session_t *session = 0;
1416
1417   if (!ep || !ep->ip)
1418     return VPPCOM_EINVAL;
1419
1420   session = vcl_session_get_w_handle (wrk, session_handle);
1421   if (!session)
1422     return VPPCOM_EBADFD;
1423
1424   if (session->flags & VCL_SESSION_F_IS_VEP)
1425     {
1426       VDBG (0, "ERROR: cannot bind to epoll session %u!",
1427             session->session_index);
1428       return VPPCOM_EBADFD;
1429     }
1430
1431   session->transport.is_ip4 = ep->is_ip4;
1432   if (ep->is_ip4)
1433     clib_memcpy_fast (&session->transport.lcl_ip.ip4, ep->ip,
1434                       sizeof (ip4_address_t));
1435   else
1436     clib_memcpy_fast (&session->transport.lcl_ip.ip6, ep->ip,
1437                       sizeof (ip6_address_t));
1438   session->transport.lcl_port = ep->port;
1439
1440   VDBG (0, "session %u handle %u: binding to local %s address %U port %u, "
1441         "proto %s", session->session_index, session_handle,
1442         session->transport.is_ip4 ? "IPv4" : "IPv6",
1443         format_ip46_address, &session->transport.lcl_ip,
1444         session->transport.is_ip4 ? IP46_TYPE_IP4 : IP46_TYPE_IP6,
1445         clib_net_to_host_u16 (session->transport.lcl_port),
1446         vppcom_proto_str (session->session_type));
1447   vcl_evt (VCL_EVT_BIND, session);
1448
1449   if (session->session_type == VPPCOM_PROTO_UDP)
1450     vppcom_session_listen (session_handle, 10);
1451
1452   return VPPCOM_OK;
1453 }
1454
1455 int
1456 vppcom_session_listen (uint32_t listen_sh, uint32_t q_len)
1457 {
1458   vcl_worker_t *wrk = vcl_worker_get_current ();
1459   vcl_session_t *listen_session = 0;
1460   u64 listen_vpp_handle;
1461   int rv;
1462
1463   listen_session = vcl_session_get_w_handle (wrk, listen_sh);
1464   if (!listen_session || (listen_session->flags & VCL_SESSION_F_IS_VEP))
1465     return VPPCOM_EBADFD;
1466
1467   if (q_len == 0 || q_len == ~0)
1468     q_len = vcm->cfg.listen_queue_size;
1469
1470   listen_vpp_handle = listen_session->vpp_handle;
1471   if (listen_session->session_state == VCL_STATE_LISTEN)
1472     {
1473       VDBG (0, "session %u [0x%llx]: already in listen state!",
1474             listen_sh, listen_vpp_handle);
1475       return VPPCOM_OK;
1476     }
1477
1478   VDBG (0, "session %u: sending vpp listen request...", listen_sh);
1479
1480   /*
1481    * Send listen request to vpp and wait for reply
1482    */
1483   vcl_send_session_listen (wrk, listen_session);
1484   rv = vppcom_wait_for_session_state_change (listen_session->session_index,
1485                                              VCL_STATE_LISTEN,
1486                                              vcm->cfg.session_timeout);
1487
1488   if (PREDICT_FALSE (rv))
1489     {
1490       listen_session = vcl_session_get_w_handle (wrk, listen_sh);
1491       VDBG (0, "session %u [0x%llx]: listen failed! returning %d (%s)",
1492             listen_sh, listen_session->vpp_handle, rv,
1493             vppcom_retval_str (rv));
1494       return rv;
1495     }
1496
1497   return VPPCOM_OK;
1498 }
1499
1500 static int
1501 validate_args_session_accept_ (vcl_worker_t * wrk, vcl_session_t * ls)
1502 {
1503   if (ls->flags & VCL_SESSION_F_IS_VEP)
1504     {
1505       VDBG (0, "ERROR: cannot accept on epoll session %u!",
1506             ls->session_index);
1507       return VPPCOM_EBADFD;
1508     }
1509
1510   if ((ls->session_state != VCL_STATE_LISTEN)
1511       && (!vcl_session_is_connectable_listener (wrk, ls)))
1512     {
1513       VDBG (0, "ERROR: session [0x%llx]: not in listen state! state 0x%x"
1514             " (%s)", ls->vpp_handle, ls->session_state,
1515             vppcom_session_state_str (ls->session_state));
1516       return VPPCOM_EBADFD;
1517     }
1518   return VPPCOM_OK;
1519 }
1520
1521 int
1522 vppcom_unformat_proto (uint8_t * proto, char *proto_str)
1523 {
1524   if (!strcmp (proto_str, "TCP"))
1525     *proto = VPPCOM_PROTO_TCP;
1526   else if (!strcmp (proto_str, "tcp"))
1527     *proto = VPPCOM_PROTO_TCP;
1528   else if (!strcmp (proto_str, "UDP"))
1529     *proto = VPPCOM_PROTO_UDP;
1530   else if (!strcmp (proto_str, "udp"))
1531     *proto = VPPCOM_PROTO_UDP;
1532   else if (!strcmp (proto_str, "TLS"))
1533     *proto = VPPCOM_PROTO_TLS;
1534   else if (!strcmp (proto_str, "tls"))
1535     *proto = VPPCOM_PROTO_TLS;
1536   else if (!strcmp (proto_str, "QUIC"))
1537     *proto = VPPCOM_PROTO_QUIC;
1538   else if (!strcmp (proto_str, "quic"))
1539     *proto = VPPCOM_PROTO_QUIC;
1540   else
1541     return 1;
1542   return 0;
1543 }
1544
1545 int
1546 vppcom_session_accept (uint32_t listen_session_handle, vppcom_endpt_t * ep,
1547                        uint32_t flags)
1548 {
1549   u32 client_session_index = ~0, listen_session_index, accept_flags = 0;
1550   vcl_worker_t *wrk = vcl_worker_get_current ();
1551   session_accepted_msg_t accepted_msg;
1552   vcl_session_t *listen_session = 0;
1553   vcl_session_t *client_session = 0;
1554   vcl_session_msg_t *evt;
1555   svm_msg_q_msg_t msg;
1556   session_event_t *e;
1557   u8 is_nonblocking;
1558   int rv;
1559
1560   listen_session = vcl_session_get_w_handle (wrk, listen_session_handle);
1561   if (!listen_session)
1562     return VPPCOM_EBADFD;
1563
1564   listen_session_index = listen_session->session_index;
1565   if ((rv = validate_args_session_accept_ (wrk, listen_session)))
1566     return rv;
1567
1568   if (clib_fifo_elts (listen_session->accept_evts_fifo))
1569     {
1570       clib_fifo_sub2 (listen_session->accept_evts_fifo, evt);
1571       accept_flags = evt->flags;
1572       accepted_msg = evt->accepted_msg;
1573       goto handle;
1574     }
1575
1576   is_nonblocking = vcl_session_has_attr (listen_session,
1577                                          VCL_SESS_ATTR_NONBLOCK);
1578   while (1)
1579     {
1580       if (svm_msg_q_is_empty (wrk->app_event_queue) && is_nonblocking)
1581         return VPPCOM_EAGAIN;
1582
1583       if (svm_msg_q_sub (wrk->app_event_queue, &msg, SVM_Q_WAIT, 0))
1584         return VPPCOM_EAGAIN;
1585
1586       e = svm_msg_q_msg_data (wrk->app_event_queue, &msg);
1587       if (e->event_type != SESSION_CTRL_EVT_ACCEPTED)
1588         {
1589           vcl_handle_mq_event (wrk, e);
1590           svm_msg_q_free_msg (wrk->app_event_queue, &msg);
1591           continue;
1592         }
1593       clib_memcpy_fast (&accepted_msg, e->data, sizeof (accepted_msg));
1594       svm_msg_q_free_msg (wrk->app_event_queue, &msg);
1595       break;
1596     }
1597
1598 handle:
1599
1600   client_session_index = vcl_session_accepted_handler (wrk, &accepted_msg,
1601                                                        listen_session_index);
1602   if (client_session_index == VCL_INVALID_SESSION_INDEX)
1603     return VPPCOM_ECONNABORTED;
1604
1605   listen_session = vcl_session_get (wrk, listen_session_index);
1606   client_session = vcl_session_get (wrk, client_session_index);
1607
1608   if (flags & O_NONBLOCK)
1609     vcl_session_set_attr (client_session, VCL_SESS_ATTR_NONBLOCK);
1610
1611   VDBG (1, "listener %u [0x%llx]: Got a connect request! session %u [0x%llx],"
1612         " flags %d, is_nonblocking %u", listen_session->session_index,
1613         listen_session->vpp_handle, client_session_index,
1614         client_session->vpp_handle, flags,
1615         vcl_session_has_attr (client_session, VCL_SESS_ATTR_NONBLOCK));
1616
1617   if (ep)
1618     {
1619       ep->is_ip4 = client_session->transport.is_ip4;
1620       ep->port = client_session->transport.rmt_port;
1621       if (client_session->transport.is_ip4)
1622         clib_memcpy_fast (ep->ip, &client_session->transport.rmt_ip.ip4,
1623                           sizeof (ip4_address_t));
1624       else
1625         clib_memcpy_fast (ep->ip, &client_session->transport.rmt_ip.ip6,
1626                           sizeof (ip6_address_t));
1627     }
1628
1629   VDBG (0, "listener %u [0x%llx] accepted %u [0x%llx] peer: %U:%u "
1630         "local: %U:%u", listen_session_handle, listen_session->vpp_handle,
1631         client_session_index, client_session->vpp_handle,
1632         format_ip46_address, &client_session->transport.rmt_ip,
1633         client_session->transport.is_ip4 ? IP46_TYPE_IP4 : IP46_TYPE_IP6,
1634         clib_net_to_host_u16 (client_session->transport.rmt_port),
1635         format_ip46_address, &client_session->transport.lcl_ip,
1636         client_session->transport.is_ip4 ? IP46_TYPE_IP4 : IP46_TYPE_IP6,
1637         clib_net_to_host_u16 (client_session->transport.lcl_port));
1638   vcl_evt (VCL_EVT_ACCEPT, client_session, listen_session,
1639            client_session_index);
1640
1641   /*
1642    * Session might have been closed already
1643    */
1644   if (accept_flags)
1645     {
1646       if (accept_flags & VCL_ACCEPTED_F_CLOSED)
1647         client_session->session_state = VCL_STATE_VPP_CLOSING;
1648       else if (accept_flags & VCL_ACCEPTED_F_RESET)
1649         client_session->session_state = VCL_STATE_DISCONNECT;
1650     }
1651   return vcl_session_handle (client_session);
1652 }
1653
1654 int
1655 vppcom_session_connect (uint32_t session_handle, vppcom_endpt_t * server_ep)
1656 {
1657   vcl_worker_t *wrk = vcl_worker_get_current ();
1658   vcl_session_t *session = 0;
1659   u32 session_index;
1660   int rv;
1661
1662   session = vcl_session_get_w_handle (wrk, session_handle);
1663   if (!session)
1664     return VPPCOM_EBADFD;
1665   session_index = session->session_index;
1666
1667   if (PREDICT_FALSE (session->flags & VCL_SESSION_F_IS_VEP))
1668     {
1669       VDBG (0, "ERROR: cannot connect epoll session %u!",
1670             session->session_index);
1671       return VPPCOM_EBADFD;
1672     }
1673
1674   if (PREDICT_FALSE (vcl_session_is_ready (session)))
1675     {
1676       VDBG (0, "session handle %u [0x%llx]: session already "
1677             "connected to %s %U port %d proto %s, state 0x%x (%s)",
1678             session_handle, session->vpp_handle,
1679             session->transport.is_ip4 ? "IPv4" : "IPv6", format_ip46_address,
1680             &session->transport.rmt_ip, session->transport.is_ip4 ?
1681             IP46_TYPE_IP4 : IP46_TYPE_IP6,
1682             clib_net_to_host_u16 (session->transport.rmt_port),
1683             vppcom_proto_str (session->session_type), session->session_state,
1684             vppcom_session_state_str (session->session_state));
1685       return VPPCOM_OK;
1686     }
1687
1688   /* Attempt to connect a connectionless listener */
1689   if (PREDICT_FALSE (session->session_state == VCL_STATE_LISTEN))
1690     {
1691       if (session->session_type != VPPCOM_PROTO_UDP)
1692         return VPPCOM_EINVAL;
1693       vcl_send_session_unlisten (wrk, session);
1694       session->session_state = VCL_STATE_CLOSED;
1695     }
1696
1697   session->transport.is_ip4 = server_ep->is_ip4;
1698   vcl_ip_copy_from_ep (&session->transport.rmt_ip, server_ep);
1699   session->transport.rmt_port = server_ep->port;
1700   session->parent_handle = VCL_INVALID_SESSION_HANDLE;
1701   session->flags |= VCL_SESSION_F_CONNECTED;
1702
1703   VDBG (0, "session handle %u (%s): connecting to peer %s %U "
1704         "port %d proto %s", session_handle,
1705         vppcom_session_state_str (session->session_state),
1706         session->transport.is_ip4 ? "IPv4" : "IPv6",
1707         format_ip46_address,
1708         &session->transport.rmt_ip, session->transport.is_ip4 ?
1709         IP46_TYPE_IP4 : IP46_TYPE_IP6,
1710         clib_net_to_host_u16 (session->transport.rmt_port),
1711         vppcom_proto_str (session->session_type));
1712
1713   vcl_send_session_connect (wrk, session);
1714
1715   if (vcl_session_has_attr (session, VCL_SESS_ATTR_NONBLOCK))
1716     {
1717       /* State set to STATE_UPDATED to ensure the session is not assumed
1718        * to be ready and to also allow the app to close it prior to vpp's
1719        * connected reply. */
1720       session->session_state = VCL_STATE_UPDATED;
1721       return VPPCOM_EINPROGRESS;
1722     }
1723
1724   /*
1725    * Wait for reply from vpp if blocking
1726    */
1727   rv = vppcom_wait_for_session_state_change (session_index, VCL_STATE_READY,
1728                                              vcm->cfg.session_timeout);
1729
1730   session = vcl_session_get (wrk, session_index);
1731   VDBG (0, "session %u [0x%llx]: connect %s!", session->session_index,
1732         session->vpp_handle, rv ? "failed" : "succeeded");
1733
1734   return rv;
1735 }
1736
1737 int
1738 vppcom_session_stream_connect (uint32_t session_handle,
1739                                uint32_t parent_session_handle)
1740 {
1741   vcl_worker_t *wrk = vcl_worker_get_current ();
1742   vcl_session_t *session, *parent_session;
1743   u32 session_index, parent_session_index;
1744   int rv;
1745
1746   session = vcl_session_get_w_handle (wrk, session_handle);
1747   if (!session)
1748     return VPPCOM_EBADFD;
1749   parent_session = vcl_session_get_w_handle (wrk, parent_session_handle);
1750   if (!parent_session)
1751     return VPPCOM_EBADFD;
1752
1753   session_index = session->session_index;
1754   parent_session_index = parent_session->session_index;
1755   if (PREDICT_FALSE (session->flags & VCL_SESSION_F_IS_VEP))
1756     {
1757       VDBG (0, "ERROR: cannot connect epoll session %u!",
1758             session->session_index);
1759       return VPPCOM_EBADFD;
1760     }
1761
1762   if (PREDICT_FALSE (vcl_session_is_ready (session)))
1763     {
1764       VDBG (0, "session handle %u [0x%llx]: session already "
1765             "connected to session %u [0x%llx] proto %s, state 0x%x (%s)",
1766             session_handle, session->vpp_handle,
1767             parent_session_handle, parent_session->vpp_handle,
1768             vppcom_proto_str (session->session_type), session->session_state,
1769             vppcom_session_state_str (session->session_state));
1770       return VPPCOM_OK;
1771     }
1772
1773   /* Connect to quic session specifics */
1774   session->transport.is_ip4 = parent_session->transport.is_ip4;
1775   session->transport.rmt_ip.ip4.as_u32 = (uint32_t) 1;
1776   session->transport.rmt_port = 0;
1777   session->parent_handle = parent_session->vpp_handle;
1778
1779   VDBG (0, "session handle %u: connecting to session %u [0x%llx]",
1780         session_handle, parent_session_handle, parent_session->vpp_handle);
1781
1782   /*
1783    * Send connect request and wait for reply from vpp
1784    */
1785   vcl_send_session_connect (wrk, session);
1786   rv = vppcom_wait_for_session_state_change (session_index, VCL_STATE_READY,
1787                                              vcm->cfg.session_timeout);
1788
1789   session->listener_index = parent_session_index;
1790   parent_session = vcl_session_get_w_handle (wrk, parent_session_handle);
1791   if (parent_session)
1792     parent_session->n_accepted_sessions++;
1793
1794   session = vcl_session_get (wrk, session_index);
1795   VDBG (0, "session %u [0x%llx]: connect %s!", session->session_index,
1796         session->vpp_handle, rv ? "failed" : "succeeded");
1797
1798   return rv;
1799 }
1800
1801 static u8
1802 vcl_is_rx_evt_for_session (session_event_t * e, u32 sid, u8 is_ct)
1803 {
1804   return (e->event_type == SESSION_IO_EVT_RX && e->session_index == sid);
1805 }
1806
1807 static inline int
1808 vppcom_session_read_internal (uint32_t session_handle, void *buf, int n,
1809                               u8 peek)
1810 {
1811   vcl_worker_t *wrk = vcl_worker_get_current ();
1812   int rv, n_read = 0, is_nonblocking;
1813   vcl_session_t *s = 0;
1814   svm_fifo_t *rx_fifo;
1815   svm_msg_q_msg_t msg;
1816   session_event_t *e;
1817   svm_msg_q_t *mq;
1818   u8 is_ct;
1819
1820   if (PREDICT_FALSE (!buf))
1821     return VPPCOM_EINVAL;
1822
1823   s = vcl_session_get_w_handle (wrk, session_handle);
1824   if (PREDICT_FALSE (!s || (s->flags & VCL_SESSION_F_IS_VEP)))
1825     return VPPCOM_EBADFD;
1826
1827   if (PREDICT_FALSE (!vcl_session_is_open (s)))
1828     {
1829       VDBG (0, "session %u[0x%llx] is not open! state 0x%x (%s)",
1830             s->session_index, s->vpp_handle, s->session_state,
1831             vppcom_session_state_str (s->session_state));
1832       return vcl_session_closed_error (s);
1833     }
1834
1835   is_nonblocking = vcl_session_has_attr (s, VCL_SESS_ATTR_NONBLOCK);
1836   is_ct = vcl_session_is_ct (s);
1837   mq = wrk->app_event_queue;
1838   rx_fifo = is_ct ? s->ct_rx_fifo : s->rx_fifo;
1839   s->flags &= ~VCL_SESSION_F_HAS_RX_EVT;
1840
1841   if (svm_fifo_is_empty_cons (rx_fifo))
1842     {
1843       if (is_nonblocking)
1844         {
1845           if (vcl_session_is_closing (s))
1846             return vcl_session_closing_error (s);
1847           svm_fifo_unset_event (s->rx_fifo);
1848           return VPPCOM_EWOULDBLOCK;
1849         }
1850       while (svm_fifo_is_empty_cons (rx_fifo))
1851         {
1852           if (vcl_session_is_closing (s))
1853             return vcl_session_closing_error (s);
1854
1855           svm_fifo_unset_event (s->rx_fifo);
1856           svm_msg_q_lock (mq);
1857           if (svm_msg_q_is_empty (mq))
1858             svm_msg_q_wait (mq);
1859
1860           svm_msg_q_sub_w_lock (mq, &msg);
1861           e = svm_msg_q_msg_data (mq, &msg);
1862           svm_msg_q_unlock (mq);
1863           if (!vcl_is_rx_evt_for_session (e, s->session_index, is_ct))
1864             vcl_handle_mq_event (wrk, e);
1865           svm_msg_q_free_msg (mq, &msg);
1866         }
1867     }
1868
1869 read_again:
1870
1871   if (s->is_dgram)
1872     rv = app_recv_dgram_raw (rx_fifo, buf, n, &s->transport, 0, peek);
1873   else
1874     rv = app_recv_stream_raw (rx_fifo, buf, n, 0, peek);
1875
1876   ASSERT (rv >= 0);
1877   n_read += rv;
1878
1879   if (svm_fifo_is_empty_cons (rx_fifo))
1880     {
1881       svm_fifo_unset_event (s->rx_fifo);
1882       if (!svm_fifo_is_empty_cons (rx_fifo)
1883           && svm_fifo_set_event (s->rx_fifo) && is_nonblocking)
1884         {
1885           vec_add2 (wrk->unhandled_evts_vector, e, 1);
1886           e->event_type = SESSION_IO_EVT_RX;
1887           e->session_index = s->session_index;
1888         }
1889     }
1890   else if (PREDICT_FALSE (rv < n))
1891     {
1892       /* More data enqueued while reading. Try to drain it
1893        * or fill the buffer */
1894       buf += rv;
1895       n -= rv;
1896       goto read_again;
1897     }
1898
1899   if (PREDICT_FALSE (svm_fifo_needs_deq_ntf (rx_fifo, n_read)))
1900     {
1901       svm_fifo_clear_deq_ntf (rx_fifo);
1902       app_send_io_evt_to_vpp (s->vpp_evt_q, s->rx_fifo->master_session_index,
1903                               SESSION_IO_EVT_RX, SVM_Q_WAIT);
1904     }
1905
1906   VDBG (2, "session %u[0x%llx]: read %d bytes from (%p)", s->session_index,
1907         s->vpp_handle, n_read, rx_fifo);
1908
1909   return n_read;
1910 }
1911
1912 int
1913 vppcom_session_read (uint32_t session_handle, void *buf, size_t n)
1914 {
1915   return (vppcom_session_read_internal (session_handle, buf, n, 0));
1916 }
1917
1918 static int
1919 vppcom_session_peek (uint32_t session_handle, void *buf, int n)
1920 {
1921   return (vppcom_session_read_internal (session_handle, buf, n, 1));
1922 }
1923
1924 int
1925 vppcom_session_read_segments (uint32_t session_handle,
1926                               vppcom_data_segment_t * ds, uint32_t n_segments,
1927                               uint32_t max_bytes)
1928 {
1929   vcl_worker_t *wrk = vcl_worker_get_current ();
1930   int n_read = 0, is_nonblocking;
1931   vcl_session_t *s = 0;
1932   svm_fifo_t *rx_fifo;
1933   svm_msg_q_msg_t msg;
1934   session_event_t *e;
1935   svm_msg_q_t *mq;
1936   u8 is_ct;
1937
1938   s = vcl_session_get_w_handle (wrk, session_handle);
1939   if (PREDICT_FALSE (!s || (s->flags & VCL_SESSION_F_IS_VEP)))
1940     return VPPCOM_EBADFD;
1941
1942   if (PREDICT_FALSE (!vcl_session_is_open (s)))
1943     return vcl_session_closed_error (s);
1944
1945   is_nonblocking = vcl_session_has_attr (s, VCL_SESS_ATTR_NONBLOCK);
1946   is_ct = vcl_session_is_ct (s);
1947   mq = wrk->app_event_queue;
1948   rx_fifo = s->rx_fifo;
1949   s->flags &= ~VCL_SESSION_F_HAS_RX_EVT;
1950
1951   if (is_ct)
1952     svm_fifo_unset_event (s->rx_fifo);
1953
1954   if (svm_fifo_is_empty_cons (rx_fifo))
1955     {
1956       if (is_nonblocking)
1957         {
1958           svm_fifo_unset_event (rx_fifo);
1959           return VPPCOM_EWOULDBLOCK;
1960         }
1961       while (svm_fifo_is_empty_cons (rx_fifo))
1962         {
1963           if (vcl_session_is_closing (s))
1964             return vcl_session_closing_error (s);
1965
1966           svm_fifo_unset_event (rx_fifo);
1967           svm_msg_q_lock (mq);
1968           if (svm_msg_q_is_empty (mq))
1969             svm_msg_q_wait (mq);
1970
1971           svm_msg_q_sub_w_lock (mq, &msg);
1972           e = svm_msg_q_msg_data (mq, &msg);
1973           svm_msg_q_unlock (mq);
1974           if (!vcl_is_rx_evt_for_session (e, s->session_index, is_ct))
1975             vcl_handle_mq_event (wrk, e);
1976           svm_msg_q_free_msg (mq, &msg);
1977         }
1978     }
1979
1980   n_read = svm_fifo_segments (rx_fifo, s->rx_bytes_pending,
1981                               (svm_fifo_seg_t *) ds, n_segments, max_bytes);
1982   if (n_read < 0)
1983     return VPPCOM_EAGAIN;
1984
1985   if (svm_fifo_max_dequeue_cons (rx_fifo) == n_read)
1986     {
1987       svm_fifo_unset_event (s->rx_fifo);
1988       if (svm_fifo_max_dequeue_cons (rx_fifo) != n_read
1989           && svm_fifo_set_event (s->rx_fifo)
1990           && vcl_session_has_attr (s, VCL_SESS_ATTR_NONBLOCK))
1991         {
1992           session_event_t *e;
1993           vec_add2 (wrk->unhandled_evts_vector, e, 1);
1994           e->event_type = SESSION_IO_EVT_RX;
1995           e->session_index = s->session_index;
1996         }
1997     }
1998
1999   s->rx_bytes_pending += n_read;
2000   return n_read;
2001 }
2002
2003 void
2004 vppcom_session_free_segments (uint32_t session_handle, uint32_t n_bytes)
2005 {
2006   vcl_worker_t *wrk = vcl_worker_get_current ();
2007   vcl_session_t *s;
2008
2009   s = vcl_session_get_w_handle (wrk, session_handle);
2010   if (PREDICT_FALSE (!s || (s->flags & VCL_SESSION_F_IS_VEP)))
2011     return;
2012
2013   svm_fifo_dequeue_drop (s->rx_fifo, n_bytes);
2014
2015   ASSERT (s->rx_bytes_pending < n_bytes);
2016   s->rx_bytes_pending -= n_bytes;
2017 }
2018
2019 static u8
2020 vcl_is_tx_evt_for_session (session_event_t * e, u32 sid, u8 is_ct)
2021 {
2022   return (e->event_type == SESSION_IO_EVT_TX && e->session_index == sid);
2023 }
2024
2025 always_inline u8
2026 vcl_fifo_is_writeable (svm_fifo_t * f, u32 len, u8 is_dgram)
2027 {
2028   u32 max_enq = svm_fifo_max_enqueue_prod (f);
2029   if (is_dgram)
2030     return max_enq >= (sizeof (session_dgram_hdr_t) + len);
2031   else
2032     return max_enq > 0;
2033 }
2034
2035 always_inline int
2036 vppcom_session_write_inline (vcl_worker_t * wrk, vcl_session_t * s, void *buf,
2037                              size_t n, u8 is_flush, u8 is_dgram)
2038 {
2039   int n_write, is_nonblocking;
2040   session_evt_type_t et;
2041   svm_msg_q_msg_t msg;
2042   svm_fifo_t *tx_fifo;
2043   session_event_t *e;
2044   svm_msg_q_t *mq;
2045   u8 is_ct;
2046
2047   if (PREDICT_FALSE (!buf || n == 0))
2048     return VPPCOM_EINVAL;
2049
2050   if (PREDICT_FALSE (s->flags & VCL_SESSION_F_IS_VEP))
2051     {
2052       VDBG (0, "ERROR: session %u [0x%llx]: cannot write to an epoll"
2053             " session!", s->session_index, s->vpp_handle);
2054       return VPPCOM_EBADFD;
2055     }
2056
2057   if (PREDICT_FALSE (!vcl_session_is_open (s)))
2058     {
2059       VDBG (1, "session %u [0x%llx]: is not open! state 0x%x (%s)",
2060             s->session_index, s->vpp_handle, s->session_state,
2061             vppcom_session_state_str (s->session_state));
2062       return vcl_session_closed_error (s);;
2063     }
2064
2065   is_ct = vcl_session_is_ct (s);
2066   tx_fifo = is_ct ? s->ct_tx_fifo : s->tx_fifo;
2067   is_nonblocking = vcl_session_has_attr (s, VCL_SESS_ATTR_NONBLOCK);
2068
2069   mq = wrk->app_event_queue;
2070   if (!vcl_fifo_is_writeable (tx_fifo, n, is_dgram))
2071     {
2072       if (is_nonblocking)
2073         {
2074           return VPPCOM_EWOULDBLOCK;
2075         }
2076       while (!vcl_fifo_is_writeable (tx_fifo, n, is_dgram))
2077         {
2078           svm_fifo_add_want_deq_ntf (tx_fifo, SVM_FIFO_WANT_DEQ_NOTIF);
2079           if (vcl_session_is_closing (s))
2080             return vcl_session_closing_error (s);
2081           svm_msg_q_lock (mq);
2082           if (svm_msg_q_is_empty (mq))
2083             svm_msg_q_wait (mq);
2084
2085           svm_msg_q_sub_w_lock (mq, &msg);
2086           e = svm_msg_q_msg_data (mq, &msg);
2087           svm_msg_q_unlock (mq);
2088
2089           if (!vcl_is_tx_evt_for_session (e, s->session_index, is_ct))
2090             vcl_handle_mq_event (wrk, e);
2091           svm_msg_q_free_msg (mq, &msg);
2092         }
2093     }
2094
2095   et = SESSION_IO_EVT_TX;
2096   if (is_flush && !is_ct)
2097     et = SESSION_IO_EVT_TX_FLUSH;
2098
2099   if (is_dgram)
2100     n_write = app_send_dgram_raw (tx_fifo, &s->transport,
2101                                   s->vpp_evt_q, buf, n, et,
2102                                   0 /* do_evt */ , SVM_Q_WAIT);
2103   else
2104     n_write = app_send_stream_raw (tx_fifo, s->vpp_evt_q, buf, n, et,
2105                                    0 /* do_evt */ , SVM_Q_WAIT);
2106
2107   if (svm_fifo_set_event (s->tx_fifo))
2108     app_send_io_evt_to_vpp (s->vpp_evt_q, s->tx_fifo->master_session_index,
2109                             et, SVM_Q_WAIT);
2110
2111   /* The underlying fifo segment can run out of memory */
2112   if (PREDICT_FALSE (n_write < 0))
2113     return VPPCOM_EAGAIN;
2114
2115   VDBG (2, "session %u [0x%llx]: wrote %d bytes", s->session_index,
2116         s->vpp_handle, n_write);
2117
2118   return n_write;
2119 }
2120
2121 int
2122 vppcom_session_write (uint32_t session_handle, void *buf, size_t n)
2123 {
2124   vcl_worker_t *wrk = vcl_worker_get_current ();
2125   vcl_session_t *s;
2126
2127   s = vcl_session_get_w_handle (wrk, session_handle);
2128   if (PREDICT_FALSE (!s))
2129     return VPPCOM_EBADFD;
2130
2131   return vppcom_session_write_inline (wrk, s, buf, n,
2132                                       0 /* is_flush */ , s->is_dgram ? 1 : 0);
2133 }
2134
2135 int
2136 vppcom_session_write_msg (uint32_t session_handle, void *buf, size_t n)
2137 {
2138   vcl_worker_t *wrk = vcl_worker_get_current ();
2139   vcl_session_t *s;
2140
2141   s = vcl_session_get_w_handle (wrk, session_handle);
2142   if (PREDICT_FALSE (!s))
2143     return VPPCOM_EBADFD;
2144
2145   return vppcom_session_write_inline (wrk, s, buf, n,
2146                                       1 /* is_flush */ , s->is_dgram ? 1 : 0);
2147 }
2148
2149 #define vcl_fifo_rx_evt_valid_or_break(_s)                              \
2150 if (PREDICT_FALSE (!_s->rx_fifo))                                       \
2151   break;                                                                \
2152 if (PREDICT_FALSE (svm_fifo_is_empty (_s->rx_fifo)))                    \
2153   {                                                                     \
2154     if (!vcl_session_is_ct (_s))                                        \
2155       {                                                                 \
2156         svm_fifo_unset_event (_s->rx_fifo);                             \
2157         if (svm_fifo_is_empty (_s->rx_fifo))                            \
2158           break;                                                        \
2159       }                                                                 \
2160     else if (svm_fifo_is_empty (_s->ct_rx_fifo))                        \
2161       {                                                                 \
2162         svm_fifo_unset_event (_s->rx_fifo); /* rx evts on actual fifo*/ \
2163         if (svm_fifo_is_empty (_s->ct_rx_fifo))                         \
2164           break;                                                        \
2165       }                                                                 \
2166   }                                                                     \
2167
2168 static void
2169 vcl_select_handle_mq_event (vcl_worker_t * wrk, session_event_t * e,
2170                             unsigned long n_bits, unsigned long *read_map,
2171                             unsigned long *write_map,
2172                             unsigned long *except_map, u32 * bits_set)
2173 {
2174   session_disconnected_msg_t *disconnected_msg;
2175   session_connected_msg_t *connected_msg;
2176   vcl_session_t *session;
2177   u32 sid;
2178
2179   switch (e->event_type)
2180     {
2181     case SESSION_IO_EVT_RX:
2182       sid = e->session_index;
2183       session = vcl_session_get (wrk, sid);
2184       if (!session || !vcl_session_is_open (session))
2185         break;
2186       vcl_fifo_rx_evt_valid_or_break (session);
2187       if (sid < n_bits && read_map)
2188         {
2189           clib_bitmap_set_no_check ((uword *) read_map, sid, 1);
2190           *bits_set += 1;
2191         }
2192       break;
2193     case SESSION_IO_EVT_TX:
2194       sid = e->session_index;
2195       session = vcl_session_get (wrk, sid);
2196       if (!session || !vcl_session_is_open (session))
2197         break;
2198       if (sid < n_bits && write_map)
2199         {
2200           clib_bitmap_set_no_check ((uword *) write_map, sid, 1);
2201           *bits_set += 1;
2202         }
2203       break;
2204     case SESSION_CTRL_EVT_ACCEPTED:
2205       session = vcl_session_accepted (wrk,
2206                                       (session_accepted_msg_t *) e->data);
2207       if (!session)
2208         break;
2209       sid = session->session_index;
2210       if (sid < n_bits && read_map)
2211         {
2212           clib_bitmap_set_no_check ((uword *) read_map, sid, 1);
2213           *bits_set += 1;
2214         }
2215       break;
2216     case SESSION_CTRL_EVT_CONNECTED:
2217       connected_msg = (session_connected_msg_t *) e->data;
2218       sid = vcl_session_connected_handler (wrk, connected_msg);
2219       if (sid == VCL_INVALID_SESSION_INDEX)
2220         break;
2221       if (sid < n_bits && write_map)
2222         {
2223           clib_bitmap_set_no_check ((uword *) write_map, sid, 1);
2224           *bits_set += 1;
2225         }
2226       break;
2227     case SESSION_CTRL_EVT_DISCONNECTED:
2228       disconnected_msg = (session_disconnected_msg_t *) e->data;
2229       session = vcl_session_disconnected_handler (wrk, disconnected_msg);
2230       if (!session)
2231         break;
2232       sid = session->session_index;
2233       if (sid < n_bits && except_map)
2234         {
2235           clib_bitmap_set_no_check ((uword *) except_map, sid, 1);
2236           *bits_set += 1;
2237         }
2238       break;
2239     case SESSION_CTRL_EVT_RESET:
2240       sid = vcl_session_reset_handler (wrk, (session_reset_msg_t *) e->data);
2241       if (sid < n_bits && except_map)
2242         {
2243           clib_bitmap_set_no_check ((uword *) except_map, sid, 1);
2244           *bits_set += 1;
2245         }
2246       break;
2247     case SESSION_CTRL_EVT_UNLISTEN_REPLY:
2248       vcl_session_unlisten_reply_handler (wrk, e->data);
2249       break;
2250     case SESSION_CTRL_EVT_MIGRATED:
2251       vcl_session_migrated_handler (wrk, e->data);
2252       break;
2253     case SESSION_CTRL_EVT_CLEANUP:
2254       vcl_session_cleanup_handler (wrk, e->data);
2255       break;
2256     case SESSION_CTRL_EVT_WORKER_UPDATE_REPLY:
2257       vcl_session_worker_update_reply_handler (wrk, e->data);
2258       break;
2259     case SESSION_CTRL_EVT_REQ_WORKER_UPDATE:
2260       vcl_session_req_worker_update_handler (wrk, e->data);
2261       break;
2262     case SESSION_CTRL_EVT_APP_ADD_SEGMENT:
2263       vcl_session_app_add_segment_handler (wrk, e->data);
2264       break;
2265     case SESSION_CTRL_EVT_APP_DEL_SEGMENT:
2266       vcl_session_app_del_segment_handler (wrk, e->data);
2267       break;
2268     case SESSION_CTRL_EVT_APP_WRK_RPC:
2269       vcl_worker_rpc_handler (wrk, e->data);
2270       break;
2271     default:
2272       clib_warning ("unhandled: %u", e->event_type);
2273       break;
2274     }
2275 }
2276
2277 static int
2278 vcl_select_handle_mq (vcl_worker_t * wrk, svm_msg_q_t * mq,
2279                       unsigned long n_bits, unsigned long *read_map,
2280                       unsigned long *write_map, unsigned long *except_map,
2281                       double time_to_wait, u32 * bits_set)
2282 {
2283   svm_msg_q_msg_t *msg;
2284   session_event_t *e;
2285   u32 i;
2286
2287   svm_msg_q_lock (mq);
2288   if (svm_msg_q_is_empty (mq))
2289     {
2290       if (*bits_set)
2291         {
2292           svm_msg_q_unlock (mq);
2293           return 0;
2294         }
2295
2296       if (!time_to_wait)
2297         {
2298           svm_msg_q_unlock (mq);
2299           return 0;
2300         }
2301       else if (time_to_wait < 0)
2302         {
2303           svm_msg_q_wait (mq);
2304         }
2305       else
2306         {
2307           if (svm_msg_q_timedwait (mq, time_to_wait))
2308             {
2309               svm_msg_q_unlock (mq);
2310               return 0;
2311             }
2312         }
2313     }
2314   vcl_mq_dequeue_batch (wrk, mq, ~0);
2315   svm_msg_q_unlock (mq);
2316
2317   for (i = 0; i < vec_len (wrk->mq_msg_vector); i++)
2318     {
2319       msg = vec_elt_at_index (wrk->mq_msg_vector, i);
2320       e = svm_msg_q_msg_data (mq, msg);
2321       vcl_select_handle_mq_event (wrk, e, n_bits, read_map, write_map,
2322                                   except_map, bits_set);
2323       svm_msg_q_free_msg (mq, msg);
2324     }
2325   vec_reset_length (wrk->mq_msg_vector);
2326   vcl_handle_pending_wrk_updates (wrk);
2327   return *bits_set;
2328 }
2329
2330 static int
2331 vppcom_select_condvar (vcl_worker_t * wrk, int n_bits,
2332                        vcl_si_set * read_map, vcl_si_set * write_map,
2333                        vcl_si_set * except_map, double time_to_wait,
2334                        u32 * bits_set)
2335 {
2336   double wait = 0, start = 0;
2337
2338   if (!*bits_set)
2339     {
2340       wait = time_to_wait;
2341       start = clib_time_now (&wrk->clib_time);
2342     }
2343
2344   do
2345     {
2346       vcl_select_handle_mq (wrk, wrk->app_event_queue, n_bits, read_map,
2347                             write_map, except_map, wait, bits_set);
2348       if (*bits_set)
2349         return *bits_set;
2350       if (wait == -1)
2351         continue;
2352
2353       wait = wait - (clib_time_now (&wrk->clib_time) - start);
2354     }
2355   while (wait > 0);
2356
2357   return 0;
2358 }
2359
2360 static int
2361 vppcom_select_eventfd (vcl_worker_t * wrk, int n_bits,
2362                        vcl_si_set * read_map, vcl_si_set * write_map,
2363                        vcl_si_set * except_map, double time_to_wait,
2364                        u32 * bits_set)
2365 {
2366   vcl_mq_evt_conn_t *mqc;
2367   int __clib_unused n_read;
2368   int n_mq_evts, i;
2369   u64 buf;
2370
2371   vec_validate (wrk->mq_events, pool_elts (wrk->mq_evt_conns));
2372   n_mq_evts = epoll_wait (wrk->mqs_epfd, wrk->mq_events,
2373                           vec_len (wrk->mq_events), time_to_wait);
2374   for (i = 0; i < n_mq_evts; i++)
2375     {
2376       mqc = vcl_mq_evt_conn_get (wrk, wrk->mq_events[i].data.u32);
2377       n_read = read (mqc->mq_fd, &buf, sizeof (buf));
2378       vcl_select_handle_mq (wrk, mqc->mq, n_bits, read_map, write_map,
2379                             except_map, 0, bits_set);
2380     }
2381
2382   return (n_mq_evts > 0 ? (int) *bits_set : 0);
2383 }
2384
2385 int
2386 vppcom_select (int n_bits, vcl_si_set * read_map, vcl_si_set * write_map,
2387                vcl_si_set * except_map, double time_to_wait)
2388 {
2389   u32 sid, minbits = clib_max (n_bits, BITS (uword)), bits_set = 0;
2390   vcl_worker_t *wrk = vcl_worker_get_current ();
2391   vcl_session_t *session = 0;
2392   int i;
2393
2394   if (n_bits && read_map)
2395     {
2396       clib_bitmap_validate (wrk->rd_bitmap, minbits);
2397       clib_memcpy_fast (wrk->rd_bitmap, read_map,
2398                         vec_len (wrk->rd_bitmap) * sizeof (vcl_si_set));
2399       memset (read_map, 0, vec_len (wrk->rd_bitmap) * sizeof (vcl_si_set));
2400     }
2401   if (n_bits && write_map)
2402     {
2403       clib_bitmap_validate (wrk->wr_bitmap, minbits);
2404       clib_memcpy_fast (wrk->wr_bitmap, write_map,
2405                         vec_len (wrk->wr_bitmap) * sizeof (vcl_si_set));
2406       memset (write_map, 0, vec_len (wrk->wr_bitmap) * sizeof (vcl_si_set));
2407     }
2408   if (n_bits && except_map)
2409     {
2410       clib_bitmap_validate (wrk->ex_bitmap, minbits);
2411       clib_memcpy_fast (wrk->ex_bitmap, except_map,
2412                         vec_len (wrk->ex_bitmap) * sizeof (vcl_si_set));
2413       memset (except_map, 0, vec_len (wrk->ex_bitmap) * sizeof (vcl_si_set));
2414     }
2415
2416   if (!n_bits)
2417     return 0;
2418
2419   if (!write_map)
2420     goto check_rd;
2421
2422   /* *INDENT-OFF* */
2423   clib_bitmap_foreach (sid, wrk->wr_bitmap, ({
2424     if (!(session = vcl_session_get (wrk, sid)))
2425       {
2426         clib_bitmap_set_no_check ((uword*)write_map, sid, 1);
2427         bits_set++;
2428         continue;
2429       }
2430
2431     if (vcl_session_write_ready (session))
2432       {
2433         clib_bitmap_set_no_check ((uword*)write_map, sid, 1);
2434         bits_set++;
2435       }
2436     else
2437       svm_fifo_add_want_deq_ntf (session->tx_fifo, SVM_FIFO_WANT_DEQ_NOTIF);
2438   }));
2439
2440 check_rd:
2441   if (!read_map)
2442     goto check_mq;
2443
2444   clib_bitmap_foreach (sid, wrk->rd_bitmap, ({
2445     if (!(session = vcl_session_get (wrk, sid)))
2446       {
2447         clib_bitmap_set_no_check ((uword*)read_map, sid, 1);
2448         bits_set++;
2449         continue;
2450       }
2451
2452     if (vcl_session_read_ready (session))
2453       {
2454         clib_bitmap_set_no_check ((uword*)read_map, sid, 1);
2455         bits_set++;
2456       }
2457   }));
2458   /* *INDENT-ON* */
2459
2460 check_mq:
2461
2462   for (i = 0; i < vec_len (wrk->unhandled_evts_vector); i++)
2463     {
2464       vcl_select_handle_mq_event (wrk, &wrk->unhandled_evts_vector[i], n_bits,
2465                                   read_map, write_map, except_map, &bits_set);
2466     }
2467   vec_reset_length (wrk->unhandled_evts_vector);
2468
2469   if (vcm->cfg.use_mq_eventfd)
2470     vppcom_select_eventfd (wrk, n_bits, read_map, write_map, except_map,
2471                            time_to_wait, &bits_set);
2472   else
2473     vppcom_select_condvar (wrk, n_bits, read_map, write_map, except_map,
2474                            time_to_wait, &bits_set);
2475
2476   return (bits_set);
2477 }
2478
2479 static inline void
2480 vep_verify_epoll_chain (vcl_worker_t * wrk, u32 vep_handle)
2481 {
2482   vppcom_epoll_t *vep;
2483   u32 sh = vep_handle;
2484   vcl_session_t *s;
2485
2486   if (VPPCOM_DEBUG <= 2)
2487     return;
2488
2489   s = vcl_session_get_w_handle (wrk, vep_handle);
2490   if (PREDICT_FALSE (!s))
2491     {
2492       VDBG (0, "ERROR: Invalid vep_sh (%u)!", vep_handle);
2493       goto done;
2494     }
2495   if (PREDICT_FALSE (!(s->flags & VCL_SESSION_F_IS_VEP)))
2496     {
2497       VDBG (0, "ERROR: vep_sh (%u) is not a vep!", vep_handle);
2498       goto done;
2499     }
2500   vep = &s->vep;
2501   VDBG (0, "vep_sh (%u): Dumping epoll chain\n"
2502         "{\n"
2503         "   is_vep         = %u\n"
2504         "   is_vep_session = %u\n"
2505         "   next_sh        = 0x%x (%u)\n"
2506         "}\n", vep_handle, s->flags & VCL_SESSION_F_IS_VEP,
2507         s->flags & VCL_SESSION_F_IS_VEP_SESSION, vep->next_sh, vep->next_sh);
2508
2509   for (sh = vep->next_sh; sh != ~0; sh = vep->next_sh)
2510     {
2511       s = vcl_session_get_w_handle (wrk, sh);
2512       if (PREDICT_FALSE (!s))
2513         {
2514           VDBG (0, "ERROR: Invalid sh (%u)!", sh);
2515           goto done;
2516         }
2517       if (PREDICT_FALSE (s->flags & VCL_SESSION_F_IS_VEP))
2518         {
2519           VDBG (0, "ERROR: sh (%u) is a vep!", vep_handle);
2520         }
2521       else if (PREDICT_FALSE (!(s->flags & VCL_SESSION_F_IS_VEP_SESSION)))
2522         {
2523           VDBG (0, "ERROR: sh (%u) is not a vep session handle!", sh);
2524           goto done;
2525         }
2526       vep = &s->vep;
2527       if (PREDICT_FALSE (vep->vep_sh != vep_handle))
2528         VDBG (0, "ERROR: session (%u) vep_sh (%u) != vep_sh (%u)!",
2529               sh, s->vep.vep_sh, vep_handle);
2530       if (s->flags & VCL_SESSION_F_IS_VEP_SESSION)
2531         {
2532           VDBG (0, "vep_sh[%u]: sh 0x%x (%u)\n"
2533                 "{\n"
2534                 "   next_sh        = 0x%x (%u)\n"
2535                 "   prev_sh        = 0x%x (%u)\n"
2536                 "   vep_sh         = 0x%x (%u)\n"
2537                 "   ev.events      = 0x%x\n"
2538                 "   ev.data.u64    = 0x%llx\n"
2539                 "   et_mask        = 0x%x\n"
2540                 "}\n",
2541                 vep_handle, sh, sh, vep->next_sh, vep->next_sh, vep->prev_sh,
2542                 vep->prev_sh, vep->vep_sh, vep->vep_sh, vep->ev.events,
2543                 vep->ev.data.u64, vep->et_mask);
2544         }
2545     }
2546
2547 done:
2548   VDBG (0, "vep_sh (%u): Dump complete!\n", vep_handle);
2549 }
2550
2551 int
2552 vppcom_epoll_create (void)
2553 {
2554   vcl_worker_t *wrk = vcl_worker_get_current ();
2555   vcl_session_t *vep_session;
2556
2557   vep_session = vcl_session_alloc (wrk);
2558
2559   vep_session->flags |= VCL_SESSION_F_IS_VEP;
2560   vep_session->vep.vep_sh = ~0;
2561   vep_session->vep.next_sh = ~0;
2562   vep_session->vep.prev_sh = ~0;
2563   vep_session->vpp_handle = ~0;
2564
2565   vcl_evt (VCL_EVT_EPOLL_CREATE, vep_session, vep_session->session_index);
2566   VDBG (0, "Created vep_idx %u", vep_session->session_index);
2567
2568   return vcl_session_handle (vep_session);
2569 }
2570
2571 int
2572 vppcom_epoll_ctl (uint32_t vep_handle, int op, uint32_t session_handle,
2573                   struct epoll_event *event)
2574 {
2575   vcl_worker_t *wrk = vcl_worker_get_current ();
2576   vcl_session_t *vep_session;
2577   int rv = VPPCOM_OK;
2578   vcl_session_t *s;
2579   svm_fifo_t *txf;
2580
2581   if (vep_handle == session_handle)
2582     {
2583       VDBG (0, "vep_sh == session handle (%u)!", vep_handle);
2584       return VPPCOM_EINVAL;
2585     }
2586
2587   vep_session = vcl_session_get_w_handle (wrk, vep_handle);
2588   if (PREDICT_FALSE (!vep_session))
2589     {
2590       VDBG (0, "Invalid vep_sh (%u)!", vep_handle);
2591       return VPPCOM_EBADFD;
2592     }
2593   if (PREDICT_FALSE (!(vep_session->flags & VCL_SESSION_F_IS_VEP)))
2594     {
2595       VDBG (0, "vep_sh (%u) is not a vep!", vep_handle);
2596       return VPPCOM_EINVAL;
2597     }
2598
2599   ASSERT (vep_session->vep.vep_sh == ~0);
2600   ASSERT (vep_session->vep.prev_sh == ~0);
2601
2602   s = vcl_session_get_w_handle (wrk, session_handle);
2603   if (PREDICT_FALSE (!s))
2604     {
2605       VDBG (0, "Invalid session_handle (%u)!", session_handle);
2606       return VPPCOM_EBADFD;
2607     }
2608   if (PREDICT_FALSE (s->flags & VCL_SESSION_F_IS_VEP))
2609     {
2610       VDBG (0, "session_handle (%u) is a vep!", vep_handle);
2611       return VPPCOM_EINVAL;
2612     }
2613
2614   switch (op)
2615     {
2616     case EPOLL_CTL_ADD:
2617       if (PREDICT_FALSE (!event))
2618         {
2619           VDBG (0, "EPOLL_CTL_ADD: NULL pointer to epoll_event structure!");
2620           return VPPCOM_EINVAL;
2621         }
2622       if (vep_session->vep.next_sh != ~0)
2623         {
2624           vcl_session_t *next_session;
2625           next_session = vcl_session_get_w_handle (wrk,
2626                                                    vep_session->vep.next_sh);
2627           if (PREDICT_FALSE (!next_session))
2628             {
2629               VDBG (0, "EPOLL_CTL_ADD: Invalid vep.next_sh (%u) on "
2630                     "vep_idx (%u)!", vep_session->vep.next_sh, vep_handle);
2631               return VPPCOM_EBADFD;
2632             }
2633           ASSERT (next_session->vep.prev_sh == vep_handle);
2634           next_session->vep.prev_sh = session_handle;
2635         }
2636       s->vep.next_sh = vep_session->vep.next_sh;
2637       s->vep.prev_sh = vep_handle;
2638       s->vep.vep_sh = vep_handle;
2639       s->vep.et_mask = VEP_DEFAULT_ET_MASK;
2640       s->vep.ev = *event;
2641       s->flags &= ~VCL_SESSION_F_IS_VEP;
2642       s->flags |= VCL_SESSION_F_IS_VEP_SESSION;
2643       vep_session->vep.next_sh = session_handle;
2644
2645       txf = vcl_session_is_ct (s) ? s->ct_tx_fifo : s->tx_fifo;
2646       if (txf && (event->events & EPOLLOUT))
2647         svm_fifo_add_want_deq_ntf (txf, SVM_FIFO_WANT_DEQ_NOTIF_IF_FULL);
2648
2649       /* Generate EPOLLOUT if tx fifo not full */
2650       if ((event->events & EPOLLOUT) && (vcl_session_write_ready (s) > 0))
2651         {
2652           session_event_t e = { 0 };
2653           e.event_type = SESSION_IO_EVT_TX;
2654           e.session_index = s->session_index;
2655           vec_add1 (wrk->unhandled_evts_vector, e);
2656         }
2657       /* Generate EPOLLIN if rx fifo has data */
2658       if ((event->events & EPOLLIN) && (vcl_session_read_ready (s) > 0))
2659         {
2660           session_event_t e = { 0 };
2661           e.event_type = SESSION_IO_EVT_RX;
2662           e.session_index = s->session_index;
2663           vec_add1 (wrk->unhandled_evts_vector, e);
2664         }
2665       VDBG (1, "EPOLL_CTL_ADD: vep_sh %u, sh %u, events 0x%x, data 0x%llx!",
2666             vep_handle, session_handle, event->events, event->data.u64);
2667       vcl_evt (VCL_EVT_EPOLL_CTLADD, s, event->events, event->data.u64);
2668       break;
2669
2670     case EPOLL_CTL_MOD:
2671       if (PREDICT_FALSE (!event))
2672         {
2673           VDBG (0, "EPOLL_CTL_MOD: NULL pointer to epoll_event structure!");
2674           rv = VPPCOM_EINVAL;
2675           goto done;
2676         }
2677       else if (PREDICT_FALSE (!(s->flags & VCL_SESSION_F_IS_VEP_SESSION)))
2678         {
2679           VDBG (0, "sh %u EPOLL_CTL_MOD: not a vep session!", session_handle);
2680           rv = VPPCOM_EINVAL;
2681           goto done;
2682         }
2683       else if (PREDICT_FALSE (s->vep.vep_sh != vep_handle))
2684         {
2685           VDBG (0, "EPOLL_CTL_MOD: sh %u vep_sh (%u) != vep_sh (%u)!",
2686                 session_handle, s->vep.vep_sh, vep_handle);
2687           rv = VPPCOM_EINVAL;
2688           goto done;
2689         }
2690
2691       /* Generate EPOLLOUT when tx_fifo/ct_tx_fifo not full */
2692       if ((event->events & EPOLLOUT) &&
2693           !(s->vep.ev.events & EPOLLOUT) && (vcl_session_write_ready (s) > 0))
2694         {
2695           session_event_t e = { 0 };
2696           e.event_type = SESSION_IO_EVT_TX;
2697           e.session_index = s->session_index;
2698           vec_add1 (wrk->unhandled_evts_vector, e);
2699         }
2700       s->vep.et_mask = VEP_DEFAULT_ET_MASK;
2701       s->vep.ev = *event;
2702       txf = vcl_session_is_ct (s) ? s->ct_tx_fifo : s->tx_fifo;
2703       if (event->events & EPOLLOUT)
2704         svm_fifo_add_want_deq_ntf (txf, SVM_FIFO_WANT_DEQ_NOTIF_IF_FULL);
2705       else
2706         svm_fifo_del_want_deq_ntf (txf, SVM_FIFO_WANT_DEQ_NOTIF_IF_FULL);
2707       VDBG (1, "EPOLL_CTL_MOD: vep_sh %u, sh %u, events 0x%x, data 0x%llx!",
2708             vep_handle, session_handle, event->events, event->data.u64);
2709       break;
2710
2711     case EPOLL_CTL_DEL:
2712       if (PREDICT_FALSE (!(s->flags & VCL_SESSION_F_IS_VEP_SESSION)))
2713         {
2714           VDBG (0, "EPOLL_CTL_DEL: %u not a vep session!", session_handle);
2715           rv = VPPCOM_EINVAL;
2716           goto done;
2717         }
2718       else if (PREDICT_FALSE (s->vep.vep_sh != vep_handle))
2719         {
2720           VDBG (0, "EPOLL_CTL_DEL: sh %u vep_sh (%u) != vep_sh (%u)!",
2721                 session_handle, s->vep.vep_sh, vep_handle);
2722           rv = VPPCOM_EINVAL;
2723           goto done;
2724         }
2725
2726       if (s->vep.prev_sh == vep_handle)
2727         vep_session->vep.next_sh = s->vep.next_sh;
2728       else
2729         {
2730           vcl_session_t *prev_session;
2731           prev_session = vcl_session_get_w_handle (wrk, s->vep.prev_sh);
2732           if (PREDICT_FALSE (!prev_session))
2733             {
2734               VDBG (0, "EPOLL_CTL_DEL: Invalid prev_sh (%u) on sh (%u)!",
2735                     s->vep.prev_sh, session_handle);
2736               return VPPCOM_EBADFD;
2737             }
2738           ASSERT (prev_session->vep.next_sh == session_handle);
2739           prev_session->vep.next_sh = s->vep.next_sh;
2740         }
2741       if (s->vep.next_sh != ~0)
2742         {
2743           vcl_session_t *next_session;
2744           next_session = vcl_session_get_w_handle (wrk, s->vep.next_sh);
2745           if (PREDICT_FALSE (!next_session))
2746             {
2747               VDBG (0, "EPOLL_CTL_DEL: Invalid next_sh (%u) on sh (%u)!",
2748                     s->vep.next_sh, session_handle);
2749               return VPPCOM_EBADFD;
2750             }
2751           ASSERT (next_session->vep.prev_sh == session_handle);
2752           next_session->vep.prev_sh = s->vep.prev_sh;
2753         }
2754
2755       memset (&s->vep, 0, sizeof (s->vep));
2756       s->vep.next_sh = ~0;
2757       s->vep.prev_sh = ~0;
2758       s->vep.vep_sh = ~0;
2759       s->flags &= ~VCL_SESSION_F_IS_VEP_SESSION;
2760
2761       txf = vcl_session_is_ct (s) ? s->ct_tx_fifo : s->tx_fifo;
2762       if (txf)
2763         svm_fifo_del_want_deq_ntf (txf, SVM_FIFO_WANT_DEQ_NOTIF_IF_FULL);
2764
2765       VDBG (1, "EPOLL_CTL_DEL: vep_idx %u, sh %u!", vep_handle,
2766             session_handle);
2767       vcl_evt (VCL_EVT_EPOLL_CTLDEL, s, vep_sh);
2768       break;
2769
2770     default:
2771       VDBG (0, "Invalid operation (%d)!", op);
2772       rv = VPPCOM_EINVAL;
2773     }
2774
2775   vep_verify_epoll_chain (wrk, vep_handle);
2776
2777 done:
2778   return rv;
2779 }
2780
2781 static inline void
2782 vcl_epoll_wait_handle_mq_event (vcl_worker_t * wrk, session_event_t * e,
2783                                 struct epoll_event *events, u32 * num_ev)
2784 {
2785   session_disconnected_msg_t *disconnected_msg;
2786   session_connected_msg_t *connected_msg;
2787   u32 sid = ~0, session_events;
2788   u64 session_evt_data = ~0;
2789   vcl_session_t *session;
2790   u8 add_event = 0;
2791
2792   switch (e->event_type)
2793     {
2794     case SESSION_IO_EVT_RX:
2795       sid = e->session_index;
2796       session = vcl_session_get (wrk, sid);
2797       if (vcl_session_is_closed (session))
2798         break;
2799       vcl_fifo_rx_evt_valid_or_break (session);
2800       session_events = session->vep.ev.events;
2801       if (!(EPOLLIN & session->vep.ev.events)
2802           || (session->flags & VCL_SESSION_F_HAS_RX_EVT))
2803         break;
2804       add_event = 1;
2805       events[*num_ev].events |= EPOLLIN;
2806       session_evt_data = session->vep.ev.data.u64;
2807       session->flags |= VCL_SESSION_F_HAS_RX_EVT;
2808       break;
2809     case SESSION_IO_EVT_TX:
2810       sid = e->session_index;
2811       session = vcl_session_get (wrk, sid);
2812       if (vcl_session_is_closed (session))
2813         break;
2814       session_events = session->vep.ev.events;
2815       if (!(EPOLLOUT & session_events))
2816         break;
2817       add_event = 1;
2818       events[*num_ev].events |= EPOLLOUT;
2819       session_evt_data = session->vep.ev.data.u64;
2820       svm_fifo_reset_has_deq_ntf (vcl_session_is_ct (session) ?
2821                                   session->ct_tx_fifo : session->tx_fifo);
2822       break;
2823     case SESSION_CTRL_EVT_ACCEPTED:
2824       session = vcl_session_accepted (wrk,
2825                                       (session_accepted_msg_t *) e->data);
2826       if (!session)
2827         break;
2828
2829       session_events = session->vep.ev.events;
2830       if (!(EPOLLIN & session_events))
2831         break;
2832
2833       add_event = 1;
2834       events[*num_ev].events |= EPOLLIN;
2835       session_evt_data = session->vep.ev.data.u64;
2836       break;
2837     case SESSION_CTRL_EVT_CONNECTED:
2838       connected_msg = (session_connected_msg_t *) e->data;
2839       sid = vcl_session_connected_handler (wrk, connected_msg);
2840       /* Generate EPOLLOUT because there's no connected event */
2841       session = vcl_session_get (wrk, sid);
2842       if (vcl_session_is_closed (session))
2843         break;
2844       session_events = session->vep.ev.events;
2845       if (!(EPOLLOUT & session_events))
2846         break;
2847       add_event = 1;
2848       events[*num_ev].events |= EPOLLOUT;
2849       session_evt_data = session->vep.ev.data.u64;
2850       if (session->session_state == VCL_STATE_DETACHED)
2851         events[*num_ev].events |= EPOLLHUP;
2852       break;
2853     case SESSION_CTRL_EVT_DISCONNECTED:
2854       disconnected_msg = (session_disconnected_msg_t *) e->data;
2855       session = vcl_session_disconnected_handler (wrk, disconnected_msg);
2856       if (vcl_session_is_closed (session))
2857         break;
2858       session_events = session->vep.ev.events;
2859       add_event = 1;
2860       events[*num_ev].events |= EPOLLHUP | EPOLLRDHUP;
2861       session_evt_data = session->vep.ev.data.u64;
2862       break;
2863     case SESSION_CTRL_EVT_RESET:
2864       sid = vcl_session_reset_handler (wrk, (session_reset_msg_t *) e->data);
2865       session = vcl_session_get (wrk, sid);
2866       if (vcl_session_is_closed (session))
2867         break;
2868       session_events = session->vep.ev.events;
2869       add_event = 1;
2870       events[*num_ev].events |= EPOLLHUP | EPOLLRDHUP;
2871       session_evt_data = session->vep.ev.data.u64;
2872       break;
2873     case SESSION_CTRL_EVT_UNLISTEN_REPLY:
2874       vcl_session_unlisten_reply_handler (wrk, e->data);
2875       break;
2876     case SESSION_CTRL_EVT_MIGRATED:
2877       vcl_session_migrated_handler (wrk, e->data);
2878       break;
2879     case SESSION_CTRL_EVT_CLEANUP:
2880       vcl_session_cleanup_handler (wrk, e->data);
2881       break;
2882     case SESSION_CTRL_EVT_REQ_WORKER_UPDATE:
2883       vcl_session_req_worker_update_handler (wrk, e->data);
2884       break;
2885     case SESSION_CTRL_EVT_WORKER_UPDATE_REPLY:
2886       vcl_session_worker_update_reply_handler (wrk, e->data);
2887       break;
2888     case SESSION_CTRL_EVT_APP_ADD_SEGMENT:
2889       vcl_session_app_add_segment_handler (wrk, e->data);
2890       break;
2891     case SESSION_CTRL_EVT_APP_DEL_SEGMENT:
2892       vcl_session_app_del_segment_handler (wrk, e->data);
2893       break;
2894     case SESSION_CTRL_EVT_APP_WRK_RPC:
2895       vcl_worker_rpc_handler (wrk, e->data);
2896       break;
2897     default:
2898       VDBG (0, "unhandled: %u", e->event_type);
2899       break;
2900     }
2901
2902   if (add_event)
2903     {
2904       events[*num_ev].data.u64 = session_evt_data;
2905       if (EPOLLONESHOT & session_events)
2906         {
2907           session = vcl_session_get (wrk, sid);
2908           session->vep.ev.events = 0;
2909         }
2910       *num_ev += 1;
2911     }
2912 }
2913
2914 static int
2915 vcl_epoll_wait_handle_mq (vcl_worker_t * wrk, svm_msg_q_t * mq,
2916                           struct epoll_event *events, u32 maxevents,
2917                           double wait_for_time, u32 * num_ev)
2918 {
2919   svm_msg_q_msg_t *msg;
2920   session_event_t *e;
2921   int i;
2922
2923   if (vec_len (wrk->mq_msg_vector) && svm_msg_q_is_empty (mq))
2924     goto handle_dequeued;
2925
2926   svm_msg_q_lock (mq);
2927   if (svm_msg_q_is_empty (mq))
2928     {
2929       if (!wait_for_time)
2930         {
2931           svm_msg_q_unlock (mq);
2932           return 0;
2933         }
2934       else if (wait_for_time < 0)
2935         {
2936           svm_msg_q_wait (mq);
2937         }
2938       else
2939         {
2940           if (svm_msg_q_timedwait (mq, wait_for_time / 1e3))
2941             {
2942               svm_msg_q_unlock (mq);
2943               return 0;
2944             }
2945         }
2946     }
2947   ASSERT (maxevents > *num_ev);
2948   vcl_mq_dequeue_batch (wrk, mq, ~0);
2949   svm_msg_q_unlock (mq);
2950
2951 handle_dequeued:
2952   for (i = 0; i < vec_len (wrk->mq_msg_vector); i++)
2953     {
2954       msg = vec_elt_at_index (wrk->mq_msg_vector, i);
2955       e = svm_msg_q_msg_data (mq, msg);
2956       if (*num_ev < maxevents)
2957         vcl_epoll_wait_handle_mq_event (wrk, e, events, num_ev);
2958       else
2959         vcl_handle_mq_event (wrk, e);
2960       svm_msg_q_free_msg (mq, msg);
2961     }
2962   vec_reset_length (wrk->mq_msg_vector);
2963   vcl_handle_pending_wrk_updates (wrk);
2964   return *num_ev;
2965 }
2966
2967 static int
2968 vppcom_epoll_wait_condvar (vcl_worker_t * wrk, struct epoll_event *events,
2969                            int maxevents, u32 n_evts, double wait_for_time)
2970 {
2971   double wait = 0, start = 0, now;
2972
2973   if (!n_evts)
2974     {
2975       wait = wait_for_time;
2976       start = clib_time_now (&wrk->clib_time);
2977     }
2978
2979   do
2980     {
2981       vcl_epoll_wait_handle_mq (wrk, wrk->app_event_queue, events, maxevents,
2982                                 wait, &n_evts);
2983       if (n_evts)
2984         return n_evts;
2985       if (wait == -1)
2986         continue;
2987
2988       now = clib_time_now (&wrk->clib_time);
2989       wait -= (now - start) * 1e3;
2990       start = now;
2991     }
2992   while (wait > 0);
2993
2994   return 0;
2995 }
2996
2997 static int
2998 vppcom_epoll_wait_eventfd (vcl_worker_t * wrk, struct epoll_event *events,
2999                            int maxevents, u32 n_evts, double wait_for_time)
3000 {
3001   vcl_mq_evt_conn_t *mqc;
3002   int __clib_unused n_read;
3003   int n_mq_evts, i;
3004   u64 buf;
3005
3006   vec_validate (wrk->mq_events, pool_elts (wrk->mq_evt_conns));
3007 again:
3008   n_mq_evts = epoll_wait (wrk->mqs_epfd, wrk->mq_events,
3009                           vec_len (wrk->mq_events), wait_for_time);
3010   for (i = 0; i < n_mq_evts; i++)
3011     {
3012       mqc = vcl_mq_evt_conn_get (wrk, wrk->mq_events[i].data.u32);
3013       n_read = read (mqc->mq_fd, &buf, sizeof (buf));
3014       vcl_epoll_wait_handle_mq (wrk, mqc->mq, events, maxevents, 0, &n_evts);
3015     }
3016   if (!n_evts && n_mq_evts > 0)
3017     goto again;
3018
3019   return (int) n_evts;
3020 }
3021
3022 int
3023 vppcom_epoll_wait (uint32_t vep_handle, struct epoll_event *events,
3024                    int maxevents, double wait_for_time)
3025 {
3026   vcl_worker_t *wrk = vcl_worker_get_current ();
3027   vcl_session_t *vep_session;
3028   u32 n_evts = 0;
3029   int i;
3030
3031   if (PREDICT_FALSE (maxevents <= 0))
3032     {
3033       VDBG (0, "ERROR: Invalid maxevents (%d)!", maxevents);
3034       return VPPCOM_EINVAL;
3035     }
3036
3037   vep_session = vcl_session_get_w_handle (wrk, vep_handle);
3038   if (!vep_session)
3039     return VPPCOM_EBADFD;
3040
3041   if (PREDICT_FALSE (!(vep_session->flags & VCL_SESSION_F_IS_VEP)))
3042     {
3043       VDBG (0, "ERROR: vep_idx (%u) is not a vep!", vep_handle);
3044       return VPPCOM_EINVAL;
3045     }
3046
3047   memset (events, 0, sizeof (*events) * maxevents);
3048
3049   if (vec_len (wrk->unhandled_evts_vector))
3050     {
3051       for (i = 0; i < vec_len (wrk->unhandled_evts_vector); i++)
3052         {
3053           vcl_epoll_wait_handle_mq_event (wrk, &wrk->unhandled_evts_vector[i],
3054                                           events, &n_evts);
3055           if (n_evts == maxevents)
3056             {
3057               vec_delete (wrk->unhandled_evts_vector, i + 1, 0);
3058               return n_evts;
3059             }
3060         }
3061       vec_reset_length (wrk->unhandled_evts_vector);
3062     }
3063
3064   if (vcm->cfg.use_mq_eventfd)
3065     return vppcom_epoll_wait_eventfd (wrk, events, maxevents, n_evts,
3066                                       wait_for_time);
3067
3068   return vppcom_epoll_wait_condvar (wrk, events, maxevents, n_evts,
3069                                     wait_for_time);
3070 }
3071
3072 int
3073 vppcom_session_attr (uint32_t session_handle, uint32_t op,
3074                      void *buffer, uint32_t * buflen)
3075 {
3076   vcl_worker_t *wrk = vcl_worker_get_current ();
3077   vcl_session_t *session;
3078   int rv = VPPCOM_OK;
3079   u32 *flags = buffer, tmp_flags = 0;
3080   vppcom_endpt_t *ep = buffer;
3081
3082   session = vcl_session_get_w_handle (wrk, session_handle);
3083   if (!session)
3084     return VPPCOM_EBADFD;
3085
3086   switch (op)
3087     {
3088     case VPPCOM_ATTR_GET_NREAD:
3089       rv = vcl_session_read_ready (session);
3090       VDBG (2, "VPPCOM_ATTR_GET_NREAD: sh %u, nread = %d", session_handle,
3091             rv);
3092       break;
3093
3094     case VPPCOM_ATTR_GET_NWRITE:
3095       rv = vcl_session_write_ready (session);
3096       VDBG (2, "VPPCOM_ATTR_GET_NWRITE: sh %u, nwrite = %d", session_handle,
3097             rv);
3098       break;
3099
3100     case VPPCOM_ATTR_GET_FLAGS:
3101       if (PREDICT_TRUE (buffer && buflen && (*buflen >= sizeof (*flags))))
3102         {
3103           *flags =
3104             O_RDWR |
3105             (vcl_session_has_attr (session, VCL_SESS_ATTR_NONBLOCK) ?
3106              O_NONBLOCK : 0);
3107           *buflen = sizeof (*flags);
3108           VDBG (2, "VPPCOM_ATTR_GET_FLAGS: sh %u, flags = 0x%08x, "
3109                 "is_nonblocking = %u", session_handle, *flags,
3110                 vcl_session_has_attr (session, VCL_SESS_ATTR_NONBLOCK));
3111         }
3112       else
3113         rv = VPPCOM_EINVAL;
3114       break;
3115
3116     case VPPCOM_ATTR_SET_FLAGS:
3117       if (PREDICT_TRUE (buffer && buflen && (*buflen == sizeof (*flags))))
3118         {
3119           if (*flags & O_NONBLOCK)
3120             vcl_session_set_attr (session, VCL_SESS_ATTR_NONBLOCK);
3121           else
3122             vcl_session_clear_attr (session, VCL_SESS_ATTR_NONBLOCK);
3123
3124           VDBG (2, "VPPCOM_ATTR_SET_FLAGS: sh %u, flags = 0x%08x,"
3125                 " is_nonblocking = %u", session_handle, *flags,
3126                 vcl_session_has_attr (session, VCL_SESS_ATTR_NONBLOCK));
3127         }
3128       else
3129         rv = VPPCOM_EINVAL;
3130       break;
3131
3132     case VPPCOM_ATTR_GET_PEER_ADDR:
3133       if (PREDICT_TRUE (buffer && buflen &&
3134                         (*buflen >= sizeof (*ep)) && ep->ip))
3135         {
3136           ep->is_ip4 = session->transport.is_ip4;
3137           ep->port = session->transport.rmt_port;
3138           if (session->transport.is_ip4)
3139             clib_memcpy_fast (ep->ip, &session->transport.rmt_ip.ip4,
3140                               sizeof (ip4_address_t));
3141           else
3142             clib_memcpy_fast (ep->ip, &session->transport.rmt_ip.ip6,
3143                               sizeof (ip6_address_t));
3144           *buflen = sizeof (*ep);
3145           VDBG (1, "VPPCOM_ATTR_GET_PEER_ADDR: sh %u, is_ip4 = %u, "
3146                 "addr = %U, port %u", session_handle, ep->is_ip4,
3147                 format_ip46_address, &session->transport.rmt_ip,
3148                 ep->is_ip4 ? IP46_TYPE_IP4 : IP46_TYPE_IP6,
3149                 clib_net_to_host_u16 (ep->port));
3150         }
3151       else
3152         rv = VPPCOM_EINVAL;
3153       break;
3154
3155     case VPPCOM_ATTR_GET_LCL_ADDR:
3156       if (PREDICT_TRUE (buffer && buflen &&
3157                         (*buflen >= sizeof (*ep)) && ep->ip))
3158         {
3159           ep->is_ip4 = session->transport.is_ip4;
3160           ep->port = session->transport.lcl_port;
3161           if (session->transport.is_ip4)
3162             clib_memcpy_fast (ep->ip, &session->transport.lcl_ip.ip4,
3163                               sizeof (ip4_address_t));
3164           else
3165             clib_memcpy_fast (ep->ip, &session->transport.lcl_ip.ip6,
3166                               sizeof (ip6_address_t));
3167           *buflen = sizeof (*ep);
3168           VDBG (1, "VPPCOM_ATTR_GET_LCL_ADDR: sh %u, is_ip4 = %u, addr = %U"
3169                 " port %d", session_handle, ep->is_ip4, format_ip46_address,
3170                 &session->transport.lcl_ip,
3171                 ep->is_ip4 ? IP46_TYPE_IP4 : IP46_TYPE_IP6,
3172                 clib_net_to_host_u16 (ep->port));
3173         }
3174       else
3175         rv = VPPCOM_EINVAL;
3176       break;
3177
3178     case VPPCOM_ATTR_SET_LCL_ADDR:
3179       if (PREDICT_TRUE (buffer && buflen &&
3180                         (*buflen >= sizeof (*ep)) && ep->ip))
3181         {
3182           session->transport.is_ip4 = ep->is_ip4;
3183           session->transport.lcl_port = ep->port;
3184           vcl_ip_copy_from_ep (&session->transport.lcl_ip, ep);
3185           *buflen = sizeof (*ep);
3186           VDBG (1, "VPPCOM_ATTR_SET_LCL_ADDR: sh %u, is_ip4 = %u, addr = %U"
3187                 " port %d", session_handle, ep->is_ip4, format_ip46_address,
3188                 &session->transport.lcl_ip,
3189                 ep->is_ip4 ? IP46_TYPE_IP4 : IP46_TYPE_IP6,
3190                 clib_net_to_host_u16 (ep->port));
3191         }
3192       else
3193         rv = VPPCOM_EINVAL;
3194       break;
3195
3196     case VPPCOM_ATTR_GET_LIBC_EPFD:
3197       rv = session->libc_epfd;
3198       VDBG (2, "VPPCOM_ATTR_GET_LIBC_EPFD: libc_epfd %d", rv);
3199       break;
3200
3201     case VPPCOM_ATTR_SET_LIBC_EPFD:
3202       if (PREDICT_TRUE (buffer && buflen &&
3203                         (*buflen == sizeof (session->libc_epfd))))
3204         {
3205           session->libc_epfd = *(int *) buffer;
3206           *buflen = sizeof (session->libc_epfd);
3207
3208           VDBG (2, "VPPCOM_ATTR_SET_LIBC_EPFD: libc_epfd %d, buflen %d",
3209                 session->libc_epfd, *buflen);
3210         }
3211       else
3212         rv = VPPCOM_EINVAL;
3213       break;
3214
3215     case VPPCOM_ATTR_GET_PROTOCOL:
3216       if (buffer && buflen && (*buflen >= sizeof (int)))
3217         {
3218           *(int *) buffer = session->session_type;
3219           *buflen = sizeof (int);
3220
3221           VDBG (2, "VPPCOM_ATTR_GET_PROTOCOL: %d (%s), buflen %d",
3222                 *(int *) buffer, *(int *) buffer ? "UDP" : "TCP", *buflen);
3223         }
3224       else
3225         rv = VPPCOM_EINVAL;
3226       break;
3227
3228     case VPPCOM_ATTR_GET_LISTEN:
3229       if (buffer && buflen && (*buflen >= sizeof (int)))
3230         {
3231           *(int *) buffer = vcl_session_has_attr (session,
3232                                                   VCL_SESS_ATTR_LISTEN);
3233           *buflen = sizeof (int);
3234
3235           VDBG (2, "VPPCOM_ATTR_GET_LISTEN: %d, buflen %d", *(int *) buffer,
3236                 *buflen);
3237         }
3238       else
3239         rv = VPPCOM_EINVAL;
3240       break;
3241
3242     case VPPCOM_ATTR_GET_ERROR:
3243       if (buffer && buflen && (*buflen >= sizeof (int)))
3244         {
3245           *(int *) buffer = 0;
3246           *buflen = sizeof (int);
3247
3248           VDBG (2, "VPPCOM_ATTR_GET_ERROR: %d, buflen %d, #VPP-TBD#",
3249                 *(int *) buffer, *buflen);
3250         }
3251       else
3252         rv = VPPCOM_EINVAL;
3253       break;
3254
3255     case VPPCOM_ATTR_GET_TX_FIFO_LEN:
3256       if (buffer && buflen && (*buflen >= sizeof (u32)))
3257         {
3258
3259           /* VPP-TBD */
3260           *(size_t *) buffer = (session->sndbuf_size ? session->sndbuf_size :
3261                                 session->tx_fifo ?
3262                                 svm_fifo_size (session->tx_fifo) :
3263                                 vcm->cfg.tx_fifo_size);
3264           *buflen = sizeof (u32);
3265
3266           VDBG (2, "VPPCOM_ATTR_GET_TX_FIFO_LEN: %u (0x%x), buflen %d,"
3267                 " #VPP-TBD#", *(size_t *) buffer, *(size_t *) buffer,
3268                 *buflen);
3269         }
3270       else
3271         rv = VPPCOM_EINVAL;
3272       break;
3273
3274     case VPPCOM_ATTR_SET_TX_FIFO_LEN:
3275       if (buffer && buflen && (*buflen == sizeof (u32)))
3276         {
3277           /* VPP-TBD */
3278           session->sndbuf_size = *(u32 *) buffer;
3279           VDBG (2, "VPPCOM_ATTR_SET_TX_FIFO_LEN: %u (0x%x), buflen %d,"
3280                 " #VPP-TBD#", session->sndbuf_size, session->sndbuf_size,
3281                 *buflen);
3282         }
3283       else
3284         rv = VPPCOM_EINVAL;
3285       break;
3286
3287     case VPPCOM_ATTR_GET_RX_FIFO_LEN:
3288       if (buffer && buflen && (*buflen >= sizeof (u32)))
3289         {
3290
3291           /* VPP-TBD */
3292           *(size_t *) buffer = (session->rcvbuf_size ? session->rcvbuf_size :
3293                                 session->rx_fifo ?
3294                                 svm_fifo_size (session->rx_fifo) :
3295                                 vcm->cfg.rx_fifo_size);
3296           *buflen = sizeof (u32);
3297
3298           VDBG (2, "VPPCOM_ATTR_GET_RX_FIFO_LEN: %u (0x%x), buflen %d, "
3299                 "#VPP-TBD#", *(size_t *) buffer, *(size_t *) buffer, *buflen);
3300         }
3301       else
3302         rv = VPPCOM_EINVAL;
3303       break;
3304
3305     case VPPCOM_ATTR_SET_RX_FIFO_LEN:
3306       if (buffer && buflen && (*buflen == sizeof (u32)))
3307         {
3308           /* VPP-TBD */
3309           session->rcvbuf_size = *(u32 *) buffer;
3310           VDBG (2, "VPPCOM_ATTR_SET_RX_FIFO_LEN: %u (0x%x), buflen %d,"
3311                 " #VPP-TBD#", session->sndbuf_size, session->sndbuf_size,
3312                 *buflen);
3313         }
3314       else
3315         rv = VPPCOM_EINVAL;
3316       break;
3317
3318     case VPPCOM_ATTR_GET_REUSEADDR:
3319       if (buffer && buflen && (*buflen >= sizeof (int)))
3320         {
3321           /* VPP-TBD */
3322           *(int *) buffer = vcl_session_has_attr (session,
3323                                                   VCL_SESS_ATTR_REUSEADDR);
3324           *buflen = sizeof (int);
3325
3326           VDBG (2, "VPPCOM_ATTR_GET_REUSEADDR: %d, buflen %d, #VPP-TBD#",
3327                 *(int *) buffer, *buflen);
3328         }
3329       else
3330         rv = VPPCOM_EINVAL;
3331       break;
3332
3333     case VPPCOM_ATTR_SET_REUSEADDR:
3334       if (buffer && buflen && (*buflen == sizeof (int)) &&
3335           !vcl_session_has_attr (session, VCL_SESS_ATTR_LISTEN))
3336         {
3337           /* VPP-TBD */
3338           if (*(int *) buffer)
3339             vcl_session_set_attr (session, VCL_SESS_ATTR_REUSEADDR);
3340           else
3341             vcl_session_clear_attr (session, VCL_SESS_ATTR_REUSEADDR);
3342
3343           VDBG (2, "VPPCOM_ATTR_SET_REUSEADDR: %d, buflen %d, #VPP-TBD#",
3344                 vcl_session_has_attr (session, VCL_SESS_ATTR_REUSEADDR),
3345                 *buflen);
3346         }
3347       else
3348         rv = VPPCOM_EINVAL;
3349       break;
3350
3351     case VPPCOM_ATTR_GET_REUSEPORT:
3352       if (buffer && buflen && (*buflen >= sizeof (int)))
3353         {
3354           /* VPP-TBD */
3355           *(int *) buffer = vcl_session_has_attr (session,
3356                                                   VCL_SESS_ATTR_REUSEPORT);
3357           *buflen = sizeof (int);
3358
3359           VDBG (2, "VPPCOM_ATTR_GET_REUSEPORT: %d, buflen %d, #VPP-TBD#",
3360                 *(int *) buffer, *buflen);
3361         }
3362       else
3363         rv = VPPCOM_EINVAL;
3364       break;
3365
3366     case VPPCOM_ATTR_SET_REUSEPORT:
3367       if (buffer && buflen && (*buflen == sizeof (int)) &&
3368           !vcl_session_has_attr (session, VCL_SESS_ATTR_LISTEN))
3369         {
3370           /* VPP-TBD */
3371           if (*(int *) buffer)
3372             vcl_session_set_attr (session, VCL_SESS_ATTR_REUSEPORT);
3373           else
3374             vcl_session_clear_attr (session, VCL_SESS_ATTR_REUSEPORT);
3375
3376           VDBG (2, "VPPCOM_ATTR_SET_REUSEPORT: %d, buflen %d, #VPP-TBD#",
3377                 vcl_session_has_attr (session, VCL_SESS_ATTR_REUSEPORT),
3378                 *buflen);
3379         }
3380       else
3381         rv = VPPCOM_EINVAL;
3382       break;
3383
3384     case VPPCOM_ATTR_GET_BROADCAST:
3385       if (buffer && buflen && (*buflen >= sizeof (int)))
3386         {
3387           /* VPP-TBD */
3388           *(int *) buffer = vcl_session_has_attr (session,
3389                                                   VCL_SESS_ATTR_BROADCAST);
3390           *buflen = sizeof (int);
3391
3392           VDBG (2, "VPPCOM_ATTR_GET_BROADCAST: %d, buflen %d, #VPP-TBD#",
3393                 *(int *) buffer, *buflen);
3394         }
3395       else
3396         rv = VPPCOM_EINVAL;
3397       break;
3398
3399     case VPPCOM_ATTR_SET_BROADCAST:
3400       if (buffer && buflen && (*buflen == sizeof (int)))
3401         {
3402           /* VPP-TBD */
3403           if (*(int *) buffer)
3404             vcl_session_set_attr (session, VCL_SESS_ATTR_BROADCAST);
3405           else
3406             vcl_session_clear_attr (session, VCL_SESS_ATTR_BROADCAST);
3407
3408           VDBG (2, "VPPCOM_ATTR_SET_BROADCAST: %d, buflen %d, #VPP-TBD#",
3409                 vcl_session_has_attr (session, VCL_SESS_ATTR_BROADCAST),
3410                 *buflen);
3411         }
3412       else
3413         rv = VPPCOM_EINVAL;
3414       break;
3415
3416     case VPPCOM_ATTR_GET_V6ONLY:
3417       if (buffer && buflen && (*buflen >= sizeof (int)))
3418         {
3419           /* VPP-TBD */
3420           *(int *) buffer = vcl_session_has_attr (session,
3421                                                   VCL_SESS_ATTR_V6ONLY);
3422           *buflen = sizeof (int);
3423
3424           VDBG (2, "VPPCOM_ATTR_GET_V6ONLY: %d, buflen %d, #VPP-TBD#",
3425                 *(int *) buffer, *buflen);
3426         }
3427       else
3428         rv = VPPCOM_EINVAL;
3429       break;
3430
3431     case VPPCOM_ATTR_SET_V6ONLY:
3432       if (buffer && buflen && (*buflen == sizeof (int)))
3433         {
3434           /* VPP-TBD */
3435           if (*(int *) buffer)
3436             vcl_session_set_attr (session, VCL_SESS_ATTR_V6ONLY);
3437           else
3438             vcl_session_clear_attr (session, VCL_SESS_ATTR_V6ONLY);
3439
3440           VDBG (2, "VPPCOM_ATTR_SET_V6ONLY: %d, buflen %d, #VPP-TBD#",
3441                 vcl_session_has_attr (session, VCL_SESS_ATTR_V6ONLY),
3442                 *buflen);
3443         }
3444       else
3445         rv = VPPCOM_EINVAL;
3446       break;
3447
3448     case VPPCOM_ATTR_GET_KEEPALIVE:
3449       if (buffer && buflen && (*buflen >= sizeof (int)))
3450         {
3451           /* VPP-TBD */
3452           *(int *) buffer = vcl_session_has_attr (session,
3453                                                   VCL_SESS_ATTR_KEEPALIVE);
3454           *buflen = sizeof (int);
3455
3456           VDBG (2, "VPPCOM_ATTR_GET_KEEPALIVE: %d, buflen %d, #VPP-TBD#",
3457                 *(int *) buffer, *buflen);
3458         }
3459       else
3460         rv = VPPCOM_EINVAL;
3461       break;
3462
3463     case VPPCOM_ATTR_SET_KEEPALIVE:
3464       if (buffer && buflen && (*buflen == sizeof (int)))
3465         {
3466           /* VPP-TBD */
3467           if (*(int *) buffer)
3468             vcl_session_set_attr (session, VCL_SESS_ATTR_KEEPALIVE);
3469           else
3470             vcl_session_clear_attr (session, VCL_SESS_ATTR_KEEPALIVE);
3471
3472           VDBG (2, "VPPCOM_ATTR_SET_KEEPALIVE: %d, buflen %d, #VPP-TBD#",
3473                 vcl_session_has_attr (session, VCL_SESS_ATTR_KEEPALIVE),
3474                 *buflen);
3475         }
3476       else
3477         rv = VPPCOM_EINVAL;
3478       break;
3479
3480     case VPPCOM_ATTR_GET_TCP_NODELAY:
3481       if (buffer && buflen && (*buflen >= sizeof (int)))
3482         {
3483           /* VPP-TBD */
3484           *(int *) buffer = vcl_session_has_attr (session,
3485                                                   VCL_SESS_ATTR_TCP_NODELAY);
3486           *buflen = sizeof (int);
3487
3488           VDBG (2, "VPPCOM_ATTR_GET_TCP_NODELAY: %d, buflen %d, #VPP-TBD#",
3489                 *(int *) buffer, *buflen);
3490         }
3491       else
3492         rv = VPPCOM_EINVAL;
3493       break;
3494
3495     case VPPCOM_ATTR_SET_TCP_NODELAY:
3496       if (buffer && buflen && (*buflen == sizeof (int)))
3497         {
3498           /* VPP-TBD */
3499           if (*(int *) buffer)
3500             vcl_session_set_attr (session, VCL_SESS_ATTR_TCP_NODELAY);
3501           else
3502             vcl_session_clear_attr (session, VCL_SESS_ATTR_TCP_NODELAY);
3503
3504           VDBG (2, "VPPCOM_ATTR_SET_TCP_NODELAY: %d, buflen %d, #VPP-TBD#",
3505                 vcl_session_has_attr (session, VCL_SESS_ATTR_TCP_NODELAY),
3506                 *buflen);
3507         }
3508       else
3509         rv = VPPCOM_EINVAL;
3510       break;
3511
3512     case VPPCOM_ATTR_GET_TCP_KEEPIDLE:
3513       if (buffer && buflen && (*buflen >= sizeof (int)))
3514         {
3515           /* VPP-TBD */
3516           *(int *) buffer = vcl_session_has_attr (session,
3517                                                   VCL_SESS_ATTR_TCP_KEEPIDLE);
3518           *buflen = sizeof (int);
3519
3520           VDBG (2, "VPPCOM_ATTR_GET_TCP_KEEPIDLE: %d, buflen %d, #VPP-TBD#",
3521                 *(int *) buffer, *buflen);
3522         }
3523       else
3524         rv = VPPCOM_EINVAL;
3525       break;
3526
3527     case VPPCOM_ATTR_SET_TCP_KEEPIDLE:
3528       if (buffer && buflen && (*buflen == sizeof (int)))
3529         {
3530           /* VPP-TBD */
3531           if (*(int *) buffer)
3532             vcl_session_set_attr (session, VCL_SESS_ATTR_TCP_KEEPIDLE);
3533           else
3534             vcl_session_clear_attr (session, VCL_SESS_ATTR_TCP_KEEPIDLE);
3535
3536           VDBG (2, "VPPCOM_ATTR_SET_TCP_KEEPIDLE: %d, buflen %d, #VPP-TBD#",
3537                 vcl_session_has_attr (session,
3538                                       VCL_SESS_ATTR_TCP_KEEPIDLE), *buflen);
3539         }
3540       else
3541         rv = VPPCOM_EINVAL;
3542       break;
3543
3544     case VPPCOM_ATTR_GET_TCP_KEEPINTVL:
3545       if (buffer && buflen && (*buflen >= sizeof (int)))
3546         {
3547           /* VPP-TBD */
3548           *(int *) buffer = vcl_session_has_attr (session,
3549                                                   VCL_SESS_ATTR_TCP_KEEPINTVL);
3550           *buflen = sizeof (int);
3551
3552           VDBG (2, "VPPCOM_ATTR_GET_TCP_KEEPINTVL: %d, buflen %d, #VPP-TBD#",
3553                 *(int *) buffer, *buflen);
3554         }
3555       else
3556         rv = VPPCOM_EINVAL;
3557       break;
3558
3559     case VPPCOM_ATTR_SET_TCP_KEEPINTVL:
3560       if (buffer && buflen && (*buflen == sizeof (int)))
3561         {
3562           /* VPP-TBD */
3563           if (*(int *) buffer)
3564             vcl_session_set_attr (session, VCL_SESS_ATTR_TCP_KEEPINTVL);
3565           else
3566             vcl_session_clear_attr (session, VCL_SESS_ATTR_TCP_KEEPINTVL);
3567
3568           VDBG (2, "VPPCOM_ATTR_SET_TCP_KEEPINTVL: %d, buflen %d, #VPP-TBD#",
3569                 vcl_session_has_attr (session,
3570                                       VCL_SESS_ATTR_TCP_KEEPINTVL), *buflen);
3571         }
3572       else
3573         rv = VPPCOM_EINVAL;
3574       break;
3575
3576     case VPPCOM_ATTR_GET_TCP_USER_MSS:
3577       if (buffer && buflen && (*buflen >= sizeof (u32)))
3578         {
3579           /* VPP-TBD */
3580           *(u32 *) buffer = session->user_mss;
3581           *buflen = sizeof (int);
3582
3583           VDBG (2, "VPPCOM_ATTR_GET_TCP_USER_MSS: %d, buflen %d, #VPP-TBD#",
3584                 *(int *) buffer, *buflen);
3585         }
3586       else
3587         rv = VPPCOM_EINVAL;
3588       break;
3589
3590     case VPPCOM_ATTR_SET_TCP_USER_MSS:
3591       if (buffer && buflen && (*buflen == sizeof (u32)))
3592         {
3593           /* VPP-TBD */
3594           session->user_mss = *(u32 *) buffer;
3595
3596           VDBG (2, "VPPCOM_ATTR_SET_TCP_USER_MSS: %u, buflen %d, #VPP-TBD#",
3597                 session->user_mss, *buflen);
3598         }
3599       else
3600         rv = VPPCOM_EINVAL;
3601       break;
3602
3603     case VPPCOM_ATTR_SET_SHUT:
3604       if (*flags == SHUT_RD || *flags == SHUT_RDWR)
3605         vcl_session_set_attr (session, VCL_SESS_ATTR_SHUT_RD);
3606       if (*flags == SHUT_WR || *flags == SHUT_RDWR)
3607         vcl_session_set_attr (session, VCL_SESS_ATTR_SHUT_WR);
3608       break;
3609
3610     case VPPCOM_ATTR_GET_SHUT:
3611       if (vcl_session_has_attr (session, VCL_SESS_ATTR_SHUT_RD))
3612         tmp_flags = 1;
3613       if (vcl_session_has_attr (session, VCL_SESS_ATTR_SHUT_WR))
3614         tmp_flags |= 2;
3615       if (tmp_flags == 1)
3616         *(int *) buffer = SHUT_RD;
3617       else if (tmp_flags == 2)
3618         *(int *) buffer = SHUT_WR;
3619       else if (tmp_flags == 3)
3620         *(int *) buffer = SHUT_RDWR;
3621       *buflen = sizeof (int);
3622       break;
3623
3624     case VPPCOM_ATTR_SET_CONNECTED:
3625       session->flags |= VCL_SESSION_F_CONNECTED;
3626       break;
3627
3628     default:
3629       rv = VPPCOM_EINVAL;
3630       break;
3631     }
3632
3633   return rv;
3634 }
3635
3636 int
3637 vppcom_session_recvfrom (uint32_t session_handle, void *buffer,
3638                          uint32_t buflen, int flags, vppcom_endpt_t * ep)
3639 {
3640   vcl_worker_t *wrk = vcl_worker_get_current ();
3641   vcl_session_t *session;
3642   int rv = VPPCOM_OK;
3643
3644   if (flags == 0)
3645     rv = vppcom_session_read (session_handle, buffer, buflen);
3646   else if (flags & MSG_PEEK)
3647     rv = vppcom_session_peek (session_handle, buffer, buflen);
3648   else
3649     {
3650       VDBG (0, "Unsupport flags for recvfrom %d", flags);
3651       return VPPCOM_EAFNOSUPPORT;
3652     }
3653
3654   if (ep && rv > 0)
3655     {
3656       session = vcl_session_get_w_handle (wrk, session_handle);
3657       if (session->transport.is_ip4)
3658         clib_memcpy_fast (ep->ip, &session->transport.rmt_ip.ip4,
3659                           sizeof (ip4_address_t));
3660       else
3661         clib_memcpy_fast (ep->ip, &session->transport.rmt_ip.ip6,
3662                           sizeof (ip6_address_t));
3663       ep->is_ip4 = session->transport.is_ip4;
3664       ep->port = session->transport.rmt_port;
3665     }
3666
3667   return rv;
3668 }
3669
3670 int
3671 vppcom_session_sendto (uint32_t session_handle, void *buffer,
3672                        uint32_t buflen, int flags, vppcom_endpt_t * ep)
3673 {
3674   vcl_worker_t *wrk = vcl_worker_get_current ();
3675   vcl_session_t *s;
3676
3677   s = vcl_session_get_w_handle (wrk, session_handle);
3678   if (!s)
3679     return VPPCOM_EBADFD;
3680
3681   if (!buffer)
3682     return VPPCOM_EINVAL;
3683
3684   if (ep)
3685     {
3686       if (s->session_type != VPPCOM_PROTO_UDP
3687           || (s->flags & VCL_SESSION_F_CONNECTED))
3688         return VPPCOM_EINVAL;
3689
3690       /* Session not connected/bound in vpp. Create it by 'connecting' it */
3691       if (PREDICT_FALSE (s->session_state == VCL_STATE_CLOSED))
3692         {
3693           vcl_send_session_connect (wrk, s);
3694         }
3695       else
3696         {
3697           s->transport.is_ip4 = ep->is_ip4;
3698           s->transport.rmt_port = ep->port;
3699           vcl_ip_copy_from_ep (&s->transport.rmt_ip, ep);
3700         }
3701     }
3702
3703   if (flags)
3704     {
3705       // TBD check the flags and do the right thing
3706       VDBG (2, "handling flags 0x%u (%d) not implemented yet.", flags, flags);
3707     }
3708
3709   return (vppcom_session_write_inline (wrk, s, buffer, buflen, 1,
3710                                        s->is_dgram ? 1 : 0));
3711 }
3712
3713 int
3714 vppcom_poll (vcl_poll_t * vp, uint32_t n_sids, double wait_for_time)
3715 {
3716   vcl_worker_t *wrk = vcl_worker_get_current ();
3717   f64 timeout = clib_time_now (&wrk->clib_time) + wait_for_time;
3718   u32 i, keep_trying = 1;
3719   svm_msg_q_msg_t msg;
3720   session_event_t *e;
3721   int rv, num_ev = 0;
3722
3723   VDBG (3, "vp %p, nsids %u, wait_for_time %f", vp, n_sids, wait_for_time);
3724
3725   if (!vp)
3726     return VPPCOM_EFAULT;
3727
3728   do
3729     {
3730       vcl_session_t *session;
3731
3732       /* Dequeue all events and drop all unhandled io events */
3733       while (svm_msg_q_sub (wrk->app_event_queue, &msg, SVM_Q_NOWAIT, 0) == 0)
3734         {
3735           e = svm_msg_q_msg_data (wrk->app_event_queue, &msg);
3736           vcl_handle_mq_event (wrk, e);
3737           svm_msg_q_free_msg (wrk->app_event_queue, &msg);
3738         }
3739       vec_reset_length (wrk->unhandled_evts_vector);
3740
3741       for (i = 0; i < n_sids; i++)
3742         {
3743           session = vcl_session_get (wrk, vp[i].sh);
3744           if (!session)
3745             {
3746               vp[i].revents = POLLHUP;
3747               num_ev++;
3748               continue;
3749             }
3750
3751           vp[i].revents = 0;
3752
3753           if (POLLIN & vp[i].events)
3754             {
3755               rv = vcl_session_read_ready (session);
3756               if (rv > 0)
3757                 {
3758                   vp[i].revents |= POLLIN;
3759                   num_ev++;
3760                 }
3761               else if (rv < 0)
3762                 {
3763                   switch (rv)
3764                     {
3765                     case VPPCOM_ECONNRESET:
3766                       vp[i].revents = POLLHUP;
3767                       break;
3768
3769                     default:
3770                       vp[i].revents = POLLERR;
3771                       break;
3772                     }
3773                   num_ev++;
3774                 }
3775             }
3776
3777           if (POLLOUT & vp[i].events)
3778             {
3779               rv = vcl_session_write_ready (session);
3780               if (rv > 0)
3781                 {
3782                   vp[i].revents |= POLLOUT;
3783                   num_ev++;
3784                 }
3785               else if (rv < 0)
3786                 {
3787                   switch (rv)
3788                     {
3789                     case VPPCOM_ECONNRESET:
3790                       vp[i].revents = POLLHUP;
3791                       break;
3792
3793                     default:
3794                       vp[i].revents = POLLERR;
3795                       break;
3796                     }
3797                   num_ev++;
3798                 }
3799             }
3800
3801           if (0)                // Note "done:" label used by VCL_SESSION_LOCK_AND_GET()
3802             {
3803               vp[i].revents = POLLNVAL;
3804               num_ev++;
3805             }
3806         }
3807       if (wait_for_time != -1)
3808         keep_trying = (clib_time_now (&wrk->clib_time) <= timeout) ? 1 : 0;
3809     }
3810   while ((num_ev == 0) && keep_trying);
3811
3812   return num_ev;
3813 }
3814
3815 int
3816 vppcom_mq_epoll_fd (void)
3817 {
3818   vcl_worker_t *wrk = vcl_worker_get_current ();
3819   return wrk->mqs_epfd;
3820 }
3821
3822 int
3823 vppcom_session_index (vcl_session_handle_t session_handle)
3824 {
3825   return session_handle & 0xFFFFFF;
3826 }
3827
3828 int
3829 vppcom_session_worker (vcl_session_handle_t session_handle)
3830 {
3831   return session_handle >> 24;
3832 }
3833
3834 int
3835 vppcom_worker_register (void)
3836 {
3837   if (!vcl_worker_alloc_and_init ())
3838     return VPPCOM_EEXIST;
3839
3840   if (vcl_worker_register_with_vpp ())
3841     return VPPCOM_EEXIST;
3842
3843   return VPPCOM_OK;
3844 }
3845
3846 void
3847 vppcom_worker_unregister (void)
3848 {
3849   vcl_worker_cleanup (vcl_worker_get_current (), 1 /* notify vpp */ );
3850   vcl_set_worker_index (~0);
3851 }
3852
3853 void
3854 vppcom_worker_index_set (int index)
3855 {
3856   vcl_set_worker_index (index);
3857 }
3858
3859 int
3860 vppcom_worker_index (void)
3861 {
3862   return vcl_get_worker_index ();
3863 }
3864
3865 int
3866 vppcom_worker_mqs_epfd (void)
3867 {
3868   vcl_worker_t *wrk = vcl_worker_get_current ();
3869   if (!vcm->cfg.use_mq_eventfd)
3870     return -1;
3871   return wrk->mqs_epfd;
3872 }
3873
3874 int
3875 vppcom_session_is_connectable_listener (uint32_t session_handle)
3876 {
3877   vcl_session_t *session;
3878   vcl_worker_t *wrk = vcl_worker_get_current ();
3879   session = vcl_session_get_w_handle (wrk, session_handle);
3880   if (!session)
3881     return VPPCOM_EBADFD;
3882   return vcl_session_is_connectable_listener (wrk, session);
3883 }
3884
3885 int
3886 vppcom_session_listener (uint32_t session_handle)
3887 {
3888   vcl_worker_t *wrk = vcl_worker_get_current ();
3889   vcl_session_t *listen_session, *session;
3890   session = vcl_session_get_w_handle (wrk, session_handle);
3891   if (!session)
3892     return VPPCOM_EBADFD;
3893   if (session->listener_index == VCL_INVALID_SESSION_INDEX)
3894     return VPPCOM_EBADFD;
3895   listen_session = vcl_session_get_w_handle (wrk, session->listener_index);
3896   if (!listen_session)
3897     return VPPCOM_EBADFD;
3898   return vcl_session_handle (listen_session);
3899 }
3900
3901 int
3902 vppcom_session_n_accepted (uint32_t session_handle)
3903 {
3904   vcl_worker_t *wrk = vcl_worker_get_current ();
3905   vcl_session_t *session = vcl_session_get_w_handle (wrk, session_handle);
3906   if (!session)
3907     return VPPCOM_EBADFD;
3908   return session->n_accepted_sessions;
3909 }
3910
3911 const char *
3912 vppcom_proto_str (vppcom_proto_t proto)
3913 {
3914   char const *proto_str;
3915
3916   switch (proto)
3917     {
3918     case VPPCOM_PROTO_TCP:
3919       proto_str = "TCP";
3920       break;
3921     case VPPCOM_PROTO_UDP:
3922       proto_str = "UDP";
3923       break;
3924     case VPPCOM_PROTO_TLS:
3925       proto_str = "TLS";
3926       break;
3927     case VPPCOM_PROTO_QUIC:
3928       proto_str = "QUIC";
3929       break;
3930     default:
3931       proto_str = "UNKNOWN";
3932       break;
3933     }
3934   return proto_str;
3935 }
3936
3937 const char *
3938 vppcom_retval_str (int retval)
3939 {
3940   char const *st;
3941
3942   switch (retval)
3943     {
3944     case VPPCOM_OK:
3945       st = "VPPCOM_OK";
3946       break;
3947
3948     case VPPCOM_EAGAIN:
3949       st = "VPPCOM_EAGAIN";
3950       break;
3951
3952     case VPPCOM_EFAULT:
3953       st = "VPPCOM_EFAULT";
3954       break;
3955
3956     case VPPCOM_ENOMEM:
3957       st = "VPPCOM_ENOMEM";
3958       break;
3959
3960     case VPPCOM_EINVAL:
3961       st = "VPPCOM_EINVAL";
3962       break;
3963
3964     case VPPCOM_EBADFD:
3965       st = "VPPCOM_EBADFD";
3966       break;
3967
3968     case VPPCOM_EAFNOSUPPORT:
3969       st = "VPPCOM_EAFNOSUPPORT";
3970       break;
3971
3972     case VPPCOM_ECONNABORTED:
3973       st = "VPPCOM_ECONNABORTED";
3974       break;
3975
3976     case VPPCOM_ECONNRESET:
3977       st = "VPPCOM_ECONNRESET";
3978       break;
3979
3980     case VPPCOM_ENOTCONN:
3981       st = "VPPCOM_ENOTCONN";
3982       break;
3983
3984     case VPPCOM_ECONNREFUSED:
3985       st = "VPPCOM_ECONNREFUSED";
3986       break;
3987
3988     case VPPCOM_ETIMEDOUT:
3989       st = "VPPCOM_ETIMEDOUT";
3990       break;
3991
3992     default:
3993       st = "UNKNOWN_STATE";
3994       break;
3995     }
3996
3997   return st;
3998 }
3999
4000 /*
4001  * fd.io coding-style-patch-verification: ON
4002  *
4003  * Local Variables:
4004  * eval: (c-set-style "gnu")
4005  * End:
4006  */