vcl: improve shutdown()
[vpp.git] / src / vcl / vppcom.c
1 /*
2  * Copyright (c) 2017-2019 Cisco and/or its affiliates.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this
5  * You may obtain a copy of the License at:
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15
16 #include <stdio.h>
17 #include <stdlib.h>
18 #include <vcl/vppcom.h>
19 #include <vcl/vcl_debug.h>
20 #include <vcl/vcl_private.h>
21 #include <svm/fifo_segment.h>
22
23 __thread uword __vcl_worker_index = ~0;
24
25 static inline int
26 vcl_mq_dequeue_batch (vcl_worker_t * wrk, svm_msg_q_t * mq, u32 n_max_msg)
27 {
28   u32 n_msgs = 0, sz, len;
29
30   while ((sz = svm_msg_q_size (mq)))
31     {
32       len = vec_len (wrk->mq_msg_vector);
33       vec_validate (wrk->mq_msg_vector, len + sz - 1);
34       svm_msg_q_sub_raw_batch (mq, wrk->mq_msg_vector + len, sz);
35       n_msgs += sz;
36     }
37   return n_msgs;
38 }
39
40 const char *
41 vppcom_session_state_str (vcl_session_state_t state)
42 {
43   char *st;
44
45   switch (state)
46     {
47     case VCL_STATE_CLOSED:
48       st = "STATE_CLOSED";
49       break;
50     case VCL_STATE_LISTEN:
51       st = "STATE_LISTEN";
52       break;
53     case VCL_STATE_READY:
54       st = "STATE_READY";
55       break;
56     case VCL_STATE_VPP_CLOSING:
57       st = "STATE_VPP_CLOSING";
58       break;
59     case VCL_STATE_DISCONNECT:
60       st = "STATE_DISCONNECT";
61       break;
62     case VCL_STATE_DETACHED:
63       st = "STATE_DETACHED";
64       break;
65     case VCL_STATE_UPDATED:
66       st = "STATE_UPDATED";
67       break;
68     case VCL_STATE_LISTEN_NO_MQ:
69       st = "STATE_LISTEN_NO_MQ";
70       break;
71     default:
72       st = "UNKNOWN_STATE";
73       break;
74     }
75
76   return st;
77 }
78
79 u8 *
80 format_ip4_address (u8 * s, va_list * args)
81 {
82   u8 *a = va_arg (*args, u8 *);
83   return format (s, "%d.%d.%d.%d", a[0], a[1], a[2], a[3]);
84 }
85
86 u8 *
87 format_ip6_address (u8 * s, va_list * args)
88 {
89   ip6_address_t *a = va_arg (*args, ip6_address_t *);
90   u32 i, i_max_n_zero, max_n_zeros, i_first_zero, n_zeros, last_double_colon;
91
92   i_max_n_zero = ARRAY_LEN (a->as_u16);
93   max_n_zeros = 0;
94   i_first_zero = i_max_n_zero;
95   n_zeros = 0;
96   for (i = 0; i < ARRAY_LEN (a->as_u16); i++)
97     {
98       u32 is_zero = a->as_u16[i] == 0;
99       if (is_zero && i_first_zero >= ARRAY_LEN (a->as_u16))
100         {
101           i_first_zero = i;
102           n_zeros = 0;
103         }
104       n_zeros += is_zero;
105       if ((!is_zero && n_zeros > max_n_zeros)
106           || (i + 1 >= ARRAY_LEN (a->as_u16) && n_zeros > max_n_zeros))
107         {
108           i_max_n_zero = i_first_zero;
109           max_n_zeros = n_zeros;
110           i_first_zero = ARRAY_LEN (a->as_u16);
111           n_zeros = 0;
112         }
113     }
114
115   last_double_colon = 0;
116   for (i = 0; i < ARRAY_LEN (a->as_u16); i++)
117     {
118       if (i == i_max_n_zero && max_n_zeros > 1)
119         {
120           s = format (s, "::");
121           i += max_n_zeros - 1;
122           last_double_colon = 1;
123         }
124       else
125         {
126           s = format (s, "%s%x",
127                       (last_double_colon || i == 0) ? "" : ":",
128                       clib_net_to_host_u16 (a->as_u16[i]));
129           last_double_colon = 0;
130         }
131     }
132
133   return s;
134 }
135
136 /* Format an IP46 address. */
137 u8 *
138 format_ip46_address (u8 * s, va_list * args)
139 {
140   ip46_address_t *ip46 = va_arg (*args, ip46_address_t *);
141   ip46_type_t type = va_arg (*args, ip46_type_t);
142   int is_ip4 = 1;
143
144   switch (type)
145     {
146     case IP46_TYPE_ANY:
147       is_ip4 = ip46_address_is_ip4 (ip46);
148       break;
149     case IP46_TYPE_IP4:
150       is_ip4 = 1;
151       break;
152     case IP46_TYPE_IP6:
153       is_ip4 = 0;
154       break;
155     }
156
157   return is_ip4 ?
158     format (s, "%U", format_ip4_address, &ip46->ip4) :
159     format (s, "%U", format_ip6_address, &ip46->ip6);
160 }
161
162 /*
163  * VPPCOM Utility Functions
164  */
165
166 static void
167 vcl_msg_add_ext_config (vcl_session_t *s, uword *offset)
168 {
169   svm_fifo_chunk_t *c;
170
171   c = vcl_segment_alloc_chunk (vcl_vpp_worker_segment_handle (0),
172                                0 /* one slice only */, s->ext_config->len,
173                                offset);
174   if (c)
175     clib_memcpy_fast (c->data, s->ext_config, s->ext_config->len);
176 }
177
178 static void
179 vcl_send_session_listen (vcl_worker_t * wrk, vcl_session_t * s)
180 {
181   app_session_evt_t _app_evt, *app_evt = &_app_evt;
182   session_listen_msg_t *mp;
183   svm_msg_q_t *mq;
184
185   mq = vcl_worker_ctrl_mq (wrk);
186   app_alloc_ctrl_evt_to_vpp (mq, app_evt, SESSION_CTRL_EVT_LISTEN);
187   mp = (session_listen_msg_t *) app_evt->evt->data;
188   memset (mp, 0, sizeof (*mp));
189   mp->client_index = wrk->api_client_handle;
190   mp->context = s->session_index;
191   mp->wrk_index = wrk->vpp_wrk_index;
192   mp->is_ip4 = s->transport.is_ip4;
193   clib_memcpy_fast (&mp->ip, &s->transport.lcl_ip, sizeof (mp->ip));
194   mp->port = s->transport.lcl_port;
195   mp->proto = s->session_type;
196   mp->vrf = s->vrf;
197   if (s->flags & VCL_SESSION_F_CONNECTED)
198     mp->flags = TRANSPORT_CFG_F_CONNECTED;
199   if (s->ext_config)
200     vcl_msg_add_ext_config (s, &mp->ext_config);
201   app_send_ctrl_evt_to_vpp (mq, app_evt);
202   if (s->ext_config)
203     {
204       clib_mem_free (s->ext_config);
205       s->ext_config = 0;
206     }
207 }
208
209 static void
210 vcl_send_session_connect (vcl_worker_t * wrk, vcl_session_t * s)
211 {
212   app_session_evt_t _app_evt, *app_evt = &_app_evt;
213   session_connect_msg_t *mp;
214   svm_msg_q_t *mq;
215
216   mq = vcl_worker_ctrl_mq (wrk);
217   app_alloc_ctrl_evt_to_vpp (mq, app_evt, SESSION_CTRL_EVT_CONNECT);
218   mp = (session_connect_msg_t *) app_evt->evt->data;
219   memset (mp, 0, sizeof (*mp));
220   mp->client_index = wrk->api_client_handle;
221   mp->context = s->session_index;
222   mp->wrk_index = wrk->vpp_wrk_index;
223   mp->is_ip4 = s->transport.is_ip4;
224   mp->parent_handle = s->parent_handle;
225   clib_memcpy_fast (&mp->ip, &s->transport.rmt_ip, sizeof (mp->ip));
226   clib_memcpy_fast (&mp->lcl_ip, &s->transport.lcl_ip, sizeof (mp->lcl_ip));
227   mp->port = s->transport.rmt_port;
228   mp->lcl_port = s->transport.lcl_port;
229   mp->proto = s->session_type;
230   mp->vrf = s->vrf;
231   if (s->flags & VCL_SESSION_F_CONNECTED)
232     mp->flags |= TRANSPORT_CFG_F_CONNECTED;
233   if (s->ext_config)
234     vcl_msg_add_ext_config (s, &mp->ext_config);
235   app_send_ctrl_evt_to_vpp (mq, app_evt);
236
237   if (s->ext_config)
238     {
239       clib_mem_free (s->ext_config);
240       s->ext_config = 0;
241     }
242 }
243
244 void
245 vcl_send_session_unlisten (vcl_worker_t * wrk, vcl_session_t * s)
246 {
247   app_session_evt_t _app_evt, *app_evt = &_app_evt;
248   session_unlisten_msg_t *mp;
249   svm_msg_q_t *mq;
250
251   mq = vcl_worker_ctrl_mq (wrk);
252   app_alloc_ctrl_evt_to_vpp (mq, app_evt, SESSION_CTRL_EVT_UNLISTEN);
253   mp = (session_unlisten_msg_t *) app_evt->evt->data;
254   memset (mp, 0, sizeof (*mp));
255   mp->client_index = wrk->api_client_handle;
256   mp->wrk_index = wrk->vpp_wrk_index;
257   mp->handle = s->vpp_handle;
258   mp->context = wrk->wrk_index;
259   app_send_ctrl_evt_to_vpp (mq, app_evt);
260 }
261
262 static void
263 vcl_send_session_shutdown (vcl_worker_t *wrk, vcl_session_t *s)
264 {
265   app_session_evt_t _app_evt, *app_evt = &_app_evt;
266   session_shutdown_msg_t *mp;
267   svm_msg_q_t *mq;
268
269   /* Send to thread that owns the session */
270   mq = s->vpp_evt_q;
271   app_alloc_ctrl_evt_to_vpp (mq, app_evt, SESSION_CTRL_EVT_SHUTDOWN);
272   mp = (session_shutdown_msg_t *) app_evt->evt->data;
273   memset (mp, 0, sizeof (*mp));
274   mp->client_index = wrk->api_client_handle;
275   mp->handle = s->vpp_handle;
276   app_send_ctrl_evt_to_vpp (mq, app_evt);
277 }
278
279 static void
280 vcl_send_session_disconnect (vcl_worker_t * wrk, vcl_session_t * s)
281 {
282   app_session_evt_t _app_evt, *app_evt = &_app_evt;
283   session_disconnect_msg_t *mp;
284   svm_msg_q_t *mq;
285
286   /* Send to thread that owns the session */
287   mq = s->vpp_evt_q;
288   app_alloc_ctrl_evt_to_vpp (mq, app_evt, SESSION_CTRL_EVT_DISCONNECT);
289   mp = (session_disconnect_msg_t *) app_evt->evt->data;
290   memset (mp, 0, sizeof (*mp));
291   mp->client_index = wrk->api_client_handle;
292   mp->handle = s->vpp_handle;
293   app_send_ctrl_evt_to_vpp (mq, app_evt);
294 }
295
296 static void
297 vcl_send_app_detach (vcl_worker_t * wrk)
298 {
299   app_session_evt_t _app_evt, *app_evt = &_app_evt;
300   session_app_detach_msg_t *mp;
301   svm_msg_q_t *mq;
302
303   mq = vcl_worker_ctrl_mq (wrk);
304   app_alloc_ctrl_evt_to_vpp (mq, app_evt, SESSION_CTRL_EVT_APP_DETACH);
305   mp = (session_app_detach_msg_t *) app_evt->evt->data;
306   memset (mp, 0, sizeof (*mp));
307   mp->client_index = wrk->api_client_handle;
308   app_send_ctrl_evt_to_vpp (mq, app_evt);
309 }
310
311 static void
312 vcl_send_session_accepted_reply (svm_msg_q_t * mq, u32 context,
313                                  session_handle_t handle, int retval)
314 {
315   app_session_evt_t _app_evt, *app_evt = &_app_evt;
316   session_accepted_reply_msg_t *rmp;
317   app_alloc_ctrl_evt_to_vpp (mq, app_evt, SESSION_CTRL_EVT_ACCEPTED_REPLY);
318   rmp = (session_accepted_reply_msg_t *) app_evt->evt->data;
319   rmp->handle = handle;
320   rmp->context = context;
321   rmp->retval = retval;
322   app_send_ctrl_evt_to_vpp (mq, app_evt);
323 }
324
325 static void
326 vcl_send_session_disconnected_reply (vcl_worker_t * wrk, vcl_session_t * s,
327                                      int retval)
328 {
329   app_session_evt_t _app_evt, *app_evt = &_app_evt;
330   session_disconnected_reply_msg_t *rmp;
331   app_alloc_ctrl_evt_to_vpp (s->vpp_evt_q, app_evt,
332                              SESSION_CTRL_EVT_DISCONNECTED_REPLY);
333   rmp = (session_disconnected_reply_msg_t *) app_evt->evt->data;
334   rmp->handle = s->vpp_handle;
335   rmp->context = wrk->api_client_handle;
336   rmp->retval = retval;
337   app_send_ctrl_evt_to_vpp (s->vpp_evt_q, app_evt);
338 }
339
340 static void
341 vcl_send_session_reset_reply (vcl_worker_t * wrk, vcl_session_t * s,
342                               int retval)
343 {
344   app_session_evt_t _app_evt, *app_evt = &_app_evt;
345   session_reset_reply_msg_t *rmp;
346   app_alloc_ctrl_evt_to_vpp (s->vpp_evt_q, app_evt,
347                              SESSION_CTRL_EVT_RESET_REPLY);
348   rmp = (session_reset_reply_msg_t *) app_evt->evt->data;
349   rmp->handle = s->vpp_handle;
350   rmp->context = wrk->api_client_handle;
351   rmp->retval = retval;
352   app_send_ctrl_evt_to_vpp (s->vpp_evt_q, app_evt);
353 }
354
355 void
356 vcl_send_session_worker_update (vcl_worker_t * wrk, vcl_session_t * s,
357                                 u32 wrk_index)
358 {
359   app_session_evt_t _app_evt, *app_evt = &_app_evt;
360   session_worker_update_msg_t *mp;
361
362   app_alloc_ctrl_evt_to_vpp (s->vpp_evt_q, app_evt,
363                              SESSION_CTRL_EVT_WORKER_UPDATE);
364   mp = (session_worker_update_msg_t *) app_evt->evt->data;
365   mp->client_index = wrk->api_client_handle;
366   mp->handle = s->vpp_handle;
367   mp->req_wrk_index = wrk->vpp_wrk_index;
368   mp->wrk_index = wrk_index;
369   app_send_ctrl_evt_to_vpp (s->vpp_evt_q, app_evt);
370 }
371
372 int
373 vcl_send_worker_rpc (u32 dst_wrk_index, void *data, u32 data_len)
374 {
375   app_session_evt_t _app_evt, *app_evt = &_app_evt;
376   session_app_wrk_rpc_msg_t *mp;
377   vcl_worker_t *dst_wrk, *wrk;
378   svm_msg_q_t *mq;
379   int ret = -1;
380
381   if (data_len > sizeof (mp->data))
382     goto done;
383
384   clib_spinlock_lock (&vcm->workers_lock);
385
386   dst_wrk = vcl_worker_get_if_valid (dst_wrk_index);
387   if (!dst_wrk)
388     goto done;
389
390   wrk = vcl_worker_get_current ();
391   mq = vcl_worker_ctrl_mq (wrk);
392   app_alloc_ctrl_evt_to_vpp (mq, app_evt, SESSION_CTRL_EVT_APP_WRK_RPC);
393   mp = (session_app_wrk_rpc_msg_t *) app_evt->evt->data;
394   mp->client_index = wrk->api_client_handle;
395   mp->wrk_index = dst_wrk->vpp_wrk_index;
396   clib_memcpy (mp->data, data, data_len);
397   app_send_ctrl_evt_to_vpp (mq, app_evt);
398   ret = 0;
399
400 done:
401   clib_spinlock_unlock (&vcm->workers_lock);
402   return ret;
403 }
404
405 int
406 vcl_session_transport_attr (vcl_worker_t *wrk, vcl_session_t *s, u8 is_get,
407                             transport_endpt_attr_t *attr)
408 {
409   app_session_evt_t _app_evt, *app_evt = &_app_evt;
410   session_transport_attr_msg_t *mp;
411   svm_msg_q_t *mq;
412   f64 timeout;
413
414   ASSERT (!wrk->session_attr_op);
415   wrk->session_attr_op = 1;
416   wrk->session_attr_op_rv = -1;
417
418   mq = s->vpp_evt_q;
419   app_alloc_ctrl_evt_to_vpp (mq, app_evt, SESSION_CTRL_EVT_TRANSPORT_ATTR);
420   mp = (session_transport_attr_msg_t *) app_evt->evt->data;
421   memset (mp, 0, sizeof (*mp));
422   mp->client_index = wrk->api_client_handle;
423   mp->handle = s->vpp_handle;
424   mp->is_get = is_get;
425   mp->attr = *attr;
426   app_send_ctrl_evt_to_vpp (mq, app_evt);
427
428   timeout = clib_time_now (&wrk->clib_time) + 1;
429
430   while (wrk->session_attr_op && clib_time_now (&wrk->clib_time) < timeout)
431     vcl_flush_mq_events ();
432
433   if (!wrk->session_attr_op_rv && is_get)
434     *attr = wrk->session_attr_rv;
435
436   wrk->session_attr_op = 0;
437
438   return wrk->session_attr_op_rv;
439 }
440
441 static u32
442 vcl_session_accepted_handler (vcl_worker_t * wrk, session_accepted_msg_t * mp,
443                               u32 ls_index)
444 {
445   vcl_session_t *session, *listen_session;
446   svm_msg_q_t *evt_q;
447
448   session = vcl_session_alloc (wrk);
449
450   listen_session = vcl_session_get (wrk, ls_index);
451   if (listen_session->vpp_handle != mp->listener_handle)
452     {
453       VDBG (0, "ERROR: listener handle %lu does not match session %u",
454             mp->listener_handle, ls_index);
455       goto error;
456     }
457
458   if (vcl_segment_attach_session (mp->segment_handle, mp->server_rx_fifo,
459                                   mp->server_tx_fifo,
460                                   mp->vpp_event_queue_address, 0, session))
461     {
462       VDBG (0, "failed to attach fifos for %u", session->session_index);
463       goto error;
464     }
465
466   session->vpp_handle = mp->handle;
467   session->session_state = VCL_STATE_READY;
468   session->transport.rmt_port = mp->rmt.port;
469   session->transport.is_ip4 = mp->rmt.is_ip4;
470   clib_memcpy_fast (&session->transport.rmt_ip, &mp->rmt.ip,
471                     sizeof (ip46_address_t));
472
473   vcl_session_table_add_vpp_handle (wrk, mp->handle, session->session_index);
474   session->transport.lcl_port = mp->lcl.port;
475   session->transport.lcl_ip = mp->lcl.ip;
476   session->session_type = listen_session->session_type;
477   session->is_dgram = vcl_proto_is_dgram (session->session_type);
478   session->listener_index = listen_session->session_index;
479   listen_session->n_accepted_sessions++;
480
481   VDBG (1, "session %u [0x%llx]: client accept request from %s address %U"
482         " port %d queue %p!", session->session_index, mp->handle,
483         mp->rmt.is_ip4 ? "IPv4" : "IPv6", format_ip46_address, &mp->rmt.ip,
484         mp->rmt.is_ip4 ? IP46_TYPE_IP4 : IP46_TYPE_IP6,
485         clib_net_to_host_u16 (mp->rmt.port), session->vpp_evt_q);
486   vcl_evt (VCL_EVT_ACCEPT, session, listen_session, session_index);
487
488   vcl_send_session_accepted_reply (session->vpp_evt_q, mp->context,
489                                    session->vpp_handle, 0);
490
491   return session->session_index;
492
493 error:
494   vcl_segment_attach_mq (vcl_vpp_worker_segment_handle (0),
495                          mp->vpp_event_queue_address, mp->mq_index, &evt_q);
496   vcl_send_session_accepted_reply (evt_q, mp->context, mp->handle,
497                                    VNET_API_ERROR_INVALID_ARGUMENT);
498   vcl_session_free (wrk, session);
499   return VCL_INVALID_SESSION_INDEX;
500 }
501
502 static u32
503 vcl_session_connected_handler (vcl_worker_t * wrk,
504                                session_connected_msg_t * mp)
505 {
506   vcl_session_t *session = 0;
507   u32 session_index;
508
509   session_index = mp->context;
510   session = vcl_session_get (wrk, session_index);
511   if (!session)
512     {
513       VDBG (0, "ERROR: vpp handle 0x%llx has no session index (%u)!",
514             mp->handle, session_index);
515       return VCL_INVALID_SESSION_INDEX;
516     }
517   if (mp->retval)
518     {
519       VDBG (0, "ERROR: session index %u: connect failed! %U",
520             session_index, format_session_error, mp->retval);
521       session->session_state = VCL_STATE_DETACHED;
522       session->vpp_handle = mp->handle;
523       return session_index;
524     }
525
526   session->vpp_handle = mp->handle;
527
528   if (vcl_segment_attach_session (mp->segment_handle, mp->server_rx_fifo,
529                                   mp->server_tx_fifo,
530                                   mp->vpp_event_queue_address, 0, session))
531     {
532       VDBG (0, "failed to attach fifos for %u", session->session_index);
533       session->session_state = VCL_STATE_DETACHED;
534       vcl_send_session_disconnect (wrk, session);
535       return session_index;
536     }
537
538   if (mp->ct_rx_fifo)
539     {
540       if (vcl_segment_attach_session (mp->ct_segment_handle, mp->ct_rx_fifo,
541                                       mp->ct_tx_fifo, (uword) ~0, 1, session))
542         {
543           VDBG (0, "failed to attach ct fifos for %u", session->session_index);
544           session->session_state = VCL_STATE_DETACHED;
545           vcl_send_session_disconnect (wrk, session);
546           return session_index;
547         }
548     }
549
550   session->transport.is_ip4 = mp->lcl.is_ip4;
551   clib_memcpy_fast (&session->transport.lcl_ip, &mp->lcl.ip,
552                     sizeof (session->transport.lcl_ip));
553   session->transport.lcl_port = mp->lcl.port;
554
555   /* Application closed session before connect reply */
556   if (vcl_session_has_attr (session, VCL_SESS_ATTR_NONBLOCK)
557       && session->session_state == VCL_STATE_CLOSED)
558     vcl_send_session_disconnect (wrk, session);
559   else
560     session->session_state = VCL_STATE_READY;
561
562   /* Add it to lookup table */
563   vcl_session_table_add_vpp_handle (wrk, mp->handle, session_index);
564
565   VDBG (1, "session %u [0x%llx] connected! rx_fifo %p, refcnt %d, tx_fifo %p,"
566         " refcnt %d", session_index, mp->handle, session->rx_fifo,
567         session->rx_fifo->refcnt, session->tx_fifo, session->tx_fifo->refcnt);
568
569   return session_index;
570 }
571
572 static int
573 vcl_flag_accepted_session (vcl_session_t * session, u64 handle, u32 flags)
574 {
575   vcl_session_msg_t *accepted_msg;
576   int i;
577
578   for (i = 0; i < vec_len (session->accept_evts_fifo); i++)
579     {
580       accepted_msg = &session->accept_evts_fifo[i];
581       if (accepted_msg->accepted_msg.handle == handle)
582         {
583           accepted_msg->flags |= flags;
584           return 1;
585         }
586     }
587   return 0;
588 }
589
590 static u32
591 vcl_session_reset_handler (vcl_worker_t * wrk,
592                            session_reset_msg_t * reset_msg)
593 {
594   vcl_session_t *session;
595   u32 sid;
596
597   sid = vcl_session_index_from_vpp_handle (wrk, reset_msg->handle);
598   session = vcl_session_get (wrk, sid);
599   if (!session)
600     {
601       VDBG (0, "request to reset unknown handle 0x%llx", reset_msg->handle);
602       return VCL_INVALID_SESSION_INDEX;
603     }
604
605   /* Caught a reset before actually accepting the session */
606   if (session->session_state == VCL_STATE_LISTEN)
607     {
608
609       if (!vcl_flag_accepted_session (session, reset_msg->handle,
610                                       VCL_ACCEPTED_F_RESET))
611         VDBG (0, "session was not accepted!");
612       return VCL_INVALID_SESSION_INDEX;
613     }
614
615   if (session->session_state != VCL_STATE_CLOSED)
616     session->session_state = VCL_STATE_DISCONNECT;
617   VDBG (0, "reset session %u [0x%llx]", sid, reset_msg->handle);
618   return sid;
619 }
620
621 static u32
622 vcl_session_bound_handler (vcl_worker_t * wrk, session_bound_msg_t * mp)
623 {
624   vcl_session_t *session;
625   u32 sid = mp->context;
626
627   session = vcl_session_get (wrk, sid);
628   if (mp->retval)
629     {
630       VERR ("session %u [0x%llx]: bind failed: %U", sid, mp->handle,
631             format_session_error, mp->retval);
632       if (session)
633         {
634           session->session_state = VCL_STATE_DETACHED;
635           session->vpp_handle = mp->handle;
636           return sid;
637         }
638       else
639         {
640           VDBG (0, "ERROR: session %u [0x%llx]: Invalid session index!",
641                 sid, mp->handle);
642           return VCL_INVALID_SESSION_INDEX;
643         }
644     }
645
646   session->vpp_handle = mp->handle;
647   session->transport.is_ip4 = mp->lcl_is_ip4;
648   clib_memcpy_fast (&session->transport.lcl_ip, mp->lcl_ip,
649                     sizeof (ip46_address_t));
650   session->transport.lcl_port = mp->lcl_port;
651   vcl_session_table_add_listener (wrk, mp->handle, sid);
652   session->session_state = VCL_STATE_LISTEN;
653
654   if (vcl_session_is_cl (session))
655     {
656       if (vcl_segment_attach_session (mp->segment_handle, mp->rx_fifo,
657                                       mp->tx_fifo, mp->vpp_evt_q, 0, session))
658         {
659           VDBG (0, "failed to attach fifos for %u", session->session_index);
660           session->session_state = VCL_STATE_DETACHED;
661           return VCL_INVALID_SESSION_INDEX;
662         }
663     }
664
665   VDBG (0, "session %u [0x%llx]: listen succeeded!", sid, mp->handle);
666   return sid;
667 }
668
669 static void
670 vcl_session_unlisten_reply_handler (vcl_worker_t * wrk, void *data)
671 {
672   session_unlisten_reply_msg_t *mp = (session_unlisten_reply_msg_t *) data;
673   vcl_session_t *s;
674
675   s = vcl_session_get_w_vpp_handle (wrk, mp->handle);
676   if (!s)
677     {
678       VDBG (0, "Unlisten reply with wrong handle %llx", mp->handle);
679       return;
680     }
681   if (s->session_state != VCL_STATE_DISCONNECT)
682     {
683       /* Connected udp listener */
684       if (s->session_type == VPPCOM_PROTO_UDP
685           && s->session_state == VCL_STATE_CLOSED)
686         return;
687
688       VDBG (0, "Unlisten session in wrong state %llx", mp->handle);
689       return;
690     }
691
692   if (mp->retval)
693     VDBG (0, "ERROR: session %u [0xllx]: unlisten failed: %U",
694           s->session_index, mp->handle, format_session_error, mp->retval);
695
696   if (mp->context != wrk->wrk_index)
697     VDBG (0, "wrong context");
698
699   vcl_session_table_del_vpp_handle (wrk, mp->handle);
700   vcl_session_free (wrk, s);
701 }
702
703 static void
704 vcl_session_migrated_handler (vcl_worker_t * wrk, void *data)
705 {
706   session_migrated_msg_t *mp = (session_migrated_msg_t *) data;
707   vcl_session_t *s;
708   u32 fs_index;
709
710   s = vcl_session_get_w_vpp_handle (wrk, mp->handle);
711   if (!s)
712     {
713       VDBG (0, "Migrated notification with wrong handle %llx", mp->handle);
714       return;
715     }
716
717   /* Only validate if a value is provided */
718   if (mp->segment_handle != SESSION_INVALID_HANDLE)
719     {
720       fs_index = vcl_segment_table_lookup (mp->segment_handle);
721       if (fs_index == VCL_INVALID_SEGMENT_INDEX)
722         {
723           VDBG (0, "segment %lx for session %u is not mounted!",
724                 mp->segment_handle, s->session_index);
725           s->session_state = VCL_STATE_DETACHED;
726           return;
727         }
728     }
729
730   s->vpp_handle = mp->new_handle;
731
732   vcl_segment_attach_mq (vcl_vpp_worker_segment_handle (0), mp->vpp_evt_q,
733                          mp->vpp_thread_index, &s->vpp_evt_q);
734
735   vcl_session_table_del_vpp_handle (wrk, mp->handle);
736   vcl_session_table_add_vpp_handle (wrk, mp->new_handle, s->session_index);
737
738   /* Generate new tx event if we have outstanding data */
739   if (svm_fifo_has_event (s->tx_fifo))
740     app_send_io_evt_to_vpp (s->vpp_evt_q,
741                             s->tx_fifo->shr->master_session_index,
742                             SESSION_IO_EVT_TX, SVM_Q_WAIT);
743
744   VDBG (0, "Migrated 0x%lx to thread %u 0x%lx", mp->handle,
745         mp->vpp_thread_index, mp->new_handle);
746 }
747
748 static vcl_session_t *
749 vcl_session_accepted (vcl_worker_t * wrk, session_accepted_msg_t * msg)
750 {
751   vcl_session_msg_t *vcl_msg;
752   vcl_session_t *session;
753
754   session = vcl_session_get_w_vpp_handle (wrk, msg->handle);
755   if (PREDICT_FALSE (session != 0))
756     VWRN ("session overlap handle %lu state %u!", msg->handle,
757           session->session_state);
758
759   session = vcl_session_table_lookup_listener (wrk, msg->listener_handle);
760   if (!session)
761     {
762       VERR ("couldn't find listen session: listener handle %llx",
763             msg->listener_handle);
764       return 0;
765     }
766
767   clib_fifo_add2 (session->accept_evts_fifo, vcl_msg);
768   vcl_msg->flags = 0;
769   vcl_msg->accepted_msg = *msg;
770   /* Session handle points to listener until fully accepted by app */
771   vcl_session_table_add_vpp_handle (wrk, msg->handle, session->session_index);
772
773   return session;
774 }
775
776 static vcl_session_t *
777 vcl_session_disconnected_handler (vcl_worker_t * wrk,
778                                   session_disconnected_msg_t * msg)
779 {
780   vcl_session_t *session;
781
782   session = vcl_session_get_w_vpp_handle (wrk, msg->handle);
783   if (!session)
784     {
785       VDBG (0, "request to disconnect unknown handle 0x%llx", msg->handle);
786       return 0;
787     }
788
789   /* Late disconnect notification on a session that has been closed */
790   if (session->session_state == VCL_STATE_CLOSED)
791     return 0;
792
793   /* Caught a disconnect before actually accepting the session */
794   if (session->session_state == VCL_STATE_LISTEN)
795     {
796       if (!vcl_flag_accepted_session (session, msg->handle,
797                                       VCL_ACCEPTED_F_CLOSED))
798         VDBG (0, "session was not accepted!");
799       return 0;
800     }
801
802   /* If not already reset change state */
803   if (session->session_state != VCL_STATE_DISCONNECT)
804     session->session_state = VCL_STATE_VPP_CLOSING;
805
806   return session;
807 }
808
809 int
810 vppcom_session_shutdown (uint32_t session_handle, int how)
811 {
812   vcl_worker_t *wrk = vcl_worker_get_current ();
813   vcl_session_t *session;
814   vcl_session_state_t state;
815   u64 vpp_handle;
816
817   session = vcl_session_get_w_handle (wrk, session_handle);
818   if (PREDICT_FALSE (!session))
819     return VPPCOM_EBADFD;
820
821   vpp_handle = session->vpp_handle;
822   state = session->session_state;
823
824   VDBG (1, "session %u [0x%llx] state 0x%x (%s)", session->session_index,
825         vpp_handle, state, vppcom_session_state_str (state));
826
827   if (PREDICT_FALSE (state == VCL_STATE_LISTEN))
828     {
829       VDBG (0, "ERROR: Cannot shutdown a listen socket!");
830       return VPPCOM_EBADFD;
831     }
832
833   if (how == SHUT_RD || how == SHUT_RDWR)
834     {
835       session->flags |= VCL_SESSION_F_RD_SHUTDOWN;
836       if (how == SHUT_RD)
837         return VPPCOM_OK;
838     }
839   session->flags |= VCL_SESSION_F_WR_SHUTDOWN;
840
841   if (PREDICT_TRUE (state == VCL_STATE_READY))
842     {
843       VDBG (1, "session %u [0x%llx]: sending shutdown...",
844             session->session_index, vpp_handle);
845
846       vcl_send_session_shutdown (wrk, session);
847     }
848
849   return VPPCOM_OK;
850 }
851
852 static int
853 vppcom_session_disconnect (u32 session_handle)
854 {
855   vcl_worker_t *wrk = vcl_worker_get_current ();
856   vcl_session_t *session, *listen_session;
857   vcl_session_state_t state;
858   u64 vpp_handle;
859
860   session = vcl_session_get_w_handle (wrk, session_handle);
861   if (!session)
862     return VPPCOM_EBADFD;
863
864   vpp_handle = session->vpp_handle;
865   state = session->session_state;
866
867   VDBG (1, "session %u [0x%llx] state 0x%x (%s)", session->session_index,
868         vpp_handle, state, vppcom_session_state_str (state));
869
870   if (PREDICT_FALSE (state == VCL_STATE_LISTEN))
871     {
872       VDBG (0, "ERROR: Cannot disconnect a listen socket!");
873       return VPPCOM_EBADFD;
874     }
875
876   if (state == VCL_STATE_VPP_CLOSING)
877     {
878       vcl_send_session_disconnected_reply (wrk, session, 0);
879       VDBG (1, "session %u [0x%llx]: sending disconnect REPLY...",
880             session->session_index, vpp_handle);
881     }
882   else
883     {
884       /* Session doesn't have an event queue yet. Probably a non-blocking
885        * connect. Wait for the reply */
886       if (PREDICT_FALSE (!session->vpp_evt_q))
887         return VPPCOM_OK;
888
889       VDBG (1, "session %u [0x%llx]: sending disconnect...",
890             session->session_index, vpp_handle);
891       vcl_send_session_disconnect (wrk, session);
892     }
893
894   if (session->listener_index != VCL_INVALID_SESSION_INDEX)
895     {
896       listen_session = vcl_session_get (wrk, session->listener_index);
897       listen_session->n_accepted_sessions--;
898     }
899
900   return VPPCOM_OK;
901 }
902
903 static void
904 vcl_session_cleanup_handler (vcl_worker_t * wrk, void *data)
905 {
906   session_cleanup_msg_t *msg;
907   vcl_session_t *session;
908
909   msg = (session_cleanup_msg_t *) data;
910   session = vcl_session_get_w_vpp_handle (wrk, msg->handle);
911   if (!session)
912     {
913       VDBG (0, "disconnect confirmed for unknown handle 0x%llx", msg->handle);
914       return;
915     }
916
917   if (msg->type == SESSION_CLEANUP_TRANSPORT)
918     {
919       /* Transport was cleaned up before we confirmed close. Probably the
920        * app is still waiting for some data that cannot be delivered.
921        * Confirm close to make sure everything is cleaned up.
922        * Move to undetermined state to ensure that the session is not
923        * removed before both vpp and the app cleanup.
924        * - If the app closes first, the session is moved to CLOSED state
925        *   and the session cleanup notification from vpp removes the
926        *   session.
927        * - If vpp cleans up the session first, the session is moved to
928        *   DETACHED state lower and subsequently the close from the app
929        *   frees the session
930        */
931       if (session->session_state == VCL_STATE_VPP_CLOSING)
932         {
933           vppcom_session_disconnect (vcl_session_handle (session));
934           session->session_state = VCL_STATE_UPDATED;
935         }
936       else if (session->session_state == VCL_STATE_DISCONNECT)
937         {
938           vcl_send_session_reset_reply (wrk, session, 0);
939           session->session_state = VCL_STATE_UPDATED;
940         }
941       return;
942     }
943
944   vcl_session_table_del_vpp_handle (wrk, msg->handle);
945   /* Should not happen. App did not close the connection so don't free it. */
946   if (session->session_state != VCL_STATE_CLOSED)
947     {
948       VDBG (0, "app did not close session %d", session->session_index);
949       session->session_state = VCL_STATE_DETACHED;
950       session->vpp_handle = VCL_INVALID_SESSION_HANDLE;
951       return;
952     }
953   vcl_session_free (wrk, session);
954 }
955
956 static void
957 vcl_session_req_worker_update_handler (vcl_worker_t * wrk, void *data)
958 {
959   session_req_worker_update_msg_t *msg;
960   vcl_session_t *s;
961
962   msg = (session_req_worker_update_msg_t *) data;
963   s = vcl_session_get_w_vpp_handle (wrk, msg->session_handle);
964   if (!s)
965     return;
966
967   vec_add1 (wrk->pending_session_wrk_updates, s->session_index);
968 }
969
970 static void
971 vcl_session_worker_update_reply_handler (vcl_worker_t * wrk, void *data)
972 {
973   session_worker_update_reply_msg_t *msg;
974   vcl_session_t *s;
975
976   msg = (session_worker_update_reply_msg_t *) data;
977   s = vcl_session_get_w_vpp_handle (wrk, msg->handle);
978   if (!s)
979     {
980       VDBG (0, "unknown handle 0x%llx", msg->handle);
981       return;
982     }
983
984   if (s->rx_fifo)
985     {
986       if (vcl_segment_attach_session (msg->segment_handle, msg->rx_fifo,
987                                       msg->tx_fifo, (uword) ~0, 0, s))
988         {
989           VDBG (0, "failed to attach fifos for %u", s->session_index);
990           return;
991         }
992     }
993   s->session_state = VCL_STATE_UPDATED;
994
995   VDBG (0, "session %u[0x%llx] moved to worker %u", s->session_index,
996         s->vpp_handle, wrk->wrk_index);
997 }
998
999 static int
1000 vcl_api_recv_fd (vcl_worker_t * wrk, int *fds, int n_fds)
1001 {
1002
1003   if (vcm->cfg.vpp_app_socket_api)
1004     return vcl_sapi_recv_fds (wrk, fds, n_fds);
1005
1006   return vcl_bapi_recv_fds (wrk, fds, n_fds);
1007 }
1008
1009 static void
1010 vcl_session_app_add_segment_handler (vcl_worker_t * wrk, void *data)
1011 {
1012   ssvm_segment_type_t seg_type = SSVM_SEGMENT_SHM;
1013   session_app_add_segment_msg_t *msg;
1014   u64 segment_handle;
1015   int fd = -1;
1016
1017   msg = (session_app_add_segment_msg_t *) data;
1018
1019   if (msg->fd_flags)
1020     {
1021       vcl_api_recv_fd (wrk, &fd, 1);
1022       seg_type = SSVM_SEGMENT_MEMFD;
1023     }
1024
1025   segment_handle = msg->segment_handle;
1026   if (segment_handle == VCL_INVALID_SEGMENT_HANDLE)
1027     {
1028       clib_warning ("invalid segment handle");
1029       return;
1030     }
1031
1032   if (vcl_segment_attach (segment_handle, (char *) msg->segment_name,
1033                           seg_type, fd))
1034     {
1035       VDBG (0, "vcl_segment_attach ('%s') failed", msg->segment_name);
1036       return;
1037     }
1038
1039   VDBG (1, "mapped new segment '%s' size %d", msg->segment_name,
1040         msg->segment_size);
1041 }
1042
1043 static void
1044 vcl_session_app_del_segment_handler (vcl_worker_t * wrk, void *data)
1045 {
1046   session_app_del_segment_msg_t *msg = (session_app_del_segment_msg_t *) data;
1047   vcl_segment_detach (msg->segment_handle);
1048   VDBG (1, "Unmapped segment: %d", msg->segment_handle);
1049 }
1050
1051 static void
1052 vcl_worker_rpc_handler (vcl_worker_t * wrk, void *data)
1053 {
1054   if (!vcm->wrk_rpc_fn)
1055     return;
1056
1057   (vcm->wrk_rpc_fn) (((session_app_wrk_rpc_msg_t *) data)->data);
1058 }
1059
1060 static void
1061 vcl_session_transport_attr_reply_handler (vcl_worker_t *wrk, void *data)
1062 {
1063   session_transport_attr_reply_msg_t *mp;
1064
1065   if (!wrk->session_attr_op)
1066     return;
1067
1068   mp = (session_transport_attr_reply_msg_t *) data;
1069
1070   wrk->session_attr_op_rv = mp->retval;
1071   wrk->session_attr_op = 0;
1072   wrk->session_attr_rv = mp->attr;
1073 }
1074
1075 static int
1076 vcl_handle_mq_event (vcl_worker_t * wrk, session_event_t * e)
1077 {
1078   session_disconnected_msg_t *disconnected_msg;
1079   session_connected_msg_t *connected_msg;
1080   session_reset_msg_t *reset_msg;
1081   session_event_t *ecpy;
1082   vcl_session_t *s;
1083   u32 sid;
1084
1085   switch (e->event_type)
1086     {
1087     case SESSION_IO_EVT_RX:
1088     case SESSION_IO_EVT_TX:
1089       s = vcl_session_get (wrk, e->session_index);
1090       if (!s || !vcl_session_is_open (s))
1091         break;
1092       vec_add1 (wrk->unhandled_evts_vector, *e);
1093       break;
1094     case SESSION_CTRL_EVT_BOUND:
1095       /* We can only wait for only one listen so not postponed */
1096       vcl_session_bound_handler (wrk, (session_bound_msg_t *) e->data);
1097       break;
1098     case SESSION_CTRL_EVT_ACCEPTED:
1099       s = vcl_session_accepted (wrk, (session_accepted_msg_t *) e->data);
1100       if (vcl_session_has_attr (s, VCL_SESS_ATTR_NONBLOCK))
1101         {
1102           vec_add2 (wrk->unhandled_evts_vector, ecpy, 1);
1103           *ecpy = *e;
1104           ecpy->postponed = 1;
1105           ecpy->session_index = s->session_index;
1106         }
1107       break;
1108     case SESSION_CTRL_EVT_CONNECTED:
1109       connected_msg = (session_connected_msg_t *) e->data;
1110       sid = vcl_session_connected_handler (wrk, connected_msg);
1111       if (!(s = vcl_session_get (wrk, sid)))
1112         break;
1113       if (vcl_session_has_attr (s, VCL_SESS_ATTR_NONBLOCK))
1114         {
1115           vec_add2 (wrk->unhandled_evts_vector, ecpy, 1);
1116           *ecpy = *e;
1117           ecpy->postponed = 1;
1118           ecpy->session_index = s->session_index;
1119         }
1120       break;
1121     case SESSION_CTRL_EVT_DISCONNECTED:
1122       disconnected_msg = (session_disconnected_msg_t *) e->data;
1123       if (!(s = vcl_session_get_w_vpp_handle (wrk, disconnected_msg->handle)))
1124         break;
1125       if (vcl_session_has_attr (s, VCL_SESS_ATTR_NONBLOCK))
1126         {
1127           vec_add1 (wrk->unhandled_evts_vector, *e);
1128           break;
1129         }
1130       if (!(s = vcl_session_disconnected_handler (wrk, disconnected_msg)))
1131         break;
1132       VDBG (0, "disconnected session %u [0x%llx]", s->session_index,
1133             s->vpp_handle);
1134       break;
1135     case SESSION_CTRL_EVT_RESET:
1136       reset_msg = (session_reset_msg_t *) e->data;
1137       if (!(s = vcl_session_get_w_vpp_handle (wrk, reset_msg->handle)))
1138         break;
1139       if (vcl_session_has_attr (s, VCL_SESS_ATTR_NONBLOCK))
1140         {
1141           vec_add1 (wrk->unhandled_evts_vector, *e);
1142           break;
1143         }
1144       vcl_session_reset_handler (wrk, (session_reset_msg_t *) e->data);
1145       break;
1146     case SESSION_CTRL_EVT_UNLISTEN_REPLY:
1147       vcl_session_unlisten_reply_handler (wrk, e->data);
1148       break;
1149     case SESSION_CTRL_EVT_MIGRATED:
1150       vcl_session_migrated_handler (wrk, e->data);
1151       break;
1152     case SESSION_CTRL_EVT_CLEANUP:
1153       vcl_session_cleanup_handler (wrk, e->data);
1154       break;
1155     case SESSION_CTRL_EVT_REQ_WORKER_UPDATE:
1156       vcl_session_req_worker_update_handler (wrk, e->data);
1157       break;
1158     case SESSION_CTRL_EVT_WORKER_UPDATE_REPLY:
1159       vcl_session_worker_update_reply_handler (wrk, e->data);
1160       break;
1161     case SESSION_CTRL_EVT_APP_ADD_SEGMENT:
1162       vcl_session_app_add_segment_handler (wrk, e->data);
1163       break;
1164     case SESSION_CTRL_EVT_APP_DEL_SEGMENT:
1165       vcl_session_app_del_segment_handler (wrk, e->data);
1166       break;
1167     case SESSION_CTRL_EVT_APP_WRK_RPC:
1168       vcl_worker_rpc_handler (wrk, e->data);
1169       break;
1170     case SESSION_CTRL_EVT_TRANSPORT_ATTR_REPLY:
1171       vcl_session_transport_attr_reply_handler (wrk, e->data);
1172       break;
1173     default:
1174       clib_warning ("unhandled %u", e->event_type);
1175     }
1176   return VPPCOM_OK;
1177 }
1178
1179 static int
1180 vppcom_wait_for_session_state_change (u32 session_index,
1181                                       vcl_session_state_t state,
1182                                       f64 wait_for_time)
1183 {
1184   vcl_worker_t *wrk = vcl_worker_get_current ();
1185   f64 timeout = clib_time_now (&wrk->clib_time) + wait_for_time;
1186   vcl_session_t *volatile session;
1187   svm_msg_q_msg_t msg;
1188   session_event_t *e;
1189
1190   do
1191     {
1192       session = vcl_session_get (wrk, session_index);
1193       if (PREDICT_FALSE (!session))
1194         {
1195           return VPPCOM_EBADFD;
1196         }
1197       if (session->session_state == state)
1198         {
1199           return VPPCOM_OK;
1200         }
1201       if (session->session_state == VCL_STATE_DETACHED)
1202         {
1203           return VPPCOM_ECONNREFUSED;
1204         }
1205
1206       if (svm_msg_q_sub (wrk->app_event_queue, &msg, SVM_Q_NOWAIT, 0))
1207         {
1208           usleep (100);
1209           continue;
1210         }
1211       e = svm_msg_q_msg_data (wrk->app_event_queue, &msg);
1212       vcl_handle_mq_event (wrk, e);
1213       svm_msg_q_free_msg (wrk->app_event_queue, &msg);
1214     }
1215   while (clib_time_now (&wrk->clib_time) < timeout);
1216
1217   VDBG (0, "timeout waiting for state 0x%x (%s)", state,
1218         vppcom_session_state_str (state));
1219   vcl_evt (VCL_EVT_SESSION_TIMEOUT, session, session_state);
1220
1221   return VPPCOM_ETIMEDOUT;
1222 }
1223
1224 static void
1225 vcl_handle_pending_wrk_updates (vcl_worker_t * wrk)
1226 {
1227   vcl_session_state_t state;
1228   vcl_session_t *s;
1229   u32 *sip;
1230
1231   if (PREDICT_TRUE (vec_len (wrk->pending_session_wrk_updates) == 0))
1232     return;
1233
1234   vec_foreach (sip, wrk->pending_session_wrk_updates)
1235   {
1236     s = vcl_session_get (wrk, *sip);
1237     vcl_send_session_worker_update (wrk, s, wrk->wrk_index);
1238     state = s->session_state;
1239     vppcom_wait_for_session_state_change (s->session_index, VCL_STATE_UPDATED,
1240                                           5);
1241     s->session_state = state;
1242   }
1243   vec_reset_length (wrk->pending_session_wrk_updates);
1244 }
1245
1246 void
1247 vcl_worker_flush_mq_events (vcl_worker_t *wrk)
1248 {
1249   svm_msg_q_msg_t *msg;
1250   session_event_t *e;
1251   svm_msg_q_t *mq;
1252   int i;
1253
1254   mq = wrk->app_event_queue;
1255   vcl_mq_dequeue_batch (wrk, mq, ~0);
1256
1257   for (i = 0; i < vec_len (wrk->mq_msg_vector); i++)
1258     {
1259       msg = vec_elt_at_index (wrk->mq_msg_vector, i);
1260       e = svm_msg_q_msg_data (mq, msg);
1261       vcl_handle_mq_event (wrk, e);
1262       svm_msg_q_free_msg (mq, msg);
1263     }
1264   vec_reset_length (wrk->mq_msg_vector);
1265   vcl_handle_pending_wrk_updates (wrk);
1266 }
1267
1268 void
1269 vcl_flush_mq_events (void)
1270 {
1271   vcl_worker_flush_mq_events (vcl_worker_get_current ());
1272 }
1273
1274 static int
1275 vppcom_session_unbind (u32 session_handle)
1276 {
1277   vcl_worker_t *wrk = vcl_worker_get_current ();
1278   session_accepted_msg_t *accepted_msg;
1279   vcl_session_t *session = 0;
1280   vcl_session_msg_t *evt;
1281
1282   session = vcl_session_get_w_handle (wrk, session_handle);
1283   if (!session)
1284     return VPPCOM_EBADFD;
1285
1286   /* Flush pending accept events, if any */
1287   while (clib_fifo_elts (session->accept_evts_fifo))
1288     {
1289       clib_fifo_sub2 (session->accept_evts_fifo, evt);
1290       accepted_msg = &evt->accepted_msg;
1291       vcl_session_table_del_vpp_handle (wrk, accepted_msg->handle);
1292       vcl_send_session_accepted_reply (session->vpp_evt_q,
1293                                        accepted_msg->context,
1294                                        accepted_msg->handle, -1);
1295     }
1296   clib_fifo_free (session->accept_evts_fifo);
1297
1298   vcl_send_session_unlisten (wrk, session);
1299
1300   VDBG (1, "session %u [0x%llx]: sending unbind!", session->session_index,
1301         session->vpp_handle);
1302   vcl_evt (VCL_EVT_UNBIND, session);
1303
1304   session->vpp_handle = ~0;
1305   session->session_state = VCL_STATE_DISCONNECT;
1306
1307   return VPPCOM_OK;
1308 }
1309
1310 /**
1311  * Handle app exit
1312  *
1313  * Notify vpp of the disconnect and mark the worker as free. If we're the
1314  * last worker, do a full cleanup otherwise, since we're probably a forked
1315  * child, avoid syscalls as much as possible. We might've lost privileges.
1316  */
1317 void
1318 vppcom_app_exit (void)
1319 {
1320   if (!pool_elts (vcm->workers))
1321     return;
1322   vcl_worker_cleanup (vcl_worker_get_current (), 1 /* notify vpp */ );
1323   vcl_set_worker_index (~0);
1324   vcl_elog_stop (vcm);
1325 }
1326
1327 static int
1328 vcl_api_attach (void)
1329 {
1330   if (vcm->cfg.vpp_app_socket_api)
1331     return vcl_sapi_attach ();
1332
1333   return vcl_bapi_attach ();
1334 }
1335
1336 static void
1337 vcl_api_detach (vcl_worker_t * wrk)
1338 {
1339   vcl_send_app_detach (wrk);
1340
1341   if (vcm->cfg.vpp_app_socket_api)
1342     return vcl_sapi_detach (wrk);
1343
1344   return vcl_bapi_disconnect_from_vpp ();
1345 }
1346
1347 /*
1348  * VPPCOM Public API functions
1349  */
1350 int
1351 vppcom_app_create (const char *app_name)
1352 {
1353   vppcom_cfg_t *vcl_cfg = &vcm->cfg;
1354   int rv;
1355
1356   if (vcm->is_init)
1357     {
1358       VDBG (1, "already initialized");
1359       return VPPCOM_EEXIST;
1360     }
1361
1362   vcm->is_init = 1;
1363   vppcom_cfg (&vcm->cfg);
1364   vcl_cfg = &vcm->cfg;
1365
1366   vcm->main_cpu = pthread_self ();
1367   vcm->main_pid = getpid ();
1368   vcm->app_name = format (0, "%s", app_name);
1369   fifo_segment_main_init (&vcm->segment_main, vcl_cfg->segment_baseva,
1370                           20 /* timeout in secs */ );
1371   pool_alloc (vcm->workers, vcl_cfg->max_workers);
1372   clib_spinlock_init (&vcm->workers_lock);
1373   clib_rwlock_init (&vcm->segment_table_lock);
1374   atexit (vppcom_app_exit);
1375   vcl_elog_init (vcm);
1376
1377   /* Allocate default worker */
1378   vcl_worker_alloc_and_init ();
1379
1380   if ((rv = vcl_api_attach ()))
1381     return rv;
1382
1383   VDBG (0, "app_name '%s', my_client_index %d (0x%x)", app_name,
1384         vcm->workers[0].api_client_handle, vcm->workers[0].api_client_handle);
1385
1386   return VPPCOM_OK;
1387 }
1388
1389 void
1390 vppcom_app_destroy (void)
1391 {
1392   vcl_worker_t *wrk, *current_wrk;
1393   void *heap;
1394
1395   if (!pool_elts (vcm->workers))
1396     return;
1397
1398   vcl_evt (VCL_EVT_DETACH, vcm);
1399
1400   current_wrk = vcl_worker_get_current ();
1401
1402   /* *INDENT-OFF* */
1403   pool_foreach (wrk, vcm->workers)  {
1404     if (current_wrk != wrk)
1405       vcl_worker_cleanup (wrk, 0 /* notify vpp */ );
1406   }
1407   /* *INDENT-ON* */
1408
1409   vcl_api_detach (current_wrk);
1410   vcl_worker_cleanup (current_wrk, 0 /* notify vpp */ );
1411
1412   vcl_elog_stop (vcm);
1413
1414   /*
1415    * Free the heap and fix vcm
1416    */
1417   heap = clib_mem_get_heap ();
1418   munmap (clib_mem_get_heap_base (heap), clib_mem_get_heap_size (heap));
1419
1420   vcm = &_vppcom_main;
1421   vcm->is_init = 0;
1422 }
1423
1424 int
1425 vppcom_session_create (u8 proto, u8 is_nonblocking)
1426 {
1427   vcl_worker_t *wrk = vcl_worker_get_current ();
1428   vcl_session_t *session;
1429
1430   session = vcl_session_alloc (wrk);
1431
1432   session->session_type = proto;
1433   session->session_state = VCL_STATE_CLOSED;
1434   session->vpp_handle = ~0;
1435   session->is_dgram = vcl_proto_is_dgram (proto);
1436
1437   if (is_nonblocking)
1438     vcl_session_set_attr (session, VCL_SESS_ATTR_NONBLOCK);
1439
1440   vcl_evt (VCL_EVT_CREATE, session, session_type, session->session_state,
1441            is_nonblocking, session_index);
1442
1443   VDBG (0, "created session %u", session->session_index);
1444
1445   return vcl_session_handle (session);
1446 }
1447
1448 int
1449 vcl_session_cleanup (vcl_worker_t * wrk, vcl_session_t * s,
1450                      vcl_session_handle_t sh, u8 do_disconnect)
1451 {
1452   int rv = VPPCOM_OK;
1453
1454   VDBG (1, "session %u [0x%llx] closing", s->session_index, s->vpp_handle);
1455
1456   if (s->flags & VCL_SESSION_F_IS_VEP)
1457     {
1458       u32 next_sh = s->vep.next_sh;
1459       while (next_sh != ~0)
1460         {
1461           rv = vppcom_epoll_ctl (sh, EPOLL_CTL_DEL, next_sh, 0);
1462           if (PREDICT_FALSE (rv < 0))
1463             VDBG (0, "vpp handle 0x%llx, sh %u: EPOLL_CTL_DEL vep_idx %u"
1464                   " failed! rv %d (%s)", s->vpp_handle, next_sh,
1465                   s->vep.vep_sh, rv, vppcom_retval_str (rv));
1466           next_sh = s->vep.next_sh;
1467         }
1468       goto free_session;
1469     }
1470
1471   if (s->flags & VCL_SESSION_F_IS_VEP_SESSION)
1472     {
1473       rv = vppcom_epoll_ctl (s->vep.vep_sh, EPOLL_CTL_DEL, sh, 0);
1474       if (rv < 0)
1475         VDBG (0, "session %u [0x%llx]: EPOLL_CTL_DEL vep_idx %u "
1476               "failed! rv %d (%s)", s->session_index, s->vpp_handle,
1477               s->vep.vep_sh, rv, vppcom_retval_str (rv));
1478     }
1479
1480   if (!do_disconnect)
1481     {
1482       VDBG (1, "session %u [0x%llx] disconnect skipped",
1483             s->session_index, s->vpp_handle);
1484       goto cleanup;
1485     }
1486
1487   if (s->session_state == VCL_STATE_LISTEN)
1488     {
1489       rv = vppcom_session_unbind (sh);
1490       if (PREDICT_FALSE (rv < 0))
1491         VDBG (0, "session %u [0x%llx]: listener unbind failed! "
1492               "rv %d (%s)", s->session_index, s->vpp_handle, rv,
1493               vppcom_retval_str (rv));
1494       return rv;
1495     }
1496   else if (vcl_session_is_ready (s)
1497            || (vcl_session_is_connectable_listener (wrk, s)))
1498     {
1499       rv = vppcom_session_disconnect (sh);
1500       if (PREDICT_FALSE (rv < 0))
1501         VDBG (0, "ERROR: session %u [0x%llx]: disconnect failed!"
1502               " rv %d (%s)", s->session_index, s->vpp_handle,
1503               rv, vppcom_retval_str (rv));
1504     }
1505   else if (s->session_state == VCL_STATE_DISCONNECT)
1506     {
1507       vcl_send_session_reset_reply (wrk, s, 0);
1508     }
1509   else if (s->session_state == VCL_STATE_DETACHED)
1510     {
1511       /* Should not happen. VPP cleaned up before app confirmed close */
1512       VDBG (0, "vpp freed session %d before close", s->session_index);
1513       goto free_session;
1514     }
1515
1516   s->session_state = VCL_STATE_CLOSED;
1517
1518   /* Session is removed only after vpp confirms the disconnect */
1519   return rv;
1520
1521 cleanup:
1522   vcl_session_table_del_vpp_handle (wrk, s->vpp_handle);
1523 free_session:
1524   vcl_session_free (wrk, s);
1525   vcl_evt (VCL_EVT_CLOSE, s, rv);
1526
1527   return rv;
1528 }
1529
1530 int
1531 vppcom_session_close (uint32_t session_handle)
1532 {
1533   vcl_worker_t *wrk = vcl_worker_get_current ();
1534   vcl_session_t *session;
1535
1536   session = vcl_session_get_w_handle (wrk, session_handle);
1537   if (!session)
1538     return VPPCOM_EBADFD;
1539   return vcl_session_cleanup (wrk, session, session_handle,
1540                               1 /* do_disconnect */ );
1541 }
1542
1543 int
1544 vppcom_session_bind (uint32_t session_handle, vppcom_endpt_t * ep)
1545 {
1546   vcl_worker_t *wrk = vcl_worker_get_current ();
1547   vcl_session_t *session = 0;
1548
1549   if (!ep || !ep->ip)
1550     return VPPCOM_EINVAL;
1551
1552   session = vcl_session_get_w_handle (wrk, session_handle);
1553   if (!session)
1554     return VPPCOM_EBADFD;
1555
1556   if (session->flags & VCL_SESSION_F_IS_VEP)
1557     {
1558       VDBG (0, "ERROR: cannot bind to epoll session %u!",
1559             session->session_index);
1560       return VPPCOM_EBADFD;
1561     }
1562
1563   session->transport.is_ip4 = ep->is_ip4;
1564   if (ep->is_ip4)
1565     clib_memcpy_fast (&session->transport.lcl_ip.ip4, ep->ip,
1566                       sizeof (ip4_address_t));
1567   else
1568     clib_memcpy_fast (&session->transport.lcl_ip.ip6, ep->ip,
1569                       sizeof (ip6_address_t));
1570   session->transport.lcl_port = ep->port;
1571
1572   VDBG (0, "session %u handle %u: binding to local %s address %U port %u, "
1573         "proto %s", session->session_index, session_handle,
1574         session->transport.is_ip4 ? "IPv4" : "IPv6",
1575         format_ip46_address, &session->transport.lcl_ip,
1576         session->transport.is_ip4 ? IP46_TYPE_IP4 : IP46_TYPE_IP6,
1577         clib_net_to_host_u16 (session->transport.lcl_port),
1578         vppcom_proto_str (session->session_type));
1579   vcl_evt (VCL_EVT_BIND, session);
1580
1581   if (session->session_type == VPPCOM_PROTO_UDP)
1582     vppcom_session_listen (session_handle, 10);
1583
1584   return VPPCOM_OK;
1585 }
1586
1587 int
1588 vppcom_session_listen (uint32_t listen_sh, uint32_t q_len)
1589 {
1590   vcl_worker_t *wrk = vcl_worker_get_current ();
1591   vcl_session_t *listen_session = 0;
1592   u64 listen_vpp_handle;
1593   int rv;
1594
1595   listen_session = vcl_session_get_w_handle (wrk, listen_sh);
1596   if (!listen_session || (listen_session->flags & VCL_SESSION_F_IS_VEP))
1597     return VPPCOM_EBADFD;
1598
1599   if (q_len == 0 || q_len == ~0)
1600     q_len = vcm->cfg.listen_queue_size;
1601
1602   listen_vpp_handle = listen_session->vpp_handle;
1603   if (listen_session->session_state == VCL_STATE_LISTEN)
1604     {
1605       VDBG (0, "session %u [0x%llx]: already in listen state!",
1606             listen_sh, listen_vpp_handle);
1607       return VPPCOM_OK;
1608     }
1609
1610   VDBG (0, "session %u: sending vpp listen request...", listen_sh);
1611
1612   /*
1613    * Send listen request to vpp and wait for reply
1614    */
1615   vcl_send_session_listen (wrk, listen_session);
1616   rv = vppcom_wait_for_session_state_change (listen_session->session_index,
1617                                              VCL_STATE_LISTEN,
1618                                              vcm->cfg.session_timeout);
1619
1620   if (PREDICT_FALSE (rv))
1621     {
1622       listen_session = vcl_session_get_w_handle (wrk, listen_sh);
1623       VDBG (0, "session %u [0x%llx]: listen failed! returning %d (%s)",
1624             listen_sh, listen_session->vpp_handle, rv,
1625             vppcom_retval_str (rv));
1626       return rv;
1627     }
1628
1629   return VPPCOM_OK;
1630 }
1631
1632 static int
1633 validate_args_session_accept_ (vcl_worker_t * wrk, vcl_session_t * ls)
1634 {
1635   if (ls->flags & VCL_SESSION_F_IS_VEP)
1636     {
1637       VDBG (0, "ERROR: cannot accept on epoll session %u!",
1638             ls->session_index);
1639       return VPPCOM_EBADFD;
1640     }
1641
1642   if ((ls->session_state != VCL_STATE_LISTEN)
1643       && (!vcl_session_is_connectable_listener (wrk, ls)))
1644     {
1645       VDBG (0, "ERROR: session [0x%llx]: not in listen state! state 0x%x"
1646             " (%s)", ls->vpp_handle, ls->session_state,
1647             vppcom_session_state_str (ls->session_state));
1648       return VPPCOM_EBADFD;
1649     }
1650   return VPPCOM_OK;
1651 }
1652
1653 int
1654 vppcom_unformat_proto (uint8_t * proto, char *proto_str)
1655 {
1656   if (!strcmp (proto_str, "TCP"))
1657     *proto = VPPCOM_PROTO_TCP;
1658   else if (!strcmp (proto_str, "tcp"))
1659     *proto = VPPCOM_PROTO_TCP;
1660   else if (!strcmp (proto_str, "UDP"))
1661     *proto = VPPCOM_PROTO_UDP;
1662   else if (!strcmp (proto_str, "udp"))
1663     *proto = VPPCOM_PROTO_UDP;
1664   else if (!strcmp (proto_str, "TLS"))
1665     *proto = VPPCOM_PROTO_TLS;
1666   else if (!strcmp (proto_str, "tls"))
1667     *proto = VPPCOM_PROTO_TLS;
1668   else if (!strcmp (proto_str, "QUIC"))
1669     *proto = VPPCOM_PROTO_QUIC;
1670   else if (!strcmp (proto_str, "quic"))
1671     *proto = VPPCOM_PROTO_QUIC;
1672   else if (!strcmp (proto_str, "DTLS"))
1673     *proto = VPPCOM_PROTO_DTLS;
1674   else if (!strcmp (proto_str, "dtls"))
1675     *proto = VPPCOM_PROTO_DTLS;
1676   else if (!strcmp (proto_str, "SRTP"))
1677     *proto = VPPCOM_PROTO_SRTP;
1678   else if (!strcmp (proto_str, "srtp"))
1679     *proto = VPPCOM_PROTO_SRTP;
1680   else
1681     return 1;
1682   return 0;
1683 }
1684
1685 int
1686 vppcom_session_accept (uint32_t listen_session_handle, vppcom_endpt_t * ep,
1687                        uint32_t flags)
1688 {
1689   u32 client_session_index = ~0, listen_session_index, accept_flags = 0;
1690   vcl_worker_t *wrk = vcl_worker_get_current ();
1691   session_accepted_msg_t accepted_msg;
1692   vcl_session_t *listen_session = 0;
1693   vcl_session_t *client_session = 0;
1694   vcl_session_msg_t *evt;
1695   u8 is_nonblocking;
1696   int rv;
1697
1698 again:
1699
1700   listen_session = vcl_session_get_w_handle (wrk, listen_session_handle);
1701   if (!listen_session)
1702     return VPPCOM_EBADFD;
1703
1704   listen_session_index = listen_session->session_index;
1705   if ((rv = validate_args_session_accept_ (wrk, listen_session)))
1706     return rv;
1707
1708   if (clib_fifo_elts (listen_session->accept_evts_fifo))
1709     {
1710       clib_fifo_sub2 (listen_session->accept_evts_fifo, evt);
1711       accept_flags = evt->flags;
1712       accepted_msg = evt->accepted_msg;
1713       goto handle;
1714     }
1715
1716   is_nonblocking = vcl_session_has_attr (listen_session,
1717                                          VCL_SESS_ATTR_NONBLOCK);
1718   while (1)
1719     {
1720       if (svm_msg_q_is_empty (wrk->app_event_queue) && is_nonblocking)
1721         return VPPCOM_EAGAIN;
1722
1723       svm_msg_q_wait (wrk->app_event_queue, SVM_MQ_WAIT_EMPTY);
1724       vcl_worker_flush_mq_events (wrk);
1725       goto again;
1726     }
1727
1728 handle:
1729
1730   client_session_index = vcl_session_accepted_handler (wrk, &accepted_msg,
1731                                                        listen_session_index);
1732   if (client_session_index == VCL_INVALID_SESSION_INDEX)
1733     return VPPCOM_ECONNABORTED;
1734
1735   listen_session = vcl_session_get (wrk, listen_session_index);
1736   client_session = vcl_session_get (wrk, client_session_index);
1737
1738   if (flags & O_NONBLOCK)
1739     vcl_session_set_attr (client_session, VCL_SESS_ATTR_NONBLOCK);
1740
1741   VDBG (1, "listener %u [0x%llx]: Got a connect request! session %u [0x%llx],"
1742         " flags %d, is_nonblocking %u", listen_session->session_index,
1743         listen_session->vpp_handle, client_session_index,
1744         client_session->vpp_handle, flags,
1745         vcl_session_has_attr (client_session, VCL_SESS_ATTR_NONBLOCK));
1746
1747   if (ep)
1748     {
1749       ep->is_ip4 = client_session->transport.is_ip4;
1750       ep->port = client_session->transport.rmt_port;
1751       if (client_session->transport.is_ip4)
1752         clib_memcpy_fast (ep->ip, &client_session->transport.rmt_ip.ip4,
1753                           sizeof (ip4_address_t));
1754       else
1755         clib_memcpy_fast (ep->ip, &client_session->transport.rmt_ip.ip6,
1756                           sizeof (ip6_address_t));
1757     }
1758
1759   VDBG (0, "listener %u [0x%llx] accepted %u [0x%llx] peer: %U:%u "
1760         "local: %U:%u", listen_session_handle, listen_session->vpp_handle,
1761         client_session_index, client_session->vpp_handle,
1762         format_ip46_address, &client_session->transport.rmt_ip,
1763         client_session->transport.is_ip4 ? IP46_TYPE_IP4 : IP46_TYPE_IP6,
1764         clib_net_to_host_u16 (client_session->transport.rmt_port),
1765         format_ip46_address, &client_session->transport.lcl_ip,
1766         client_session->transport.is_ip4 ? IP46_TYPE_IP4 : IP46_TYPE_IP6,
1767         clib_net_to_host_u16 (client_session->transport.lcl_port));
1768   vcl_evt (VCL_EVT_ACCEPT, client_session, listen_session,
1769            client_session_index);
1770
1771   /*
1772    * Session might have been closed already
1773    */
1774   if (accept_flags)
1775     {
1776       if (accept_flags & VCL_ACCEPTED_F_CLOSED)
1777         client_session->session_state = VCL_STATE_VPP_CLOSING;
1778       else if (accept_flags & VCL_ACCEPTED_F_RESET)
1779         client_session->session_state = VCL_STATE_DISCONNECT;
1780     }
1781   return vcl_session_handle (client_session);
1782 }
1783
1784 int
1785 vppcom_session_connect (uint32_t session_handle, vppcom_endpt_t * server_ep)
1786 {
1787   vcl_worker_t *wrk = vcl_worker_get_current ();
1788   vcl_session_t *session = 0;
1789   u32 session_index;
1790   int rv;
1791
1792   session = vcl_session_get_w_handle (wrk, session_handle);
1793   if (!session)
1794     return VPPCOM_EBADFD;
1795   session_index = session->session_index;
1796
1797   if (PREDICT_FALSE (session->flags & VCL_SESSION_F_IS_VEP))
1798     {
1799       VDBG (0, "ERROR: cannot connect epoll session %u!",
1800             session->session_index);
1801       return VPPCOM_EBADFD;
1802     }
1803
1804   if (PREDICT_FALSE (vcl_session_is_ready (session)))
1805     {
1806       VDBG (0, "session handle %u [0x%llx]: session already "
1807             "connected to %s %U port %d proto %s, state 0x%x (%s)",
1808             session_handle, session->vpp_handle,
1809             session->transport.is_ip4 ? "IPv4" : "IPv6", format_ip46_address,
1810             &session->transport.rmt_ip, session->transport.is_ip4 ?
1811             IP46_TYPE_IP4 : IP46_TYPE_IP6,
1812             clib_net_to_host_u16 (session->transport.rmt_port),
1813             vppcom_proto_str (session->session_type), session->session_state,
1814             vppcom_session_state_str (session->session_state));
1815       return VPPCOM_OK;
1816     }
1817
1818   /* Attempt to connect a connectionless listener */
1819   if (PREDICT_FALSE (session->session_state == VCL_STATE_LISTEN))
1820     {
1821       if (session->session_type != VPPCOM_PROTO_UDP)
1822         return VPPCOM_EINVAL;
1823       vcl_send_session_unlisten (wrk, session);
1824       session->session_state = VCL_STATE_CLOSED;
1825     }
1826
1827   session->transport.is_ip4 = server_ep->is_ip4;
1828   vcl_ip_copy_from_ep (&session->transport.rmt_ip, server_ep);
1829   session->transport.rmt_port = server_ep->port;
1830   session->parent_handle = VCL_INVALID_SESSION_HANDLE;
1831   session->flags |= VCL_SESSION_F_CONNECTED;
1832
1833   VDBG (0, "session handle %u (%s): connecting to peer %s %U "
1834         "port %d proto %s", session_handle,
1835         vppcom_session_state_str (session->session_state),
1836         session->transport.is_ip4 ? "IPv4" : "IPv6",
1837         format_ip46_address,
1838         &session->transport.rmt_ip, session->transport.is_ip4 ?
1839         IP46_TYPE_IP4 : IP46_TYPE_IP6,
1840         clib_net_to_host_u16 (session->transport.rmt_port),
1841         vppcom_proto_str (session->session_type));
1842
1843   vcl_send_session_connect (wrk, session);
1844
1845   if (vcl_session_has_attr (session, VCL_SESS_ATTR_NONBLOCK))
1846     {
1847       /* State set to STATE_UPDATED to ensure the session is not assumed
1848        * to be ready and to also allow the app to close it prior to vpp's
1849        * connected reply. */
1850       session->session_state = VCL_STATE_UPDATED;
1851       return VPPCOM_EINPROGRESS;
1852     }
1853
1854   /*
1855    * Wait for reply from vpp if blocking
1856    */
1857   rv = vppcom_wait_for_session_state_change (session_index, VCL_STATE_READY,
1858                                              vcm->cfg.session_timeout);
1859
1860   session = vcl_session_get (wrk, session_index);
1861   VDBG (0, "session %u [0x%llx]: connect %s!", session->session_index,
1862         session->vpp_handle, rv ? "failed" : "succeeded");
1863
1864   return rv;
1865 }
1866
1867 int
1868 vppcom_session_stream_connect (uint32_t session_handle,
1869                                uint32_t parent_session_handle)
1870 {
1871   vcl_worker_t *wrk = vcl_worker_get_current ();
1872   vcl_session_t *session, *parent_session;
1873   u32 session_index, parent_session_index;
1874   int rv;
1875
1876   session = vcl_session_get_w_handle (wrk, session_handle);
1877   if (!session)
1878     return VPPCOM_EBADFD;
1879   parent_session = vcl_session_get_w_handle (wrk, parent_session_handle);
1880   if (!parent_session)
1881     return VPPCOM_EBADFD;
1882
1883   session_index = session->session_index;
1884   parent_session_index = parent_session->session_index;
1885   if (PREDICT_FALSE (session->flags & VCL_SESSION_F_IS_VEP))
1886     {
1887       VDBG (0, "ERROR: cannot connect epoll session %u!",
1888             session->session_index);
1889       return VPPCOM_EBADFD;
1890     }
1891
1892   if (PREDICT_FALSE (vcl_session_is_ready (session)))
1893     {
1894       VDBG (0, "session handle %u [0x%llx]: session already "
1895             "connected to session %u [0x%llx] proto %s, state 0x%x (%s)",
1896             session_handle, session->vpp_handle,
1897             parent_session_handle, parent_session->vpp_handle,
1898             vppcom_proto_str (session->session_type), session->session_state,
1899             vppcom_session_state_str (session->session_state));
1900       return VPPCOM_OK;
1901     }
1902
1903   /* Connect to quic session specifics */
1904   session->transport.is_ip4 = parent_session->transport.is_ip4;
1905   session->transport.rmt_ip.ip4.as_u32 = (uint32_t) 1;
1906   session->transport.rmt_port = 0;
1907   session->parent_handle = parent_session->vpp_handle;
1908
1909   VDBG (0, "session handle %u: connecting to session %u [0x%llx]",
1910         session_handle, parent_session_handle, parent_session->vpp_handle);
1911
1912   /*
1913    * Send connect request and wait for reply from vpp
1914    */
1915   vcl_send_session_connect (wrk, session);
1916   rv = vppcom_wait_for_session_state_change (session_index, VCL_STATE_READY,
1917                                              vcm->cfg.session_timeout);
1918
1919   session->listener_index = parent_session_index;
1920   parent_session = vcl_session_get_w_handle (wrk, parent_session_handle);
1921   if (parent_session)
1922     parent_session->n_accepted_sessions++;
1923
1924   session = vcl_session_get (wrk, session_index);
1925   VDBG (0, "session %u [0x%llx]: connect %s!", session->session_index,
1926         session->vpp_handle, rv ? "failed" : "succeeded");
1927
1928   return rv;
1929 }
1930
1931 static inline int
1932 vppcom_session_read_internal (uint32_t session_handle, void *buf, int n,
1933                               u8 peek)
1934 {
1935   vcl_worker_t *wrk = vcl_worker_get_current ();
1936   int rv, n_read = 0, is_nonblocking;
1937   vcl_session_t *s = 0;
1938   svm_fifo_t *rx_fifo;
1939   session_event_t *e;
1940   svm_msg_q_t *mq;
1941   u8 is_ct;
1942
1943   if (PREDICT_FALSE (!buf))
1944     return VPPCOM_EINVAL;
1945
1946   s = vcl_session_get_w_handle (wrk, session_handle);
1947   if (PREDICT_FALSE (!s || (s->flags & VCL_SESSION_F_IS_VEP)))
1948     return VPPCOM_EBADFD;
1949
1950   if (PREDICT_FALSE (!vcl_session_is_open (s)))
1951     {
1952       VDBG (0, "session %u[0x%llx] is not open! state 0x%x (%s)",
1953             s->session_index, s->vpp_handle, s->session_state,
1954             vppcom_session_state_str (s->session_state));
1955       return vcl_session_closed_error (s);
1956     }
1957
1958   if (PREDICT_FALSE (s->flags & VCL_SESSION_F_RD_SHUTDOWN))
1959     {
1960       /* Vpp would ack the incoming data and enqueue it for reading.
1961        * So even if SHUT_RD is set, we can still read() the data if
1962        * the session is ready.
1963        */
1964       if (!vcl_session_read_ready (s))
1965         {
1966           return 0;
1967         }
1968     }
1969
1970   is_nonblocking = vcl_session_has_attr (s, VCL_SESS_ATTR_NONBLOCK);
1971   is_ct = vcl_session_is_ct (s);
1972   mq = wrk->app_event_queue;
1973   rx_fifo = is_ct ? s->ct_rx_fifo : s->rx_fifo;
1974   s->flags &= ~VCL_SESSION_F_HAS_RX_EVT;
1975
1976   if (svm_fifo_is_empty_cons (rx_fifo))
1977     {
1978       if (is_nonblocking)
1979         {
1980           if (vcl_session_is_closing (s))
1981             return vcl_session_closing_error (s);
1982           if (is_ct)
1983             svm_fifo_unset_event (s->rx_fifo);
1984           svm_fifo_unset_event (rx_fifo);
1985           return VPPCOM_EWOULDBLOCK;
1986         }
1987       while (svm_fifo_is_empty_cons (rx_fifo))
1988         {
1989           if (vcl_session_is_closing (s))
1990             return vcl_session_closing_error (s);
1991
1992           if (is_ct)
1993             svm_fifo_unset_event (s->rx_fifo);
1994           svm_fifo_unset_event (rx_fifo);
1995
1996           svm_msg_q_wait (mq, SVM_MQ_WAIT_EMPTY);
1997           vcl_worker_flush_mq_events (wrk);
1998         }
1999     }
2000
2001 read_again:
2002
2003   if (s->is_dgram)
2004     rv = app_recv_dgram_raw (rx_fifo, buf, n, &s->transport, 0, peek);
2005   else
2006     rv = app_recv_stream_raw (rx_fifo, buf, n, 0, peek);
2007
2008   ASSERT (rv >= 0);
2009
2010   if (peek)
2011     return rv;
2012
2013   n_read += rv;
2014
2015   if (svm_fifo_is_empty_cons (rx_fifo))
2016     {
2017       if (is_ct)
2018         svm_fifo_unset_event (s->rx_fifo);
2019       svm_fifo_unset_event (rx_fifo);
2020       if (!svm_fifo_is_empty_cons (rx_fifo)
2021           && svm_fifo_set_event (rx_fifo) && is_nonblocking)
2022         {
2023           vec_add2 (wrk->unhandled_evts_vector, e, 1);
2024           e->event_type = SESSION_IO_EVT_RX;
2025           e->session_index = s->session_index;
2026         }
2027     }
2028   else if (PREDICT_FALSE (rv < n && !s->is_dgram))
2029     {
2030       /* More data enqueued while reading. Try to drain it
2031        * or fill the buffer. Avoid doing that for dgrams */
2032       buf += rv;
2033       n -= rv;
2034       goto read_again;
2035     }
2036
2037   if (PREDICT_FALSE (svm_fifo_needs_deq_ntf (rx_fifo, n_read)))
2038     {
2039       svm_fifo_clear_deq_ntf (rx_fifo);
2040       app_send_io_evt_to_vpp (s->vpp_evt_q,
2041                               s->rx_fifo->shr->master_session_index,
2042                               SESSION_IO_EVT_RX, SVM_Q_WAIT);
2043     }
2044
2045   VDBG (2, "session %u[0x%llx]: read %d bytes from (%p)", s->session_index,
2046         s->vpp_handle, n_read, rx_fifo);
2047
2048   return n_read;
2049 }
2050
2051 int
2052 vppcom_session_read (uint32_t session_handle, void *buf, size_t n)
2053 {
2054   return (vppcom_session_read_internal (session_handle, buf, n, 0));
2055 }
2056
2057 static int
2058 vppcom_session_peek (uint32_t session_handle, void *buf, int n)
2059 {
2060   return (vppcom_session_read_internal (session_handle, buf, n, 1));
2061 }
2062
2063 int
2064 vppcom_session_read_segments (uint32_t session_handle,
2065                               vppcom_data_segment_t * ds, uint32_t n_segments,
2066                               uint32_t max_bytes)
2067 {
2068   vcl_worker_t *wrk = vcl_worker_get_current ();
2069   int n_read = 0, is_nonblocking;
2070   vcl_session_t *s = 0;
2071   svm_fifo_t *rx_fifo;
2072   svm_msg_q_t *mq;
2073   u8 is_ct;
2074
2075   s = vcl_session_get_w_handle (wrk, session_handle);
2076   if (PREDICT_FALSE (!s || (s->flags & VCL_SESSION_F_IS_VEP)))
2077     return VPPCOM_EBADFD;
2078
2079   if (PREDICT_FALSE (!vcl_session_is_open (s)))
2080     return vcl_session_closed_error (s);
2081
2082   is_nonblocking = vcl_session_has_attr (s, VCL_SESS_ATTR_NONBLOCK);
2083   is_ct = vcl_session_is_ct (s);
2084   mq = wrk->app_event_queue;
2085   rx_fifo = is_ct ? s->ct_rx_fifo : s->rx_fifo;
2086   s->flags &= ~VCL_SESSION_F_HAS_RX_EVT;
2087
2088   if (svm_fifo_is_empty_cons (rx_fifo))
2089     {
2090       if (is_nonblocking)
2091         {
2092           if (is_ct)
2093             svm_fifo_unset_event (s->rx_fifo);
2094           svm_fifo_unset_event (rx_fifo);
2095           return VPPCOM_EWOULDBLOCK;
2096         }
2097       while (svm_fifo_is_empty_cons (rx_fifo))
2098         {
2099           if (vcl_session_is_closing (s))
2100             return vcl_session_closing_error (s);
2101
2102           if (is_ct)
2103             svm_fifo_unset_event (s->rx_fifo);
2104           svm_fifo_unset_event (rx_fifo);
2105
2106           svm_msg_q_wait (mq, SVM_MQ_WAIT_EMPTY);
2107           vcl_worker_flush_mq_events (wrk);
2108         }
2109     }
2110
2111   n_read = svm_fifo_segments (rx_fifo, s->rx_bytes_pending,
2112                               (svm_fifo_seg_t *) ds, n_segments, max_bytes);
2113   if (n_read < 0)
2114     return VPPCOM_EAGAIN;
2115
2116   if (svm_fifo_max_dequeue_cons (rx_fifo) == n_read)
2117     {
2118       if (is_ct)
2119         svm_fifo_unset_event (s->rx_fifo);
2120       svm_fifo_unset_event (rx_fifo);
2121       if (svm_fifo_max_dequeue_cons (rx_fifo) != n_read
2122           && svm_fifo_set_event (rx_fifo)
2123           && vcl_session_has_attr (s, VCL_SESS_ATTR_NONBLOCK))
2124         {
2125           session_event_t *e;
2126           vec_add2 (wrk->unhandled_evts_vector, e, 1);
2127           e->event_type = SESSION_IO_EVT_RX;
2128           e->session_index = s->session_index;
2129         }
2130     }
2131
2132   s->rx_bytes_pending += n_read;
2133   return n_read;
2134 }
2135
2136 void
2137 vppcom_session_free_segments (uint32_t session_handle, uint32_t n_bytes)
2138 {
2139   vcl_worker_t *wrk = vcl_worker_get_current ();
2140   vcl_session_t *s;
2141   u8 is_ct;
2142
2143   s = vcl_session_get_w_handle (wrk, session_handle);
2144   if (PREDICT_FALSE (!s || (s->flags & VCL_SESSION_F_IS_VEP)))
2145     return;
2146
2147   is_ct = vcl_session_is_ct (s);
2148   svm_fifo_dequeue_drop (is_ct ? s->ct_rx_fifo : s->rx_fifo, n_bytes);
2149
2150   ASSERT (s->rx_bytes_pending < n_bytes);
2151   s->rx_bytes_pending -= n_bytes;
2152 }
2153
2154 always_inline u8
2155 vcl_fifo_is_writeable (svm_fifo_t * f, u32 len, u8 is_dgram)
2156 {
2157   u32 max_enq = svm_fifo_max_enqueue_prod (f);
2158   if (is_dgram)
2159     return max_enq >= (sizeof (session_dgram_hdr_t) + len);
2160   else
2161     return max_enq > 0;
2162 }
2163
2164 always_inline int
2165 vppcom_session_write_inline (vcl_worker_t * wrk, vcl_session_t * s, void *buf,
2166                              size_t n, u8 is_flush, u8 is_dgram)
2167 {
2168   int n_write, is_nonblocking;
2169   session_evt_type_t et;
2170   svm_fifo_t *tx_fifo;
2171   svm_msg_q_t *mq;
2172   u8 is_ct;
2173
2174   /* Accept zero length writes but just return */
2175   if (PREDICT_FALSE (!n))
2176     return VPPCOM_OK;
2177
2178   if (PREDICT_FALSE (!buf))
2179     return VPPCOM_EFAULT;
2180
2181   if (PREDICT_FALSE (s->flags & VCL_SESSION_F_IS_VEP))
2182     {
2183       VDBG (0, "ERROR: session %u [0x%llx]: cannot write to an epoll"
2184             " session!", s->session_index, s->vpp_handle);
2185       return VPPCOM_EBADFD;
2186     }
2187
2188   if (PREDICT_FALSE (!vcl_session_is_open (s)))
2189     {
2190       VDBG (1, "session %u [0x%llx]: is not open! state 0x%x (%s)",
2191             s->session_index, s->vpp_handle, s->session_state,
2192             vppcom_session_state_str (s->session_state));
2193       return vcl_session_closed_error (s);;
2194     }
2195
2196   if (PREDICT_FALSE (s->flags & VCL_SESSION_F_WR_SHUTDOWN))
2197     {
2198       VDBG (1, "session %u [0x%llx]: is shutdown! state 0x%x (%s)",
2199             s->session_index, s->vpp_handle, s->session_state,
2200             vppcom_session_state_str (s->session_state));
2201       return VPPCOM_EPIPE;
2202     }
2203
2204   is_ct = vcl_session_is_ct (s);
2205   tx_fifo = is_ct ? s->ct_tx_fifo : s->tx_fifo;
2206   is_nonblocking = vcl_session_has_attr (s, VCL_SESS_ATTR_NONBLOCK);
2207
2208   mq = wrk->app_event_queue;
2209   if (!vcl_fifo_is_writeable (tx_fifo, n, is_dgram))
2210     {
2211       if (is_nonblocking)
2212         {
2213           return VPPCOM_EWOULDBLOCK;
2214         }
2215       while (!vcl_fifo_is_writeable (tx_fifo, n, is_dgram))
2216         {
2217           svm_fifo_add_want_deq_ntf (tx_fifo, SVM_FIFO_WANT_DEQ_NOTIF);
2218           if (vcl_session_is_closing (s))
2219             return vcl_session_closing_error (s);
2220
2221           svm_msg_q_wait (mq, SVM_MQ_WAIT_EMPTY);
2222           vcl_worker_flush_mq_events (wrk);
2223         }
2224     }
2225
2226   et = SESSION_IO_EVT_TX;
2227   if (is_flush && !is_ct)
2228     et = SESSION_IO_EVT_TX_FLUSH;
2229
2230   if (is_dgram)
2231     n_write = app_send_dgram_raw (tx_fifo, &s->transport,
2232                                   s->vpp_evt_q, buf, n, et,
2233                                   0 /* do_evt */ , SVM_Q_WAIT);
2234   else
2235     n_write = app_send_stream_raw (tx_fifo, s->vpp_evt_q, buf, n, et,
2236                                    0 /* do_evt */ , SVM_Q_WAIT);
2237
2238   if (svm_fifo_set_event (s->tx_fifo))
2239     app_send_io_evt_to_vpp (
2240       s->vpp_evt_q, s->tx_fifo->shr->master_session_index, et, SVM_Q_WAIT);
2241
2242   /* The underlying fifo segment can run out of memory */
2243   if (PREDICT_FALSE (n_write < 0))
2244     return VPPCOM_EAGAIN;
2245
2246   VDBG (2, "session %u [0x%llx]: wrote %d bytes", s->session_index,
2247         s->vpp_handle, n_write);
2248
2249   return n_write;
2250 }
2251
2252 int
2253 vppcom_session_write (uint32_t session_handle, void *buf, size_t n)
2254 {
2255   vcl_worker_t *wrk = vcl_worker_get_current ();
2256   vcl_session_t *s;
2257
2258   s = vcl_session_get_w_handle (wrk, session_handle);
2259   if (PREDICT_FALSE (!s))
2260     return VPPCOM_EBADFD;
2261
2262   return vppcom_session_write_inline (wrk, s, buf, n,
2263                                       0 /* is_flush */ , s->is_dgram ? 1 : 0);
2264 }
2265
2266 int
2267 vppcom_session_write_msg (uint32_t session_handle, void *buf, size_t n)
2268 {
2269   vcl_worker_t *wrk = vcl_worker_get_current ();
2270   vcl_session_t *s;
2271
2272   s = vcl_session_get_w_handle (wrk, session_handle);
2273   if (PREDICT_FALSE (!s))
2274     return VPPCOM_EBADFD;
2275
2276   return vppcom_session_write_inline (wrk, s, buf, n,
2277                                       1 /* is_flush */ , s->is_dgram ? 1 : 0);
2278 }
2279
2280 #define vcl_fifo_rx_evt_valid_or_break(_s)                              \
2281 if (PREDICT_FALSE (!_s->rx_fifo))                                       \
2282   break;                                                                \
2283 if (PREDICT_FALSE (svm_fifo_is_empty (_s->rx_fifo)))                    \
2284   {                                                                     \
2285     if (!vcl_session_is_ct (_s))                                        \
2286       {                                                                 \
2287         svm_fifo_unset_event (_s->rx_fifo);                             \
2288         if (svm_fifo_is_empty (_s->rx_fifo))                            \
2289           break;                                                        \
2290       }                                                                 \
2291     else if (svm_fifo_is_empty (_s->ct_rx_fifo))                        \
2292       {                                                                 \
2293         svm_fifo_unset_event (_s->rx_fifo); /* rx evts on actual fifo*/ \
2294         if (svm_fifo_is_empty (_s->ct_rx_fifo))                         \
2295           break;                                                        \
2296       }                                                                 \
2297   }                                                                     \
2298
2299 static void
2300 vcl_select_handle_mq_event (vcl_worker_t * wrk, session_event_t * e,
2301                             unsigned long n_bits, unsigned long *read_map,
2302                             unsigned long *write_map,
2303                             unsigned long *except_map, u32 * bits_set)
2304 {
2305   session_disconnected_msg_t *disconnected_msg;
2306   session_connected_msg_t *connected_msg;
2307   vcl_session_t *s;
2308   u32 sid;
2309
2310   switch (e->event_type)
2311     {
2312     case SESSION_IO_EVT_RX:
2313       sid = e->session_index;
2314       s = vcl_session_get (wrk, sid);
2315       if (!s || !vcl_session_is_open (s))
2316         break;
2317       vcl_fifo_rx_evt_valid_or_break (s);
2318       if (sid < n_bits && read_map)
2319         {
2320           clib_bitmap_set_no_check ((uword *) read_map, sid, 1);
2321           *bits_set += 1;
2322         }
2323       break;
2324     case SESSION_IO_EVT_TX:
2325       sid = e->session_index;
2326       s = vcl_session_get (wrk, sid);
2327       if (!s || !vcl_session_is_open (s))
2328         break;
2329       if (sid < n_bits && write_map)
2330         {
2331           clib_bitmap_set_no_check ((uword *) write_map, sid, 1);
2332           *bits_set += 1;
2333         }
2334       break;
2335     case SESSION_CTRL_EVT_ACCEPTED:
2336       if (!e->postponed)
2337         s = vcl_session_accepted (wrk, (session_accepted_msg_t *) e->data);
2338       else
2339         s = vcl_session_get (wrk, e->session_index);
2340       if (!s)
2341         break;
2342       sid = s->session_index;
2343       if (sid < n_bits && read_map)
2344         {
2345           clib_bitmap_set_no_check ((uword *) read_map, sid, 1);
2346           *bits_set += 1;
2347         }
2348       break;
2349     case SESSION_CTRL_EVT_CONNECTED:
2350       if (!e->postponed)
2351         {
2352           connected_msg = (session_connected_msg_t *) e->data;
2353           sid = vcl_session_connected_handler (wrk, connected_msg);
2354         }
2355       else
2356         sid = e->session_index;
2357       if (sid == VCL_INVALID_SESSION_INDEX)
2358         break;
2359       if (sid < n_bits && write_map)
2360         {
2361           clib_bitmap_set_no_check ((uword *) write_map, sid, 1);
2362           *bits_set += 1;
2363         }
2364       break;
2365     case SESSION_CTRL_EVT_DISCONNECTED:
2366       disconnected_msg = (session_disconnected_msg_t *) e->data;
2367       s = vcl_session_disconnected_handler (wrk, disconnected_msg);
2368       if (!s)
2369         break;
2370       sid = s->session_index;
2371       if (sid < n_bits && except_map)
2372         {
2373           clib_bitmap_set_no_check ((uword *) except_map, sid, 1);
2374           *bits_set += 1;
2375         }
2376       break;
2377     case SESSION_CTRL_EVT_RESET:
2378       sid = vcl_session_reset_handler (wrk, (session_reset_msg_t *) e->data);
2379       if (sid < n_bits && except_map)
2380         {
2381           clib_bitmap_set_no_check ((uword *) except_map, sid, 1);
2382           *bits_set += 1;
2383         }
2384       break;
2385     case SESSION_CTRL_EVT_UNLISTEN_REPLY:
2386       vcl_session_unlisten_reply_handler (wrk, e->data);
2387       break;
2388     case SESSION_CTRL_EVT_MIGRATED:
2389       vcl_session_migrated_handler (wrk, e->data);
2390       break;
2391     case SESSION_CTRL_EVT_CLEANUP:
2392       vcl_session_cleanup_handler (wrk, e->data);
2393       break;
2394     case SESSION_CTRL_EVT_WORKER_UPDATE_REPLY:
2395       vcl_session_worker_update_reply_handler (wrk, e->data);
2396       break;
2397     case SESSION_CTRL_EVT_REQ_WORKER_UPDATE:
2398       vcl_session_req_worker_update_handler (wrk, e->data);
2399       break;
2400     case SESSION_CTRL_EVT_APP_ADD_SEGMENT:
2401       vcl_session_app_add_segment_handler (wrk, e->data);
2402       break;
2403     case SESSION_CTRL_EVT_APP_DEL_SEGMENT:
2404       vcl_session_app_del_segment_handler (wrk, e->data);
2405       break;
2406     case SESSION_CTRL_EVT_APP_WRK_RPC:
2407       vcl_worker_rpc_handler (wrk, e->data);
2408       break;
2409     default:
2410       clib_warning ("unhandled: %u", e->event_type);
2411       break;
2412     }
2413 }
2414
2415 static int
2416 vcl_select_handle_mq (vcl_worker_t * wrk, svm_msg_q_t * mq,
2417                       unsigned long n_bits, unsigned long *read_map,
2418                       unsigned long *write_map, unsigned long *except_map,
2419                       double time_to_wait, u32 * bits_set)
2420 {
2421   svm_msg_q_msg_t *msg;
2422   session_event_t *e;
2423   u32 i;
2424
2425   if (svm_msg_q_is_empty (mq))
2426     {
2427       if (*bits_set)
2428         return 0;
2429
2430       if (!time_to_wait)
2431         return 0;
2432       else if (time_to_wait < 0)
2433         svm_msg_q_wait (mq, SVM_MQ_WAIT_EMPTY);
2434       else
2435         {
2436           if (svm_msg_q_timedwait (mq, time_to_wait))
2437             return 0;
2438         }
2439     }
2440   vcl_mq_dequeue_batch (wrk, mq, ~0);
2441
2442   for (i = 0; i < vec_len (wrk->mq_msg_vector); i++)
2443     {
2444       msg = vec_elt_at_index (wrk->mq_msg_vector, i);
2445       e = svm_msg_q_msg_data (mq, msg);
2446       vcl_select_handle_mq_event (wrk, e, n_bits, read_map, write_map,
2447                                   except_map, bits_set);
2448       svm_msg_q_free_msg (mq, msg);
2449     }
2450   vec_reset_length (wrk->mq_msg_vector);
2451   vcl_handle_pending_wrk_updates (wrk);
2452   return *bits_set;
2453 }
2454
2455 static int
2456 vppcom_select_condvar (vcl_worker_t * wrk, int n_bits,
2457                        vcl_si_set * read_map, vcl_si_set * write_map,
2458                        vcl_si_set * except_map, double time_to_wait,
2459                        u32 * bits_set)
2460 {
2461   double wait = 0, start = 0;
2462
2463   if (!*bits_set)
2464     {
2465       wait = time_to_wait;
2466       start = clib_time_now (&wrk->clib_time);
2467     }
2468
2469   do
2470     {
2471       vcl_select_handle_mq (wrk, wrk->app_event_queue, n_bits, read_map,
2472                             write_map, except_map, wait, bits_set);
2473       if (*bits_set)
2474         return *bits_set;
2475       if (wait == -1)
2476         continue;
2477
2478       wait = wait - (clib_time_now (&wrk->clib_time) - start);
2479     }
2480   while (wait > 0);
2481
2482   return 0;
2483 }
2484
2485 static int
2486 vppcom_select_eventfd (vcl_worker_t * wrk, int n_bits,
2487                        vcl_si_set * read_map, vcl_si_set * write_map,
2488                        vcl_si_set * except_map, double time_to_wait,
2489                        u32 * bits_set)
2490 {
2491   vcl_mq_evt_conn_t *mqc;
2492   int __clib_unused n_read;
2493   int n_mq_evts, i;
2494   u64 buf;
2495
2496   vec_validate (wrk->mq_events, pool_elts (wrk->mq_evt_conns));
2497   n_mq_evts = epoll_wait (wrk->mqs_epfd, wrk->mq_events,
2498                           vec_len (wrk->mq_events), time_to_wait);
2499   for (i = 0; i < n_mq_evts; i++)
2500     {
2501       mqc = vcl_mq_evt_conn_get (wrk, wrk->mq_events[i].data.u32);
2502       n_read = read (mqc->mq_fd, &buf, sizeof (buf));
2503       vcl_select_handle_mq (wrk, mqc->mq, n_bits, read_map, write_map,
2504                             except_map, 0, bits_set);
2505     }
2506
2507   return (n_mq_evts > 0 ? (int) *bits_set : 0);
2508 }
2509
2510 int
2511 vppcom_select (int n_bits, vcl_si_set * read_map, vcl_si_set * write_map,
2512                vcl_si_set * except_map, double time_to_wait)
2513 {
2514   u32 sid, minbits = clib_max (n_bits, BITS (uword)), bits_set = 0;
2515   vcl_worker_t *wrk = vcl_worker_get_current ();
2516   vcl_session_t *s = 0;
2517   int i;
2518
2519   if (n_bits && read_map)
2520     {
2521       clib_bitmap_validate (wrk->rd_bitmap, minbits);
2522       clib_memcpy_fast (wrk->rd_bitmap, read_map,
2523                         vec_len (wrk->rd_bitmap) * sizeof (vcl_si_set));
2524       memset (read_map, 0, vec_len (wrk->rd_bitmap) * sizeof (vcl_si_set));
2525     }
2526   if (n_bits && write_map)
2527     {
2528       clib_bitmap_validate (wrk->wr_bitmap, minbits);
2529       clib_memcpy_fast (wrk->wr_bitmap, write_map,
2530                         vec_len (wrk->wr_bitmap) * sizeof (vcl_si_set));
2531       memset (write_map, 0, vec_len (wrk->wr_bitmap) * sizeof (vcl_si_set));
2532     }
2533   if (n_bits && except_map)
2534     {
2535       clib_bitmap_validate (wrk->ex_bitmap, minbits);
2536       clib_memcpy_fast (wrk->ex_bitmap, except_map,
2537                         vec_len (wrk->ex_bitmap) * sizeof (vcl_si_set));
2538       memset (except_map, 0, vec_len (wrk->ex_bitmap) * sizeof (vcl_si_set));
2539     }
2540
2541   if (!n_bits)
2542     return 0;
2543
2544   if (!write_map)
2545     goto check_rd;
2546
2547   clib_bitmap_foreach (sid, wrk->wr_bitmap)
2548     {
2549       if (!(s = vcl_session_get (wrk, sid)))
2550         {
2551           clib_bitmap_set_no_check ((uword *) write_map, sid, 1);
2552           bits_set++;
2553           continue;
2554         }
2555
2556       if (vcl_session_write_ready (s))
2557         {
2558           clib_bitmap_set_no_check ((uword *) write_map, sid, 1);
2559           bits_set++;
2560         }
2561       else
2562         {
2563           svm_fifo_t *txf = vcl_session_is_ct (s) ? s->ct_tx_fifo : s->tx_fifo;
2564           svm_fifo_add_want_deq_ntf (txf, SVM_FIFO_WANT_DEQ_NOTIF);
2565         }
2566     }
2567
2568 check_rd:
2569   if (!read_map)
2570     goto check_mq;
2571
2572   clib_bitmap_foreach (sid, wrk->rd_bitmap)
2573     {
2574       if (!(s = vcl_session_get (wrk, sid)))
2575         {
2576           clib_bitmap_set_no_check ((uword *) read_map, sid, 1);
2577           bits_set++;
2578           continue;
2579         }
2580
2581       if (vcl_session_read_ready (s))
2582         {
2583           clib_bitmap_set_no_check ((uword *) read_map, sid, 1);
2584           bits_set++;
2585         }
2586     }
2587
2588 check_mq:
2589
2590   for (i = 0; i < vec_len (wrk->unhandled_evts_vector); i++)
2591     {
2592       vcl_select_handle_mq_event (wrk, &wrk->unhandled_evts_vector[i], n_bits,
2593                                   read_map, write_map, except_map, &bits_set);
2594     }
2595   vec_reset_length (wrk->unhandled_evts_vector);
2596
2597   if (vcm->cfg.use_mq_eventfd)
2598     vppcom_select_eventfd (wrk, n_bits, read_map, write_map, except_map,
2599                            time_to_wait, &bits_set);
2600   else
2601     vppcom_select_condvar (wrk, n_bits, read_map, write_map, except_map,
2602                            time_to_wait, &bits_set);
2603
2604   return (bits_set);
2605 }
2606
2607 static inline void
2608 vep_verify_epoll_chain (vcl_worker_t * wrk, u32 vep_handle)
2609 {
2610   vppcom_epoll_t *vep;
2611   u32 sh = vep_handle;
2612   vcl_session_t *s;
2613
2614   if (VPPCOM_DEBUG <= 2)
2615     return;
2616
2617   s = vcl_session_get_w_handle (wrk, vep_handle);
2618   if (PREDICT_FALSE (!s))
2619     {
2620       VDBG (0, "ERROR: Invalid vep_sh (%u)!", vep_handle);
2621       goto done;
2622     }
2623   if (PREDICT_FALSE (!(s->flags & VCL_SESSION_F_IS_VEP)))
2624     {
2625       VDBG (0, "ERROR: vep_sh (%u) is not a vep!", vep_handle);
2626       goto done;
2627     }
2628   vep = &s->vep;
2629   VDBG (0, "vep_sh (%u): Dumping epoll chain\n"
2630         "{\n"
2631         "   is_vep         = %u\n"
2632         "   is_vep_session = %u\n"
2633         "   next_sh        = 0x%x (%u)\n"
2634         "}\n", vep_handle, s->flags & VCL_SESSION_F_IS_VEP,
2635         s->flags & VCL_SESSION_F_IS_VEP_SESSION, vep->next_sh, vep->next_sh);
2636
2637   for (sh = vep->next_sh; sh != ~0; sh = vep->next_sh)
2638     {
2639       s = vcl_session_get_w_handle (wrk, sh);
2640       if (PREDICT_FALSE (!s))
2641         {
2642           VDBG (0, "ERROR: Invalid sh (%u)!", sh);
2643           goto done;
2644         }
2645       if (PREDICT_FALSE (s->flags & VCL_SESSION_F_IS_VEP))
2646         {
2647           VDBG (0, "ERROR: sh (%u) is a vep!", vep_handle);
2648         }
2649       else if (PREDICT_FALSE (!(s->flags & VCL_SESSION_F_IS_VEP_SESSION)))
2650         {
2651           VDBG (0, "ERROR: sh (%u) is not a vep session handle!", sh);
2652           goto done;
2653         }
2654       vep = &s->vep;
2655       if (PREDICT_FALSE (vep->vep_sh != vep_handle))
2656         VDBG (0, "ERROR: session (%u) vep_sh (%u) != vep_sh (%u)!",
2657               sh, s->vep.vep_sh, vep_handle);
2658       if (s->flags & VCL_SESSION_F_IS_VEP_SESSION)
2659         {
2660           VDBG (0, "vep_sh[%u]: sh 0x%x (%u)\n"
2661                 "{\n"
2662                 "   next_sh        = 0x%x (%u)\n"
2663                 "   prev_sh        = 0x%x (%u)\n"
2664                 "   vep_sh         = 0x%x (%u)\n"
2665                 "   ev.events      = 0x%x\n"
2666                 "   ev.data.u64    = 0x%llx\n"
2667                 "   et_mask        = 0x%x\n"
2668                 "}\n",
2669                 vep_handle, sh, sh, vep->next_sh, vep->next_sh, vep->prev_sh,
2670                 vep->prev_sh, vep->vep_sh, vep->vep_sh, vep->ev.events,
2671                 vep->ev.data.u64, vep->et_mask);
2672         }
2673     }
2674
2675 done:
2676   VDBG (0, "vep_sh (%u): Dump complete!\n", vep_handle);
2677 }
2678
2679 int
2680 vppcom_epoll_create (void)
2681 {
2682   vcl_worker_t *wrk = vcl_worker_get_current ();
2683   vcl_session_t *vep_session;
2684
2685   vep_session = vcl_session_alloc (wrk);
2686
2687   vep_session->flags |= VCL_SESSION_F_IS_VEP;
2688   vep_session->vep.vep_sh = ~0;
2689   vep_session->vep.next_sh = ~0;
2690   vep_session->vep.prev_sh = ~0;
2691   vep_session->vpp_handle = ~0;
2692
2693   vcl_evt (VCL_EVT_EPOLL_CREATE, vep_session, vep_session->session_index);
2694   VDBG (0, "Created vep_idx %u", vep_session->session_index);
2695
2696   return vcl_session_handle (vep_session);
2697 }
2698
2699 int
2700 vppcom_epoll_ctl (uint32_t vep_handle, int op, uint32_t session_handle,
2701                   struct epoll_event *event)
2702 {
2703   vcl_worker_t *wrk = vcl_worker_get_current ();
2704   vcl_session_t *vep_session;
2705   int rv = VPPCOM_OK;
2706   vcl_session_t *s;
2707   svm_fifo_t *txf;
2708
2709   if (vep_handle == session_handle)
2710     {
2711       VDBG (0, "vep_sh == session handle (%u)!", vep_handle);
2712       return VPPCOM_EINVAL;
2713     }
2714
2715   vep_session = vcl_session_get_w_handle (wrk, vep_handle);
2716   if (PREDICT_FALSE (!vep_session))
2717     {
2718       VDBG (0, "Invalid vep_sh (%u)!", vep_handle);
2719       return VPPCOM_EBADFD;
2720     }
2721   if (PREDICT_FALSE (!(vep_session->flags & VCL_SESSION_F_IS_VEP)))
2722     {
2723       VDBG (0, "vep_sh (%u) is not a vep!", vep_handle);
2724       return VPPCOM_EINVAL;
2725     }
2726
2727   ASSERT (vep_session->vep.vep_sh == ~0);
2728   ASSERT (vep_session->vep.prev_sh == ~0);
2729
2730   s = vcl_session_get_w_handle (wrk, session_handle);
2731   if (PREDICT_FALSE (!s))
2732     {
2733       VDBG (0, "Invalid session_handle (%u)!", session_handle);
2734       return VPPCOM_EBADFD;
2735     }
2736   if (PREDICT_FALSE (s->flags & VCL_SESSION_F_IS_VEP))
2737     {
2738       VDBG (0, "session_handle (%u) is a vep!", vep_handle);
2739       return VPPCOM_EINVAL;
2740     }
2741
2742   switch (op)
2743     {
2744     case EPOLL_CTL_ADD:
2745       if (PREDICT_FALSE (!event))
2746         {
2747           VDBG (0, "EPOLL_CTL_ADD: NULL pointer to epoll_event structure!");
2748           return VPPCOM_EINVAL;
2749         }
2750       if (s->flags & VCL_SESSION_F_IS_VEP_SESSION)
2751         {
2752           VDBG (0, "EPOLL_CTL_ADD: %u already epolled!", s->session_index);
2753           rv = VPPCOM_EEXIST;
2754           goto done;
2755         }
2756       if (vep_session->vep.next_sh != ~0)
2757         {
2758           vcl_session_t *next_session;
2759           next_session = vcl_session_get_w_handle (wrk,
2760                                                    vep_session->vep.next_sh);
2761           if (PREDICT_FALSE (!next_session))
2762             {
2763               VDBG (0, "EPOLL_CTL_ADD: Invalid vep.next_sh (%u) on "
2764                     "vep_idx (%u)!", vep_session->vep.next_sh, vep_handle);
2765               return VPPCOM_EBADFD;
2766             }
2767           ASSERT (next_session->vep.prev_sh == vep_handle);
2768           next_session->vep.prev_sh = session_handle;
2769         }
2770       s->vep.next_sh = vep_session->vep.next_sh;
2771       s->vep.prev_sh = vep_handle;
2772       s->vep.vep_sh = vep_handle;
2773       s->vep.et_mask = VEP_DEFAULT_ET_MASK;
2774       s->vep.ev = *event;
2775       s->flags &= ~VCL_SESSION_F_IS_VEP;
2776       s->flags |= VCL_SESSION_F_IS_VEP_SESSION;
2777       vep_session->vep.next_sh = session_handle;
2778
2779       txf = vcl_session_is_ct (s) ? s->ct_tx_fifo : s->tx_fifo;
2780       if (txf && (event->events & EPOLLOUT))
2781         svm_fifo_add_want_deq_ntf (txf, SVM_FIFO_WANT_DEQ_NOTIF_IF_FULL);
2782
2783       /* Generate EPOLLOUT if tx fifo not full */
2784       if ((event->events & EPOLLOUT) && (vcl_session_write_ready (s) > 0))
2785         {
2786           session_event_t e = { 0 };
2787           e.event_type = SESSION_IO_EVT_TX;
2788           e.session_index = s->session_index;
2789           vec_add1 (wrk->unhandled_evts_vector, e);
2790         }
2791       /* Generate EPOLLIN if rx fifo has data */
2792       if ((event->events & EPOLLIN) && (vcl_session_read_ready (s) > 0))
2793         {
2794           session_event_t e = { 0 };
2795           e.event_type = SESSION_IO_EVT_RX;
2796           e.session_index = s->session_index;
2797           vec_add1 (wrk->unhandled_evts_vector, e);
2798         }
2799       VDBG (1, "EPOLL_CTL_ADD: vep_sh %u, sh %u, events 0x%x, data 0x%llx!",
2800             vep_handle, session_handle, event->events, event->data.u64);
2801       vcl_evt (VCL_EVT_EPOLL_CTLADD, s, event->events, event->data.u64);
2802       break;
2803
2804     case EPOLL_CTL_MOD:
2805       if (PREDICT_FALSE (!event))
2806         {
2807           VDBG (0, "EPOLL_CTL_MOD: NULL pointer to epoll_event structure!");
2808           rv = VPPCOM_EINVAL;
2809           goto done;
2810         }
2811       else if (PREDICT_FALSE (!(s->flags & VCL_SESSION_F_IS_VEP_SESSION)))
2812         {
2813           VDBG (0, "sh %u EPOLL_CTL_MOD: not a vep session!", session_handle);
2814           rv = VPPCOM_ENOENT;
2815           goto done;
2816         }
2817       else if (PREDICT_FALSE (s->vep.vep_sh != vep_handle))
2818         {
2819           VDBG (0, "EPOLL_CTL_MOD: sh %u vep_sh (%u) != vep_sh (%u)!",
2820                 session_handle, s->vep.vep_sh, vep_handle);
2821           rv = VPPCOM_EINVAL;
2822           goto done;
2823         }
2824
2825       /* Generate EPOLLOUT if session write ready nd event was not on */
2826       if ((event->events & EPOLLOUT) && !(s->vep.ev.events & EPOLLOUT) &&
2827           (vcl_session_write_ready (s) > 0))
2828         {
2829           session_event_t e = { 0 };
2830           e.event_type = SESSION_IO_EVT_TX;
2831           e.session_index = s->session_index;
2832           vec_add1 (wrk->unhandled_evts_vector, e);
2833         }
2834       /* Generate EPOLLIN if session read ready and event was not on */
2835       if ((event->events & EPOLLIN) && !(s->vep.ev.events & EPOLLIN) &&
2836           (vcl_session_read_ready (s) > 0))
2837         {
2838           session_event_t e = { 0 };
2839           e.event_type = SESSION_IO_EVT_RX;
2840           e.session_index = s->session_index;
2841           vec_add1 (wrk->unhandled_evts_vector, e);
2842         }
2843       s->vep.et_mask = VEP_DEFAULT_ET_MASK;
2844       s->vep.ev = *event;
2845       txf = vcl_session_is_ct (s) ? s->ct_tx_fifo : s->tx_fifo;
2846       if (txf)
2847         {
2848           if (event->events & EPOLLOUT)
2849             svm_fifo_add_want_deq_ntf (txf, SVM_FIFO_WANT_DEQ_NOTIF_IF_FULL);
2850           else
2851             svm_fifo_del_want_deq_ntf (txf, SVM_FIFO_WANT_DEQ_NOTIF_IF_FULL);
2852         }
2853       VDBG (1, "EPOLL_CTL_MOD: vep_sh %u, sh %u, events 0x%x, data 0x%llx!",
2854             vep_handle, session_handle, event->events, event->data.u64);
2855       break;
2856
2857     case EPOLL_CTL_DEL:
2858       if (PREDICT_FALSE (!(s->flags & VCL_SESSION_F_IS_VEP_SESSION)))
2859         {
2860           VDBG (0, "EPOLL_CTL_DEL: %u not a vep session!", session_handle);
2861           rv = VPPCOM_ENOENT;
2862           goto done;
2863         }
2864       else if (PREDICT_FALSE (s->vep.vep_sh != vep_handle))
2865         {
2866           VDBG (0, "EPOLL_CTL_DEL: sh %u vep_sh (%u) != vep_sh (%u)!",
2867                 session_handle, s->vep.vep_sh, vep_handle);
2868           rv = VPPCOM_EINVAL;
2869           goto done;
2870         }
2871
2872       if (s->vep.prev_sh == vep_handle)
2873         vep_session->vep.next_sh = s->vep.next_sh;
2874       else
2875         {
2876           vcl_session_t *prev_session;
2877           prev_session = vcl_session_get_w_handle (wrk, s->vep.prev_sh);
2878           if (PREDICT_FALSE (!prev_session))
2879             {
2880               VDBG (0, "EPOLL_CTL_DEL: Invalid prev_sh (%u) on sh (%u)!",
2881                     s->vep.prev_sh, session_handle);
2882               return VPPCOM_EBADFD;
2883             }
2884           ASSERT (prev_session->vep.next_sh == session_handle);
2885           prev_session->vep.next_sh = s->vep.next_sh;
2886         }
2887       if (s->vep.next_sh != ~0)
2888         {
2889           vcl_session_t *next_session;
2890           next_session = vcl_session_get_w_handle (wrk, s->vep.next_sh);
2891           if (PREDICT_FALSE (!next_session))
2892             {
2893               VDBG (0, "EPOLL_CTL_DEL: Invalid next_sh (%u) on sh (%u)!",
2894                     s->vep.next_sh, session_handle);
2895               return VPPCOM_EBADFD;
2896             }
2897           ASSERT (next_session->vep.prev_sh == session_handle);
2898           next_session->vep.prev_sh = s->vep.prev_sh;
2899         }
2900
2901       memset (&s->vep, 0, sizeof (s->vep));
2902       s->vep.next_sh = ~0;
2903       s->vep.prev_sh = ~0;
2904       s->vep.vep_sh = ~0;
2905       s->flags &= ~VCL_SESSION_F_IS_VEP_SESSION;
2906
2907       if (vcl_session_is_open (s))
2908         {
2909           txf = vcl_session_is_ct (s) ? s->ct_tx_fifo : s->tx_fifo;
2910           if (txf)
2911             svm_fifo_del_want_deq_ntf (txf, SVM_FIFO_WANT_DEQ_NOTIF_IF_FULL);
2912         }
2913
2914       VDBG (1, "EPOLL_CTL_DEL: vep_idx %u, sh %u!", vep_handle,
2915             session_handle);
2916       vcl_evt (VCL_EVT_EPOLL_CTLDEL, s, vep_sh);
2917       break;
2918
2919     default:
2920       VDBG (0, "Invalid operation (%d)!", op);
2921       rv = VPPCOM_EINVAL;
2922     }
2923
2924   vep_verify_epoll_chain (wrk, vep_handle);
2925
2926 done:
2927   return rv;
2928 }
2929
2930 static inline void
2931 vcl_epoll_wait_handle_mq_event (vcl_worker_t * wrk, session_event_t * e,
2932                                 struct epoll_event *events, u32 * num_ev)
2933 {
2934   session_disconnected_msg_t *disconnected_msg;
2935   session_connected_msg_t *connected_msg;
2936   u32 sid = ~0, session_events;
2937   u64 session_evt_data = ~0;
2938   vcl_session_t *s;
2939   u8 add_event = 0;
2940
2941   switch (e->event_type)
2942     {
2943     case SESSION_IO_EVT_RX:
2944       sid = e->session_index;
2945       s = vcl_session_get (wrk, sid);
2946       if (vcl_session_is_closed (s))
2947         break;
2948       vcl_fifo_rx_evt_valid_or_break (s);
2949       session_events = s->vep.ev.events;
2950       if (!(EPOLLIN & s->vep.ev.events)
2951           || (s->flags & VCL_SESSION_F_HAS_RX_EVT))
2952         break;
2953       add_event = 1;
2954       events[*num_ev].events |= EPOLLIN;
2955       session_evt_data = s->vep.ev.data.u64;
2956       s->flags |= VCL_SESSION_F_HAS_RX_EVT;
2957       break;
2958     case SESSION_IO_EVT_TX:
2959       sid = e->session_index;
2960       s = vcl_session_get (wrk, sid);
2961       if (vcl_session_is_closed (s))
2962         break;
2963       session_events = s->vep.ev.events;
2964       if (!(EPOLLOUT & session_events))
2965         break;
2966       add_event = 1;
2967       events[*num_ev].events |= EPOLLOUT;
2968       session_evt_data = s->vep.ev.data.u64;
2969       svm_fifo_reset_has_deq_ntf (vcl_session_is_ct (s) ?
2970                                   s->ct_tx_fifo : s->tx_fifo);
2971       break;
2972     case SESSION_CTRL_EVT_ACCEPTED:
2973       if (!e->postponed)
2974         s = vcl_session_accepted (wrk, (session_accepted_msg_t *) e->data);
2975       else
2976         s = vcl_session_get (wrk, e->session_index);
2977       if (!s)
2978         break;
2979       session_events = s->vep.ev.events;
2980       sid = s->session_index;
2981       if (!(EPOLLIN & session_events))
2982         break;
2983       add_event = 1;
2984       events[*num_ev].events |= EPOLLIN;
2985       session_evt_data = s->vep.ev.data.u64;
2986       break;
2987     case SESSION_CTRL_EVT_CONNECTED:
2988       if (!e->postponed)
2989         {
2990           connected_msg = (session_connected_msg_t *) e->data;
2991           sid = vcl_session_connected_handler (wrk, connected_msg);
2992         }
2993       else
2994         sid = e->session_index;
2995       s = vcl_session_get (wrk, sid);
2996       if (vcl_session_is_closed (s))
2997         break;
2998       session_events = s->vep.ev.events;
2999       /* Generate EPOLLOUT because there's no connected event */
3000       if (!(EPOLLOUT & session_events))
3001         break;
3002       add_event = 1;
3003       events[*num_ev].events |= EPOLLOUT;
3004       session_evt_data = s->vep.ev.data.u64;
3005       if (s->session_state == VCL_STATE_DETACHED)
3006         events[*num_ev].events |= EPOLLHUP;
3007       break;
3008     case SESSION_CTRL_EVT_DISCONNECTED:
3009       disconnected_msg = (session_disconnected_msg_t *) e->data;
3010       s = vcl_session_disconnected_handler (wrk, disconnected_msg);
3011       if (vcl_session_is_closed (s))
3012         break;
3013       sid = s->session_index;
3014       session_events = s->vep.ev.events;
3015       add_event = 1;
3016       events[*num_ev].events |= EPOLLHUP | EPOLLRDHUP;
3017       session_evt_data = s->vep.ev.data.u64;
3018       break;
3019     case SESSION_CTRL_EVT_RESET:
3020       sid = vcl_session_reset_handler (wrk, (session_reset_msg_t *) e->data);
3021       s = vcl_session_get (wrk, sid);
3022       if (vcl_session_is_closed (s))
3023         break;
3024       session_events = s->vep.ev.events;
3025       add_event = 1;
3026       events[*num_ev].events |= EPOLLHUP | EPOLLRDHUP;
3027       session_evt_data = s->vep.ev.data.u64;
3028       break;
3029     case SESSION_CTRL_EVT_UNLISTEN_REPLY:
3030       vcl_session_unlisten_reply_handler (wrk, e->data);
3031       break;
3032     case SESSION_CTRL_EVT_MIGRATED:
3033       vcl_session_migrated_handler (wrk, e->data);
3034       break;
3035     case SESSION_CTRL_EVT_CLEANUP:
3036       vcl_session_cleanup_handler (wrk, e->data);
3037       break;
3038     case SESSION_CTRL_EVT_REQ_WORKER_UPDATE:
3039       vcl_session_req_worker_update_handler (wrk, e->data);
3040       break;
3041     case SESSION_CTRL_EVT_WORKER_UPDATE_REPLY:
3042       vcl_session_worker_update_reply_handler (wrk, e->data);
3043       break;
3044     case SESSION_CTRL_EVT_APP_ADD_SEGMENT:
3045       vcl_session_app_add_segment_handler (wrk, e->data);
3046       break;
3047     case SESSION_CTRL_EVT_APP_DEL_SEGMENT:
3048       vcl_session_app_del_segment_handler (wrk, e->data);
3049       break;
3050     case SESSION_CTRL_EVT_APP_WRK_RPC:
3051       vcl_worker_rpc_handler (wrk, e->data);
3052       break;
3053     default:
3054       VDBG (0, "unhandled: %u", e->event_type);
3055       break;
3056     }
3057
3058   if (add_event)
3059     {
3060       events[*num_ev].data.u64 = session_evt_data;
3061       if (EPOLLONESHOT & session_events)
3062         {
3063           s = vcl_session_get (wrk, sid);
3064           s->vep.ev.events = 0;
3065         }
3066       *num_ev += 1;
3067     }
3068 }
3069
3070 static int
3071 vcl_epoll_wait_handle_mq (vcl_worker_t * wrk, svm_msg_q_t * mq,
3072                           struct epoll_event *events, u32 maxevents,
3073                           double wait_for_time, u32 * num_ev)
3074 {
3075   svm_msg_q_msg_t *msg;
3076   session_event_t *e;
3077   int i;
3078
3079   if (vec_len (wrk->mq_msg_vector) && svm_msg_q_is_empty (mq))
3080     goto handle_dequeued;
3081
3082   if (svm_msg_q_is_empty (mq))
3083     {
3084       if (!wait_for_time)
3085         return 0;
3086       else if (wait_for_time < 0)
3087         svm_msg_q_wait (mq, SVM_MQ_WAIT_EMPTY);
3088       else
3089         {
3090           if (svm_msg_q_timedwait (mq, wait_for_time / 1e3))
3091             return 0;
3092         }
3093     }
3094   ASSERT (maxevents > *num_ev);
3095   vcl_mq_dequeue_batch (wrk, mq, ~0);
3096
3097 handle_dequeued:
3098   for (i = 0; i < vec_len (wrk->mq_msg_vector); i++)
3099     {
3100       msg = vec_elt_at_index (wrk->mq_msg_vector, i);
3101       e = svm_msg_q_msg_data (mq, msg);
3102       if (*num_ev < maxevents)
3103         vcl_epoll_wait_handle_mq_event (wrk, e, events, num_ev);
3104       else
3105         vcl_handle_mq_event (wrk, e);
3106       svm_msg_q_free_msg (mq, msg);
3107     }
3108   vec_reset_length (wrk->mq_msg_vector);
3109   vcl_handle_pending_wrk_updates (wrk);
3110   return *num_ev;
3111 }
3112
3113 static int
3114 vppcom_epoll_wait_condvar (vcl_worker_t *wrk, struct epoll_event *events,
3115                            int maxevents, u32 n_evts, double timeout_ms)
3116 {
3117   double end = -1;
3118
3119   if (!n_evts)
3120     {
3121       if (timeout_ms > 0)
3122         end = clib_time_now (&wrk->clib_time) + (timeout_ms / 1e3);
3123     }
3124
3125   do
3126     {
3127       vcl_epoll_wait_handle_mq (wrk, wrk->app_event_queue, events, maxevents,
3128                                 timeout_ms, &n_evts);
3129       if (n_evts || !timeout_ms)
3130         return n_evts;
3131     }
3132   while (end == -1 || clib_time_now (&wrk->clib_time) < end);
3133
3134   return 0;
3135 }
3136
3137 static int
3138 vppcom_epoll_wait_eventfd (vcl_worker_t *wrk, struct epoll_event *events,
3139                            int maxevents, u32 n_evts, double timeout_ms)
3140 {
3141   int __clib_unused n_read;
3142   vcl_mq_evt_conn_t *mqc;
3143   int n_mq_evts, i;
3144   double end = -1;
3145   u64 buf;
3146
3147   vec_validate (wrk->mq_events, pool_elts (wrk->mq_evt_conns));
3148   if (!n_evts)
3149     {
3150       if (timeout_ms > 0)
3151         end = clib_time_now (&wrk->clib_time) + (timeout_ms / 1e3);
3152     }
3153
3154   do
3155     {
3156       n_mq_evts = epoll_wait (wrk->mqs_epfd, wrk->mq_events,
3157                               vec_len (wrk->mq_events), timeout_ms);
3158       if (n_mq_evts < 0)
3159         {
3160           VDBG (0, "epoll_wait error %u", errno);
3161           return n_evts;
3162         }
3163
3164       for (i = 0; i < n_mq_evts; i++)
3165         {
3166           mqc = vcl_mq_evt_conn_get (wrk, wrk->mq_events[i].data.u32);
3167           n_read = read (mqc->mq_fd, &buf, sizeof (buf));
3168           vcl_epoll_wait_handle_mq (wrk, mqc->mq, events, maxevents, 0,
3169                                     &n_evts);
3170         }
3171
3172       if (n_evts || !timeout_ms)
3173         return n_evts;
3174     }
3175   while (end == -1 || clib_time_now (&wrk->clib_time) < end);
3176
3177   return 0;
3178 }
3179
3180 int
3181 vppcom_epoll_wait (uint32_t vep_handle, struct epoll_event *events,
3182                    int maxevents, double wait_for_time)
3183 {
3184   vcl_worker_t *wrk = vcl_worker_get_current ();
3185   vcl_session_t *vep_session;
3186   u32 n_evts = 0;
3187   int i;
3188
3189   if (PREDICT_FALSE (maxevents <= 0))
3190     {
3191       VDBG (0, "ERROR: Invalid maxevents (%d)!", maxevents);
3192       return VPPCOM_EINVAL;
3193     }
3194
3195   vep_session = vcl_session_get_w_handle (wrk, vep_handle);
3196   if (!vep_session)
3197     return VPPCOM_EBADFD;
3198
3199   if (PREDICT_FALSE (!(vep_session->flags & VCL_SESSION_F_IS_VEP)))
3200     {
3201       VDBG (0, "ERROR: vep_idx (%u) is not a vep!", vep_handle);
3202       return VPPCOM_EINVAL;
3203     }
3204
3205   memset (events, 0, sizeof (*events) * maxevents);
3206
3207   if (vec_len (wrk->unhandled_evts_vector))
3208     {
3209       for (i = 0; i < vec_len (wrk->unhandled_evts_vector); i++)
3210         {
3211           vcl_epoll_wait_handle_mq_event (wrk, &wrk->unhandled_evts_vector[i],
3212                                           events, &n_evts);
3213           if (n_evts == maxevents)
3214             {
3215               vec_delete (wrk->unhandled_evts_vector, i + 1, 0);
3216               return n_evts;
3217             }
3218         }
3219       vec_reset_length (wrk->unhandled_evts_vector);
3220     }
3221   /* Request to only drain unhandled */
3222   if ((int) wait_for_time == -2)
3223     return n_evts;
3224
3225   if (vcm->cfg.use_mq_eventfd)
3226     return vppcom_epoll_wait_eventfd (wrk, events, maxevents, n_evts,
3227                                       wait_for_time);
3228
3229   return vppcom_epoll_wait_condvar (wrk, events, maxevents, n_evts,
3230                                     wait_for_time);
3231 }
3232
3233 int
3234 vppcom_session_attr (uint32_t session_handle, uint32_t op,
3235                      void *buffer, uint32_t * buflen)
3236 {
3237   vcl_worker_t *wrk = vcl_worker_get_current ();
3238   u32 *flags = buffer;
3239   vppcom_endpt_t *ep = buffer;
3240   transport_endpt_attr_t tea;
3241   vcl_session_t *session;
3242   int rv = VPPCOM_OK;
3243
3244   session = vcl_session_get_w_handle (wrk, session_handle);
3245   if (!session)
3246     return VPPCOM_EBADFD;
3247
3248   switch (op)
3249     {
3250     case VPPCOM_ATTR_GET_NREAD:
3251       rv = vcl_session_read_ready (session);
3252       VDBG (2, "VPPCOM_ATTR_GET_NREAD: sh %u, nread = %d", session_handle,
3253             rv);
3254       break;
3255
3256     case VPPCOM_ATTR_GET_NWRITE:
3257       rv = vcl_session_write_ready (session);
3258       VDBG (2, "VPPCOM_ATTR_GET_NWRITE: sh %u, nwrite = %d", session_handle,
3259             rv);
3260       break;
3261
3262     case VPPCOM_ATTR_GET_FLAGS:
3263       if (PREDICT_TRUE (buffer && buflen && (*buflen >= sizeof (*flags))))
3264         {
3265           *flags =
3266             O_RDWR |
3267             (vcl_session_has_attr (session, VCL_SESS_ATTR_NONBLOCK) ?
3268              O_NONBLOCK : 0);
3269           *buflen = sizeof (*flags);
3270           VDBG (2, "VPPCOM_ATTR_GET_FLAGS: sh %u, flags = 0x%08x, "
3271                 "is_nonblocking = %u", session_handle, *flags,
3272                 vcl_session_has_attr (session, VCL_SESS_ATTR_NONBLOCK));
3273         }
3274       else
3275         rv = VPPCOM_EINVAL;
3276       break;
3277
3278     case VPPCOM_ATTR_SET_FLAGS:
3279       if (PREDICT_TRUE (buffer && buflen && (*buflen == sizeof (*flags))))
3280         {
3281           if (*flags & O_NONBLOCK)
3282             vcl_session_set_attr (session, VCL_SESS_ATTR_NONBLOCK);
3283           else
3284             vcl_session_clear_attr (session, VCL_SESS_ATTR_NONBLOCK);
3285
3286           VDBG (2, "VPPCOM_ATTR_SET_FLAGS: sh %u, flags = 0x%08x,"
3287                 " is_nonblocking = %u", session_handle, *flags,
3288                 vcl_session_has_attr (session, VCL_SESS_ATTR_NONBLOCK));
3289         }
3290       else
3291         rv = VPPCOM_EINVAL;
3292       break;
3293
3294     case VPPCOM_ATTR_GET_PEER_ADDR:
3295       if (PREDICT_TRUE (buffer && buflen &&
3296                         (*buflen >= sizeof (*ep)) && ep->ip))
3297         {
3298           ep->is_ip4 = session->transport.is_ip4;
3299           ep->port = session->transport.rmt_port;
3300           if (session->transport.is_ip4)
3301             clib_memcpy_fast (ep->ip, &session->transport.rmt_ip.ip4,
3302                               sizeof (ip4_address_t));
3303           else
3304             clib_memcpy_fast (ep->ip, &session->transport.rmt_ip.ip6,
3305                               sizeof (ip6_address_t));
3306           *buflen = sizeof (*ep);
3307           VDBG (1, "VPPCOM_ATTR_GET_PEER_ADDR: sh %u, is_ip4 = %u, "
3308                 "addr = %U, port %u", session_handle, ep->is_ip4,
3309                 format_ip46_address, &session->transport.rmt_ip,
3310                 ep->is_ip4 ? IP46_TYPE_IP4 : IP46_TYPE_IP6,
3311                 clib_net_to_host_u16 (ep->port));
3312         }
3313       else
3314         rv = VPPCOM_EINVAL;
3315       break;
3316
3317     case VPPCOM_ATTR_GET_LCL_ADDR:
3318       if (PREDICT_TRUE (buffer && buflen &&
3319                         (*buflen >= sizeof (*ep)) && ep->ip))
3320         {
3321           ep->is_ip4 = session->transport.is_ip4;
3322           ep->port = session->transport.lcl_port;
3323           if (session->transport.is_ip4)
3324             clib_memcpy_fast (ep->ip, &session->transport.lcl_ip.ip4,
3325                               sizeof (ip4_address_t));
3326           else
3327             clib_memcpy_fast (ep->ip, &session->transport.lcl_ip.ip6,
3328                               sizeof (ip6_address_t));
3329           *buflen = sizeof (*ep);
3330           VDBG (1, "VPPCOM_ATTR_GET_LCL_ADDR: sh %u, is_ip4 = %u, addr = %U"
3331                 " port %d", session_handle, ep->is_ip4, format_ip46_address,
3332                 &session->transport.lcl_ip,
3333                 ep->is_ip4 ? IP46_TYPE_IP4 : IP46_TYPE_IP6,
3334                 clib_net_to_host_u16 (ep->port));
3335         }
3336       else
3337         rv = VPPCOM_EINVAL;
3338       break;
3339
3340     case VPPCOM_ATTR_SET_LCL_ADDR:
3341       if (PREDICT_TRUE (buffer && buflen &&
3342                         (*buflen >= sizeof (*ep)) && ep->ip))
3343         {
3344           session->transport.is_ip4 = ep->is_ip4;
3345           session->transport.lcl_port = ep->port;
3346           vcl_ip_copy_from_ep (&session->transport.lcl_ip, ep);
3347           *buflen = sizeof (*ep);
3348           VDBG (1, "VPPCOM_ATTR_SET_LCL_ADDR: sh %u, is_ip4 = %u, addr = %U"
3349                 " port %d", session_handle, ep->is_ip4, format_ip46_address,
3350                 &session->transport.lcl_ip,
3351                 ep->is_ip4 ? IP46_TYPE_IP4 : IP46_TYPE_IP6,
3352                 clib_net_to_host_u16 (ep->port));
3353         }
3354       else
3355         rv = VPPCOM_EINVAL;
3356       break;
3357
3358     case VPPCOM_ATTR_GET_LIBC_EPFD:
3359       rv = session->libc_epfd;
3360       VDBG (2, "VPPCOM_ATTR_GET_LIBC_EPFD: libc_epfd %d", rv);
3361       break;
3362
3363     case VPPCOM_ATTR_SET_LIBC_EPFD:
3364       if (PREDICT_TRUE (buffer && buflen &&
3365                         (*buflen == sizeof (session->libc_epfd))))
3366         {
3367           session->libc_epfd = *(int *) buffer;
3368           *buflen = sizeof (session->libc_epfd);
3369
3370           VDBG (2, "VPPCOM_ATTR_SET_LIBC_EPFD: libc_epfd %d, buflen %d",
3371                 session->libc_epfd, *buflen);
3372         }
3373       else
3374         rv = VPPCOM_EINVAL;
3375       break;
3376
3377     case VPPCOM_ATTR_GET_PROTOCOL:
3378       if (buffer && buflen && (*buflen >= sizeof (int)))
3379         {
3380           *(int *) buffer = session->session_type;
3381           *buflen = sizeof (int);
3382
3383           VDBG (2, "VPPCOM_ATTR_GET_PROTOCOL: %d (%s), buflen %d",
3384                 *(int *) buffer, *(int *) buffer ? "UDP" : "TCP", *buflen);
3385         }
3386       else
3387         rv = VPPCOM_EINVAL;
3388       break;
3389
3390     case VPPCOM_ATTR_GET_LISTEN:
3391       if (buffer && buflen && (*buflen >= sizeof (int)))
3392         {
3393           *(int *) buffer = vcl_session_has_attr (session,
3394                                                   VCL_SESS_ATTR_LISTEN);
3395           *buflen = sizeof (int);
3396
3397           VDBG (2, "VPPCOM_ATTR_GET_LISTEN: %d, buflen %d", *(int *) buffer,
3398                 *buflen);
3399         }
3400       else
3401         rv = VPPCOM_EINVAL;
3402       break;
3403
3404     case VPPCOM_ATTR_GET_ERROR:
3405       if (buffer && buflen && (*buflen >= sizeof (int)))
3406         {
3407           *(int *) buffer = 0;
3408           *buflen = sizeof (int);
3409
3410           VDBG (2, "VPPCOM_ATTR_GET_ERROR: %d, buflen %d, #VPP-TBD#",
3411                 *(int *) buffer, *buflen);
3412         }
3413       else
3414         rv = VPPCOM_EINVAL;
3415       break;
3416
3417     case VPPCOM_ATTR_GET_TX_FIFO_LEN:
3418       if (buffer && buflen && (*buflen >= sizeof (u32)))
3419         {
3420
3421           /* VPP-TBD */
3422           *(size_t *) buffer = (session->sndbuf_size ? session->sndbuf_size :
3423                                 session->tx_fifo ?
3424                                 svm_fifo_size (session->tx_fifo) :
3425                                 vcm->cfg.tx_fifo_size);
3426           *buflen = sizeof (u32);
3427
3428           VDBG (2, "VPPCOM_ATTR_GET_TX_FIFO_LEN: %u (0x%x), buflen %d,"
3429                 " #VPP-TBD#", *(size_t *) buffer, *(size_t *) buffer,
3430                 *buflen);
3431         }
3432       else
3433         rv = VPPCOM_EINVAL;
3434       break;
3435
3436     case VPPCOM_ATTR_SET_TX_FIFO_LEN:
3437       if (buffer && buflen && (*buflen == sizeof (u32)))
3438         {
3439           /* VPP-TBD */
3440           session->sndbuf_size = *(u32 *) buffer;
3441           VDBG (2, "VPPCOM_ATTR_SET_TX_FIFO_LEN: %u (0x%x), buflen %d,"
3442                 " #VPP-TBD#", session->sndbuf_size, session->sndbuf_size,
3443                 *buflen);
3444         }
3445       else
3446         rv = VPPCOM_EINVAL;
3447       break;
3448
3449     case VPPCOM_ATTR_GET_RX_FIFO_LEN:
3450       if (buffer && buflen && (*buflen >= sizeof (u32)))
3451         {
3452
3453           /* VPP-TBD */
3454           *(size_t *) buffer = (session->rcvbuf_size ? session->rcvbuf_size :
3455                                 session->rx_fifo ?
3456                                 svm_fifo_size (session->rx_fifo) :
3457                                 vcm->cfg.rx_fifo_size);
3458           *buflen = sizeof (u32);
3459
3460           VDBG (2, "VPPCOM_ATTR_GET_RX_FIFO_LEN: %u (0x%x), buflen %d, "
3461                 "#VPP-TBD#", *(size_t *) buffer, *(size_t *) buffer, *buflen);
3462         }
3463       else
3464         rv = VPPCOM_EINVAL;
3465       break;
3466
3467     case VPPCOM_ATTR_SET_RX_FIFO_LEN:
3468       if (buffer && buflen && (*buflen == sizeof (u32)))
3469         {
3470           /* VPP-TBD */
3471           session->rcvbuf_size = *(u32 *) buffer;
3472           VDBG (2, "VPPCOM_ATTR_SET_RX_FIFO_LEN: %u (0x%x), buflen %d,"
3473                 " #VPP-TBD#", session->sndbuf_size, session->sndbuf_size,
3474                 *buflen);
3475         }
3476       else
3477         rv = VPPCOM_EINVAL;
3478       break;
3479
3480     case VPPCOM_ATTR_GET_REUSEADDR:
3481       if (buffer && buflen && (*buflen >= sizeof (int)))
3482         {
3483           /* VPP-TBD */
3484           *(int *) buffer = vcl_session_has_attr (session,
3485                                                   VCL_SESS_ATTR_REUSEADDR);
3486           *buflen = sizeof (int);
3487
3488           VDBG (2, "VPPCOM_ATTR_GET_REUSEADDR: %d, buflen %d, #VPP-TBD#",
3489                 *(int *) buffer, *buflen);
3490         }
3491       else
3492         rv = VPPCOM_EINVAL;
3493       break;
3494
3495     case VPPCOM_ATTR_SET_REUSEADDR:
3496       if (buffer && buflen && (*buflen == sizeof (int)) &&
3497           !vcl_session_has_attr (session, VCL_SESS_ATTR_LISTEN))
3498         {
3499           /* VPP-TBD */
3500           if (*(int *) buffer)
3501             vcl_session_set_attr (session, VCL_SESS_ATTR_REUSEADDR);
3502           else
3503             vcl_session_clear_attr (session, VCL_SESS_ATTR_REUSEADDR);
3504
3505           VDBG (2, "VPPCOM_ATTR_SET_REUSEADDR: %d, buflen %d, #VPP-TBD#",
3506                 vcl_session_has_attr (session, VCL_SESS_ATTR_REUSEADDR),
3507                 *buflen);
3508         }
3509       else
3510         rv = VPPCOM_EINVAL;
3511       break;
3512
3513     case VPPCOM_ATTR_GET_REUSEPORT:
3514       if (buffer && buflen && (*buflen >= sizeof (int)))
3515         {
3516           /* VPP-TBD */
3517           *(int *) buffer = vcl_session_has_attr (session,
3518                                                   VCL_SESS_ATTR_REUSEPORT);
3519           *buflen = sizeof (int);
3520
3521           VDBG (2, "VPPCOM_ATTR_GET_REUSEPORT: %d, buflen %d, #VPP-TBD#",
3522                 *(int *) buffer, *buflen);
3523         }
3524       else
3525         rv = VPPCOM_EINVAL;
3526       break;
3527
3528     case VPPCOM_ATTR_SET_REUSEPORT:
3529       if (buffer && buflen && (*buflen == sizeof (int)) &&
3530           !vcl_session_has_attr (session, VCL_SESS_ATTR_LISTEN))
3531         {
3532           /* VPP-TBD */
3533           if (*(int *) buffer)
3534             vcl_session_set_attr (session, VCL_SESS_ATTR_REUSEPORT);
3535           else
3536             vcl_session_clear_attr (session, VCL_SESS_ATTR_REUSEPORT);
3537
3538           VDBG (2, "VPPCOM_ATTR_SET_REUSEPORT: %d, buflen %d, #VPP-TBD#",
3539                 vcl_session_has_attr (session, VCL_SESS_ATTR_REUSEPORT),
3540                 *buflen);
3541         }
3542       else
3543         rv = VPPCOM_EINVAL;
3544       break;
3545
3546     case VPPCOM_ATTR_GET_BROADCAST:
3547       if (buffer && buflen && (*buflen >= sizeof (int)))
3548         {
3549           /* VPP-TBD */
3550           *(int *) buffer = vcl_session_has_attr (session,
3551                                                   VCL_SESS_ATTR_BROADCAST);
3552           *buflen = sizeof (int);
3553
3554           VDBG (2, "VPPCOM_ATTR_GET_BROADCAST: %d, buflen %d, #VPP-TBD#",
3555                 *(int *) buffer, *buflen);
3556         }
3557       else
3558         rv = VPPCOM_EINVAL;
3559       break;
3560
3561     case VPPCOM_ATTR_SET_BROADCAST:
3562       if (buffer && buflen && (*buflen == sizeof (int)))
3563         {
3564           /* VPP-TBD */
3565           if (*(int *) buffer)
3566             vcl_session_set_attr (session, VCL_SESS_ATTR_BROADCAST);
3567           else
3568             vcl_session_clear_attr (session, VCL_SESS_ATTR_BROADCAST);
3569
3570           VDBG (2, "VPPCOM_ATTR_SET_BROADCAST: %d, buflen %d, #VPP-TBD#",
3571                 vcl_session_has_attr (session, VCL_SESS_ATTR_BROADCAST),
3572                 *buflen);
3573         }
3574       else
3575         rv = VPPCOM_EINVAL;
3576       break;
3577
3578     case VPPCOM_ATTR_GET_V6ONLY:
3579       if (buffer && buflen && (*buflen >= sizeof (int)))
3580         {
3581           /* VPP-TBD */
3582           *(int *) buffer = vcl_session_has_attr (session,
3583                                                   VCL_SESS_ATTR_V6ONLY);
3584           *buflen = sizeof (int);
3585
3586           VDBG (2, "VPPCOM_ATTR_GET_V6ONLY: %d, buflen %d, #VPP-TBD#",
3587                 *(int *) buffer, *buflen);
3588         }
3589       else
3590         rv = VPPCOM_EINVAL;
3591       break;
3592
3593     case VPPCOM_ATTR_SET_V6ONLY:
3594       if (buffer && buflen && (*buflen == sizeof (int)))
3595         {
3596           /* VPP-TBD */
3597           if (*(int *) buffer)
3598             vcl_session_set_attr (session, VCL_SESS_ATTR_V6ONLY);
3599           else
3600             vcl_session_clear_attr (session, VCL_SESS_ATTR_V6ONLY);
3601
3602           VDBG (2, "VPPCOM_ATTR_SET_V6ONLY: %d, buflen %d, #VPP-TBD#",
3603                 vcl_session_has_attr (session, VCL_SESS_ATTR_V6ONLY),
3604                 *buflen);
3605         }
3606       else
3607         rv = VPPCOM_EINVAL;
3608       break;
3609
3610     case VPPCOM_ATTR_GET_KEEPALIVE:
3611       if (buffer && buflen && (*buflen >= sizeof (int)))
3612         {
3613           /* VPP-TBD */
3614           *(int *) buffer = vcl_session_has_attr (session,
3615                                                   VCL_SESS_ATTR_KEEPALIVE);
3616           *buflen = sizeof (int);
3617
3618           VDBG (2, "VPPCOM_ATTR_GET_KEEPALIVE: %d, buflen %d, #VPP-TBD#",
3619                 *(int *) buffer, *buflen);
3620         }
3621       else
3622         rv = VPPCOM_EINVAL;
3623       break;
3624
3625     case VPPCOM_ATTR_SET_KEEPALIVE:
3626       if (buffer && buflen && (*buflen == sizeof (int)))
3627         {
3628           /* VPP-TBD */
3629           if (*(int *) buffer)
3630             vcl_session_set_attr (session, VCL_SESS_ATTR_KEEPALIVE);
3631           else
3632             vcl_session_clear_attr (session, VCL_SESS_ATTR_KEEPALIVE);
3633
3634           VDBG (2, "VPPCOM_ATTR_SET_KEEPALIVE: %d, buflen %d, #VPP-TBD#",
3635                 vcl_session_has_attr (session, VCL_SESS_ATTR_KEEPALIVE),
3636                 *buflen);
3637         }
3638       else
3639         rv = VPPCOM_EINVAL;
3640       break;
3641
3642     case VPPCOM_ATTR_GET_TCP_NODELAY:
3643       if (buffer && buflen && (*buflen >= sizeof (int)))
3644         {
3645           /* VPP-TBD */
3646           *(int *) buffer = vcl_session_has_attr (session,
3647                                                   VCL_SESS_ATTR_TCP_NODELAY);
3648           *buflen = sizeof (int);
3649
3650           VDBG (2, "VPPCOM_ATTR_GET_TCP_NODELAY: %d, buflen %d, #VPP-TBD#",
3651                 *(int *) buffer, *buflen);
3652         }
3653       else
3654         rv = VPPCOM_EINVAL;
3655       break;
3656
3657     case VPPCOM_ATTR_SET_TCP_NODELAY:
3658       if (buffer && buflen && (*buflen == sizeof (int)))
3659         {
3660           /* VPP-TBD */
3661           if (*(int *) buffer)
3662             vcl_session_set_attr (session, VCL_SESS_ATTR_TCP_NODELAY);
3663           else
3664             vcl_session_clear_attr (session, VCL_SESS_ATTR_TCP_NODELAY);
3665
3666           VDBG (2, "VPPCOM_ATTR_SET_TCP_NODELAY: %d, buflen %d, #VPP-TBD#",
3667                 vcl_session_has_attr (session, VCL_SESS_ATTR_TCP_NODELAY),
3668                 *buflen);
3669         }
3670       else
3671         rv = VPPCOM_EINVAL;
3672       break;
3673
3674     case VPPCOM_ATTR_GET_TCP_KEEPIDLE:
3675       if (buffer && buflen && (*buflen >= sizeof (int)))
3676         {
3677           /* VPP-TBD */
3678           *(int *) buffer = vcl_session_has_attr (session,
3679                                                   VCL_SESS_ATTR_TCP_KEEPIDLE);
3680           *buflen = sizeof (int);
3681
3682           VDBG (2, "VPPCOM_ATTR_GET_TCP_KEEPIDLE: %d, buflen %d, #VPP-TBD#",
3683                 *(int *) buffer, *buflen);
3684         }
3685       else
3686         rv = VPPCOM_EINVAL;
3687       break;
3688
3689     case VPPCOM_ATTR_SET_TCP_KEEPIDLE:
3690       if (buffer && buflen && (*buflen == sizeof (int)))
3691         {
3692           /* VPP-TBD */
3693           if (*(int *) buffer)
3694             vcl_session_set_attr (session, VCL_SESS_ATTR_TCP_KEEPIDLE);
3695           else
3696             vcl_session_clear_attr (session, VCL_SESS_ATTR_TCP_KEEPIDLE);
3697
3698           VDBG (2, "VPPCOM_ATTR_SET_TCP_KEEPIDLE: %d, buflen %d, #VPP-TBD#",
3699                 vcl_session_has_attr (session,
3700                                       VCL_SESS_ATTR_TCP_KEEPIDLE), *buflen);
3701         }
3702       else
3703         rv = VPPCOM_EINVAL;
3704       break;
3705
3706     case VPPCOM_ATTR_GET_TCP_KEEPINTVL:
3707       if (buffer && buflen && (*buflen >= sizeof (int)))
3708         {
3709           /* VPP-TBD */
3710           *(int *) buffer = vcl_session_has_attr (session,
3711                                                   VCL_SESS_ATTR_TCP_KEEPINTVL);
3712           *buflen = sizeof (int);
3713
3714           VDBG (2, "VPPCOM_ATTR_GET_TCP_KEEPINTVL: %d, buflen %d, #VPP-TBD#",
3715                 *(int *) buffer, *buflen);
3716         }
3717       else
3718         rv = VPPCOM_EINVAL;
3719       break;
3720
3721     case VPPCOM_ATTR_SET_TCP_KEEPINTVL:
3722       if (buffer && buflen && (*buflen == sizeof (int)))
3723         {
3724           /* VPP-TBD */
3725           if (*(int *) buffer)
3726             vcl_session_set_attr (session, VCL_SESS_ATTR_TCP_KEEPINTVL);
3727           else
3728             vcl_session_clear_attr (session, VCL_SESS_ATTR_TCP_KEEPINTVL);
3729
3730           VDBG (2, "VPPCOM_ATTR_SET_TCP_KEEPINTVL: %d, buflen %d, #VPP-TBD#",
3731                 vcl_session_has_attr (session,
3732                                       VCL_SESS_ATTR_TCP_KEEPINTVL), *buflen);
3733         }
3734       else
3735         rv = VPPCOM_EINVAL;
3736       break;
3737
3738     case VPPCOM_ATTR_GET_TCP_USER_MSS:
3739       if (!(buffer && buflen && (*buflen >= sizeof (u32))))
3740         {
3741           rv = VPPCOM_EINVAL;
3742           break;
3743         }
3744
3745       tea.type = TRANSPORT_ENDPT_ATTR_MSS;
3746       tea.mss = *(u32 *) buffer;
3747       if (vcl_session_transport_attr (wrk, session, 1 /* is_get */, &tea))
3748         rv = VPPCOM_ENOPROTOOPT;
3749
3750       if (!rv)
3751         {
3752           *(u32 *) buffer = tea.mss;
3753           *buflen = sizeof (int);
3754         }
3755
3756       VDBG (2, "VPPCOM_ATTR_GET_TCP_USER_MSS: %d, buflen %d", *(int *) buffer,
3757             *buflen);
3758       break;
3759
3760     case VPPCOM_ATTR_SET_TCP_USER_MSS:
3761       if (!(buffer && buflen && (*buflen == sizeof (u32))))
3762         {
3763           rv = VPPCOM_EINVAL;
3764           break;
3765         }
3766
3767       tea.type = TRANSPORT_ENDPT_ATTR_MSS;
3768       tea.mss = *(u32 *) buffer;
3769       if (vcl_session_transport_attr (wrk, session, 0 /* is_get */, &tea))
3770         rv = VPPCOM_ENOPROTOOPT;
3771
3772       VDBG (2, "VPPCOM_ATTR_SET_TCP_USER_MSS: %u, buflen %d", tea.mss,
3773             *buflen);
3774       break;
3775
3776     case VPPCOM_ATTR_SET_CONNECTED:
3777       session->flags |= VCL_SESSION_F_CONNECTED;
3778       break;
3779
3780     case VPPCOM_ATTR_SET_CKPAIR:
3781       if (!(buffer && buflen && (*buflen == sizeof (int))) ||
3782           !vcl_session_has_crypto (session))
3783         {
3784           rv = VPPCOM_EINVAL;
3785           break;
3786         }
3787       if (!session->ext_config)
3788         {
3789           vcl_session_alloc_ext_cfg (session, TRANSPORT_ENDPT_EXT_CFG_CRYPTO,
3790                                      sizeof (transport_endpt_ext_cfg_t));
3791         }
3792       else if (session->ext_config->type != TRANSPORT_ENDPT_EXT_CFG_CRYPTO)
3793         {
3794           rv = VPPCOM_EINVAL;
3795           break;
3796         }
3797
3798       session->ext_config->crypto.ckpair_index = *(uint32_t *) buffer;
3799       break;
3800
3801     case VPPCOM_ATTR_SET_VRF:
3802       if (!(buffer && buflen && (*buflen == sizeof (u32))))
3803         {
3804           rv = VPPCOM_EINVAL;
3805           break;
3806         }
3807       session->vrf = *(u32 *) buffer;
3808       break;
3809
3810     case VPPCOM_ATTR_GET_VRF:
3811       if (!(buffer && buflen && (*buflen >= sizeof (u32))))
3812         {
3813           rv = VPPCOM_EINVAL;
3814           break;
3815         }
3816       *(u32 *) buffer = session->vrf;
3817       *buflen = sizeof (u32);
3818       break;
3819
3820     case VPPCOM_ATTR_GET_DOMAIN:
3821       if (!(buffer && buflen && (*buflen >= sizeof (int))))
3822         {
3823           rv = VPPCOM_EINVAL;
3824           break;
3825         }
3826
3827       if (session->transport.is_ip4)
3828         *(int *) buffer = AF_INET;
3829       else
3830         *(int *) buffer = AF_INET6;
3831       *buflen = sizeof (int);
3832
3833       VDBG (2, "VPPCOM_ATTR_GET_DOMAIN: %d, buflen %u", *(int *) buffer,
3834             *buflen);
3835       break;
3836
3837     case VPPCOM_ATTR_SET_ENDPT_EXT_CFG:
3838       if (!(buffer && buflen && (*buflen > 0)))
3839         {
3840           rv = VPPCOM_EINVAL;
3841           break;
3842         }
3843       if (session->ext_config)
3844         {
3845           rv = VPPCOM_EINVAL;
3846           break;
3847         }
3848       vcl_session_alloc_ext_cfg (session, TRANSPORT_ENDPT_EXT_CFG_NONE,
3849                                  *buflen + sizeof (u32));
3850       clib_memcpy (session->ext_config->data, buffer, *buflen);
3851       session->ext_config->len = *buflen;
3852       break;
3853
3854     default:
3855       rv = VPPCOM_EINVAL;
3856       break;
3857     }
3858
3859   return rv;
3860 }
3861
3862 int
3863 vppcom_session_recvfrom (uint32_t session_handle, void *buffer,
3864                          uint32_t buflen, int flags, vppcom_endpt_t * ep)
3865 {
3866   vcl_worker_t *wrk = vcl_worker_get_current ();
3867   vcl_session_t *session;
3868   int rv = VPPCOM_OK;
3869
3870   if (flags == 0)
3871     rv = vppcom_session_read (session_handle, buffer, buflen);
3872   else if (flags & MSG_PEEK)
3873     rv = vppcom_session_peek (session_handle, buffer, buflen);
3874   else
3875     {
3876       VDBG (0, "Unsupport flags for recvfrom %d", flags);
3877       return VPPCOM_EAFNOSUPPORT;
3878     }
3879
3880   if (ep && rv > 0)
3881     {
3882       session = vcl_session_get_w_handle (wrk, session_handle);
3883       if (session->transport.is_ip4)
3884         clib_memcpy_fast (ep->ip, &session->transport.rmt_ip.ip4,
3885                           sizeof (ip4_address_t));
3886       else
3887         clib_memcpy_fast (ep->ip, &session->transport.rmt_ip.ip6,
3888                           sizeof (ip6_address_t));
3889       ep->is_ip4 = session->transport.is_ip4;
3890       ep->port = session->transport.rmt_port;
3891     }
3892
3893   return rv;
3894 }
3895
3896 int
3897 vppcom_session_sendto (uint32_t session_handle, void *buffer,
3898                        uint32_t buflen, int flags, vppcom_endpt_t * ep)
3899 {
3900   vcl_worker_t *wrk = vcl_worker_get_current ();
3901   vcl_session_t *s;
3902
3903   s = vcl_session_get_w_handle (wrk, session_handle);
3904   if (PREDICT_FALSE (!s))
3905     return VPPCOM_EBADFD;
3906
3907   if (ep)
3908     {
3909       if (!vcl_session_is_cl (s))
3910         return VPPCOM_EINVAL;
3911
3912       /* Session not connected/bound in vpp. Create it by 'connecting' it */
3913       if (PREDICT_FALSE (s->session_state == VCL_STATE_CLOSED))
3914         {
3915           u32 session_index = s->session_index;
3916           f64 timeout = vcm->cfg.session_timeout;
3917           int rv;
3918
3919           vcl_send_session_connect (wrk, s);
3920           rv = vppcom_wait_for_session_state_change (session_index,
3921                                                      VCL_STATE_READY,
3922                                                      timeout);
3923           if (rv < 0)
3924             return rv;
3925           s = vcl_session_get (wrk, session_index);
3926         }
3927
3928       s->transport.is_ip4 = ep->is_ip4;
3929       s->transport.rmt_port = ep->port;
3930       vcl_ip_copy_from_ep (&s->transport.rmt_ip, ep);
3931     }
3932
3933   if (flags)
3934     {
3935       // TBD check the flags and do the right thing
3936       VDBG (2, "handling flags 0x%u (%d) not implemented yet.", flags, flags);
3937     }
3938
3939   return (vppcom_session_write_inline (wrk, s, buffer, buflen, 1,
3940                                        s->is_dgram ? 1 : 0));
3941 }
3942
3943 int
3944 vppcom_poll (vcl_poll_t * vp, uint32_t n_sids, double wait_for_time)
3945 {
3946   vcl_worker_t *wrk = vcl_worker_get_current ();
3947   f64 timeout = clib_time_now (&wrk->clib_time) + wait_for_time;
3948   u32 i, keep_trying = 1;
3949   svm_msg_q_msg_t msg;
3950   session_event_t *e;
3951   int rv, num_ev = 0;
3952
3953   VDBG (3, "vp %p, nsids %u, wait_for_time %f", vp, n_sids, wait_for_time);
3954
3955   if (!vp)
3956     return VPPCOM_EFAULT;
3957
3958   do
3959     {
3960       vcl_session_t *session;
3961
3962       /* Dequeue all events and drop all unhandled io events */
3963       while (svm_msg_q_sub (wrk->app_event_queue, &msg, SVM_Q_NOWAIT, 0) == 0)
3964         {
3965           e = svm_msg_q_msg_data (wrk->app_event_queue, &msg);
3966           vcl_handle_mq_event (wrk, e);
3967           svm_msg_q_free_msg (wrk->app_event_queue, &msg);
3968         }
3969       vec_reset_length (wrk->unhandled_evts_vector);
3970
3971       for (i = 0; i < n_sids; i++)
3972         {
3973           session = vcl_session_get (wrk, vp[i].sh);
3974           if (!session)
3975             {
3976               vp[i].revents = POLLHUP;
3977               num_ev++;
3978               continue;
3979             }
3980
3981           vp[i].revents = 0;
3982
3983           if (POLLIN & vp[i].events)
3984             {
3985               rv = vcl_session_read_ready (session);
3986               if (rv > 0)
3987                 {
3988                   vp[i].revents |= POLLIN;
3989                   num_ev++;
3990                 }
3991               else if (rv < 0)
3992                 {
3993                   switch (rv)
3994                     {
3995                     case VPPCOM_ECONNRESET:
3996                       vp[i].revents = POLLHUP;
3997                       break;
3998
3999                     default:
4000                       vp[i].revents = POLLERR;
4001                       break;
4002                     }
4003                   num_ev++;
4004                 }
4005             }
4006
4007           if (POLLOUT & vp[i].events)
4008             {
4009               rv = vcl_session_write_ready (session);
4010               if (rv > 0)
4011                 {
4012                   vp[i].revents |= POLLOUT;
4013                   num_ev++;
4014                 }
4015               else if (rv < 0)
4016                 {
4017                   switch (rv)
4018                     {
4019                     case VPPCOM_ECONNRESET:
4020                       vp[i].revents = POLLHUP;
4021                       break;
4022
4023                     default:
4024                       vp[i].revents = POLLERR;
4025                       break;
4026                     }
4027                   num_ev++;
4028                 }
4029             }
4030
4031           if (0)                // Note "done:" label used by VCL_SESSION_LOCK_AND_GET()
4032             {
4033               vp[i].revents = POLLNVAL;
4034               num_ev++;
4035             }
4036         }
4037       if (wait_for_time != -1)
4038         keep_trying = (clib_time_now (&wrk->clib_time) <= timeout) ? 1 : 0;
4039     }
4040   while ((num_ev == 0) && keep_trying);
4041
4042   return num_ev;
4043 }
4044
4045 int
4046 vppcom_mq_epoll_fd (void)
4047 {
4048   vcl_worker_t *wrk = vcl_worker_get_current ();
4049   return wrk->mqs_epfd;
4050 }
4051
4052 int
4053 vppcom_session_index (vcl_session_handle_t session_handle)
4054 {
4055   return session_handle & 0xFFFFFF;
4056 }
4057
4058 int
4059 vppcom_session_worker (vcl_session_handle_t session_handle)
4060 {
4061   return session_handle >> 24;
4062 }
4063
4064 int
4065 vppcom_worker_register (void)
4066 {
4067   if (!vcl_worker_alloc_and_init ())
4068     return VPPCOM_EEXIST;
4069
4070   if (vcl_worker_register_with_vpp ())
4071     return VPPCOM_EEXIST;
4072
4073   return VPPCOM_OK;
4074 }
4075
4076 void
4077 vppcom_worker_unregister (void)
4078 {
4079   vcl_worker_cleanup (vcl_worker_get_current (), 1 /* notify vpp */ );
4080   vcl_set_worker_index (~0);
4081 }
4082
4083 void
4084 vppcom_worker_index_set (int index)
4085 {
4086   vcl_set_worker_index (index);
4087 }
4088
4089 int
4090 vppcom_worker_index (void)
4091 {
4092   return vcl_get_worker_index ();
4093 }
4094
4095 int
4096 vppcom_worker_mqs_epfd (void)
4097 {
4098   vcl_worker_t *wrk = vcl_worker_get_current ();
4099   if (!vcm->cfg.use_mq_eventfd)
4100     return -1;
4101   return wrk->mqs_epfd;
4102 }
4103
4104 int
4105 vppcom_session_is_connectable_listener (uint32_t session_handle)
4106 {
4107   vcl_session_t *session;
4108   vcl_worker_t *wrk = vcl_worker_get_current ();
4109   session = vcl_session_get_w_handle (wrk, session_handle);
4110   if (!session)
4111     return VPPCOM_EBADFD;
4112   return vcl_session_is_connectable_listener (wrk, session);
4113 }
4114
4115 int
4116 vppcom_session_listener (uint32_t session_handle)
4117 {
4118   vcl_worker_t *wrk = vcl_worker_get_current ();
4119   vcl_session_t *listen_session, *session;
4120   session = vcl_session_get_w_handle (wrk, session_handle);
4121   if (!session)
4122     return VPPCOM_EBADFD;
4123   if (session->listener_index == VCL_INVALID_SESSION_INDEX)
4124     return VPPCOM_EBADFD;
4125   listen_session = vcl_session_get_w_handle (wrk, session->listener_index);
4126   if (!listen_session)
4127     return VPPCOM_EBADFD;
4128   return vcl_session_handle (listen_session);
4129 }
4130
4131 int
4132 vppcom_session_n_accepted (uint32_t session_handle)
4133 {
4134   vcl_worker_t *wrk = vcl_worker_get_current ();
4135   vcl_session_t *session = vcl_session_get_w_handle (wrk, session_handle);
4136   if (!session)
4137     return VPPCOM_EBADFD;
4138   return session->n_accepted_sessions;
4139 }
4140
4141 const char *
4142 vppcom_proto_str (vppcom_proto_t proto)
4143 {
4144   char const *proto_str;
4145
4146   switch (proto)
4147     {
4148     case VPPCOM_PROTO_TCP:
4149       proto_str = "TCP";
4150       break;
4151     case VPPCOM_PROTO_UDP:
4152       proto_str = "UDP";
4153       break;
4154     case VPPCOM_PROTO_TLS:
4155       proto_str = "TLS";
4156       break;
4157     case VPPCOM_PROTO_QUIC:
4158       proto_str = "QUIC";
4159       break;
4160     case VPPCOM_PROTO_DTLS:
4161       proto_str = "DTLS";
4162       break;
4163     case VPPCOM_PROTO_SRTP:
4164       proto_str = "SRTP";
4165       break;
4166     default:
4167       proto_str = "UNKNOWN";
4168       break;
4169     }
4170   return proto_str;
4171 }
4172
4173 const char *
4174 vppcom_retval_str (int retval)
4175 {
4176   char const *st;
4177
4178   switch (retval)
4179     {
4180     case VPPCOM_OK:
4181       st = "VPPCOM_OK";
4182       break;
4183
4184     case VPPCOM_EAGAIN:
4185       st = "VPPCOM_EAGAIN";
4186       break;
4187
4188     case VPPCOM_EFAULT:
4189       st = "VPPCOM_EFAULT";
4190       break;
4191
4192     case VPPCOM_ENOMEM:
4193       st = "VPPCOM_ENOMEM";
4194       break;
4195
4196     case VPPCOM_EINVAL:
4197       st = "VPPCOM_EINVAL";
4198       break;
4199
4200     case VPPCOM_EBADFD:
4201       st = "VPPCOM_EBADFD";
4202       break;
4203
4204     case VPPCOM_EAFNOSUPPORT:
4205       st = "VPPCOM_EAFNOSUPPORT";
4206       break;
4207
4208     case VPPCOM_ECONNABORTED:
4209       st = "VPPCOM_ECONNABORTED";
4210       break;
4211
4212     case VPPCOM_ECONNRESET:
4213       st = "VPPCOM_ECONNRESET";
4214       break;
4215
4216     case VPPCOM_ENOTCONN:
4217       st = "VPPCOM_ENOTCONN";
4218       break;
4219
4220     case VPPCOM_ECONNREFUSED:
4221       st = "VPPCOM_ECONNREFUSED";
4222       break;
4223
4224     case VPPCOM_ETIMEDOUT:
4225       st = "VPPCOM_ETIMEDOUT";
4226       break;
4227
4228     default:
4229       st = "UNKNOWN_STATE";
4230       break;
4231     }
4232
4233   return st;
4234 }
4235
4236 int
4237 vppcom_add_cert_key_pair (vppcom_cert_key_pair_t *ckpair)
4238 {
4239   if (vcm->cfg.vpp_app_socket_api)
4240     {
4241       clib_warning ("not supported");
4242       return VPPCOM_EINVAL;
4243     }
4244   return vcl_bapi_add_cert_key_pair (ckpair);
4245 }
4246
4247 int
4248 vppcom_del_cert_key_pair (uint32_t ckpair_index)
4249 {
4250   if (vcm->cfg.vpp_app_socket_api)
4251     {
4252       clib_warning ("not supported");
4253       return VPPCOM_EINVAL;
4254     }
4255   return vcl_bapi_del_cert_key_pair (ckpair_index);
4256 }
4257
4258 /*
4259  * fd.io coding-style-patch-verification: ON
4260  *
4261  * Local Variables:
4262  * eval: (c-set-style "gnu")
4263  * End:
4264  */