vcl: register workers in order
[vpp.git] / src / vcl / vppcom.c
1 /*
2  * Copyright (c) 2017 Cisco and/or its affiliates.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this
5  * You may obtain a copy of the License at:
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15
16 #include <stdio.h>
17 #include <stdlib.h>
18 #include <svm/svm_fifo_segment.h>
19 #include <vcl/vppcom.h>
20 #include <vcl/vcl_event.h>
21 #include <vcl/vcl_debug.h>
22 #include <vcl/vcl_private.h>
23
24 __thread uword __vcl_worker_index = ~0;
25
26 static u8 not_ready;
27
28 void
29 sigsegv_signal (int signum)
30 {
31   not_ready = 1;
32 }
33
34 static void
35 vcl_wait_for_memory (void *mem)
36 {
37   u8 __clib_unused test;
38   if (vcm->mounting_segment)
39     {
40       while (vcm->mounting_segment)
41         ;
42       return;
43     }
44   if (1 || vcm->debug)
45     {
46       usleep (1e5);
47       return;
48     }
49   if (signal (SIGSEGV, sigsegv_signal))
50     {
51       perror ("signal()");
52       return;
53     }
54   not_ready = 0;
55
56 again:
57   test = *(u8 *) mem;
58   if (not_ready)
59     {
60       not_ready = 0;
61       usleep (1);
62       goto again;
63     }
64
65   signal (SIGSEGV, SIG_DFL);
66 }
67
68 const char *
69 vppcom_session_state_str (session_state_t state)
70 {
71   char *st;
72
73   switch (state)
74     {
75     case STATE_START:
76       st = "STATE_START";
77       break;
78
79     case STATE_CONNECT:
80       st = "STATE_CONNECT";
81       break;
82
83     case STATE_LISTEN:
84       st = "STATE_LISTEN";
85       break;
86
87     case STATE_ACCEPT:
88       st = "STATE_ACCEPT";
89       break;
90
91     case STATE_CLOSE_ON_EMPTY:
92       st = "STATE_CLOSE_ON_EMPTY";
93       break;
94
95     case STATE_DISCONNECT:
96       st = "STATE_DISCONNECT";
97       break;
98
99     case STATE_FAILED:
100       st = "STATE_FAILED";
101       break;
102
103     default:
104       st = "UNKNOWN_STATE";
105       break;
106     }
107
108   return st;
109 }
110
111 u8 *
112 format_ip4_address (u8 * s, va_list * args)
113 {
114   u8 *a = va_arg (*args, u8 *);
115   return format (s, "%d.%d.%d.%d", a[0], a[1], a[2], a[3]);
116 }
117
118 u8 *
119 format_ip6_address (u8 * s, va_list * args)
120 {
121   ip6_address_t *a = va_arg (*args, ip6_address_t *);
122   u32 i, i_max_n_zero, max_n_zeros, i_first_zero, n_zeros, last_double_colon;
123
124   i_max_n_zero = ARRAY_LEN (a->as_u16);
125   max_n_zeros = 0;
126   i_first_zero = i_max_n_zero;
127   n_zeros = 0;
128   for (i = 0; i < ARRAY_LEN (a->as_u16); i++)
129     {
130       u32 is_zero = a->as_u16[i] == 0;
131       if (is_zero && i_first_zero >= ARRAY_LEN (a->as_u16))
132         {
133           i_first_zero = i;
134           n_zeros = 0;
135         }
136       n_zeros += is_zero;
137       if ((!is_zero && n_zeros > max_n_zeros)
138           || (i + 1 >= ARRAY_LEN (a->as_u16) && n_zeros > max_n_zeros))
139         {
140           i_max_n_zero = i_first_zero;
141           max_n_zeros = n_zeros;
142           i_first_zero = ARRAY_LEN (a->as_u16);
143           n_zeros = 0;
144         }
145     }
146
147   last_double_colon = 0;
148   for (i = 0; i < ARRAY_LEN (a->as_u16); i++)
149     {
150       if (i == i_max_n_zero && max_n_zeros > 1)
151         {
152           s = format (s, "::");
153           i += max_n_zeros - 1;
154           last_double_colon = 1;
155         }
156       else
157         {
158           s = format (s, "%s%x",
159                       (last_double_colon || i == 0) ? "" : ":",
160                       clib_net_to_host_u16 (a->as_u16[i]));
161           last_double_colon = 0;
162         }
163     }
164
165   return s;
166 }
167
168 /* Format an IP46 address. */
169 u8 *
170 format_ip46_address (u8 * s, va_list * args)
171 {
172   ip46_address_t *ip46 = va_arg (*args, ip46_address_t *);
173   ip46_type_t type = va_arg (*args, ip46_type_t);
174   int is_ip4 = 1;
175
176   switch (type)
177     {
178     case IP46_TYPE_ANY:
179       is_ip4 = ip46_address_is_ip4 (ip46);
180       break;
181     case IP46_TYPE_IP4:
182       is_ip4 = 1;
183       break;
184     case IP46_TYPE_IP6:
185       is_ip4 = 0;
186       break;
187     }
188
189   return is_ip4 ?
190     format (s, "%U", format_ip4_address, &ip46->ip4) :
191     format (s, "%U", format_ip6_address, &ip46->ip6);
192 }
193
194 /*
195  * VPPCOM Utility Functions
196  */
197
198
199 static svm_msg_q_t *
200 vcl_session_vpp_evt_q (vcl_worker_t * wrk, vcl_session_t * s)
201 {
202   if (vcl_session_is_ct (s))
203     return wrk->vpp_event_queues[0];
204   else
205     return wrk->vpp_event_queues[s->tx_fifo->master_thread_index];
206 }
207
208 static void
209 vcl_send_session_accepted_reply (svm_msg_q_t * mq, u32 context,
210                                  session_handle_t handle, int retval)
211 {
212   app_session_evt_t _app_evt, *app_evt = &_app_evt;
213   session_accepted_reply_msg_t *rmp;
214   app_alloc_ctrl_evt_to_vpp (mq, app_evt, SESSION_CTRL_EVT_ACCEPTED_REPLY);
215   rmp = (session_accepted_reply_msg_t *) app_evt->evt->data;
216   rmp->handle = handle;
217   rmp->context = context;
218   rmp->retval = retval;
219   app_send_ctrl_evt_to_vpp (mq, app_evt);
220 }
221
222 static void
223 vcl_send_session_disconnected_reply (svm_msg_q_t * mq, u32 context,
224                                      session_handle_t handle, int retval)
225 {
226   app_session_evt_t _app_evt, *app_evt = &_app_evt;
227   session_disconnected_reply_msg_t *rmp;
228   app_alloc_ctrl_evt_to_vpp (mq, app_evt,
229                              SESSION_CTRL_EVT_DISCONNECTED_REPLY);
230   rmp = (session_disconnected_reply_msg_t *) app_evt->evt->data;
231   rmp->handle = handle;
232   rmp->context = context;
233   rmp->retval = retval;
234   app_send_ctrl_evt_to_vpp (mq, app_evt);
235 }
236
237 static void
238 vcl_send_session_reset_reply (svm_msg_q_t * mq, u32 context,
239                               session_handle_t handle, int retval)
240 {
241   app_session_evt_t _app_evt, *app_evt = &_app_evt;
242   session_reset_reply_msg_t *rmp;
243   app_alloc_ctrl_evt_to_vpp (mq, app_evt, SESSION_CTRL_EVT_RESET_REPLY);
244   rmp = (session_reset_reply_msg_t *) app_evt->evt->data;
245   rmp->handle = handle;
246   rmp->context = context;
247   rmp->retval = retval;
248   app_send_ctrl_evt_to_vpp (mq, app_evt);
249 }
250
251 static u32
252 vcl_session_accepted_handler (vcl_worker_t * wrk, session_accepted_msg_t * mp)
253 {
254   vcl_session_t *session, *listen_session;
255   svm_fifo_t *rx_fifo, *tx_fifo;
256   u32 vpp_wrk_index;
257   svm_msg_q_t *evt_q;
258
259   session = vcl_session_alloc (wrk);
260
261   listen_session = vcl_session_table_lookup_listener (wrk,
262                                                       mp->listener_handle);
263   if (!listen_session)
264     {
265       svm_msg_q_t *evt_q;
266       evt_q = uword_to_pointer (mp->vpp_event_queue_address, svm_msg_q_t *);
267       clib_warning ("VCL<%d>: ERROR: couldn't find listen session: "
268                     "unknown vpp listener handle %llx",
269                     getpid (), mp->listener_handle);
270       vcl_send_session_accepted_reply (evt_q, mp->context, mp->handle,
271                                        VNET_API_ERROR_INVALID_ARGUMENT);
272       vcl_session_free (wrk, session);
273       return VCL_INVALID_SESSION_INDEX;
274     }
275
276   rx_fifo = uword_to_pointer (mp->server_rx_fifo, svm_fifo_t *);
277   tx_fifo = uword_to_pointer (mp->server_tx_fifo, svm_fifo_t *);
278
279   if (mp->server_event_queue_address)
280     {
281       session->vpp_evt_q = uword_to_pointer (mp->client_event_queue_address,
282                                              svm_msg_q_t *);
283       session->our_evt_q = uword_to_pointer (mp->server_event_queue_address,
284                                              svm_msg_q_t *);
285       vcl_wait_for_memory (session->vpp_evt_q);
286       rx_fifo->master_session_index = session->session_index;
287       tx_fifo->master_session_index = session->session_index;
288       vec_validate (wrk->vpp_event_queues, 0);
289       evt_q = uword_to_pointer (mp->vpp_event_queue_address, svm_msg_q_t *);
290       wrk->vpp_event_queues[0] = evt_q;
291     }
292   else
293     {
294       session->vpp_evt_q = uword_to_pointer (mp->vpp_event_queue_address,
295                                              svm_msg_q_t *);
296       rx_fifo->client_session_index = session->session_index;
297       tx_fifo->client_session_index = session->session_index;
298
299       vpp_wrk_index = tx_fifo->master_thread_index;
300       vec_validate (wrk->vpp_event_queues, vpp_wrk_index);
301       wrk->vpp_event_queues[vpp_wrk_index] = session->vpp_evt_q;
302     }
303
304   session->vpp_handle = mp->handle;
305   session->client_context = mp->context;
306   session->rx_fifo = rx_fifo;
307   session->tx_fifo = tx_fifo;
308
309   session->session_state = STATE_ACCEPT;
310   session->transport.rmt_port = mp->port;
311   session->transport.is_ip4 = mp->is_ip4;
312   clib_memcpy (&session->transport.rmt_ip, mp->ip, sizeof (ip46_address_t));
313
314   vcl_session_table_add_vpp_handle (wrk, mp->handle, session->session_index);
315   session->transport.lcl_port = listen_session->transport.lcl_port;
316   session->transport.lcl_ip = listen_session->transport.lcl_ip;
317   session->session_type = listen_session->session_type;
318   session->is_dgram = session->session_type == VPPCOM_PROTO_UDP;
319
320   VDBG (1, "VCL<%d>: vpp handle 0x%llx, sid %u: client accept request from %s"
321         " address %U port %d queue %p!", getpid (), mp->handle,
322         session->session_index,
323         mp->is_ip4 ? "IPv4" : "IPv6", format_ip46_address, &mp->ip,
324         mp->is_ip4 ? IP46_TYPE_IP4 : IP46_TYPE_IP6,
325         clib_net_to_host_u16 (mp->port), session->vpp_evt_q);
326   vcl_evt (VCL_EVT_ACCEPT, session, listen_session, session_index);
327
328   return session->session_index;
329 }
330
331 static u32
332 vcl_session_connected_handler (vcl_worker_t * wrk,
333                                session_connected_msg_t * mp)
334 {
335   u32 session_index, vpp_wrk_index;
336   svm_fifo_t *rx_fifo, *tx_fifo;
337   vcl_session_t *session = 0;
338   svm_msg_q_t *evt_q;
339
340   session_index = mp->context;
341   session = vcl_session_get (wrk, session_index);
342   if (!session)
343     {
344       clib_warning ("[%s] ERROR: vpp handle 0x%llx, sid %u: "
345                     "Invalid session index (%u)!",
346                     getpid (), mp->handle, session_index);
347       return VCL_INVALID_SESSION_INDEX;
348     }
349   if (mp->retval)
350     {
351       clib_warning ("VCL<%d>: ERROR: sid %u: connect failed! %U", getpid (),
352                     mp->handle, session_index, format_api_error,
353                     ntohl (mp->retval));
354       session->session_state = STATE_FAILED;
355       session->vpp_handle = mp->handle;
356       return session_index;
357     }
358
359   rx_fifo = uword_to_pointer (mp->server_rx_fifo, svm_fifo_t *);
360   tx_fifo = uword_to_pointer (mp->server_tx_fifo, svm_fifo_t *);
361   vcl_wait_for_memory (rx_fifo);
362   rx_fifo->client_session_index = session_index;
363   tx_fifo->client_session_index = session_index;
364
365   if (mp->client_event_queue_address)
366     {
367       session->vpp_evt_q = uword_to_pointer (mp->server_event_queue_address,
368                                              svm_msg_q_t *);
369       session->our_evt_q = uword_to_pointer (mp->client_event_queue_address,
370                                              svm_msg_q_t *);
371
372       vec_validate (wrk->vpp_event_queues, 0);
373       evt_q = uword_to_pointer (mp->vpp_event_queue_address, svm_msg_q_t *);
374       wrk->vpp_event_queues[0] = evt_q;
375     }
376   else
377     {
378       session->vpp_evt_q = uword_to_pointer (mp->vpp_event_queue_address,
379                                              svm_msg_q_t *);
380       vpp_wrk_index = tx_fifo->master_thread_index;
381       vec_validate (wrk->vpp_event_queues, vpp_wrk_index);
382       wrk->vpp_event_queues[vpp_wrk_index] = session->vpp_evt_q;
383     }
384
385   session->rx_fifo = rx_fifo;
386   session->tx_fifo = tx_fifo;
387   session->vpp_handle = mp->handle;
388   session->transport.is_ip4 = mp->is_ip4;
389   clib_memcpy (&session->transport.lcl_ip, mp->lcl_ip,
390                sizeof (session->transport.lcl_ip));
391   session->transport.lcl_port = mp->lcl_port;
392   session->session_state = STATE_CONNECT;
393
394   /* Add it to lookup table */
395   hash_set (wrk->session_index_by_vpp_handles, mp->handle, session_index);
396
397   VDBG (1, "VCL<%d>: vpp handle 0x%llx, sid %u: connect succeeded! "
398         "session_rx_fifo %p, refcnt %d, session_tx_fifo %p, refcnt %d",
399         getpid (), mp->handle, session_index, session->rx_fifo,
400         session->rx_fifo->refcnt, session->tx_fifo, session->tx_fifo->refcnt);
401
402   return session_index;
403 }
404
405 static u32
406 vcl_session_reset_handler (vcl_worker_t * wrk,
407                            session_reset_msg_t * reset_msg)
408 {
409   vcl_session_t *session;
410   u32 sid;
411
412   sid = vcl_session_index_from_vpp_handle (wrk, reset_msg->handle);
413   session = vcl_session_get (wrk, sid);
414   if (!session)
415     {
416       VDBG (0, "request to reset unknown handle 0x%llx", reset_msg->handle);
417       return VCL_INVALID_SESSION_INDEX;
418     }
419   session->session_state = STATE_CLOSE_ON_EMPTY;
420   VDBG (0, "reset handle 0x%llx, sid %u ", reset_msg->handle, sid);
421   vcl_send_session_reset_reply (vcl_session_vpp_evt_q (wrk, session),
422                                 vcm->my_client_index, reset_msg->handle, 0);
423   return sid;
424 }
425
426 static u32
427 vcl_session_bound_handler (vcl_worker_t * wrk, session_bound_msg_t * mp)
428 {
429   vcl_session_t *session;
430   u32 sid = mp->context;
431
432   session = vcl_session_get (wrk, sid);
433   if (mp->retval)
434     {
435       VDBG (0, "VCL<%d>: ERROR: vpp handle 0x%llx, sid %u: bind failed: %U",
436             getpid (), mp->handle, sid, format_api_error, ntohl (mp->retval));
437       if (session)
438         {
439           session->session_state = STATE_FAILED;
440           session->vpp_handle = mp->handle;
441           return sid;
442         }
443       else
444         {
445           clib_warning ("[%s] ERROR: vpp handle 0x%llx, sid %u: "
446                         "Invalid session index (%u)!",
447                         getpid (), mp->handle, sid);
448           return VCL_INVALID_SESSION_INDEX;
449         }
450     }
451
452   session->vpp_handle = mp->handle;
453   session->transport.is_ip4 = mp->lcl_is_ip4;
454   clib_memcpy (&session->transport.lcl_ip, mp->lcl_ip,
455                sizeof (ip46_address_t));
456   session->transport.lcl_port = mp->lcl_port;
457   vcl_session_table_add_listener (wrk, mp->handle, sid);
458   session->session_state = STATE_LISTEN;
459
460   if (session->is_dgram)
461     {
462       svm_fifo_t *rx_fifo, *tx_fifo;
463       session->vpp_evt_q = uword_to_pointer (mp->vpp_evt_q, svm_msg_q_t *);
464       rx_fifo = uword_to_pointer (mp->rx_fifo, svm_fifo_t *);
465       rx_fifo->client_session_index = sid;
466       tx_fifo = uword_to_pointer (mp->tx_fifo, svm_fifo_t *);
467       tx_fifo->client_session_index = sid;
468       session->rx_fifo = rx_fifo;
469       session->tx_fifo = tx_fifo;
470     }
471
472   VDBG (1, "VCL<%d>: vpp handle 0x%llx, sid %u: bind succeeded!",
473         getpid (), mp->handle, sid);
474   return sid;
475 }
476
477 int
478 vcl_handle_mq_ctrl_event (vcl_worker_t * wrk, session_event_t * e)
479 {
480   session_accepted_msg_t *accepted_msg;
481   session_disconnected_msg_t *disconnected_msg;
482   vcl_session_msg_t *vcl_msg;
483   vcl_session_t *session;
484   u64 handle;
485   u32 sid;
486
487   switch (e->event_type)
488     {
489     case FIFO_EVENT_APP_RX:
490       clib_warning ("unhandled rx: sid %u (0x%x)",
491                     e->fifo->client_session_index,
492                     e->fifo->client_session_index);
493       break;
494     case SESSION_CTRL_EVT_ACCEPTED:
495       accepted_msg = (session_accepted_msg_t *) e->data;
496       handle = accepted_msg->listener_handle;
497       session = vcl_session_table_lookup_listener (wrk, handle);
498       if (!session)
499         {
500           clib_warning ("VCL<%d>: ERROR: couldn't find listen session:"
501                         "listener handle %llx", getpid (), handle);
502           break;
503         }
504
505       clib_fifo_add2 (session->accept_evts_fifo, vcl_msg);
506       vcl_msg->accepted_msg = *accepted_msg;
507       break;
508     case SESSION_CTRL_EVT_CONNECTED:
509       vcl_session_connected_handler (wrk,
510                                      (session_connected_msg_t *) e->data);
511       break;
512     case SESSION_CTRL_EVT_DISCONNECTED:
513       disconnected_msg = (session_disconnected_msg_t *) e->data;
514       sid = vcl_session_index_from_vpp_handle (wrk, disconnected_msg->handle);
515       session = vcl_session_get (wrk, sid);
516       if (!session)
517         {
518           VDBG (0, "request to disconnect unknown handle 0x%llx",
519                 disconnected_msg->handle);
520           break;
521         }
522       session->session_state = STATE_DISCONNECT;
523       VDBG (0, "disconnected handle 0xllx, sid %u", disconnected_msg->handle,
524             sid);
525       break;
526     case SESSION_CTRL_EVT_RESET:
527       vcl_session_reset_handler (wrk, (session_reset_msg_t *) e->data);
528       break;
529     case SESSION_CTRL_EVT_BOUND:
530       vcl_session_bound_handler (wrk, (session_bound_msg_t *) e->data);
531       break;
532     default:
533       clib_warning ("unhandled %u", e->event_type);
534     }
535   return VPPCOM_OK;
536 }
537
538 static inline int
539 vppcom_wait_for_session_state_change (u32 session_index,
540                                       session_state_t state,
541                                       f64 wait_for_time)
542 {
543   vcl_worker_t *wrk = vcl_worker_get_current ();
544   f64 timeout = clib_time_now (&wrk->clib_time) + wait_for_time;
545   vcl_session_t *volatile session;
546   svm_msg_q_msg_t msg;
547   session_event_t *e;
548
549   do
550     {
551       session = vcl_session_get (wrk, session_index);
552       if (PREDICT_FALSE (!session))
553         {
554           return VPPCOM_EBADFD;
555         }
556       if (session->session_state & state)
557         {
558           return VPPCOM_OK;
559         }
560       if (session->session_state & STATE_FAILED)
561         {
562           return VPPCOM_ECONNREFUSED;
563         }
564
565       if (svm_msg_q_sub (wrk->app_event_queue, &msg, SVM_Q_NOWAIT, 0))
566         continue;
567       e = svm_msg_q_msg_data (wrk->app_event_queue, &msg);
568       vcl_handle_mq_ctrl_event (wrk, e);
569       svm_msg_q_free_msg (wrk->app_event_queue, &msg);
570     }
571   while (clib_time_now (&wrk->clib_time) < timeout);
572
573   VDBG (0, "VCL<%d>: timeout waiting for state 0x%x (%s)", getpid (), state,
574         vppcom_session_state_str (state));
575   vcl_evt (VCL_EVT_SESSION_TIMEOUT, session, session_state);
576
577   return VPPCOM_ETIMEDOUT;
578 }
579
580 static int
581 vppcom_app_session_enable (void)
582 {
583   int rv;
584
585   if (vcm->app_state != STATE_APP_ENABLED)
586     {
587       vppcom_send_session_enable_disable (1 /* is_enabled == TRUE */ );
588       rv = vcl_wait_for_app_state_change (STATE_APP_ENABLED);
589       if (PREDICT_FALSE (rv))
590         {
591           VDBG (0, "VCL<%d>: application session enable timed out! "
592                 "returning %d (%s)", getpid (), rv, vppcom_retval_str (rv));
593           return rv;
594         }
595     }
596   return VPPCOM_OK;
597 }
598
599 static int
600 vppcom_app_attach (void)
601 {
602   int rv;
603
604   vppcom_app_send_attach ();
605   rv = vcl_wait_for_app_state_change (STATE_APP_ATTACHED);
606   if (PREDICT_FALSE (rv))
607     {
608       VDBG (0, "VCL<%d>: application attach timed out! returning %d (%s)",
609             getpid (), rv, vppcom_retval_str (rv));
610       return rv;
611     }
612
613   return VPPCOM_OK;
614 }
615
616 static int
617 vppcom_session_unbind (u32 session_handle)
618 {
619   vcl_worker_t *wrk = vcl_worker_get_current ();
620   vcl_session_t *session = 0;
621   u64 vpp_handle;
622
623   session = vcl_session_get_w_handle (wrk, session_handle);
624   if (!session)
625     return VPPCOM_EBADFD;
626
627   vpp_handle = session->vpp_handle;
628   vcl_session_table_del_listener (wrk, vpp_handle);
629   session->vpp_handle = ~0;
630   session->session_state = STATE_DISCONNECT;
631
632   VDBG (1, "VCL<%d>: vpp handle 0x%llx, sid %u: sending unbind msg! new state"
633         " 0x%x (%s)", getpid (), vpp_handle, session_handle, STATE_DISCONNECT,
634         vppcom_session_state_str (STATE_DISCONNECT));
635   vcl_evt (VCL_EVT_UNBIND, session);
636   vppcom_send_unbind_sock (vpp_handle);
637
638   return VPPCOM_OK;
639 }
640
641 static int
642 vppcom_session_disconnect (u32 session_handle)
643 {
644   vcl_worker_t *wrk = vcl_worker_get_current ();
645   svm_msg_q_t *vpp_evt_q;
646   vcl_session_t *session;
647   session_state_t state;
648   u64 vpp_handle;
649
650   session = vcl_session_get_w_handle (wrk, session_handle);
651   vpp_handle = session->vpp_handle;
652   state = session->session_state;
653
654   VDBG (1, "VCL<%d>: vpp handle 0x%llx, sid %u state 0x%x (%s)", getpid (),
655         vpp_handle, session_handle, state, vppcom_session_state_str (state));
656
657   if (PREDICT_FALSE (state & STATE_LISTEN))
658     {
659       clib_warning ("VCL<%d>: ERROR: vpp handle 0x%llx, sid %u: "
660                     "Cannot disconnect a listen socket!",
661                     getpid (), vpp_handle, session_handle);
662       return VPPCOM_EBADFD;
663     }
664
665   if (state & STATE_CLOSE_ON_EMPTY)
666     {
667       vpp_evt_q = vcl_session_vpp_evt_q (wrk, session);
668       vcl_send_session_disconnected_reply (vpp_evt_q, vcm->my_client_index,
669                                            vpp_handle, 0);
670       VDBG (1, "VCL<%d>: vpp handle 0x%llx, sid %u: sending disconnect "
671             "REPLY...", getpid (), vpp_handle, session_handle);
672     }
673   else
674     {
675       VDBG (1, "VCL<%d>: vpp handle 0x%llx, sid %u: sending disconnect...",
676             getpid (), vpp_handle, session_handle);
677       vppcom_send_disconnect_session (vpp_handle);
678     }
679
680   return VPPCOM_OK;
681 }
682
683 /*
684  * VPPCOM Public API functions
685  */
686 int
687 vppcom_app_create (char *app_name)
688 {
689   vppcom_cfg_t *vcl_cfg = &vcm->cfg;
690   int rv;
691
692   if (!vcm->is_init)
693     {
694       vcm->is_init = 1;
695       vppcom_cfg (&vcm->cfg);
696       vcl_cfg = &vcm->cfg;
697
698       vcm->main_cpu = pthread_self ();
699       vppcom_init_error_string_table ();
700       svm_fifo_segment_main_init (vcl_cfg->segment_baseva,
701                                   20 /* timeout in secs */ );
702       pool_init_fixed (vcm->workers, vcl_cfg->max_workers);
703       clib_spinlock_init (&vcm->workers_lock);
704       vcl_worker_alloc_and_init ();
705     }
706
707   if (vcm->my_client_index == ~0)
708     {
709       /* API hookup and connect to VPP */
710       vppcom_api_hookup ();
711       vcl_elog_init (vcm);
712       vcm->app_state = STATE_APP_START;
713       rv = vppcom_connect_to_vpp (app_name);
714       if (rv)
715         {
716           clib_warning ("VCL<%d>: ERROR: couldn't connect to VPP!",
717                         getpid ());
718           return rv;
719         }
720
721       VDBG (0, "VCL<%d>: sending session enable", getpid ());
722       rv = vppcom_app_session_enable ();
723       if (rv)
724         {
725           clib_warning ("VCL<%d>: ERROR: vppcom_app_session_enable() "
726                         "failed!", getpid ());
727           return rv;
728         }
729
730       VDBG (0, "VCL<%d>: sending app attach", getpid ());
731       rv = vppcom_app_attach ();
732       if (rv)
733         {
734           clib_warning ("VCL<%d>: ERROR: vppcom_app_attach() failed!",
735                         getpid ());
736           return rv;
737         }
738
739       VDBG (0, "VCL<%d>: app_name '%s', my_client_index %d (0x%x)",
740             getpid (), app_name, vcm->my_client_index, vcm->my_client_index);
741     }
742
743   return VPPCOM_OK;
744 }
745
746 void
747 vppcom_app_destroy (void)
748 {
749   int rv;
750   f64 orig_app_timeout;
751
752   if (vcm->my_client_index == ~0)
753     return;
754
755   VDBG (0, "VCL<%d>: detaching from VPP, my_client_index %d (0x%x)",
756         getpid (), vcm->my_client_index, vcm->my_client_index);
757   vcl_evt (VCL_EVT_DETACH, vcm);
758
759   vppcom_app_send_detach ();
760   orig_app_timeout = vcm->cfg.app_timeout;
761   vcm->cfg.app_timeout = 2.0;
762   rv = vcl_wait_for_app_state_change (STATE_APP_ENABLED);
763   vcm->cfg.app_timeout = orig_app_timeout;
764   if (PREDICT_FALSE (rv))
765     VDBG (0, "VCL<%d>: application detach timed out! returning %d (%s)",
766           getpid (), rv, vppcom_retval_str (rv));
767
768   vcl_elog_stop (vcm);
769   vl_client_disconnect_from_vlib ();
770   vcm->my_client_index = ~0;
771   vcm->app_state = STATE_APP_START;
772 }
773
774 int
775 vppcom_session_create (u8 proto, u8 is_nonblocking)
776 {
777   vcl_worker_t *wrk = vcl_worker_get_current ();
778   vcl_session_t *session;
779
780   session = vcl_session_alloc (wrk);
781
782   session->session_type = proto;
783   session->session_state = STATE_START;
784   session->vpp_handle = ~0;
785   session->is_dgram = proto == VPPCOM_PROTO_UDP;
786
787   if (is_nonblocking)
788     VCL_SESS_ATTR_SET (session->attr, VCL_SESS_ATTR_NONBLOCK);
789
790   vcl_evt (VCL_EVT_CREATE, session, session_type, session->session_state,
791            is_nonblocking, session_index);
792
793   VDBG (0, "VCL<%d>: sid %u", getpid (), session->session_index);
794
795   return vcl_session_handle (session);
796 }
797
798 int
799 vppcom_session_close (uint32_t session_handle)
800 {
801   vcl_worker_t *wrk = vcl_worker_get_current ();
802   vcl_session_t *session = 0;
803   u8 is_vep, is_vep_session;
804   session_state_t state;
805   u32 next_sh, vep_sh;
806   int rv = VPPCOM_OK;
807   u64 vpp_handle;
808
809   session = vcl_session_get_w_handle (wrk, session_handle);
810   if (!session)
811     return VPPCOM_EBADFD;
812
813   is_vep = session->is_vep;
814   is_vep_session = session->is_vep_session;
815   next_sh = session->vep.next_sh;
816   vep_sh = session->vep.vep_sh;
817   state = session->session_state;
818   vpp_handle = session->vpp_handle;
819
820   if (VPPCOM_DEBUG > 0)
821     {
822       if (is_vep)
823         clib_warning ("VCL<%d>: vep_idx %u / sid %u: "
824                       "closing epoll session...",
825                       getpid (), session_handle, session_handle);
826       else
827         clib_warning ("VCL<%d>: vpp handle 0x%llx, sid %d: "
828                       "closing session...",
829                       getpid (), vpp_handle, session_handle);
830     }
831
832   if (is_vep)
833     {
834       while (next_sh != ~0)
835         {
836           rv = vppcom_epoll_ctl (session_handle, EPOLL_CTL_DEL, next_sh, 0);
837           if (PREDICT_FALSE (rv < 0))
838             VDBG (0, "VCL<%d>: vpp handle 0x%llx, sid %u: EPOLL_CTL_DEL "
839                   "vep_idx %u failed! rv %d (%s)",
840                   getpid (), vpp_handle, next_sh, vep_sh,
841                   rv, vppcom_retval_str (rv));
842
843           next_sh = session->vep.next_sh;
844         }
845     }
846   else
847     {
848       if (is_vep_session)
849         {
850           rv = vppcom_epoll_ctl (vep_sh, EPOLL_CTL_DEL, session_handle, 0);
851           if (rv < 0)
852             VDBG (0, "VCL<%d>: vpp handle 0x%llx, sid %u: EPOLL_CTL_DEL "
853                   "vep_idx %u failed! rv %d (%s)",
854                   getpid (), vpp_handle, session_handle,
855                   vep_sh, rv, vppcom_retval_str (rv));
856         }
857
858       if (state & STATE_LISTEN)
859         {
860           rv = vppcom_session_unbind (session_handle);
861           if (PREDICT_FALSE (rv < 0))
862             VDBG (0, "VCL<%d>: vpp handle 0x%llx, sid %u: listener unbind "
863                   "failed! rv %d (%s)",
864                   getpid (), vpp_handle, session_handle,
865                   rv, vppcom_retval_str (rv));
866         }
867       else if (state & STATE_OPEN)
868         {
869           rv = vppcom_session_disconnect (session_handle);
870           if (PREDICT_FALSE (rv < 0))
871             clib_warning ("VCL<%d>: ERROR: vpp handle 0x%llx, sid %u: "
872                           "session disconnect failed! rv %d (%s)",
873                           getpid (), vpp_handle, session_handle,
874                           rv, vppcom_retval_str (rv));
875         }
876     }
877
878   if (vcl_session_is_ct (session))
879     {
880       vcl_cut_through_registration_t *ctr;
881       uword mq_addr;
882
883       mq_addr = pointer_to_uword (session->our_evt_q);
884       ctr = vcl_ct_registration_lock_and_lookup (wrk, mq_addr);
885       ASSERT (ctr);
886       if (ctr->epoll_evt_conn_index != ~0)
887         vcl_mq_epoll_del_evfd (wrk, ctr->epoll_evt_conn_index);
888       VDBG (0, "Removing ct registration %u",
889             vcl_ct_registration_index (wrk, ctr));
890       vcl_ct_registration_del (wrk, ctr);
891       vcl_ct_registration_lookup_del (wrk, mq_addr);
892       vcl_ct_registration_unlock (wrk);
893     }
894
895   if (vpp_handle != ~0)
896     {
897       vcl_session_table_del_vpp_handle (wrk, vpp_handle);
898     }
899   vcl_session_free (wrk, session);
900
901   if (VPPCOM_DEBUG > 0)
902     {
903       if (is_vep)
904         clib_warning ("VCL<%d>: vep_idx %u / sid %u: epoll session removed.",
905                       getpid (), session_handle, session_handle);
906       else
907         clib_warning ("VCL<%d>: vpp handle 0x%llx, sid %u: session removed.",
908                       getpid (), vpp_handle, session_handle);
909     }
910
911   vcl_evt (VCL_EVT_CLOSE, session, rv);
912
913   return rv;
914 }
915
916 int
917 vppcom_session_bind (uint32_t session_handle, vppcom_endpt_t * ep)
918 {
919   vcl_worker_t *wrk = vcl_worker_get_current ();
920   vcl_session_t *session = 0;
921
922   if (!ep || !ep->ip)
923     return VPPCOM_EINVAL;
924
925   session = vcl_session_get_w_handle (wrk, session_handle);
926   if (!session)
927     return VPPCOM_EBADFD;
928
929   if (session->is_vep)
930     {
931       clib_warning ("VCL<%d>: ERROR: sid %u: cannot "
932                     "bind to an epoll session!", getpid (), session_handle);
933       return VPPCOM_EBADFD;
934     }
935
936   session->transport.is_ip4 = ep->is_ip4;
937   if (ep->is_ip4)
938     clib_memcpy (&session->transport.lcl_ip.ip4, ep->ip,
939                  sizeof (ip4_address_t));
940   else
941     clib_memcpy (&session->transport.lcl_ip.ip6, ep->ip,
942                  sizeof (ip6_address_t));
943   session->transport.lcl_port = ep->port;
944
945   VDBG (0, "VCL<%d>: sid %u: binding to local %s address %U port %u, "
946         "proto %s", getpid (), session_handle,
947         session->transport.is_ip4 ? "IPv4" : "IPv6",
948         format_ip46_address, &session->transport.lcl_ip,
949         session->transport.is_ip4 ? IP46_TYPE_IP4 : IP46_TYPE_IP6,
950         clib_net_to_host_u16 (session->transport.lcl_port),
951         session->session_type ? "UDP" : "TCP");
952   vcl_evt (VCL_EVT_BIND, session);
953
954   if (session->session_type == VPPCOM_PROTO_UDP)
955     vppcom_session_listen (session_handle, 10);
956
957   return VPPCOM_OK;
958 }
959
960 int
961 vppcom_session_listen (uint32_t listen_sh, uint32_t q_len)
962 {
963   vcl_worker_t *wrk = vcl_worker_get_current ();
964   vcl_session_t *listen_session = 0;
965   u64 listen_vpp_handle;
966   int rv;
967
968   listen_session = vcl_session_get_w_handle (wrk, listen_sh);
969   if (!listen_session)
970     return VPPCOM_EBADFD;
971
972   if (q_len == 0 || q_len == ~0)
973     q_len = vcm->cfg.listen_queue_size;
974
975   if (listen_session->is_vep)
976     {
977       clib_warning ("VCL<%d>: ERROR: sid %u: cannot listen on an "
978                     "epoll session!", getpid (), listen_sh);
979       return VPPCOM_EBADFD;
980     }
981
982   listen_vpp_handle = listen_session->vpp_handle;
983   if (listen_session->session_state & STATE_LISTEN)
984     {
985       VDBG (0, "VCL<%d>: vpp handle 0x%llx, sid %u: already in listen state!",
986             getpid (), listen_vpp_handle, listen_sh);
987       return VPPCOM_OK;
988     }
989
990   VDBG (0, "VCL<%d>: vpp handle 0x%llx, sid %u: sending VPP bind+listen "
991         "request...", getpid (), listen_vpp_handle, listen_sh);
992
993   /*
994    * Send listen request to vpp and wait for reply
995    */
996   vppcom_send_bind_sock (listen_session);
997   rv = vppcom_wait_for_session_state_change (listen_session->session_index,
998                                              STATE_LISTEN,
999                                              vcm->cfg.session_timeout);
1000
1001   if (PREDICT_FALSE (rv))
1002     {
1003       listen_session = vcl_session_get_w_handle (wrk, listen_sh);
1004       VDBG (0, "VCL<%d>: vpp handle 0x%llx, sid %u: bind+listen failed! "
1005             "returning %d (%s)", getpid (), listen_session->vpp_handle,
1006             listen_sh, rv, vppcom_retval_str (rv));
1007       return rv;
1008     }
1009
1010   return VPPCOM_OK;
1011 }
1012
1013 static int
1014 validate_args_session_accept_ (vcl_worker_t * wrk,
1015                                vcl_session_t * listen_session)
1016 {
1017   /* Input validation - expects spinlock on sessions_lockp */
1018   if (listen_session->is_vep)
1019     {
1020       clib_warning ("VCL<%d>: ERROR: sid %u: cannot accept on an "
1021                     "epoll session!", getpid (),
1022                     listen_session->session_index);
1023       return VPPCOM_EBADFD;
1024     }
1025
1026   if (listen_session->session_state != STATE_LISTEN)
1027     {
1028       clib_warning ("VCL<%d>: ERROR: vpp handle 0x%llx, sid %u: "
1029                     "not in listen state! state 0x%x (%s)", getpid (),
1030                     listen_session->vpp_handle, listen_session->session_index,
1031                     listen_session->session_state,
1032                     vppcom_session_state_str (listen_session->session_state));
1033       return VPPCOM_EBADFD;
1034     }
1035   return VPPCOM_OK;
1036 }
1037
1038 int
1039 vppcom_session_accept (uint32_t listen_session_handle, vppcom_endpt_t * ep,
1040                        uint32_t flags)
1041 {
1042   u32 client_session_index = ~0, listen_session_index;
1043   vcl_worker_t *wrk = vcl_worker_get_current ();
1044   session_accepted_msg_t accepted_msg;
1045   vcl_session_t *listen_session = 0;
1046   vcl_session_t *client_session = 0;
1047   svm_msg_q_t *vpp_evt_q;
1048   vcl_session_msg_t *evt;
1049   u64 listen_vpp_handle;
1050   svm_msg_q_msg_t msg;
1051   session_event_t *e;
1052   u8 is_nonblocking;
1053   int rv;
1054
1055   listen_session = vcl_session_get_w_handle (wrk, listen_session_handle);
1056   if (!listen_session)
1057     return VPPCOM_EBADFD;
1058
1059   listen_session_index = listen_session->session_index;
1060   if ((rv = validate_args_session_accept_ (wrk, listen_session)))
1061     return rv;
1062
1063   if (clib_fifo_elts (listen_session->accept_evts_fifo))
1064     {
1065       clib_fifo_sub2 (listen_session->accept_evts_fifo, evt);
1066       accepted_msg = evt->accepted_msg;
1067       goto handle;
1068     }
1069
1070   is_nonblocking = VCL_SESS_ATTR_TEST (listen_session->attr,
1071                                        VCL_SESS_ATTR_NONBLOCK);
1072   if (svm_msg_q_is_empty (wrk->app_event_queue) && is_nonblocking)
1073     return VPPCOM_EAGAIN;
1074
1075   while (1)
1076     {
1077       if (svm_msg_q_sub (wrk->app_event_queue, &msg, SVM_Q_WAIT, 0))
1078         return VPPCOM_EAGAIN;
1079
1080       e = svm_msg_q_msg_data (wrk->app_event_queue, &msg);
1081       if (e->event_type != SESSION_CTRL_EVT_ACCEPTED)
1082         {
1083           clib_warning ("discarded event: %u", e->event_type);
1084           svm_msg_q_free_msg (wrk->app_event_queue, &msg);
1085           continue;
1086         }
1087       clib_memcpy (&accepted_msg, e->data, sizeof (accepted_msg));
1088       svm_msg_q_free_msg (wrk->app_event_queue, &msg);
1089       break;
1090     }
1091
1092 handle:
1093
1094   client_session_index = vcl_session_accepted_handler (wrk, &accepted_msg);
1095   listen_session = vcl_session_get (wrk, listen_session_index);
1096   client_session = vcl_session_get (wrk, client_session_index);
1097
1098   if (flags & O_NONBLOCK)
1099     VCL_SESS_ATTR_SET (client_session->attr, VCL_SESS_ATTR_NONBLOCK);
1100
1101   listen_vpp_handle = listen_session->vpp_handle;
1102   VDBG (0, "VCL<%d>: vpp handle 0x%llx, sid %u: Got a client request! "
1103         "vpp handle 0x%llx, sid %u, flags %d, is_nonblocking %u",
1104         getpid (), listen_vpp_handle, listen_session_handle,
1105         client_session->vpp_handle, client_session_index,
1106         flags, VCL_SESS_ATTR_TEST (client_session->attr,
1107                                    VCL_SESS_ATTR_NONBLOCK));
1108
1109   if (ep)
1110     {
1111       ep->is_ip4 = client_session->transport.is_ip4;
1112       ep->port = client_session->transport.rmt_port;
1113       if (client_session->transport.is_ip4)
1114         clib_memcpy (ep->ip, &client_session->transport.rmt_ip.ip4,
1115                      sizeof (ip4_address_t));
1116       else
1117         clib_memcpy (ep->ip, &client_session->transport.rmt_ip.ip6,
1118                      sizeof (ip6_address_t));
1119     }
1120
1121   if (accepted_msg.server_event_queue_address)
1122     vpp_evt_q = uword_to_pointer (accepted_msg.vpp_event_queue_address,
1123                                   svm_msg_q_t *);
1124   else
1125     vpp_evt_q = client_session->vpp_evt_q;
1126
1127   vcl_send_session_accepted_reply (vpp_evt_q, client_session->client_context,
1128                                    client_session->vpp_handle, 0);
1129
1130   VDBG (0, "VCL<%d>: vpp handle 0x%llx, sid %u: accepted vpp handle 0x%llx, "
1131         "sid %u connection from peer %s address %U port %u to local %s "
1132         "address %U port %u", getpid (), listen_vpp_handle,
1133         listen_session_handle, client_session->vpp_handle,
1134         client_session_index,
1135         client_session->transport.is_ip4 ? "IPv4" : "IPv6",
1136         format_ip46_address, &client_session->transport.rmt_ip,
1137         client_session->transport.is_ip4 ? IP46_TYPE_IP4 : IP46_TYPE_IP6,
1138         clib_net_to_host_u16 (client_session->transport.rmt_port),
1139         client_session->transport.is_ip4 ? "IPv4" : "IPv6",
1140         format_ip46_address, &client_session->transport.lcl_ip,
1141         client_session->transport.is_ip4 ? IP46_TYPE_IP4 : IP46_TYPE_IP6,
1142         clib_net_to_host_u16 (client_session->transport.lcl_port));
1143   vcl_evt (VCL_EVT_ACCEPT, client_session, listen_session,
1144            client_session_index);
1145
1146   return vcl_session_handle (client_session);
1147 }
1148
1149 int
1150 vppcom_session_connect (uint32_t session_handle, vppcom_endpt_t * server_ep)
1151 {
1152   vcl_worker_t *wrk = vcl_worker_get_current ();
1153   vcl_session_t *session = 0;
1154   u32 session_index;
1155   int rv;
1156
1157   session = vcl_session_get_w_handle (wrk, session_handle);
1158   if (!session)
1159     return VPPCOM_EBADFD;
1160   session_index = session->session_index;
1161
1162   if (PREDICT_FALSE (session->is_vep))
1163     {
1164       clib_warning ("VCL<%d>: ERROR: sid %u: cannot "
1165                     "connect on an epoll session!", getpid (),
1166                     session_handle);
1167       return VPPCOM_EBADFD;
1168     }
1169
1170   if (PREDICT_FALSE (session->session_state & CLIENT_STATE_OPEN))
1171     {
1172       VDBG (0, "VCL<%d>: vpp handle 0x%llx, sid %u: session already "
1173             "connected to %s %U port %d proto %s, state 0x%x (%s)",
1174             getpid (), session->vpp_handle, session_handle,
1175             session->transport.is_ip4 ? "IPv4" : "IPv6",
1176             format_ip46_address,
1177             &session->transport.rmt_ip, session->transport.is_ip4 ?
1178             IP46_TYPE_IP4 : IP46_TYPE_IP6,
1179             clib_net_to_host_u16 (session->transport.rmt_port),
1180             session->session_type ? "UDP" : "TCP", session->session_state,
1181             vppcom_session_state_str (session->session_state));
1182       return VPPCOM_OK;
1183     }
1184
1185   session->transport.is_ip4 = server_ep->is_ip4;
1186   if (session->transport.is_ip4)
1187     clib_memcpy (&session->transport.rmt_ip.ip4, server_ep->ip,
1188                  sizeof (ip4_address_t));
1189   else
1190     clib_memcpy (&session->transport.rmt_ip.ip6, server_ep->ip,
1191                  sizeof (ip6_address_t));
1192   session->transport.rmt_port = server_ep->port;
1193
1194   VDBG (0, "VCL<%d>: vpp handle 0x%llx, sid %u: connecting to server %s %U "
1195         "port %d proto %s",
1196         getpid (), session->vpp_handle, session_handle,
1197         session->transport.is_ip4 ? "IPv4" : "IPv6",
1198         format_ip46_address,
1199         &session->transport.rmt_ip, session->transport.is_ip4 ?
1200         IP46_TYPE_IP4 : IP46_TYPE_IP6,
1201         clib_net_to_host_u16 (session->transport.rmt_port),
1202         session->session_type ? "UDP" : "TCP");
1203
1204   /*
1205    * Send connect request and wait for reply from vpp
1206    */
1207   vppcom_send_connect_sock (session);
1208   rv = vppcom_wait_for_session_state_change (session_index, STATE_CONNECT,
1209                                              vcm->cfg.session_timeout);
1210
1211   session = vcl_session_get (wrk, session_index);
1212
1213   if (PREDICT_FALSE (rv))
1214     {
1215       if (VPPCOM_DEBUG > 0)
1216         {
1217           if (session)
1218             clib_warning ("VCL<%d>: vpp handle 0x%llx, sid %u: connect "
1219                           "failed! returning %d (%s)", getpid (),
1220                           session->vpp_handle, session_handle, rv,
1221                           vppcom_retval_str (rv));
1222           else
1223             clib_warning ("VCL<%d>: no session for sid %u: connect failed! "
1224                           "returning %d (%s)", getpid (),
1225                           session_handle, rv, vppcom_retval_str (rv));
1226         }
1227     }
1228   else
1229     VDBG (0, "VCL<%d>: vpp handle 0x%llx, sid %u: connected!",
1230           getpid (), session->vpp_handle, session_handle);
1231
1232   return rv;
1233 }
1234
1235 static u8
1236 vcl_is_rx_evt_for_session (session_event_t * e, u32 sid, u8 is_ct)
1237 {
1238   if (!is_ct)
1239     return (e->event_type == FIFO_EVENT_APP_RX
1240             && e->fifo->client_session_index == sid);
1241   else
1242     return (e->event_type == SESSION_IO_EVT_CT_TX);
1243 }
1244
1245 static inline u8
1246 vcl_session_is_readable (vcl_session_t * s)
1247 {
1248   return ((s->session_state & STATE_OPEN)
1249           || (s->session_state == STATE_LISTEN
1250               && s->session_type == VPPCOM_PROTO_UDP));
1251 }
1252
1253 static inline int
1254 vppcom_session_read_internal (uint32_t session_handle, void *buf, int n,
1255                               u8 peek)
1256 {
1257   vcl_worker_t *wrk = vcl_worker_get_current ();
1258   int n_read = 0, rv, is_nonblocking;
1259   vcl_session_t *s = 0;
1260   svm_fifo_t *rx_fifo;
1261   svm_msg_q_msg_t msg;
1262   session_event_t *e;
1263   svm_msg_q_t *mq;
1264   u8 is_full;
1265
1266   if (PREDICT_FALSE (!buf))
1267     return VPPCOM_EINVAL;
1268
1269   s = vcl_session_get_w_handle (wrk, session_handle);
1270   if (PREDICT_FALSE (!s))
1271     return VPPCOM_EBADFD;
1272
1273   if (PREDICT_FALSE (s->is_vep))
1274     {
1275       clib_warning ("VCL<%d>: ERROR: sid %u: cannot "
1276                     "read from an epoll session!", getpid (), session_handle);
1277       return VPPCOM_EBADFD;
1278     }
1279
1280   is_nonblocking = VCL_SESS_ATTR_TEST (s->attr, VCL_SESS_ATTR_NONBLOCK);
1281   rx_fifo = s->rx_fifo;
1282
1283   if (PREDICT_FALSE (!vcl_session_is_readable (s)))
1284     {
1285       session_state_t state = s->session_state;
1286       rv = ((state & STATE_DISCONNECT) ? VPPCOM_ECONNRESET : VPPCOM_ENOTCONN);
1287
1288       VDBG (0, "VCL<%d>: vpp handle 0x%llx, sid %u: %s session is not open! "
1289             "state 0x%x (%s), returning %d (%s)",
1290             getpid (), s->vpp_handle, session_handle, state,
1291             vppcom_session_state_str (state), rv, vppcom_retval_str (rv));
1292       return rv;
1293     }
1294
1295   mq = vcl_session_is_ct (s) ? s->our_evt_q : wrk->app_event_queue;
1296   svm_fifo_unset_event (rx_fifo);
1297   is_full = svm_fifo_is_full (rx_fifo);
1298
1299   if (svm_fifo_is_empty (rx_fifo))
1300     {
1301       if (is_nonblocking)
1302         {
1303           return VPPCOM_OK;
1304         }
1305       while (1)
1306         {
1307           svm_msg_q_lock (mq);
1308           if (svm_msg_q_is_empty (mq))
1309             svm_msg_q_wait (mq);
1310
1311           svm_msg_q_sub_w_lock (mq, &msg);
1312           e = svm_msg_q_msg_data (mq, &msg);
1313           svm_msg_q_unlock (mq);
1314           if (!vcl_is_rx_evt_for_session (e, s->session_index,
1315                                           s->our_evt_q != 0))
1316             {
1317               vcl_handle_mq_ctrl_event (wrk, e);
1318               svm_msg_q_free_msg (mq, &msg);
1319               continue;
1320             }
1321           svm_fifo_unset_event (rx_fifo);
1322           svm_msg_q_free_msg (mq, &msg);
1323           if (PREDICT_FALSE (s->session_state == STATE_CLOSE_ON_EMPTY))
1324             return 0;
1325           if (svm_fifo_is_empty (rx_fifo))
1326             continue;
1327           break;
1328         }
1329     }
1330
1331   if (s->is_dgram)
1332     n_read = app_recv_dgram_raw (rx_fifo, buf, n, &s->transport, 0, peek);
1333   else
1334     n_read = app_recv_stream_raw (rx_fifo, buf, n, 0, peek);
1335
1336   if (vcl_session_is_ct (s) && is_full)
1337     {
1338       /* If the peer is not polling send notification */
1339       if (!svm_fifo_has_event (s->rx_fifo))
1340         app_send_io_evt_to_vpp (s->vpp_evt_q, s->rx_fifo,
1341                                 SESSION_IO_EVT_CT_RX, SVM_Q_WAIT);
1342     }
1343
1344   if (VPPCOM_DEBUG > 2)
1345     {
1346       if (n_read > 0)
1347         clib_warning ("VCL<%d>: vpp handle 0x%llx, sid %u: read %d bytes "
1348                       "from (%p)", getpid (), s->vpp_handle,
1349                       session_handle, n_read, rx_fifo);
1350       else
1351         clib_warning ("VCL<%d>: vpp handle 0x%llx, sid %u: nothing read! "
1352                       "returning %d (%s)", getpid (), s->vpp_handle,
1353                       session_handle, n_read, vppcom_retval_str (n_read));
1354     }
1355   return n_read;
1356 }
1357
1358 int
1359 vppcom_session_read (uint32_t session_handle, void *buf, size_t n)
1360 {
1361   return (vppcom_session_read_internal (session_handle, buf, n, 0));
1362 }
1363
1364 static int
1365 vppcom_session_peek (uint32_t session_handle, void *buf, int n)
1366 {
1367   return (vppcom_session_read_internal (session_handle, buf, n, 1));
1368 }
1369
1370 static inline int
1371 vppcom_session_read_ready (vcl_session_t * session)
1372 {
1373   /* Assumes caller has acquired spinlock: vcm->sessions_lockp */
1374   if (PREDICT_FALSE (session->is_vep))
1375     {
1376       clib_warning ("VCL<%d>: ERROR: sid %u: cannot read from an "
1377                     "epoll session!", getpid (), session->session_index);
1378       return VPPCOM_EBADFD;
1379     }
1380
1381   if (PREDICT_FALSE (!(session->session_state & (STATE_OPEN | STATE_LISTEN))))
1382     {
1383       session_state_t state = session->session_state;
1384       int rv;
1385
1386       rv = ((state & STATE_DISCONNECT) ? VPPCOM_ECONNRESET : VPPCOM_ENOTCONN);
1387
1388       VDBG (1, "VCL<%d>: vpp handle 0x%llx, sid %u: session is not open!"
1389             " state 0x%x (%s), returning %d (%s)", getpid (),
1390             session->vpp_handle, session->session_index, state,
1391             vppcom_session_state_str (state), rv, vppcom_retval_str (rv));
1392       return rv;
1393     }
1394
1395   if (session->session_state & STATE_LISTEN)
1396     return clib_fifo_elts (session->accept_evts_fifo);
1397
1398   return svm_fifo_max_dequeue (session->rx_fifo);
1399 }
1400
1401 static u8
1402 vcl_is_tx_evt_for_session (session_event_t * e, u32 sid, u8 is_ct)
1403 {
1404   if (!is_ct)
1405     return (e->event_type == FIFO_EVENT_APP_TX
1406             && e->fifo->client_session_index == sid);
1407   else
1408     return (e->event_type == SESSION_IO_EVT_CT_RX);
1409 }
1410
1411 int
1412 vppcom_session_write (uint32_t session_handle, void *buf, size_t n)
1413 {
1414   vcl_worker_t *wrk = vcl_worker_get_current ();
1415   int rv, n_write, is_nonblocking;
1416   vcl_session_t *s = 0;
1417   svm_fifo_t *tx_fifo = 0;
1418   session_evt_type_t et;
1419   svm_msg_q_msg_t msg;
1420   session_event_t *e;
1421   svm_msg_q_t *mq;
1422
1423   if (PREDICT_FALSE (!buf))
1424     return VPPCOM_EINVAL;
1425
1426   s = vcl_session_get_w_handle (wrk, session_handle);
1427   if (PREDICT_FALSE (!s))
1428     return VPPCOM_EBADFD;
1429
1430   tx_fifo = s->tx_fifo;
1431   is_nonblocking = VCL_SESS_ATTR_TEST (s->attr, VCL_SESS_ATTR_NONBLOCK);
1432
1433   if (PREDICT_FALSE (s->is_vep))
1434     {
1435       clib_warning ("VCL<%d>: ERROR: vpp handle 0x%llx, sid %u: "
1436                     "cannot write to an epoll session!",
1437                     getpid (), s->vpp_handle, session_handle);
1438
1439       return VPPCOM_EBADFD;
1440     }
1441
1442   if (!(s->session_state & STATE_OPEN))
1443     {
1444       session_state_t state = s->session_state;
1445       rv = ((state & STATE_DISCONNECT) ? VPPCOM_ECONNRESET : VPPCOM_ENOTCONN);
1446       VDBG (1, "VCL<%d>: vpp handle 0x%llx, sid %u: session is not open! "
1447             "state 0x%x (%s)",
1448             getpid (), s->vpp_handle, session_handle,
1449             state, vppcom_session_state_str (state));
1450       return rv;
1451     }
1452
1453   mq = vcl_session_is_ct (s) ? s->our_evt_q : wrk->app_event_queue;
1454   if (svm_fifo_is_full (tx_fifo))
1455     {
1456       if (is_nonblocking)
1457         {
1458           return VPPCOM_EWOULDBLOCK;
1459         }
1460       while (svm_fifo_is_full (tx_fifo))
1461         {
1462           svm_msg_q_lock (mq);
1463           while (svm_msg_q_is_empty (mq) && svm_msg_q_timedwait (mq, 10e-6))
1464             ;
1465           svm_msg_q_sub_w_lock (mq, &msg);
1466           e = svm_msg_q_msg_data (mq, &msg);
1467           svm_msg_q_unlock (mq);
1468
1469           if (!vcl_is_tx_evt_for_session (e, s->session_index,
1470                                           s->our_evt_q != 0))
1471             vcl_handle_mq_ctrl_event (wrk, e);
1472           svm_msg_q_free_msg (mq, &msg);
1473         }
1474     }
1475
1476   ASSERT (FIFO_EVENT_APP_TX + 1 == SESSION_IO_EVT_CT_TX);
1477   et = FIFO_EVENT_APP_TX + vcl_session_is_ct (s);
1478   if (s->is_dgram)
1479     n_write = app_send_dgram_raw (tx_fifo, &s->transport,
1480                                   s->vpp_evt_q, buf, n, et, SVM_Q_WAIT);
1481   else
1482     n_write = app_send_stream_raw (tx_fifo, s->vpp_evt_q, buf, n, et,
1483                                    SVM_Q_WAIT);
1484
1485   ASSERT (n_write > 0);
1486
1487   if (VPPCOM_DEBUG > 2)
1488     {
1489       if (n_write <= 0)
1490         clib_warning ("VCL<%d>: vpp handle 0x%llx, sid %u: "
1491                       "FIFO-FULL (%p)", getpid (), s->vpp_handle,
1492                       session_handle, tx_fifo);
1493       else
1494         clib_warning ("VCL<%d>: vpp handle 0x%llx, sid %u: "
1495                       "wrote %d bytes tx-fifo: (%p)", getpid (),
1496                       s->vpp_handle, session_handle, n_write, tx_fifo);
1497     }
1498   return n_write;
1499 }
1500
1501 static vcl_session_t *
1502 vcl_ct_session_get_from_fifo (vcl_worker_t * wrk, svm_fifo_t * f, u8 type)
1503 {
1504   vcl_session_t *s;
1505   s = vcl_session_get (wrk, f->client_session_index);
1506   if (s)
1507     {
1508       /* rx fifo */
1509       if (type == 0 && s->rx_fifo == f)
1510         return s;
1511       /* tx fifo */
1512       if (type == 1 && s->tx_fifo == f)
1513         return s;
1514     }
1515   s = vcl_session_get (wrk, f->master_session_index);
1516   if (s)
1517     {
1518       if (type == 0 && s->rx_fifo == f)
1519         return s;
1520       if (type == 1 && s->tx_fifo == f)
1521         return s;
1522     }
1523   return 0;
1524 }
1525
1526 static inline int
1527 vppcom_session_write_ready (vcl_session_t * session)
1528 {
1529   /* Assumes caller has acquired spinlock: vcm->sessions_lockp */
1530   if (PREDICT_FALSE (session->is_vep))
1531     {
1532       clib_warning ("VCL<%d>: ERROR: vpp handle 0x%llx, sid %u: "
1533                     "cannot write to an epoll session!",
1534                     getpid (), session->vpp_handle, session->session_index);
1535       return VPPCOM_EBADFD;
1536     }
1537
1538   if (PREDICT_FALSE (session->session_state & STATE_LISTEN))
1539     {
1540       clib_warning ("VCL<%d>: ERROR: vpp handle 0x%llx, sid %u: "
1541                     "cannot write to a listen session!",
1542                     getpid (), session->vpp_handle, session->session_index);
1543       return VPPCOM_EBADFD;
1544     }
1545
1546   if (PREDICT_FALSE (!(session->session_state & STATE_OPEN)))
1547     {
1548       session_state_t state = session->session_state;
1549       int rv;
1550
1551       rv = ((state & STATE_DISCONNECT) ? VPPCOM_ECONNRESET : VPPCOM_ENOTCONN);
1552       clib_warning ("VCL<%d>: ERROR: vpp handle 0x%llx, sid %u: "
1553                     "session is not open! state 0x%x (%s), "
1554                     "returning %d (%s)", getpid (), session->vpp_handle,
1555                     session->session_index,
1556                     state, vppcom_session_state_str (state),
1557                     rv, vppcom_retval_str (rv));
1558       return rv;
1559     }
1560
1561   VDBG (3, "VCL<%d>: vpp handle 0x%llx, sid %u: peek %s (%p), ready = %d",
1562         getpid (), session->vpp_handle, session->session_index,
1563         session->tx_fifo, svm_fifo_max_enqueue (session->tx_fifo));
1564
1565   return svm_fifo_max_enqueue (session->tx_fifo);
1566 }
1567
1568 static inline int
1569 vcl_mq_dequeue_batch (vcl_worker_t * wrk, svm_msg_q_t * mq)
1570 {
1571   svm_msg_q_msg_t *msg;
1572   u32 n_msgs;
1573   int i;
1574
1575   n_msgs = svm_msg_q_size (mq);
1576   for (i = 0; i < n_msgs; i++)
1577     {
1578       vec_add2 (wrk->mq_msg_vector, msg, 1);
1579       svm_msg_q_sub_w_lock (mq, msg);
1580     }
1581   return n_msgs;
1582 }
1583
1584 #define vcl_fifo_rx_evt_valid_or_break(_fifo)                   \
1585 if (PREDICT_FALSE (svm_fifo_is_empty (_fifo)))                  \
1586   {                                                             \
1587     svm_fifo_unset_event (_fifo);                               \
1588     if (svm_fifo_is_empty (_fifo))                              \
1589         break;                                                  \
1590   }                                                             \
1591
1592 static int
1593 vcl_select_handle_mq (vcl_worker_t * wrk, svm_msg_q_t * mq,
1594                       unsigned long n_bits, unsigned long *read_map,
1595                       unsigned long *write_map, unsigned long *except_map,
1596                       double time_to_wait, u32 * bits_set)
1597 {
1598   session_disconnected_msg_t *disconnected_msg;
1599   session_connected_msg_t *connected_msg;
1600   session_accepted_msg_t *accepted_msg;
1601   vcl_session_msg_t *vcl_msg;
1602   vcl_session_t *session;
1603   svm_msg_q_msg_t *msg;
1604   session_event_t *e;
1605   u32 i, sid;
1606   u64 handle;
1607
1608   svm_msg_q_lock (mq);
1609   if (svm_msg_q_is_empty (mq))
1610     {
1611       if (*bits_set)
1612         {
1613           svm_msg_q_unlock (mq);
1614           return 0;
1615         }
1616
1617       if (!time_to_wait)
1618         {
1619           svm_msg_q_unlock (mq);
1620           return 0;
1621         }
1622       else if (time_to_wait < 0)
1623         {
1624           svm_msg_q_wait (mq);
1625         }
1626       else
1627         {
1628           if (svm_msg_q_timedwait (mq, time_to_wait))
1629             {
1630               svm_msg_q_unlock (mq);
1631               return 0;
1632             }
1633         }
1634     }
1635   vcl_mq_dequeue_batch (wrk, mq);
1636   svm_msg_q_unlock (mq);
1637
1638   for (i = 0; i < vec_len (wrk->mq_msg_vector); i++)
1639     {
1640       msg = vec_elt_at_index (wrk->mq_msg_vector, i);
1641       e = svm_msg_q_msg_data (mq, msg);
1642       switch (e->event_type)
1643         {
1644         case FIFO_EVENT_APP_RX:
1645           vcl_fifo_rx_evt_valid_or_break (e->fifo);
1646           sid = e->fifo->client_session_index;
1647           session = vcl_session_get (wrk, sid);
1648           if (!session)
1649             break;
1650           if (sid < n_bits && read_map)
1651             {
1652               clib_bitmap_set_no_check (read_map, sid, 1);
1653               *bits_set += 1;
1654             }
1655           break;
1656         case FIFO_EVENT_APP_TX:
1657           sid = e->fifo->client_session_index;
1658           session = vcl_session_get (wrk, sid);
1659           if (!session)
1660             break;
1661           if (sid < n_bits && write_map)
1662             {
1663               clib_bitmap_set_no_check (write_map, sid, 1);
1664               *bits_set += 1;
1665             }
1666           break;
1667         case SESSION_IO_EVT_CT_TX:
1668           vcl_fifo_rx_evt_valid_or_break (e->fifo);
1669           session = vcl_ct_session_get_from_fifo (wrk, e->fifo, 0);
1670           if (!session)
1671             break;
1672           sid = session->session_index;
1673           if (sid < n_bits && read_map)
1674             {
1675               clib_bitmap_set_no_check (read_map, sid, 1);
1676               *bits_set += 1;
1677             }
1678           break;
1679         case SESSION_IO_EVT_CT_RX:
1680           session = vcl_ct_session_get_from_fifo (wrk, e->fifo, 1);
1681           if (!session)
1682             break;
1683           sid = session->session_index;
1684           if (sid < n_bits && write_map)
1685             {
1686               clib_bitmap_set_no_check (write_map, sid, 1);
1687               *bits_set += 1;
1688             }
1689           break;
1690         case SESSION_CTRL_EVT_ACCEPTED:
1691           accepted_msg = (session_accepted_msg_t *) e->data;
1692           handle = accepted_msg->listener_handle;
1693           session = vcl_session_table_lookup_listener (wrk, handle);
1694           if (!session)
1695             {
1696               clib_warning ("VCL<%d>: ERROR: couldn't find listen session:"
1697                             "listener handle %llx", getpid (), handle);
1698               break;
1699             }
1700
1701           clib_fifo_add2 (session->accept_evts_fifo, vcl_msg);
1702           vcl_msg->accepted_msg = *accepted_msg;
1703           sid = session->session_index;
1704           if (sid < n_bits && read_map)
1705             {
1706               clib_bitmap_set_no_check (read_map, sid, 1);
1707               *bits_set += 1;
1708             }
1709           break;
1710         case SESSION_CTRL_EVT_CONNECTED:
1711           connected_msg = (session_connected_msg_t *) e->data;
1712           vcl_session_connected_handler (wrk, connected_msg);
1713           break;
1714         case SESSION_CTRL_EVT_DISCONNECTED:
1715           disconnected_msg = (session_disconnected_msg_t *) e->data;
1716           sid = vcl_session_index_from_vpp_handle (wrk,
1717                                                    disconnected_msg->handle);
1718           if (sid < n_bits && except_map)
1719             {
1720               clib_bitmap_set_no_check (except_map, sid, 1);
1721               *bits_set += 1;
1722             }
1723           break;
1724         case SESSION_CTRL_EVT_RESET:
1725           sid = vcl_session_reset_handler (wrk,
1726                                            (session_reset_msg_t *) e->data);
1727           if (sid < n_bits && except_map)
1728             {
1729               clib_bitmap_set_no_check (except_map, sid, 1);
1730               *bits_set += 1;
1731             }
1732           break;
1733         default:
1734           clib_warning ("unhandled: %u", e->event_type);
1735           break;
1736         }
1737       svm_msg_q_free_msg (mq, msg);
1738     }
1739
1740   vec_reset_length (wrk->mq_msg_vector);
1741   return *bits_set;
1742 }
1743
1744 static int
1745 vppcom_select_condvar (vcl_worker_t * wrk, unsigned long n_bits,
1746                        unsigned long *read_map, unsigned long *write_map,
1747                        unsigned long *except_map, double time_to_wait,
1748                        u32 * bits_set)
1749 {
1750   double total_wait = 0, wait_slice;
1751   vcl_cut_through_registration_t *cr;
1752
1753   time_to_wait = (time_to_wait == -1) ? 10e9 : time_to_wait;
1754   wait_slice = wrk->cut_through_registrations ? 10e-6 : time_to_wait;
1755   do
1756     {
1757       vcl_ct_registration_lock (wrk);
1758       /* *INDENT-OFF* */
1759       pool_foreach (cr, wrk->cut_through_registrations, ({
1760         vcl_select_handle_mq (wrk, cr->mq, n_bits, read_map, write_map, except_map,
1761                               0, bits_set);
1762       }));
1763       /* *INDENT-ON* */
1764       vcl_ct_registration_unlock (wrk);
1765
1766       vcl_select_handle_mq (wrk, wrk->app_event_queue, n_bits, read_map,
1767                             write_map, except_map, time_to_wait, bits_set);
1768       total_wait += wait_slice;
1769       if (*bits_set)
1770         return *bits_set;
1771     }
1772   while (total_wait < time_to_wait);
1773
1774   return 0;
1775 }
1776
1777 static int
1778 vppcom_select_eventfd (vcl_worker_t * wrk, unsigned long n_bits,
1779                        unsigned long *read_map, unsigned long *write_map,
1780                        unsigned long *except_map, double time_to_wait,
1781                        u32 * bits_set)
1782 {
1783   vcl_mq_evt_conn_t *mqc;
1784   int __clib_unused n_read;
1785   int n_mq_evts, i;
1786   u64 buf;
1787
1788   vec_validate (wrk->mq_events, pool_elts (wrk->mq_evt_conns));
1789   n_mq_evts = epoll_wait (wrk->mqs_epfd, wrk->mq_events,
1790                           vec_len (wrk->mq_events), time_to_wait);
1791   for (i = 0; i < n_mq_evts; i++)
1792     {
1793       mqc = vcl_mq_evt_conn_get (wrk, wrk->mq_events[i].data.u32);
1794       n_read = read (mqc->mq_fd, &buf, sizeof (buf));
1795       vcl_select_handle_mq (wrk, mqc->mq, n_bits, read_map, write_map,
1796                             except_map, 0, bits_set);
1797     }
1798
1799   return (n_mq_evts > 0 ? (int) *bits_set : 0);
1800 }
1801
1802 int
1803 vppcom_select (unsigned long n_bits, unsigned long *read_map,
1804                unsigned long *write_map, unsigned long *except_map,
1805                double time_to_wait)
1806 {
1807   u32 sid, minbits = clib_max (n_bits, BITS (uword)), bits_set = 0;
1808   vcl_worker_t *wrk = vcl_worker_get_current ();
1809   vcl_session_t *session = 0;
1810   int rv;
1811
1812   ASSERT (sizeof (clib_bitmap_t) == sizeof (long int));
1813
1814   if (n_bits && read_map)
1815     {
1816       clib_bitmap_validate (wrk->rd_bitmap, minbits);
1817       clib_memcpy (wrk->rd_bitmap, read_map,
1818                    vec_len (wrk->rd_bitmap) * sizeof (clib_bitmap_t));
1819       memset (read_map, 0, vec_len (wrk->rd_bitmap) * sizeof (clib_bitmap_t));
1820     }
1821   if (n_bits && write_map)
1822     {
1823       clib_bitmap_validate (wrk->wr_bitmap, minbits);
1824       clib_memcpy (wrk->wr_bitmap, write_map,
1825                    vec_len (wrk->wr_bitmap) * sizeof (clib_bitmap_t));
1826       memset (write_map, 0,
1827               vec_len (wrk->wr_bitmap) * sizeof (clib_bitmap_t));
1828     }
1829   if (n_bits && except_map)
1830     {
1831       clib_bitmap_validate (wrk->ex_bitmap, minbits);
1832       clib_memcpy (wrk->ex_bitmap, except_map,
1833                    vec_len (wrk->ex_bitmap) * sizeof (clib_bitmap_t));
1834       memset (except_map, 0,
1835               vec_len (wrk->ex_bitmap) * sizeof (clib_bitmap_t));
1836     }
1837
1838   if (!n_bits)
1839     return 0;
1840
1841   if (!write_map)
1842     goto check_rd;
1843
1844   /* *INDENT-OFF* */
1845   clib_bitmap_foreach (sid, wrk->wr_bitmap, ({
1846     if (!(session = vcl_session_get (wrk, sid)))
1847       {
1848         VDBG (0, "VCL<%d>: session %d specified in write_map is closed.",
1849               getpid (), sid);
1850         return VPPCOM_EBADFD;
1851       }
1852
1853     rv = svm_fifo_is_full (session->tx_fifo);
1854     if (!rv)
1855       {
1856         clib_bitmap_set_no_check (write_map, sid, 1);
1857         bits_set++;
1858       }
1859   }));
1860
1861 check_rd:
1862   if (!read_map)
1863     goto check_mq;
1864
1865   clib_bitmap_foreach (sid, wrk->rd_bitmap, ({
1866     if (!(session = vcl_session_get (wrk, sid)))
1867       {
1868         VDBG (0, "VCL<%d>: session %d specified in write_map is closed.",
1869               getpid (), sid);
1870         return VPPCOM_EBADFD;
1871       }
1872
1873     rv = vppcom_session_read_ready (session);
1874     if (rv)
1875       {
1876         clib_bitmap_set_no_check (read_map, sid, 1);
1877         bits_set++;
1878       }
1879   }));
1880   /* *INDENT-ON* */
1881
1882 check_mq:
1883
1884   if (vcm->cfg.use_mq_eventfd)
1885     vppcom_select_eventfd (wrk, n_bits, read_map, write_map, except_map,
1886                            time_to_wait, &bits_set);
1887   else
1888     vppcom_select_condvar (wrk, n_bits, read_map, write_map, except_map,
1889                            time_to_wait, &bits_set);
1890
1891   return (bits_set);
1892 }
1893
1894 static inline void
1895 vep_verify_epoll_chain (vcl_worker_t * wrk, u32 vep_idx)
1896 {
1897   vcl_session_t *session;
1898   vppcom_epoll_t *vep;
1899   u32 sid = vep_idx;
1900
1901   if (VPPCOM_DEBUG <= 1)
1902     return;
1903
1904   /* Assumes caller has acquired spinlock: vcm->sessions_lockp */
1905   session = vcl_session_get (wrk, vep_idx);
1906   if (PREDICT_FALSE (!session))
1907     {
1908       clib_warning ("VCL<%d>: ERROR: Invalid vep_idx (%u)!",
1909                     getpid (), vep_idx);
1910       goto done;
1911     }
1912   if (PREDICT_FALSE (!session->is_vep))
1913     {
1914       clib_warning ("VCL<%d>: ERROR: vep_idx (%u) is not a vep!",
1915                     getpid (), vep_idx);
1916       goto done;
1917     }
1918   vep = &session->vep;
1919   clib_warning ("VCL<%d>: vep_idx (%u): Dumping epoll chain\n"
1920                 "{\n"
1921                 "   is_vep         = %u\n"
1922                 "   is_vep_session = %u\n"
1923                 "   next_sid       = 0x%x (%u)\n"
1924                 "   wait_cont_idx  = 0x%x (%u)\n"
1925                 "}\n", getpid (), vep_idx,
1926                 session->is_vep, session->is_vep_session,
1927                 vep->next_sh, vep->next_sh,
1928                 session->wait_cont_idx, session->wait_cont_idx);
1929
1930   for (sid = vep->next_sh; sid != ~0; sid = vep->next_sh)
1931     {
1932       session = vcl_session_get (wrk, sid);
1933       if (PREDICT_FALSE (!session))
1934         {
1935           clib_warning ("VCL<%d>: ERROR: Invalid sid (%u)!", getpid (), sid);
1936           goto done;
1937         }
1938       if (PREDICT_FALSE (session->is_vep))
1939         clib_warning ("VCL<%d>: ERROR: sid (%u) is a vep!",
1940                       getpid (), vep_idx);
1941       else if (PREDICT_FALSE (!session->is_vep_session))
1942         {
1943           clib_warning ("VCL<%d>: ERROR: session (%u) "
1944                         "is not a vep session!", getpid (), sid);
1945           goto done;
1946         }
1947       vep = &session->vep;
1948       if (PREDICT_FALSE (vep->vep_sh != vep_idx))
1949         clib_warning ("VCL<%d>: ERROR: session (%u) vep_idx (%u) != "
1950                       "vep_idx (%u)!", getpid (),
1951                       sid, session->vep.vep_sh, vep_idx);
1952       if (session->is_vep_session)
1953         {
1954           clib_warning ("vep_idx[%u]: sid 0x%x (%u)\n"
1955                         "{\n"
1956                         "   next_sid       = 0x%x (%u)\n"
1957                         "   prev_sid       = 0x%x (%u)\n"
1958                         "   vep_idx        = 0x%x (%u)\n"
1959                         "   ev.events      = 0x%x\n"
1960                         "   ev.data.u64    = 0x%llx\n"
1961                         "   et_mask        = 0x%x\n"
1962                         "}\n",
1963                         vep_idx, sid, sid,
1964                         vep->next_sh, vep->next_sh,
1965                         vep->prev_sh, vep->prev_sh,
1966                         vep->vep_sh, vep->vep_sh,
1967                         vep->ev.events, vep->ev.data.u64, vep->et_mask);
1968         }
1969     }
1970
1971 done:
1972   clib_warning ("VCL<%d>: vep_idx (%u): Dump complete!\n",
1973                 getpid (), vep_idx);
1974 }
1975
1976 int
1977 vppcom_epoll_create (void)
1978 {
1979   vcl_worker_t *wrk = vcl_worker_get_current ();
1980   vcl_session_t *vep_session;
1981
1982   vep_session = vcl_session_alloc (wrk);
1983
1984   vep_session->is_vep = 1;
1985   vep_session->vep.vep_sh = ~0;
1986   vep_session->vep.next_sh = ~0;
1987   vep_session->vep.prev_sh = ~0;
1988   vep_session->wait_cont_idx = ~0;
1989   vep_session->vpp_handle = ~0;
1990
1991   vcl_evt (VCL_EVT_EPOLL_CREATE, vep_session, vep_sh);
1992   VDBG (0, "VCL<%d>: Created vep_idx %u / sid %u!",
1993         getpid (), vep_session->session_index, vep_session->session_index);
1994
1995   return vcl_session_handle (vep_session);
1996 }
1997
1998 int
1999 vppcom_epoll_ctl (uint32_t vep_handle, int op, uint32_t session_handle,
2000                   struct epoll_event *event)
2001 {
2002   vcl_worker_t *wrk = vcl_worker_get_current ();
2003   vcl_session_t *vep_session;
2004   vcl_session_t *session;
2005   int rv = VPPCOM_OK;
2006
2007   if (vep_handle == session_handle)
2008     {
2009       clib_warning ("VCL<%d>: ERROR: vep_idx == session_index (%u)!",
2010                     getpid (), vep_handle);
2011       return VPPCOM_EINVAL;
2012     }
2013
2014   vep_session = vcl_session_get_w_handle (wrk, vep_handle);
2015   if (PREDICT_FALSE (!vep_session))
2016     {
2017       clib_warning ("VCL<%d>: ERROR: Invalid vep_idx (%u)!", vep_handle);
2018       return VPPCOM_EBADFD;
2019     }
2020   if (PREDICT_FALSE (!vep_session->is_vep))
2021     {
2022       clib_warning ("VCL<%d>: ERROR: vep_idx (%u) is not a vep!",
2023                     getpid (), vep_handle);
2024       return VPPCOM_EINVAL;
2025     }
2026
2027   ASSERT (vep_session->vep.vep_sh == ~0);
2028   ASSERT (vep_session->vep.prev_sh == ~0);
2029
2030   session = vcl_session_get_w_handle (wrk, session_handle);
2031   if (PREDICT_FALSE (!session))
2032     {
2033       VDBG (0, "VCL<%d>: ERROR: Invalid session_handle (%u)!",
2034             getpid (), session_handle);
2035       return VPPCOM_EBADFD;
2036     }
2037   if (PREDICT_FALSE (session->is_vep))
2038     {
2039       clib_warning ("ERROR: session_handle (%u) is a vep!", vep_handle);
2040       return VPPCOM_EINVAL;
2041     }
2042
2043   switch (op)
2044     {
2045     case EPOLL_CTL_ADD:
2046       if (PREDICT_FALSE (!event))
2047         {
2048           clib_warning ("VCL<%d>: ERROR: EPOLL_CTL_ADD: NULL pointer to "
2049                         "epoll_event structure!", getpid ());
2050           return VPPCOM_EINVAL;
2051         }
2052       if (vep_session->vep.next_sh != ~0)
2053         {
2054           vcl_session_t *next_session;
2055           next_session = vcl_session_get_w_handle (wrk,
2056                                                    vep_session->vep.next_sh);
2057           if (PREDICT_FALSE (!next_session))
2058             {
2059               clib_warning ("VCL<%d>: ERROR: EPOLL_CTL_ADD: Invalid "
2060                             "vep.next_sid (%u) on vep_idx (%u)!",
2061                             getpid (), vep_session->vep.next_sh, vep_handle);
2062               return VPPCOM_EBADFD;
2063             }
2064           ASSERT (next_session->vep.prev_sh == vep_handle);
2065           next_session->vep.prev_sh = session_handle;
2066         }
2067       session->vep.next_sh = vep_session->vep.next_sh;
2068       session->vep.prev_sh = vep_handle;
2069       session->vep.vep_sh = vep_handle;
2070       session->vep.et_mask = VEP_DEFAULT_ET_MASK;
2071       session->vep.ev = *event;
2072       session->is_vep = 0;
2073       session->is_vep_session = 1;
2074       vep_session->vep.next_sh = session_handle;
2075
2076       VDBG (1, "VCL<%d>: EPOLL_CTL_ADD: vep_idx %u, sid %u, events 0x%x, "
2077             "data 0x%llx!", getpid (), vep_handle, session_handle,
2078             event->events, event->data.u64);
2079       vcl_evt (VCL_EVT_EPOLL_CTLADD, session, event->events, event->data.u64);
2080       break;
2081
2082     case EPOLL_CTL_MOD:
2083       if (PREDICT_FALSE (!event))
2084         {
2085           clib_warning ("VCL<%d>: ERROR: EPOLL_CTL_MOD: NULL pointer to "
2086                         "epoll_event structure!", getpid ());
2087           rv = VPPCOM_EINVAL;
2088           goto done;
2089         }
2090       else if (PREDICT_FALSE (!session->is_vep_session))
2091         {
2092           clib_warning ("VCL<%d>: ERROR: sid %u EPOLL_CTL_MOD: "
2093                         "not a vep session!", getpid (), session_handle);
2094           rv = VPPCOM_EINVAL;
2095           goto done;
2096         }
2097       else if (PREDICT_FALSE (session->vep.vep_sh != vep_handle))
2098         {
2099           clib_warning ("VCL<%d>: ERROR: sid %u EPOLL_CTL_MOD: "
2100                         "vep_idx (%u) != vep_idx (%u)!",
2101                         getpid (), session_handle,
2102                         session->vep.vep_sh, vep_handle);
2103           rv = VPPCOM_EINVAL;
2104           goto done;
2105         }
2106       session->vep.et_mask = VEP_DEFAULT_ET_MASK;
2107       session->vep.ev = *event;
2108       VDBG (1, "VCL<%d>: EPOLL_CTL_MOD: vep_idx %u, sid %u, events 0x%x,"
2109             " data 0x%llx!", getpid (), vep_handle, session_handle,
2110             event->events, event->data.u64);
2111       break;
2112
2113     case EPOLL_CTL_DEL:
2114       if (PREDICT_FALSE (!session->is_vep_session))
2115         {
2116           clib_warning ("VCL<%d>: ERROR: sid %u EPOLL_CTL_DEL: "
2117                         "not a vep session!", getpid (), session_handle);
2118           rv = VPPCOM_EINVAL;
2119           goto done;
2120         }
2121       else if (PREDICT_FALSE (session->vep.vep_sh != vep_handle))
2122         {
2123           clib_warning ("VCL<%d>: ERROR: sid %u EPOLL_CTL_DEL: "
2124                         "vep_idx (%u) != vep_idx (%u)!",
2125                         getpid (), session_handle,
2126                         session->vep.vep_sh, vep_handle);
2127           rv = VPPCOM_EINVAL;
2128           goto done;
2129         }
2130
2131       vep_session->wait_cont_idx =
2132         (vep_session->wait_cont_idx == session_handle) ?
2133         session->vep.next_sh : vep_session->wait_cont_idx;
2134
2135       if (session->vep.prev_sh == vep_handle)
2136         vep_session->vep.next_sh = session->vep.next_sh;
2137       else
2138         {
2139           vcl_session_t *prev_session;
2140           prev_session = vcl_session_get_w_handle (wrk, session->vep.prev_sh);
2141           if (PREDICT_FALSE (!prev_session))
2142             {
2143               clib_warning ("VCL<%d>: ERROR: EPOLL_CTL_DEL: Invalid "
2144                             "vep.prev_sid (%u) on sid (%u)!",
2145                             getpid (), session->vep.prev_sh, session_handle);
2146               return VPPCOM_EBADFD;
2147             }
2148           ASSERT (prev_session->vep.next_sh == session_handle);
2149           prev_session->vep.next_sh = session->vep.next_sh;
2150         }
2151       if (session->vep.next_sh != ~0)
2152         {
2153           vcl_session_t *next_session;
2154           next_session = vcl_session_get_w_handle (wrk, session->vep.next_sh);
2155           if (PREDICT_FALSE (!next_session))
2156             {
2157               clib_warning ("VCL<%d>: ERROR: EPOLL_CTL_DEL: Invalid "
2158                             "vep.next_sid (%u) on sid (%u)!",
2159                             getpid (), session->vep.next_sh, session_handle);
2160               return VPPCOM_EBADFD;
2161             }
2162           ASSERT (next_session->vep.prev_sh == session_handle);
2163           next_session->vep.prev_sh = session->vep.prev_sh;
2164         }
2165
2166       memset (&session->vep, 0, sizeof (session->vep));
2167       session->vep.next_sh = ~0;
2168       session->vep.prev_sh = ~0;
2169       session->vep.vep_sh = ~0;
2170       session->is_vep_session = 0;
2171       VDBG (1, "VCL<%d>: EPOLL_CTL_DEL: vep_idx %u, sid %u!",
2172             getpid (), vep_handle, session_handle);
2173       vcl_evt (VCL_EVT_EPOLL_CTLDEL, session, vep_sh);
2174       break;
2175
2176     default:
2177       clib_warning ("VCL<%d>: ERROR: Invalid operation (%d)!", getpid (), op);
2178       rv = VPPCOM_EINVAL;
2179     }
2180
2181   vep_verify_epoll_chain (wrk, vep_handle);
2182
2183 done:
2184   return rv;
2185 }
2186
2187 static int
2188 vcl_epoll_wait_handle_mq (vcl_worker_t * wrk, svm_msg_q_t * mq,
2189                           struct epoll_event *events, u32 maxevents,
2190                           double wait_for_time, u32 * num_ev)
2191 {
2192   session_disconnected_msg_t *disconnected_msg;
2193   session_connected_msg_t *connected_msg;
2194   session_accepted_msg_t *accepted_msg;
2195   u64 session_evt_data = ~0, handle;
2196   u32 sid = ~0, session_events;
2197   vcl_session_msg_t *vcl_msg;
2198   vcl_session_t *session;
2199   svm_msg_q_msg_t *msg;
2200   session_event_t *e;
2201   u8 add_event;
2202   int i;
2203
2204   svm_msg_q_lock (mq);
2205   if (svm_msg_q_is_empty (mq))
2206     {
2207       if (!wait_for_time)
2208         {
2209           svm_msg_q_unlock (mq);
2210           return 0;
2211         }
2212       else if (wait_for_time < 0)
2213         {
2214           svm_msg_q_wait (mq);
2215         }
2216       else
2217         {
2218           if (svm_msg_q_timedwait (mq, wait_for_time / 1e3))
2219             {
2220               svm_msg_q_unlock (mq);
2221               return 0;
2222             }
2223         }
2224     }
2225   vcl_mq_dequeue_batch (wrk, mq);
2226   svm_msg_q_unlock (mq);
2227
2228   for (i = 0; i < vec_len (wrk->mq_msg_vector); i++)
2229     {
2230       msg = vec_elt_at_index (wrk->mq_msg_vector, i);
2231       e = svm_msg_q_msg_data (mq, msg);
2232       add_event = 0;
2233       switch (e->event_type)
2234         {
2235         case FIFO_EVENT_APP_RX:
2236           vcl_fifo_rx_evt_valid_or_break (e->fifo);
2237           sid = e->fifo->client_session_index;
2238           session = vcl_session_get (wrk, sid);
2239           session_events = session->vep.ev.events;
2240           if (!(EPOLLIN & session->vep.ev.events))
2241             break;
2242           add_event = 1;
2243           events[*num_ev].events |= EPOLLIN;
2244           session_evt_data = session->vep.ev.data.u64;
2245           break;
2246         case FIFO_EVENT_APP_TX:
2247           sid = e->fifo->client_session_index;
2248           session = vcl_session_get (wrk, sid);
2249           session_events = session->vep.ev.events;
2250           if (!(EPOLLOUT & session_events))
2251             break;
2252           add_event = 1;
2253           events[*num_ev].events |= EPOLLOUT;
2254           session_evt_data = session->vep.ev.data.u64;
2255           break;
2256         case SESSION_IO_EVT_CT_TX:
2257           vcl_fifo_rx_evt_valid_or_break (e->fifo);
2258           session = vcl_ct_session_get_from_fifo (wrk, e->fifo, 0);
2259           sid = session->session_index;
2260           session_events = session->vep.ev.events;
2261           if (!(EPOLLIN & session->vep.ev.events))
2262             break;
2263           add_event = 1;
2264           events[*num_ev].events |= EPOLLIN;
2265           session_evt_data = session->vep.ev.data.u64;
2266           break;
2267         case SESSION_IO_EVT_CT_RX:
2268           session = vcl_ct_session_get_from_fifo (wrk, e->fifo, 1);
2269           sid = session->session_index;
2270           session_events = session->vep.ev.events;
2271           if (!(EPOLLOUT & session_events))
2272             break;
2273           add_event = 1;
2274           events[*num_ev].events |= EPOLLOUT;
2275           session_evt_data = session->vep.ev.data.u64;
2276           break;
2277         case SESSION_CTRL_EVT_ACCEPTED:
2278           accepted_msg = (session_accepted_msg_t *) e->data;
2279           handle = accepted_msg->listener_handle;
2280           session = vcl_session_table_lookup_listener (wrk, handle);
2281           if (!session)
2282             {
2283               clib_warning ("VCL<%d>: ERROR: couldn't find listen session:"
2284                             "listener handle %llx", getpid (), handle);
2285               break;
2286             }
2287
2288           clib_fifo_add2 (session->accept_evts_fifo, vcl_msg);
2289           vcl_msg->accepted_msg = *accepted_msg;
2290           session_events = session->vep.ev.events;
2291           if (!(EPOLLIN & session_events))
2292             break;
2293
2294           add_event = 1;
2295           events[*num_ev].events |= EPOLLIN;
2296           session_evt_data = session->vep.ev.data.u64;
2297           break;
2298         case SESSION_CTRL_EVT_CONNECTED:
2299           connected_msg = (session_connected_msg_t *) e->data;
2300           vcl_session_connected_handler (wrk, connected_msg);
2301           /* Generate EPOLLOUT because there's no connected event */
2302           sid = vcl_session_index_from_vpp_handle (wrk,
2303                                                    connected_msg->handle);
2304           session = vcl_session_get (wrk, sid);
2305           session_events = session->vep.ev.events;
2306           if (EPOLLOUT & session_events)
2307             {
2308               add_event = 1;
2309               events[*num_ev].events |= EPOLLOUT;
2310               session_evt_data = session->vep.ev.data.u64;
2311             }
2312           break;
2313         case SESSION_CTRL_EVT_DISCONNECTED:
2314           disconnected_msg = (session_disconnected_msg_t *) e->data;
2315           sid = vcl_session_index_from_vpp_handle (wrk,
2316                                                    disconnected_msg->handle);
2317           if (!(session = vcl_session_get (wrk, sid)))
2318             break;
2319           add_event = 1;
2320           events[*num_ev].events |= EPOLLHUP | EPOLLRDHUP;
2321           session_evt_data = session->vep.ev.data.u64;
2322           session_events = session->vep.ev.events;
2323           break;
2324         case SESSION_CTRL_EVT_RESET:
2325           sid = vcl_session_reset_handler (wrk,
2326                                            (session_reset_msg_t *) e->data);
2327           if (!(session = vcl_session_get (wrk, sid)))
2328             break;
2329           add_event = 1;
2330           events[*num_ev].events |= EPOLLHUP | EPOLLRDHUP;
2331           session_evt_data = session->vep.ev.data.u64;
2332           session_events = session->vep.ev.events;
2333           break;
2334         default:
2335           VDBG (0, "unhandled: %u", e->event_type);
2336           svm_msg_q_free_msg (mq, msg);
2337           continue;
2338         }
2339       svm_msg_q_free_msg (mq, msg);
2340
2341       if (add_event)
2342         {
2343           events[*num_ev].data.u64 = session_evt_data;
2344           if (EPOLLONESHOT & session_events)
2345             {
2346               session = vcl_session_get (wrk, sid);
2347               session->vep.ev.events = 0;
2348             }
2349           *num_ev += 1;
2350           if (*num_ev == maxevents)
2351             break;
2352         }
2353     }
2354
2355   vec_reset_length (wrk->mq_msg_vector);
2356   return *num_ev;
2357 }
2358
2359 static int
2360 vppcom_epoll_wait_condvar (vcl_worker_t * wrk, struct epoll_event *events,
2361                            int maxevents, double wait_for_time)
2362 {
2363   vcl_cut_through_registration_t *cr;
2364   double total_wait = 0, wait_slice;
2365   u32 num_ev = 0;
2366   int rv;
2367
2368   wait_for_time = (wait_for_time == -1) ? (double) 10e9 : wait_for_time;
2369   wait_slice = wrk->cut_through_registrations ? 10e-6 : wait_for_time;
2370
2371   do
2372     {
2373       vcl_ct_registration_lock (wrk);
2374       /* *INDENT-OFF* */
2375       pool_foreach (cr, wrk->cut_through_registrations, ({
2376         vcl_epoll_wait_handle_mq (wrk, cr->mq, events, maxevents, 0, &num_ev);
2377       }));
2378       /* *INDENT-ON* */
2379       vcl_ct_registration_unlock (wrk);
2380
2381       rv = vcl_epoll_wait_handle_mq (wrk, wrk->app_event_queue, events,
2382                                      maxevents, num_ev ? 0 : wait_slice,
2383                                      &num_ev);
2384       if (rv)
2385         total_wait += wait_slice;
2386       if (num_ev)
2387         return num_ev;
2388     }
2389   while (total_wait < wait_for_time);
2390   return (int) num_ev;
2391 }
2392
2393 static int
2394 vppcom_epoll_wait_eventfd (vcl_worker_t * wrk, struct epoll_event *events,
2395                            int maxevents, double wait_for_time)
2396 {
2397   vcl_mq_evt_conn_t *mqc;
2398   int __clib_unused n_read;
2399   int n_mq_evts, i;
2400   u32 n_evts = 0;
2401   u64 buf;
2402
2403   vec_validate (wrk->mq_events, pool_elts (wrk->mq_evt_conns));
2404   n_mq_evts = epoll_wait (wrk->mqs_epfd, wrk->mq_events,
2405                           vec_len (wrk->mq_events), wait_for_time);
2406   for (i = 0; i < n_mq_evts; i++)
2407     {
2408       mqc = vcl_mq_evt_conn_get (wrk, wrk->mq_events[i].data.u32);
2409       n_read = read (mqc->mq_fd, &buf, sizeof (buf));
2410       vcl_epoll_wait_handle_mq (wrk, mqc->mq, events, maxevents, 0, &n_evts);
2411     }
2412
2413   return (int) n_evts;
2414 }
2415
2416 int
2417 vppcom_epoll_wait (uint32_t vep_handle, struct epoll_event *events,
2418                    int maxevents, double wait_for_time)
2419 {
2420   vcl_worker_t *wrk = vcl_worker_get_current ();
2421   vcl_session_t *vep_session;
2422
2423   if (PREDICT_FALSE (maxevents <= 0))
2424     {
2425       clib_warning ("VCL<%d>: ERROR: Invalid maxevents (%d)!",
2426                     getpid (), maxevents);
2427       return VPPCOM_EINVAL;
2428     }
2429
2430   vep_session = vcl_session_get_w_handle (wrk, vep_handle);
2431   if (!vep_session)
2432     return VPPCOM_EBADFD;
2433
2434   if (PREDICT_FALSE (!vep_session->is_vep))
2435     {
2436       clib_warning ("VCL<%d>: ERROR: vep_idx (%u) is not a vep!",
2437                     getpid (), vep_handle);
2438       return VPPCOM_EINVAL;
2439     }
2440
2441   memset (events, 0, sizeof (*events) * maxevents);
2442
2443   if (vcm->cfg.use_mq_eventfd)
2444     return vppcom_epoll_wait_eventfd (wrk, events, maxevents, wait_for_time);
2445
2446   return vppcom_epoll_wait_condvar (wrk, events, maxevents, wait_for_time);
2447 }
2448
2449 int
2450 vppcom_session_attr (uint32_t session_handle, uint32_t op,
2451                      void *buffer, uint32_t * buflen)
2452 {
2453   vcl_worker_t *wrk = vcl_worker_get_current ();
2454   vcl_session_t *session;
2455   int rv = VPPCOM_OK;
2456   u32 *flags = buffer;
2457   vppcom_endpt_t *ep = buffer;
2458
2459   session = vcl_session_get_w_handle (wrk, session_handle);
2460   if (!session)
2461     return VPPCOM_EBADFD;
2462
2463   switch (op)
2464     {
2465     case VPPCOM_ATTR_GET_NREAD:
2466       rv = vppcom_session_read_ready (session);
2467       VDBG (2, "VCL<%d>: VPPCOM_ATTR_GET_NREAD: sid %u, nread = %d",
2468             getpid (), rv);
2469       break;
2470
2471     case VPPCOM_ATTR_GET_NWRITE:
2472       rv = vppcom_session_write_ready (session);
2473       VDBG (2, "VCL<%d>: VPPCOM_ATTR_GET_NWRITE: sid %u, nwrite = %d",
2474             getpid (), session_handle, rv);
2475       break;
2476
2477     case VPPCOM_ATTR_GET_FLAGS:
2478       if (PREDICT_TRUE (buffer && buflen && (*buflen >= sizeof (*flags))))
2479         {
2480           *flags = O_RDWR | (VCL_SESS_ATTR_TEST (session->attr,
2481                                                  VCL_SESS_ATTR_NONBLOCK));
2482           *buflen = sizeof (*flags);
2483           VDBG (2, "VCL<%d>: VPPCOM_ATTR_GET_FLAGS: sid %u, flags = 0x%08x, "
2484                 "is_nonblocking = %u", getpid (),
2485                 session_handle, *flags,
2486                 VCL_SESS_ATTR_TEST (session->attr, VCL_SESS_ATTR_NONBLOCK));
2487         }
2488       else
2489         rv = VPPCOM_EINVAL;
2490       break;
2491
2492     case VPPCOM_ATTR_SET_FLAGS:
2493       if (PREDICT_TRUE (buffer && buflen && (*buflen == sizeof (*flags))))
2494         {
2495           if (*flags & O_NONBLOCK)
2496             VCL_SESS_ATTR_SET (session->attr, VCL_SESS_ATTR_NONBLOCK);
2497           else
2498             VCL_SESS_ATTR_CLR (session->attr, VCL_SESS_ATTR_NONBLOCK);
2499
2500           VDBG (2, "VCL<%d>: VPPCOM_ATTR_SET_FLAGS: sid %u, flags = 0x%08x,"
2501                 " is_nonblocking = %u",
2502                 getpid (), session_handle, *flags,
2503                 VCL_SESS_ATTR_TEST (session->attr, VCL_SESS_ATTR_NONBLOCK));
2504         }
2505       else
2506         rv = VPPCOM_EINVAL;
2507       break;
2508
2509     case VPPCOM_ATTR_GET_PEER_ADDR:
2510       if (PREDICT_TRUE (buffer && buflen &&
2511                         (*buflen >= sizeof (*ep)) && ep->ip))
2512         {
2513           ep->is_ip4 = session->transport.is_ip4;
2514           ep->port = session->transport.rmt_port;
2515           if (session->transport.is_ip4)
2516             clib_memcpy (ep->ip, &session->transport.rmt_ip.ip4,
2517                          sizeof (ip4_address_t));
2518           else
2519             clib_memcpy (ep->ip, &session->transport.rmt_ip.ip6,
2520                          sizeof (ip6_address_t));
2521           *buflen = sizeof (*ep);
2522           VDBG (1, "VCL<%d>: VPPCOM_ATTR_GET_PEER_ADDR: sid %u, is_ip4 = %u, "
2523                 "addr = %U, port %u", getpid (),
2524                 session_handle, ep->is_ip4, format_ip46_address,
2525                 &session->transport.rmt_ip,
2526                 ep->is_ip4 ? IP46_TYPE_IP4 : IP46_TYPE_IP6,
2527                 clib_net_to_host_u16 (ep->port));
2528         }
2529       else
2530         rv = VPPCOM_EINVAL;
2531       break;
2532
2533     case VPPCOM_ATTR_GET_LCL_ADDR:
2534       if (PREDICT_TRUE (buffer && buflen &&
2535                         (*buflen >= sizeof (*ep)) && ep->ip))
2536         {
2537           ep->is_ip4 = session->transport.is_ip4;
2538           ep->port = session->transport.lcl_port;
2539           if (session->transport.is_ip4)
2540             clib_memcpy (ep->ip, &session->transport.lcl_ip.ip4,
2541                          sizeof (ip4_address_t));
2542           else
2543             clib_memcpy (ep->ip, &session->transport.lcl_ip.ip6,
2544                          sizeof (ip6_address_t));
2545           *buflen = sizeof (*ep);
2546           VDBG (1, "VCL<%d>: VPPCOM_ATTR_GET_LCL_ADDR: sid %u, is_ip4 = %u,"
2547                 " addr = %U port %d", getpid (),
2548                 session_handle, ep->is_ip4, format_ip46_address,
2549                 &session->transport.lcl_ip,
2550                 ep->is_ip4 ? IP46_TYPE_IP4 : IP46_TYPE_IP6,
2551                 clib_net_to_host_u16 (ep->port));
2552         }
2553       else
2554         rv = VPPCOM_EINVAL;
2555       break;
2556
2557     case VPPCOM_ATTR_GET_LIBC_EPFD:
2558       rv = session->libc_epfd;
2559       VDBG (2, "VCL<%d>: VPPCOM_ATTR_GET_LIBC_EPFD: libc_epfd %d",
2560             getpid (), rv);
2561       break;
2562
2563     case VPPCOM_ATTR_SET_LIBC_EPFD:
2564       if (PREDICT_TRUE (buffer && buflen &&
2565                         (*buflen == sizeof (session->libc_epfd))))
2566         {
2567           session->libc_epfd = *(int *) buffer;
2568           *buflen = sizeof (session->libc_epfd);
2569
2570           VDBG (2, "VCL<%d>: VPPCOM_ATTR_SET_LIBC_EPFD: libc_epfd %d, "
2571                 "buflen %d", getpid (), session->libc_epfd, *buflen);
2572         }
2573       else
2574         rv = VPPCOM_EINVAL;
2575       break;
2576
2577     case VPPCOM_ATTR_GET_PROTOCOL:
2578       if (buffer && buflen && (*buflen >= sizeof (int)))
2579         {
2580           *(int *) buffer = session->session_type;
2581           *buflen = sizeof (int);
2582
2583           VDBG (2, "VCL<%d>: VPPCOM_ATTR_GET_PROTOCOL: %d (%s), buflen %d",
2584                 getpid (), *(int *) buffer, *(int *) buffer ? "UDP" : "TCP",
2585                 *buflen);
2586         }
2587       else
2588         rv = VPPCOM_EINVAL;
2589       break;
2590
2591     case VPPCOM_ATTR_GET_LISTEN:
2592       if (buffer && buflen && (*buflen >= sizeof (int)))
2593         {
2594           *(int *) buffer = VCL_SESS_ATTR_TEST (session->attr,
2595                                                 VCL_SESS_ATTR_LISTEN);
2596           *buflen = sizeof (int);
2597
2598           VDBG (2, "VCL<%d>: VPPCOM_ATTR_GET_LISTEN: %d, buflen %d",
2599                 getpid (), *(int *) buffer, *buflen);
2600         }
2601       else
2602         rv = VPPCOM_EINVAL;
2603       break;
2604
2605     case VPPCOM_ATTR_GET_ERROR:
2606       if (buffer && buflen && (*buflen >= sizeof (int)))
2607         {
2608           *(int *) buffer = 0;
2609           *buflen = sizeof (int);
2610
2611           VDBG (2, "VCL<%d>: VPPCOM_ATTR_GET_ERROR: %d, buflen %d, #VPP-TBD#",
2612                 getpid (), *(int *) buffer, *buflen);
2613         }
2614       else
2615         rv = VPPCOM_EINVAL;
2616       break;
2617
2618     case VPPCOM_ATTR_GET_TX_FIFO_LEN:
2619       if (buffer && buflen && (*buflen >= sizeof (u32)))
2620         {
2621
2622           /* VPP-TBD */
2623           *(size_t *) buffer = (session->sndbuf_size ? session->sndbuf_size :
2624                                 session->tx_fifo ? session->tx_fifo->nitems :
2625                                 vcm->cfg.tx_fifo_size);
2626           *buflen = sizeof (u32);
2627
2628           VDBG (2, "VCL<%d>: VPPCOM_ATTR_GET_TX_FIFO_LEN: %u (0x%x), "
2629                 "buflen %d, #VPP-TBD#", getpid (),
2630                 *(size_t *) buffer, *(size_t *) buffer, *buflen);
2631         }
2632       else
2633         rv = VPPCOM_EINVAL;
2634       break;
2635
2636     case VPPCOM_ATTR_SET_TX_FIFO_LEN:
2637       if (buffer && buflen && (*buflen == sizeof (u32)))
2638         {
2639           /* VPP-TBD */
2640           session->sndbuf_size = *(u32 *) buffer;
2641           VDBG (2, "VCL<%d>: VPPCOM_ATTR_SET_TX_FIFO_LEN: %u (0x%x), "
2642                 "buflen %d, #VPP-TBD#", getpid (),
2643                 session->sndbuf_size, session->sndbuf_size, *buflen);
2644         }
2645       else
2646         rv = VPPCOM_EINVAL;
2647       break;
2648
2649     case VPPCOM_ATTR_GET_RX_FIFO_LEN:
2650       if (buffer && buflen && (*buflen >= sizeof (u32)))
2651         {
2652
2653           /* VPP-TBD */
2654           *(size_t *) buffer = (session->rcvbuf_size ? session->rcvbuf_size :
2655                                 session->rx_fifo ? session->rx_fifo->nitems :
2656                                 vcm->cfg.rx_fifo_size);
2657           *buflen = sizeof (u32);
2658
2659           VDBG (2, "VCL<%d>: VPPCOM_ATTR_GET_RX_FIFO_LEN: %u (0x%x), "
2660                 "buflen %d, #VPP-TBD#", getpid (),
2661                 *(size_t *) buffer, *(size_t *) buffer, *buflen);
2662         }
2663       else
2664         rv = VPPCOM_EINVAL;
2665       break;
2666
2667     case VPPCOM_ATTR_SET_RX_FIFO_LEN:
2668       if (buffer && buflen && (*buflen == sizeof (u32)))
2669         {
2670           /* VPP-TBD */
2671           session->rcvbuf_size = *(u32 *) buffer;
2672           VDBG (2, "VCL<%d>: VPPCOM_ATTR_SET_RX_FIFO_LEN: %u (0x%x), "
2673                 "buflen %d, #VPP-TBD#", getpid (),
2674                 session->sndbuf_size, session->sndbuf_size, *buflen);
2675         }
2676       else
2677         rv = VPPCOM_EINVAL;
2678       break;
2679
2680     case VPPCOM_ATTR_GET_REUSEADDR:
2681       if (buffer && buflen && (*buflen >= sizeof (int)))
2682         {
2683           /* VPP-TBD */
2684           *(int *) buffer = VCL_SESS_ATTR_TEST (session->attr,
2685                                                 VCL_SESS_ATTR_REUSEADDR);
2686           *buflen = sizeof (int);
2687
2688           VDBG (2, "VCL<%d>: VPPCOM_ATTR_GET_REUSEADDR: %d, "
2689                 "buflen %d, #VPP-TBD#", getpid (), *(int *) buffer, *buflen);
2690         }
2691       else
2692         rv = VPPCOM_EINVAL;
2693       break;
2694
2695     case VPPCOM_ATTR_SET_REUSEADDR:
2696       if (buffer && buflen && (*buflen == sizeof (int)) &&
2697           !VCL_SESS_ATTR_TEST (session->attr, VCL_SESS_ATTR_LISTEN))
2698         {
2699           /* VPP-TBD */
2700           if (*(int *) buffer)
2701             VCL_SESS_ATTR_SET (session->attr, VCL_SESS_ATTR_REUSEADDR);
2702           else
2703             VCL_SESS_ATTR_CLR (session->attr, VCL_SESS_ATTR_REUSEADDR);
2704
2705           VDBG (2, "VCL<%d>: VPPCOM_ATTR_SET_REUSEADDR: %d, buflen %d,"
2706                 " #VPP-TBD#", getpid (),
2707                 VCL_SESS_ATTR_TEST (session->attr,
2708                                     VCL_SESS_ATTR_REUSEADDR), *buflen);
2709         }
2710       else
2711         rv = VPPCOM_EINVAL;
2712       break;
2713
2714     case VPPCOM_ATTR_GET_REUSEPORT:
2715       if (buffer && buflen && (*buflen >= sizeof (int)))
2716         {
2717           /* VPP-TBD */
2718           *(int *) buffer = VCL_SESS_ATTR_TEST (session->attr,
2719                                                 VCL_SESS_ATTR_REUSEPORT);
2720           *buflen = sizeof (int);
2721
2722           VDBG (2, "VCL<%d>: VPPCOM_ATTR_GET_REUSEPORT: %d, buflen %d,"
2723                 " #VPP-TBD#", getpid (), *(int *) buffer, *buflen);
2724         }
2725       else
2726         rv = VPPCOM_EINVAL;
2727       break;
2728
2729     case VPPCOM_ATTR_SET_REUSEPORT:
2730       if (buffer && buflen && (*buflen == sizeof (int)) &&
2731           !VCL_SESS_ATTR_TEST (session->attr, VCL_SESS_ATTR_LISTEN))
2732         {
2733           /* VPP-TBD */
2734           if (*(int *) buffer)
2735             VCL_SESS_ATTR_SET (session->attr, VCL_SESS_ATTR_REUSEPORT);
2736           else
2737             VCL_SESS_ATTR_CLR (session->attr, VCL_SESS_ATTR_REUSEPORT);
2738
2739           VDBG (2, "VCL<%d>: VPPCOM_ATTR_SET_REUSEPORT: %d, buflen %d,"
2740                 " #VPP-TBD#", getpid (),
2741                 VCL_SESS_ATTR_TEST (session->attr,
2742                                     VCL_SESS_ATTR_REUSEPORT), *buflen);
2743         }
2744       else
2745         rv = VPPCOM_EINVAL;
2746       break;
2747
2748     case VPPCOM_ATTR_GET_BROADCAST:
2749       if (buffer && buflen && (*buflen >= sizeof (int)))
2750         {
2751           /* VPP-TBD */
2752           *(int *) buffer = VCL_SESS_ATTR_TEST (session->attr,
2753                                                 VCL_SESS_ATTR_BROADCAST);
2754           *buflen = sizeof (int);
2755
2756           VDBG (2, "VCL<%d>: VPPCOM_ATTR_GET_BROADCAST: %d, buflen %d,"
2757                 " #VPP-TBD#", getpid (), *(int *) buffer, *buflen);
2758         }
2759       else
2760         rv = VPPCOM_EINVAL;
2761       break;
2762
2763     case VPPCOM_ATTR_SET_BROADCAST:
2764       if (buffer && buflen && (*buflen == sizeof (int)))
2765         {
2766           /* VPP-TBD */
2767           if (*(int *) buffer)
2768             VCL_SESS_ATTR_SET (session->attr, VCL_SESS_ATTR_BROADCAST);
2769           else
2770             VCL_SESS_ATTR_CLR (session->attr, VCL_SESS_ATTR_BROADCAST);
2771
2772           VDBG (2, "VCL<%d>: VPPCOM_ATTR_SET_BROADCAST: %d, buflen %d, "
2773                 "#VPP-TBD#", getpid (),
2774                 VCL_SESS_ATTR_TEST (session->attr,
2775                                     VCL_SESS_ATTR_BROADCAST), *buflen);
2776         }
2777       else
2778         rv = VPPCOM_EINVAL;
2779       break;
2780
2781     case VPPCOM_ATTR_GET_V6ONLY:
2782       if (buffer && buflen && (*buflen >= sizeof (int)))
2783         {
2784           /* VPP-TBD */
2785           *(int *) buffer = VCL_SESS_ATTR_TEST (session->attr,
2786                                                 VCL_SESS_ATTR_V6ONLY);
2787           *buflen = sizeof (int);
2788
2789           VDBG (2, "VCL<%d>: VPPCOM_ATTR_GET_V6ONLY: %d, buflen %d, "
2790                 "#VPP-TBD#", getpid (), *(int *) buffer, *buflen);
2791         }
2792       else
2793         rv = VPPCOM_EINVAL;
2794       break;
2795
2796     case VPPCOM_ATTR_SET_V6ONLY:
2797       if (buffer && buflen && (*buflen == sizeof (int)))
2798         {
2799           /* VPP-TBD */
2800           if (*(int *) buffer)
2801             VCL_SESS_ATTR_SET (session->attr, VCL_SESS_ATTR_V6ONLY);
2802           else
2803             VCL_SESS_ATTR_CLR (session->attr, VCL_SESS_ATTR_V6ONLY);
2804
2805           VDBG (2, "VCL<%d>: VPPCOM_ATTR_SET_V6ONLY: %d, buflen %d, "
2806                 "#VPP-TBD#", getpid (),
2807                 VCL_SESS_ATTR_TEST (session->attr,
2808                                     VCL_SESS_ATTR_V6ONLY), *buflen);
2809         }
2810       else
2811         rv = VPPCOM_EINVAL;
2812       break;
2813
2814     case VPPCOM_ATTR_GET_KEEPALIVE:
2815       if (buffer && buflen && (*buflen >= sizeof (int)))
2816         {
2817           /* VPP-TBD */
2818           *(int *) buffer = VCL_SESS_ATTR_TEST (session->attr,
2819                                                 VCL_SESS_ATTR_KEEPALIVE);
2820           *buflen = sizeof (int);
2821
2822           VDBG (2, "VCL<%d>: VPPCOM_ATTR_GET_KEEPALIVE: %d, buflen %d, "
2823                 "#VPP-TBD#", getpid (), *(int *) buffer, *buflen);
2824         }
2825       else
2826         rv = VPPCOM_EINVAL;
2827       break;
2828
2829     case VPPCOM_ATTR_SET_KEEPALIVE:
2830       if (buffer && buflen && (*buflen == sizeof (int)))
2831         {
2832           /* VPP-TBD */
2833           if (*(int *) buffer)
2834             VCL_SESS_ATTR_SET (session->attr, VCL_SESS_ATTR_KEEPALIVE);
2835           else
2836             VCL_SESS_ATTR_CLR (session->attr, VCL_SESS_ATTR_KEEPALIVE);
2837
2838           VDBG (2, "VCL<%d>: VPPCOM_ATTR_SET_KEEPALIVE: %d, buflen %d, "
2839                 "#VPP-TBD#", getpid (),
2840                 VCL_SESS_ATTR_TEST (session->attr,
2841                                     VCL_SESS_ATTR_KEEPALIVE), *buflen);
2842         }
2843       else
2844         rv = VPPCOM_EINVAL;
2845       break;
2846
2847     case VPPCOM_ATTR_GET_TCP_NODELAY:
2848       if (buffer && buflen && (*buflen >= sizeof (int)))
2849         {
2850           /* VPP-TBD */
2851           *(int *) buffer = VCL_SESS_ATTR_TEST (session->attr,
2852                                                 VCL_SESS_ATTR_TCP_NODELAY);
2853           *buflen = sizeof (int);
2854
2855           VDBG (2, "VCL<%d>: VPPCOM_ATTR_GET_TCP_NODELAY: %d, buflen %d, "
2856                 "#VPP-TBD#", getpid (), *(int *) buffer, *buflen);
2857         }
2858       else
2859         rv = VPPCOM_EINVAL;
2860       break;
2861
2862     case VPPCOM_ATTR_SET_TCP_NODELAY:
2863       if (buffer && buflen && (*buflen == sizeof (int)))
2864         {
2865           /* VPP-TBD */
2866           if (*(int *) buffer)
2867             VCL_SESS_ATTR_SET (session->attr, VCL_SESS_ATTR_TCP_NODELAY);
2868           else
2869             VCL_SESS_ATTR_CLR (session->attr, VCL_SESS_ATTR_TCP_NODELAY);
2870
2871           VDBG (2, "VCL<%d>: VPPCOM_ATTR_SET_TCP_NODELAY: %d, buflen %d, "
2872                 "#VPP-TBD#", getpid (),
2873                 VCL_SESS_ATTR_TEST (session->attr,
2874                                     VCL_SESS_ATTR_TCP_NODELAY), *buflen);
2875         }
2876       else
2877         rv = VPPCOM_EINVAL;
2878       break;
2879
2880     case VPPCOM_ATTR_GET_TCP_KEEPIDLE:
2881       if (buffer && buflen && (*buflen >= sizeof (int)))
2882         {
2883           /* VPP-TBD */
2884           *(int *) buffer = VCL_SESS_ATTR_TEST (session->attr,
2885                                                 VCL_SESS_ATTR_TCP_KEEPIDLE);
2886           *buflen = sizeof (int);
2887
2888           VDBG (2, "VCL<%d>: VPPCOM_ATTR_GET_TCP_KEEPIDLE: %d, buflen %d, "
2889                 "#VPP-TBD#", getpid (), *(int *) buffer, *buflen);
2890         }
2891       else
2892         rv = VPPCOM_EINVAL;
2893       break;
2894
2895     case VPPCOM_ATTR_SET_TCP_KEEPIDLE:
2896       if (buffer && buflen && (*buflen == sizeof (int)))
2897         {
2898           /* VPP-TBD */
2899           if (*(int *) buffer)
2900             VCL_SESS_ATTR_SET (session->attr, VCL_SESS_ATTR_TCP_KEEPIDLE);
2901           else
2902             VCL_SESS_ATTR_CLR (session->attr, VCL_SESS_ATTR_TCP_KEEPIDLE);
2903
2904           VDBG (2, "VCL<%d>: VPPCOM_ATTR_SET_TCP_KEEPIDLE: %d, buflen %d, "
2905                 "#VPP-TBD#", getpid (),
2906                 VCL_SESS_ATTR_TEST (session->attr,
2907                                     VCL_SESS_ATTR_TCP_KEEPIDLE), *buflen);
2908         }
2909       else
2910         rv = VPPCOM_EINVAL;
2911       break;
2912
2913     case VPPCOM_ATTR_GET_TCP_KEEPINTVL:
2914       if (buffer && buflen && (*buflen >= sizeof (int)))
2915         {
2916           /* VPP-TBD */
2917           *(int *) buffer = VCL_SESS_ATTR_TEST (session->attr,
2918                                                 VCL_SESS_ATTR_TCP_KEEPINTVL);
2919           *buflen = sizeof (int);
2920
2921           VDBG (2, "VCL<%d>: VPPCOM_ATTR_GET_TCP_KEEPINTVL: %d, buflen %d, "
2922                 "#VPP-TBD#", getpid (), *(int *) buffer, *buflen);
2923         }
2924       else
2925         rv = VPPCOM_EINVAL;
2926       break;
2927
2928     case VPPCOM_ATTR_SET_TCP_KEEPINTVL:
2929       if (buffer && buflen && (*buflen == sizeof (int)))
2930         {
2931           /* VPP-TBD */
2932           if (*(int *) buffer)
2933             VCL_SESS_ATTR_SET (session->attr, VCL_SESS_ATTR_TCP_KEEPINTVL);
2934           else
2935             VCL_SESS_ATTR_CLR (session->attr, VCL_SESS_ATTR_TCP_KEEPINTVL);
2936
2937           VDBG (2, "VCL<%d>: VPPCOM_ATTR_SET_TCP_KEEPINTVL: %d, buflen %d, "
2938                 "#VPP-TBD#", getpid (),
2939                 VCL_SESS_ATTR_TEST (session->attr,
2940                                     VCL_SESS_ATTR_TCP_KEEPINTVL), *buflen);
2941         }
2942       else
2943         rv = VPPCOM_EINVAL;
2944       break;
2945
2946     case VPPCOM_ATTR_GET_TCP_USER_MSS:
2947       if (buffer && buflen && (*buflen >= sizeof (u32)))
2948         {
2949           /* VPP-TBD */
2950           *(u32 *) buffer = session->user_mss;
2951           *buflen = sizeof (int);
2952
2953           VDBG (2, "VCL<%d>: VPPCOM_ATTR_GET_TCP_USER_MSS: %d, buflen %d,"
2954                 " #VPP-TBD#", getpid (), *(int *) buffer, *buflen);
2955         }
2956       else
2957         rv = VPPCOM_EINVAL;
2958       break;
2959
2960     case VPPCOM_ATTR_SET_TCP_USER_MSS:
2961       if (buffer && buflen && (*buflen == sizeof (u32)))
2962         {
2963           /* VPP-TBD */
2964           session->user_mss = *(u32 *) buffer;
2965
2966           VDBG (2, "VCL<%d>: VPPCOM_ATTR_SET_TCP_USER_MSS: %u, buflen %d, "
2967                 "#VPP-TBD#", getpid (), session->user_mss, *buflen);
2968         }
2969       else
2970         rv = VPPCOM_EINVAL;
2971       break;
2972
2973     default:
2974       rv = VPPCOM_EINVAL;
2975       break;
2976     }
2977
2978   return rv;
2979 }
2980
2981 int
2982 vppcom_session_recvfrom (uint32_t session_handle, void *buffer,
2983                          uint32_t buflen, int flags, vppcom_endpt_t * ep)
2984 {
2985   vcl_worker_t *wrk = vcl_worker_get_current ();
2986   int rv = VPPCOM_OK;
2987   vcl_session_t *session = 0;
2988
2989   if (ep)
2990     {
2991       session = vcl_session_get_w_handle (wrk, session_handle);
2992       if (PREDICT_FALSE (!session))
2993         {
2994           VDBG (0, "VCL<%d>: invalid session, sid (%u) has been closed!",
2995                 getpid (), session_handle);
2996           return VPPCOM_EBADFD;
2997         }
2998       ep->is_ip4 = session->transport.is_ip4;
2999       ep->port = session->transport.rmt_port;
3000     }
3001
3002   if (flags == 0)
3003     rv = vppcom_session_read (session_handle, buffer, buflen);
3004   else if (flags & MSG_PEEK)
3005     rv = vppcom_session_peek (session_handle, buffer, buflen);
3006   else
3007     {
3008       clib_warning ("VCL<%d>: Unsupport flags for recvfrom %d",
3009                     getpid (), flags);
3010       return VPPCOM_EAFNOSUPPORT;
3011     }
3012
3013   if (ep)
3014     {
3015       if (session->transport.is_ip4)
3016         clib_memcpy (ep->ip, &session->transport.rmt_ip.ip4,
3017                      sizeof (ip4_address_t));
3018       else
3019         clib_memcpy (ep->ip, &session->transport.rmt_ip.ip6,
3020                      sizeof (ip6_address_t));
3021     }
3022
3023   return rv;
3024 }
3025
3026 int
3027 vppcom_session_sendto (uint32_t session_handle, void *buffer,
3028                        uint32_t buflen, int flags, vppcom_endpt_t * ep)
3029 {
3030   if (!buffer)
3031     return VPPCOM_EINVAL;
3032
3033   if (ep)
3034     {
3035       // TBD
3036       return VPPCOM_EINVAL;
3037     }
3038
3039   if (flags)
3040     {
3041       // TBD check the flags and do the right thing
3042       VDBG (2, "VCL<%d>: handling flags 0x%u (%d) not implemented yet.",
3043             getpid (), flags, flags);
3044     }
3045
3046   return (vppcom_session_write (session_handle, buffer, buflen));
3047 }
3048
3049 int
3050 vppcom_poll (vcl_poll_t * vp, uint32_t n_sids, double wait_for_time)
3051 {
3052   vcl_worker_t *wrk = vcl_worker_get_current ();
3053   f64 timeout = clib_time_now (&wrk->clib_time) + wait_for_time;
3054   u32 i, keep_trying = 1;
3055   int rv, num_ev = 0;
3056
3057   VDBG (3, "VCL<%d>: vp %p, nsids %u, wait_for_time %f",
3058         getpid (), vp, n_sids, wait_for_time);
3059
3060   if (!vp)
3061     return VPPCOM_EFAULT;
3062
3063   do
3064     {
3065       vcl_session_t *session;
3066
3067       for (i = 0; i < n_sids; i++)
3068         {
3069           ASSERT (vp[i].revents);
3070
3071           session = vcl_session_get (wrk, vp[i].sid);
3072           if (!session)
3073             continue;
3074
3075           if (*vp[i].revents)
3076             *vp[i].revents = 0;
3077
3078           if (POLLIN & vp[i].events)
3079             {
3080               rv = vppcom_session_read_ready (session);
3081               if (rv > 0)
3082                 {
3083                   *vp[i].revents |= POLLIN;
3084                   num_ev++;
3085                 }
3086               else if (rv < 0)
3087                 {
3088                   switch (rv)
3089                     {
3090                     case VPPCOM_ECONNRESET:
3091                       *vp[i].revents = POLLHUP;
3092                       break;
3093
3094                     default:
3095                       *vp[i].revents = POLLERR;
3096                       break;
3097                     }
3098                   num_ev++;
3099                 }
3100             }
3101
3102           if (POLLOUT & vp[i].events)
3103             {
3104               rv = vppcom_session_write_ready (session);
3105               if (rv > 0)
3106                 {
3107                   *vp[i].revents |= POLLOUT;
3108                   num_ev++;
3109                 }
3110               else if (rv < 0)
3111                 {
3112                   switch (rv)
3113                     {
3114                     case VPPCOM_ECONNRESET:
3115                       *vp[i].revents = POLLHUP;
3116                       break;
3117
3118                     default:
3119                       *vp[i].revents = POLLERR;
3120                       break;
3121                     }
3122                   num_ev++;
3123                 }
3124             }
3125
3126           if (0)                // Note "done:" label used by VCL_SESSION_LOCK_AND_GET()
3127             {
3128               *vp[i].revents = POLLNVAL;
3129               num_ev++;
3130             }
3131         }
3132       if (wait_for_time != -1)
3133         keep_trying = (clib_time_now (&wrk->clib_time) <= timeout) ? 1 : 0;
3134     }
3135   while ((num_ev == 0) && keep_trying);
3136
3137   if (VPPCOM_DEBUG > 3)
3138     {
3139       clib_warning ("VCL<%d>: returning %d", getpid (), num_ev);
3140       for (i = 0; i < n_sids; i++)
3141         {
3142           clib_warning ("VCL<%d>: vp[%d].sid %d (0x%x), .events 0x%x, "
3143                         ".revents 0x%x", getpid (), i, vp[i].sid, vp[i].sid,
3144                         vp[i].events, *vp[i].revents);
3145         }
3146     }
3147   return num_ev;
3148 }
3149
3150 int
3151 vppcom_mq_epoll_fd (void)
3152 {
3153   vcl_worker_t *wrk = vcl_worker_get_current ();
3154   return wrk->mqs_epfd;
3155 }
3156
3157 int
3158 vppcom_session_index (uint32_t session_handle)
3159 {
3160   return session_handle & 0xFFFFFF;
3161 }
3162
3163 int
3164 vppcom_worker_register (void)
3165 {
3166   if (vcl_worker_alloc_and_init ())
3167     return VPPCOM_OK;
3168   return VPPCOM_EEXIST;
3169 }
3170
3171 /*
3172  * fd.io coding-style-patch-verification: ON
3173  *
3174  * Local Variables:
3175  * eval: (c-set-style "gnu")
3176  * End:
3177  */