session/svm: add want_tx_event flag to fifo
[vpp.git] / src / vcl / vppcom.c
1 /*
2  * Copyright (c) 2017 Cisco and/or its affiliates.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this
5  * You may obtain a copy of the License at:
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15
16 #include <stdio.h>
17 #include <stdlib.h>
18 #include <svm/svm_fifo_segment.h>
19 #include <vcl/vppcom.h>
20 #include <vcl/vcl_event.h>
21 #include <vcl/vcl_debug.h>
22 #include <vcl/vcl_private.h>
23
24 __thread uword __vcl_worker_index = ~0;
25
26 static u8 not_ready;
27
28 void
29 sigsegv_signal (int signum)
30 {
31   not_ready = 1;
32 }
33
34 static void
35 vcl_wait_for_memory (void *mem)
36 {
37   u8 __clib_unused test;
38   if (vcm->mounting_segment)
39     {
40       while (vcm->mounting_segment)
41         ;
42       return;
43     }
44   if (1 || vcm->debug)
45     {
46       usleep (1e5);
47       return;
48     }
49   if (signal (SIGSEGV, sigsegv_signal))
50     {
51       perror ("signal()");
52       return;
53     }
54   not_ready = 0;
55
56 again:
57   test = *(u8 *) mem;
58   if (not_ready)
59     {
60       not_ready = 0;
61       usleep (1);
62       goto again;
63     }
64
65   signal (SIGSEGV, SIG_DFL);
66 }
67
68 const char *
69 vppcom_session_state_str (session_state_t state)
70 {
71   char *st;
72
73   switch (state)
74     {
75     case STATE_START:
76       st = "STATE_START";
77       break;
78
79     case STATE_CONNECT:
80       st = "STATE_CONNECT";
81       break;
82
83     case STATE_LISTEN:
84       st = "STATE_LISTEN";
85       break;
86
87     case STATE_ACCEPT:
88       st = "STATE_ACCEPT";
89       break;
90
91     case STATE_CLOSE_ON_EMPTY:
92       st = "STATE_CLOSE_ON_EMPTY";
93       break;
94
95     case STATE_DISCONNECT:
96       st = "STATE_DISCONNECT";
97       break;
98
99     case STATE_FAILED:
100       st = "STATE_FAILED";
101       break;
102
103     default:
104       st = "UNKNOWN_STATE";
105       break;
106     }
107
108   return st;
109 }
110
111 u8 *
112 format_ip4_address (u8 * s, va_list * args)
113 {
114   u8 *a = va_arg (*args, u8 *);
115   return format (s, "%d.%d.%d.%d", a[0], a[1], a[2], a[3]);
116 }
117
118 u8 *
119 format_ip6_address (u8 * s, va_list * args)
120 {
121   ip6_address_t *a = va_arg (*args, ip6_address_t *);
122   u32 i, i_max_n_zero, max_n_zeros, i_first_zero, n_zeros, last_double_colon;
123
124   i_max_n_zero = ARRAY_LEN (a->as_u16);
125   max_n_zeros = 0;
126   i_first_zero = i_max_n_zero;
127   n_zeros = 0;
128   for (i = 0; i < ARRAY_LEN (a->as_u16); i++)
129     {
130       u32 is_zero = a->as_u16[i] == 0;
131       if (is_zero && i_first_zero >= ARRAY_LEN (a->as_u16))
132         {
133           i_first_zero = i;
134           n_zeros = 0;
135         }
136       n_zeros += is_zero;
137       if ((!is_zero && n_zeros > max_n_zeros)
138           || (i + 1 >= ARRAY_LEN (a->as_u16) && n_zeros > max_n_zeros))
139         {
140           i_max_n_zero = i_first_zero;
141           max_n_zeros = n_zeros;
142           i_first_zero = ARRAY_LEN (a->as_u16);
143           n_zeros = 0;
144         }
145     }
146
147   last_double_colon = 0;
148   for (i = 0; i < ARRAY_LEN (a->as_u16); i++)
149     {
150       if (i == i_max_n_zero && max_n_zeros > 1)
151         {
152           s = format (s, "::");
153           i += max_n_zeros - 1;
154           last_double_colon = 1;
155         }
156       else
157         {
158           s = format (s, "%s%x",
159                       (last_double_colon || i == 0) ? "" : ":",
160                       clib_net_to_host_u16 (a->as_u16[i]));
161           last_double_colon = 0;
162         }
163     }
164
165   return s;
166 }
167
168 /* Format an IP46 address. */
169 u8 *
170 format_ip46_address (u8 * s, va_list * args)
171 {
172   ip46_address_t *ip46 = va_arg (*args, ip46_address_t *);
173   ip46_type_t type = va_arg (*args, ip46_type_t);
174   int is_ip4 = 1;
175
176   switch (type)
177     {
178     case IP46_TYPE_ANY:
179       is_ip4 = ip46_address_is_ip4 (ip46);
180       break;
181     case IP46_TYPE_IP4:
182       is_ip4 = 1;
183       break;
184     case IP46_TYPE_IP6:
185       is_ip4 = 0;
186       break;
187     }
188
189   return is_ip4 ?
190     format (s, "%U", format_ip4_address, &ip46->ip4) :
191     format (s, "%U", format_ip6_address, &ip46->ip6);
192 }
193
194 /*
195  * VPPCOM Utility Functions
196  */
197
198
199 static svm_msg_q_t *
200 vcl_session_vpp_evt_q (vcl_worker_t * wrk, vcl_session_t * s)
201 {
202   if (vcl_session_is_ct (s))
203     return wrk->vpp_event_queues[0];
204   else
205     return wrk->vpp_event_queues[s->tx_fifo->master_thread_index];
206 }
207
208 static void
209 vcl_send_session_accepted_reply (svm_msg_q_t * mq, u32 context,
210                                  session_handle_t handle, int retval)
211 {
212   app_session_evt_t _app_evt, *app_evt = &_app_evt;
213   session_accepted_reply_msg_t *rmp;
214   app_alloc_ctrl_evt_to_vpp (mq, app_evt, SESSION_CTRL_EVT_ACCEPTED_REPLY);
215   rmp = (session_accepted_reply_msg_t *) app_evt->evt->data;
216   rmp->handle = handle;
217   rmp->context = context;
218   rmp->retval = retval;
219   app_send_ctrl_evt_to_vpp (mq, app_evt);
220 }
221
222 static void
223 vcl_send_session_disconnected_reply (svm_msg_q_t * mq, u32 context,
224                                      session_handle_t handle, int retval)
225 {
226   app_session_evt_t _app_evt, *app_evt = &_app_evt;
227   session_disconnected_reply_msg_t *rmp;
228   app_alloc_ctrl_evt_to_vpp (mq, app_evt,
229                              SESSION_CTRL_EVT_DISCONNECTED_REPLY);
230   rmp = (session_disconnected_reply_msg_t *) app_evt->evt->data;
231   rmp->handle = handle;
232   rmp->context = context;
233   rmp->retval = retval;
234   app_send_ctrl_evt_to_vpp (mq, app_evt);
235 }
236
237 static void
238 vcl_send_session_reset_reply (svm_msg_q_t * mq, u32 context,
239                               session_handle_t handle, int retval)
240 {
241   app_session_evt_t _app_evt, *app_evt = &_app_evt;
242   session_reset_reply_msg_t *rmp;
243   app_alloc_ctrl_evt_to_vpp (mq, app_evt, SESSION_CTRL_EVT_RESET_REPLY);
244   rmp = (session_reset_reply_msg_t *) app_evt->evt->data;
245   rmp->handle = handle;
246   rmp->context = context;
247   rmp->retval = retval;
248   app_send_ctrl_evt_to_vpp (mq, app_evt);
249 }
250
251 static u32
252 vcl_session_accepted_handler (vcl_worker_t * wrk, session_accepted_msg_t * mp)
253 {
254   vcl_session_t *session, *listen_session;
255   svm_fifo_t *rx_fifo, *tx_fifo;
256   u32 vpp_wrk_index;
257   svm_msg_q_t *evt_q;
258
259   session = vcl_session_alloc (wrk);
260
261   listen_session = vcl_session_table_lookup_listener (wrk,
262                                                       mp->listener_handle);
263   if (!listen_session)
264     {
265       svm_msg_q_t *evt_q;
266       evt_q = uword_to_pointer (mp->vpp_event_queue_address, svm_msg_q_t *);
267       clib_warning ("VCL<%d>: ERROR: couldn't find listen session: "
268                     "unknown vpp listener handle %llx",
269                     getpid (), mp->listener_handle);
270       vcl_send_session_accepted_reply (evt_q, mp->context, mp->handle,
271                                        VNET_API_ERROR_INVALID_ARGUMENT);
272       vcl_session_free (wrk, session);
273       return VCL_INVALID_SESSION_INDEX;
274     }
275
276   rx_fifo = uword_to_pointer (mp->server_rx_fifo, svm_fifo_t *);
277   tx_fifo = uword_to_pointer (mp->server_tx_fifo, svm_fifo_t *);
278
279   if (mp->server_event_queue_address)
280     {
281       session->vpp_evt_q = uword_to_pointer (mp->client_event_queue_address,
282                                              svm_msg_q_t *);
283       session->our_evt_q = uword_to_pointer (mp->server_event_queue_address,
284                                              svm_msg_q_t *);
285       vcl_wait_for_memory (session->vpp_evt_q);
286       rx_fifo->master_session_index = session->session_index;
287       tx_fifo->master_session_index = session->session_index;
288       rx_fifo->master_thread_index = vcl_get_worker_index ();
289       tx_fifo->master_thread_index = vcl_get_worker_index ();
290       vec_validate (wrk->vpp_event_queues, 0);
291       evt_q = uword_to_pointer (mp->vpp_event_queue_address, svm_msg_q_t *);
292       wrk->vpp_event_queues[0] = evt_q;
293     }
294   else
295     {
296       session->vpp_evt_q = uword_to_pointer (mp->vpp_event_queue_address,
297                                              svm_msg_q_t *);
298       rx_fifo->client_session_index = session->session_index;
299       tx_fifo->client_session_index = session->session_index;
300       rx_fifo->client_thread_index = vcl_get_worker_index ();
301       tx_fifo->client_thread_index = vcl_get_worker_index ();
302       vpp_wrk_index = tx_fifo->master_thread_index;
303       vec_validate (wrk->vpp_event_queues, vpp_wrk_index);
304       wrk->vpp_event_queues[vpp_wrk_index] = session->vpp_evt_q;
305     }
306
307   session->vpp_handle = mp->handle;
308   session->client_context = mp->context;
309   session->rx_fifo = rx_fifo;
310   session->tx_fifo = tx_fifo;
311
312   session->session_state = STATE_ACCEPT;
313   session->transport.rmt_port = mp->port;
314   session->transport.is_ip4 = mp->is_ip4;
315   clib_memcpy (&session->transport.rmt_ip, mp->ip, sizeof (ip46_address_t));
316
317   vcl_session_table_add_vpp_handle (wrk, mp->handle, session->session_index);
318   session->transport.lcl_port = listen_session->transport.lcl_port;
319   session->transport.lcl_ip = listen_session->transport.lcl_ip;
320   session->session_type = listen_session->session_type;
321   session->is_dgram = session->session_type == VPPCOM_PROTO_UDP;
322
323   VDBG (1, "VCL<%d>: vpp handle 0x%llx, sid %u: client accept request from %s"
324         " address %U port %d queue %p!", getpid (), mp->handle,
325         session->session_index,
326         mp->is_ip4 ? "IPv4" : "IPv6", format_ip46_address, &mp->ip,
327         mp->is_ip4 ? IP46_TYPE_IP4 : IP46_TYPE_IP6,
328         clib_net_to_host_u16 (mp->port), session->vpp_evt_q);
329   vcl_evt (VCL_EVT_ACCEPT, session, listen_session, session_index);
330
331   return session->session_index;
332 }
333
334 static u32
335 vcl_session_connected_handler (vcl_worker_t * wrk,
336                                session_connected_msg_t * mp)
337 {
338   u32 session_index, vpp_wrk_index;
339   svm_fifo_t *rx_fifo, *tx_fifo;
340   vcl_session_t *session = 0;
341   svm_msg_q_t *evt_q;
342
343   session_index = mp->context;
344   session = vcl_session_get (wrk, session_index);
345   if (!session)
346     {
347       clib_warning ("[%s] ERROR: vpp handle 0x%llx, sid %u: "
348                     "Invalid session index (%u)!",
349                     getpid (), mp->handle, session_index);
350       return VCL_INVALID_SESSION_INDEX;
351     }
352   if (mp->retval)
353     {
354       clib_warning ("VCL<%d>: ERROR: sid %u: connect failed! %U", getpid (),
355                     session_index, format_api_error, ntohl (mp->retval));
356       session->session_state = STATE_FAILED;
357       session->vpp_handle = mp->handle;
358       return session_index;
359     }
360
361   rx_fifo = uword_to_pointer (mp->server_rx_fifo, svm_fifo_t *);
362   tx_fifo = uword_to_pointer (mp->server_tx_fifo, svm_fifo_t *);
363   vcl_wait_for_memory (rx_fifo);
364   rx_fifo->client_session_index = session_index;
365   tx_fifo->client_session_index = session_index;
366   rx_fifo->client_thread_index = vcl_get_worker_index ();
367   tx_fifo->client_thread_index = vcl_get_worker_index ();
368
369   if (mp->client_event_queue_address)
370     {
371       session->vpp_evt_q = uword_to_pointer (mp->server_event_queue_address,
372                                              svm_msg_q_t *);
373       session->our_evt_q = uword_to_pointer (mp->client_event_queue_address,
374                                              svm_msg_q_t *);
375
376       vec_validate (wrk->vpp_event_queues, 0);
377       evt_q = uword_to_pointer (mp->vpp_event_queue_address, svm_msg_q_t *);
378       wrk->vpp_event_queues[0] = evt_q;
379     }
380   else
381     {
382       session->vpp_evt_q = uword_to_pointer (mp->vpp_event_queue_address,
383                                              svm_msg_q_t *);
384       vpp_wrk_index = tx_fifo->master_thread_index;
385       vec_validate (wrk->vpp_event_queues, vpp_wrk_index);
386       wrk->vpp_event_queues[vpp_wrk_index] = session->vpp_evt_q;
387     }
388
389   session->rx_fifo = rx_fifo;
390   session->tx_fifo = tx_fifo;
391   session->vpp_handle = mp->handle;
392   session->transport.is_ip4 = mp->is_ip4;
393   clib_memcpy (&session->transport.lcl_ip, mp->lcl_ip,
394                sizeof (session->transport.lcl_ip));
395   session->transport.lcl_port = mp->lcl_port;
396   session->session_state = STATE_CONNECT;
397
398   /* Add it to lookup table */
399   hash_set (wrk->session_index_by_vpp_handles, mp->handle, session_index);
400
401   VDBG (1, "VCL<%d>: vpp handle 0x%llx, sid %u: connect succeeded! "
402         "session_rx_fifo %p, refcnt %d, session_tx_fifo %p, refcnt %d",
403         getpid (), mp->handle, session_index, session->rx_fifo,
404         session->rx_fifo->refcnt, session->tx_fifo, session->tx_fifo->refcnt);
405
406   return session_index;
407 }
408
409 static u32
410 vcl_session_reset_handler (vcl_worker_t * wrk,
411                            session_reset_msg_t * reset_msg)
412 {
413   vcl_session_t *session;
414   u32 sid;
415
416   sid = vcl_session_index_from_vpp_handle (wrk, reset_msg->handle);
417   session = vcl_session_get (wrk, sid);
418   if (!session)
419     {
420       VDBG (0, "request to reset unknown handle 0x%llx", reset_msg->handle);
421       return VCL_INVALID_SESSION_INDEX;
422     }
423   session->session_state = STATE_CLOSE_ON_EMPTY;
424   VDBG (0, "reset handle 0x%llx, sid %u ", reset_msg->handle, sid);
425   vcl_send_session_reset_reply (vcl_session_vpp_evt_q (wrk, session),
426                                 vcm->my_client_index, reset_msg->handle, 0);
427   return sid;
428 }
429
430 static u32
431 vcl_session_bound_handler (vcl_worker_t * wrk, session_bound_msg_t * mp)
432 {
433   vcl_session_t *session;
434   u32 sid = mp->context;
435
436   session = vcl_session_get (wrk, sid);
437   if (mp->retval)
438     {
439       VDBG (0, "VCL<%d>: ERROR: vpp handle 0x%llx, sid %u: bind failed: %U",
440             getpid (), mp->handle, sid, format_api_error, ntohl (mp->retval));
441       if (session)
442         {
443           session->session_state = STATE_FAILED;
444           session->vpp_handle = mp->handle;
445           return sid;
446         }
447       else
448         {
449           clib_warning ("[%s] ERROR: vpp handle 0x%llx, sid %u: "
450                         "Invalid session index (%u)!",
451                         getpid (), mp->handle, sid);
452           return VCL_INVALID_SESSION_INDEX;
453         }
454     }
455
456   session->vpp_handle = mp->handle;
457   session->transport.is_ip4 = mp->lcl_is_ip4;
458   clib_memcpy (&session->transport.lcl_ip, mp->lcl_ip,
459                sizeof (ip46_address_t));
460   session->transport.lcl_port = mp->lcl_port;
461   vcl_session_table_add_listener (wrk, mp->handle, sid);
462   session->session_state = STATE_LISTEN;
463
464   if (session->is_dgram)
465     {
466       svm_fifo_t *rx_fifo, *tx_fifo;
467       session->vpp_evt_q = uword_to_pointer (mp->vpp_evt_q, svm_msg_q_t *);
468       rx_fifo = uword_to_pointer (mp->rx_fifo, svm_fifo_t *);
469       rx_fifo->client_session_index = sid;
470       tx_fifo = uword_to_pointer (mp->tx_fifo, svm_fifo_t *);
471       tx_fifo->client_session_index = sid;
472       session->rx_fifo = rx_fifo;
473       session->tx_fifo = tx_fifo;
474     }
475
476   VDBG (1, "VCL<%d>: vpp handle 0x%llx, sid %u: bind succeeded!",
477         getpid (), mp->handle, sid);
478   return sid;
479 }
480
481 static int
482 vcl_handle_mq_event (vcl_worker_t * wrk, session_event_t * e)
483 {
484   session_accepted_msg_t *accepted_msg;
485   session_disconnected_msg_t *disconnected_msg;
486   vcl_session_msg_t *vcl_msg;
487   vcl_session_t *session;
488   u64 handle;
489   u32 sid;
490
491   switch (e->event_type)
492     {
493     case FIFO_EVENT_APP_RX:
494     case FIFO_EVENT_APP_TX:
495     case SESSION_IO_EVT_CT_RX:
496     case SESSION_IO_EVT_CT_TX:
497       vec_add1 (wrk->unhandled_evts_vector, *e);
498       break;
499     case SESSION_CTRL_EVT_ACCEPTED:
500       accepted_msg = (session_accepted_msg_t *) e->data;
501       handle = accepted_msg->listener_handle;
502       session = vcl_session_table_lookup_listener (wrk, handle);
503       if (!session)
504         {
505           clib_warning ("VCL<%d>: ERROR: couldn't find listen session:"
506                         "listener handle %llx", getpid (), handle);
507           break;
508         }
509
510       clib_fifo_add2 (session->accept_evts_fifo, vcl_msg);
511       vcl_msg->accepted_msg = *accepted_msg;
512       break;
513     case SESSION_CTRL_EVT_CONNECTED:
514       vcl_session_connected_handler (wrk,
515                                      (session_connected_msg_t *) e->data);
516       break;
517     case SESSION_CTRL_EVT_DISCONNECTED:
518       disconnected_msg = (session_disconnected_msg_t *) e->data;
519       sid = vcl_session_index_from_vpp_handle (wrk, disconnected_msg->handle);
520       session = vcl_session_get (wrk, sid);
521       if (!session)
522         {
523           VDBG (0, "request to disconnect unknown handle 0x%llx",
524                 disconnected_msg->handle);
525           break;
526         }
527       session->session_state = STATE_DISCONNECT;
528       VDBG (0, "disconnected handle 0x%llx, sid %u", disconnected_msg->handle,
529             sid);
530       break;
531     case SESSION_CTRL_EVT_RESET:
532       vcl_session_reset_handler (wrk, (session_reset_msg_t *) e->data);
533       break;
534     case SESSION_CTRL_EVT_BOUND:
535       vcl_session_bound_handler (wrk, (session_bound_msg_t *) e->data);
536       break;
537     default:
538       clib_warning ("unhandled %u", e->event_type);
539     }
540   return VPPCOM_OK;
541 }
542
543 static inline int
544 vppcom_wait_for_session_state_change (u32 session_index,
545                                       session_state_t state,
546                                       f64 wait_for_time)
547 {
548   vcl_worker_t *wrk = vcl_worker_get_current ();
549   f64 timeout = clib_time_now (&wrk->clib_time) + wait_for_time;
550   vcl_session_t *volatile session;
551   svm_msg_q_msg_t msg;
552   session_event_t *e;
553
554   do
555     {
556       session = vcl_session_get (wrk, session_index);
557       if (PREDICT_FALSE (!session))
558         {
559           return VPPCOM_EBADFD;
560         }
561       if (session->session_state & state)
562         {
563           return VPPCOM_OK;
564         }
565       if (session->session_state & STATE_FAILED)
566         {
567           return VPPCOM_ECONNREFUSED;
568         }
569
570       if (svm_msg_q_sub (wrk->app_event_queue, &msg, SVM_Q_NOWAIT, 0))
571         continue;
572       e = svm_msg_q_msg_data (wrk->app_event_queue, &msg);
573       vcl_handle_mq_event (wrk, e);
574       svm_msg_q_free_msg (wrk->app_event_queue, &msg);
575     }
576   while (clib_time_now (&wrk->clib_time) < timeout);
577
578   VDBG (0, "VCL<%d>: timeout waiting for state 0x%x (%s)", getpid (), state,
579         vppcom_session_state_str (state));
580   vcl_evt (VCL_EVT_SESSION_TIMEOUT, session, session_state);
581
582   return VPPCOM_ETIMEDOUT;
583 }
584
585 static int
586 vppcom_app_session_enable (void)
587 {
588   int rv;
589
590   if (vcm->app_state != STATE_APP_ENABLED)
591     {
592       vppcom_send_session_enable_disable (1 /* is_enabled == TRUE */ );
593       rv = vcl_wait_for_app_state_change (STATE_APP_ENABLED);
594       if (PREDICT_FALSE (rv))
595         {
596           VDBG (0, "VCL<%d>: application session enable timed out! "
597                 "returning %d (%s)", getpid (), rv, vppcom_retval_str (rv));
598           return rv;
599         }
600     }
601   return VPPCOM_OK;
602 }
603
604 static int
605 vppcom_app_attach (void)
606 {
607   int rv;
608
609   vppcom_app_send_attach ();
610   rv = vcl_wait_for_app_state_change (STATE_APP_ATTACHED);
611   if (PREDICT_FALSE (rv))
612     {
613       VDBG (0, "VCL<%d>: application attach timed out! returning %d (%s)",
614             getpid (), rv, vppcom_retval_str (rv));
615       return rv;
616     }
617
618   return VPPCOM_OK;
619 }
620
621 static int
622 vppcom_session_unbind (u32 session_handle)
623 {
624   vcl_worker_t *wrk = vcl_worker_get_current ();
625   vcl_session_t *session = 0;
626   u64 vpp_handle;
627
628   session = vcl_session_get_w_handle (wrk, session_handle);
629   if (!session)
630     return VPPCOM_EBADFD;
631
632   vpp_handle = session->vpp_handle;
633   vcl_session_table_del_listener (wrk, vpp_handle);
634   session->vpp_handle = ~0;
635   session->session_state = STATE_DISCONNECT;
636
637   VDBG (1, "VCL<%d>: vpp handle 0x%llx, sid %u: sending unbind msg! new state"
638         " 0x%x (%s)", getpid (), vpp_handle, session_handle, STATE_DISCONNECT,
639         vppcom_session_state_str (STATE_DISCONNECT));
640   vcl_evt (VCL_EVT_UNBIND, session);
641   vppcom_send_unbind_sock (vpp_handle);
642
643   return VPPCOM_OK;
644 }
645
646 static int
647 vppcom_session_disconnect (u32 session_handle)
648 {
649   vcl_worker_t *wrk = vcl_worker_get_current ();
650   svm_msg_q_t *vpp_evt_q;
651   vcl_session_t *session;
652   session_state_t state;
653   u64 vpp_handle;
654
655   session = vcl_session_get_w_handle (wrk, session_handle);
656   if (!session)
657     return VPPCOM_EBADFD;
658
659   vpp_handle = session->vpp_handle;
660   state = session->session_state;
661
662   VDBG (1, "VCL<%d>: vpp handle 0x%llx, sid %u state 0x%x (%s)", getpid (),
663         vpp_handle, session_handle, state, vppcom_session_state_str (state));
664
665   if (PREDICT_FALSE (state & STATE_LISTEN))
666     {
667       clib_warning ("VCL<%d>: ERROR: vpp handle 0x%llx, sid %u: "
668                     "Cannot disconnect a listen socket!",
669                     getpid (), vpp_handle, session_handle);
670       return VPPCOM_EBADFD;
671     }
672
673   if (state & STATE_CLOSE_ON_EMPTY)
674     {
675       vpp_evt_q = vcl_session_vpp_evt_q (wrk, session);
676       vcl_send_session_disconnected_reply (vpp_evt_q, vcm->my_client_index,
677                                            vpp_handle, 0);
678       VDBG (1, "VCL<%d>: vpp handle 0x%llx, sid %u: sending disconnect "
679             "REPLY...", getpid (), vpp_handle, session_handle);
680     }
681   else
682     {
683       VDBG (1, "VCL<%d>: vpp handle 0x%llx, sid %u: sending disconnect...",
684             getpid (), vpp_handle, session_handle);
685       vppcom_send_disconnect_session (vpp_handle);
686     }
687
688   return VPPCOM_OK;
689 }
690
691 /*
692  * VPPCOM Public API functions
693  */
694 int
695 vppcom_app_create (char *app_name)
696 {
697   vppcom_cfg_t *vcl_cfg = &vcm->cfg;
698   int rv;
699
700   if (!vcm->is_init)
701     {
702       vcm->is_init = 1;
703       vppcom_cfg (&vcm->cfg);
704       vcl_cfg = &vcm->cfg;
705
706       vcm->main_cpu = pthread_self ();
707       vppcom_init_error_string_table ();
708       svm_fifo_segment_main_init (vcl_cfg->segment_baseva,
709                                   20 /* timeout in secs */ );
710       pool_init_fixed (vcm->workers, vcl_cfg->max_workers);
711       clib_spinlock_init (&vcm->workers_lock);
712       vcl_worker_alloc_and_init ();
713     }
714
715   if (vcm->my_client_index == ~0)
716     {
717       /* API hookup and connect to VPP */
718       vppcom_api_hookup ();
719       vcl_elog_init (vcm);
720       vcm->app_state = STATE_APP_START;
721       rv = vppcom_connect_to_vpp (app_name);
722       if (rv)
723         {
724           clib_warning ("VCL<%d>: ERROR: couldn't connect to VPP!",
725                         getpid ());
726           return rv;
727         }
728
729       VDBG (0, "VCL<%d>: sending session enable", getpid ());
730       rv = vppcom_app_session_enable ();
731       if (rv)
732         {
733           clib_warning ("VCL<%d>: ERROR: vppcom_app_session_enable() "
734                         "failed!", getpid ());
735           return rv;
736         }
737
738       VDBG (0, "VCL<%d>: sending app attach", getpid ());
739       rv = vppcom_app_attach ();
740       if (rv)
741         {
742           clib_warning ("VCL<%d>: ERROR: vppcom_app_attach() failed!",
743                         getpid ());
744           return rv;
745         }
746
747       VDBG (0, "VCL<%d>: app_name '%s', my_client_index %d (0x%x)",
748             getpid (), app_name, vcm->my_client_index, vcm->my_client_index);
749     }
750
751   return VPPCOM_OK;
752 }
753
754 void
755 vppcom_app_destroy (void)
756 {
757   int rv;
758   f64 orig_app_timeout;
759
760   if (vcm->my_client_index == ~0)
761     return;
762
763   VDBG (0, "VCL<%d>: detaching from VPP, my_client_index %d (0x%x)",
764         getpid (), vcm->my_client_index, vcm->my_client_index);
765   vcl_evt (VCL_EVT_DETACH, vcm);
766
767   vppcom_app_send_detach ();
768   orig_app_timeout = vcm->cfg.app_timeout;
769   vcm->cfg.app_timeout = 2.0;
770   rv = vcl_wait_for_app_state_change (STATE_APP_ENABLED);
771   vcm->cfg.app_timeout = orig_app_timeout;
772   if (PREDICT_FALSE (rv))
773     VDBG (0, "VCL<%d>: application detach timed out! returning %d (%s)",
774           getpid (), rv, vppcom_retval_str (rv));
775
776   vcl_elog_stop (vcm);
777   vl_client_disconnect_from_vlib ();
778   vcm->my_client_index = ~0;
779   vcm->app_state = STATE_APP_START;
780 }
781
782 int
783 vppcom_session_create (u8 proto, u8 is_nonblocking)
784 {
785   vcl_worker_t *wrk = vcl_worker_get_current ();
786   vcl_session_t *session;
787
788   session = vcl_session_alloc (wrk);
789
790   session->session_type = proto;
791   session->session_state = STATE_START;
792   session->vpp_handle = ~0;
793   session->is_dgram = proto == VPPCOM_PROTO_UDP;
794
795   if (is_nonblocking)
796     VCL_SESS_ATTR_SET (session->attr, VCL_SESS_ATTR_NONBLOCK);
797
798   vcl_evt (VCL_EVT_CREATE, session, session_type, session->session_state,
799            is_nonblocking, session_index);
800
801   VDBG (0, "VCL<%d>: sid %u", getpid (), session->session_index);
802
803   return vcl_session_handle (session);
804 }
805
806 int
807 vppcom_session_close (uint32_t session_handle)
808 {
809   vcl_worker_t *wrk = vcl_worker_get_current ();
810   vcl_session_t *session = 0;
811   u8 is_vep, is_vep_session;
812   session_state_t state;
813   u32 next_sh, vep_sh;
814   int rv = VPPCOM_OK;
815   u64 vpp_handle;
816
817   session = vcl_session_get_w_handle (wrk, session_handle);
818   if (!session)
819     return VPPCOM_EBADFD;
820
821   is_vep = session->is_vep;
822   is_vep_session = session->is_vep_session;
823   next_sh = session->vep.next_sh;
824   vep_sh = session->vep.vep_sh;
825   state = session->session_state;
826   vpp_handle = session->vpp_handle;
827
828   if (VPPCOM_DEBUG > 0)
829     {
830       if (is_vep)
831         clib_warning ("VCL<%d>: vep_idx %u / sid %u: "
832                       "closing epoll session...",
833                       getpid (), session_handle, session_handle);
834       else
835         clib_warning ("VCL<%d>: vpp handle 0x%llx, sid %d: "
836                       "closing session...",
837                       getpid (), vpp_handle, session_handle);
838     }
839
840   if (is_vep)
841     {
842       while (next_sh != ~0)
843         {
844           rv = vppcom_epoll_ctl (session_handle, EPOLL_CTL_DEL, next_sh, 0);
845           if (PREDICT_FALSE (rv < 0))
846             VDBG (0, "VCL<%d>: vpp handle 0x%llx, sid %u: EPOLL_CTL_DEL "
847                   "vep_idx %u failed! rv %d (%s)",
848                   getpid (), vpp_handle, next_sh, vep_sh,
849                   rv, vppcom_retval_str (rv));
850
851           next_sh = session->vep.next_sh;
852         }
853     }
854   else
855     {
856       if (is_vep_session)
857         {
858           rv = vppcom_epoll_ctl (vep_sh, EPOLL_CTL_DEL, session_handle, 0);
859           if (rv < 0)
860             VDBG (0, "VCL<%d>: vpp handle 0x%llx, sid %u: EPOLL_CTL_DEL "
861                   "vep_idx %u failed! rv %d (%s)",
862                   getpid (), vpp_handle, session_handle,
863                   vep_sh, rv, vppcom_retval_str (rv));
864         }
865
866       if (state & STATE_LISTEN)
867         {
868           rv = vppcom_session_unbind (session_handle);
869           if (PREDICT_FALSE (rv < 0))
870             VDBG (0, "VCL<%d>: vpp handle 0x%llx, sid %u: listener unbind "
871                   "failed! rv %d (%s)",
872                   getpid (), vpp_handle, session_handle,
873                   rv, vppcom_retval_str (rv));
874         }
875       else if (state & STATE_OPEN)
876         {
877           rv = vppcom_session_disconnect (session_handle);
878           if (PREDICT_FALSE (rv < 0))
879             clib_warning ("VCL<%d>: ERROR: vpp handle 0x%llx, sid %u: "
880                           "session disconnect failed! rv %d (%s)",
881                           getpid (), vpp_handle, session_handle,
882                           rv, vppcom_retval_str (rv));
883         }
884     }
885
886   if (vcl_session_is_ct (session))
887     {
888       vcl_cut_through_registration_t *ctr;
889       uword mq_addr;
890
891       mq_addr = pointer_to_uword (session->our_evt_q);
892       ctr = vcl_ct_registration_lock_and_lookup (wrk, mq_addr);
893       ASSERT (ctr);
894       if (ctr->epoll_evt_conn_index != ~0)
895         vcl_mq_epoll_del_evfd (wrk, ctr->epoll_evt_conn_index);
896       VDBG (0, "Removing ct registration %u",
897             vcl_ct_registration_index (wrk, ctr));
898       vcl_ct_registration_del (wrk, ctr);
899       vcl_ct_registration_lookup_del (wrk, mq_addr);
900       vcl_ct_registration_unlock (wrk);
901     }
902
903   if (vpp_handle != ~0)
904     {
905       vcl_session_table_del_vpp_handle (wrk, vpp_handle);
906     }
907   vcl_session_free (wrk, session);
908
909   if (VPPCOM_DEBUG > 0)
910     {
911       if (is_vep)
912         clib_warning ("VCL<%d>: vep_idx %u / sid %u: epoll session removed.",
913                       getpid (), session_handle, session_handle);
914       else
915         clib_warning ("VCL<%d>: vpp handle 0x%llx, sid %u: session removed.",
916                       getpid (), vpp_handle, session_handle);
917     }
918
919   vcl_evt (VCL_EVT_CLOSE, session, rv);
920
921   return rv;
922 }
923
924 int
925 vppcom_session_bind (uint32_t session_handle, vppcom_endpt_t * ep)
926 {
927   vcl_worker_t *wrk = vcl_worker_get_current ();
928   vcl_session_t *session = 0;
929
930   if (!ep || !ep->ip)
931     return VPPCOM_EINVAL;
932
933   session = vcl_session_get_w_handle (wrk, session_handle);
934   if (!session)
935     return VPPCOM_EBADFD;
936
937   if (session->is_vep)
938     {
939       clib_warning ("VCL<%d>: ERROR: sid %u: cannot "
940                     "bind to an epoll session!", getpid (), session_handle);
941       return VPPCOM_EBADFD;
942     }
943
944   session->transport.is_ip4 = ep->is_ip4;
945   if (ep->is_ip4)
946     clib_memcpy (&session->transport.lcl_ip.ip4, ep->ip,
947                  sizeof (ip4_address_t));
948   else
949     clib_memcpy (&session->transport.lcl_ip.ip6, ep->ip,
950                  sizeof (ip6_address_t));
951   session->transport.lcl_port = ep->port;
952
953   VDBG (0, "VCL<%d>: sid %u: binding to local %s address %U port %u, "
954         "proto %s", getpid (), session_handle,
955         session->transport.is_ip4 ? "IPv4" : "IPv6",
956         format_ip46_address, &session->transport.lcl_ip,
957         session->transport.is_ip4 ? IP46_TYPE_IP4 : IP46_TYPE_IP6,
958         clib_net_to_host_u16 (session->transport.lcl_port),
959         session->session_type ? "UDP" : "TCP");
960   vcl_evt (VCL_EVT_BIND, session);
961
962   if (session->session_type == VPPCOM_PROTO_UDP)
963     vppcom_session_listen (session_handle, 10);
964
965   return VPPCOM_OK;
966 }
967
968 int
969 vppcom_session_listen (uint32_t listen_sh, uint32_t q_len)
970 {
971   vcl_worker_t *wrk = vcl_worker_get_current ();
972   vcl_session_t *listen_session = 0;
973   u64 listen_vpp_handle;
974   int rv;
975
976   listen_session = vcl_session_get_w_handle (wrk, listen_sh);
977   if (!listen_session)
978     return VPPCOM_EBADFD;
979
980   if (q_len == 0 || q_len == ~0)
981     q_len = vcm->cfg.listen_queue_size;
982
983   if (listen_session->is_vep)
984     {
985       clib_warning ("VCL<%d>: ERROR: sid %u: cannot listen on an "
986                     "epoll session!", getpid (), listen_sh);
987       return VPPCOM_EBADFD;
988     }
989
990   listen_vpp_handle = listen_session->vpp_handle;
991   if (listen_session->session_state & STATE_LISTEN)
992     {
993       VDBG (0, "VCL<%d>: vpp handle 0x%llx, sid %u: already in listen state!",
994             getpid (), listen_vpp_handle, listen_sh);
995       return VPPCOM_OK;
996     }
997
998   VDBG (0, "VCL<%d>: vpp handle 0x%llx, sid %u: sending VPP bind+listen "
999         "request...", getpid (), listen_vpp_handle, listen_sh);
1000
1001   /*
1002    * Send listen request to vpp and wait for reply
1003    */
1004   vppcom_send_bind_sock (listen_session);
1005   rv = vppcom_wait_for_session_state_change (listen_session->session_index,
1006                                              STATE_LISTEN,
1007                                              vcm->cfg.session_timeout);
1008
1009   if (PREDICT_FALSE (rv))
1010     {
1011       listen_session = vcl_session_get_w_handle (wrk, listen_sh);
1012       VDBG (0, "VCL<%d>: vpp handle 0x%llx, sid %u: bind+listen failed! "
1013             "returning %d (%s)", getpid (), listen_session->vpp_handle,
1014             listen_sh, rv, vppcom_retval_str (rv));
1015       return rv;
1016     }
1017
1018   return VPPCOM_OK;
1019 }
1020
1021 static int
1022 validate_args_session_accept_ (vcl_worker_t * wrk,
1023                                vcl_session_t * listen_session)
1024 {
1025   /* Input validation - expects spinlock on sessions_lockp */
1026   if (listen_session->is_vep)
1027     {
1028       clib_warning ("VCL<%d>: ERROR: sid %u: cannot accept on an "
1029                     "epoll session!", getpid (),
1030                     listen_session->session_index);
1031       return VPPCOM_EBADFD;
1032     }
1033
1034   if (listen_session->session_state != STATE_LISTEN)
1035     {
1036       clib_warning ("VCL<%d>: ERROR: vpp handle 0x%llx, sid %u: "
1037                     "not in listen state! state 0x%x (%s)", getpid (),
1038                     listen_session->vpp_handle, listen_session->session_index,
1039                     listen_session->session_state,
1040                     vppcom_session_state_str (listen_session->session_state));
1041       return VPPCOM_EBADFD;
1042     }
1043   return VPPCOM_OK;
1044 }
1045
1046 int
1047 vppcom_session_accept (uint32_t listen_session_handle, vppcom_endpt_t * ep,
1048                        uint32_t flags)
1049 {
1050   u32 client_session_index = ~0, listen_session_index;
1051   vcl_worker_t *wrk = vcl_worker_get_current ();
1052   session_accepted_msg_t accepted_msg;
1053   vcl_session_t *listen_session = 0;
1054   vcl_session_t *client_session = 0;
1055   svm_msg_q_t *vpp_evt_q;
1056   vcl_session_msg_t *evt;
1057   u64 listen_vpp_handle;
1058   svm_msg_q_msg_t msg;
1059   session_event_t *e;
1060   u8 is_nonblocking;
1061   int rv;
1062
1063   listen_session = vcl_session_get_w_handle (wrk, listen_session_handle);
1064   if (!listen_session)
1065     return VPPCOM_EBADFD;
1066
1067   listen_session_index = listen_session->session_index;
1068   if ((rv = validate_args_session_accept_ (wrk, listen_session)))
1069     return rv;
1070
1071   if (clib_fifo_elts (listen_session->accept_evts_fifo))
1072     {
1073       clib_fifo_sub2 (listen_session->accept_evts_fifo, evt);
1074       accepted_msg = evt->accepted_msg;
1075       goto handle;
1076     }
1077
1078   is_nonblocking = VCL_SESS_ATTR_TEST (listen_session->attr,
1079                                        VCL_SESS_ATTR_NONBLOCK);
1080   if (svm_msg_q_is_empty (wrk->app_event_queue) && is_nonblocking)
1081     return VPPCOM_EAGAIN;
1082
1083   while (1)
1084     {
1085       if (svm_msg_q_sub (wrk->app_event_queue, &msg, SVM_Q_WAIT, 0))
1086         return VPPCOM_EAGAIN;
1087
1088       e = svm_msg_q_msg_data (wrk->app_event_queue, &msg);
1089       if (e->event_type != SESSION_CTRL_EVT_ACCEPTED)
1090         {
1091           clib_warning ("discarded event: %u", e->event_type);
1092           svm_msg_q_free_msg (wrk->app_event_queue, &msg);
1093           continue;
1094         }
1095       clib_memcpy (&accepted_msg, e->data, sizeof (accepted_msg));
1096       svm_msg_q_free_msg (wrk->app_event_queue, &msg);
1097       break;
1098     }
1099
1100 handle:
1101
1102   client_session_index = vcl_session_accepted_handler (wrk, &accepted_msg);
1103   listen_session = vcl_session_get (wrk, listen_session_index);
1104   client_session = vcl_session_get (wrk, client_session_index);
1105
1106   if (flags & O_NONBLOCK)
1107     VCL_SESS_ATTR_SET (client_session->attr, VCL_SESS_ATTR_NONBLOCK);
1108
1109   listen_vpp_handle = listen_session->vpp_handle;
1110   VDBG (0, "VCL<%d>: vpp handle 0x%llx, sid %u: Got a client request! "
1111         "vpp handle 0x%llx, sid %u, flags %d, is_nonblocking %u",
1112         getpid (), listen_vpp_handle, listen_session_handle,
1113         client_session->vpp_handle, client_session_index,
1114         flags, VCL_SESS_ATTR_TEST (client_session->attr,
1115                                    VCL_SESS_ATTR_NONBLOCK));
1116
1117   if (ep)
1118     {
1119       ep->is_ip4 = client_session->transport.is_ip4;
1120       ep->port = client_session->transport.rmt_port;
1121       if (client_session->transport.is_ip4)
1122         clib_memcpy (ep->ip, &client_session->transport.rmt_ip.ip4,
1123                      sizeof (ip4_address_t));
1124       else
1125         clib_memcpy (ep->ip, &client_session->transport.rmt_ip.ip6,
1126                      sizeof (ip6_address_t));
1127     }
1128
1129   if (accepted_msg.server_event_queue_address)
1130     vpp_evt_q = uword_to_pointer (accepted_msg.vpp_event_queue_address,
1131                                   svm_msg_q_t *);
1132   else
1133     vpp_evt_q = client_session->vpp_evt_q;
1134
1135   vcl_send_session_accepted_reply (vpp_evt_q, client_session->client_context,
1136                                    client_session->vpp_handle, 0);
1137
1138   VDBG (0, "VCL<%d>: vpp handle 0x%llx, sid %u: accepted vpp handle 0x%llx, "
1139         "sid %u connection from peer %s address %U port %u to local %s "
1140         "address %U port %u", getpid (), listen_vpp_handle,
1141         listen_session_handle, client_session->vpp_handle,
1142         client_session_index,
1143         client_session->transport.is_ip4 ? "IPv4" : "IPv6",
1144         format_ip46_address, &client_session->transport.rmt_ip,
1145         client_session->transport.is_ip4 ? IP46_TYPE_IP4 : IP46_TYPE_IP6,
1146         clib_net_to_host_u16 (client_session->transport.rmt_port),
1147         client_session->transport.is_ip4 ? "IPv4" : "IPv6",
1148         format_ip46_address, &client_session->transport.lcl_ip,
1149         client_session->transport.is_ip4 ? IP46_TYPE_IP4 : IP46_TYPE_IP6,
1150         clib_net_to_host_u16 (client_session->transport.lcl_port));
1151   vcl_evt (VCL_EVT_ACCEPT, client_session, listen_session,
1152            client_session_index);
1153
1154   return vcl_session_handle (client_session);
1155 }
1156
1157 int
1158 vppcom_session_connect (uint32_t session_handle, vppcom_endpt_t * server_ep)
1159 {
1160   vcl_worker_t *wrk = vcl_worker_get_current ();
1161   vcl_session_t *session = 0;
1162   u32 session_index;
1163   int rv;
1164
1165   session = vcl_session_get_w_handle (wrk, session_handle);
1166   if (!session)
1167     return VPPCOM_EBADFD;
1168   session_index = session->session_index;
1169
1170   if (PREDICT_FALSE (session->is_vep))
1171     {
1172       clib_warning ("VCL<%d>: ERROR: sid %u: cannot "
1173                     "connect on an epoll session!", getpid (),
1174                     session_handle);
1175       return VPPCOM_EBADFD;
1176     }
1177
1178   if (PREDICT_FALSE (session->session_state & CLIENT_STATE_OPEN))
1179     {
1180       VDBG (0, "VCL<%d>: vpp handle 0x%llx, sid %u: session already "
1181             "connected to %s %U port %d proto %s, state 0x%x (%s)",
1182             getpid (), session->vpp_handle, session_handle,
1183             session->transport.is_ip4 ? "IPv4" : "IPv6",
1184             format_ip46_address,
1185             &session->transport.rmt_ip, session->transport.is_ip4 ?
1186             IP46_TYPE_IP4 : IP46_TYPE_IP6,
1187             clib_net_to_host_u16 (session->transport.rmt_port),
1188             session->session_type ? "UDP" : "TCP", session->session_state,
1189             vppcom_session_state_str (session->session_state));
1190       return VPPCOM_OK;
1191     }
1192
1193   session->transport.is_ip4 = server_ep->is_ip4;
1194   if (session->transport.is_ip4)
1195     clib_memcpy (&session->transport.rmt_ip.ip4, server_ep->ip,
1196                  sizeof (ip4_address_t));
1197   else
1198     clib_memcpy (&session->transport.rmt_ip.ip6, server_ep->ip,
1199                  sizeof (ip6_address_t));
1200   session->transport.rmt_port = server_ep->port;
1201
1202   VDBG (0, "VCL<%d>: vpp handle 0x%llx, sid %u: connecting to server %s %U "
1203         "port %d proto %s",
1204         getpid (), session->vpp_handle, session_handle,
1205         session->transport.is_ip4 ? "IPv4" : "IPv6",
1206         format_ip46_address,
1207         &session->transport.rmt_ip, session->transport.is_ip4 ?
1208         IP46_TYPE_IP4 : IP46_TYPE_IP6,
1209         clib_net_to_host_u16 (session->transport.rmt_port),
1210         session->session_type ? "UDP" : "TCP");
1211
1212   /*
1213    * Send connect request and wait for reply from vpp
1214    */
1215   vppcom_send_connect_sock (session);
1216   rv = vppcom_wait_for_session_state_change (session_index, STATE_CONNECT,
1217                                              vcm->cfg.session_timeout);
1218
1219   session = vcl_session_get (wrk, session_index);
1220
1221   if (PREDICT_FALSE (rv))
1222     {
1223       if (VPPCOM_DEBUG > 0)
1224         {
1225           if (session)
1226             clib_warning ("VCL<%d>: vpp handle 0x%llx, sid %u: connect "
1227                           "failed! returning %d (%s)", getpid (),
1228                           session->vpp_handle, session_handle, rv,
1229                           vppcom_retval_str (rv));
1230           else
1231             clib_warning ("VCL<%d>: no session for sid %u: connect failed! "
1232                           "returning %d (%s)", getpid (),
1233                           session_handle, rv, vppcom_retval_str (rv));
1234         }
1235     }
1236   else
1237     VDBG (0, "VCL<%d>: vpp handle 0x%llx, sid %u: connected!",
1238           getpid (), session->vpp_handle, session_handle);
1239
1240   return rv;
1241 }
1242
1243 static u8
1244 vcl_is_rx_evt_for_session (session_event_t * e, u32 sid, u8 is_ct)
1245 {
1246   if (!is_ct)
1247     return (e->event_type == FIFO_EVENT_APP_RX
1248             && e->fifo->client_session_index == sid);
1249   else
1250     return (e->event_type == SESSION_IO_EVT_CT_TX);
1251 }
1252
1253 static inline u8
1254 vcl_session_is_readable (vcl_session_t * s)
1255 {
1256   return ((s->session_state & STATE_OPEN)
1257           || (s->session_state == STATE_LISTEN
1258               && s->session_type == VPPCOM_PROTO_UDP));
1259 }
1260
1261 static inline int
1262 vppcom_session_read_internal (uint32_t session_handle, void *buf, int n,
1263                               u8 peek)
1264 {
1265   vcl_worker_t *wrk = vcl_worker_get_current ();
1266   int n_read = 0, rv, is_nonblocking;
1267   vcl_session_t *s = 0;
1268   svm_fifo_t *rx_fifo;
1269   svm_msg_q_msg_t msg;
1270   session_event_t *e;
1271   svm_msg_q_t *mq;
1272   u8 is_ct;
1273
1274   if (PREDICT_FALSE (!buf))
1275     return VPPCOM_EINVAL;
1276
1277   s = vcl_session_get_w_handle (wrk, session_handle);
1278   if (PREDICT_FALSE (!s || s->is_vep))
1279     return VPPCOM_EBADFD;
1280
1281   if (PREDICT_FALSE (!vcl_session_is_readable (s)))
1282     {
1283       session_state_t state = s->session_state;
1284       rv = ((state & STATE_DISCONNECT) ? VPPCOM_ECONNRESET : VPPCOM_ENOTCONN);
1285
1286       VDBG (0, "VCL<%d>: vpp handle 0x%llx, sid %u: %s session is not open! "
1287             "state 0x%x (%s), returning %d (%s)",
1288             getpid (), s->vpp_handle, session_handle, state,
1289             vppcom_session_state_str (state), rv, vppcom_retval_str (rv));
1290       return rv;
1291     }
1292
1293   is_nonblocking = VCL_SESS_ATTR_TEST (s->attr, VCL_SESS_ATTR_NONBLOCK);
1294   is_ct = vcl_session_is_ct (s);
1295   mq = is_ct ? s->our_evt_q : wrk->app_event_queue;
1296   rx_fifo = s->rx_fifo;
1297
1298   if (svm_fifo_is_empty (rx_fifo))
1299     {
1300       if (is_nonblocking)
1301         {
1302           svm_fifo_unset_event (rx_fifo);
1303           return VPPCOM_OK;
1304         }
1305       while (svm_fifo_is_empty (rx_fifo))
1306         {
1307           svm_fifo_unset_event (rx_fifo);
1308           svm_msg_q_lock (mq);
1309           if (svm_msg_q_is_empty (mq))
1310             svm_msg_q_wait (mq);
1311
1312           svm_msg_q_sub_w_lock (mq, &msg);
1313           e = svm_msg_q_msg_data (mq, &msg);
1314           svm_msg_q_unlock (mq);
1315           if (!vcl_is_rx_evt_for_session (e, s->session_index, is_ct))
1316             {
1317               vcl_handle_mq_event (wrk, e);
1318               svm_msg_q_free_msg (mq, &msg);
1319               continue;
1320             }
1321           svm_msg_q_free_msg (mq, &msg);
1322
1323           if (PREDICT_FALSE (s->session_state == STATE_CLOSE_ON_EMPTY))
1324             return 0;
1325         }
1326     }
1327
1328   if (s->is_dgram)
1329     n_read = app_recv_dgram_raw (rx_fifo, buf, n, &s->transport, 0, peek);
1330   else
1331     n_read = app_recv_stream_raw (rx_fifo, buf, n, 0, peek);
1332
1333   if (svm_fifo_is_empty (rx_fifo))
1334     svm_fifo_unset_event (rx_fifo);
1335
1336   if (is_ct && n_read + svm_fifo_max_dequeue (rx_fifo) == rx_fifo->nitems)
1337     {
1338       /* If the peer is not polling send notification */
1339       if (!svm_fifo_has_event (s->rx_fifo))
1340         app_send_io_evt_to_vpp (s->vpp_evt_q, s->rx_fifo,
1341                                 SESSION_IO_EVT_CT_RX, SVM_Q_WAIT);
1342     }
1343
1344   VDBG (2, "VCL<%d>: vpp handle 0x%llx, sid %u: read %d bytes from (%p)",
1345         getpid (), s->vpp_handle, session_handle, n_read, rx_fifo);
1346
1347   return n_read;
1348 }
1349
1350 int
1351 vppcom_session_read (uint32_t session_handle, void *buf, size_t n)
1352 {
1353   return (vppcom_session_read_internal (session_handle, buf, n, 0));
1354 }
1355
1356 static int
1357 vppcom_session_peek (uint32_t session_handle, void *buf, int n)
1358 {
1359   return (vppcom_session_read_internal (session_handle, buf, n, 1));
1360 }
1361
1362 int
1363 vppcom_session_read_segments (uint32_t session_handle,
1364                               vppcom_data_segments_t ds)
1365 {
1366   vcl_worker_t *wrk = vcl_worker_get_current ();
1367   int n_read = 0, rv, is_nonblocking;
1368   vcl_session_t *s = 0;
1369   svm_fifo_t *rx_fifo;
1370   svm_msg_q_msg_t msg;
1371   session_event_t *e;
1372   svm_msg_q_t *mq;
1373   u8 is_ct;
1374
1375   s = vcl_session_get_w_handle (wrk, session_handle);
1376   if (PREDICT_FALSE (!s || s->is_vep))
1377     return VPPCOM_EBADFD;
1378
1379   if (PREDICT_FALSE (!vcl_session_is_readable (s)))
1380     {
1381       session_state_t state = s->session_state;
1382       rv = ((state & STATE_DISCONNECT) ? VPPCOM_ECONNRESET : VPPCOM_ENOTCONN);
1383       return rv;
1384     }
1385
1386   is_nonblocking = VCL_SESS_ATTR_TEST (s->attr, VCL_SESS_ATTR_NONBLOCK);
1387   is_ct = vcl_session_is_ct (s);
1388   mq = is_ct ? s->our_evt_q : wrk->app_event_queue;
1389   rx_fifo = s->rx_fifo;
1390
1391   if (svm_fifo_is_empty (rx_fifo))
1392     {
1393       if (is_nonblocking)
1394         {
1395           svm_fifo_unset_event (rx_fifo);
1396           return VPPCOM_OK;
1397         }
1398       while (svm_fifo_is_empty (rx_fifo))
1399         {
1400           svm_fifo_unset_event (rx_fifo);
1401           svm_msg_q_lock (mq);
1402           if (svm_msg_q_is_empty (mq))
1403             svm_msg_q_wait (mq);
1404
1405           svm_msg_q_sub_w_lock (mq, &msg);
1406           e = svm_msg_q_msg_data (mq, &msg);
1407           svm_msg_q_unlock (mq);
1408           if (!vcl_is_rx_evt_for_session (e, s->session_index, is_ct))
1409             {
1410               vcl_handle_mq_event (wrk, e);
1411               svm_msg_q_free_msg (mq, &msg);
1412               continue;
1413             }
1414           svm_msg_q_free_msg (mq, &msg);
1415
1416           if (PREDICT_FALSE (s->session_state == STATE_CLOSE_ON_EMPTY))
1417             return 0;
1418         }
1419     }
1420
1421   n_read = svm_fifo_segments (rx_fifo, (svm_fifo_segment_t *) ds);
1422   svm_fifo_unset_event (rx_fifo);
1423
1424   if (is_ct && n_read + svm_fifo_max_dequeue (rx_fifo) == rx_fifo->nitems)
1425     {
1426       /* If the peer is not polling send notification */
1427       if (!svm_fifo_has_event (s->rx_fifo))
1428         app_send_io_evt_to_vpp (s->vpp_evt_q, s->rx_fifo,
1429                                 SESSION_IO_EVT_CT_RX, SVM_Q_WAIT);
1430     }
1431
1432   return n_read;
1433 }
1434
1435 void
1436 vppcom_session_free_segments (uint32_t session_handle,
1437                               vppcom_data_segments_t ds)
1438 {
1439   vcl_worker_t *wrk = vcl_worker_get_current ();
1440   vcl_session_t *s;
1441
1442   s = vcl_session_get_w_handle (wrk, session_handle);
1443   if (PREDICT_FALSE (!s || s->is_vep))
1444     return;
1445
1446   svm_fifo_segments_free (s->rx_fifo, (svm_fifo_segment_t *) ds);
1447 }
1448
1449 static inline int
1450 vppcom_session_read_ready (vcl_session_t * session)
1451 {
1452   /* Assumes caller has acquired spinlock: vcm->sessions_lockp */
1453   if (PREDICT_FALSE (session->is_vep))
1454     {
1455       clib_warning ("VCL<%d>: ERROR: sid %u: cannot read from an "
1456                     "epoll session!", getpid (), session->session_index);
1457       return VPPCOM_EBADFD;
1458     }
1459
1460   if (PREDICT_FALSE (!(session->session_state & (STATE_OPEN | STATE_LISTEN))))
1461     {
1462       session_state_t state = session->session_state;
1463       int rv;
1464
1465       rv = ((state & STATE_DISCONNECT) ? VPPCOM_ECONNRESET : VPPCOM_ENOTCONN);
1466
1467       VDBG (1, "VCL<%d>: vpp handle 0x%llx, sid %u: session is not open!"
1468             " state 0x%x (%s), returning %d (%s)", getpid (),
1469             session->vpp_handle, session->session_index, state,
1470             vppcom_session_state_str (state), rv, vppcom_retval_str (rv));
1471       return rv;
1472     }
1473
1474   if (session->session_state & STATE_LISTEN)
1475     return clib_fifo_elts (session->accept_evts_fifo);
1476
1477   return svm_fifo_max_dequeue (session->rx_fifo);
1478 }
1479
1480 int
1481 vppcom_data_segment_copy (void *buf, vppcom_data_segments_t ds, u32 max_bytes)
1482 {
1483   u32 first_copy = clib_min (ds[0].len, max_bytes);
1484   clib_memcpy (buf, ds[0].data, first_copy);
1485   if (first_copy < max_bytes)
1486     {
1487       clib_memcpy (buf + first_copy, ds[1].data,
1488                    clib_min (ds[1].len, max_bytes - first_copy));
1489     }
1490   return 0;
1491 }
1492
1493 static u8
1494 vcl_is_tx_evt_for_session (session_event_t * e, u32 sid, u8 is_ct)
1495 {
1496   if (!is_ct)
1497     return (e->event_type == FIFO_EVENT_APP_TX
1498             && e->fifo->client_session_index == sid);
1499   else
1500     return (e->event_type == SESSION_IO_EVT_CT_RX);
1501 }
1502
1503 int
1504 vppcom_session_write (uint32_t session_handle, void *buf, size_t n)
1505 {
1506   vcl_worker_t *wrk = vcl_worker_get_current ();
1507   int rv, n_write, is_nonblocking;
1508   vcl_session_t *s = 0;
1509   svm_fifo_t *tx_fifo = 0;
1510   session_evt_type_t et;
1511   svm_msg_q_msg_t msg;
1512   session_event_t *e;
1513   svm_msg_q_t *mq;
1514   u8 is_ct;
1515
1516   if (PREDICT_FALSE (!buf))
1517     return VPPCOM_EINVAL;
1518
1519   s = vcl_session_get_w_handle (wrk, session_handle);
1520   if (PREDICT_FALSE (!s))
1521     return VPPCOM_EBADFD;
1522
1523   if (PREDICT_FALSE (s->is_vep))
1524     {
1525       clib_warning ("VCL<%d>: ERROR: vpp handle 0x%llx, sid %u: "
1526                     "cannot write to an epoll session!",
1527                     getpid (), s->vpp_handle, session_handle);
1528
1529       return VPPCOM_EBADFD;
1530     }
1531
1532   if (PREDICT_FALSE (!(s->session_state & STATE_OPEN)))
1533     {
1534       session_state_t state = s->session_state;
1535       rv = ((state & STATE_DISCONNECT) ? VPPCOM_ECONNRESET : VPPCOM_ENOTCONN);
1536       VDBG (1, "VCL<%d>: vpp handle 0x%llx, sid %u: session is not open! "
1537             "state 0x%x (%s)", getpid (), s->vpp_handle, session_handle,
1538             state, vppcom_session_state_str (state));
1539       return rv;
1540     }
1541
1542   tx_fifo = s->tx_fifo;
1543   is_ct = vcl_session_is_ct (s);
1544   is_nonblocking = VCL_SESS_ATTR_TEST (s->attr, VCL_SESS_ATTR_NONBLOCK);
1545   mq = is_ct ? s->our_evt_q : wrk->app_event_queue;
1546   if (svm_fifo_is_full (tx_fifo))
1547     {
1548       if (is_nonblocking)
1549         {
1550           return VPPCOM_EWOULDBLOCK;
1551         }
1552       while (svm_fifo_is_full (tx_fifo))
1553         {
1554           svm_fifo_set_want_tx_evt (tx_fifo, 1);
1555           svm_msg_q_lock (mq);
1556           svm_msg_q_wait (mq);
1557
1558           svm_msg_q_sub_w_lock (mq, &msg);
1559           e = svm_msg_q_msg_data (mq, &msg);
1560           svm_msg_q_unlock (mq);
1561
1562           if (!vcl_is_tx_evt_for_session (e, s->session_index, is_ct))
1563             vcl_handle_mq_event (wrk, e);
1564           svm_msg_q_free_msg (mq, &msg);
1565         }
1566     }
1567
1568   ASSERT (FIFO_EVENT_APP_TX + 1 == SESSION_IO_EVT_CT_TX);
1569   et = FIFO_EVENT_APP_TX + vcl_session_is_ct (s);
1570   if (s->is_dgram)
1571     n_write = app_send_dgram_raw (tx_fifo, &s->transport,
1572                                   s->vpp_evt_q, buf, n, et, SVM_Q_WAIT);
1573   else
1574     n_write = app_send_stream_raw (tx_fifo, s->vpp_evt_q, buf, n, et,
1575                                    SVM_Q_WAIT);
1576
1577   ASSERT (n_write > 0);
1578
1579   VDBG (2, "VCL<%d>: vpp handle 0x%llx, sid %u: wrote %d bytes", getpid (),
1580         s->vpp_handle, session_handle, n_write);
1581
1582   return n_write;
1583 }
1584
1585 static vcl_session_t *
1586 vcl_ct_session_get_from_fifo (vcl_worker_t * wrk, svm_fifo_t * f, u8 type)
1587 {
1588   vcl_session_t *s;
1589   s = vcl_session_get (wrk, f->client_session_index);
1590   if (s)
1591     {
1592       /* rx fifo */
1593       if (type == 0 && s->rx_fifo == f)
1594         return s;
1595       /* tx fifo */
1596       if (type == 1 && s->tx_fifo == f)
1597         return s;
1598     }
1599   s = vcl_session_get (wrk, f->master_session_index);
1600   if (s)
1601     {
1602       if (type == 0 && s->rx_fifo == f)
1603         return s;
1604       if (type == 1 && s->tx_fifo == f)
1605         return s;
1606     }
1607   return 0;
1608 }
1609
1610 static inline int
1611 vppcom_session_write_ready (vcl_session_t * session)
1612 {
1613   /* Assumes caller has acquired spinlock: vcm->sessions_lockp */
1614   if (PREDICT_FALSE (session->is_vep))
1615     {
1616       clib_warning ("VCL<%d>: ERROR: vpp handle 0x%llx, sid %u: "
1617                     "cannot write to an epoll session!",
1618                     getpid (), session->vpp_handle, session->session_index);
1619       return VPPCOM_EBADFD;
1620     }
1621
1622   if (PREDICT_FALSE (session->session_state & STATE_LISTEN))
1623     {
1624       clib_warning ("VCL<%d>: ERROR: vpp handle 0x%llx, sid %u: "
1625                     "cannot write to a listen session!",
1626                     getpid (), session->vpp_handle, session->session_index);
1627       return VPPCOM_EBADFD;
1628     }
1629
1630   if (PREDICT_FALSE (!(session->session_state & STATE_OPEN)))
1631     {
1632       session_state_t state = session->session_state;
1633       int rv;
1634
1635       rv = ((state & STATE_DISCONNECT) ? VPPCOM_ECONNRESET : VPPCOM_ENOTCONN);
1636       clib_warning ("VCL<%d>: ERROR: vpp handle 0x%llx, sid %u: "
1637                     "session is not open! state 0x%x (%s), "
1638                     "returning %d (%s)", getpid (), session->vpp_handle,
1639                     session->session_index,
1640                     state, vppcom_session_state_str (state),
1641                     rv, vppcom_retval_str (rv));
1642       return rv;
1643     }
1644
1645   VDBG (3, "VCL<%d>: vpp handle 0x%llx, sid %u: peek %s (%p), ready = %d",
1646         getpid (), session->vpp_handle, session->session_index,
1647         session->tx_fifo, svm_fifo_max_enqueue (session->tx_fifo));
1648
1649   return svm_fifo_max_enqueue (session->tx_fifo);
1650 }
1651
1652 static inline int
1653 vcl_mq_dequeue_batch (vcl_worker_t * wrk, svm_msg_q_t * mq)
1654 {
1655   svm_msg_q_msg_t *msg;
1656   u32 n_msgs;
1657   int i;
1658
1659   n_msgs = svm_msg_q_size (mq);
1660   for (i = 0; i < n_msgs; i++)
1661     {
1662       vec_add2 (wrk->mq_msg_vector, msg, 1);
1663       svm_msg_q_sub_w_lock (mq, msg);
1664     }
1665   return n_msgs;
1666 }
1667
1668 #define vcl_fifo_rx_evt_valid_or_break(_fifo)                   \
1669 if (PREDICT_FALSE (svm_fifo_is_empty (_fifo)))                  \
1670   {                                                             \
1671     svm_fifo_unset_event (_fifo);                               \
1672     if (svm_fifo_is_empty (_fifo))                              \
1673       break;                                                    \
1674   }                                                             \
1675
1676 static void
1677 vcl_select_handle_mq_event (vcl_worker_t * wrk, session_event_t * e,
1678                             unsigned long n_bits, unsigned long *read_map,
1679                             unsigned long *write_map,
1680                             unsigned long *except_map, u32 * bits_set)
1681 {
1682   session_disconnected_msg_t *disconnected_msg;
1683   session_connected_msg_t *connected_msg;
1684   session_accepted_msg_t *accepted_msg;
1685   vcl_session_msg_t *vcl_msg;
1686   vcl_session_t *session;
1687   u64 handle;
1688   u32 sid;
1689
1690   switch (e->event_type)
1691     {
1692     case FIFO_EVENT_APP_RX:
1693       vcl_fifo_rx_evt_valid_or_break (e->fifo);
1694       sid = e->fifo->client_session_index;
1695       session = vcl_session_get (wrk, sid);
1696       if (!session)
1697         break;
1698       if (sid < n_bits && read_map)
1699         {
1700           clib_bitmap_set_no_check (read_map, sid, 1);
1701           *bits_set += 1;
1702         }
1703       break;
1704     case FIFO_EVENT_APP_TX:
1705       sid = e->fifo->client_session_index;
1706       session = vcl_session_get (wrk, sid);
1707       if (!session)
1708         break;
1709       if (sid < n_bits && write_map)
1710         {
1711           clib_bitmap_set_no_check (write_map, sid, 1);
1712           *bits_set += 1;
1713         }
1714       break;
1715     case SESSION_IO_EVT_CT_TX:
1716       vcl_fifo_rx_evt_valid_or_break (e->fifo);
1717       session = vcl_ct_session_get_from_fifo (wrk, e->fifo, 0);
1718       if (!session)
1719         break;
1720       sid = session->session_index;
1721       if (sid < n_bits && read_map)
1722         {
1723           clib_bitmap_set_no_check (read_map, sid, 1);
1724           *bits_set += 1;
1725         }
1726       break;
1727     case SESSION_IO_EVT_CT_RX:
1728       session = vcl_ct_session_get_from_fifo (wrk, e->fifo, 1);
1729       if (!session)
1730         break;
1731       sid = session->session_index;
1732       if (sid < n_bits && write_map)
1733         {
1734           clib_bitmap_set_no_check (write_map, sid, 1);
1735           *bits_set += 1;
1736         }
1737       break;
1738     case SESSION_CTRL_EVT_ACCEPTED:
1739       accepted_msg = (session_accepted_msg_t *) e->data;
1740       handle = accepted_msg->listener_handle;
1741       session = vcl_session_table_lookup_listener (wrk, handle);
1742       if (!session)
1743         {
1744           clib_warning ("VCL<%d>: ERROR: couldn't find listen session:"
1745                         "listener handle %llx", getpid (), handle);
1746           break;
1747         }
1748
1749       clib_fifo_add2 (session->accept_evts_fifo, vcl_msg);
1750       vcl_msg->accepted_msg = *accepted_msg;
1751       sid = session->session_index;
1752       if (sid < n_bits && read_map)
1753         {
1754           clib_bitmap_set_no_check (read_map, sid, 1);
1755           *bits_set += 1;
1756         }
1757       break;
1758     case SESSION_CTRL_EVT_CONNECTED:
1759       connected_msg = (session_connected_msg_t *) e->data;
1760       vcl_session_connected_handler (wrk, connected_msg);
1761       break;
1762     case SESSION_CTRL_EVT_DISCONNECTED:
1763       disconnected_msg = (session_disconnected_msg_t *) e->data;
1764       sid = vcl_session_index_from_vpp_handle (wrk, disconnected_msg->handle);
1765       if (sid < n_bits && except_map)
1766         {
1767           clib_bitmap_set_no_check (except_map, sid, 1);
1768           *bits_set += 1;
1769         }
1770       break;
1771     case SESSION_CTRL_EVT_RESET:
1772       sid = vcl_session_reset_handler (wrk, (session_reset_msg_t *) e->data);
1773       if (sid < n_bits && except_map)
1774         {
1775           clib_bitmap_set_no_check (except_map, sid, 1);
1776           *bits_set += 1;
1777         }
1778       break;
1779     default:
1780       clib_warning ("unhandled: %u", e->event_type);
1781       break;
1782     }
1783 }
1784
1785 static int
1786 vcl_select_handle_mq (vcl_worker_t * wrk, svm_msg_q_t * mq,
1787                       unsigned long n_bits, unsigned long *read_map,
1788                       unsigned long *write_map, unsigned long *except_map,
1789                       double time_to_wait, u32 * bits_set)
1790 {
1791   svm_msg_q_msg_t *msg;
1792   session_event_t *e;
1793   u32 i;
1794
1795   svm_msg_q_lock (mq);
1796   if (svm_msg_q_is_empty (mq))
1797     {
1798       if (*bits_set)
1799         {
1800           svm_msg_q_unlock (mq);
1801           return 0;
1802         }
1803
1804       if (!time_to_wait)
1805         {
1806           svm_msg_q_unlock (mq);
1807           return 0;
1808         }
1809       else if (time_to_wait < 0)
1810         {
1811           svm_msg_q_wait (mq);
1812         }
1813       else
1814         {
1815           if (svm_msg_q_timedwait (mq, time_to_wait))
1816             {
1817               svm_msg_q_unlock (mq);
1818               return 0;
1819             }
1820         }
1821     }
1822   vcl_mq_dequeue_batch (wrk, mq);
1823   svm_msg_q_unlock (mq);
1824
1825   for (i = 0; i < vec_len (wrk->mq_msg_vector); i++)
1826     {
1827       msg = vec_elt_at_index (wrk->mq_msg_vector, i);
1828       e = svm_msg_q_msg_data (mq, msg);
1829       vcl_select_handle_mq_event (wrk, e, n_bits, read_map, write_map,
1830                                   except_map, bits_set);
1831       svm_msg_q_free_msg (mq, msg);
1832     }
1833   vec_reset_length (wrk->mq_msg_vector);
1834   return *bits_set;
1835 }
1836
1837 static int
1838 vppcom_select_condvar (vcl_worker_t * wrk, unsigned long n_bits,
1839                        unsigned long *read_map, unsigned long *write_map,
1840                        unsigned long *except_map, double time_to_wait,
1841                        u32 * bits_set)
1842 {
1843   double total_wait = 0, wait_slice;
1844   vcl_cut_through_registration_t *cr;
1845
1846   time_to_wait = (time_to_wait == -1) ? 10e9 : time_to_wait;
1847   wait_slice = wrk->cut_through_registrations ? 10e-6 : time_to_wait;
1848   do
1849     {
1850       vcl_ct_registration_lock (wrk);
1851       /* *INDENT-OFF* */
1852       pool_foreach (cr, wrk->cut_through_registrations, ({
1853         vcl_select_handle_mq (wrk, cr->mq, n_bits, read_map, write_map, except_map,
1854                               0, bits_set);
1855       }));
1856       /* *INDENT-ON* */
1857       vcl_ct_registration_unlock (wrk);
1858
1859       vcl_select_handle_mq (wrk, wrk->app_event_queue, n_bits, read_map,
1860                             write_map, except_map, time_to_wait, bits_set);
1861       total_wait += wait_slice;
1862       if (*bits_set)
1863         return *bits_set;
1864     }
1865   while (total_wait < time_to_wait);
1866
1867   return 0;
1868 }
1869
1870 static int
1871 vppcom_select_eventfd (vcl_worker_t * wrk, unsigned long n_bits,
1872                        unsigned long *read_map, unsigned long *write_map,
1873                        unsigned long *except_map, double time_to_wait,
1874                        u32 * bits_set)
1875 {
1876   vcl_mq_evt_conn_t *mqc;
1877   int __clib_unused n_read;
1878   int n_mq_evts, i;
1879   u64 buf;
1880
1881   vec_validate (wrk->mq_events, pool_elts (wrk->mq_evt_conns));
1882   n_mq_evts = epoll_wait (wrk->mqs_epfd, wrk->mq_events,
1883                           vec_len (wrk->mq_events), time_to_wait);
1884   for (i = 0; i < n_mq_evts; i++)
1885     {
1886       mqc = vcl_mq_evt_conn_get (wrk, wrk->mq_events[i].data.u32);
1887       n_read = read (mqc->mq_fd, &buf, sizeof (buf));
1888       vcl_select_handle_mq (wrk, mqc->mq, n_bits, read_map, write_map,
1889                             except_map, 0, bits_set);
1890     }
1891
1892   return (n_mq_evts > 0 ? (int) *bits_set : 0);
1893 }
1894
1895 int
1896 vppcom_select (unsigned long n_bits, unsigned long *read_map,
1897                unsigned long *write_map, unsigned long *except_map,
1898                double time_to_wait)
1899 {
1900   u32 sid, minbits = clib_max (n_bits, BITS (uword)), bits_set = 0;
1901   vcl_worker_t *wrk = vcl_worker_get_current ();
1902   vcl_session_t *session = 0;
1903   int rv, i;
1904
1905   ASSERT (sizeof (clib_bitmap_t) == sizeof (long int));
1906
1907   if (n_bits && read_map)
1908     {
1909       clib_bitmap_validate (wrk->rd_bitmap, minbits);
1910       clib_memcpy (wrk->rd_bitmap, read_map,
1911                    vec_len (wrk->rd_bitmap) * sizeof (clib_bitmap_t));
1912       memset (read_map, 0, vec_len (wrk->rd_bitmap) * sizeof (clib_bitmap_t));
1913     }
1914   if (n_bits && write_map)
1915     {
1916       clib_bitmap_validate (wrk->wr_bitmap, minbits);
1917       clib_memcpy (wrk->wr_bitmap, write_map,
1918                    vec_len (wrk->wr_bitmap) * sizeof (clib_bitmap_t));
1919       memset (write_map, 0,
1920               vec_len (wrk->wr_bitmap) * sizeof (clib_bitmap_t));
1921     }
1922   if (n_bits && except_map)
1923     {
1924       clib_bitmap_validate (wrk->ex_bitmap, minbits);
1925       clib_memcpy (wrk->ex_bitmap, except_map,
1926                    vec_len (wrk->ex_bitmap) * sizeof (clib_bitmap_t));
1927       memset (except_map, 0,
1928               vec_len (wrk->ex_bitmap) * sizeof (clib_bitmap_t));
1929     }
1930
1931   if (!n_bits)
1932     return 0;
1933
1934   if (!write_map)
1935     goto check_rd;
1936
1937   /* *INDENT-OFF* */
1938   clib_bitmap_foreach (sid, wrk->wr_bitmap, ({
1939     if (!(session = vcl_session_get (wrk, sid)))
1940       {
1941         VDBG (0, "VCL<%d>: session %d specified in write_map is closed.",
1942               getpid (), sid);
1943         return VPPCOM_EBADFD;
1944       }
1945
1946     rv = svm_fifo_is_full (session->tx_fifo);
1947     if (!rv)
1948       {
1949         clib_bitmap_set_no_check (write_map, sid, 1);
1950         bits_set++;
1951       }
1952   }));
1953
1954 check_rd:
1955   if (!read_map)
1956     goto check_mq;
1957
1958   clib_bitmap_foreach (sid, wrk->rd_bitmap, ({
1959     if (!(session = vcl_session_get (wrk, sid)))
1960       {
1961         VDBG (0, "VCL<%d>: session %d specified in write_map is closed.",
1962               getpid (), sid);
1963         return VPPCOM_EBADFD;
1964       }
1965
1966     rv = vppcom_session_read_ready (session);
1967     if (rv)
1968       {
1969         clib_bitmap_set_no_check (read_map, sid, 1);
1970         bits_set++;
1971       }
1972   }));
1973   /* *INDENT-ON* */
1974
1975 check_mq:
1976
1977   for (i = 0; i < vec_len (wrk->unhandled_evts_vector); i++)
1978     {
1979       vcl_select_handle_mq_event (wrk, &wrk->unhandled_evts_vector[i], n_bits,
1980                                   read_map, write_map, except_map, &bits_set);
1981     }
1982   vec_reset_length (wrk->unhandled_evts_vector);
1983
1984   if (vcm->cfg.use_mq_eventfd)
1985     vppcom_select_eventfd (wrk, n_bits, read_map, write_map, except_map,
1986                            time_to_wait, &bits_set);
1987   else
1988     vppcom_select_condvar (wrk, n_bits, read_map, write_map, except_map,
1989                            time_to_wait, &bits_set);
1990
1991   return (bits_set);
1992 }
1993
1994 static inline void
1995 vep_verify_epoll_chain (vcl_worker_t * wrk, u32 vep_idx)
1996 {
1997   vcl_session_t *session;
1998   vppcom_epoll_t *vep;
1999   u32 sid = vep_idx;
2000
2001   if (VPPCOM_DEBUG <= 1)
2002     return;
2003
2004   /* Assumes caller has acquired spinlock: vcm->sessions_lockp */
2005   session = vcl_session_get (wrk, vep_idx);
2006   if (PREDICT_FALSE (!session))
2007     {
2008       clib_warning ("VCL<%d>: ERROR: Invalid vep_idx (%u)!",
2009                     getpid (), vep_idx);
2010       goto done;
2011     }
2012   if (PREDICT_FALSE (!session->is_vep))
2013     {
2014       clib_warning ("VCL<%d>: ERROR: vep_idx (%u) is not a vep!",
2015                     getpid (), vep_idx);
2016       goto done;
2017     }
2018   vep = &session->vep;
2019   clib_warning ("VCL<%d>: vep_idx (%u): Dumping epoll chain\n"
2020                 "{\n"
2021                 "   is_vep         = %u\n"
2022                 "   is_vep_session = %u\n"
2023                 "   next_sid       = 0x%x (%u)\n"
2024                 "   wait_cont_idx  = 0x%x (%u)\n"
2025                 "}\n", getpid (), vep_idx,
2026                 session->is_vep, session->is_vep_session,
2027                 vep->next_sh, vep->next_sh,
2028                 session->wait_cont_idx, session->wait_cont_idx);
2029
2030   for (sid = vep->next_sh; sid != ~0; sid = vep->next_sh)
2031     {
2032       session = vcl_session_get (wrk, sid);
2033       if (PREDICT_FALSE (!session))
2034         {
2035           clib_warning ("VCL<%d>: ERROR: Invalid sid (%u)!", getpid (), sid);
2036           goto done;
2037         }
2038       if (PREDICT_FALSE (session->is_vep))
2039         clib_warning ("VCL<%d>: ERROR: sid (%u) is a vep!",
2040                       getpid (), vep_idx);
2041       else if (PREDICT_FALSE (!session->is_vep_session))
2042         {
2043           clib_warning ("VCL<%d>: ERROR: session (%u) "
2044                         "is not a vep session!", getpid (), sid);
2045           goto done;
2046         }
2047       vep = &session->vep;
2048       if (PREDICT_FALSE (vep->vep_sh != vep_idx))
2049         clib_warning ("VCL<%d>: ERROR: session (%u) vep_idx (%u) != "
2050                       "vep_idx (%u)!", getpid (),
2051                       sid, session->vep.vep_sh, vep_idx);
2052       if (session->is_vep_session)
2053         {
2054           clib_warning ("vep_idx[%u]: sid 0x%x (%u)\n"
2055                         "{\n"
2056                         "   next_sid       = 0x%x (%u)\n"
2057                         "   prev_sid       = 0x%x (%u)\n"
2058                         "   vep_idx        = 0x%x (%u)\n"
2059                         "   ev.events      = 0x%x\n"
2060                         "   ev.data.u64    = 0x%llx\n"
2061                         "   et_mask        = 0x%x\n"
2062                         "}\n",
2063                         vep_idx, sid, sid,
2064                         vep->next_sh, vep->next_sh,
2065                         vep->prev_sh, vep->prev_sh,
2066                         vep->vep_sh, vep->vep_sh,
2067                         vep->ev.events, vep->ev.data.u64, vep->et_mask);
2068         }
2069     }
2070
2071 done:
2072   clib_warning ("VCL<%d>: vep_idx (%u): Dump complete!\n",
2073                 getpid (), vep_idx);
2074 }
2075
2076 int
2077 vppcom_epoll_create (void)
2078 {
2079   vcl_worker_t *wrk = vcl_worker_get_current ();
2080   vcl_session_t *vep_session;
2081
2082   vep_session = vcl_session_alloc (wrk);
2083
2084   vep_session->is_vep = 1;
2085   vep_session->vep.vep_sh = ~0;
2086   vep_session->vep.next_sh = ~0;
2087   vep_session->vep.prev_sh = ~0;
2088   vep_session->wait_cont_idx = ~0;
2089   vep_session->vpp_handle = ~0;
2090
2091   vcl_evt (VCL_EVT_EPOLL_CREATE, vep_session, vep_sh);
2092   VDBG (0, "VCL<%d>: Created vep_idx %u / sid %u!",
2093         getpid (), vep_session->session_index, vep_session->session_index);
2094
2095   return vcl_session_handle (vep_session);
2096 }
2097
2098 int
2099 vppcom_epoll_ctl (uint32_t vep_handle, int op, uint32_t session_handle,
2100                   struct epoll_event *event)
2101 {
2102   vcl_worker_t *wrk = vcl_worker_get_current ();
2103   vcl_session_t *vep_session;
2104   vcl_session_t *session;
2105   int rv = VPPCOM_OK;
2106
2107   if (vep_handle == session_handle)
2108     {
2109       clib_warning ("VCL<%d>: ERROR: vep_idx == session_index (%u)!",
2110                     getpid (), vep_handle);
2111       return VPPCOM_EINVAL;
2112     }
2113
2114   vep_session = vcl_session_get_w_handle (wrk, vep_handle);
2115   if (PREDICT_FALSE (!vep_session))
2116     {
2117       clib_warning ("VCL<%d>: ERROR: Invalid vep_idx (%u)!", vep_handle);
2118       return VPPCOM_EBADFD;
2119     }
2120   if (PREDICT_FALSE (!vep_session->is_vep))
2121     {
2122       clib_warning ("VCL<%d>: ERROR: vep_idx (%u) is not a vep!",
2123                     getpid (), vep_handle);
2124       return VPPCOM_EINVAL;
2125     }
2126
2127   ASSERT (vep_session->vep.vep_sh == ~0);
2128   ASSERT (vep_session->vep.prev_sh == ~0);
2129
2130   session = vcl_session_get_w_handle (wrk, session_handle);
2131   if (PREDICT_FALSE (!session))
2132     {
2133       VDBG (0, "VCL<%d>: ERROR: Invalid session_handle (%u)!",
2134             getpid (), session_handle);
2135       return VPPCOM_EBADFD;
2136     }
2137   if (PREDICT_FALSE (session->is_vep))
2138     {
2139       clib_warning ("ERROR: session_handle (%u) is a vep!", vep_handle);
2140       return VPPCOM_EINVAL;
2141     }
2142
2143   switch (op)
2144     {
2145     case EPOLL_CTL_ADD:
2146       if (PREDICT_FALSE (!event))
2147         {
2148           clib_warning ("VCL<%d>: ERROR: EPOLL_CTL_ADD: NULL pointer to "
2149                         "epoll_event structure!", getpid ());
2150           return VPPCOM_EINVAL;
2151         }
2152       if (vep_session->vep.next_sh != ~0)
2153         {
2154           vcl_session_t *next_session;
2155           next_session = vcl_session_get_w_handle (wrk,
2156                                                    vep_session->vep.next_sh);
2157           if (PREDICT_FALSE (!next_session))
2158             {
2159               clib_warning ("VCL<%d>: ERROR: EPOLL_CTL_ADD: Invalid "
2160                             "vep.next_sid (%u) on vep_idx (%u)!",
2161                             getpid (), vep_session->vep.next_sh, vep_handle);
2162               return VPPCOM_EBADFD;
2163             }
2164           ASSERT (next_session->vep.prev_sh == vep_handle);
2165           next_session->vep.prev_sh = session_handle;
2166         }
2167       session->vep.next_sh = vep_session->vep.next_sh;
2168       session->vep.prev_sh = vep_handle;
2169       session->vep.vep_sh = vep_handle;
2170       session->vep.et_mask = VEP_DEFAULT_ET_MASK;
2171       session->vep.ev = *event;
2172       session->is_vep = 0;
2173       session->is_vep_session = 1;
2174       vep_session->vep.next_sh = session_handle;
2175
2176       VDBG (1, "VCL<%d>: EPOLL_CTL_ADD: vep_idx %u, sid %u, events 0x%x, "
2177             "data 0x%llx!", getpid (), vep_handle, session_handle,
2178             event->events, event->data.u64);
2179       vcl_evt (VCL_EVT_EPOLL_CTLADD, session, event->events, event->data.u64);
2180       break;
2181
2182     case EPOLL_CTL_MOD:
2183       if (PREDICT_FALSE (!event))
2184         {
2185           clib_warning ("VCL<%d>: ERROR: EPOLL_CTL_MOD: NULL pointer to "
2186                         "epoll_event structure!", getpid ());
2187           rv = VPPCOM_EINVAL;
2188           goto done;
2189         }
2190       else if (PREDICT_FALSE (!session->is_vep_session))
2191         {
2192           clib_warning ("VCL<%d>: ERROR: sid %u EPOLL_CTL_MOD: "
2193                         "not a vep session!", getpid (), session_handle);
2194           rv = VPPCOM_EINVAL;
2195           goto done;
2196         }
2197       else if (PREDICT_FALSE (session->vep.vep_sh != vep_handle))
2198         {
2199           clib_warning ("VCL<%d>: ERROR: sid %u EPOLL_CTL_MOD: "
2200                         "vep_idx (%u) != vep_idx (%u)!",
2201                         getpid (), session_handle,
2202                         session->vep.vep_sh, vep_handle);
2203           rv = VPPCOM_EINVAL;
2204           goto done;
2205         }
2206       session->vep.et_mask = VEP_DEFAULT_ET_MASK;
2207       session->vep.ev = *event;
2208       VDBG (1, "VCL<%d>: EPOLL_CTL_MOD: vep_idx %u, sid %u, events 0x%x,"
2209             " data 0x%llx!", getpid (), vep_handle, session_handle,
2210             event->events, event->data.u64);
2211       break;
2212
2213     case EPOLL_CTL_DEL:
2214       if (PREDICT_FALSE (!session->is_vep_session))
2215         {
2216           clib_warning ("VCL<%d>: ERROR: sid %u EPOLL_CTL_DEL: "
2217                         "not a vep session!", getpid (), session_handle);
2218           rv = VPPCOM_EINVAL;
2219           goto done;
2220         }
2221       else if (PREDICT_FALSE (session->vep.vep_sh != vep_handle))
2222         {
2223           clib_warning ("VCL<%d>: ERROR: sid %u EPOLL_CTL_DEL: "
2224                         "vep_idx (%u) != vep_idx (%u)!",
2225                         getpid (), session_handle,
2226                         session->vep.vep_sh, vep_handle);
2227           rv = VPPCOM_EINVAL;
2228           goto done;
2229         }
2230
2231       vep_session->wait_cont_idx =
2232         (vep_session->wait_cont_idx == session_handle) ?
2233         session->vep.next_sh : vep_session->wait_cont_idx;
2234
2235       if (session->vep.prev_sh == vep_handle)
2236         vep_session->vep.next_sh = session->vep.next_sh;
2237       else
2238         {
2239           vcl_session_t *prev_session;
2240           prev_session = vcl_session_get_w_handle (wrk, session->vep.prev_sh);
2241           if (PREDICT_FALSE (!prev_session))
2242             {
2243               clib_warning ("VCL<%d>: ERROR: EPOLL_CTL_DEL: Invalid "
2244                             "vep.prev_sid (%u) on sid (%u)!",
2245                             getpid (), session->vep.prev_sh, session_handle);
2246               return VPPCOM_EBADFD;
2247             }
2248           ASSERT (prev_session->vep.next_sh == session_handle);
2249           prev_session->vep.next_sh = session->vep.next_sh;
2250         }
2251       if (session->vep.next_sh != ~0)
2252         {
2253           vcl_session_t *next_session;
2254           next_session = vcl_session_get_w_handle (wrk, session->vep.next_sh);
2255           if (PREDICT_FALSE (!next_session))
2256             {
2257               clib_warning ("VCL<%d>: ERROR: EPOLL_CTL_DEL: Invalid "
2258                             "vep.next_sid (%u) on sid (%u)!",
2259                             getpid (), session->vep.next_sh, session_handle);
2260               return VPPCOM_EBADFD;
2261             }
2262           ASSERT (next_session->vep.prev_sh == session_handle);
2263           next_session->vep.prev_sh = session->vep.prev_sh;
2264         }
2265
2266       memset (&session->vep, 0, sizeof (session->vep));
2267       session->vep.next_sh = ~0;
2268       session->vep.prev_sh = ~0;
2269       session->vep.vep_sh = ~0;
2270       session->is_vep_session = 0;
2271       VDBG (1, "VCL<%d>: EPOLL_CTL_DEL: vep_idx %u, sid %u!",
2272             getpid (), vep_handle, session_handle);
2273       vcl_evt (VCL_EVT_EPOLL_CTLDEL, session, vep_sh);
2274       break;
2275
2276     default:
2277       clib_warning ("VCL<%d>: ERROR: Invalid operation (%d)!", getpid (), op);
2278       rv = VPPCOM_EINVAL;
2279     }
2280
2281   vep_verify_epoll_chain (wrk, vep_handle);
2282
2283 done:
2284   return rv;
2285 }
2286
2287 static inline void
2288 vcl_epoll_wait_handle_mq_event (vcl_worker_t * wrk, session_event_t * e,
2289                                 struct epoll_event *events, u32 * num_ev)
2290 {
2291   session_disconnected_msg_t *disconnected_msg;
2292   session_connected_msg_t *connected_msg;
2293   session_accepted_msg_t *accepted_msg;
2294   u64 session_evt_data = ~0, handle;
2295   u32 sid = ~0, session_events;
2296   vcl_session_msg_t *vcl_msg;
2297   vcl_session_t *session;
2298   u8 add_event = 0;
2299
2300   switch (e->event_type)
2301     {
2302     case FIFO_EVENT_APP_RX:
2303       ASSERT (e->fifo->client_thread_index == vcl_get_worker_index ());
2304       vcl_fifo_rx_evt_valid_or_break (e->fifo);
2305       sid = e->fifo->client_session_index;
2306       session = vcl_session_get (wrk, sid);
2307       session_events = session->vep.ev.events;
2308       if (!(EPOLLIN & session->vep.ev.events))
2309         break;
2310       add_event = 1;
2311       events[*num_ev].events |= EPOLLIN;
2312       session_evt_data = session->vep.ev.data.u64;
2313       break;
2314     case FIFO_EVENT_APP_TX:
2315       sid = e->fifo->client_session_index;
2316       session = vcl_session_get (wrk, sid);
2317       session_events = session->vep.ev.events;
2318       if (!(EPOLLOUT & session_events))
2319         break;
2320       add_event = 1;
2321       events[*num_ev].events |= EPOLLOUT;
2322       session_evt_data = session->vep.ev.data.u64;
2323       break;
2324     case SESSION_IO_EVT_CT_TX:
2325       vcl_fifo_rx_evt_valid_or_break (e->fifo);
2326       session = vcl_ct_session_get_from_fifo (wrk, e->fifo, 0);
2327       sid = session->session_index;
2328       session_events = session->vep.ev.events;
2329       if (!(EPOLLIN & session->vep.ev.events))
2330         break;
2331       add_event = 1;
2332       events[*num_ev].events |= EPOLLIN;
2333       session_evt_data = session->vep.ev.data.u64;
2334       break;
2335     case SESSION_IO_EVT_CT_RX:
2336       session = vcl_ct_session_get_from_fifo (wrk, e->fifo, 1);
2337       sid = session->session_index;
2338       session_events = session->vep.ev.events;
2339       if (!(EPOLLOUT & session_events))
2340         break;
2341       add_event = 1;
2342       events[*num_ev].events |= EPOLLOUT;
2343       session_evt_data = session->vep.ev.data.u64;
2344       break;
2345     case SESSION_CTRL_EVT_ACCEPTED:
2346       accepted_msg = (session_accepted_msg_t *) e->data;
2347       handle = accepted_msg->listener_handle;
2348       session = vcl_session_table_lookup_listener (wrk, handle);
2349       if (!session)
2350         {
2351           clib_warning ("VCL<%d>: ERROR: couldn't find listen session:"
2352                         "listener handle %llx", getpid (), handle);
2353           break;
2354         }
2355
2356       clib_fifo_add2 (session->accept_evts_fifo, vcl_msg);
2357       vcl_msg->accepted_msg = *accepted_msg;
2358       session_events = session->vep.ev.events;
2359       if (!(EPOLLIN & session_events))
2360         break;
2361
2362       add_event = 1;
2363       events[*num_ev].events |= EPOLLIN;
2364       session_evt_data = session->vep.ev.data.u64;
2365       break;
2366     case SESSION_CTRL_EVT_CONNECTED:
2367       connected_msg = (session_connected_msg_t *) e->data;
2368       vcl_session_connected_handler (wrk, connected_msg);
2369       /* Generate EPOLLOUT because there's no connected event */
2370       sid = vcl_session_index_from_vpp_handle (wrk, connected_msg->handle);
2371       session = vcl_session_get (wrk, sid);
2372       session_events = session->vep.ev.events;
2373       if (EPOLLOUT & session_events)
2374         {
2375           add_event = 1;
2376           events[*num_ev].events |= EPOLLOUT;
2377           session_evt_data = session->vep.ev.data.u64;
2378         }
2379       break;
2380     case SESSION_CTRL_EVT_DISCONNECTED:
2381       disconnected_msg = (session_disconnected_msg_t *) e->data;
2382       sid = vcl_session_index_from_vpp_handle (wrk, disconnected_msg->handle);
2383       if (!(session = vcl_session_get (wrk, sid)))
2384         break;
2385       add_event = 1;
2386       events[*num_ev].events |= EPOLLHUP | EPOLLRDHUP;
2387       session_evt_data = session->vep.ev.data.u64;
2388       session_events = session->vep.ev.events;
2389       break;
2390     case SESSION_CTRL_EVT_RESET:
2391       sid = vcl_session_reset_handler (wrk, (session_reset_msg_t *) e->data);
2392       if (!(session = vcl_session_get (wrk, sid)))
2393         break;
2394       add_event = 1;
2395       events[*num_ev].events |= EPOLLHUP | EPOLLRDHUP;
2396       session_evt_data = session->vep.ev.data.u64;
2397       session_events = session->vep.ev.events;
2398       break;
2399     default:
2400       VDBG (0, "unhandled: %u", e->event_type);
2401       break;
2402     }
2403
2404   if (add_event)
2405     {
2406       events[*num_ev].data.u64 = session_evt_data;
2407       if (EPOLLONESHOT & session_events)
2408         {
2409           session = vcl_session_get (wrk, sid);
2410           session->vep.ev.events = 0;
2411         }
2412       *num_ev += 1;
2413     }
2414 }
2415
2416 static int
2417 vcl_epoll_wait_handle_mq (vcl_worker_t * wrk, svm_msg_q_t * mq,
2418                           struct epoll_event *events, u32 maxevents,
2419                           double wait_for_time, u32 * num_ev)
2420 {
2421   svm_msg_q_msg_t *msg;
2422   session_event_t *e;
2423   int i;
2424
2425   svm_msg_q_lock (mq);
2426   if (svm_msg_q_is_empty (mq))
2427     {
2428       if (!wait_for_time)
2429         {
2430           svm_msg_q_unlock (mq);
2431           return 0;
2432         }
2433       else if (wait_for_time < 0)
2434         {
2435           svm_msg_q_wait (mq);
2436         }
2437       else
2438         {
2439           if (svm_msg_q_timedwait (mq, wait_for_time / 1e3))
2440             {
2441               svm_msg_q_unlock (mq);
2442               return 0;
2443             }
2444         }
2445     }
2446   vcl_mq_dequeue_batch (wrk, mq);
2447   svm_msg_q_unlock (mq);
2448
2449   for (i = 0; i < vec_len (wrk->mq_msg_vector); i++)
2450     {
2451       msg = vec_elt_at_index (wrk->mq_msg_vector, i);
2452       e = svm_msg_q_msg_data (mq, msg);
2453       vcl_epoll_wait_handle_mq_event (wrk, e, events, num_ev);
2454       svm_msg_q_free_msg (mq, msg);
2455       if (*num_ev == maxevents)
2456         {
2457           i += 1;
2458           break;
2459         }
2460     }
2461
2462   vec_delete (wrk->mq_msg_vector, i, 0);
2463
2464   return *num_ev;
2465 }
2466
2467 static int
2468 vppcom_epoll_wait_condvar (vcl_worker_t * wrk, struct epoll_event *events,
2469                            int maxevents, u32 n_evts, double wait_for_time)
2470 {
2471   vcl_cut_through_registration_t *cr;
2472   double total_wait = 0, wait_slice;
2473   int rv;
2474
2475   wait_for_time = (wait_for_time == -1) ? (double) 10e9 : wait_for_time;
2476   wait_slice = wrk->cut_through_registrations ? 10e-6 : wait_for_time;
2477
2478   do
2479     {
2480       vcl_ct_registration_lock (wrk);
2481       /* *INDENT-OFF* */
2482       pool_foreach (cr, wrk->cut_through_registrations, ({
2483         vcl_epoll_wait_handle_mq (wrk, cr->mq, events, maxevents, 0, &n_evts);
2484       }));
2485       /* *INDENT-ON* */
2486       vcl_ct_registration_unlock (wrk);
2487
2488       rv = vcl_epoll_wait_handle_mq (wrk, wrk->app_event_queue, events,
2489                                      maxevents, n_evts ? 0 : wait_slice,
2490                                      &n_evts);
2491       if (rv)
2492         total_wait += wait_slice;
2493       if (n_evts)
2494         return n_evts;
2495     }
2496   while (total_wait < wait_for_time);
2497   return n_evts;
2498 }
2499
2500 static int
2501 vppcom_epoll_wait_eventfd (vcl_worker_t * wrk, struct epoll_event *events,
2502                            int maxevents, u32 n_evts, double wait_for_time)
2503 {
2504   vcl_mq_evt_conn_t *mqc;
2505   int __clib_unused n_read;
2506   int n_mq_evts, i;
2507   u64 buf;
2508
2509   vec_validate (wrk->mq_events, pool_elts (wrk->mq_evt_conns));
2510   n_mq_evts = epoll_wait (wrk->mqs_epfd, wrk->mq_events,
2511                           vec_len (wrk->mq_events), wait_for_time);
2512   for (i = 0; i < n_mq_evts; i++)
2513     {
2514       mqc = vcl_mq_evt_conn_get (wrk, wrk->mq_events[i].data.u32);
2515       n_read = read (mqc->mq_fd, &buf, sizeof (buf));
2516       vcl_epoll_wait_handle_mq (wrk, mqc->mq, events, maxevents, 0, &n_evts);
2517     }
2518
2519   return (int) n_evts;
2520 }
2521
2522 int
2523 vppcom_epoll_wait (uint32_t vep_handle, struct epoll_event *events,
2524                    int maxevents, double wait_for_time)
2525 {
2526   vcl_worker_t *wrk = vcl_worker_get_current ();
2527   vcl_session_t *vep_session;
2528   u32 n_evts = 0;
2529   int i;
2530
2531   if (PREDICT_FALSE (maxevents <= 0))
2532     {
2533       clib_warning ("VCL<%d>: ERROR: Invalid maxevents (%d)!",
2534                     getpid (), maxevents);
2535       return VPPCOM_EINVAL;
2536     }
2537
2538   vep_session = vcl_session_get_w_handle (wrk, vep_handle);
2539   if (!vep_session)
2540     return VPPCOM_EBADFD;
2541
2542   if (PREDICT_FALSE (!vep_session->is_vep))
2543     {
2544       clib_warning ("VCL<%d>: ERROR: vep_idx (%u) is not a vep!",
2545                     getpid (), vep_handle);
2546       return VPPCOM_EINVAL;
2547     }
2548
2549   memset (events, 0, sizeof (*events) * maxevents);
2550
2551   if (vec_len (wrk->unhandled_evts_vector))
2552     {
2553       for (i = 0; i < vec_len (wrk->unhandled_evts_vector); i++)
2554         {
2555           vcl_epoll_wait_handle_mq_event (wrk, &wrk->unhandled_evts_vector[i],
2556                                           events, &n_evts);
2557           if (n_evts == maxevents)
2558             {
2559               i += 1;
2560               break;
2561             }
2562         }
2563
2564       vec_delete (wrk->unhandled_evts_vector, i, 0);
2565     }
2566
2567   if (vcm->cfg.use_mq_eventfd)
2568     return vppcom_epoll_wait_eventfd (wrk, events, maxevents, n_evts,
2569                                       wait_for_time);
2570
2571   return vppcom_epoll_wait_condvar (wrk, events, maxevents, n_evts,
2572                                     wait_for_time);
2573 }
2574
2575 int
2576 vppcom_session_attr (uint32_t session_handle, uint32_t op,
2577                      void *buffer, uint32_t * buflen)
2578 {
2579   vcl_worker_t *wrk = vcl_worker_get_current ();
2580   vcl_session_t *session;
2581   int rv = VPPCOM_OK;
2582   u32 *flags = buffer;
2583   vppcom_endpt_t *ep = buffer;
2584
2585   session = vcl_session_get_w_handle (wrk, session_handle);
2586   if (!session)
2587     return VPPCOM_EBADFD;
2588
2589   switch (op)
2590     {
2591     case VPPCOM_ATTR_GET_NREAD:
2592       rv = vppcom_session_read_ready (session);
2593       VDBG (2, "VCL<%d>: VPPCOM_ATTR_GET_NREAD: sid %u, nread = %d",
2594             getpid (), rv);
2595       break;
2596
2597     case VPPCOM_ATTR_GET_NWRITE:
2598       rv = vppcom_session_write_ready (session);
2599       VDBG (2, "VCL<%d>: VPPCOM_ATTR_GET_NWRITE: sid %u, nwrite = %d",
2600             getpid (), session_handle, rv);
2601       break;
2602
2603     case VPPCOM_ATTR_GET_FLAGS:
2604       if (PREDICT_TRUE (buffer && buflen && (*buflen >= sizeof (*flags))))
2605         {
2606           *flags = O_RDWR | (VCL_SESS_ATTR_TEST (session->attr,
2607                                                  VCL_SESS_ATTR_NONBLOCK));
2608           *buflen = sizeof (*flags);
2609           VDBG (2, "VCL<%d>: VPPCOM_ATTR_GET_FLAGS: sid %u, flags = 0x%08x, "
2610                 "is_nonblocking = %u", getpid (),
2611                 session_handle, *flags,
2612                 VCL_SESS_ATTR_TEST (session->attr, VCL_SESS_ATTR_NONBLOCK));
2613         }
2614       else
2615         rv = VPPCOM_EINVAL;
2616       break;
2617
2618     case VPPCOM_ATTR_SET_FLAGS:
2619       if (PREDICT_TRUE (buffer && buflen && (*buflen == sizeof (*flags))))
2620         {
2621           if (*flags & O_NONBLOCK)
2622             VCL_SESS_ATTR_SET (session->attr, VCL_SESS_ATTR_NONBLOCK);
2623           else
2624             VCL_SESS_ATTR_CLR (session->attr, VCL_SESS_ATTR_NONBLOCK);
2625
2626           VDBG (2, "VCL<%d>: VPPCOM_ATTR_SET_FLAGS: sid %u, flags = 0x%08x,"
2627                 " is_nonblocking = %u",
2628                 getpid (), session_handle, *flags,
2629                 VCL_SESS_ATTR_TEST (session->attr, VCL_SESS_ATTR_NONBLOCK));
2630         }
2631       else
2632         rv = VPPCOM_EINVAL;
2633       break;
2634
2635     case VPPCOM_ATTR_GET_PEER_ADDR:
2636       if (PREDICT_TRUE (buffer && buflen &&
2637                         (*buflen >= sizeof (*ep)) && ep->ip))
2638         {
2639           ep->is_ip4 = session->transport.is_ip4;
2640           ep->port = session->transport.rmt_port;
2641           if (session->transport.is_ip4)
2642             clib_memcpy (ep->ip, &session->transport.rmt_ip.ip4,
2643                          sizeof (ip4_address_t));
2644           else
2645             clib_memcpy (ep->ip, &session->transport.rmt_ip.ip6,
2646                          sizeof (ip6_address_t));
2647           *buflen = sizeof (*ep);
2648           VDBG (1, "VCL<%d>: VPPCOM_ATTR_GET_PEER_ADDR: sid %u, is_ip4 = %u, "
2649                 "addr = %U, port %u", getpid (),
2650                 session_handle, ep->is_ip4, format_ip46_address,
2651                 &session->transport.rmt_ip,
2652                 ep->is_ip4 ? IP46_TYPE_IP4 : IP46_TYPE_IP6,
2653                 clib_net_to_host_u16 (ep->port));
2654         }
2655       else
2656         rv = VPPCOM_EINVAL;
2657       break;
2658
2659     case VPPCOM_ATTR_GET_LCL_ADDR:
2660       if (PREDICT_TRUE (buffer && buflen &&
2661                         (*buflen >= sizeof (*ep)) && ep->ip))
2662         {
2663           ep->is_ip4 = session->transport.is_ip4;
2664           ep->port = session->transport.lcl_port;
2665           if (session->transport.is_ip4)
2666             clib_memcpy (ep->ip, &session->transport.lcl_ip.ip4,
2667                          sizeof (ip4_address_t));
2668           else
2669             clib_memcpy (ep->ip, &session->transport.lcl_ip.ip6,
2670                          sizeof (ip6_address_t));
2671           *buflen = sizeof (*ep);
2672           VDBG (1, "VCL<%d>: VPPCOM_ATTR_GET_LCL_ADDR: sid %u, is_ip4 = %u,"
2673                 " addr = %U port %d", getpid (),
2674                 session_handle, ep->is_ip4, format_ip46_address,
2675                 &session->transport.lcl_ip,
2676                 ep->is_ip4 ? IP46_TYPE_IP4 : IP46_TYPE_IP6,
2677                 clib_net_to_host_u16 (ep->port));
2678         }
2679       else
2680         rv = VPPCOM_EINVAL;
2681       break;
2682
2683     case VPPCOM_ATTR_GET_LIBC_EPFD:
2684       rv = session->libc_epfd;
2685       VDBG (2, "VCL<%d>: VPPCOM_ATTR_GET_LIBC_EPFD: libc_epfd %d",
2686             getpid (), rv);
2687       break;
2688
2689     case VPPCOM_ATTR_SET_LIBC_EPFD:
2690       if (PREDICT_TRUE (buffer && buflen &&
2691                         (*buflen == sizeof (session->libc_epfd))))
2692         {
2693           session->libc_epfd = *(int *) buffer;
2694           *buflen = sizeof (session->libc_epfd);
2695
2696           VDBG (2, "VCL<%d>: VPPCOM_ATTR_SET_LIBC_EPFD: libc_epfd %d, "
2697                 "buflen %d", getpid (), session->libc_epfd, *buflen);
2698         }
2699       else
2700         rv = VPPCOM_EINVAL;
2701       break;
2702
2703     case VPPCOM_ATTR_GET_PROTOCOL:
2704       if (buffer && buflen && (*buflen >= sizeof (int)))
2705         {
2706           *(int *) buffer = session->session_type;
2707           *buflen = sizeof (int);
2708
2709           VDBG (2, "VCL<%d>: VPPCOM_ATTR_GET_PROTOCOL: %d (%s), buflen %d",
2710                 getpid (), *(int *) buffer, *(int *) buffer ? "UDP" : "TCP",
2711                 *buflen);
2712         }
2713       else
2714         rv = VPPCOM_EINVAL;
2715       break;
2716
2717     case VPPCOM_ATTR_GET_LISTEN:
2718       if (buffer && buflen && (*buflen >= sizeof (int)))
2719         {
2720           *(int *) buffer = VCL_SESS_ATTR_TEST (session->attr,
2721                                                 VCL_SESS_ATTR_LISTEN);
2722           *buflen = sizeof (int);
2723
2724           VDBG (2, "VCL<%d>: VPPCOM_ATTR_GET_LISTEN: %d, buflen %d",
2725                 getpid (), *(int *) buffer, *buflen);
2726         }
2727       else
2728         rv = VPPCOM_EINVAL;
2729       break;
2730
2731     case VPPCOM_ATTR_GET_ERROR:
2732       if (buffer && buflen && (*buflen >= sizeof (int)))
2733         {
2734           *(int *) buffer = 0;
2735           *buflen = sizeof (int);
2736
2737           VDBG (2, "VCL<%d>: VPPCOM_ATTR_GET_ERROR: %d, buflen %d, #VPP-TBD#",
2738                 getpid (), *(int *) buffer, *buflen);
2739         }
2740       else
2741         rv = VPPCOM_EINVAL;
2742       break;
2743
2744     case VPPCOM_ATTR_GET_TX_FIFO_LEN:
2745       if (buffer && buflen && (*buflen >= sizeof (u32)))
2746         {
2747
2748           /* VPP-TBD */
2749           *(size_t *) buffer = (session->sndbuf_size ? session->sndbuf_size :
2750                                 session->tx_fifo ? session->tx_fifo->nitems :
2751                                 vcm->cfg.tx_fifo_size);
2752           *buflen = sizeof (u32);
2753
2754           VDBG (2, "VCL<%d>: VPPCOM_ATTR_GET_TX_FIFO_LEN: %u (0x%x), "
2755                 "buflen %d, #VPP-TBD#", getpid (),
2756                 *(size_t *) buffer, *(size_t *) buffer, *buflen);
2757         }
2758       else
2759         rv = VPPCOM_EINVAL;
2760       break;
2761
2762     case VPPCOM_ATTR_SET_TX_FIFO_LEN:
2763       if (buffer && buflen && (*buflen == sizeof (u32)))
2764         {
2765           /* VPP-TBD */
2766           session->sndbuf_size = *(u32 *) buffer;
2767           VDBG (2, "VCL<%d>: VPPCOM_ATTR_SET_TX_FIFO_LEN: %u (0x%x), "
2768                 "buflen %d, #VPP-TBD#", getpid (),
2769                 session->sndbuf_size, session->sndbuf_size, *buflen);
2770         }
2771       else
2772         rv = VPPCOM_EINVAL;
2773       break;
2774
2775     case VPPCOM_ATTR_GET_RX_FIFO_LEN:
2776       if (buffer && buflen && (*buflen >= sizeof (u32)))
2777         {
2778
2779           /* VPP-TBD */
2780           *(size_t *) buffer = (session->rcvbuf_size ? session->rcvbuf_size :
2781                                 session->rx_fifo ? session->rx_fifo->nitems :
2782                                 vcm->cfg.rx_fifo_size);
2783           *buflen = sizeof (u32);
2784
2785           VDBG (2, "VCL<%d>: VPPCOM_ATTR_GET_RX_FIFO_LEN: %u (0x%x), "
2786                 "buflen %d, #VPP-TBD#", getpid (),
2787                 *(size_t *) buffer, *(size_t *) buffer, *buflen);
2788         }
2789       else
2790         rv = VPPCOM_EINVAL;
2791       break;
2792
2793     case VPPCOM_ATTR_SET_RX_FIFO_LEN:
2794       if (buffer && buflen && (*buflen == sizeof (u32)))
2795         {
2796           /* VPP-TBD */
2797           session->rcvbuf_size = *(u32 *) buffer;
2798           VDBG (2, "VCL<%d>: VPPCOM_ATTR_SET_RX_FIFO_LEN: %u (0x%x), "
2799                 "buflen %d, #VPP-TBD#", getpid (),
2800                 session->sndbuf_size, session->sndbuf_size, *buflen);
2801         }
2802       else
2803         rv = VPPCOM_EINVAL;
2804       break;
2805
2806     case VPPCOM_ATTR_GET_REUSEADDR:
2807       if (buffer && buflen && (*buflen >= sizeof (int)))
2808         {
2809           /* VPP-TBD */
2810           *(int *) buffer = VCL_SESS_ATTR_TEST (session->attr,
2811                                                 VCL_SESS_ATTR_REUSEADDR);
2812           *buflen = sizeof (int);
2813
2814           VDBG (2, "VCL<%d>: VPPCOM_ATTR_GET_REUSEADDR: %d, "
2815                 "buflen %d, #VPP-TBD#", getpid (), *(int *) buffer, *buflen);
2816         }
2817       else
2818         rv = VPPCOM_EINVAL;
2819       break;
2820
2821     case VPPCOM_ATTR_SET_REUSEADDR:
2822       if (buffer && buflen && (*buflen == sizeof (int)) &&
2823           !VCL_SESS_ATTR_TEST (session->attr, VCL_SESS_ATTR_LISTEN))
2824         {
2825           /* VPP-TBD */
2826           if (*(int *) buffer)
2827             VCL_SESS_ATTR_SET (session->attr, VCL_SESS_ATTR_REUSEADDR);
2828           else
2829             VCL_SESS_ATTR_CLR (session->attr, VCL_SESS_ATTR_REUSEADDR);
2830
2831           VDBG (2, "VCL<%d>: VPPCOM_ATTR_SET_REUSEADDR: %d, buflen %d,"
2832                 " #VPP-TBD#", getpid (),
2833                 VCL_SESS_ATTR_TEST (session->attr,
2834                                     VCL_SESS_ATTR_REUSEADDR), *buflen);
2835         }
2836       else
2837         rv = VPPCOM_EINVAL;
2838       break;
2839
2840     case VPPCOM_ATTR_GET_REUSEPORT:
2841       if (buffer && buflen && (*buflen >= sizeof (int)))
2842         {
2843           /* VPP-TBD */
2844           *(int *) buffer = VCL_SESS_ATTR_TEST (session->attr,
2845                                                 VCL_SESS_ATTR_REUSEPORT);
2846           *buflen = sizeof (int);
2847
2848           VDBG (2, "VCL<%d>: VPPCOM_ATTR_GET_REUSEPORT: %d, buflen %d,"
2849                 " #VPP-TBD#", getpid (), *(int *) buffer, *buflen);
2850         }
2851       else
2852         rv = VPPCOM_EINVAL;
2853       break;
2854
2855     case VPPCOM_ATTR_SET_REUSEPORT:
2856       if (buffer && buflen && (*buflen == sizeof (int)) &&
2857           !VCL_SESS_ATTR_TEST (session->attr, VCL_SESS_ATTR_LISTEN))
2858         {
2859           /* VPP-TBD */
2860           if (*(int *) buffer)
2861             VCL_SESS_ATTR_SET (session->attr, VCL_SESS_ATTR_REUSEPORT);
2862           else
2863             VCL_SESS_ATTR_CLR (session->attr, VCL_SESS_ATTR_REUSEPORT);
2864
2865           VDBG (2, "VCL<%d>: VPPCOM_ATTR_SET_REUSEPORT: %d, buflen %d,"
2866                 " #VPP-TBD#", getpid (),
2867                 VCL_SESS_ATTR_TEST (session->attr,
2868                                     VCL_SESS_ATTR_REUSEPORT), *buflen);
2869         }
2870       else
2871         rv = VPPCOM_EINVAL;
2872       break;
2873
2874     case VPPCOM_ATTR_GET_BROADCAST:
2875       if (buffer && buflen && (*buflen >= sizeof (int)))
2876         {
2877           /* VPP-TBD */
2878           *(int *) buffer = VCL_SESS_ATTR_TEST (session->attr,
2879                                                 VCL_SESS_ATTR_BROADCAST);
2880           *buflen = sizeof (int);
2881
2882           VDBG (2, "VCL<%d>: VPPCOM_ATTR_GET_BROADCAST: %d, buflen %d,"
2883                 " #VPP-TBD#", getpid (), *(int *) buffer, *buflen);
2884         }
2885       else
2886         rv = VPPCOM_EINVAL;
2887       break;
2888
2889     case VPPCOM_ATTR_SET_BROADCAST:
2890       if (buffer && buflen && (*buflen == sizeof (int)))
2891         {
2892           /* VPP-TBD */
2893           if (*(int *) buffer)
2894             VCL_SESS_ATTR_SET (session->attr, VCL_SESS_ATTR_BROADCAST);
2895           else
2896             VCL_SESS_ATTR_CLR (session->attr, VCL_SESS_ATTR_BROADCAST);
2897
2898           VDBG (2, "VCL<%d>: VPPCOM_ATTR_SET_BROADCAST: %d, buflen %d, "
2899                 "#VPP-TBD#", getpid (),
2900                 VCL_SESS_ATTR_TEST (session->attr,
2901                                     VCL_SESS_ATTR_BROADCAST), *buflen);
2902         }
2903       else
2904         rv = VPPCOM_EINVAL;
2905       break;
2906
2907     case VPPCOM_ATTR_GET_V6ONLY:
2908       if (buffer && buflen && (*buflen >= sizeof (int)))
2909         {
2910           /* VPP-TBD */
2911           *(int *) buffer = VCL_SESS_ATTR_TEST (session->attr,
2912                                                 VCL_SESS_ATTR_V6ONLY);
2913           *buflen = sizeof (int);
2914
2915           VDBG (2, "VCL<%d>: VPPCOM_ATTR_GET_V6ONLY: %d, buflen %d, "
2916                 "#VPP-TBD#", getpid (), *(int *) buffer, *buflen);
2917         }
2918       else
2919         rv = VPPCOM_EINVAL;
2920       break;
2921
2922     case VPPCOM_ATTR_SET_V6ONLY:
2923       if (buffer && buflen && (*buflen == sizeof (int)))
2924         {
2925           /* VPP-TBD */
2926           if (*(int *) buffer)
2927             VCL_SESS_ATTR_SET (session->attr, VCL_SESS_ATTR_V6ONLY);
2928           else
2929             VCL_SESS_ATTR_CLR (session->attr, VCL_SESS_ATTR_V6ONLY);
2930
2931           VDBG (2, "VCL<%d>: VPPCOM_ATTR_SET_V6ONLY: %d, buflen %d, "
2932                 "#VPP-TBD#", getpid (),
2933                 VCL_SESS_ATTR_TEST (session->attr,
2934                                     VCL_SESS_ATTR_V6ONLY), *buflen);
2935         }
2936       else
2937         rv = VPPCOM_EINVAL;
2938       break;
2939
2940     case VPPCOM_ATTR_GET_KEEPALIVE:
2941       if (buffer && buflen && (*buflen >= sizeof (int)))
2942         {
2943           /* VPP-TBD */
2944           *(int *) buffer = VCL_SESS_ATTR_TEST (session->attr,
2945                                                 VCL_SESS_ATTR_KEEPALIVE);
2946           *buflen = sizeof (int);
2947
2948           VDBG (2, "VCL<%d>: VPPCOM_ATTR_GET_KEEPALIVE: %d, buflen %d, "
2949                 "#VPP-TBD#", getpid (), *(int *) buffer, *buflen);
2950         }
2951       else
2952         rv = VPPCOM_EINVAL;
2953       break;
2954
2955     case VPPCOM_ATTR_SET_KEEPALIVE:
2956       if (buffer && buflen && (*buflen == sizeof (int)))
2957         {
2958           /* VPP-TBD */
2959           if (*(int *) buffer)
2960             VCL_SESS_ATTR_SET (session->attr, VCL_SESS_ATTR_KEEPALIVE);
2961           else
2962             VCL_SESS_ATTR_CLR (session->attr, VCL_SESS_ATTR_KEEPALIVE);
2963
2964           VDBG (2, "VCL<%d>: VPPCOM_ATTR_SET_KEEPALIVE: %d, buflen %d, "
2965                 "#VPP-TBD#", getpid (),
2966                 VCL_SESS_ATTR_TEST (session->attr,
2967                                     VCL_SESS_ATTR_KEEPALIVE), *buflen);
2968         }
2969       else
2970         rv = VPPCOM_EINVAL;
2971       break;
2972
2973     case VPPCOM_ATTR_GET_TCP_NODELAY:
2974       if (buffer && buflen && (*buflen >= sizeof (int)))
2975         {
2976           /* VPP-TBD */
2977           *(int *) buffer = VCL_SESS_ATTR_TEST (session->attr,
2978                                                 VCL_SESS_ATTR_TCP_NODELAY);
2979           *buflen = sizeof (int);
2980
2981           VDBG (2, "VCL<%d>: VPPCOM_ATTR_GET_TCP_NODELAY: %d, buflen %d, "
2982                 "#VPP-TBD#", getpid (), *(int *) buffer, *buflen);
2983         }
2984       else
2985         rv = VPPCOM_EINVAL;
2986       break;
2987
2988     case VPPCOM_ATTR_SET_TCP_NODELAY:
2989       if (buffer && buflen && (*buflen == sizeof (int)))
2990         {
2991           /* VPP-TBD */
2992           if (*(int *) buffer)
2993             VCL_SESS_ATTR_SET (session->attr, VCL_SESS_ATTR_TCP_NODELAY);
2994           else
2995             VCL_SESS_ATTR_CLR (session->attr, VCL_SESS_ATTR_TCP_NODELAY);
2996
2997           VDBG (2, "VCL<%d>: VPPCOM_ATTR_SET_TCP_NODELAY: %d, buflen %d, "
2998                 "#VPP-TBD#", getpid (),
2999                 VCL_SESS_ATTR_TEST (session->attr,
3000                                     VCL_SESS_ATTR_TCP_NODELAY), *buflen);
3001         }
3002       else
3003         rv = VPPCOM_EINVAL;
3004       break;
3005
3006     case VPPCOM_ATTR_GET_TCP_KEEPIDLE:
3007       if (buffer && buflen && (*buflen >= sizeof (int)))
3008         {
3009           /* VPP-TBD */
3010           *(int *) buffer = VCL_SESS_ATTR_TEST (session->attr,
3011                                                 VCL_SESS_ATTR_TCP_KEEPIDLE);
3012           *buflen = sizeof (int);
3013
3014           VDBG (2, "VCL<%d>: VPPCOM_ATTR_GET_TCP_KEEPIDLE: %d, buflen %d, "
3015                 "#VPP-TBD#", getpid (), *(int *) buffer, *buflen);
3016         }
3017       else
3018         rv = VPPCOM_EINVAL;
3019       break;
3020
3021     case VPPCOM_ATTR_SET_TCP_KEEPIDLE:
3022       if (buffer && buflen && (*buflen == sizeof (int)))
3023         {
3024           /* VPP-TBD */
3025           if (*(int *) buffer)
3026             VCL_SESS_ATTR_SET (session->attr, VCL_SESS_ATTR_TCP_KEEPIDLE);
3027           else
3028             VCL_SESS_ATTR_CLR (session->attr, VCL_SESS_ATTR_TCP_KEEPIDLE);
3029
3030           VDBG (2, "VCL<%d>: VPPCOM_ATTR_SET_TCP_KEEPIDLE: %d, buflen %d, "
3031                 "#VPP-TBD#", getpid (),
3032                 VCL_SESS_ATTR_TEST (session->attr,
3033                                     VCL_SESS_ATTR_TCP_KEEPIDLE), *buflen);
3034         }
3035       else
3036         rv = VPPCOM_EINVAL;
3037       break;
3038
3039     case VPPCOM_ATTR_GET_TCP_KEEPINTVL:
3040       if (buffer && buflen && (*buflen >= sizeof (int)))
3041         {
3042           /* VPP-TBD */
3043           *(int *) buffer = VCL_SESS_ATTR_TEST (session->attr,
3044                                                 VCL_SESS_ATTR_TCP_KEEPINTVL);
3045           *buflen = sizeof (int);
3046
3047           VDBG (2, "VCL<%d>: VPPCOM_ATTR_GET_TCP_KEEPINTVL: %d, buflen %d, "
3048                 "#VPP-TBD#", getpid (), *(int *) buffer, *buflen);
3049         }
3050       else
3051         rv = VPPCOM_EINVAL;
3052       break;
3053
3054     case VPPCOM_ATTR_SET_TCP_KEEPINTVL:
3055       if (buffer && buflen && (*buflen == sizeof (int)))
3056         {
3057           /* VPP-TBD */
3058           if (*(int *) buffer)
3059             VCL_SESS_ATTR_SET (session->attr, VCL_SESS_ATTR_TCP_KEEPINTVL);
3060           else
3061             VCL_SESS_ATTR_CLR (session->attr, VCL_SESS_ATTR_TCP_KEEPINTVL);
3062
3063           VDBG (2, "VCL<%d>: VPPCOM_ATTR_SET_TCP_KEEPINTVL: %d, buflen %d, "
3064                 "#VPP-TBD#", getpid (),
3065                 VCL_SESS_ATTR_TEST (session->attr,
3066                                     VCL_SESS_ATTR_TCP_KEEPINTVL), *buflen);
3067         }
3068       else
3069         rv = VPPCOM_EINVAL;
3070       break;
3071
3072     case VPPCOM_ATTR_GET_TCP_USER_MSS:
3073       if (buffer && buflen && (*buflen >= sizeof (u32)))
3074         {
3075           /* VPP-TBD */
3076           *(u32 *) buffer = session->user_mss;
3077           *buflen = sizeof (int);
3078
3079           VDBG (2, "VCL<%d>: VPPCOM_ATTR_GET_TCP_USER_MSS: %d, buflen %d,"
3080                 " #VPP-TBD#", getpid (), *(int *) buffer, *buflen);
3081         }
3082       else
3083         rv = VPPCOM_EINVAL;
3084       break;
3085
3086     case VPPCOM_ATTR_SET_TCP_USER_MSS:
3087       if (buffer && buflen && (*buflen == sizeof (u32)))
3088         {
3089           /* VPP-TBD */
3090           session->user_mss = *(u32 *) buffer;
3091
3092           VDBG (2, "VCL<%d>: VPPCOM_ATTR_SET_TCP_USER_MSS: %u, buflen %d, "
3093                 "#VPP-TBD#", getpid (), session->user_mss, *buflen);
3094         }
3095       else
3096         rv = VPPCOM_EINVAL;
3097       break;
3098
3099     default:
3100       rv = VPPCOM_EINVAL;
3101       break;
3102     }
3103
3104   return rv;
3105 }
3106
3107 int
3108 vppcom_session_recvfrom (uint32_t session_handle, void *buffer,
3109                          uint32_t buflen, int flags, vppcom_endpt_t * ep)
3110 {
3111   vcl_worker_t *wrk = vcl_worker_get_current ();
3112   int rv = VPPCOM_OK;
3113   vcl_session_t *session = 0;
3114
3115   if (ep)
3116     {
3117       session = vcl_session_get_w_handle (wrk, session_handle);
3118       if (PREDICT_FALSE (!session))
3119         {
3120           VDBG (0, "VCL<%d>: invalid session, sid (%u) has been closed!",
3121                 getpid (), session_handle);
3122           return VPPCOM_EBADFD;
3123         }
3124       ep->is_ip4 = session->transport.is_ip4;
3125       ep->port = session->transport.rmt_port;
3126     }
3127
3128   if (flags == 0)
3129     rv = vppcom_session_read (session_handle, buffer, buflen);
3130   else if (flags & MSG_PEEK)
3131     rv = vppcom_session_peek (session_handle, buffer, buflen);
3132   else
3133     {
3134       clib_warning ("VCL<%d>: Unsupport flags for recvfrom %d",
3135                     getpid (), flags);
3136       return VPPCOM_EAFNOSUPPORT;
3137     }
3138
3139   if (ep)
3140     {
3141       if (session->transport.is_ip4)
3142         clib_memcpy (ep->ip, &session->transport.rmt_ip.ip4,
3143                      sizeof (ip4_address_t));
3144       else
3145         clib_memcpy (ep->ip, &session->transport.rmt_ip.ip6,
3146                      sizeof (ip6_address_t));
3147     }
3148
3149   return rv;
3150 }
3151
3152 int
3153 vppcom_session_sendto (uint32_t session_handle, void *buffer,
3154                        uint32_t buflen, int flags, vppcom_endpt_t * ep)
3155 {
3156   if (!buffer)
3157     return VPPCOM_EINVAL;
3158
3159   if (ep)
3160     {
3161       // TBD
3162       return VPPCOM_EINVAL;
3163     }
3164
3165   if (flags)
3166     {
3167       // TBD check the flags and do the right thing
3168       VDBG (2, "VCL<%d>: handling flags 0x%u (%d) not implemented yet.",
3169             getpid (), flags, flags);
3170     }
3171
3172   return (vppcom_session_write (session_handle, buffer, buflen));
3173 }
3174
3175 int
3176 vppcom_poll (vcl_poll_t * vp, uint32_t n_sids, double wait_for_time)
3177 {
3178   vcl_worker_t *wrk = vcl_worker_get_current ();
3179   f64 timeout = clib_time_now (&wrk->clib_time) + wait_for_time;
3180   u32 i, keep_trying = 1;
3181   int rv, num_ev = 0;
3182
3183   VDBG (3, "VCL<%d>: vp %p, nsids %u, wait_for_time %f",
3184         getpid (), vp, n_sids, wait_for_time);
3185
3186   if (!vp)
3187     return VPPCOM_EFAULT;
3188
3189   do
3190     {
3191       vcl_session_t *session;
3192
3193       for (i = 0; i < n_sids; i++)
3194         {
3195           ASSERT (vp[i].revents);
3196
3197           session = vcl_session_get (wrk, vp[i].sid);
3198           if (!session)
3199             continue;
3200
3201           if (*vp[i].revents)
3202             *vp[i].revents = 0;
3203
3204           if (POLLIN & vp[i].events)
3205             {
3206               rv = vppcom_session_read_ready (session);
3207               if (rv > 0)
3208                 {
3209                   *vp[i].revents |= POLLIN;
3210                   num_ev++;
3211                 }
3212               else if (rv < 0)
3213                 {
3214                   switch (rv)
3215                     {
3216                     case VPPCOM_ECONNRESET:
3217                       *vp[i].revents = POLLHUP;
3218                       break;
3219
3220                     default:
3221                       *vp[i].revents = POLLERR;
3222                       break;
3223                     }
3224                   num_ev++;
3225                 }
3226             }
3227
3228           if (POLLOUT & vp[i].events)
3229             {
3230               rv = vppcom_session_write_ready (session);
3231               if (rv > 0)
3232                 {
3233                   *vp[i].revents |= POLLOUT;
3234                   num_ev++;
3235                 }
3236               else if (rv < 0)
3237                 {
3238                   switch (rv)
3239                     {
3240                     case VPPCOM_ECONNRESET:
3241                       *vp[i].revents = POLLHUP;
3242                       break;
3243
3244                     default:
3245                       *vp[i].revents = POLLERR;
3246                       break;
3247                     }
3248                   num_ev++;
3249                 }
3250             }
3251
3252           if (0)                // Note "done:" label used by VCL_SESSION_LOCK_AND_GET()
3253             {
3254               *vp[i].revents = POLLNVAL;
3255               num_ev++;
3256             }
3257         }
3258       if (wait_for_time != -1)
3259         keep_trying = (clib_time_now (&wrk->clib_time) <= timeout) ? 1 : 0;
3260     }
3261   while ((num_ev == 0) && keep_trying);
3262
3263   if (VPPCOM_DEBUG > 3)
3264     {
3265       clib_warning ("VCL<%d>: returning %d", getpid (), num_ev);
3266       for (i = 0; i < n_sids; i++)
3267         {
3268           clib_warning ("VCL<%d>: vp[%d].sid %d (0x%x), .events 0x%x, "
3269                         ".revents 0x%x", getpid (), i, vp[i].sid, vp[i].sid,
3270                         vp[i].events, *vp[i].revents);
3271         }
3272     }
3273   return num_ev;
3274 }
3275
3276 int
3277 vppcom_mq_epoll_fd (void)
3278 {
3279   vcl_worker_t *wrk = vcl_worker_get_current ();
3280   return wrk->mqs_epfd;
3281 }
3282
3283 int
3284 vppcom_session_index (uint32_t session_handle)
3285 {
3286   return session_handle & 0xFFFFFF;
3287 }
3288
3289 int
3290 vppcom_worker_register (void)
3291 {
3292   if (vcl_worker_alloc_and_init ())
3293     return VPPCOM_OK;
3294   return VPPCOM_EEXIST;
3295 }
3296
3297 /*
3298  * fd.io coding-style-patch-verification: ON
3299  *
3300  * Local Variables:
3301  * eval: (c-set-style "gnu")
3302  * End:
3303  */