svm: allow mq attachments at random offsets
[vpp.git] / src / vcl / vppcom.c
1 /*
2  * Copyright (c) 2017-2019 Cisco and/or its affiliates.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this
5  * You may obtain a copy of the License at:
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15
16 #include <stdio.h>
17 #include <stdlib.h>
18 #include <vcl/vppcom.h>
19 #include <vcl/vcl_debug.h>
20 #include <vcl/vcl_private.h>
21 #include <svm/fifo_segment.h>
22
23 __thread uword __vcl_worker_index = ~0;
24
25 static inline int
26 vcl_mq_dequeue_batch (vcl_worker_t * wrk, svm_msg_q_t * mq, u32 n_max_msg)
27 {
28   svm_msg_q_msg_t *msg;
29   u32 n_msgs;
30   int i;
31
32   n_msgs = clib_min (svm_msg_q_size (mq), n_max_msg);
33   for (i = 0; i < n_msgs; i++)
34     {
35       vec_add2 (wrk->mq_msg_vector, msg, 1);
36       svm_msg_q_sub_w_lock (mq, msg);
37     }
38   return n_msgs;
39 }
40
41 const char *
42 vppcom_session_state_str (vcl_session_state_t state)
43 {
44   char *st;
45
46   switch (state)
47     {
48     case VCL_STATE_CLOSED:
49       st = "STATE_CLOSED";
50       break;
51     case VCL_STATE_LISTEN:
52       st = "STATE_LISTEN";
53       break;
54     case VCL_STATE_READY:
55       st = "STATE_READY";
56       break;
57     case VCL_STATE_VPP_CLOSING:
58       st = "STATE_VPP_CLOSING";
59       break;
60     case VCL_STATE_DISCONNECT:
61       st = "STATE_DISCONNECT";
62       break;
63     case VCL_STATE_DETACHED:
64       st = "STATE_DETACHED";
65       break;
66     case VCL_STATE_UPDATED:
67       st = "STATE_UPDATED";
68       break;
69     case VCL_STATE_LISTEN_NO_MQ:
70       st = "STATE_LISTEN_NO_MQ";
71       break;
72     default:
73       st = "UNKNOWN_STATE";
74       break;
75     }
76
77   return st;
78 }
79
80 u8 *
81 format_ip4_address (u8 * s, va_list * args)
82 {
83   u8 *a = va_arg (*args, u8 *);
84   return format (s, "%d.%d.%d.%d", a[0], a[1], a[2], a[3]);
85 }
86
87 u8 *
88 format_ip6_address (u8 * s, va_list * args)
89 {
90   ip6_address_t *a = va_arg (*args, ip6_address_t *);
91   u32 i, i_max_n_zero, max_n_zeros, i_first_zero, n_zeros, last_double_colon;
92
93   i_max_n_zero = ARRAY_LEN (a->as_u16);
94   max_n_zeros = 0;
95   i_first_zero = i_max_n_zero;
96   n_zeros = 0;
97   for (i = 0; i < ARRAY_LEN (a->as_u16); i++)
98     {
99       u32 is_zero = a->as_u16[i] == 0;
100       if (is_zero && i_first_zero >= ARRAY_LEN (a->as_u16))
101         {
102           i_first_zero = i;
103           n_zeros = 0;
104         }
105       n_zeros += is_zero;
106       if ((!is_zero && n_zeros > max_n_zeros)
107           || (i + 1 >= ARRAY_LEN (a->as_u16) && n_zeros > max_n_zeros))
108         {
109           i_max_n_zero = i_first_zero;
110           max_n_zeros = n_zeros;
111           i_first_zero = ARRAY_LEN (a->as_u16);
112           n_zeros = 0;
113         }
114     }
115
116   last_double_colon = 0;
117   for (i = 0; i < ARRAY_LEN (a->as_u16); i++)
118     {
119       if (i == i_max_n_zero && max_n_zeros > 1)
120         {
121           s = format (s, "::");
122           i += max_n_zeros - 1;
123           last_double_colon = 1;
124         }
125       else
126         {
127           s = format (s, "%s%x",
128                       (last_double_colon || i == 0) ? "" : ":",
129                       clib_net_to_host_u16 (a->as_u16[i]));
130           last_double_colon = 0;
131         }
132     }
133
134   return s;
135 }
136
137 /* Format an IP46 address. */
138 u8 *
139 format_ip46_address (u8 * s, va_list * args)
140 {
141   ip46_address_t *ip46 = va_arg (*args, ip46_address_t *);
142   ip46_type_t type = va_arg (*args, ip46_type_t);
143   int is_ip4 = 1;
144
145   switch (type)
146     {
147     case IP46_TYPE_ANY:
148       is_ip4 = ip46_address_is_ip4 (ip46);
149       break;
150     case IP46_TYPE_IP4:
151       is_ip4 = 1;
152       break;
153     case IP46_TYPE_IP6:
154       is_ip4 = 0;
155       break;
156     }
157
158   return is_ip4 ?
159     format (s, "%U", format_ip4_address, &ip46->ip4) :
160     format (s, "%U", format_ip6_address, &ip46->ip6);
161 }
162
163 /*
164  * VPPCOM Utility Functions
165  */
166
167 static void
168 vcl_send_session_listen (vcl_worker_t * wrk, vcl_session_t * s)
169 {
170   app_session_evt_t _app_evt, *app_evt = &_app_evt;
171   session_listen_msg_t *mp;
172   svm_msg_q_t *mq;
173
174   mq = vcl_worker_ctrl_mq (wrk);
175   app_alloc_ctrl_evt_to_vpp (mq, app_evt, SESSION_CTRL_EVT_LISTEN);
176   mp = (session_listen_msg_t *) app_evt->evt->data;
177   memset (mp, 0, sizeof (*mp));
178   mp->client_index = wrk->api_client_handle;
179   mp->context = s->session_index;
180   mp->wrk_index = wrk->vpp_wrk_index;
181   mp->is_ip4 = s->transport.is_ip4;
182   clib_memcpy_fast (&mp->ip, &s->transport.lcl_ip, sizeof (mp->ip));
183   mp->port = s->transport.lcl_port;
184   mp->proto = s->session_type;
185   if (s->flags & VCL_SESSION_F_CONNECTED)
186     mp->flags = TRANSPORT_CFG_F_CONNECTED;
187   app_send_ctrl_evt_to_vpp (mq, app_evt);
188 }
189
190 static void
191 vcl_send_session_connect (vcl_worker_t * wrk, vcl_session_t * s)
192 {
193   app_session_evt_t _app_evt, *app_evt = &_app_evt;
194   session_connect_msg_t *mp;
195   svm_msg_q_t *mq;
196
197   mq = vcl_worker_ctrl_mq (wrk);
198   app_alloc_ctrl_evt_to_vpp (mq, app_evt, SESSION_CTRL_EVT_CONNECT);
199   mp = (session_connect_msg_t *) app_evt->evt->data;
200   memset (mp, 0, sizeof (*mp));
201   mp->client_index = wrk->api_client_handle;
202   mp->context = s->session_index;
203   mp->wrk_index = wrk->vpp_wrk_index;
204   mp->is_ip4 = s->transport.is_ip4;
205   mp->parent_handle = s->parent_handle;
206   clib_memcpy_fast (&mp->ip, &s->transport.rmt_ip, sizeof (mp->ip));
207   clib_memcpy_fast (&mp->lcl_ip, &s->transport.lcl_ip, sizeof (mp->lcl_ip));
208   mp->port = s->transport.rmt_port;
209   mp->lcl_port = s->transport.lcl_port;
210   mp->proto = s->session_type;
211   if (s->flags & VCL_SESSION_F_CONNECTED)
212     mp->flags |= TRANSPORT_CFG_F_CONNECTED;
213   app_send_ctrl_evt_to_vpp (mq, app_evt);
214 }
215
216 void
217 vcl_send_session_unlisten (vcl_worker_t * wrk, vcl_session_t * s)
218 {
219   app_session_evt_t _app_evt, *app_evt = &_app_evt;
220   session_unlisten_msg_t *mp;
221   svm_msg_q_t *mq;
222
223   mq = vcl_worker_ctrl_mq (wrk);
224   app_alloc_ctrl_evt_to_vpp (mq, app_evt, SESSION_CTRL_EVT_UNLISTEN);
225   mp = (session_unlisten_msg_t *) app_evt->evt->data;
226   memset (mp, 0, sizeof (*mp));
227   mp->client_index = wrk->api_client_handle;
228   mp->wrk_index = wrk->vpp_wrk_index;
229   mp->handle = s->vpp_handle;
230   mp->context = wrk->wrk_index;
231   app_send_ctrl_evt_to_vpp (mq, app_evt);
232 }
233
234 static void
235 vcl_send_session_disconnect (vcl_worker_t * wrk, vcl_session_t * s)
236 {
237   app_session_evt_t _app_evt, *app_evt = &_app_evt;
238   session_disconnect_msg_t *mp;
239   svm_msg_q_t *mq;
240
241   /* Send to thread that owns the session */
242   mq = s->vpp_evt_q;
243   app_alloc_ctrl_evt_to_vpp (mq, app_evt, SESSION_CTRL_EVT_DISCONNECT);
244   mp = (session_disconnect_msg_t *) app_evt->evt->data;
245   memset (mp, 0, sizeof (*mp));
246   mp->client_index = wrk->api_client_handle;
247   mp->handle = s->vpp_handle;
248   app_send_ctrl_evt_to_vpp (mq, app_evt);
249 }
250
251 static void
252 vcl_send_app_detach (vcl_worker_t * wrk)
253 {
254   app_session_evt_t _app_evt, *app_evt = &_app_evt;
255   session_app_detach_msg_t *mp;
256   svm_msg_q_t *mq;
257
258   mq = vcl_worker_ctrl_mq (wrk);
259   app_alloc_ctrl_evt_to_vpp (mq, app_evt, SESSION_CTRL_EVT_APP_DETACH);
260   mp = (session_app_detach_msg_t *) app_evt->evt->data;
261   memset (mp, 0, sizeof (*mp));
262   mp->client_index = wrk->api_client_handle;
263   app_send_ctrl_evt_to_vpp (mq, app_evt);
264 }
265
266 static void
267 vcl_send_session_accepted_reply (svm_msg_q_t * mq, u32 context,
268                                  session_handle_t handle, int retval)
269 {
270   app_session_evt_t _app_evt, *app_evt = &_app_evt;
271   session_accepted_reply_msg_t *rmp;
272   app_alloc_ctrl_evt_to_vpp (mq, app_evt, SESSION_CTRL_EVT_ACCEPTED_REPLY);
273   rmp = (session_accepted_reply_msg_t *) app_evt->evt->data;
274   rmp->handle = handle;
275   rmp->context = context;
276   rmp->retval = retval;
277   app_send_ctrl_evt_to_vpp (mq, app_evt);
278 }
279
280 static void
281 vcl_send_session_disconnected_reply (vcl_worker_t * wrk, vcl_session_t * s,
282                                      int retval)
283 {
284   app_session_evt_t _app_evt, *app_evt = &_app_evt;
285   session_disconnected_reply_msg_t *rmp;
286   app_alloc_ctrl_evt_to_vpp (s->vpp_evt_q, app_evt,
287                              SESSION_CTRL_EVT_DISCONNECTED_REPLY);
288   rmp = (session_disconnected_reply_msg_t *) app_evt->evt->data;
289   rmp->handle = s->vpp_handle;
290   rmp->context = wrk->api_client_handle;
291   rmp->retval = retval;
292   app_send_ctrl_evt_to_vpp (s->vpp_evt_q, app_evt);
293 }
294
295 static void
296 vcl_send_session_reset_reply (vcl_worker_t * wrk, vcl_session_t * s,
297                               int retval)
298 {
299   app_session_evt_t _app_evt, *app_evt = &_app_evt;
300   session_reset_reply_msg_t *rmp;
301   app_alloc_ctrl_evt_to_vpp (s->vpp_evt_q, app_evt,
302                              SESSION_CTRL_EVT_RESET_REPLY);
303   rmp = (session_reset_reply_msg_t *) app_evt->evt->data;
304   rmp->handle = s->vpp_handle;
305   rmp->context = wrk->api_client_handle;
306   rmp->retval = retval;
307   app_send_ctrl_evt_to_vpp (s->vpp_evt_q, app_evt);
308 }
309
310 void
311 vcl_send_session_worker_update (vcl_worker_t * wrk, vcl_session_t * s,
312                                 u32 wrk_index)
313 {
314   app_session_evt_t _app_evt, *app_evt = &_app_evt;
315   session_worker_update_msg_t *mp;
316
317   app_alloc_ctrl_evt_to_vpp (s->vpp_evt_q, app_evt,
318                              SESSION_CTRL_EVT_WORKER_UPDATE);
319   mp = (session_worker_update_msg_t *) app_evt->evt->data;
320   mp->client_index = wrk->api_client_handle;
321   mp->handle = s->vpp_handle;
322   mp->req_wrk_index = wrk->vpp_wrk_index;
323   mp->wrk_index = wrk_index;
324   app_send_ctrl_evt_to_vpp (s->vpp_evt_q, app_evt);
325 }
326
327 int
328 vcl_send_worker_rpc (u32 dst_wrk_index, void *data, u32 data_len)
329 {
330   app_session_evt_t _app_evt, *app_evt = &_app_evt;
331   session_app_wrk_rpc_msg_t *mp;
332   vcl_worker_t *dst_wrk, *wrk;
333   svm_msg_q_t *mq;
334   int ret = -1;
335
336   if (data_len > sizeof (mp->data))
337     goto done;
338
339   clib_spinlock_lock (&vcm->workers_lock);
340
341   dst_wrk = vcl_worker_get_if_valid (dst_wrk_index);
342   if (!dst_wrk)
343     goto done;
344
345   wrk = vcl_worker_get_current ();
346   mq = vcl_worker_ctrl_mq (wrk);
347   app_alloc_ctrl_evt_to_vpp (mq, app_evt, SESSION_CTRL_EVT_APP_WRK_RPC);
348   mp = (session_app_wrk_rpc_msg_t *) app_evt->evt->data;
349   mp->client_index = wrk->api_client_handle;
350   mp->wrk_index = dst_wrk->vpp_wrk_index;
351   clib_memcpy (mp->data, data, data_len);
352   app_send_ctrl_evt_to_vpp (mq, app_evt);
353   ret = 0;
354
355 done:
356   clib_spinlock_unlock (&vcm->workers_lock);
357   return ret;
358 }
359
360 static u32
361 vcl_session_accepted_handler (vcl_worker_t * wrk, session_accepted_msg_t * mp,
362                               u32 ls_index)
363 {
364   vcl_session_t *session, *listen_session;
365   svm_msg_q_t *evt_q;
366
367   session = vcl_session_alloc (wrk);
368
369   listen_session = vcl_session_get (wrk, ls_index);
370   if (listen_session->vpp_handle != mp->listener_handle)
371     {
372       VDBG (0, "ERROR: listener handle %lu does not match session %u",
373             mp->listener_handle, ls_index);
374       goto error;
375     }
376
377   if (vcl_segment_attach_session (mp->segment_handle, mp->server_rx_fifo,
378                                   mp->server_tx_fifo,
379                                   mp->vpp_event_queue_address, 0, session))
380     {
381       VDBG (0, "failed to attach fifos for %u", session->session_index);
382       goto error;
383     }
384
385   session->vpp_handle = mp->handle;
386   session->session_state = VCL_STATE_READY;
387   session->transport.rmt_port = mp->rmt.port;
388   session->transport.is_ip4 = mp->rmt.is_ip4;
389   clib_memcpy_fast (&session->transport.rmt_ip, &mp->rmt.ip,
390                     sizeof (ip46_address_t));
391
392   vcl_session_table_add_vpp_handle (wrk, mp->handle, session->session_index);
393   session->transport.lcl_port = listen_session->transport.lcl_port;
394   session->transport.lcl_ip = listen_session->transport.lcl_ip;
395   session->session_type = listen_session->session_type;
396   session->is_dgram = vcl_proto_is_dgram (session->session_type);
397   session->listener_index = listen_session->session_index;
398   listen_session->n_accepted_sessions++;
399
400   VDBG (1, "session %u [0x%llx]: client accept request from %s address %U"
401         " port %d queue %p!", session->session_index, mp->handle,
402         mp->rmt.is_ip4 ? "IPv4" : "IPv6", format_ip46_address, &mp->rmt.ip,
403         mp->rmt.is_ip4 ? IP46_TYPE_IP4 : IP46_TYPE_IP6,
404         clib_net_to_host_u16 (mp->rmt.port), session->vpp_evt_q);
405   vcl_evt (VCL_EVT_ACCEPT, session, listen_session, session_index);
406
407   vcl_send_session_accepted_reply (session->vpp_evt_q, mp->context,
408                                    session->vpp_handle, 0);
409
410   return session->session_index;
411
412 error:
413   vcl_segment_attach_mq (vcl_vpp_worker_segment_handle (0),
414                          mp->vpp_event_queue_address, mp->mq_index, &evt_q);
415   vcl_send_session_accepted_reply (evt_q, mp->context, mp->handle,
416                                    VNET_API_ERROR_INVALID_ARGUMENT);
417   vcl_session_free (wrk, session);
418   return VCL_INVALID_SESSION_INDEX;
419 }
420
421 static u32
422 vcl_session_connected_handler (vcl_worker_t * wrk,
423                                session_connected_msg_t * mp)
424 {
425   vcl_session_t *session = 0;
426   u32 session_index;
427
428   session_index = mp->context;
429   session = vcl_session_get (wrk, session_index);
430   if (!session)
431     {
432       VDBG (0, "ERROR: vpp handle 0x%llx has no session index (%u)!",
433             mp->handle, session_index);
434       return VCL_INVALID_SESSION_INDEX;
435     }
436   if (mp->retval)
437     {
438       VDBG (0, "ERROR: session index %u: connect failed! %U",
439             session_index, format_session_error, mp->retval);
440       session->session_state = VCL_STATE_DETACHED;
441       session->vpp_handle = mp->handle;
442       return session_index;
443     }
444
445   session->vpp_handle = mp->handle;
446
447   if (vcl_segment_attach_session (mp->segment_handle, mp->server_rx_fifo,
448                                   mp->server_tx_fifo,
449                                   mp->vpp_event_queue_address, 0, session))
450     {
451       VDBG (0, "failed to attach fifos for %u", session->session_index);
452       session->session_state = VCL_STATE_DETACHED;
453       vcl_send_session_disconnect (wrk, session);
454       return session_index;
455     }
456
457   if (mp->ct_rx_fifo)
458     {
459       if (vcl_segment_attach_session (mp->ct_segment_handle, mp->ct_rx_fifo,
460                                       mp->ct_tx_fifo, (uword) ~0, 1, session))
461         {
462           VDBG (0, "failed to attach ct fifos for %u", session->session_index);
463           session->session_state = VCL_STATE_DETACHED;
464           vcl_send_session_disconnect (wrk, session);
465           return session_index;
466         }
467     }
468
469   session->transport.is_ip4 = mp->lcl.is_ip4;
470   clib_memcpy_fast (&session->transport.lcl_ip, &mp->lcl.ip,
471                     sizeof (session->transport.lcl_ip));
472   session->transport.lcl_port = mp->lcl.port;
473
474   /* Application closed session before connect reply */
475   if (vcl_session_has_attr (session, VCL_SESS_ATTR_NONBLOCK)
476       && session->session_state == VCL_STATE_CLOSED)
477     vcl_send_session_disconnect (wrk, session);
478   else
479     session->session_state = VCL_STATE_READY;
480
481   /* Add it to lookup table */
482   vcl_session_table_add_vpp_handle (wrk, mp->handle, session_index);
483
484   VDBG (1, "session %u [0x%llx] connected! rx_fifo %p, refcnt %d, tx_fifo %p,"
485         " refcnt %d", session_index, mp->handle, session->rx_fifo,
486         session->rx_fifo->refcnt, session->tx_fifo, session->tx_fifo->refcnt);
487
488   return session_index;
489 }
490
491 static int
492 vcl_flag_accepted_session (vcl_session_t * session, u64 handle, u32 flags)
493 {
494   vcl_session_msg_t *accepted_msg;
495   int i;
496
497   for (i = 0; i < vec_len (session->accept_evts_fifo); i++)
498     {
499       accepted_msg = &session->accept_evts_fifo[i];
500       if (accepted_msg->accepted_msg.handle == handle)
501         {
502           accepted_msg->flags |= flags;
503           return 1;
504         }
505     }
506   return 0;
507 }
508
509 static u32
510 vcl_session_reset_handler (vcl_worker_t * wrk,
511                            session_reset_msg_t * reset_msg)
512 {
513   vcl_session_t *session;
514   u32 sid;
515
516   sid = vcl_session_index_from_vpp_handle (wrk, reset_msg->handle);
517   session = vcl_session_get (wrk, sid);
518   if (!session)
519     {
520       VDBG (0, "request to reset unknown handle 0x%llx", reset_msg->handle);
521       return VCL_INVALID_SESSION_INDEX;
522     }
523
524   /* Caught a reset before actually accepting the session */
525   if (session->session_state == VCL_STATE_LISTEN)
526     {
527
528       if (!vcl_flag_accepted_session (session, reset_msg->handle,
529                                       VCL_ACCEPTED_F_RESET))
530         VDBG (0, "session was not accepted!");
531       return VCL_INVALID_SESSION_INDEX;
532     }
533
534   if (session->session_state != VCL_STATE_CLOSED)
535     session->session_state = VCL_STATE_DISCONNECT;
536   VDBG (0, "reset session %u [0x%llx]", sid, reset_msg->handle);
537   return sid;
538 }
539
540 static u32
541 vcl_session_bound_handler (vcl_worker_t * wrk, session_bound_msg_t * mp)
542 {
543   vcl_session_t *session;
544   u32 sid = mp->context;
545
546   session = vcl_session_get (wrk, sid);
547   if (mp->retval)
548     {
549       VERR ("session %u [0x%llx]: bind failed: %U", sid, mp->handle,
550             format_session_error, mp->retval);
551       if (session)
552         {
553           session->session_state = VCL_STATE_DETACHED;
554           session->vpp_handle = mp->handle;
555           return sid;
556         }
557       else
558         {
559           VDBG (0, "ERROR: session %u [0x%llx]: Invalid session index!",
560                 sid, mp->handle);
561           return VCL_INVALID_SESSION_INDEX;
562         }
563     }
564
565   session->vpp_handle = mp->handle;
566   session->transport.is_ip4 = mp->lcl_is_ip4;
567   clib_memcpy_fast (&session->transport.lcl_ip, mp->lcl_ip,
568                     sizeof (ip46_address_t));
569   session->transport.lcl_port = mp->lcl_port;
570   vcl_session_table_add_listener (wrk, mp->handle, sid);
571   session->session_state = VCL_STATE_LISTEN;
572
573   if (vcl_session_is_cl (session))
574     {
575       if (vcl_segment_attach_session (mp->segment_handle, mp->rx_fifo,
576                                       mp->tx_fifo, mp->vpp_evt_q, 0, session))
577         {
578           VDBG (0, "failed to attach fifos for %u", session->session_index);
579           session->session_state = VCL_STATE_DETACHED;
580           return VCL_INVALID_SESSION_INDEX;
581         }
582     }
583
584   VDBG (0, "session %u [0x%llx]: listen succeeded!", sid, mp->handle);
585   return sid;
586 }
587
588 static void
589 vcl_session_unlisten_reply_handler (vcl_worker_t * wrk, void *data)
590 {
591   session_unlisten_reply_msg_t *mp = (session_unlisten_reply_msg_t *) data;
592   vcl_session_t *s;
593
594   s = vcl_session_get_w_vpp_handle (wrk, mp->handle);
595   if (!s)
596     {
597       VDBG (0, "Unlisten reply with wrong handle %llx", mp->handle);
598       return;
599     }
600   if (s->session_state != VCL_STATE_DISCONNECT)
601     {
602       /* Connected udp listener */
603       if (s->session_type == VPPCOM_PROTO_UDP
604           && s->session_state == VCL_STATE_CLOSED)
605         return;
606
607       VDBG (0, "Unlisten session in wrong state %llx", mp->handle);
608       return;
609     }
610
611   if (mp->retval)
612     VDBG (0, "ERROR: session %u [0xllx]: unlisten failed: %U",
613           s->session_index, mp->handle, format_session_error, mp->retval);
614
615   if (mp->context != wrk->wrk_index)
616     VDBG (0, "wrong context");
617
618   vcl_session_table_del_vpp_handle (wrk, mp->handle);
619   vcl_session_free (wrk, s);
620 }
621
622 static void
623 vcl_session_migrated_handler (vcl_worker_t * wrk, void *data)
624 {
625   session_migrated_msg_t *mp = (session_migrated_msg_t *) data;
626   vcl_session_t *s;
627   u32 fs_index;
628
629   s = vcl_session_get_w_vpp_handle (wrk, mp->handle);
630   if (!s)
631     {
632       VDBG (0, "Migrated notification with wrong handle %llx", mp->handle);
633       return;
634     }
635
636   fs_index = vcl_segment_table_lookup (mp->segment_handle);
637   if (fs_index == VCL_INVALID_SEGMENT_INDEX)
638     {
639       VDBG (0, "segment for session %u is not mounted!", s->session_index);
640       s->session_state = VCL_STATE_DETACHED;
641       return;
642     }
643
644   s->vpp_handle = mp->new_handle;
645
646   vcl_segment_attach_mq (vcl_vpp_worker_segment_handle (0), mp->vpp_evt_q,
647                          mp->vpp_thread_index, &s->vpp_evt_q);
648
649   vcl_session_table_del_vpp_handle (wrk, mp->handle);
650   vcl_session_table_add_vpp_handle (wrk, mp->new_handle, s->session_index);
651
652   /* Generate new tx event if we have outstanding data */
653   if (svm_fifo_has_event (s->tx_fifo))
654     app_send_io_evt_to_vpp (s->vpp_evt_q,
655                             s->tx_fifo->shr->master_session_index,
656                             SESSION_IO_EVT_TX, SVM_Q_WAIT);
657
658   VDBG (0, "Migrated 0x%lx to thread %u 0x%lx", mp->handle,
659         mp->vpp_thread_index, mp->new_handle);
660 }
661
662 static vcl_session_t *
663 vcl_session_accepted (vcl_worker_t * wrk, session_accepted_msg_t * msg)
664 {
665   vcl_session_msg_t *vcl_msg;
666   vcl_session_t *session;
667
668   session = vcl_session_get_w_vpp_handle (wrk, msg->handle);
669   if (PREDICT_FALSE (session != 0))
670     VWRN ("session overlap handle %lu state %u!", msg->handle,
671           session->session_state);
672
673   session = vcl_session_table_lookup_listener (wrk, msg->listener_handle);
674   if (!session)
675     {
676       VERR ("couldn't find listen session: listener handle %llx",
677             msg->listener_handle);
678       return 0;
679     }
680
681   clib_fifo_add2 (session->accept_evts_fifo, vcl_msg);
682   vcl_msg->flags = 0;
683   vcl_msg->accepted_msg = *msg;
684   /* Session handle points to listener until fully accepted by app */
685   vcl_session_table_add_vpp_handle (wrk, msg->handle, session->session_index);
686
687   return session;
688 }
689
690 static vcl_session_t *
691 vcl_session_disconnected_handler (vcl_worker_t * wrk,
692                                   session_disconnected_msg_t * msg)
693 {
694   vcl_session_t *session;
695
696   session = vcl_session_get_w_vpp_handle (wrk, msg->handle);
697   if (!session)
698     {
699       VDBG (0, "request to disconnect unknown handle 0x%llx", msg->handle);
700       return 0;
701     }
702
703   /* Late disconnect notification on a session that has been closed */
704   if (session->session_state == VCL_STATE_CLOSED)
705     return 0;
706
707   /* Caught a disconnect before actually accepting the session */
708   if (session->session_state == VCL_STATE_LISTEN)
709     {
710       if (!vcl_flag_accepted_session (session, msg->handle,
711                                       VCL_ACCEPTED_F_CLOSED))
712         VDBG (0, "session was not accepted!");
713       return 0;
714     }
715
716   /* If not already reset change state */
717   if (session->session_state != VCL_STATE_DISCONNECT)
718     session->session_state = VCL_STATE_VPP_CLOSING;
719
720   return session;
721 }
722
723 static int
724 vppcom_session_disconnect (u32 session_handle)
725 {
726   vcl_worker_t *wrk = vcl_worker_get_current ();
727   vcl_session_t *session, *listen_session;
728   vcl_session_state_t state;
729   u64 vpp_handle;
730
731   session = vcl_session_get_w_handle (wrk, session_handle);
732   if (!session)
733     return VPPCOM_EBADFD;
734
735   vpp_handle = session->vpp_handle;
736   state = session->session_state;
737
738   VDBG (1, "session %u [0x%llx] state 0x%x (%s)", session->session_index,
739         vpp_handle, state, vppcom_session_state_str (state));
740
741   if (PREDICT_FALSE (state == VCL_STATE_LISTEN))
742     {
743       VDBG (0, "ERROR: Cannot disconnect a listen socket!");
744       return VPPCOM_EBADFD;
745     }
746
747   if (state == VCL_STATE_VPP_CLOSING)
748     {
749       vcl_send_session_disconnected_reply (wrk, session, 0);
750       VDBG (1, "session %u [0x%llx]: sending disconnect REPLY...",
751             session->session_index, vpp_handle);
752     }
753   else
754     {
755       /* Session doesn't have an event queue yet. Probably a non-blocking
756        * connect. Wait for the reply */
757       if (PREDICT_FALSE (!session->vpp_evt_q))
758         return VPPCOM_OK;
759
760       VDBG (1, "session %u [0x%llx]: sending disconnect...",
761             session->session_index, vpp_handle);
762       vcl_send_session_disconnect (wrk, session);
763     }
764
765   if (session->listener_index != VCL_INVALID_SESSION_INDEX)
766     {
767       listen_session = vcl_session_get (wrk, session->listener_index);
768       listen_session->n_accepted_sessions--;
769     }
770
771   return VPPCOM_OK;
772 }
773
774 static void
775 vcl_session_cleanup_handler (vcl_worker_t * wrk, void *data)
776 {
777   session_cleanup_msg_t *msg;
778   vcl_session_t *session;
779
780   msg = (session_cleanup_msg_t *) data;
781   session = vcl_session_get_w_vpp_handle (wrk, msg->handle);
782   if (!session)
783     {
784       VDBG (0, "disconnect confirmed for unknown handle 0x%llx", msg->handle);
785       return;
786     }
787
788   if (msg->type == SESSION_CLEANUP_TRANSPORT)
789     {
790       /* Transport was cleaned up before we confirmed close. Probably the
791        * app is still waiting for some data that cannot be delivered.
792        * Confirm close to make sure everything is cleaned up.
793        * Move to undetermined state to ensure that the session is not
794        * removed before both vpp and the app cleanup.
795        * - If the app closes first, the session is moved to CLOSED state
796        *   and the session cleanup notification from vpp removes the
797        *   session.
798        * - If vpp cleans up the session first, the session is moved to
799        *   DETACHED state lower and subsequently the close from the app
800        *   frees the session
801        */
802       if (session->session_state == VCL_STATE_VPP_CLOSING)
803         {
804           vppcom_session_disconnect (vcl_session_handle (session));
805           session->session_state = VCL_STATE_UPDATED;
806         }
807       else if (session->session_state == VCL_STATE_DISCONNECT)
808         {
809           vcl_send_session_reset_reply (wrk, session, 0);
810           session->session_state = VCL_STATE_UPDATED;
811         }
812       return;
813     }
814
815   vcl_session_table_del_vpp_handle (wrk, msg->handle);
816   /* Should not happen. App did not close the connection so don't free it. */
817   if (session->session_state != VCL_STATE_CLOSED)
818     {
819       VDBG (0, "app did not close session %d", session->session_index);
820       session->session_state = VCL_STATE_DETACHED;
821       session->vpp_handle = VCL_INVALID_SESSION_HANDLE;
822       return;
823     }
824   vcl_session_free (wrk, session);
825 }
826
827 static void
828 vcl_session_req_worker_update_handler (vcl_worker_t * wrk, void *data)
829 {
830   session_req_worker_update_msg_t *msg;
831   vcl_session_t *s;
832
833   msg = (session_req_worker_update_msg_t *) data;
834   s = vcl_session_get_w_vpp_handle (wrk, msg->session_handle);
835   if (!s)
836     return;
837
838   vec_add1 (wrk->pending_session_wrk_updates, s->session_index);
839 }
840
841 static void
842 vcl_session_worker_update_reply_handler (vcl_worker_t * wrk, void *data)
843 {
844   session_worker_update_reply_msg_t *msg;
845   vcl_session_t *s;
846
847   msg = (session_worker_update_reply_msg_t *) data;
848   s = vcl_session_get_w_vpp_handle (wrk, msg->handle);
849   if (!s)
850     {
851       VDBG (0, "unknown handle 0x%llx", msg->handle);
852       return;
853     }
854
855   if (s->rx_fifo)
856     {
857       if (vcl_segment_attach_session (msg->segment_handle, msg->rx_fifo,
858                                       msg->tx_fifo, (uword) ~0, 0, s))
859         {
860           VDBG (0, "failed to attach fifos for %u", s->session_index);
861           return;
862         }
863     }
864   s->session_state = VCL_STATE_UPDATED;
865
866   VDBG (0, "session %u[0x%llx] moved to worker %u", s->session_index,
867         s->vpp_handle, wrk->wrk_index);
868 }
869
870 static int
871 vcl_api_recv_fd (vcl_worker_t * wrk, int *fds, int n_fds)
872 {
873
874   if (vcm->cfg.vpp_app_socket_api)
875     return vcl_sapi_recv_fds (wrk, fds, n_fds);
876
877   return vcl_bapi_recv_fds (wrk, fds, n_fds);
878 }
879
880 static void
881 vcl_session_app_add_segment_handler (vcl_worker_t * wrk, void *data)
882 {
883   ssvm_segment_type_t seg_type = SSVM_SEGMENT_SHM;
884   session_app_add_segment_msg_t *msg;
885   u64 segment_handle;
886   int fd = -1;
887
888   msg = (session_app_add_segment_msg_t *) data;
889
890   if (msg->fd_flags)
891     {
892       vcl_api_recv_fd (wrk, &fd, 1);
893       seg_type = SSVM_SEGMENT_MEMFD;
894     }
895
896   segment_handle = msg->segment_handle;
897   if (segment_handle == VCL_INVALID_SEGMENT_HANDLE)
898     {
899       clib_warning ("invalid segment handle");
900       return;
901     }
902
903   if (vcl_segment_attach (segment_handle, (char *) msg->segment_name,
904                           seg_type, fd))
905     {
906       VDBG (0, "vcl_segment_attach ('%s') failed", msg->segment_name);
907       return;
908     }
909
910   VDBG (1, "mapped new segment '%s' size %d", msg->segment_name,
911         msg->segment_size);
912 }
913
914 static void
915 vcl_session_app_del_segment_handler (vcl_worker_t * wrk, void *data)
916 {
917   session_app_del_segment_msg_t *msg = (session_app_del_segment_msg_t *) data;
918   vcl_segment_detach (msg->segment_handle);
919   VDBG (1, "Unmapped segment: %d", msg->segment_handle);
920 }
921
922 static void
923 vcl_worker_rpc_handler (vcl_worker_t * wrk, void *data)
924 {
925   if (!vcm->wrk_rpc_fn)
926     return;
927
928   (vcm->wrk_rpc_fn) (((session_app_wrk_rpc_msg_t *) data)->data);
929 }
930
931 static int
932 vcl_handle_mq_event (vcl_worker_t * wrk, session_event_t * e)
933 {
934   session_disconnected_msg_t *disconnected_msg;
935   session_connected_msg_t *connected_msg;
936   session_reset_msg_t *reset_msg;
937   session_event_t *ecpy;
938   vcl_session_t *s;
939   u32 sid;
940
941   switch (e->event_type)
942     {
943     case SESSION_IO_EVT_RX:
944     case SESSION_IO_EVT_TX:
945       s = vcl_session_get (wrk, e->session_index);
946       if (!s || !vcl_session_is_open (s))
947         break;
948       vec_add1 (wrk->unhandled_evts_vector, *e);
949       break;
950     case SESSION_CTRL_EVT_BOUND:
951       /* We can only wait for only one listen so not postponed */
952       vcl_session_bound_handler (wrk, (session_bound_msg_t *) e->data);
953       break;
954     case SESSION_CTRL_EVT_ACCEPTED:
955       s = vcl_session_accepted (wrk, (session_accepted_msg_t *) e->data);
956       if (vcl_session_has_attr (s, VCL_SESS_ATTR_NONBLOCK))
957         {
958           vec_add2 (wrk->unhandled_evts_vector, ecpy, 1);
959           *ecpy = *e;
960           ecpy->postponed = 1;
961           ecpy->session_index = s->session_index;
962         }
963       break;
964     case SESSION_CTRL_EVT_CONNECTED:
965       connected_msg = (session_connected_msg_t *) e->data;
966       sid = vcl_session_connected_handler (wrk, connected_msg);
967       if (!(s = vcl_session_get (wrk, sid)))
968         break;
969       if (vcl_session_has_attr (s, VCL_SESS_ATTR_NONBLOCK))
970         {
971           vec_add2 (wrk->unhandled_evts_vector, ecpy, 1);
972           *ecpy = *e;
973           ecpy->postponed = 1;
974           ecpy->session_index = s->session_index;
975         }
976       break;
977     case SESSION_CTRL_EVT_DISCONNECTED:
978       disconnected_msg = (session_disconnected_msg_t *) e->data;
979       if (!(s = vcl_session_get_w_vpp_handle (wrk, disconnected_msg->handle)))
980         break;
981       if (vcl_session_has_attr (s, VCL_SESS_ATTR_NONBLOCK))
982         {
983           vec_add1 (wrk->unhandled_evts_vector, *e);
984           break;
985         }
986       if (!(s = vcl_session_disconnected_handler (wrk, disconnected_msg)))
987         break;
988       VDBG (0, "disconnected session %u [0x%llx]", s->session_index,
989             s->vpp_handle);
990       break;
991     case SESSION_CTRL_EVT_RESET:
992       reset_msg = (session_reset_msg_t *) e->data;
993       if (!(s = vcl_session_get_w_vpp_handle (wrk, reset_msg->handle)))
994         break;
995       if (vcl_session_has_attr (s, VCL_SESS_ATTR_NONBLOCK))
996         {
997           vec_add1 (wrk->unhandled_evts_vector, *e);
998           break;
999         }
1000       vcl_session_reset_handler (wrk, (session_reset_msg_t *) e->data);
1001       break;
1002     case SESSION_CTRL_EVT_UNLISTEN_REPLY:
1003       vcl_session_unlisten_reply_handler (wrk, e->data);
1004       break;
1005     case SESSION_CTRL_EVT_MIGRATED:
1006       vcl_session_migrated_handler (wrk, e->data);
1007       break;
1008     case SESSION_CTRL_EVT_CLEANUP:
1009       vcl_session_cleanup_handler (wrk, e->data);
1010       break;
1011     case SESSION_CTRL_EVT_REQ_WORKER_UPDATE:
1012       vcl_session_req_worker_update_handler (wrk, e->data);
1013       break;
1014     case SESSION_CTRL_EVT_WORKER_UPDATE_REPLY:
1015       vcl_session_worker_update_reply_handler (wrk, e->data);
1016       break;
1017     case SESSION_CTRL_EVT_APP_ADD_SEGMENT:
1018       vcl_session_app_add_segment_handler (wrk, e->data);
1019       break;
1020     case SESSION_CTRL_EVT_APP_DEL_SEGMENT:
1021       vcl_session_app_del_segment_handler (wrk, e->data);
1022       break;
1023     case SESSION_CTRL_EVT_APP_WRK_RPC:
1024       vcl_worker_rpc_handler (wrk, e->data);
1025       break;
1026     default:
1027       clib_warning ("unhandled %u", e->event_type);
1028     }
1029   return VPPCOM_OK;
1030 }
1031
1032 static int
1033 vppcom_wait_for_session_state_change (u32 session_index,
1034                                       vcl_session_state_t state,
1035                                       f64 wait_for_time)
1036 {
1037   vcl_worker_t *wrk = vcl_worker_get_current ();
1038   f64 timeout = clib_time_now (&wrk->clib_time) + wait_for_time;
1039   vcl_session_t *volatile session;
1040   svm_msg_q_msg_t msg;
1041   session_event_t *e;
1042
1043   do
1044     {
1045       session = vcl_session_get (wrk, session_index);
1046       if (PREDICT_FALSE (!session))
1047         {
1048           return VPPCOM_EBADFD;
1049         }
1050       if (session->session_state == state)
1051         {
1052           return VPPCOM_OK;
1053         }
1054       if (session->session_state == VCL_STATE_DETACHED)
1055         {
1056           return VPPCOM_ECONNREFUSED;
1057         }
1058
1059       if (svm_msg_q_sub (wrk->app_event_queue, &msg, SVM_Q_NOWAIT, 0))
1060         {
1061           usleep (100);
1062           continue;
1063         }
1064       e = svm_msg_q_msg_data (wrk->app_event_queue, &msg);
1065       vcl_handle_mq_event (wrk, e);
1066       svm_msg_q_free_msg (wrk->app_event_queue, &msg);
1067     }
1068   while (clib_time_now (&wrk->clib_time) < timeout);
1069
1070   VDBG (0, "timeout waiting for state 0x%x (%s)", state,
1071         vppcom_session_state_str (state));
1072   vcl_evt (VCL_EVT_SESSION_TIMEOUT, session, session_state);
1073
1074   return VPPCOM_ETIMEDOUT;
1075 }
1076
1077 static void
1078 vcl_handle_pending_wrk_updates (vcl_worker_t * wrk)
1079 {
1080   vcl_session_state_t state;
1081   vcl_session_t *s;
1082   u32 *sip;
1083
1084   if (PREDICT_TRUE (vec_len (wrk->pending_session_wrk_updates) == 0))
1085     return;
1086
1087   vec_foreach (sip, wrk->pending_session_wrk_updates)
1088   {
1089     s = vcl_session_get (wrk, *sip);
1090     vcl_send_session_worker_update (wrk, s, wrk->wrk_index);
1091     state = s->session_state;
1092     vppcom_wait_for_session_state_change (s->session_index, VCL_STATE_UPDATED,
1093                                           5);
1094     s->session_state = state;
1095   }
1096   vec_reset_length (wrk->pending_session_wrk_updates);
1097 }
1098
1099 void
1100 vcl_flush_mq_events (void)
1101 {
1102   vcl_worker_t *wrk = vcl_worker_get_current ();
1103   svm_msg_q_msg_t *msg;
1104   session_event_t *e;
1105   svm_msg_q_t *mq;
1106   int i;
1107
1108   mq = wrk->app_event_queue;
1109   svm_msg_q_lock (mq);
1110   vcl_mq_dequeue_batch (wrk, mq, ~0);
1111   svm_msg_q_unlock (mq);
1112
1113   for (i = 0; i < vec_len (wrk->mq_msg_vector); i++)
1114     {
1115       msg = vec_elt_at_index (wrk->mq_msg_vector, i);
1116       e = svm_msg_q_msg_data (mq, msg);
1117       vcl_handle_mq_event (wrk, e);
1118       svm_msg_q_free_msg (mq, msg);
1119     }
1120   vec_reset_length (wrk->mq_msg_vector);
1121   vcl_handle_pending_wrk_updates (wrk);
1122 }
1123
1124 static int
1125 vppcom_session_unbind (u32 session_handle)
1126 {
1127   vcl_worker_t *wrk = vcl_worker_get_current ();
1128   session_accepted_msg_t *accepted_msg;
1129   vcl_session_t *session = 0;
1130   vcl_session_msg_t *evt;
1131
1132   session = vcl_session_get_w_handle (wrk, session_handle);
1133   if (!session)
1134     return VPPCOM_EBADFD;
1135
1136   /* Flush pending accept events, if any */
1137   while (clib_fifo_elts (session->accept_evts_fifo))
1138     {
1139       clib_fifo_sub2 (session->accept_evts_fifo, evt);
1140       accepted_msg = &evt->accepted_msg;
1141       vcl_session_table_del_vpp_handle (wrk, accepted_msg->handle);
1142       vcl_send_session_accepted_reply (session->vpp_evt_q,
1143                                        accepted_msg->context,
1144                                        accepted_msg->handle, -1);
1145     }
1146   clib_fifo_free (session->accept_evts_fifo);
1147
1148   vcl_send_session_unlisten (wrk, session);
1149
1150   VDBG (1, "session %u [0x%llx]: sending unbind!", session->session_index,
1151         session->vpp_handle);
1152   vcl_evt (VCL_EVT_UNBIND, session);
1153
1154   session->vpp_handle = ~0;
1155   session->session_state = VCL_STATE_DISCONNECT;
1156
1157   return VPPCOM_OK;
1158 }
1159
1160 /**
1161  * Handle app exit
1162  *
1163  * Notify vpp of the disconnect and mark the worker as free. If we're the
1164  * last worker, do a full cleanup otherwise, since we're probably a forked
1165  * child, avoid syscalls as much as possible. We might've lost privileges.
1166  */
1167 void
1168 vppcom_app_exit (void)
1169 {
1170   if (!pool_elts (vcm->workers))
1171     return;
1172   vcl_worker_cleanup (vcl_worker_get_current (), 1 /* notify vpp */ );
1173   vcl_set_worker_index (~0);
1174   vcl_elog_stop (vcm);
1175 }
1176
1177 static int
1178 vcl_api_attach (void)
1179 {
1180   if (vcm->cfg.vpp_app_socket_api)
1181     return vcl_sapi_attach ();
1182
1183   return vcl_bapi_attach ();
1184 }
1185
1186 static void
1187 vcl_api_detach (vcl_worker_t * wrk)
1188 {
1189   vcl_send_app_detach (wrk);
1190
1191   if (vcm->cfg.vpp_app_socket_api)
1192     return vcl_sapi_detach (wrk);
1193
1194   return vcl_bapi_disconnect_from_vpp ();
1195 }
1196
1197 /*
1198  * VPPCOM Public API functions
1199  */
1200 int
1201 vppcom_app_create (const char *app_name)
1202 {
1203   vppcom_cfg_t *vcl_cfg = &vcm->cfg;
1204   int rv;
1205
1206   if (vcm->is_init)
1207     {
1208       VDBG (1, "already initialized");
1209       return VPPCOM_EEXIST;
1210     }
1211
1212   vcm->is_init = 1;
1213   vppcom_cfg (&vcm->cfg);
1214   vcl_cfg = &vcm->cfg;
1215
1216   vcm->main_cpu = pthread_self ();
1217   vcm->main_pid = getpid ();
1218   vcm->app_name = format (0, "%s", app_name);
1219   fifo_segment_main_init (&vcm->segment_main, vcl_cfg->segment_baseva,
1220                           20 /* timeout in secs */ );
1221   pool_alloc (vcm->workers, vcl_cfg->max_workers);
1222   clib_spinlock_init (&vcm->workers_lock);
1223   clib_rwlock_init (&vcm->segment_table_lock);
1224   atexit (vppcom_app_exit);
1225   vcl_elog_init (vcm);
1226
1227   /* Allocate default worker */
1228   vcl_worker_alloc_and_init ();
1229
1230   if ((rv = vcl_api_attach ()))
1231     return rv;
1232
1233   VDBG (0, "app_name '%s', my_client_index %d (0x%x)", app_name,
1234         vcm->workers[0].api_client_handle, vcm->workers[0].api_client_handle);
1235
1236   return VPPCOM_OK;
1237 }
1238
1239 void
1240 vppcom_app_destroy (void)
1241 {
1242   vcl_worker_t *wrk, *current_wrk;
1243   void *heap;
1244
1245   if (!pool_elts (vcm->workers))
1246     return;
1247
1248   vcl_evt (VCL_EVT_DETACH, vcm);
1249
1250   current_wrk = vcl_worker_get_current ();
1251
1252   /* *INDENT-OFF* */
1253   pool_foreach (wrk, vcm->workers)  {
1254     if (current_wrk != wrk)
1255       vcl_worker_cleanup (wrk, 0 /* notify vpp */ );
1256   }
1257   /* *INDENT-ON* */
1258
1259   vcl_api_detach (current_wrk);
1260   vcl_worker_cleanup (current_wrk, 0 /* notify vpp */ );
1261
1262   vcl_elog_stop (vcm);
1263
1264   /*
1265    * Free the heap and fix vcm
1266    */
1267   heap = clib_mem_get_heap ();
1268   munmap (clib_mem_get_heap_base (heap), clib_mem_get_heap_size (heap));
1269
1270   vcm = &_vppcom_main;
1271   vcm->is_init = 0;
1272 }
1273
1274 int
1275 vppcom_session_create (u8 proto, u8 is_nonblocking)
1276 {
1277   vcl_worker_t *wrk = vcl_worker_get_current ();
1278   vcl_session_t *session;
1279
1280   session = vcl_session_alloc (wrk);
1281
1282   session->session_type = proto;
1283   session->session_state = VCL_STATE_CLOSED;
1284   session->vpp_handle = ~0;
1285   session->is_dgram = vcl_proto_is_dgram (proto);
1286
1287   if (is_nonblocking)
1288     vcl_session_set_attr (session, VCL_SESS_ATTR_NONBLOCK);
1289
1290   vcl_evt (VCL_EVT_CREATE, session, session_type, session->session_state,
1291            is_nonblocking, session_index);
1292
1293   VDBG (0, "created session %u", session->session_index);
1294
1295   return vcl_session_handle (session);
1296 }
1297
1298 int
1299 vcl_session_cleanup (vcl_worker_t * wrk, vcl_session_t * s,
1300                      vcl_session_handle_t sh, u8 do_disconnect)
1301 {
1302   int rv = VPPCOM_OK;
1303
1304   VDBG (1, "session %u [0x%llx] closing", s->session_index, s->vpp_handle);
1305
1306   if (s->flags & VCL_SESSION_F_IS_VEP)
1307     {
1308       u32 next_sh = s->vep.next_sh;
1309       while (next_sh != ~0)
1310         {
1311           rv = vppcom_epoll_ctl (sh, EPOLL_CTL_DEL, next_sh, 0);
1312           if (PREDICT_FALSE (rv < 0))
1313             VDBG (0, "vpp handle 0x%llx, sh %u: EPOLL_CTL_DEL vep_idx %u"
1314                   " failed! rv %d (%s)", s->vpp_handle, next_sh,
1315                   s->vep.vep_sh, rv, vppcom_retval_str (rv));
1316           next_sh = s->vep.next_sh;
1317         }
1318       goto free_session;
1319     }
1320
1321   if (s->flags & VCL_SESSION_F_IS_VEP_SESSION)
1322     {
1323       rv = vppcom_epoll_ctl (s->vep.vep_sh, EPOLL_CTL_DEL, sh, 0);
1324       if (rv < 0)
1325         VDBG (0, "session %u [0x%llx]: EPOLL_CTL_DEL vep_idx %u "
1326               "failed! rv %d (%s)", s->session_index, s->vpp_handle,
1327               s->vep.vep_sh, rv, vppcom_retval_str (rv));
1328     }
1329
1330   if (!do_disconnect)
1331     {
1332       VDBG (1, "session %u [0x%llx] disconnect skipped",
1333             s->session_index, s->vpp_handle);
1334       goto cleanup;
1335     }
1336
1337   if (s->session_state == VCL_STATE_LISTEN)
1338     {
1339       rv = vppcom_session_unbind (sh);
1340       if (PREDICT_FALSE (rv < 0))
1341         VDBG (0, "session %u [0x%llx]: listener unbind failed! "
1342               "rv %d (%s)", s->session_index, s->vpp_handle, rv,
1343               vppcom_retval_str (rv));
1344       return rv;
1345     }
1346   else if (vcl_session_is_ready (s)
1347            || (vcl_session_is_connectable_listener (wrk, s)))
1348     {
1349       rv = vppcom_session_disconnect (sh);
1350       if (PREDICT_FALSE (rv < 0))
1351         VDBG (0, "ERROR: session %u [0x%llx]: disconnect failed!"
1352               " rv %d (%s)", s->session_index, s->vpp_handle,
1353               rv, vppcom_retval_str (rv));
1354     }
1355   else if (s->session_state == VCL_STATE_DISCONNECT)
1356     {
1357       vcl_send_session_reset_reply (wrk, s, 0);
1358     }
1359   else if (s->session_state == VCL_STATE_DETACHED)
1360     {
1361       /* Should not happen. VPP cleaned up before app confirmed close */
1362       VDBG (0, "vpp freed session %d before close", s->session_index);
1363       goto free_session;
1364     }
1365
1366   s->session_state = VCL_STATE_CLOSED;
1367
1368   /* Session is removed only after vpp confirms the disconnect */
1369   return rv;
1370
1371 cleanup:
1372   vcl_session_table_del_vpp_handle (wrk, s->vpp_handle);
1373 free_session:
1374   vcl_session_free (wrk, s);
1375   vcl_evt (VCL_EVT_CLOSE, s, rv);
1376
1377   return rv;
1378 }
1379
1380 int
1381 vppcom_session_close (uint32_t session_handle)
1382 {
1383   vcl_worker_t *wrk = vcl_worker_get_current ();
1384   vcl_session_t *session;
1385
1386   session = vcl_session_get_w_handle (wrk, session_handle);
1387   if (!session)
1388     return VPPCOM_EBADFD;
1389   return vcl_session_cleanup (wrk, session, session_handle,
1390                               1 /* do_disconnect */ );
1391 }
1392
1393 int
1394 vppcom_session_bind (uint32_t session_handle, vppcom_endpt_t * ep)
1395 {
1396   vcl_worker_t *wrk = vcl_worker_get_current ();
1397   vcl_session_t *session = 0;
1398
1399   if (!ep || !ep->ip)
1400     return VPPCOM_EINVAL;
1401
1402   session = vcl_session_get_w_handle (wrk, session_handle);
1403   if (!session)
1404     return VPPCOM_EBADFD;
1405
1406   if (session->flags & VCL_SESSION_F_IS_VEP)
1407     {
1408       VDBG (0, "ERROR: cannot bind to epoll session %u!",
1409             session->session_index);
1410       return VPPCOM_EBADFD;
1411     }
1412
1413   session->transport.is_ip4 = ep->is_ip4;
1414   if (ep->is_ip4)
1415     clib_memcpy_fast (&session->transport.lcl_ip.ip4, ep->ip,
1416                       sizeof (ip4_address_t));
1417   else
1418     clib_memcpy_fast (&session->transport.lcl_ip.ip6, ep->ip,
1419                       sizeof (ip6_address_t));
1420   session->transport.lcl_port = ep->port;
1421
1422   VDBG (0, "session %u handle %u: binding to local %s address %U port %u, "
1423         "proto %s", session->session_index, session_handle,
1424         session->transport.is_ip4 ? "IPv4" : "IPv6",
1425         format_ip46_address, &session->transport.lcl_ip,
1426         session->transport.is_ip4 ? IP46_TYPE_IP4 : IP46_TYPE_IP6,
1427         clib_net_to_host_u16 (session->transport.lcl_port),
1428         vppcom_proto_str (session->session_type));
1429   vcl_evt (VCL_EVT_BIND, session);
1430
1431   if (session->session_type == VPPCOM_PROTO_UDP)
1432     vppcom_session_listen (session_handle, 10);
1433
1434   return VPPCOM_OK;
1435 }
1436
1437 int
1438 vppcom_session_listen (uint32_t listen_sh, uint32_t q_len)
1439 {
1440   vcl_worker_t *wrk = vcl_worker_get_current ();
1441   vcl_session_t *listen_session = 0;
1442   u64 listen_vpp_handle;
1443   int rv;
1444
1445   listen_session = vcl_session_get_w_handle (wrk, listen_sh);
1446   if (!listen_session || (listen_session->flags & VCL_SESSION_F_IS_VEP))
1447     return VPPCOM_EBADFD;
1448
1449   if (q_len == 0 || q_len == ~0)
1450     q_len = vcm->cfg.listen_queue_size;
1451
1452   listen_vpp_handle = listen_session->vpp_handle;
1453   if (listen_session->session_state == VCL_STATE_LISTEN)
1454     {
1455       VDBG (0, "session %u [0x%llx]: already in listen state!",
1456             listen_sh, listen_vpp_handle);
1457       return VPPCOM_OK;
1458     }
1459
1460   VDBG (0, "session %u: sending vpp listen request...", listen_sh);
1461
1462   /*
1463    * Send listen request to vpp and wait for reply
1464    */
1465   vcl_send_session_listen (wrk, listen_session);
1466   rv = vppcom_wait_for_session_state_change (listen_session->session_index,
1467                                              VCL_STATE_LISTEN,
1468                                              vcm->cfg.session_timeout);
1469
1470   if (PREDICT_FALSE (rv))
1471     {
1472       listen_session = vcl_session_get_w_handle (wrk, listen_sh);
1473       VDBG (0, "session %u [0x%llx]: listen failed! returning %d (%s)",
1474             listen_sh, listen_session->vpp_handle, rv,
1475             vppcom_retval_str (rv));
1476       return rv;
1477     }
1478
1479   return VPPCOM_OK;
1480 }
1481
1482 static int
1483 validate_args_session_accept_ (vcl_worker_t * wrk, vcl_session_t * ls)
1484 {
1485   if (ls->flags & VCL_SESSION_F_IS_VEP)
1486     {
1487       VDBG (0, "ERROR: cannot accept on epoll session %u!",
1488             ls->session_index);
1489       return VPPCOM_EBADFD;
1490     }
1491
1492   if ((ls->session_state != VCL_STATE_LISTEN)
1493       && (!vcl_session_is_connectable_listener (wrk, ls)))
1494     {
1495       VDBG (0, "ERROR: session [0x%llx]: not in listen state! state 0x%x"
1496             " (%s)", ls->vpp_handle, ls->session_state,
1497             vppcom_session_state_str (ls->session_state));
1498       return VPPCOM_EBADFD;
1499     }
1500   return VPPCOM_OK;
1501 }
1502
1503 int
1504 vppcom_unformat_proto (uint8_t * proto, char *proto_str)
1505 {
1506   if (!strcmp (proto_str, "TCP"))
1507     *proto = VPPCOM_PROTO_TCP;
1508   else if (!strcmp (proto_str, "tcp"))
1509     *proto = VPPCOM_PROTO_TCP;
1510   else if (!strcmp (proto_str, "UDP"))
1511     *proto = VPPCOM_PROTO_UDP;
1512   else if (!strcmp (proto_str, "udp"))
1513     *proto = VPPCOM_PROTO_UDP;
1514   else if (!strcmp (proto_str, "TLS"))
1515     *proto = VPPCOM_PROTO_TLS;
1516   else if (!strcmp (proto_str, "tls"))
1517     *proto = VPPCOM_PROTO_TLS;
1518   else if (!strcmp (proto_str, "QUIC"))
1519     *proto = VPPCOM_PROTO_QUIC;
1520   else if (!strcmp (proto_str, "quic"))
1521     *proto = VPPCOM_PROTO_QUIC;
1522   else
1523     return 1;
1524   return 0;
1525 }
1526
1527 int
1528 vppcom_session_accept (uint32_t listen_session_handle, vppcom_endpt_t * ep,
1529                        uint32_t flags)
1530 {
1531   u32 client_session_index = ~0, listen_session_index, accept_flags = 0;
1532   vcl_worker_t *wrk = vcl_worker_get_current ();
1533   session_accepted_msg_t accepted_msg;
1534   vcl_session_t *listen_session = 0;
1535   vcl_session_t *client_session = 0;
1536   vcl_session_msg_t *evt;
1537   svm_msg_q_msg_t msg;
1538   session_event_t *e;
1539   u8 is_nonblocking;
1540   int rv;
1541
1542   listen_session = vcl_session_get_w_handle (wrk, listen_session_handle);
1543   if (!listen_session)
1544     return VPPCOM_EBADFD;
1545
1546   listen_session_index = listen_session->session_index;
1547   if ((rv = validate_args_session_accept_ (wrk, listen_session)))
1548     return rv;
1549
1550   if (clib_fifo_elts (listen_session->accept_evts_fifo))
1551     {
1552       clib_fifo_sub2 (listen_session->accept_evts_fifo, evt);
1553       accept_flags = evt->flags;
1554       accepted_msg = evt->accepted_msg;
1555       goto handle;
1556     }
1557
1558   is_nonblocking = vcl_session_has_attr (listen_session,
1559                                          VCL_SESS_ATTR_NONBLOCK);
1560   while (1)
1561     {
1562       if (svm_msg_q_is_empty (wrk->app_event_queue) && is_nonblocking)
1563         return VPPCOM_EAGAIN;
1564
1565       if (svm_msg_q_sub (wrk->app_event_queue, &msg, SVM_Q_WAIT, 0))
1566         return VPPCOM_EAGAIN;
1567
1568       e = svm_msg_q_msg_data (wrk->app_event_queue, &msg);
1569       if (e->event_type != SESSION_CTRL_EVT_ACCEPTED)
1570         {
1571           vcl_handle_mq_event (wrk, e);
1572           svm_msg_q_free_msg (wrk->app_event_queue, &msg);
1573           continue;
1574         }
1575       clib_memcpy_fast (&accepted_msg, e->data, sizeof (accepted_msg));
1576       svm_msg_q_free_msg (wrk->app_event_queue, &msg);
1577       break;
1578     }
1579
1580 handle:
1581
1582   client_session_index = vcl_session_accepted_handler (wrk, &accepted_msg,
1583                                                        listen_session_index);
1584   if (client_session_index == VCL_INVALID_SESSION_INDEX)
1585     return VPPCOM_ECONNABORTED;
1586
1587   listen_session = vcl_session_get (wrk, listen_session_index);
1588   client_session = vcl_session_get (wrk, client_session_index);
1589
1590   if (flags & O_NONBLOCK)
1591     vcl_session_set_attr (client_session, VCL_SESS_ATTR_NONBLOCK);
1592
1593   VDBG (1, "listener %u [0x%llx]: Got a connect request! session %u [0x%llx],"
1594         " flags %d, is_nonblocking %u", listen_session->session_index,
1595         listen_session->vpp_handle, client_session_index,
1596         client_session->vpp_handle, flags,
1597         vcl_session_has_attr (client_session, VCL_SESS_ATTR_NONBLOCK));
1598
1599   if (ep)
1600     {
1601       ep->is_ip4 = client_session->transport.is_ip4;
1602       ep->port = client_session->transport.rmt_port;
1603       if (client_session->transport.is_ip4)
1604         clib_memcpy_fast (ep->ip, &client_session->transport.rmt_ip.ip4,
1605                           sizeof (ip4_address_t));
1606       else
1607         clib_memcpy_fast (ep->ip, &client_session->transport.rmt_ip.ip6,
1608                           sizeof (ip6_address_t));
1609     }
1610
1611   VDBG (0, "listener %u [0x%llx] accepted %u [0x%llx] peer: %U:%u "
1612         "local: %U:%u", listen_session_handle, listen_session->vpp_handle,
1613         client_session_index, client_session->vpp_handle,
1614         format_ip46_address, &client_session->transport.rmt_ip,
1615         client_session->transport.is_ip4 ? IP46_TYPE_IP4 : IP46_TYPE_IP6,
1616         clib_net_to_host_u16 (client_session->transport.rmt_port),
1617         format_ip46_address, &client_session->transport.lcl_ip,
1618         client_session->transport.is_ip4 ? IP46_TYPE_IP4 : IP46_TYPE_IP6,
1619         clib_net_to_host_u16 (client_session->transport.lcl_port));
1620   vcl_evt (VCL_EVT_ACCEPT, client_session, listen_session,
1621            client_session_index);
1622
1623   /*
1624    * Session might have been closed already
1625    */
1626   if (accept_flags)
1627     {
1628       if (accept_flags & VCL_ACCEPTED_F_CLOSED)
1629         client_session->session_state = VCL_STATE_VPP_CLOSING;
1630       else if (accept_flags & VCL_ACCEPTED_F_RESET)
1631         client_session->session_state = VCL_STATE_DISCONNECT;
1632     }
1633   return vcl_session_handle (client_session);
1634 }
1635
1636 int
1637 vppcom_session_connect (uint32_t session_handle, vppcom_endpt_t * server_ep)
1638 {
1639   vcl_worker_t *wrk = vcl_worker_get_current ();
1640   vcl_session_t *session = 0;
1641   u32 session_index;
1642   int rv;
1643
1644   session = vcl_session_get_w_handle (wrk, session_handle);
1645   if (!session)
1646     return VPPCOM_EBADFD;
1647   session_index = session->session_index;
1648
1649   if (PREDICT_FALSE (session->flags & VCL_SESSION_F_IS_VEP))
1650     {
1651       VDBG (0, "ERROR: cannot connect epoll session %u!",
1652             session->session_index);
1653       return VPPCOM_EBADFD;
1654     }
1655
1656   if (PREDICT_FALSE (vcl_session_is_ready (session)))
1657     {
1658       VDBG (0, "session handle %u [0x%llx]: session already "
1659             "connected to %s %U port %d proto %s, state 0x%x (%s)",
1660             session_handle, session->vpp_handle,
1661             session->transport.is_ip4 ? "IPv4" : "IPv6", format_ip46_address,
1662             &session->transport.rmt_ip, session->transport.is_ip4 ?
1663             IP46_TYPE_IP4 : IP46_TYPE_IP6,
1664             clib_net_to_host_u16 (session->transport.rmt_port),
1665             vppcom_proto_str (session->session_type), session->session_state,
1666             vppcom_session_state_str (session->session_state));
1667       return VPPCOM_OK;
1668     }
1669
1670   /* Attempt to connect a connectionless listener */
1671   if (PREDICT_FALSE (session->session_state == VCL_STATE_LISTEN))
1672     {
1673       if (session->session_type != VPPCOM_PROTO_UDP)
1674         return VPPCOM_EINVAL;
1675       vcl_send_session_unlisten (wrk, session);
1676       session->session_state = VCL_STATE_CLOSED;
1677     }
1678
1679   session->transport.is_ip4 = server_ep->is_ip4;
1680   vcl_ip_copy_from_ep (&session->transport.rmt_ip, server_ep);
1681   session->transport.rmt_port = server_ep->port;
1682   session->parent_handle = VCL_INVALID_SESSION_HANDLE;
1683   session->flags |= VCL_SESSION_F_CONNECTED;
1684
1685   VDBG (0, "session handle %u (%s): connecting to peer %s %U "
1686         "port %d proto %s", session_handle,
1687         vppcom_session_state_str (session->session_state),
1688         session->transport.is_ip4 ? "IPv4" : "IPv6",
1689         format_ip46_address,
1690         &session->transport.rmt_ip, session->transport.is_ip4 ?
1691         IP46_TYPE_IP4 : IP46_TYPE_IP6,
1692         clib_net_to_host_u16 (session->transport.rmt_port),
1693         vppcom_proto_str (session->session_type));
1694
1695   vcl_send_session_connect (wrk, session);
1696
1697   if (vcl_session_has_attr (session, VCL_SESS_ATTR_NONBLOCK))
1698     {
1699       /* State set to STATE_UPDATED to ensure the session is not assumed
1700        * to be ready and to also allow the app to close it prior to vpp's
1701        * connected reply. */
1702       session->session_state = VCL_STATE_UPDATED;
1703       return VPPCOM_EINPROGRESS;
1704     }
1705
1706   /*
1707    * Wait for reply from vpp if blocking
1708    */
1709   rv = vppcom_wait_for_session_state_change (session_index, VCL_STATE_READY,
1710                                              vcm->cfg.session_timeout);
1711
1712   session = vcl_session_get (wrk, session_index);
1713   VDBG (0, "session %u [0x%llx]: connect %s!", session->session_index,
1714         session->vpp_handle, rv ? "failed" : "succeeded");
1715
1716   return rv;
1717 }
1718
1719 int
1720 vppcom_session_stream_connect (uint32_t session_handle,
1721                                uint32_t parent_session_handle)
1722 {
1723   vcl_worker_t *wrk = vcl_worker_get_current ();
1724   vcl_session_t *session, *parent_session;
1725   u32 session_index, parent_session_index;
1726   int rv;
1727
1728   session = vcl_session_get_w_handle (wrk, session_handle);
1729   if (!session)
1730     return VPPCOM_EBADFD;
1731   parent_session = vcl_session_get_w_handle (wrk, parent_session_handle);
1732   if (!parent_session)
1733     return VPPCOM_EBADFD;
1734
1735   session_index = session->session_index;
1736   parent_session_index = parent_session->session_index;
1737   if (PREDICT_FALSE (session->flags & VCL_SESSION_F_IS_VEP))
1738     {
1739       VDBG (0, "ERROR: cannot connect epoll session %u!",
1740             session->session_index);
1741       return VPPCOM_EBADFD;
1742     }
1743
1744   if (PREDICT_FALSE (vcl_session_is_ready (session)))
1745     {
1746       VDBG (0, "session handle %u [0x%llx]: session already "
1747             "connected to session %u [0x%llx] proto %s, state 0x%x (%s)",
1748             session_handle, session->vpp_handle,
1749             parent_session_handle, parent_session->vpp_handle,
1750             vppcom_proto_str (session->session_type), session->session_state,
1751             vppcom_session_state_str (session->session_state));
1752       return VPPCOM_OK;
1753     }
1754
1755   /* Connect to quic session specifics */
1756   session->transport.is_ip4 = parent_session->transport.is_ip4;
1757   session->transport.rmt_ip.ip4.as_u32 = (uint32_t) 1;
1758   session->transport.rmt_port = 0;
1759   session->parent_handle = parent_session->vpp_handle;
1760
1761   VDBG (0, "session handle %u: connecting to session %u [0x%llx]",
1762         session_handle, parent_session_handle, parent_session->vpp_handle);
1763
1764   /*
1765    * Send connect request and wait for reply from vpp
1766    */
1767   vcl_send_session_connect (wrk, session);
1768   rv = vppcom_wait_for_session_state_change (session_index, VCL_STATE_READY,
1769                                              vcm->cfg.session_timeout);
1770
1771   session->listener_index = parent_session_index;
1772   parent_session = vcl_session_get_w_handle (wrk, parent_session_handle);
1773   if (parent_session)
1774     parent_session->n_accepted_sessions++;
1775
1776   session = vcl_session_get (wrk, session_index);
1777   VDBG (0, "session %u [0x%llx]: connect %s!", session->session_index,
1778         session->vpp_handle, rv ? "failed" : "succeeded");
1779
1780   return rv;
1781 }
1782
1783 static u8
1784 vcl_is_rx_evt_for_session (session_event_t * e, u32 sid, u8 is_ct)
1785 {
1786   return (e->event_type == SESSION_IO_EVT_RX && e->session_index == sid);
1787 }
1788
1789 static inline int
1790 vppcom_session_read_internal (uint32_t session_handle, void *buf, int n,
1791                               u8 peek)
1792 {
1793   vcl_worker_t *wrk = vcl_worker_get_current ();
1794   int rv, n_read = 0, is_nonblocking;
1795   vcl_session_t *s = 0;
1796   svm_fifo_t *rx_fifo;
1797   svm_msg_q_msg_t msg;
1798   session_event_t *e;
1799   svm_msg_q_t *mq;
1800   u8 is_ct;
1801
1802   if (PREDICT_FALSE (!buf))
1803     return VPPCOM_EINVAL;
1804
1805   s = vcl_session_get_w_handle (wrk, session_handle);
1806   if (PREDICT_FALSE (!s || (s->flags & VCL_SESSION_F_IS_VEP)))
1807     return VPPCOM_EBADFD;
1808
1809   if (PREDICT_FALSE (!vcl_session_is_open (s)))
1810     {
1811       VDBG (0, "session %u[0x%llx] is not open! state 0x%x (%s)",
1812             s->session_index, s->vpp_handle, s->session_state,
1813             vppcom_session_state_str (s->session_state));
1814       return vcl_session_closed_error (s);
1815     }
1816
1817   is_nonblocking = vcl_session_has_attr (s, VCL_SESS_ATTR_NONBLOCK);
1818   is_ct = vcl_session_is_ct (s);
1819   mq = wrk->app_event_queue;
1820   rx_fifo = is_ct ? s->ct_rx_fifo : s->rx_fifo;
1821   s->flags &= ~VCL_SESSION_F_HAS_RX_EVT;
1822
1823   if (svm_fifo_is_empty_cons (rx_fifo))
1824     {
1825       if (is_nonblocking)
1826         {
1827           if (vcl_session_is_closing (s))
1828             return vcl_session_closing_error (s);
1829           if (is_ct)
1830             svm_fifo_unset_event (s->rx_fifo);
1831           svm_fifo_unset_event (rx_fifo);
1832           return VPPCOM_EWOULDBLOCK;
1833         }
1834       while (svm_fifo_is_empty_cons (rx_fifo))
1835         {
1836           if (vcl_session_is_closing (s))
1837             return vcl_session_closing_error (s);
1838
1839           if (is_ct)
1840             svm_fifo_unset_event (s->rx_fifo);
1841           svm_fifo_unset_event (rx_fifo);
1842           svm_msg_q_lock (mq);
1843           if (svm_msg_q_is_empty (mq))
1844             svm_msg_q_wait (mq);
1845
1846           svm_msg_q_sub_w_lock (mq, &msg);
1847           e = svm_msg_q_msg_data (mq, &msg);
1848           svm_msg_q_unlock (mq);
1849           if (!vcl_is_rx_evt_for_session (e, s->session_index, is_ct))
1850             vcl_handle_mq_event (wrk, e);
1851           svm_msg_q_free_msg (mq, &msg);
1852         }
1853     }
1854
1855 read_again:
1856
1857   if (s->is_dgram)
1858     rv = app_recv_dgram_raw (rx_fifo, buf, n, &s->transport, 0, peek);
1859   else
1860     rv = app_recv_stream_raw (rx_fifo, buf, n, 0, peek);
1861
1862   ASSERT (rv >= 0);
1863   n_read += rv;
1864
1865   if (svm_fifo_is_empty_cons (rx_fifo))
1866     {
1867       if (is_ct)
1868         svm_fifo_unset_event (s->rx_fifo);
1869       svm_fifo_unset_event (rx_fifo);
1870       if (!svm_fifo_is_empty_cons (rx_fifo)
1871           && svm_fifo_set_event (rx_fifo) && is_nonblocking)
1872         {
1873           vec_add2 (wrk->unhandled_evts_vector, e, 1);
1874           e->event_type = SESSION_IO_EVT_RX;
1875           e->session_index = s->session_index;
1876         }
1877     }
1878   else if (PREDICT_FALSE (rv < n && !s->is_dgram))
1879     {
1880       /* More data enqueued while reading. Try to drain it
1881        * or fill the buffer. Avoid doing that for dgrams */
1882       buf += rv;
1883       n -= rv;
1884       goto read_again;
1885     }
1886
1887   if (PREDICT_FALSE (svm_fifo_needs_deq_ntf (rx_fifo, n_read)))
1888     {
1889       svm_fifo_clear_deq_ntf (rx_fifo);
1890       app_send_io_evt_to_vpp (s->vpp_evt_q,
1891                               s->rx_fifo->shr->master_session_index,
1892                               SESSION_IO_EVT_RX, SVM_Q_WAIT);
1893     }
1894
1895   VDBG (2, "session %u[0x%llx]: read %d bytes from (%p)", s->session_index,
1896         s->vpp_handle, n_read, rx_fifo);
1897
1898   return n_read;
1899 }
1900
1901 int
1902 vppcom_session_read (uint32_t session_handle, void *buf, size_t n)
1903 {
1904   return (vppcom_session_read_internal (session_handle, buf, n, 0));
1905 }
1906
1907 static int
1908 vppcom_session_peek (uint32_t session_handle, void *buf, int n)
1909 {
1910   return (vppcom_session_read_internal (session_handle, buf, n, 1));
1911 }
1912
1913 int
1914 vppcom_session_read_segments (uint32_t session_handle,
1915                               vppcom_data_segment_t * ds, uint32_t n_segments,
1916                               uint32_t max_bytes)
1917 {
1918   vcl_worker_t *wrk = vcl_worker_get_current ();
1919   int n_read = 0, is_nonblocking;
1920   vcl_session_t *s = 0;
1921   svm_fifo_t *rx_fifo;
1922   svm_msg_q_msg_t msg;
1923   session_event_t *e;
1924   svm_msg_q_t *mq;
1925   u8 is_ct;
1926
1927   s = vcl_session_get_w_handle (wrk, session_handle);
1928   if (PREDICT_FALSE (!s || (s->flags & VCL_SESSION_F_IS_VEP)))
1929     return VPPCOM_EBADFD;
1930
1931   if (PREDICT_FALSE (!vcl_session_is_open (s)))
1932     return vcl_session_closed_error (s);
1933
1934   is_nonblocking = vcl_session_has_attr (s, VCL_SESS_ATTR_NONBLOCK);
1935   is_ct = vcl_session_is_ct (s);
1936   mq = wrk->app_event_queue;
1937   rx_fifo = is_ct ? s->ct_rx_fifo : s->rx_fifo;
1938   s->flags &= ~VCL_SESSION_F_HAS_RX_EVT;
1939
1940   if (svm_fifo_is_empty_cons (rx_fifo))
1941     {
1942       if (is_nonblocking)
1943         {
1944           if (is_ct)
1945             svm_fifo_unset_event (s->rx_fifo);
1946           svm_fifo_unset_event (rx_fifo);
1947           return VPPCOM_EWOULDBLOCK;
1948         }
1949       while (svm_fifo_is_empty_cons (rx_fifo))
1950         {
1951           if (vcl_session_is_closing (s))
1952             return vcl_session_closing_error (s);
1953
1954           if (is_ct)
1955             svm_fifo_unset_event (s->rx_fifo);
1956           svm_fifo_unset_event (rx_fifo);
1957           svm_msg_q_lock (mq);
1958           if (svm_msg_q_is_empty (mq))
1959             svm_msg_q_wait (mq);
1960
1961           svm_msg_q_sub_w_lock (mq, &msg);
1962           e = svm_msg_q_msg_data (mq, &msg);
1963           svm_msg_q_unlock (mq);
1964           if (!vcl_is_rx_evt_for_session (e, s->session_index, is_ct))
1965             vcl_handle_mq_event (wrk, e);
1966           svm_msg_q_free_msg (mq, &msg);
1967         }
1968     }
1969
1970   n_read = svm_fifo_segments (rx_fifo, s->rx_bytes_pending,
1971                               (svm_fifo_seg_t *) ds, n_segments, max_bytes);
1972   if (n_read < 0)
1973     return VPPCOM_EAGAIN;
1974
1975   if (svm_fifo_max_dequeue_cons (rx_fifo) == n_read)
1976     {
1977       if (is_ct)
1978         svm_fifo_unset_event (s->rx_fifo);
1979       svm_fifo_unset_event (rx_fifo);
1980       if (svm_fifo_max_dequeue_cons (rx_fifo) != n_read
1981           && svm_fifo_set_event (rx_fifo)
1982           && vcl_session_has_attr (s, VCL_SESS_ATTR_NONBLOCK))
1983         {
1984           session_event_t *e;
1985           vec_add2 (wrk->unhandled_evts_vector, e, 1);
1986           e->event_type = SESSION_IO_EVT_RX;
1987           e->session_index = s->session_index;
1988         }
1989     }
1990
1991   s->rx_bytes_pending += n_read;
1992   return n_read;
1993 }
1994
1995 void
1996 vppcom_session_free_segments (uint32_t session_handle, uint32_t n_bytes)
1997 {
1998   vcl_worker_t *wrk = vcl_worker_get_current ();
1999   vcl_session_t *s;
2000   u8 is_ct;
2001
2002   s = vcl_session_get_w_handle (wrk, session_handle);
2003   if (PREDICT_FALSE (!s || (s->flags & VCL_SESSION_F_IS_VEP)))
2004     return;
2005
2006   is_ct = vcl_session_is_ct (s);
2007   svm_fifo_dequeue_drop (is_ct ? s->ct_rx_fifo : s->rx_fifo, n_bytes);
2008
2009   ASSERT (s->rx_bytes_pending < n_bytes);
2010   s->rx_bytes_pending -= n_bytes;
2011 }
2012
2013 static u8
2014 vcl_is_tx_evt_for_session (session_event_t * e, u32 sid, u8 is_ct)
2015 {
2016   return (e->event_type == SESSION_IO_EVT_TX && e->session_index == sid);
2017 }
2018
2019 always_inline u8
2020 vcl_fifo_is_writeable (svm_fifo_t * f, u32 len, u8 is_dgram)
2021 {
2022   u32 max_enq = svm_fifo_max_enqueue_prod (f);
2023   if (is_dgram)
2024     return max_enq >= (sizeof (session_dgram_hdr_t) + len);
2025   else
2026     return max_enq > 0;
2027 }
2028
2029 always_inline int
2030 vppcom_session_write_inline (vcl_worker_t * wrk, vcl_session_t * s, void *buf,
2031                              size_t n, u8 is_flush, u8 is_dgram)
2032 {
2033   int n_write, is_nonblocking;
2034   session_evt_type_t et;
2035   svm_msg_q_msg_t msg;
2036   svm_fifo_t *tx_fifo;
2037   session_event_t *e;
2038   svm_msg_q_t *mq;
2039   u8 is_ct;
2040
2041   if (PREDICT_FALSE (!buf || n == 0))
2042     return VPPCOM_EINVAL;
2043
2044   if (PREDICT_FALSE (s->flags & VCL_SESSION_F_IS_VEP))
2045     {
2046       VDBG (0, "ERROR: session %u [0x%llx]: cannot write to an epoll"
2047             " session!", s->session_index, s->vpp_handle);
2048       return VPPCOM_EBADFD;
2049     }
2050
2051   if (PREDICT_FALSE (!vcl_session_is_open (s)))
2052     {
2053       VDBG (1, "session %u [0x%llx]: is not open! state 0x%x (%s)",
2054             s->session_index, s->vpp_handle, s->session_state,
2055             vppcom_session_state_str (s->session_state));
2056       return vcl_session_closed_error (s);;
2057     }
2058
2059   is_ct = vcl_session_is_ct (s);
2060   tx_fifo = is_ct ? s->ct_tx_fifo : s->tx_fifo;
2061   is_nonblocking = vcl_session_has_attr (s, VCL_SESS_ATTR_NONBLOCK);
2062
2063   mq = wrk->app_event_queue;
2064   if (!vcl_fifo_is_writeable (tx_fifo, n, is_dgram))
2065     {
2066       if (is_nonblocking)
2067         {
2068           return VPPCOM_EWOULDBLOCK;
2069         }
2070       while (!vcl_fifo_is_writeable (tx_fifo, n, is_dgram))
2071         {
2072           svm_fifo_add_want_deq_ntf (tx_fifo, SVM_FIFO_WANT_DEQ_NOTIF);
2073           if (vcl_session_is_closing (s))
2074             return vcl_session_closing_error (s);
2075           svm_msg_q_lock (mq);
2076           if (svm_msg_q_is_empty (mq))
2077             svm_msg_q_wait (mq);
2078
2079           svm_msg_q_sub_w_lock (mq, &msg);
2080           e = svm_msg_q_msg_data (mq, &msg);
2081           svm_msg_q_unlock (mq);
2082
2083           if (!vcl_is_tx_evt_for_session (e, s->session_index, is_ct))
2084             vcl_handle_mq_event (wrk, e);
2085           svm_msg_q_free_msg (mq, &msg);
2086         }
2087     }
2088
2089   et = SESSION_IO_EVT_TX;
2090   if (is_flush && !is_ct)
2091     et = SESSION_IO_EVT_TX_FLUSH;
2092
2093   if (is_dgram)
2094     n_write = app_send_dgram_raw (tx_fifo, &s->transport,
2095                                   s->vpp_evt_q, buf, n, et,
2096                                   0 /* do_evt */ , SVM_Q_WAIT);
2097   else
2098     n_write = app_send_stream_raw (tx_fifo, s->vpp_evt_q, buf, n, et,
2099                                    0 /* do_evt */ , SVM_Q_WAIT);
2100
2101   if (svm_fifo_set_event (s->tx_fifo))
2102     app_send_io_evt_to_vpp (
2103       s->vpp_evt_q, s->tx_fifo->shr->master_session_index, et, SVM_Q_WAIT);
2104
2105   /* The underlying fifo segment can run out of memory */
2106   if (PREDICT_FALSE (n_write < 0))
2107     return VPPCOM_EAGAIN;
2108
2109   VDBG (2, "session %u [0x%llx]: wrote %d bytes", s->session_index,
2110         s->vpp_handle, n_write);
2111
2112   return n_write;
2113 }
2114
2115 int
2116 vppcom_session_write (uint32_t session_handle, void *buf, size_t n)
2117 {
2118   vcl_worker_t *wrk = vcl_worker_get_current ();
2119   vcl_session_t *s;
2120
2121   s = vcl_session_get_w_handle (wrk, session_handle);
2122   if (PREDICT_FALSE (!s))
2123     return VPPCOM_EBADFD;
2124
2125   return vppcom_session_write_inline (wrk, s, buf, n,
2126                                       0 /* is_flush */ , s->is_dgram ? 1 : 0);
2127 }
2128
2129 int
2130 vppcom_session_write_msg (uint32_t session_handle, void *buf, size_t n)
2131 {
2132   vcl_worker_t *wrk = vcl_worker_get_current ();
2133   vcl_session_t *s;
2134
2135   s = vcl_session_get_w_handle (wrk, session_handle);
2136   if (PREDICT_FALSE (!s))
2137     return VPPCOM_EBADFD;
2138
2139   return vppcom_session_write_inline (wrk, s, buf, n,
2140                                       1 /* is_flush */ , s->is_dgram ? 1 : 0);
2141 }
2142
2143 #define vcl_fifo_rx_evt_valid_or_break(_s)                              \
2144 if (PREDICT_FALSE (!_s->rx_fifo))                                       \
2145   break;                                                                \
2146 if (PREDICT_FALSE (svm_fifo_is_empty (_s->rx_fifo)))                    \
2147   {                                                                     \
2148     if (!vcl_session_is_ct (_s))                                        \
2149       {                                                                 \
2150         svm_fifo_unset_event (_s->rx_fifo);                             \
2151         if (svm_fifo_is_empty (_s->rx_fifo))                            \
2152           break;                                                        \
2153       }                                                                 \
2154     else if (svm_fifo_is_empty (_s->ct_rx_fifo))                        \
2155       {                                                                 \
2156         svm_fifo_unset_event (_s->rx_fifo); /* rx evts on actual fifo*/ \
2157         if (svm_fifo_is_empty (_s->ct_rx_fifo))                         \
2158           break;                                                        \
2159       }                                                                 \
2160   }                                                                     \
2161
2162 static void
2163 vcl_select_handle_mq_event (vcl_worker_t * wrk, session_event_t * e,
2164                             unsigned long n_bits, unsigned long *read_map,
2165                             unsigned long *write_map,
2166                             unsigned long *except_map, u32 * bits_set)
2167 {
2168   session_disconnected_msg_t *disconnected_msg;
2169   session_connected_msg_t *connected_msg;
2170   vcl_session_t *s;
2171   u32 sid;
2172
2173   switch (e->event_type)
2174     {
2175     case SESSION_IO_EVT_RX:
2176       sid = e->session_index;
2177       s = vcl_session_get (wrk, sid);
2178       if (!s || !vcl_session_is_open (s))
2179         break;
2180       vcl_fifo_rx_evt_valid_or_break (s);
2181       if (sid < n_bits && read_map)
2182         {
2183           clib_bitmap_set_no_check ((uword *) read_map, sid, 1);
2184           *bits_set += 1;
2185         }
2186       break;
2187     case SESSION_IO_EVT_TX:
2188       sid = e->session_index;
2189       s = vcl_session_get (wrk, sid);
2190       if (!s || !vcl_session_is_open (s))
2191         break;
2192       if (sid < n_bits && write_map)
2193         {
2194           clib_bitmap_set_no_check ((uword *) write_map, sid, 1);
2195           *bits_set += 1;
2196         }
2197       break;
2198     case SESSION_CTRL_EVT_ACCEPTED:
2199       if (!e->postponed)
2200         s = vcl_session_accepted (wrk, (session_accepted_msg_t *) e->data);
2201       else
2202         s = vcl_session_get (wrk, e->session_index);
2203       if (!s)
2204         break;
2205       sid = s->session_index;
2206       if (sid < n_bits && read_map)
2207         {
2208           clib_bitmap_set_no_check ((uword *) read_map, sid, 1);
2209           *bits_set += 1;
2210         }
2211       break;
2212     case SESSION_CTRL_EVT_CONNECTED:
2213       if (!e->postponed)
2214         {
2215           connected_msg = (session_connected_msg_t *) e->data;
2216           sid = vcl_session_connected_handler (wrk, connected_msg);
2217         }
2218       else
2219         sid = e->session_index;
2220       if (sid == VCL_INVALID_SESSION_INDEX)
2221         break;
2222       if (sid < n_bits && write_map)
2223         {
2224           clib_bitmap_set_no_check ((uword *) write_map, sid, 1);
2225           *bits_set += 1;
2226         }
2227       break;
2228     case SESSION_CTRL_EVT_DISCONNECTED:
2229       disconnected_msg = (session_disconnected_msg_t *) e->data;
2230       s = vcl_session_disconnected_handler (wrk, disconnected_msg);
2231       if (!s)
2232         break;
2233       sid = s->session_index;
2234       if (sid < n_bits && except_map)
2235         {
2236           clib_bitmap_set_no_check ((uword *) except_map, sid, 1);
2237           *bits_set += 1;
2238         }
2239       break;
2240     case SESSION_CTRL_EVT_RESET:
2241       sid = vcl_session_reset_handler (wrk, (session_reset_msg_t *) e->data);
2242       if (sid < n_bits && except_map)
2243         {
2244           clib_bitmap_set_no_check ((uword *) except_map, sid, 1);
2245           *bits_set += 1;
2246         }
2247       break;
2248     case SESSION_CTRL_EVT_UNLISTEN_REPLY:
2249       vcl_session_unlisten_reply_handler (wrk, e->data);
2250       break;
2251     case SESSION_CTRL_EVT_MIGRATED:
2252       vcl_session_migrated_handler (wrk, e->data);
2253       break;
2254     case SESSION_CTRL_EVT_CLEANUP:
2255       vcl_session_cleanup_handler (wrk, e->data);
2256       break;
2257     case SESSION_CTRL_EVT_WORKER_UPDATE_REPLY:
2258       vcl_session_worker_update_reply_handler (wrk, e->data);
2259       break;
2260     case SESSION_CTRL_EVT_REQ_WORKER_UPDATE:
2261       vcl_session_req_worker_update_handler (wrk, e->data);
2262       break;
2263     case SESSION_CTRL_EVT_APP_ADD_SEGMENT:
2264       vcl_session_app_add_segment_handler (wrk, e->data);
2265       break;
2266     case SESSION_CTRL_EVT_APP_DEL_SEGMENT:
2267       vcl_session_app_del_segment_handler (wrk, e->data);
2268       break;
2269     case SESSION_CTRL_EVT_APP_WRK_RPC:
2270       vcl_worker_rpc_handler (wrk, e->data);
2271       break;
2272     default:
2273       clib_warning ("unhandled: %u", e->event_type);
2274       break;
2275     }
2276 }
2277
2278 static int
2279 vcl_select_handle_mq (vcl_worker_t * wrk, svm_msg_q_t * mq,
2280                       unsigned long n_bits, unsigned long *read_map,
2281                       unsigned long *write_map, unsigned long *except_map,
2282                       double time_to_wait, u32 * bits_set)
2283 {
2284   svm_msg_q_msg_t *msg;
2285   session_event_t *e;
2286   u32 i;
2287
2288   svm_msg_q_lock (mq);
2289   if (svm_msg_q_is_empty (mq))
2290     {
2291       if (*bits_set)
2292         {
2293           svm_msg_q_unlock (mq);
2294           return 0;
2295         }
2296
2297       if (!time_to_wait)
2298         {
2299           svm_msg_q_unlock (mq);
2300           return 0;
2301         }
2302       else if (time_to_wait < 0)
2303         {
2304           svm_msg_q_wait (mq);
2305         }
2306       else
2307         {
2308           if (svm_msg_q_timedwait (mq, time_to_wait))
2309             {
2310               svm_msg_q_unlock (mq);
2311               return 0;
2312             }
2313         }
2314     }
2315   vcl_mq_dequeue_batch (wrk, mq, ~0);
2316   svm_msg_q_unlock (mq);
2317
2318   for (i = 0; i < vec_len (wrk->mq_msg_vector); i++)
2319     {
2320       msg = vec_elt_at_index (wrk->mq_msg_vector, i);
2321       e = svm_msg_q_msg_data (mq, msg);
2322       vcl_select_handle_mq_event (wrk, e, n_bits, read_map, write_map,
2323                                   except_map, bits_set);
2324       svm_msg_q_free_msg (mq, msg);
2325     }
2326   vec_reset_length (wrk->mq_msg_vector);
2327   vcl_handle_pending_wrk_updates (wrk);
2328   return *bits_set;
2329 }
2330
2331 static int
2332 vppcom_select_condvar (vcl_worker_t * wrk, int n_bits,
2333                        vcl_si_set * read_map, vcl_si_set * write_map,
2334                        vcl_si_set * except_map, double time_to_wait,
2335                        u32 * bits_set)
2336 {
2337   double wait = 0, start = 0;
2338
2339   if (!*bits_set)
2340     {
2341       wait = time_to_wait;
2342       start = clib_time_now (&wrk->clib_time);
2343     }
2344
2345   do
2346     {
2347       vcl_select_handle_mq (wrk, wrk->app_event_queue, n_bits, read_map,
2348                             write_map, except_map, wait, bits_set);
2349       if (*bits_set)
2350         return *bits_set;
2351       if (wait == -1)
2352         continue;
2353
2354       wait = wait - (clib_time_now (&wrk->clib_time) - start);
2355     }
2356   while (wait > 0);
2357
2358   return 0;
2359 }
2360
2361 static int
2362 vppcom_select_eventfd (vcl_worker_t * wrk, int n_bits,
2363                        vcl_si_set * read_map, vcl_si_set * write_map,
2364                        vcl_si_set * except_map, double time_to_wait,
2365                        u32 * bits_set)
2366 {
2367   vcl_mq_evt_conn_t *mqc;
2368   int __clib_unused n_read;
2369   int n_mq_evts, i;
2370   u64 buf;
2371
2372   vec_validate (wrk->mq_events, pool_elts (wrk->mq_evt_conns));
2373   n_mq_evts = epoll_wait (wrk->mqs_epfd, wrk->mq_events,
2374                           vec_len (wrk->mq_events), time_to_wait);
2375   for (i = 0; i < n_mq_evts; i++)
2376     {
2377       mqc = vcl_mq_evt_conn_get (wrk, wrk->mq_events[i].data.u32);
2378       n_read = read (mqc->mq_fd, &buf, sizeof (buf));
2379       vcl_select_handle_mq (wrk, mqc->mq, n_bits, read_map, write_map,
2380                             except_map, 0, bits_set);
2381     }
2382
2383   return (n_mq_evts > 0 ? (int) *bits_set : 0);
2384 }
2385
2386 int
2387 vppcom_select (int n_bits, vcl_si_set * read_map, vcl_si_set * write_map,
2388                vcl_si_set * except_map, double time_to_wait)
2389 {
2390   u32 sid, minbits = clib_max (n_bits, BITS (uword)), bits_set = 0;
2391   vcl_worker_t *wrk = vcl_worker_get_current ();
2392   vcl_session_t *session = 0;
2393   int i;
2394
2395   if (n_bits && read_map)
2396     {
2397       clib_bitmap_validate (wrk->rd_bitmap, minbits);
2398       clib_memcpy_fast (wrk->rd_bitmap, read_map,
2399                         vec_len (wrk->rd_bitmap) * sizeof (vcl_si_set));
2400       memset (read_map, 0, vec_len (wrk->rd_bitmap) * sizeof (vcl_si_set));
2401     }
2402   if (n_bits && write_map)
2403     {
2404       clib_bitmap_validate (wrk->wr_bitmap, minbits);
2405       clib_memcpy_fast (wrk->wr_bitmap, write_map,
2406                         vec_len (wrk->wr_bitmap) * sizeof (vcl_si_set));
2407       memset (write_map, 0, vec_len (wrk->wr_bitmap) * sizeof (vcl_si_set));
2408     }
2409   if (n_bits && except_map)
2410     {
2411       clib_bitmap_validate (wrk->ex_bitmap, minbits);
2412       clib_memcpy_fast (wrk->ex_bitmap, except_map,
2413                         vec_len (wrk->ex_bitmap) * sizeof (vcl_si_set));
2414       memset (except_map, 0, vec_len (wrk->ex_bitmap) * sizeof (vcl_si_set));
2415     }
2416
2417   if (!n_bits)
2418     return 0;
2419
2420   if (!write_map)
2421     goto check_rd;
2422
2423   /* *INDENT-OFF* */
2424   clib_bitmap_foreach (sid, wrk->wr_bitmap)  {
2425     if (!(session = vcl_session_get (wrk, sid)))
2426       {
2427         clib_bitmap_set_no_check ((uword*)write_map, sid, 1);
2428         bits_set++;
2429         continue;
2430       }
2431
2432     if (vcl_session_write_ready (session))
2433       {
2434         clib_bitmap_set_no_check ((uword*)write_map, sid, 1);
2435         bits_set++;
2436       }
2437     else
2438       svm_fifo_add_want_deq_ntf (session->tx_fifo, SVM_FIFO_WANT_DEQ_NOTIF);
2439   }
2440
2441 check_rd:
2442   if (!read_map)
2443     goto check_mq;
2444
2445   clib_bitmap_foreach (sid, wrk->rd_bitmap)  {
2446     if (!(session = vcl_session_get (wrk, sid)))
2447       {
2448         clib_bitmap_set_no_check ((uword*)read_map, sid, 1);
2449         bits_set++;
2450         continue;
2451       }
2452
2453     if (vcl_session_read_ready (session))
2454       {
2455         clib_bitmap_set_no_check ((uword*)read_map, sid, 1);
2456         bits_set++;
2457       }
2458   }
2459   /* *INDENT-ON* */
2460
2461 check_mq:
2462
2463   for (i = 0; i < vec_len (wrk->unhandled_evts_vector); i++)
2464     {
2465       vcl_select_handle_mq_event (wrk, &wrk->unhandled_evts_vector[i], n_bits,
2466                                   read_map, write_map, except_map, &bits_set);
2467     }
2468   vec_reset_length (wrk->unhandled_evts_vector);
2469
2470   if (vcm->cfg.use_mq_eventfd)
2471     vppcom_select_eventfd (wrk, n_bits, read_map, write_map, except_map,
2472                            time_to_wait, &bits_set);
2473   else
2474     vppcom_select_condvar (wrk, n_bits, read_map, write_map, except_map,
2475                            time_to_wait, &bits_set);
2476
2477   return (bits_set);
2478 }
2479
2480 static inline void
2481 vep_verify_epoll_chain (vcl_worker_t * wrk, u32 vep_handle)
2482 {
2483   vppcom_epoll_t *vep;
2484   u32 sh = vep_handle;
2485   vcl_session_t *s;
2486
2487   if (VPPCOM_DEBUG <= 2)
2488     return;
2489
2490   s = vcl_session_get_w_handle (wrk, vep_handle);
2491   if (PREDICT_FALSE (!s))
2492     {
2493       VDBG (0, "ERROR: Invalid vep_sh (%u)!", vep_handle);
2494       goto done;
2495     }
2496   if (PREDICT_FALSE (!(s->flags & VCL_SESSION_F_IS_VEP)))
2497     {
2498       VDBG (0, "ERROR: vep_sh (%u) is not a vep!", vep_handle);
2499       goto done;
2500     }
2501   vep = &s->vep;
2502   VDBG (0, "vep_sh (%u): Dumping epoll chain\n"
2503         "{\n"
2504         "   is_vep         = %u\n"
2505         "   is_vep_session = %u\n"
2506         "   next_sh        = 0x%x (%u)\n"
2507         "}\n", vep_handle, s->flags & VCL_SESSION_F_IS_VEP,
2508         s->flags & VCL_SESSION_F_IS_VEP_SESSION, vep->next_sh, vep->next_sh);
2509
2510   for (sh = vep->next_sh; sh != ~0; sh = vep->next_sh)
2511     {
2512       s = vcl_session_get_w_handle (wrk, sh);
2513       if (PREDICT_FALSE (!s))
2514         {
2515           VDBG (0, "ERROR: Invalid sh (%u)!", sh);
2516           goto done;
2517         }
2518       if (PREDICT_FALSE (s->flags & VCL_SESSION_F_IS_VEP))
2519         {
2520           VDBG (0, "ERROR: sh (%u) is a vep!", vep_handle);
2521         }
2522       else if (PREDICT_FALSE (!(s->flags & VCL_SESSION_F_IS_VEP_SESSION)))
2523         {
2524           VDBG (0, "ERROR: sh (%u) is not a vep session handle!", sh);
2525           goto done;
2526         }
2527       vep = &s->vep;
2528       if (PREDICT_FALSE (vep->vep_sh != vep_handle))
2529         VDBG (0, "ERROR: session (%u) vep_sh (%u) != vep_sh (%u)!",
2530               sh, s->vep.vep_sh, vep_handle);
2531       if (s->flags & VCL_SESSION_F_IS_VEP_SESSION)
2532         {
2533           VDBG (0, "vep_sh[%u]: sh 0x%x (%u)\n"
2534                 "{\n"
2535                 "   next_sh        = 0x%x (%u)\n"
2536                 "   prev_sh        = 0x%x (%u)\n"
2537                 "   vep_sh         = 0x%x (%u)\n"
2538                 "   ev.events      = 0x%x\n"
2539                 "   ev.data.u64    = 0x%llx\n"
2540                 "   et_mask        = 0x%x\n"
2541                 "}\n",
2542                 vep_handle, sh, sh, vep->next_sh, vep->next_sh, vep->prev_sh,
2543                 vep->prev_sh, vep->vep_sh, vep->vep_sh, vep->ev.events,
2544                 vep->ev.data.u64, vep->et_mask);
2545         }
2546     }
2547
2548 done:
2549   VDBG (0, "vep_sh (%u): Dump complete!\n", vep_handle);
2550 }
2551
2552 int
2553 vppcom_epoll_create (void)
2554 {
2555   vcl_worker_t *wrk = vcl_worker_get_current ();
2556   vcl_session_t *vep_session;
2557
2558   vep_session = vcl_session_alloc (wrk);
2559
2560   vep_session->flags |= VCL_SESSION_F_IS_VEP;
2561   vep_session->vep.vep_sh = ~0;
2562   vep_session->vep.next_sh = ~0;
2563   vep_session->vep.prev_sh = ~0;
2564   vep_session->vpp_handle = ~0;
2565
2566   vcl_evt (VCL_EVT_EPOLL_CREATE, vep_session, vep_session->session_index);
2567   VDBG (0, "Created vep_idx %u", vep_session->session_index);
2568
2569   return vcl_session_handle (vep_session);
2570 }
2571
2572 int
2573 vppcom_epoll_ctl (uint32_t vep_handle, int op, uint32_t session_handle,
2574                   struct epoll_event *event)
2575 {
2576   vcl_worker_t *wrk = vcl_worker_get_current ();
2577   vcl_session_t *vep_session;
2578   int rv = VPPCOM_OK;
2579   vcl_session_t *s;
2580   svm_fifo_t *txf;
2581
2582   if (vep_handle == session_handle)
2583     {
2584       VDBG (0, "vep_sh == session handle (%u)!", vep_handle);
2585       return VPPCOM_EINVAL;
2586     }
2587
2588   vep_session = vcl_session_get_w_handle (wrk, vep_handle);
2589   if (PREDICT_FALSE (!vep_session))
2590     {
2591       VDBG (0, "Invalid vep_sh (%u)!", vep_handle);
2592       return VPPCOM_EBADFD;
2593     }
2594   if (PREDICT_FALSE (!(vep_session->flags & VCL_SESSION_F_IS_VEP)))
2595     {
2596       VDBG (0, "vep_sh (%u) is not a vep!", vep_handle);
2597       return VPPCOM_EINVAL;
2598     }
2599
2600   ASSERT (vep_session->vep.vep_sh == ~0);
2601   ASSERT (vep_session->vep.prev_sh == ~0);
2602
2603   s = vcl_session_get_w_handle (wrk, session_handle);
2604   if (PREDICT_FALSE (!s))
2605     {
2606       VDBG (0, "Invalid session_handle (%u)!", session_handle);
2607       return VPPCOM_EBADFD;
2608     }
2609   if (PREDICT_FALSE (s->flags & VCL_SESSION_F_IS_VEP))
2610     {
2611       VDBG (0, "session_handle (%u) is a vep!", vep_handle);
2612       return VPPCOM_EINVAL;
2613     }
2614
2615   switch (op)
2616     {
2617     case EPOLL_CTL_ADD:
2618       if (PREDICT_FALSE (!event))
2619         {
2620           VDBG (0, "EPOLL_CTL_ADD: NULL pointer to epoll_event structure!");
2621           return VPPCOM_EINVAL;
2622         }
2623       if (vep_session->vep.next_sh != ~0)
2624         {
2625           vcl_session_t *next_session;
2626           next_session = vcl_session_get_w_handle (wrk,
2627                                                    vep_session->vep.next_sh);
2628           if (PREDICT_FALSE (!next_session))
2629             {
2630               VDBG (0, "EPOLL_CTL_ADD: Invalid vep.next_sh (%u) on "
2631                     "vep_idx (%u)!", vep_session->vep.next_sh, vep_handle);
2632               return VPPCOM_EBADFD;
2633             }
2634           ASSERT (next_session->vep.prev_sh == vep_handle);
2635           next_session->vep.prev_sh = session_handle;
2636         }
2637       s->vep.next_sh = vep_session->vep.next_sh;
2638       s->vep.prev_sh = vep_handle;
2639       s->vep.vep_sh = vep_handle;
2640       s->vep.et_mask = VEP_DEFAULT_ET_MASK;
2641       s->vep.ev = *event;
2642       s->flags &= ~VCL_SESSION_F_IS_VEP;
2643       s->flags |= VCL_SESSION_F_IS_VEP_SESSION;
2644       vep_session->vep.next_sh = session_handle;
2645
2646       txf = vcl_session_is_ct (s) ? s->ct_tx_fifo : s->tx_fifo;
2647       if (txf && (event->events & EPOLLOUT))
2648         svm_fifo_add_want_deq_ntf (txf, SVM_FIFO_WANT_DEQ_NOTIF_IF_FULL);
2649
2650       /* Generate EPOLLOUT if tx fifo not full */
2651       if ((event->events & EPOLLOUT) && (vcl_session_write_ready (s) > 0))
2652         {
2653           session_event_t e = { 0 };
2654           e.event_type = SESSION_IO_EVT_TX;
2655           e.session_index = s->session_index;
2656           vec_add1 (wrk->unhandled_evts_vector, e);
2657         }
2658       /* Generate EPOLLIN if rx fifo has data */
2659       if ((event->events & EPOLLIN) && (vcl_session_read_ready (s) > 0))
2660         {
2661           session_event_t e = { 0 };
2662           e.event_type = SESSION_IO_EVT_RX;
2663           e.session_index = s->session_index;
2664           vec_add1 (wrk->unhandled_evts_vector, e);
2665         }
2666       VDBG (1, "EPOLL_CTL_ADD: vep_sh %u, sh %u, events 0x%x, data 0x%llx!",
2667             vep_handle, session_handle, event->events, event->data.u64);
2668       vcl_evt (VCL_EVT_EPOLL_CTLADD, s, event->events, event->data.u64);
2669       break;
2670
2671     case EPOLL_CTL_MOD:
2672       if (PREDICT_FALSE (!event))
2673         {
2674           VDBG (0, "EPOLL_CTL_MOD: NULL pointer to epoll_event structure!");
2675           rv = VPPCOM_EINVAL;
2676           goto done;
2677         }
2678       else if (PREDICT_FALSE (!(s->flags & VCL_SESSION_F_IS_VEP_SESSION)))
2679         {
2680           VDBG (0, "sh %u EPOLL_CTL_MOD: not a vep session!", session_handle);
2681           rv = VPPCOM_EINVAL;
2682           goto done;
2683         }
2684       else if (PREDICT_FALSE (s->vep.vep_sh != vep_handle))
2685         {
2686           VDBG (0, "EPOLL_CTL_MOD: sh %u vep_sh (%u) != vep_sh (%u)!",
2687                 session_handle, s->vep.vep_sh, vep_handle);
2688           rv = VPPCOM_EINVAL;
2689           goto done;
2690         }
2691
2692       /* Generate EPOLLOUT when tx_fifo/ct_tx_fifo not full */
2693       if ((event->events & EPOLLOUT) &&
2694           !(s->vep.ev.events & EPOLLOUT) && (vcl_session_write_ready (s) > 0))
2695         {
2696           session_event_t e = { 0 };
2697           e.event_type = SESSION_IO_EVT_TX;
2698           e.session_index = s->session_index;
2699           vec_add1 (wrk->unhandled_evts_vector, e);
2700         }
2701       s->vep.et_mask = VEP_DEFAULT_ET_MASK;
2702       s->vep.ev = *event;
2703       txf = vcl_session_is_ct (s) ? s->ct_tx_fifo : s->tx_fifo;
2704       if (event->events & EPOLLOUT)
2705         svm_fifo_add_want_deq_ntf (txf, SVM_FIFO_WANT_DEQ_NOTIF_IF_FULL);
2706       else
2707         svm_fifo_del_want_deq_ntf (txf, SVM_FIFO_WANT_DEQ_NOTIF_IF_FULL);
2708       VDBG (1, "EPOLL_CTL_MOD: vep_sh %u, sh %u, events 0x%x, data 0x%llx!",
2709             vep_handle, session_handle, event->events, event->data.u64);
2710       break;
2711
2712     case EPOLL_CTL_DEL:
2713       if (PREDICT_FALSE (!(s->flags & VCL_SESSION_F_IS_VEP_SESSION)))
2714         {
2715           VDBG (0, "EPOLL_CTL_DEL: %u not a vep session!", session_handle);
2716           rv = VPPCOM_EINVAL;
2717           goto done;
2718         }
2719       else if (PREDICT_FALSE (s->vep.vep_sh != vep_handle))
2720         {
2721           VDBG (0, "EPOLL_CTL_DEL: sh %u vep_sh (%u) != vep_sh (%u)!",
2722                 session_handle, s->vep.vep_sh, vep_handle);
2723           rv = VPPCOM_EINVAL;
2724           goto done;
2725         }
2726
2727       if (s->vep.prev_sh == vep_handle)
2728         vep_session->vep.next_sh = s->vep.next_sh;
2729       else
2730         {
2731           vcl_session_t *prev_session;
2732           prev_session = vcl_session_get_w_handle (wrk, s->vep.prev_sh);
2733           if (PREDICT_FALSE (!prev_session))
2734             {
2735               VDBG (0, "EPOLL_CTL_DEL: Invalid prev_sh (%u) on sh (%u)!",
2736                     s->vep.prev_sh, session_handle);
2737               return VPPCOM_EBADFD;
2738             }
2739           ASSERT (prev_session->vep.next_sh == session_handle);
2740           prev_session->vep.next_sh = s->vep.next_sh;
2741         }
2742       if (s->vep.next_sh != ~0)
2743         {
2744           vcl_session_t *next_session;
2745           next_session = vcl_session_get_w_handle (wrk, s->vep.next_sh);
2746           if (PREDICT_FALSE (!next_session))
2747             {
2748               VDBG (0, "EPOLL_CTL_DEL: Invalid next_sh (%u) on sh (%u)!",
2749                     s->vep.next_sh, session_handle);
2750               return VPPCOM_EBADFD;
2751             }
2752           ASSERT (next_session->vep.prev_sh == session_handle);
2753           next_session->vep.prev_sh = s->vep.prev_sh;
2754         }
2755
2756       memset (&s->vep, 0, sizeof (s->vep));
2757       s->vep.next_sh = ~0;
2758       s->vep.prev_sh = ~0;
2759       s->vep.vep_sh = ~0;
2760       s->flags &= ~VCL_SESSION_F_IS_VEP_SESSION;
2761
2762       txf = vcl_session_is_ct (s) ? s->ct_tx_fifo : s->tx_fifo;
2763       if (txf)
2764         svm_fifo_del_want_deq_ntf (txf, SVM_FIFO_WANT_DEQ_NOTIF_IF_FULL);
2765
2766       VDBG (1, "EPOLL_CTL_DEL: vep_idx %u, sh %u!", vep_handle,
2767             session_handle);
2768       vcl_evt (VCL_EVT_EPOLL_CTLDEL, s, vep_sh);
2769       break;
2770
2771     default:
2772       VDBG (0, "Invalid operation (%d)!", op);
2773       rv = VPPCOM_EINVAL;
2774     }
2775
2776   vep_verify_epoll_chain (wrk, vep_handle);
2777
2778 done:
2779   return rv;
2780 }
2781
2782 static inline void
2783 vcl_epoll_wait_handle_mq_event (vcl_worker_t * wrk, session_event_t * e,
2784                                 struct epoll_event *events, u32 * num_ev)
2785 {
2786   session_disconnected_msg_t *disconnected_msg;
2787   session_connected_msg_t *connected_msg;
2788   u32 sid = ~0, session_events;
2789   u64 session_evt_data = ~0;
2790   vcl_session_t *s;
2791   u8 add_event = 0;
2792
2793   switch (e->event_type)
2794     {
2795     case SESSION_IO_EVT_RX:
2796       sid = e->session_index;
2797       s = vcl_session_get (wrk, sid);
2798       if (vcl_session_is_closed (s))
2799         break;
2800       vcl_fifo_rx_evt_valid_or_break (s);
2801       session_events = s->vep.ev.events;
2802       if (!(EPOLLIN & s->vep.ev.events)
2803           || (s->flags & VCL_SESSION_F_HAS_RX_EVT))
2804         break;
2805       add_event = 1;
2806       events[*num_ev].events |= EPOLLIN;
2807       session_evt_data = s->vep.ev.data.u64;
2808       s->flags |= VCL_SESSION_F_HAS_RX_EVT;
2809       break;
2810     case SESSION_IO_EVT_TX:
2811       sid = e->session_index;
2812       s = vcl_session_get (wrk, sid);
2813       if (vcl_session_is_closed (s))
2814         break;
2815       session_events = s->vep.ev.events;
2816       if (!(EPOLLOUT & session_events))
2817         break;
2818       add_event = 1;
2819       events[*num_ev].events |= EPOLLOUT;
2820       session_evt_data = s->vep.ev.data.u64;
2821       svm_fifo_reset_has_deq_ntf (vcl_session_is_ct (s) ?
2822                                   s->ct_tx_fifo : s->tx_fifo);
2823       break;
2824     case SESSION_CTRL_EVT_ACCEPTED:
2825       if (!e->postponed)
2826         s = vcl_session_accepted (wrk, (session_accepted_msg_t *) e->data);
2827       else
2828         s = vcl_session_get (wrk, e->session_index);
2829       if (!s)
2830         break;
2831       session_events = s->vep.ev.events;
2832       sid = s->session_index;
2833       if (!(EPOLLIN & session_events))
2834         break;
2835       add_event = 1;
2836       events[*num_ev].events |= EPOLLIN;
2837       session_evt_data = s->vep.ev.data.u64;
2838       break;
2839     case SESSION_CTRL_EVT_CONNECTED:
2840       if (!e->postponed)
2841         {
2842           connected_msg = (session_connected_msg_t *) e->data;
2843           sid = vcl_session_connected_handler (wrk, connected_msg);
2844         }
2845       else
2846         sid = e->session_index;
2847       s = vcl_session_get (wrk, sid);
2848       if (vcl_session_is_closed (s))
2849         break;
2850       session_events = s->vep.ev.events;
2851       /* Generate EPOLLOUT because there's no connected event */
2852       if (!(EPOLLOUT & session_events))
2853         break;
2854       add_event = 1;
2855       events[*num_ev].events |= EPOLLOUT;
2856       session_evt_data = s->vep.ev.data.u64;
2857       if (s->session_state == VCL_STATE_DETACHED)
2858         events[*num_ev].events |= EPOLLHUP;
2859       break;
2860     case SESSION_CTRL_EVT_DISCONNECTED:
2861       disconnected_msg = (session_disconnected_msg_t *) e->data;
2862       s = vcl_session_disconnected_handler (wrk, disconnected_msg);
2863       if (vcl_session_is_closed (s))
2864         break;
2865       sid = s->session_index;
2866       session_events = s->vep.ev.events;
2867       add_event = 1;
2868       events[*num_ev].events |= EPOLLHUP | EPOLLRDHUP;
2869       session_evt_data = s->vep.ev.data.u64;
2870       break;
2871     case SESSION_CTRL_EVT_RESET:
2872       sid = vcl_session_reset_handler (wrk, (session_reset_msg_t *) e->data);
2873       s = vcl_session_get (wrk, sid);
2874       if (vcl_session_is_closed (s))
2875         break;
2876       session_events = s->vep.ev.events;
2877       add_event = 1;
2878       events[*num_ev].events |= EPOLLHUP | EPOLLRDHUP;
2879       session_evt_data = s->vep.ev.data.u64;
2880       break;
2881     case SESSION_CTRL_EVT_UNLISTEN_REPLY:
2882       vcl_session_unlisten_reply_handler (wrk, e->data);
2883       break;
2884     case SESSION_CTRL_EVT_MIGRATED:
2885       vcl_session_migrated_handler (wrk, e->data);
2886       break;
2887     case SESSION_CTRL_EVT_CLEANUP:
2888       vcl_session_cleanup_handler (wrk, e->data);
2889       break;
2890     case SESSION_CTRL_EVT_REQ_WORKER_UPDATE:
2891       vcl_session_req_worker_update_handler (wrk, e->data);
2892       break;
2893     case SESSION_CTRL_EVT_WORKER_UPDATE_REPLY:
2894       vcl_session_worker_update_reply_handler (wrk, e->data);
2895       break;
2896     case SESSION_CTRL_EVT_APP_ADD_SEGMENT:
2897       vcl_session_app_add_segment_handler (wrk, e->data);
2898       break;
2899     case SESSION_CTRL_EVT_APP_DEL_SEGMENT:
2900       vcl_session_app_del_segment_handler (wrk, e->data);
2901       break;
2902     case SESSION_CTRL_EVT_APP_WRK_RPC:
2903       vcl_worker_rpc_handler (wrk, e->data);
2904       break;
2905     default:
2906       VDBG (0, "unhandled: %u", e->event_type);
2907       break;
2908     }
2909
2910   if (add_event)
2911     {
2912       events[*num_ev].data.u64 = session_evt_data;
2913       if (EPOLLONESHOT & session_events)
2914         {
2915           s = vcl_session_get (wrk, sid);
2916           s->vep.ev.events = 0;
2917         }
2918       *num_ev += 1;
2919     }
2920 }
2921
2922 static int
2923 vcl_epoll_wait_handle_mq (vcl_worker_t * wrk, svm_msg_q_t * mq,
2924                           struct epoll_event *events, u32 maxevents,
2925                           double wait_for_time, u32 * num_ev)
2926 {
2927   svm_msg_q_msg_t *msg;
2928   session_event_t *e;
2929   int i;
2930
2931   if (vec_len (wrk->mq_msg_vector) && svm_msg_q_is_empty (mq))
2932     goto handle_dequeued;
2933
2934   svm_msg_q_lock (mq);
2935   if (svm_msg_q_is_empty (mq))
2936     {
2937       if (!wait_for_time)
2938         {
2939           svm_msg_q_unlock (mq);
2940           return 0;
2941         }
2942       else if (wait_for_time < 0)
2943         {
2944           svm_msg_q_wait (mq);
2945         }
2946       else
2947         {
2948           if (svm_msg_q_timedwait (mq, wait_for_time / 1e3))
2949             {
2950               svm_msg_q_unlock (mq);
2951               return 0;
2952             }
2953         }
2954     }
2955   ASSERT (maxevents > *num_ev);
2956   vcl_mq_dequeue_batch (wrk, mq, ~0);
2957   svm_msg_q_unlock (mq);
2958
2959 handle_dequeued:
2960   for (i = 0; i < vec_len (wrk->mq_msg_vector); i++)
2961     {
2962       msg = vec_elt_at_index (wrk->mq_msg_vector, i);
2963       e = svm_msg_q_msg_data (mq, msg);
2964       if (*num_ev < maxevents)
2965         vcl_epoll_wait_handle_mq_event (wrk, e, events, num_ev);
2966       else
2967         vcl_handle_mq_event (wrk, e);
2968       svm_msg_q_free_msg (mq, msg);
2969     }
2970   vec_reset_length (wrk->mq_msg_vector);
2971   vcl_handle_pending_wrk_updates (wrk);
2972   return *num_ev;
2973 }
2974
2975 static int
2976 vppcom_epoll_wait_condvar (vcl_worker_t * wrk, struct epoll_event *events,
2977                            int maxevents, u32 n_evts, double wait_for_time)
2978 {
2979   double wait = 0, start = 0, now;
2980
2981   if (!n_evts)
2982     {
2983       wait = wait_for_time;
2984       start = clib_time_now (&wrk->clib_time);
2985     }
2986
2987   do
2988     {
2989       vcl_epoll_wait_handle_mq (wrk, wrk->app_event_queue, events, maxevents,
2990                                 wait, &n_evts);
2991       if (n_evts)
2992         return n_evts;
2993       if (wait == -1)
2994         continue;
2995
2996       now = clib_time_now (&wrk->clib_time);
2997       wait -= (now - start) * 1e3;
2998       start = now;
2999     }
3000   while (wait > 0);
3001
3002   return 0;
3003 }
3004
3005 static int
3006 vppcom_epoll_wait_eventfd (vcl_worker_t * wrk, struct epoll_event *events,
3007                            int maxevents, u32 n_evts, double wait_for_time)
3008 {
3009   vcl_mq_evt_conn_t *mqc;
3010   int __clib_unused n_read;
3011   int n_mq_evts, i;
3012   u64 buf;
3013
3014   vec_validate (wrk->mq_events, pool_elts (wrk->mq_evt_conns));
3015 again:
3016   n_mq_evts = epoll_wait (wrk->mqs_epfd, wrk->mq_events,
3017                           vec_len (wrk->mq_events), wait_for_time);
3018   for (i = 0; i < n_mq_evts; i++)
3019     {
3020       mqc = vcl_mq_evt_conn_get (wrk, wrk->mq_events[i].data.u32);
3021       n_read = read (mqc->mq_fd, &buf, sizeof (buf));
3022       vcl_epoll_wait_handle_mq (wrk, mqc->mq, events, maxevents, 0, &n_evts);
3023     }
3024   if (!n_evts && n_mq_evts > 0)
3025     goto again;
3026
3027   return (int) n_evts;
3028 }
3029
3030 int
3031 vppcom_epoll_wait (uint32_t vep_handle, struct epoll_event *events,
3032                    int maxevents, double wait_for_time)
3033 {
3034   vcl_worker_t *wrk = vcl_worker_get_current ();
3035   vcl_session_t *vep_session;
3036   u32 n_evts = 0;
3037   int i;
3038
3039   if (PREDICT_FALSE (maxevents <= 0))
3040     {
3041       VDBG (0, "ERROR: Invalid maxevents (%d)!", maxevents);
3042       return VPPCOM_EINVAL;
3043     }
3044
3045   vep_session = vcl_session_get_w_handle (wrk, vep_handle);
3046   if (!vep_session)
3047     return VPPCOM_EBADFD;
3048
3049   if (PREDICT_FALSE (!(vep_session->flags & VCL_SESSION_F_IS_VEP)))
3050     {
3051       VDBG (0, "ERROR: vep_idx (%u) is not a vep!", vep_handle);
3052       return VPPCOM_EINVAL;
3053     }
3054
3055   memset (events, 0, sizeof (*events) * maxevents);
3056
3057   if (vec_len (wrk->unhandled_evts_vector))
3058     {
3059       for (i = 0; i < vec_len (wrk->unhandled_evts_vector); i++)
3060         {
3061           vcl_epoll_wait_handle_mq_event (wrk, &wrk->unhandled_evts_vector[i],
3062                                           events, &n_evts);
3063           if (n_evts == maxevents)
3064             {
3065               vec_delete (wrk->unhandled_evts_vector, i + 1, 0);
3066               return n_evts;
3067             }
3068         }
3069       vec_reset_length (wrk->unhandled_evts_vector);
3070     }
3071
3072   if (vcm->cfg.use_mq_eventfd)
3073     return vppcom_epoll_wait_eventfd (wrk, events, maxevents, n_evts,
3074                                       wait_for_time);
3075
3076   return vppcom_epoll_wait_condvar (wrk, events, maxevents, n_evts,
3077                                     wait_for_time);
3078 }
3079
3080 int
3081 vppcom_session_attr (uint32_t session_handle, uint32_t op,
3082                      void *buffer, uint32_t * buflen)
3083 {
3084   vcl_worker_t *wrk = vcl_worker_get_current ();
3085   vcl_session_t *session;
3086   int rv = VPPCOM_OK;
3087   u32 *flags = buffer, tmp_flags = 0;
3088   vppcom_endpt_t *ep = buffer;
3089
3090   session = vcl_session_get_w_handle (wrk, session_handle);
3091   if (!session)
3092     return VPPCOM_EBADFD;
3093
3094   switch (op)
3095     {
3096     case VPPCOM_ATTR_GET_NREAD:
3097       rv = vcl_session_read_ready (session);
3098       VDBG (2, "VPPCOM_ATTR_GET_NREAD: sh %u, nread = %d", session_handle,
3099             rv);
3100       break;
3101
3102     case VPPCOM_ATTR_GET_NWRITE:
3103       rv = vcl_session_write_ready (session);
3104       VDBG (2, "VPPCOM_ATTR_GET_NWRITE: sh %u, nwrite = %d", session_handle,
3105             rv);
3106       break;
3107
3108     case VPPCOM_ATTR_GET_FLAGS:
3109       if (PREDICT_TRUE (buffer && buflen && (*buflen >= sizeof (*flags))))
3110         {
3111           *flags =
3112             O_RDWR |
3113             (vcl_session_has_attr (session, VCL_SESS_ATTR_NONBLOCK) ?
3114              O_NONBLOCK : 0);
3115           *buflen = sizeof (*flags);
3116           VDBG (2, "VPPCOM_ATTR_GET_FLAGS: sh %u, flags = 0x%08x, "
3117                 "is_nonblocking = %u", session_handle, *flags,
3118                 vcl_session_has_attr (session, VCL_SESS_ATTR_NONBLOCK));
3119         }
3120       else
3121         rv = VPPCOM_EINVAL;
3122       break;
3123
3124     case VPPCOM_ATTR_SET_FLAGS:
3125       if (PREDICT_TRUE (buffer && buflen && (*buflen == sizeof (*flags))))
3126         {
3127           if (*flags & O_NONBLOCK)
3128             vcl_session_set_attr (session, VCL_SESS_ATTR_NONBLOCK);
3129           else
3130             vcl_session_clear_attr (session, VCL_SESS_ATTR_NONBLOCK);
3131
3132           VDBG (2, "VPPCOM_ATTR_SET_FLAGS: sh %u, flags = 0x%08x,"
3133                 " is_nonblocking = %u", session_handle, *flags,
3134                 vcl_session_has_attr (session, VCL_SESS_ATTR_NONBLOCK));
3135         }
3136       else
3137         rv = VPPCOM_EINVAL;
3138       break;
3139
3140     case VPPCOM_ATTR_GET_PEER_ADDR:
3141       if (PREDICT_TRUE (buffer && buflen &&
3142                         (*buflen >= sizeof (*ep)) && ep->ip))
3143         {
3144           ep->is_ip4 = session->transport.is_ip4;
3145           ep->port = session->transport.rmt_port;
3146           if (session->transport.is_ip4)
3147             clib_memcpy_fast (ep->ip, &session->transport.rmt_ip.ip4,
3148                               sizeof (ip4_address_t));
3149           else
3150             clib_memcpy_fast (ep->ip, &session->transport.rmt_ip.ip6,
3151                               sizeof (ip6_address_t));
3152           *buflen = sizeof (*ep);
3153           VDBG (1, "VPPCOM_ATTR_GET_PEER_ADDR: sh %u, is_ip4 = %u, "
3154                 "addr = %U, port %u", session_handle, ep->is_ip4,
3155                 format_ip46_address, &session->transport.rmt_ip,
3156                 ep->is_ip4 ? IP46_TYPE_IP4 : IP46_TYPE_IP6,
3157                 clib_net_to_host_u16 (ep->port));
3158         }
3159       else
3160         rv = VPPCOM_EINVAL;
3161       break;
3162
3163     case VPPCOM_ATTR_GET_LCL_ADDR:
3164       if (PREDICT_TRUE (buffer && buflen &&
3165                         (*buflen >= sizeof (*ep)) && ep->ip))
3166         {
3167           ep->is_ip4 = session->transport.is_ip4;
3168           ep->port = session->transport.lcl_port;
3169           if (session->transport.is_ip4)
3170             clib_memcpy_fast (ep->ip, &session->transport.lcl_ip.ip4,
3171                               sizeof (ip4_address_t));
3172           else
3173             clib_memcpy_fast (ep->ip, &session->transport.lcl_ip.ip6,
3174                               sizeof (ip6_address_t));
3175           *buflen = sizeof (*ep);
3176           VDBG (1, "VPPCOM_ATTR_GET_LCL_ADDR: sh %u, is_ip4 = %u, addr = %U"
3177                 " port %d", session_handle, ep->is_ip4, format_ip46_address,
3178                 &session->transport.lcl_ip,
3179                 ep->is_ip4 ? IP46_TYPE_IP4 : IP46_TYPE_IP6,
3180                 clib_net_to_host_u16 (ep->port));
3181         }
3182       else
3183         rv = VPPCOM_EINVAL;
3184       break;
3185
3186     case VPPCOM_ATTR_SET_LCL_ADDR:
3187       if (PREDICT_TRUE (buffer && buflen &&
3188                         (*buflen >= sizeof (*ep)) && ep->ip))
3189         {
3190           session->transport.is_ip4 = ep->is_ip4;
3191           session->transport.lcl_port = ep->port;
3192           vcl_ip_copy_from_ep (&session->transport.lcl_ip, ep);
3193           *buflen = sizeof (*ep);
3194           VDBG (1, "VPPCOM_ATTR_SET_LCL_ADDR: sh %u, is_ip4 = %u, addr = %U"
3195                 " port %d", session_handle, ep->is_ip4, format_ip46_address,
3196                 &session->transport.lcl_ip,
3197                 ep->is_ip4 ? IP46_TYPE_IP4 : IP46_TYPE_IP6,
3198                 clib_net_to_host_u16 (ep->port));
3199         }
3200       else
3201         rv = VPPCOM_EINVAL;
3202       break;
3203
3204     case VPPCOM_ATTR_GET_LIBC_EPFD:
3205       rv = session->libc_epfd;
3206       VDBG (2, "VPPCOM_ATTR_GET_LIBC_EPFD: libc_epfd %d", rv);
3207       break;
3208
3209     case VPPCOM_ATTR_SET_LIBC_EPFD:
3210       if (PREDICT_TRUE (buffer && buflen &&
3211                         (*buflen == sizeof (session->libc_epfd))))
3212         {
3213           session->libc_epfd = *(int *) buffer;
3214           *buflen = sizeof (session->libc_epfd);
3215
3216           VDBG (2, "VPPCOM_ATTR_SET_LIBC_EPFD: libc_epfd %d, buflen %d",
3217                 session->libc_epfd, *buflen);
3218         }
3219       else
3220         rv = VPPCOM_EINVAL;
3221       break;
3222
3223     case VPPCOM_ATTR_GET_PROTOCOL:
3224       if (buffer && buflen && (*buflen >= sizeof (int)))
3225         {
3226           *(int *) buffer = session->session_type;
3227           *buflen = sizeof (int);
3228
3229           VDBG (2, "VPPCOM_ATTR_GET_PROTOCOL: %d (%s), buflen %d",
3230                 *(int *) buffer, *(int *) buffer ? "UDP" : "TCP", *buflen);
3231         }
3232       else
3233         rv = VPPCOM_EINVAL;
3234       break;
3235
3236     case VPPCOM_ATTR_GET_LISTEN:
3237       if (buffer && buflen && (*buflen >= sizeof (int)))
3238         {
3239           *(int *) buffer = vcl_session_has_attr (session,
3240                                                   VCL_SESS_ATTR_LISTEN);
3241           *buflen = sizeof (int);
3242
3243           VDBG (2, "VPPCOM_ATTR_GET_LISTEN: %d, buflen %d", *(int *) buffer,
3244                 *buflen);
3245         }
3246       else
3247         rv = VPPCOM_EINVAL;
3248       break;
3249
3250     case VPPCOM_ATTR_GET_ERROR:
3251       if (buffer && buflen && (*buflen >= sizeof (int)))
3252         {
3253           *(int *) buffer = 0;
3254           *buflen = sizeof (int);
3255
3256           VDBG (2, "VPPCOM_ATTR_GET_ERROR: %d, buflen %d, #VPP-TBD#",
3257                 *(int *) buffer, *buflen);
3258         }
3259       else
3260         rv = VPPCOM_EINVAL;
3261       break;
3262
3263     case VPPCOM_ATTR_GET_TX_FIFO_LEN:
3264       if (buffer && buflen && (*buflen >= sizeof (u32)))
3265         {
3266
3267           /* VPP-TBD */
3268           *(size_t *) buffer = (session->sndbuf_size ? session->sndbuf_size :
3269                                 session->tx_fifo ?
3270                                 svm_fifo_size (session->tx_fifo) :
3271                                 vcm->cfg.tx_fifo_size);
3272           *buflen = sizeof (u32);
3273
3274           VDBG (2, "VPPCOM_ATTR_GET_TX_FIFO_LEN: %u (0x%x), buflen %d,"
3275                 " #VPP-TBD#", *(size_t *) buffer, *(size_t *) buffer,
3276                 *buflen);
3277         }
3278       else
3279         rv = VPPCOM_EINVAL;
3280       break;
3281
3282     case VPPCOM_ATTR_SET_TX_FIFO_LEN:
3283       if (buffer && buflen && (*buflen == sizeof (u32)))
3284         {
3285           /* VPP-TBD */
3286           session->sndbuf_size = *(u32 *) buffer;
3287           VDBG (2, "VPPCOM_ATTR_SET_TX_FIFO_LEN: %u (0x%x), buflen %d,"
3288                 " #VPP-TBD#", session->sndbuf_size, session->sndbuf_size,
3289                 *buflen);
3290         }
3291       else
3292         rv = VPPCOM_EINVAL;
3293       break;
3294
3295     case VPPCOM_ATTR_GET_RX_FIFO_LEN:
3296       if (buffer && buflen && (*buflen >= sizeof (u32)))
3297         {
3298
3299           /* VPP-TBD */
3300           *(size_t *) buffer = (session->rcvbuf_size ? session->rcvbuf_size :
3301                                 session->rx_fifo ?
3302                                 svm_fifo_size (session->rx_fifo) :
3303                                 vcm->cfg.rx_fifo_size);
3304           *buflen = sizeof (u32);
3305
3306           VDBG (2, "VPPCOM_ATTR_GET_RX_FIFO_LEN: %u (0x%x), buflen %d, "
3307                 "#VPP-TBD#", *(size_t *) buffer, *(size_t *) buffer, *buflen);
3308         }
3309       else
3310         rv = VPPCOM_EINVAL;
3311       break;
3312
3313     case VPPCOM_ATTR_SET_RX_FIFO_LEN:
3314       if (buffer && buflen && (*buflen == sizeof (u32)))
3315         {
3316           /* VPP-TBD */
3317           session->rcvbuf_size = *(u32 *) buffer;
3318           VDBG (2, "VPPCOM_ATTR_SET_RX_FIFO_LEN: %u (0x%x), buflen %d,"
3319                 " #VPP-TBD#", session->sndbuf_size, session->sndbuf_size,
3320                 *buflen);
3321         }
3322       else
3323         rv = VPPCOM_EINVAL;
3324       break;
3325
3326     case VPPCOM_ATTR_GET_REUSEADDR:
3327       if (buffer && buflen && (*buflen >= sizeof (int)))
3328         {
3329           /* VPP-TBD */
3330           *(int *) buffer = vcl_session_has_attr (session,
3331                                                   VCL_SESS_ATTR_REUSEADDR);
3332           *buflen = sizeof (int);
3333
3334           VDBG (2, "VPPCOM_ATTR_GET_REUSEADDR: %d, buflen %d, #VPP-TBD#",
3335                 *(int *) buffer, *buflen);
3336         }
3337       else
3338         rv = VPPCOM_EINVAL;
3339       break;
3340
3341     case VPPCOM_ATTR_SET_REUSEADDR:
3342       if (buffer && buflen && (*buflen == sizeof (int)) &&
3343           !vcl_session_has_attr (session, VCL_SESS_ATTR_LISTEN))
3344         {
3345           /* VPP-TBD */
3346           if (*(int *) buffer)
3347             vcl_session_set_attr (session, VCL_SESS_ATTR_REUSEADDR);
3348           else
3349             vcl_session_clear_attr (session, VCL_SESS_ATTR_REUSEADDR);
3350
3351           VDBG (2, "VPPCOM_ATTR_SET_REUSEADDR: %d, buflen %d, #VPP-TBD#",
3352                 vcl_session_has_attr (session, VCL_SESS_ATTR_REUSEADDR),
3353                 *buflen);
3354         }
3355       else
3356         rv = VPPCOM_EINVAL;
3357       break;
3358
3359     case VPPCOM_ATTR_GET_REUSEPORT:
3360       if (buffer && buflen && (*buflen >= sizeof (int)))
3361         {
3362           /* VPP-TBD */
3363           *(int *) buffer = vcl_session_has_attr (session,
3364                                                   VCL_SESS_ATTR_REUSEPORT);
3365           *buflen = sizeof (int);
3366
3367           VDBG (2, "VPPCOM_ATTR_GET_REUSEPORT: %d, buflen %d, #VPP-TBD#",
3368                 *(int *) buffer, *buflen);
3369         }
3370       else
3371         rv = VPPCOM_EINVAL;
3372       break;
3373
3374     case VPPCOM_ATTR_SET_REUSEPORT:
3375       if (buffer && buflen && (*buflen == sizeof (int)) &&
3376           !vcl_session_has_attr (session, VCL_SESS_ATTR_LISTEN))
3377         {
3378           /* VPP-TBD */
3379           if (*(int *) buffer)
3380             vcl_session_set_attr (session, VCL_SESS_ATTR_REUSEPORT);
3381           else
3382             vcl_session_clear_attr (session, VCL_SESS_ATTR_REUSEPORT);
3383
3384           VDBG (2, "VPPCOM_ATTR_SET_REUSEPORT: %d, buflen %d, #VPP-TBD#",
3385                 vcl_session_has_attr (session, VCL_SESS_ATTR_REUSEPORT),
3386                 *buflen);
3387         }
3388       else
3389         rv = VPPCOM_EINVAL;
3390       break;
3391
3392     case VPPCOM_ATTR_GET_BROADCAST:
3393       if (buffer && buflen && (*buflen >= sizeof (int)))
3394         {
3395           /* VPP-TBD */
3396           *(int *) buffer = vcl_session_has_attr (session,
3397                                                   VCL_SESS_ATTR_BROADCAST);
3398           *buflen = sizeof (int);
3399
3400           VDBG (2, "VPPCOM_ATTR_GET_BROADCAST: %d, buflen %d, #VPP-TBD#",
3401                 *(int *) buffer, *buflen);
3402         }
3403       else
3404         rv = VPPCOM_EINVAL;
3405       break;
3406
3407     case VPPCOM_ATTR_SET_BROADCAST:
3408       if (buffer && buflen && (*buflen == sizeof (int)))
3409         {
3410           /* VPP-TBD */
3411           if (*(int *) buffer)
3412             vcl_session_set_attr (session, VCL_SESS_ATTR_BROADCAST);
3413           else
3414             vcl_session_clear_attr (session, VCL_SESS_ATTR_BROADCAST);
3415
3416           VDBG (2, "VPPCOM_ATTR_SET_BROADCAST: %d, buflen %d, #VPP-TBD#",
3417                 vcl_session_has_attr (session, VCL_SESS_ATTR_BROADCAST),
3418                 *buflen);
3419         }
3420       else
3421         rv = VPPCOM_EINVAL;
3422       break;
3423
3424     case VPPCOM_ATTR_GET_V6ONLY:
3425       if (buffer && buflen && (*buflen >= sizeof (int)))
3426         {
3427           /* VPP-TBD */
3428           *(int *) buffer = vcl_session_has_attr (session,
3429                                                   VCL_SESS_ATTR_V6ONLY);
3430           *buflen = sizeof (int);
3431
3432           VDBG (2, "VPPCOM_ATTR_GET_V6ONLY: %d, buflen %d, #VPP-TBD#",
3433                 *(int *) buffer, *buflen);
3434         }
3435       else
3436         rv = VPPCOM_EINVAL;
3437       break;
3438
3439     case VPPCOM_ATTR_SET_V6ONLY:
3440       if (buffer && buflen && (*buflen == sizeof (int)))
3441         {
3442           /* VPP-TBD */
3443           if (*(int *) buffer)
3444             vcl_session_set_attr (session, VCL_SESS_ATTR_V6ONLY);
3445           else
3446             vcl_session_clear_attr (session, VCL_SESS_ATTR_V6ONLY);
3447
3448           VDBG (2, "VPPCOM_ATTR_SET_V6ONLY: %d, buflen %d, #VPP-TBD#",
3449                 vcl_session_has_attr (session, VCL_SESS_ATTR_V6ONLY),
3450                 *buflen);
3451         }
3452       else
3453         rv = VPPCOM_EINVAL;
3454       break;
3455
3456     case VPPCOM_ATTR_GET_KEEPALIVE:
3457       if (buffer && buflen && (*buflen >= sizeof (int)))
3458         {
3459           /* VPP-TBD */
3460           *(int *) buffer = vcl_session_has_attr (session,
3461                                                   VCL_SESS_ATTR_KEEPALIVE);
3462           *buflen = sizeof (int);
3463
3464           VDBG (2, "VPPCOM_ATTR_GET_KEEPALIVE: %d, buflen %d, #VPP-TBD#",
3465                 *(int *) buffer, *buflen);
3466         }
3467       else
3468         rv = VPPCOM_EINVAL;
3469       break;
3470
3471     case VPPCOM_ATTR_SET_KEEPALIVE:
3472       if (buffer && buflen && (*buflen == sizeof (int)))
3473         {
3474           /* VPP-TBD */
3475           if (*(int *) buffer)
3476             vcl_session_set_attr (session, VCL_SESS_ATTR_KEEPALIVE);
3477           else
3478             vcl_session_clear_attr (session, VCL_SESS_ATTR_KEEPALIVE);
3479
3480           VDBG (2, "VPPCOM_ATTR_SET_KEEPALIVE: %d, buflen %d, #VPP-TBD#",
3481                 vcl_session_has_attr (session, VCL_SESS_ATTR_KEEPALIVE),
3482                 *buflen);
3483         }
3484       else
3485         rv = VPPCOM_EINVAL;
3486       break;
3487
3488     case VPPCOM_ATTR_GET_TCP_NODELAY:
3489       if (buffer && buflen && (*buflen >= sizeof (int)))
3490         {
3491           /* VPP-TBD */
3492           *(int *) buffer = vcl_session_has_attr (session,
3493                                                   VCL_SESS_ATTR_TCP_NODELAY);
3494           *buflen = sizeof (int);
3495
3496           VDBG (2, "VPPCOM_ATTR_GET_TCP_NODELAY: %d, buflen %d, #VPP-TBD#",
3497                 *(int *) buffer, *buflen);
3498         }
3499       else
3500         rv = VPPCOM_EINVAL;
3501       break;
3502
3503     case VPPCOM_ATTR_SET_TCP_NODELAY:
3504       if (buffer && buflen && (*buflen == sizeof (int)))
3505         {
3506           /* VPP-TBD */
3507           if (*(int *) buffer)
3508             vcl_session_set_attr (session, VCL_SESS_ATTR_TCP_NODELAY);
3509           else
3510             vcl_session_clear_attr (session, VCL_SESS_ATTR_TCP_NODELAY);
3511
3512           VDBG (2, "VPPCOM_ATTR_SET_TCP_NODELAY: %d, buflen %d, #VPP-TBD#",
3513                 vcl_session_has_attr (session, VCL_SESS_ATTR_TCP_NODELAY),
3514                 *buflen);
3515         }
3516       else
3517         rv = VPPCOM_EINVAL;
3518       break;
3519
3520     case VPPCOM_ATTR_GET_TCP_KEEPIDLE:
3521       if (buffer && buflen && (*buflen >= sizeof (int)))
3522         {
3523           /* VPP-TBD */
3524           *(int *) buffer = vcl_session_has_attr (session,
3525                                                   VCL_SESS_ATTR_TCP_KEEPIDLE);
3526           *buflen = sizeof (int);
3527
3528           VDBG (2, "VPPCOM_ATTR_GET_TCP_KEEPIDLE: %d, buflen %d, #VPP-TBD#",
3529                 *(int *) buffer, *buflen);
3530         }
3531       else
3532         rv = VPPCOM_EINVAL;
3533       break;
3534
3535     case VPPCOM_ATTR_SET_TCP_KEEPIDLE:
3536       if (buffer && buflen && (*buflen == sizeof (int)))
3537         {
3538           /* VPP-TBD */
3539           if (*(int *) buffer)
3540             vcl_session_set_attr (session, VCL_SESS_ATTR_TCP_KEEPIDLE);
3541           else
3542             vcl_session_clear_attr (session, VCL_SESS_ATTR_TCP_KEEPIDLE);
3543
3544           VDBG (2, "VPPCOM_ATTR_SET_TCP_KEEPIDLE: %d, buflen %d, #VPP-TBD#",
3545                 vcl_session_has_attr (session,
3546                                       VCL_SESS_ATTR_TCP_KEEPIDLE), *buflen);
3547         }
3548       else
3549         rv = VPPCOM_EINVAL;
3550       break;
3551
3552     case VPPCOM_ATTR_GET_TCP_KEEPINTVL:
3553       if (buffer && buflen && (*buflen >= sizeof (int)))
3554         {
3555           /* VPP-TBD */
3556           *(int *) buffer = vcl_session_has_attr (session,
3557                                                   VCL_SESS_ATTR_TCP_KEEPINTVL);
3558           *buflen = sizeof (int);
3559
3560           VDBG (2, "VPPCOM_ATTR_GET_TCP_KEEPINTVL: %d, buflen %d, #VPP-TBD#",
3561                 *(int *) buffer, *buflen);
3562         }
3563       else
3564         rv = VPPCOM_EINVAL;
3565       break;
3566
3567     case VPPCOM_ATTR_SET_TCP_KEEPINTVL:
3568       if (buffer && buflen && (*buflen == sizeof (int)))
3569         {
3570           /* VPP-TBD */
3571           if (*(int *) buffer)
3572             vcl_session_set_attr (session, VCL_SESS_ATTR_TCP_KEEPINTVL);
3573           else
3574             vcl_session_clear_attr (session, VCL_SESS_ATTR_TCP_KEEPINTVL);
3575
3576           VDBG (2, "VPPCOM_ATTR_SET_TCP_KEEPINTVL: %d, buflen %d, #VPP-TBD#",
3577                 vcl_session_has_attr (session,
3578                                       VCL_SESS_ATTR_TCP_KEEPINTVL), *buflen);
3579         }
3580       else
3581         rv = VPPCOM_EINVAL;
3582       break;
3583
3584     case VPPCOM_ATTR_GET_TCP_USER_MSS:
3585       if (buffer && buflen && (*buflen >= sizeof (u32)))
3586         {
3587           /* VPP-TBD */
3588           *(u32 *) buffer = session->user_mss;
3589           *buflen = sizeof (int);
3590
3591           VDBG (2, "VPPCOM_ATTR_GET_TCP_USER_MSS: %d, buflen %d, #VPP-TBD#",
3592                 *(int *) buffer, *buflen);
3593         }
3594       else
3595         rv = VPPCOM_EINVAL;
3596       break;
3597
3598     case VPPCOM_ATTR_SET_TCP_USER_MSS:
3599       if (buffer && buflen && (*buflen == sizeof (u32)))
3600         {
3601           /* VPP-TBD */
3602           session->user_mss = *(u32 *) buffer;
3603
3604           VDBG (2, "VPPCOM_ATTR_SET_TCP_USER_MSS: %u, buflen %d, #VPP-TBD#",
3605                 session->user_mss, *buflen);
3606         }
3607       else
3608         rv = VPPCOM_EINVAL;
3609       break;
3610
3611     case VPPCOM_ATTR_SET_SHUT:
3612       if (*flags == SHUT_RD || *flags == SHUT_RDWR)
3613         vcl_session_set_attr (session, VCL_SESS_ATTR_SHUT_RD);
3614       if (*flags == SHUT_WR || *flags == SHUT_RDWR)
3615         vcl_session_set_attr (session, VCL_SESS_ATTR_SHUT_WR);
3616       break;
3617
3618     case VPPCOM_ATTR_GET_SHUT:
3619       if (vcl_session_has_attr (session, VCL_SESS_ATTR_SHUT_RD))
3620         tmp_flags = 1;
3621       if (vcl_session_has_attr (session, VCL_SESS_ATTR_SHUT_WR))
3622         tmp_flags |= 2;
3623       if (tmp_flags == 1)
3624         *(int *) buffer = SHUT_RD;
3625       else if (tmp_flags == 2)
3626         *(int *) buffer = SHUT_WR;
3627       else if (tmp_flags == 3)
3628         *(int *) buffer = SHUT_RDWR;
3629       *buflen = sizeof (int);
3630       break;
3631
3632     case VPPCOM_ATTR_SET_CONNECTED:
3633       session->flags |= VCL_SESSION_F_CONNECTED;
3634       break;
3635
3636     default:
3637       rv = VPPCOM_EINVAL;
3638       break;
3639     }
3640
3641   return rv;
3642 }
3643
3644 int
3645 vppcom_session_recvfrom (uint32_t session_handle, void *buffer,
3646                          uint32_t buflen, int flags, vppcom_endpt_t * ep)
3647 {
3648   vcl_worker_t *wrk = vcl_worker_get_current ();
3649   vcl_session_t *session;
3650   int rv = VPPCOM_OK;
3651
3652   if (flags == 0)
3653     rv = vppcom_session_read (session_handle, buffer, buflen);
3654   else if (flags & MSG_PEEK)
3655     rv = vppcom_session_peek (session_handle, buffer, buflen);
3656   else
3657     {
3658       VDBG (0, "Unsupport flags for recvfrom %d", flags);
3659       return VPPCOM_EAFNOSUPPORT;
3660     }
3661
3662   if (ep && rv > 0)
3663     {
3664       session = vcl_session_get_w_handle (wrk, session_handle);
3665       if (session->transport.is_ip4)
3666         clib_memcpy_fast (ep->ip, &session->transport.rmt_ip.ip4,
3667                           sizeof (ip4_address_t));
3668       else
3669         clib_memcpy_fast (ep->ip, &session->transport.rmt_ip.ip6,
3670                           sizeof (ip6_address_t));
3671       ep->is_ip4 = session->transport.is_ip4;
3672       ep->port = session->transport.rmt_port;
3673     }
3674
3675   return rv;
3676 }
3677
3678 int
3679 vppcom_session_sendto (uint32_t session_handle, void *buffer,
3680                        uint32_t buflen, int flags, vppcom_endpt_t * ep)
3681 {
3682   vcl_worker_t *wrk = vcl_worker_get_current ();
3683   vcl_session_t *s;
3684
3685   s = vcl_session_get_w_handle (wrk, session_handle);
3686   if (!s)
3687     return VPPCOM_EBADFD;
3688
3689   if (!buffer)
3690     return VPPCOM_EINVAL;
3691
3692   if (ep)
3693     {
3694       if (!vcl_session_is_cl (s))
3695         return VPPCOM_EINVAL;
3696
3697       /* Session not connected/bound in vpp. Create it by 'connecting' it */
3698       if (PREDICT_FALSE (s->session_state == VCL_STATE_CLOSED))
3699         {
3700           u32 session_index = s->session_index;
3701           f64 timeout = vcm->cfg.session_timeout;
3702           int rv;
3703
3704           vcl_send_session_connect (wrk, s);
3705           rv = vppcom_wait_for_session_state_change (session_index,
3706                                                      VCL_STATE_READY,
3707                                                      timeout);
3708           if (rv < 0)
3709             return rv;
3710           s = vcl_session_get (wrk, session_index);
3711         }
3712
3713       s->transport.is_ip4 = ep->is_ip4;
3714       s->transport.rmt_port = ep->port;
3715       vcl_ip_copy_from_ep (&s->transport.rmt_ip, ep);
3716     }
3717
3718   if (flags)
3719     {
3720       // TBD check the flags and do the right thing
3721       VDBG (2, "handling flags 0x%u (%d) not implemented yet.", flags, flags);
3722     }
3723
3724   return (vppcom_session_write_inline (wrk, s, buffer, buflen, 1,
3725                                        s->is_dgram ? 1 : 0));
3726 }
3727
3728 int
3729 vppcom_poll (vcl_poll_t * vp, uint32_t n_sids, double wait_for_time)
3730 {
3731   vcl_worker_t *wrk = vcl_worker_get_current ();
3732   f64 timeout = clib_time_now (&wrk->clib_time) + wait_for_time;
3733   u32 i, keep_trying = 1;
3734   svm_msg_q_msg_t msg;
3735   session_event_t *e;
3736   int rv, num_ev = 0;
3737
3738   VDBG (3, "vp %p, nsids %u, wait_for_time %f", vp, n_sids, wait_for_time);
3739
3740   if (!vp)
3741     return VPPCOM_EFAULT;
3742
3743   do
3744     {
3745       vcl_session_t *session;
3746
3747       /* Dequeue all events and drop all unhandled io events */
3748       while (svm_msg_q_sub (wrk->app_event_queue, &msg, SVM_Q_NOWAIT, 0) == 0)
3749         {
3750           e = svm_msg_q_msg_data (wrk->app_event_queue, &msg);
3751           vcl_handle_mq_event (wrk, e);
3752           svm_msg_q_free_msg (wrk->app_event_queue, &msg);
3753         }
3754       vec_reset_length (wrk->unhandled_evts_vector);
3755
3756       for (i = 0; i < n_sids; i++)
3757         {
3758           session = vcl_session_get (wrk, vp[i].sh);
3759           if (!session)
3760             {
3761               vp[i].revents = POLLHUP;
3762               num_ev++;
3763               continue;
3764             }
3765
3766           vp[i].revents = 0;
3767
3768           if (POLLIN & vp[i].events)
3769             {
3770               rv = vcl_session_read_ready (session);
3771               if (rv > 0)
3772                 {
3773                   vp[i].revents |= POLLIN;
3774                   num_ev++;
3775                 }
3776               else if (rv < 0)
3777                 {
3778                   switch (rv)
3779                     {
3780                     case VPPCOM_ECONNRESET:
3781                       vp[i].revents = POLLHUP;
3782                       break;
3783
3784                     default:
3785                       vp[i].revents = POLLERR;
3786                       break;
3787                     }
3788                   num_ev++;
3789                 }
3790             }
3791
3792           if (POLLOUT & vp[i].events)
3793             {
3794               rv = vcl_session_write_ready (session);
3795               if (rv > 0)
3796                 {
3797                   vp[i].revents |= POLLOUT;
3798                   num_ev++;
3799                 }
3800               else if (rv < 0)
3801                 {
3802                   switch (rv)
3803                     {
3804                     case VPPCOM_ECONNRESET:
3805                       vp[i].revents = POLLHUP;
3806                       break;
3807
3808                     default:
3809                       vp[i].revents = POLLERR;
3810                       break;
3811                     }
3812                   num_ev++;
3813                 }
3814             }
3815
3816           if (0)                // Note "done:" label used by VCL_SESSION_LOCK_AND_GET()
3817             {
3818               vp[i].revents = POLLNVAL;
3819               num_ev++;
3820             }
3821         }
3822       if (wait_for_time != -1)
3823         keep_trying = (clib_time_now (&wrk->clib_time) <= timeout) ? 1 : 0;
3824     }
3825   while ((num_ev == 0) && keep_trying);
3826
3827   return num_ev;
3828 }
3829
3830 int
3831 vppcom_mq_epoll_fd (void)
3832 {
3833   vcl_worker_t *wrk = vcl_worker_get_current ();
3834   return wrk->mqs_epfd;
3835 }
3836
3837 int
3838 vppcom_session_index (vcl_session_handle_t session_handle)
3839 {
3840   return session_handle & 0xFFFFFF;
3841 }
3842
3843 int
3844 vppcom_session_worker (vcl_session_handle_t session_handle)
3845 {
3846   return session_handle >> 24;
3847 }
3848
3849 int
3850 vppcom_worker_register (void)
3851 {
3852   if (!vcl_worker_alloc_and_init ())
3853     return VPPCOM_EEXIST;
3854
3855   if (vcl_worker_register_with_vpp ())
3856     return VPPCOM_EEXIST;
3857
3858   return VPPCOM_OK;
3859 }
3860
3861 void
3862 vppcom_worker_unregister (void)
3863 {
3864   vcl_worker_cleanup (vcl_worker_get_current (), 1 /* notify vpp */ );
3865   vcl_set_worker_index (~0);
3866 }
3867
3868 void
3869 vppcom_worker_index_set (int index)
3870 {
3871   vcl_set_worker_index (index);
3872 }
3873
3874 int
3875 vppcom_worker_index (void)
3876 {
3877   return vcl_get_worker_index ();
3878 }
3879
3880 int
3881 vppcom_worker_mqs_epfd (void)
3882 {
3883   vcl_worker_t *wrk = vcl_worker_get_current ();
3884   if (!vcm->cfg.use_mq_eventfd)
3885     return -1;
3886   return wrk->mqs_epfd;
3887 }
3888
3889 int
3890 vppcom_session_is_connectable_listener (uint32_t session_handle)
3891 {
3892   vcl_session_t *session;
3893   vcl_worker_t *wrk = vcl_worker_get_current ();
3894   session = vcl_session_get_w_handle (wrk, session_handle);
3895   if (!session)
3896     return VPPCOM_EBADFD;
3897   return vcl_session_is_connectable_listener (wrk, session);
3898 }
3899
3900 int
3901 vppcom_session_listener (uint32_t session_handle)
3902 {
3903   vcl_worker_t *wrk = vcl_worker_get_current ();
3904   vcl_session_t *listen_session, *session;
3905   session = vcl_session_get_w_handle (wrk, session_handle);
3906   if (!session)
3907     return VPPCOM_EBADFD;
3908   if (session->listener_index == VCL_INVALID_SESSION_INDEX)
3909     return VPPCOM_EBADFD;
3910   listen_session = vcl_session_get_w_handle (wrk, session->listener_index);
3911   if (!listen_session)
3912     return VPPCOM_EBADFD;
3913   return vcl_session_handle (listen_session);
3914 }
3915
3916 int
3917 vppcom_session_n_accepted (uint32_t session_handle)
3918 {
3919   vcl_worker_t *wrk = vcl_worker_get_current ();
3920   vcl_session_t *session = vcl_session_get_w_handle (wrk, session_handle);
3921   if (!session)
3922     return VPPCOM_EBADFD;
3923   return session->n_accepted_sessions;
3924 }
3925
3926 const char *
3927 vppcom_proto_str (vppcom_proto_t proto)
3928 {
3929   char const *proto_str;
3930
3931   switch (proto)
3932     {
3933     case VPPCOM_PROTO_TCP:
3934       proto_str = "TCP";
3935       break;
3936     case VPPCOM_PROTO_UDP:
3937       proto_str = "UDP";
3938       break;
3939     case VPPCOM_PROTO_TLS:
3940       proto_str = "TLS";
3941       break;
3942     case VPPCOM_PROTO_QUIC:
3943       proto_str = "QUIC";
3944       break;
3945     default:
3946       proto_str = "UNKNOWN";
3947       break;
3948     }
3949   return proto_str;
3950 }
3951
3952 const char *
3953 vppcom_retval_str (int retval)
3954 {
3955   char const *st;
3956
3957   switch (retval)
3958     {
3959     case VPPCOM_OK:
3960       st = "VPPCOM_OK";
3961       break;
3962
3963     case VPPCOM_EAGAIN:
3964       st = "VPPCOM_EAGAIN";
3965       break;
3966
3967     case VPPCOM_EFAULT:
3968       st = "VPPCOM_EFAULT";
3969       break;
3970
3971     case VPPCOM_ENOMEM:
3972       st = "VPPCOM_ENOMEM";
3973       break;
3974
3975     case VPPCOM_EINVAL:
3976       st = "VPPCOM_EINVAL";
3977       break;
3978
3979     case VPPCOM_EBADFD:
3980       st = "VPPCOM_EBADFD";
3981       break;
3982
3983     case VPPCOM_EAFNOSUPPORT:
3984       st = "VPPCOM_EAFNOSUPPORT";
3985       break;
3986
3987     case VPPCOM_ECONNABORTED:
3988       st = "VPPCOM_ECONNABORTED";
3989       break;
3990
3991     case VPPCOM_ECONNRESET:
3992       st = "VPPCOM_ECONNRESET";
3993       break;
3994
3995     case VPPCOM_ENOTCONN:
3996       st = "VPPCOM_ENOTCONN";
3997       break;
3998
3999     case VPPCOM_ECONNREFUSED:
4000       st = "VPPCOM_ECONNREFUSED";
4001       break;
4002
4003     case VPPCOM_ETIMEDOUT:
4004       st = "VPPCOM_ETIMEDOUT";
4005       break;
4006
4007     default:
4008       st = "UNKNOWN_STATE";
4009       break;
4010     }
4011
4012   return st;
4013 }
4014
4015 /*
4016  * fd.io coding-style-patch-verification: ON
4017  *
4018  * Local Variables:
4019  * eval: (c-set-style "gnu")
4020  * End:
4021  */