vcl/session: handle reset/disconnect before app accept
[vpp.git] / src / vcl / vppcom.c
1 /*
2  * Copyright (c) 2017 Cisco and/or its affiliates.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this
5  * You may obtain a copy of the License at:
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15
16 #include <stdio.h>
17 #include <stdlib.h>
18 #include <svm/svm_fifo_segment.h>
19 #include <vcl/vppcom.h>
20 #include <vcl/vcl_debug.h>
21 #include <vcl/vcl_private.h>
22
23 __thread uword __vcl_worker_index = ~0;
24
25
26 static int
27 vcl_wait_for_segment (u64 segment_handle)
28 {
29   vcl_worker_t *wrk = vcl_worker_get_current ();
30   u32 wait_for_seconds = 10, segment_index;
31   f64 timeout;
32
33   if (segment_handle == VCL_INVALID_SEGMENT_HANDLE)
34     return 1;
35
36   timeout = clib_time_now (&wrk->clib_time) + wait_for_seconds;
37   while (clib_time_now (&wrk->clib_time) < timeout)
38     {
39       segment_index = vcl_segment_table_lookup (segment_handle);
40       if (segment_index != VCL_INVALID_SEGMENT_INDEX)
41         return 0;
42       usleep (10);
43     }
44   return 1;
45 }
46
47 const char *
48 vppcom_session_state_str (session_state_t state)
49 {
50   char *st;
51
52   switch (state)
53     {
54     case STATE_START:
55       st = "STATE_START";
56       break;
57
58     case STATE_CONNECT:
59       st = "STATE_CONNECT";
60       break;
61
62     case STATE_LISTEN:
63       st = "STATE_LISTEN";
64       break;
65
66     case STATE_ACCEPT:
67       st = "STATE_ACCEPT";
68       break;
69
70     case STATE_VPP_CLOSING:
71       st = "STATE_VPP_CLOSING";
72       break;
73
74     case STATE_DISCONNECT:
75       st = "STATE_DISCONNECT";
76       break;
77
78     case STATE_FAILED:
79       st = "STATE_FAILED";
80       break;
81
82     default:
83       st = "UNKNOWN_STATE";
84       break;
85     }
86
87   return st;
88 }
89
90 u8 *
91 format_ip4_address (u8 * s, va_list * args)
92 {
93   u8 *a = va_arg (*args, u8 *);
94   return format (s, "%d.%d.%d.%d", a[0], a[1], a[2], a[3]);
95 }
96
97 u8 *
98 format_ip6_address (u8 * s, va_list * args)
99 {
100   ip6_address_t *a = va_arg (*args, ip6_address_t *);
101   u32 i, i_max_n_zero, max_n_zeros, i_first_zero, n_zeros, last_double_colon;
102
103   i_max_n_zero = ARRAY_LEN (a->as_u16);
104   max_n_zeros = 0;
105   i_first_zero = i_max_n_zero;
106   n_zeros = 0;
107   for (i = 0; i < ARRAY_LEN (a->as_u16); i++)
108     {
109       u32 is_zero = a->as_u16[i] == 0;
110       if (is_zero && i_first_zero >= ARRAY_LEN (a->as_u16))
111         {
112           i_first_zero = i;
113           n_zeros = 0;
114         }
115       n_zeros += is_zero;
116       if ((!is_zero && n_zeros > max_n_zeros)
117           || (i + 1 >= ARRAY_LEN (a->as_u16) && n_zeros > max_n_zeros))
118         {
119           i_max_n_zero = i_first_zero;
120           max_n_zeros = n_zeros;
121           i_first_zero = ARRAY_LEN (a->as_u16);
122           n_zeros = 0;
123         }
124     }
125
126   last_double_colon = 0;
127   for (i = 0; i < ARRAY_LEN (a->as_u16); i++)
128     {
129       if (i == i_max_n_zero && max_n_zeros > 1)
130         {
131           s = format (s, "::");
132           i += max_n_zeros - 1;
133           last_double_colon = 1;
134         }
135       else
136         {
137           s = format (s, "%s%x",
138                       (last_double_colon || i == 0) ? "" : ":",
139                       clib_net_to_host_u16 (a->as_u16[i]));
140           last_double_colon = 0;
141         }
142     }
143
144   return s;
145 }
146
147 /* Format an IP46 address. */
148 u8 *
149 format_ip46_address (u8 * s, va_list * args)
150 {
151   ip46_address_t *ip46 = va_arg (*args, ip46_address_t *);
152   ip46_type_t type = va_arg (*args, ip46_type_t);
153   int is_ip4 = 1;
154
155   switch (type)
156     {
157     case IP46_TYPE_ANY:
158       is_ip4 = ip46_address_is_ip4 (ip46);
159       break;
160     case IP46_TYPE_IP4:
161       is_ip4 = 1;
162       break;
163     case IP46_TYPE_IP6:
164       is_ip4 = 0;
165       break;
166     }
167
168   return is_ip4 ?
169     format (s, "%U", format_ip4_address, &ip46->ip4) :
170     format (s, "%U", format_ip6_address, &ip46->ip6);
171 }
172
173 /*
174  * VPPCOM Utility Functions
175  */
176
177
178 static svm_msg_q_t *
179 vcl_session_vpp_evt_q (vcl_worker_t * wrk, vcl_session_t * s)
180 {
181   if (vcl_session_is_ct (s))
182     return wrk->vpp_event_queues[0];
183   else
184     return wrk->vpp_event_queues[s->vpp_thread_index];
185 }
186
187 static void
188 vcl_send_session_accepted_reply (svm_msg_q_t * mq, u32 context,
189                                  session_handle_t handle, int retval)
190 {
191   app_session_evt_t _app_evt, *app_evt = &_app_evt;
192   session_accepted_reply_msg_t *rmp;
193   app_alloc_ctrl_evt_to_vpp (mq, app_evt, SESSION_CTRL_EVT_ACCEPTED_REPLY);
194   rmp = (session_accepted_reply_msg_t *) app_evt->evt->data;
195   rmp->handle = handle;
196   rmp->context = context;
197   rmp->retval = retval;
198   app_send_ctrl_evt_to_vpp (mq, app_evt);
199 }
200
201 static void
202 vcl_send_session_disconnected_reply (svm_msg_q_t * mq, u32 context,
203                                      session_handle_t handle, int retval)
204 {
205   app_session_evt_t _app_evt, *app_evt = &_app_evt;
206   session_disconnected_reply_msg_t *rmp;
207   app_alloc_ctrl_evt_to_vpp (mq, app_evt,
208                              SESSION_CTRL_EVT_DISCONNECTED_REPLY);
209   rmp = (session_disconnected_reply_msg_t *) app_evt->evt->data;
210   rmp->handle = handle;
211   rmp->context = context;
212   rmp->retval = retval;
213   app_send_ctrl_evt_to_vpp (mq, app_evt);
214 }
215
216 static void
217 vcl_send_session_reset_reply (svm_msg_q_t * mq, u32 context,
218                               session_handle_t handle, int retval)
219 {
220   app_session_evt_t _app_evt, *app_evt = &_app_evt;
221   session_reset_reply_msg_t *rmp;
222   app_alloc_ctrl_evt_to_vpp (mq, app_evt, SESSION_CTRL_EVT_RESET_REPLY);
223   rmp = (session_reset_reply_msg_t *) app_evt->evt->data;
224   rmp->handle = handle;
225   rmp->context = context;
226   rmp->retval = retval;
227   app_send_ctrl_evt_to_vpp (mq, app_evt);
228 }
229
230 static u32
231 vcl_session_accepted_handler (vcl_worker_t * wrk, session_accepted_msg_t * mp)
232 {
233   vcl_session_t *session, *listen_session;
234   svm_fifo_t *rx_fifo, *tx_fifo;
235   u32 vpp_wrk_index;
236   svm_msg_q_t *evt_q;
237
238   session = vcl_session_alloc (wrk);
239
240   listen_session = vcl_session_table_lookup_listener (wrk,
241                                                       mp->listener_handle);
242   if (!listen_session)
243     {
244       svm_msg_q_t *evt_q;
245       evt_q = uword_to_pointer (mp->vpp_event_queue_address, svm_msg_q_t *);
246       clib_warning ("VCL<%d>: ERROR: couldn't find listen session: "
247                     "unknown vpp listener handle %llx",
248                     getpid (), mp->listener_handle);
249       vcl_send_session_accepted_reply (evt_q, mp->context, mp->handle,
250                                        VNET_API_ERROR_INVALID_ARGUMENT);
251       vcl_session_free (wrk, session);
252       return VCL_INVALID_SESSION_INDEX;
253     }
254
255   rx_fifo = uword_to_pointer (mp->server_rx_fifo, svm_fifo_t *);
256   tx_fifo = uword_to_pointer (mp->server_tx_fifo, svm_fifo_t *);
257
258   if (mp->server_event_queue_address)
259     {
260       session->vpp_evt_q = uword_to_pointer (mp->client_event_queue_address,
261                                              svm_msg_q_t *);
262       session->our_evt_q = uword_to_pointer (mp->server_event_queue_address,
263                                              svm_msg_q_t *);
264       if (vcl_wait_for_segment (mp->segment_handle))
265         {
266           clib_warning ("segment for session %u couldn't be mounted!",
267                         session->session_index);
268           return VCL_INVALID_SESSION_INDEX;
269         }
270       rx_fifo->master_session_index = session->session_index;
271       tx_fifo->master_session_index = session->session_index;
272       rx_fifo->master_thread_index = vcl_get_worker_index ();
273       tx_fifo->master_thread_index = vcl_get_worker_index ();
274       vec_validate (wrk->vpp_event_queues, 0);
275       evt_q = uword_to_pointer (mp->vpp_event_queue_address, svm_msg_q_t *);
276       wrk->vpp_event_queues[0] = evt_q;
277     }
278   else
279     {
280       session->vpp_evt_q = uword_to_pointer (mp->vpp_event_queue_address,
281                                              svm_msg_q_t *);
282       rx_fifo->client_session_index = session->session_index;
283       tx_fifo->client_session_index = session->session_index;
284       rx_fifo->client_thread_index = vcl_get_worker_index ();
285       tx_fifo->client_thread_index = vcl_get_worker_index ();
286       vpp_wrk_index = tx_fifo->master_thread_index;
287       vec_validate (wrk->vpp_event_queues, vpp_wrk_index);
288       wrk->vpp_event_queues[vpp_wrk_index] = session->vpp_evt_q;
289     }
290
291   session->vpp_handle = mp->handle;
292   session->vpp_thread_index = rx_fifo->master_thread_index;
293   session->client_context = mp->context;
294   session->rx_fifo = rx_fifo;
295   session->tx_fifo = tx_fifo;
296
297   session->session_state = STATE_ACCEPT;
298   session->transport.rmt_port = mp->port;
299   session->transport.is_ip4 = mp->is_ip4;
300   clib_memcpy_fast (&session->transport.rmt_ip, mp->ip,
301                     sizeof (ip46_address_t));
302
303   vcl_session_table_add_vpp_handle (wrk, mp->handle, session->session_index);
304   session->transport.lcl_port = listen_session->transport.lcl_port;
305   session->transport.lcl_ip = listen_session->transport.lcl_ip;
306   session->session_type = listen_session->session_type;
307   session->is_dgram = session->session_type == VPPCOM_PROTO_UDP;
308
309   VDBG (1, "VCL<%d>: vpp handle 0x%llx, sid %u: client accept request from %s"
310         " address %U port %d queue %p!", getpid (), mp->handle,
311         session->session_index,
312         mp->is_ip4 ? "IPv4" : "IPv6", format_ip46_address, &mp->ip,
313         mp->is_ip4 ? IP46_TYPE_IP4 : IP46_TYPE_IP6,
314         clib_net_to_host_u16 (mp->port), session->vpp_evt_q);
315   vcl_evt (VCL_EVT_ACCEPT, session, listen_session, session_index);
316
317   return session->session_index;
318 }
319
320 static u32
321 vcl_session_connected_handler (vcl_worker_t * wrk,
322                                session_connected_msg_t * mp)
323 {
324   u32 session_index, vpp_wrk_index;
325   svm_fifo_t *rx_fifo, *tx_fifo;
326   vcl_session_t *session = 0;
327   svm_msg_q_t *evt_q;
328
329   session_index = mp->context;
330   session = vcl_session_get (wrk, session_index);
331   if (!session)
332     {
333       clib_warning ("[%s] ERROR: vpp handle 0x%llx, sid %u: "
334                     "Invalid session index (%u)!",
335                     getpid (), mp->handle, session_index);
336       return VCL_INVALID_SESSION_INDEX;
337     }
338   if (mp->retval)
339     {
340       clib_warning ("VCL<%d>: ERROR: sid %u: connect failed! %U", getpid (),
341                     session_index, format_api_error, ntohl (mp->retval));
342       session->session_state = STATE_FAILED;
343       session->vpp_handle = mp->handle;
344       return session_index;
345     }
346
347   rx_fifo = uword_to_pointer (mp->server_rx_fifo, svm_fifo_t *);
348   tx_fifo = uword_to_pointer (mp->server_tx_fifo, svm_fifo_t *);
349   if (vcl_wait_for_segment (mp->segment_handle))
350     {
351       clib_warning ("segment for session %u couldn't be mounted!",
352                     session->session_index);
353       return VCL_INVALID_SESSION_INDEX;
354     }
355
356   rx_fifo->client_session_index = session_index;
357   tx_fifo->client_session_index = session_index;
358   rx_fifo->client_thread_index = vcl_get_worker_index ();
359   tx_fifo->client_thread_index = vcl_get_worker_index ();
360
361   if (mp->client_event_queue_address)
362     {
363       session->vpp_evt_q = uword_to_pointer (mp->server_event_queue_address,
364                                              svm_msg_q_t *);
365       session->our_evt_q = uword_to_pointer (mp->client_event_queue_address,
366                                              svm_msg_q_t *);
367
368       vec_validate (wrk->vpp_event_queues, 0);
369       evt_q = uword_to_pointer (mp->vpp_event_queue_address, svm_msg_q_t *);
370       wrk->vpp_event_queues[0] = evt_q;
371     }
372   else
373     {
374       session->vpp_evt_q = uword_to_pointer (mp->vpp_event_queue_address,
375                                              svm_msg_q_t *);
376       vpp_wrk_index = tx_fifo->master_thread_index;
377       vec_validate (wrk->vpp_event_queues, vpp_wrk_index);
378       wrk->vpp_event_queues[vpp_wrk_index] = session->vpp_evt_q;
379     }
380
381   session->rx_fifo = rx_fifo;
382   session->tx_fifo = tx_fifo;
383   session->vpp_handle = mp->handle;
384   session->vpp_thread_index = rx_fifo->master_thread_index;
385   session->transport.is_ip4 = mp->is_ip4;
386   clib_memcpy_fast (&session->transport.lcl_ip, mp->lcl_ip,
387                     sizeof (session->transport.lcl_ip));
388   session->transport.lcl_port = mp->lcl_port;
389   session->session_state = STATE_CONNECT;
390
391   /* Add it to lookup table */
392   vcl_session_table_add_vpp_handle (wrk, mp->handle, session_index);
393
394   VDBG (1, "VCL<%d>: vpp handle 0x%llx, sid %u: connect succeeded! "
395         "session_rx_fifo %p, refcnt %d, session_tx_fifo %p, refcnt %d",
396         getpid (), mp->handle, session_index, session->rx_fifo,
397         session->rx_fifo->refcnt, session->tx_fifo, session->tx_fifo->refcnt);
398
399   return session_index;
400 }
401
402 static int
403 vcl_flag_accepted_session (vcl_session_t * session, u64 handle, u32 flags)
404 {
405   vcl_session_msg_t *accepted_msg;
406   int i;
407
408   for (i = 0; i < vec_len (session->accept_evts_fifo); i++)
409     {
410       accepted_msg = &session->accept_evts_fifo[i];
411       if (accepted_msg->accepted_msg.handle == handle)
412         {
413           accepted_msg->flags = flags;
414           return 1;
415         }
416     }
417   return 0;
418 }
419
420 static u32
421 vcl_session_reset_handler (vcl_worker_t * wrk,
422                            session_reset_msg_t * reset_msg)
423 {
424   vcl_session_t *session;
425   u32 sid;
426
427   sid = vcl_session_index_from_vpp_handle (wrk, reset_msg->handle);
428   session = vcl_session_get (wrk, sid);
429   if (!session)
430     {
431       VDBG (0, "request to reset unknown handle 0x%llx", reset_msg->handle);
432       return VCL_INVALID_SESSION_INDEX;
433     }
434   if (session->session_state >= STATE_VPP_CLOSING)
435     return sid;
436
437   /* Caught a reset before actually accepting the session */
438   if (session->session_state == STATE_LISTEN)
439     {
440
441       if (!vcl_flag_accepted_session (session, reset_msg->handle,
442                                       VCL_ACCEPTED_F_RESET))
443         VDBG (0, "session was not accepted!");
444       return VCL_INVALID_SESSION_INDEX;
445     }
446
447   session->session_state = STATE_DISCONNECT;
448   VDBG (0, "reset session %u [0x%llx]", sid, reset_msg->handle);
449   vcl_send_session_reset_reply (vcl_session_vpp_evt_q (wrk, session),
450                                 wrk->my_client_index, reset_msg->handle, 0);
451   return sid;
452 }
453
454 static u32
455 vcl_session_bound_handler (vcl_worker_t * wrk, session_bound_msg_t * mp)
456 {
457   vcl_session_t *session;
458   u32 sid = mp->context;
459
460   session = vcl_session_get (wrk, sid);
461   if (mp->retval)
462     {
463       VERR ("vpp handle 0x%llx, sid %u: bind failed: %U", mp->handle, sid,
464             format_api_error, mp->retval);
465       if (session)
466         {
467           session->session_state = STATE_FAILED;
468           session->vpp_handle = mp->handle;
469           return sid;
470         }
471       else
472         {
473           clib_warning ("[%s] ERROR: vpp handle 0x%llx, sid %u: "
474                         "Invalid session index (%u)!",
475                         getpid (), mp->handle, sid);
476           return VCL_INVALID_SESSION_INDEX;
477         }
478     }
479
480   session->vpp_handle = mp->handle;
481   session->transport.is_ip4 = mp->lcl_is_ip4;
482   clib_memcpy_fast (&session->transport.lcl_ip, mp->lcl_ip,
483                     sizeof (ip46_address_t));
484   session->transport.lcl_port = mp->lcl_port;
485   vcl_session_table_add_listener (wrk, mp->handle, sid);
486   session->session_state = STATE_LISTEN;
487
488   if (session->is_dgram)
489     {
490       svm_fifo_t *rx_fifo, *tx_fifo;
491       session->vpp_evt_q = uword_to_pointer (mp->vpp_evt_q, svm_msg_q_t *);
492       rx_fifo = uword_to_pointer (mp->rx_fifo, svm_fifo_t *);
493       rx_fifo->client_session_index = sid;
494       tx_fifo = uword_to_pointer (mp->tx_fifo, svm_fifo_t *);
495       tx_fifo->client_session_index = sid;
496       session->rx_fifo = rx_fifo;
497       session->tx_fifo = tx_fifo;
498     }
499
500   VDBG (0, "session %u [0x%llx]: listen succeeded!", sid, mp->handle);
501   return sid;
502 }
503
504 static vcl_session_t *
505 vcl_session_accepted (vcl_worker_t * wrk, session_accepted_msg_t * msg)
506 {
507   vcl_session_msg_t *vcl_msg;
508   vcl_session_t *session;
509
510   session = vcl_session_get_w_vpp_handle (wrk, msg->handle);
511   if (PREDICT_FALSE (session != 0))
512     VWRN ("session handle overlap %lu!", msg->handle);
513
514   session = vcl_session_table_lookup_listener (wrk, msg->listener_handle);
515   if (!session)
516     {
517       VERR ("couldn't find listen session: listener handle %llx",
518             msg->listener_handle);
519       return 0;
520     }
521
522   clib_fifo_add2 (session->accept_evts_fifo, vcl_msg);
523   vcl_msg->accepted_msg = *msg;
524   /* Session handle points to listener until fully accepted by app */
525   vcl_session_table_add_vpp_handle (wrk, msg->handle, session->session_index);
526
527   return session;
528 }
529
530 static vcl_session_t *
531 vcl_session_disconnected_handler (vcl_worker_t * wrk,
532                                   session_disconnected_msg_t * msg)
533 {
534   vcl_session_t *session;
535
536   session = vcl_session_get_w_vpp_handle (wrk, msg->handle);
537   if (!session)
538     {
539       VDBG (0, "request to disconnect unknown handle 0x%llx", msg->handle);
540       return 0;
541     }
542
543   /* Caught a disconnect before actually accepting the session */
544   if (session->session_state == STATE_LISTEN)
545     {
546
547       if (!vcl_flag_accepted_session (session, msg->handle,
548                                       VCL_ACCEPTED_F_CLOSED))
549         VDBG (0, "session was not accepted!");
550       return 0;
551     }
552
553   session->session_state = STATE_VPP_CLOSING;
554   return session;
555 }
556
557 static int
558 vcl_handle_mq_event (vcl_worker_t * wrk, session_event_t * e)
559 {
560   session_disconnected_msg_t *disconnected_msg;
561   vcl_session_t *session;
562
563   switch (e->event_type)
564     {
565     case FIFO_EVENT_APP_RX:
566     case FIFO_EVENT_APP_TX:
567     case SESSION_IO_EVT_CT_RX:
568     case SESSION_IO_EVT_CT_TX:
569       vec_add1 (wrk->unhandled_evts_vector, *e);
570       break;
571     case SESSION_CTRL_EVT_ACCEPTED:
572       vcl_session_accepted (wrk, (session_accepted_msg_t *) e->data);
573       break;
574     case SESSION_CTRL_EVT_CONNECTED:
575       vcl_session_connected_handler (wrk,
576                                      (session_connected_msg_t *) e->data);
577       break;
578     case SESSION_CTRL_EVT_DISCONNECTED:
579       disconnected_msg = (session_disconnected_msg_t *) e->data;
580       session = vcl_session_disconnected_handler (wrk, disconnected_msg);
581       if (!session)
582         break;
583       session->session_state = STATE_DISCONNECT;
584       VDBG (0, "disconnected session %u [0x%llx]", session->session_index,
585             session->vpp_handle);
586       break;
587     case SESSION_CTRL_EVT_RESET:
588       vcl_session_reset_handler (wrk, (session_reset_msg_t *) e->data);
589       break;
590     case SESSION_CTRL_EVT_BOUND:
591       vcl_session_bound_handler (wrk, (session_bound_msg_t *) e->data);
592       break;
593     default:
594       clib_warning ("unhandled %u", e->event_type);
595     }
596   return VPPCOM_OK;
597 }
598
599 static inline int
600 vppcom_wait_for_session_state_change (u32 session_index,
601                                       session_state_t state,
602                                       f64 wait_for_time)
603 {
604   vcl_worker_t *wrk = vcl_worker_get_current ();
605   f64 timeout = clib_time_now (&wrk->clib_time) + wait_for_time;
606   vcl_session_t *volatile session;
607   svm_msg_q_msg_t msg;
608   session_event_t *e;
609
610   do
611     {
612       session = vcl_session_get (wrk, session_index);
613       if (PREDICT_FALSE (!session))
614         {
615           return VPPCOM_EBADFD;
616         }
617       if (session->session_state & state)
618         {
619           return VPPCOM_OK;
620         }
621       if (session->session_state & STATE_FAILED)
622         {
623           return VPPCOM_ECONNREFUSED;
624         }
625
626       if (svm_msg_q_sub (wrk->app_event_queue, &msg, SVM_Q_NOWAIT, 0))
627         {
628           usleep (100);
629           continue;
630         }
631       e = svm_msg_q_msg_data (wrk->app_event_queue, &msg);
632       vcl_handle_mq_event (wrk, e);
633       svm_msg_q_free_msg (wrk->app_event_queue, &msg);
634     }
635   while (clib_time_now (&wrk->clib_time) < timeout);
636
637   VDBG (0, "timeout waiting for state 0x%x (%s)", state,
638         vppcom_session_state_str (state));
639   vcl_evt (VCL_EVT_SESSION_TIMEOUT, session, session_state);
640
641   return VPPCOM_ETIMEDOUT;
642 }
643
644 static int
645 vppcom_app_session_enable (void)
646 {
647   int rv;
648
649   if (vcm->app_state != STATE_APP_ENABLED)
650     {
651       vppcom_send_session_enable_disable (1 /* is_enabled == TRUE */ );
652       rv = vcl_wait_for_app_state_change (STATE_APP_ENABLED);
653       if (PREDICT_FALSE (rv))
654         {
655           VDBG (0, "VCL<%d>: application session enable timed out! "
656                 "returning %d (%s)", getpid (), rv, vppcom_retval_str (rv));
657           return rv;
658         }
659     }
660   return VPPCOM_OK;
661 }
662
663 static int
664 vppcom_app_attach (void)
665 {
666   int rv;
667
668   vppcom_app_send_attach ();
669   rv = vcl_wait_for_app_state_change (STATE_APP_ATTACHED);
670   if (PREDICT_FALSE (rv))
671     {
672       VDBG (0, "VCL<%d>: application attach timed out! returning %d (%s)",
673             getpid (), rv, vppcom_retval_str (rv));
674       return rv;
675     }
676
677   return VPPCOM_OK;
678 }
679
680 static int
681 vppcom_session_unbind (u32 session_handle)
682 {
683   vcl_worker_t *wrk = vcl_worker_get_current ();
684   vcl_session_t *session = 0;
685   u64 vpp_handle;
686
687   session = vcl_session_get_w_handle (wrk, session_handle);
688   if (!session)
689     return VPPCOM_EBADFD;
690
691   vpp_handle = session->vpp_handle;
692   vcl_session_table_del_listener (wrk, vpp_handle);
693   session->vpp_handle = ~0;
694   session->session_state = STATE_DISCONNECT;
695
696   VDBG (1, "VCL<%d>: vpp handle 0x%llx, sid %u: sending unbind msg! new state"
697         " 0x%x (%s)", getpid (), vpp_handle, session_handle, STATE_DISCONNECT,
698         vppcom_session_state_str (STATE_DISCONNECT));
699   vcl_evt (VCL_EVT_UNBIND, session);
700   vppcom_send_unbind_sock (vpp_handle);
701
702   return VPPCOM_OK;
703 }
704
705 static int
706 vppcom_session_disconnect (u32 session_handle)
707 {
708   vcl_worker_t *wrk = vcl_worker_get_current ();
709   svm_msg_q_t *vpp_evt_q;
710   vcl_session_t *session;
711   session_state_t state;
712   u64 vpp_handle;
713
714   session = vcl_session_get_w_handle (wrk, session_handle);
715   if (!session)
716     return VPPCOM_EBADFD;
717
718   vpp_handle = session->vpp_handle;
719   state = session->session_state;
720
721   VDBG (1, "VCL<%d>: vpp handle 0x%llx, sid %u state 0x%x (%s)", getpid (),
722         vpp_handle, session_handle, state, vppcom_session_state_str (state));
723
724   if (PREDICT_FALSE (state & STATE_LISTEN))
725     {
726       clib_warning ("VCL<%d>: ERROR: vpp handle 0x%llx, sid %u: "
727                     "Cannot disconnect a listen socket!",
728                     getpid (), vpp_handle, session_handle);
729       return VPPCOM_EBADFD;
730     }
731
732   if (state & STATE_VPP_CLOSING)
733     {
734       vpp_evt_q = vcl_session_vpp_evt_q (wrk, session);
735       vcl_send_session_disconnected_reply (vpp_evt_q, wrk->my_client_index,
736                                            vpp_handle, 0);
737       VDBG (1, "VCL<%d>: vpp handle 0x%llx, sid %u: sending disconnect "
738             "REPLY...", getpid (), vpp_handle, session_handle);
739     }
740   else
741     {
742       VDBG (1, "VCL<%d>: vpp handle 0x%llx, sid %u: sending disconnect...",
743             getpid (), vpp_handle, session_handle);
744       vppcom_send_disconnect_session (vpp_handle);
745     }
746
747   return VPPCOM_OK;
748 }
749
750 static void
751 vcl_cleanup_bapi (void)
752 {
753   socket_client_main_t *scm = &socket_client_main;
754   api_main_t *am = &api_main;
755
756   am->my_client_index = ~0;
757   am->my_registration = 0;
758   am->vl_input_queue = 0;
759   am->msg_index_by_name_and_crc = 0;
760   scm->socket_fd = 0;
761
762   vl_client_api_unmap ();
763 }
764
765 static void
766 vcl_cleanup_forked_child (vcl_worker_t * wrk, vcl_worker_t * child_wrk)
767 {
768   vcl_worker_t *sub_child;
769   int tries = 0;
770
771   if (child_wrk->forked_child != ~0)
772     {
773       sub_child = vcl_worker_get_if_valid (child_wrk->forked_child);
774       if (sub_child)
775         {
776           /* Wait a bit, maybe the process is going away */
777           while (kill (sub_child->current_pid, 0) >= 0 && tries++ < 50)
778             usleep (1e3);
779           if (kill (sub_child->current_pid, 0) < 0)
780             vcl_cleanup_forked_child (child_wrk, sub_child);
781         }
782     }
783   vcl_worker_cleanup (child_wrk, 1 /* notify vpp */ );
784   VDBG (0, "Cleaned up wrk %u", child_wrk->wrk_index);
785   wrk->forked_child = ~0;
786 }
787
788 static struct sigaction old_sa;
789
790 static void
791 vcl_intercept_sigchld_handler (int signum, siginfo_t * si, void *uc)
792 {
793   vcl_worker_t *wrk, *child_wrk;
794
795   if (vcl_get_worker_index () == ~0)
796     return;
797
798   sigaction (SIGCHLD, &old_sa, 0);
799
800   wrk = vcl_worker_get_current ();
801   if (wrk->forked_child == ~0)
802     return;
803
804   child_wrk = vcl_worker_get_if_valid (wrk->forked_child);
805   if (!child_wrk)
806     goto done;
807
808   if (si && si->si_pid != child_wrk->current_pid)
809     {
810       VDBG (0, "unexpected child pid %u", si->si_pid);
811       goto done;
812     }
813   vcl_cleanup_forked_child (wrk, child_wrk);
814
815 done:
816   if (old_sa.sa_flags & SA_SIGINFO)
817     {
818       void (*fn) (int, siginfo_t *, void *) = old_sa.sa_sigaction;
819       fn (signum, si, uc);
820     }
821   else
822     {
823       void (*fn) (int) = old_sa.sa_handler;
824       if (fn)
825         fn (signum);
826     }
827 }
828
829 static void
830 vcl_incercept_sigchld ()
831 {
832   struct sigaction sa;
833   clib_memset (&sa, 0, sizeof (sa));
834   sa.sa_sigaction = vcl_intercept_sigchld_handler;
835   sa.sa_flags = SA_SIGINFO;
836   if (sigaction (SIGCHLD, &sa, &old_sa))
837     {
838       VERR ("couldn't intercept sigchld");
839       exit (-1);
840     }
841 }
842
843 static void
844 vcl_app_pre_fork (void)
845 {
846   vcl_incercept_sigchld ();
847 }
848
849 static void
850 vcl_app_fork_child_handler (void)
851 {
852   int rv, parent_wrk_index;
853   vcl_worker_t *parent_wrk;
854   u8 *child_name;
855
856   parent_wrk_index = vcl_get_worker_index ();
857   VDBG (0, "initializing forked child with parent wrk %u", parent_wrk_index);
858
859   /*
860    * Allocate worker
861    */
862   vcl_set_worker_index (~0);
863   if (!vcl_worker_alloc_and_init ())
864     VERR ("couldn't allocate new worker");
865
866   /*
867    * Attach to binary api
868    */
869   child_name = format (0, "%v-child-%u%c", vcm->app_name, getpid (), 0);
870   vcl_cleanup_bapi ();
871   vppcom_api_hookup ();
872   vcm->app_state = STATE_APP_START;
873   rv = vppcom_connect_to_vpp ((char *) child_name);
874   vec_free (child_name);
875   if (rv)
876     {
877       VERR ("couldn't connect to VPP!");
878       return;
879     }
880
881   /*
882    * Register worker with vpp and share sessions
883    */
884   vcl_worker_register_with_vpp ();
885   parent_wrk = vcl_worker_get (parent_wrk_index);
886   vcl_worker_share_sessions (parent_wrk);
887   parent_wrk->forked_child = vcl_get_worker_index ();
888
889   VDBG (0, "forked child main worker initialized");
890   vcm->forking = 0;
891 }
892
893 static void
894 vcl_app_fork_parent_handler (void)
895 {
896   vcm->forking = 1;
897   while (vcm->forking)
898     ;
899 }
900
901 /**
902  * Handle app exit
903  *
904  * Notify vpp of the disconnect and mark the worker as free. If we're the
905  * last worker, do a full cleanup otherwise, since we're probably a forked
906  * child, avoid syscalls as much as possible. We might've lost privileges.
907  */
908 void
909 vppcom_app_exit (void)
910 {
911   if (!pool_elts (vcm->workers))
912     return;
913   vcl_worker_cleanup (vcl_worker_get_current (), 1 /* notify vpp */ );
914   vcl_set_worker_index (~0);
915   vcl_elog_stop (vcm);
916   if (vec_len (vcm->workers) == 1)
917     vl_client_disconnect_from_vlib ();
918   else
919     vl_client_send_disconnect (1 /* vpp should cleanup */ );
920 }
921
922 /*
923  * VPPCOM Public API functions
924  */
925 int
926 vppcom_app_create (char *app_name)
927 {
928   vppcom_cfg_t *vcl_cfg = &vcm->cfg;
929   int rv;
930
931   if (vcm->is_init)
932     {
933       VDBG (1, "already initialized");
934       return VPPCOM_EEXIST;
935     }
936
937   vcm->is_init = 1;
938   vppcom_cfg (&vcm->cfg);
939   vcl_cfg = &vcm->cfg;
940
941   vcm->main_cpu = pthread_self ();
942   vcm->main_pid = getpid ();
943   vcm->app_name = format (0, "%s", app_name);
944   vppcom_init_error_string_table ();
945   svm_fifo_segment_main_init (&vcm->segment_main, vcl_cfg->segment_baseva,
946                               20 /* timeout in secs */ );
947   pool_alloc (vcm->workers, vcl_cfg->max_workers);
948   clib_spinlock_init (&vcm->workers_lock);
949   clib_rwlock_init (&vcm->segment_table_lock);
950   pthread_atfork (vcl_app_pre_fork, vcl_app_fork_parent_handler,
951                   vcl_app_fork_child_handler);
952   atexit (vppcom_app_exit);
953
954   /* Allocate default worker */
955   vcl_worker_alloc_and_init ();
956
957   /* API hookup and connect to VPP */
958   vppcom_api_hookup ();
959   vcl_elog_init (vcm);
960   vcm->app_state = STATE_APP_START;
961   rv = vppcom_connect_to_vpp (app_name);
962   if (rv)
963     {
964       VERR ("couldn't connect to VPP!");
965       return rv;
966     }
967   VDBG (0, "sending session enable");
968   rv = vppcom_app_session_enable ();
969   if (rv)
970     {
971       VERR ("vppcom_app_session_enable() failed!");
972       return rv;
973     }
974
975   VDBG (0, "sending app attach");
976   rv = vppcom_app_attach ();
977   if (rv)
978     {
979       VERR ("vppcom_app_attach() failed!");
980       return rv;
981     }
982
983   VDBG (0, "app_name '%s', my_client_index %d (0x%x)", app_name,
984         vcm->workers[0].my_client_index, vcm->workers[0].my_client_index);
985
986   return VPPCOM_OK;
987 }
988
989 void
990 vppcom_app_destroy (void)
991 {
992   int rv;
993   f64 orig_app_timeout;
994
995   if (!pool_elts (vcm->workers))
996     return;
997
998   vcl_evt (VCL_EVT_DETACH, vcm);
999
1000   if (pool_elts (vcm->workers) == 1)
1001     {
1002       vppcom_app_send_detach ();
1003       orig_app_timeout = vcm->cfg.app_timeout;
1004       vcm->cfg.app_timeout = 2.0;
1005       rv = vcl_wait_for_app_state_change (STATE_APP_ENABLED);
1006       vcm->cfg.app_timeout = orig_app_timeout;
1007       if (PREDICT_FALSE (rv))
1008         VDBG (0, "application detach timed out! returning %d (%s)", rv,
1009               vppcom_retval_str (rv));
1010       vec_free (vcm->app_name);
1011       vcl_worker_cleanup (vcl_worker_get_current (), 0 /* notify vpp */ );
1012     }
1013   else
1014     {
1015       vcl_worker_cleanup (vcl_worker_get_current (), 1 /* notify vpp */ );
1016     }
1017
1018   vcl_set_worker_index (~0);
1019   vcl_elog_stop (vcm);
1020   vl_client_disconnect_from_vlib ();
1021 }
1022
1023 int
1024 vppcom_session_create (u8 proto, u8 is_nonblocking)
1025 {
1026   vcl_worker_t *wrk = vcl_worker_get_current ();
1027   vcl_session_t *session;
1028
1029   session = vcl_session_alloc (wrk);
1030
1031   session->session_type = proto;
1032   session->session_state = STATE_START;
1033   session->vpp_handle = ~0;
1034   session->is_dgram = proto == VPPCOM_PROTO_UDP;
1035
1036   if (is_nonblocking)
1037     VCL_SESS_ATTR_SET (session->attr, VCL_SESS_ATTR_NONBLOCK);
1038
1039   vcl_evt (VCL_EVT_CREATE, session, session_type, session->session_state,
1040            is_nonblocking, session_index);
1041
1042   VDBG (0, "created sid %u", session->session_index);
1043
1044   return vcl_session_handle (session);
1045 }
1046
1047 int
1048 vppcom_session_close (uint32_t session_handle)
1049 {
1050   vcl_worker_t *wrk = vcl_worker_get_current ();
1051   u8 is_vep, do_disconnect = 1;
1052   vcl_session_t *session = 0;
1053   session_state_t state;
1054   u32 next_sh, vep_sh;
1055   int rv = VPPCOM_OK;
1056   u64 vpp_handle;
1057
1058   session = vcl_session_get_w_handle (wrk, session_handle);
1059   if (!session)
1060     return VPPCOM_EBADFD;
1061
1062   if (session->shared_index != ~0)
1063     do_disconnect = vcl_worker_unshare_session (wrk, session);
1064
1065   is_vep = session->is_vep;
1066   next_sh = session->vep.next_sh;
1067   vep_sh = session->vep.vep_sh;
1068   state = session->session_state;
1069   vpp_handle = session->vpp_handle;
1070
1071   VDBG (1, "closing session handle %u vpp handle %u", session_handle,
1072         vpp_handle);
1073
1074   if (is_vep)
1075     {
1076       while (next_sh != ~0)
1077         {
1078           rv = vppcom_epoll_ctl (session_handle, EPOLL_CTL_DEL, next_sh, 0);
1079           if (PREDICT_FALSE (rv < 0))
1080             VDBG (0, "vpp handle 0x%llx, sid %u: EPOLL_CTL_DEL vep_idx %u"
1081                   " failed! rv %d (%s)", vpp_handle, next_sh, vep_sh, rv,
1082                   vppcom_retval_str (rv));
1083
1084           next_sh = session->vep.next_sh;
1085         }
1086     }
1087   else
1088     {
1089       if (session->is_vep_session)
1090         {
1091           rv = vppcom_epoll_ctl (vep_sh, EPOLL_CTL_DEL, session_handle, 0);
1092           if (rv < 0)
1093             VDBG (0, "vpp handle 0x%llx, sid %u: EPOLL_CTL_DEL vep_idx %u "
1094                   "failed! rv %d (%s)", vpp_handle, session_handle, vep_sh,
1095                   rv, vppcom_retval_str (rv));
1096         }
1097
1098       if (!do_disconnect)
1099         goto cleanup;
1100
1101       if (state & STATE_LISTEN)
1102         {
1103           rv = vppcom_session_unbind (session_handle);
1104           if (PREDICT_FALSE (rv < 0))
1105             VDBG (0, "vpp handle 0x%llx, sid %u: listener unbind failed! "
1106                   "rv %d (%s)", vpp_handle, session_handle, rv,
1107                   vppcom_retval_str (rv));
1108         }
1109       else if (state & STATE_OPEN)
1110         {
1111           rv = vppcom_session_disconnect (session_handle);
1112           if (PREDICT_FALSE (rv < 0))
1113             clib_warning ("VCL<%d>: ERROR: vpp handle 0x%llx, sid %u: "
1114                           "session disconnect failed! rv %d (%s)",
1115                           getpid (), vpp_handle, session_handle,
1116                           rv, vppcom_retval_str (rv));
1117         }
1118     }
1119
1120 cleanup:
1121
1122   if (vcl_session_is_ct (session))
1123     {
1124       vcl_cut_through_registration_t *ctr;
1125       uword mq_addr;
1126
1127       mq_addr = pointer_to_uword (session->our_evt_q);
1128       ctr = vcl_ct_registration_lock_and_lookup (wrk, mq_addr);
1129       ASSERT (ctr);
1130       if (ctr->epoll_evt_conn_index != ~0)
1131         vcl_mq_epoll_del_evfd (wrk, ctr->epoll_evt_conn_index);
1132       VDBG (0, "Removing ct registration %u",
1133             vcl_ct_registration_index (wrk, ctr));
1134       vcl_ct_registration_del (wrk, ctr);
1135       vcl_ct_registration_lookup_del (wrk, mq_addr);
1136       vcl_ct_registration_unlock (wrk);
1137     }
1138
1139   if (vpp_handle != ~0)
1140     {
1141       vcl_session_table_del_vpp_handle (wrk, vpp_handle);
1142     }
1143   vcl_session_free (wrk, session);
1144
1145   VDBG (0, "session handle %u [0x%llx] removed", session_handle, vpp_handle);
1146
1147   vcl_evt (VCL_EVT_CLOSE, session, rv);
1148
1149   return rv;
1150 }
1151
1152 int
1153 vppcom_session_bind (uint32_t session_handle, vppcom_endpt_t * ep)
1154 {
1155   vcl_worker_t *wrk = vcl_worker_get_current ();
1156   vcl_session_t *session = 0;
1157
1158   if (!ep || !ep->ip)
1159     return VPPCOM_EINVAL;
1160
1161   session = vcl_session_get_w_handle (wrk, session_handle);
1162   if (!session)
1163     return VPPCOM_EBADFD;
1164
1165   if (session->is_vep)
1166     {
1167       clib_warning ("VCL<%d>: ERROR: sid %u: cannot "
1168                     "bind to an epoll session!", getpid (), session_handle);
1169       return VPPCOM_EBADFD;
1170     }
1171
1172   session->transport.is_ip4 = ep->is_ip4;
1173   if (ep->is_ip4)
1174     clib_memcpy_fast (&session->transport.lcl_ip.ip4, ep->ip,
1175                       sizeof (ip4_address_t));
1176   else
1177     clib_memcpy_fast (&session->transport.lcl_ip.ip6, ep->ip,
1178                       sizeof (ip6_address_t));
1179   session->transport.lcl_port = ep->port;
1180
1181   VDBG (0, "VCL<%d>: sid %u: binding to local %s address %U port %u, "
1182         "proto %s", getpid (), session_handle,
1183         session->transport.is_ip4 ? "IPv4" : "IPv6",
1184         format_ip46_address, &session->transport.lcl_ip,
1185         session->transport.is_ip4 ? IP46_TYPE_IP4 : IP46_TYPE_IP6,
1186         clib_net_to_host_u16 (session->transport.lcl_port),
1187         session->session_type ? "UDP" : "TCP");
1188   vcl_evt (VCL_EVT_BIND, session);
1189
1190   if (session->session_type == VPPCOM_PROTO_UDP)
1191     vppcom_session_listen (session_handle, 10);
1192
1193   return VPPCOM_OK;
1194 }
1195
1196 int
1197 vppcom_session_listen (uint32_t listen_sh, uint32_t q_len)
1198 {
1199   vcl_worker_t *wrk = vcl_worker_get_current ();
1200   vcl_session_t *listen_session = 0;
1201   u64 listen_vpp_handle;
1202   int rv;
1203
1204   listen_session = vcl_session_get_w_handle (wrk, listen_sh);
1205   if (!listen_session || listen_session->is_vep)
1206     return VPPCOM_EBADFD;
1207
1208   if (q_len == 0 || q_len == ~0)
1209     q_len = vcm->cfg.listen_queue_size;
1210
1211   listen_vpp_handle = listen_session->vpp_handle;
1212   if (listen_session->session_state & STATE_LISTEN)
1213     {
1214       VDBG (0, "session %u [0x%llx]: already in listen state!",
1215             listen_sh, listen_vpp_handle);
1216       return VPPCOM_OK;
1217     }
1218
1219   VDBG (0, "session %u [0x%llx]: sending vpp listen request...",
1220         listen_sh, listen_vpp_handle);
1221
1222   /*
1223    * Send listen request to vpp and wait for reply
1224    */
1225   vppcom_send_bind_sock (listen_session);
1226   rv = vppcom_wait_for_session_state_change (listen_session->session_index,
1227                                              STATE_LISTEN,
1228                                              vcm->cfg.session_timeout);
1229
1230   if (PREDICT_FALSE (rv))
1231     {
1232       listen_session = vcl_session_get_w_handle (wrk, listen_sh);
1233       VDBG (0, "session %u [0x%llx]: listen failed! returning %d (%s)",
1234             listen_sh, listen_session->vpp_handle, rv,
1235             vppcom_retval_str (rv));
1236       return rv;
1237     }
1238
1239   return VPPCOM_OK;
1240 }
1241
1242 static int
1243 validate_args_session_accept_ (vcl_worker_t * wrk,
1244                                vcl_session_t * listen_session)
1245 {
1246   /* Input validation - expects spinlock on sessions_lockp */
1247   if (listen_session->is_vep)
1248     {
1249       clib_warning ("VCL<%d>: ERROR: sid %u: cannot accept on an "
1250                     "epoll session!", getpid (),
1251                     listen_session->session_index);
1252       return VPPCOM_EBADFD;
1253     }
1254
1255   if (listen_session->session_state != STATE_LISTEN)
1256     {
1257       clib_warning ("VCL<%d>: ERROR: vpp handle 0x%llx, sid %u: "
1258                     "not in listen state! state 0x%x (%s)", getpid (),
1259                     listen_session->vpp_handle, listen_session->session_index,
1260                     listen_session->session_state,
1261                     vppcom_session_state_str (listen_session->session_state));
1262       return VPPCOM_EBADFD;
1263     }
1264   return VPPCOM_OK;
1265 }
1266
1267 int
1268 vppcom_session_accept (uint32_t listen_session_handle, vppcom_endpt_t * ep,
1269                        uint32_t flags)
1270 {
1271   u32 client_session_index = ~0, listen_session_index, accept_flags = 0;
1272   vcl_worker_t *wrk = vcl_worker_get_current ();
1273   session_accepted_msg_t accepted_msg;
1274   vcl_session_t *listen_session = 0;
1275   vcl_session_t *client_session = 0;
1276   svm_msg_q_t *vpp_evt_q;
1277   vcl_session_msg_t *evt;
1278   u64 listen_vpp_handle;
1279   svm_msg_q_msg_t msg;
1280   session_event_t *e;
1281   u8 is_nonblocking;
1282   int rv;
1283
1284   listen_session = vcl_session_get_w_handle (wrk, listen_session_handle);
1285   if (!listen_session)
1286     return VPPCOM_EBADFD;
1287
1288   listen_session_index = listen_session->session_index;
1289   if ((rv = validate_args_session_accept_ (wrk, listen_session)))
1290     return rv;
1291
1292   if (clib_fifo_elts (listen_session->accept_evts_fifo))
1293     {
1294       clib_fifo_sub2 (listen_session->accept_evts_fifo, evt);
1295       accept_flags = evt->flags;
1296       accepted_msg = evt->accepted_msg;
1297       goto handle;
1298     }
1299
1300   is_nonblocking = VCL_SESS_ATTR_TEST (listen_session->attr,
1301                                        VCL_SESS_ATTR_NONBLOCK);
1302   if (svm_msg_q_is_empty (wrk->app_event_queue) && is_nonblocking)
1303     return VPPCOM_EAGAIN;
1304
1305   while (1)
1306     {
1307       if (svm_msg_q_sub (wrk->app_event_queue, &msg, SVM_Q_WAIT, 0))
1308         return VPPCOM_EAGAIN;
1309
1310       e = svm_msg_q_msg_data (wrk->app_event_queue, &msg);
1311       if (e->event_type != SESSION_CTRL_EVT_ACCEPTED)
1312         {
1313           clib_warning ("discarded event: %u", e->event_type);
1314           svm_msg_q_free_msg (wrk->app_event_queue, &msg);
1315           continue;
1316         }
1317       clib_memcpy_fast (&accepted_msg, e->data, sizeof (accepted_msg));
1318       svm_msg_q_free_msg (wrk->app_event_queue, &msg);
1319       break;
1320     }
1321
1322 handle:
1323
1324   client_session_index = vcl_session_accepted_handler (wrk, &accepted_msg);
1325   listen_session = vcl_session_get (wrk, listen_session_index);
1326   client_session = vcl_session_get (wrk, client_session_index);
1327
1328   if (flags & O_NONBLOCK)
1329     VCL_SESS_ATTR_SET (client_session->attr, VCL_SESS_ATTR_NONBLOCK);
1330
1331   listen_vpp_handle = listen_session->vpp_handle;
1332   VDBG (1, "vpp handle 0x%llx, sid %u: Got a client request! "
1333         "vpp handle 0x%llx, sid %u, flags %d, is_nonblocking %u",
1334         listen_vpp_handle, listen_session_handle,
1335         client_session->vpp_handle, client_session_index,
1336         flags, VCL_SESS_ATTR_TEST (client_session->attr,
1337                                    VCL_SESS_ATTR_NONBLOCK));
1338
1339   if (ep)
1340     {
1341       ep->is_ip4 = client_session->transport.is_ip4;
1342       ep->port = client_session->transport.rmt_port;
1343       if (client_session->transport.is_ip4)
1344         clib_memcpy_fast (ep->ip, &client_session->transport.rmt_ip.ip4,
1345                           sizeof (ip4_address_t));
1346       else
1347         clib_memcpy_fast (ep->ip, &client_session->transport.rmt_ip.ip6,
1348                           sizeof (ip6_address_t));
1349     }
1350
1351   if (accepted_msg.server_event_queue_address)
1352     vpp_evt_q = uword_to_pointer (accepted_msg.vpp_event_queue_address,
1353                                   svm_msg_q_t *);
1354   else
1355     vpp_evt_q = client_session->vpp_evt_q;
1356
1357   vcl_send_session_accepted_reply (vpp_evt_q, client_session->client_context,
1358                                    client_session->vpp_handle, 0);
1359
1360   VDBG (0, "listener %u [0x%llx] accepted %u [0x%llx] peer: %U:%u "
1361         "local: %U:%u", listen_session_handle, listen_vpp_handle,
1362         client_session_index, client_session->vpp_handle,
1363         format_ip46_address, &client_session->transport.rmt_ip,
1364         client_session->transport.is_ip4 ? IP46_TYPE_IP4 : IP46_TYPE_IP6,
1365         clib_net_to_host_u16 (client_session->transport.rmt_port),
1366         format_ip46_address, &client_session->transport.lcl_ip,
1367         client_session->transport.is_ip4 ? IP46_TYPE_IP4 : IP46_TYPE_IP6,
1368         clib_net_to_host_u16 (client_session->transport.lcl_port));
1369   vcl_evt (VCL_EVT_ACCEPT, client_session, listen_session,
1370            client_session_index);
1371
1372   /*
1373    * Session might have been closed already
1374    */
1375   if (accept_flags)
1376     {
1377       svm_msg_q_t *mq = vcl_session_vpp_evt_q (wrk, client_session);
1378       if (accept_flags & VCL_ACCEPTED_F_CLOSED)
1379         {
1380           client_session->session_state = STATE_DISCONNECT;
1381           vcl_send_session_disconnected_reply (mq, wrk->my_client_index,
1382                                                client_session->vpp_handle, 0);
1383         }
1384       else if (accept_flags & VCL_ACCEPTED_F_RESET)
1385         {
1386           client_session->session_state = STATE_DISCONNECT;
1387           vcl_send_session_reset_reply (mq, wrk->my_client_index,
1388                                         client_session->vpp_handle, 0);
1389         }
1390     }
1391   return vcl_session_handle (client_session);
1392 }
1393
1394 int
1395 vppcom_session_connect (uint32_t session_handle, vppcom_endpt_t * server_ep)
1396 {
1397   vcl_worker_t *wrk = vcl_worker_get_current ();
1398   vcl_session_t *session = 0;
1399   u32 session_index;
1400   int rv;
1401
1402   session = vcl_session_get_w_handle (wrk, session_handle);
1403   if (!session)
1404     return VPPCOM_EBADFD;
1405   session_index = session->session_index;
1406
1407   if (PREDICT_FALSE (session->is_vep))
1408     {
1409       clib_warning ("VCL<%d>: ERROR: sid %u: cannot "
1410                     "connect on an epoll session!", getpid (),
1411                     session_handle);
1412       return VPPCOM_EBADFD;
1413     }
1414
1415   if (PREDICT_FALSE (session->session_state & CLIENT_STATE_OPEN))
1416     {
1417       VDBG (0, "VCL<%d>: vpp handle 0x%llx, sid %u: session already "
1418             "connected to %s %U port %d proto %s, state 0x%x (%s)",
1419             getpid (), session->vpp_handle, session_handle,
1420             session->transport.is_ip4 ? "IPv4" : "IPv6",
1421             format_ip46_address,
1422             &session->transport.rmt_ip, session->transport.is_ip4 ?
1423             IP46_TYPE_IP4 : IP46_TYPE_IP6,
1424             clib_net_to_host_u16 (session->transport.rmt_port),
1425             session->session_type ? "UDP" : "TCP", session->session_state,
1426             vppcom_session_state_str (session->session_state));
1427       return VPPCOM_OK;
1428     }
1429
1430   session->transport.is_ip4 = server_ep->is_ip4;
1431   if (session->transport.is_ip4)
1432     clib_memcpy_fast (&session->transport.rmt_ip.ip4, server_ep->ip,
1433                       sizeof (ip4_address_t));
1434   else
1435     clib_memcpy_fast (&session->transport.rmt_ip.ip6, server_ep->ip,
1436                       sizeof (ip6_address_t));
1437   session->transport.rmt_port = server_ep->port;
1438
1439   VDBG (0, "VCL<%d>: vpp handle 0x%llx, sid %u: connecting to server %s %U "
1440         "port %d proto %s",
1441         getpid (), session->vpp_handle, session_handle,
1442         session->transport.is_ip4 ? "IPv4" : "IPv6",
1443         format_ip46_address,
1444         &session->transport.rmt_ip, session->transport.is_ip4 ?
1445         IP46_TYPE_IP4 : IP46_TYPE_IP6,
1446         clib_net_to_host_u16 (session->transport.rmt_port),
1447         session->session_type ? "UDP" : "TCP");
1448
1449   /*
1450    * Send connect request and wait for reply from vpp
1451    */
1452   vppcom_send_connect_sock (session);
1453   rv = vppcom_wait_for_session_state_change (session_index, STATE_CONNECT,
1454                                              vcm->cfg.session_timeout);
1455
1456   session = vcl_session_get (wrk, session_index);
1457
1458   if (PREDICT_FALSE (rv))
1459     {
1460       if (VPPCOM_DEBUG > 0)
1461         {
1462           if (session)
1463             clib_warning ("VCL<%d>: vpp handle 0x%llx, sid %u: connect "
1464                           "failed! returning %d (%s)", getpid (),
1465                           session->vpp_handle, session_handle, rv,
1466                           vppcom_retval_str (rv));
1467           else
1468             clib_warning ("VCL<%d>: no session for sid %u: connect failed! "
1469                           "returning %d (%s)", getpid (),
1470                           session_handle, rv, vppcom_retval_str (rv));
1471         }
1472     }
1473   else
1474     VDBG (0, "VCL<%d>: vpp handle 0x%llx, sid %u: connected!",
1475           getpid (), session->vpp_handle, session_handle);
1476
1477   return rv;
1478 }
1479
1480 static u8
1481 vcl_is_rx_evt_for_session (session_event_t * e, u32 sid, u8 is_ct)
1482 {
1483   if (!is_ct)
1484     return (e->event_type == FIFO_EVENT_APP_RX
1485             && e->fifo->client_session_index == sid);
1486   else
1487     return (e->event_type == SESSION_IO_EVT_CT_TX);
1488 }
1489
1490 static inline u8
1491 vcl_session_is_readable (vcl_session_t * s)
1492 {
1493   return ((s->session_state & STATE_OPEN)
1494           || (s->session_state == STATE_LISTEN
1495               && s->session_type == VPPCOM_PROTO_UDP));
1496 }
1497
1498 static inline int
1499 vppcom_session_read_internal (uint32_t session_handle, void *buf, int n,
1500                               u8 peek)
1501 {
1502   vcl_worker_t *wrk = vcl_worker_get_current ();
1503   int n_read = 0, rv, is_nonblocking;
1504   vcl_session_t *s = 0;
1505   svm_fifo_t *rx_fifo;
1506   svm_msg_q_msg_t msg;
1507   session_event_t *e;
1508   svm_msg_q_t *mq;
1509   u8 is_ct;
1510
1511   if (PREDICT_FALSE (!buf))
1512     return VPPCOM_EINVAL;
1513
1514   s = vcl_session_get_w_handle (wrk, session_handle);
1515   if (PREDICT_FALSE (!s || s->is_vep))
1516     return VPPCOM_EBADFD;
1517
1518   if (PREDICT_FALSE (!vcl_session_is_readable (s)))
1519     {
1520       session_state_t state = s->session_state;
1521       rv = ((state & STATE_DISCONNECT) ? VPPCOM_ECONNRESET : VPPCOM_ENOTCONN);
1522
1523       VDBG (0, "VCL<%d>: vpp handle 0x%llx, sid %u: %s session is not open! "
1524             "state 0x%x (%s), returning %d (%s)",
1525             getpid (), s->vpp_handle, session_handle, state,
1526             vppcom_session_state_str (state), rv, vppcom_retval_str (rv));
1527       return rv;
1528     }
1529
1530   is_nonblocking = VCL_SESS_ATTR_TEST (s->attr, VCL_SESS_ATTR_NONBLOCK);
1531   is_ct = vcl_session_is_ct (s);
1532   mq = is_ct ? s->our_evt_q : wrk->app_event_queue;
1533   rx_fifo = s->rx_fifo;
1534   s->has_rx_evt = 0;
1535
1536   if (svm_fifo_is_empty (rx_fifo))
1537     {
1538       if (is_nonblocking)
1539         {
1540           svm_fifo_unset_event (rx_fifo);
1541           return VPPCOM_EWOULDBLOCK;
1542         }
1543       while (svm_fifo_is_empty (rx_fifo))
1544         {
1545           svm_fifo_unset_event (rx_fifo);
1546           svm_msg_q_lock (mq);
1547           if (svm_msg_q_is_empty (mq))
1548             svm_msg_q_wait (mq);
1549
1550           svm_msg_q_sub_w_lock (mq, &msg);
1551           e = svm_msg_q_msg_data (mq, &msg);
1552           svm_msg_q_unlock (mq);
1553           if (!vcl_is_rx_evt_for_session (e, s->session_index, is_ct))
1554             {
1555               vcl_handle_mq_event (wrk, e);
1556               svm_msg_q_free_msg (mq, &msg);
1557               continue;
1558             }
1559           svm_msg_q_free_msg (mq, &msg);
1560
1561           if (PREDICT_FALSE (s->session_state == STATE_VPP_CLOSING))
1562             return 0;
1563         }
1564     }
1565
1566   if (s->is_dgram)
1567     n_read = app_recv_dgram_raw (rx_fifo, buf, n, &s->transport, 0, peek);
1568   else
1569     n_read = app_recv_stream_raw (rx_fifo, buf, n, 0, peek);
1570
1571   if (svm_fifo_is_empty (rx_fifo))
1572     svm_fifo_unset_event (rx_fifo);
1573
1574   if (is_ct && svm_fifo_want_tx_evt (rx_fifo))
1575     {
1576       svm_fifo_set_want_tx_evt (s->rx_fifo, 0);
1577       app_send_io_evt_to_vpp (s->vpp_evt_q, s->rx_fifo, SESSION_IO_EVT_CT_RX,
1578                               SVM_Q_WAIT);
1579     }
1580
1581   VDBG (2, "VCL<%d>: vpp handle 0x%llx, sid %u: read %d bytes from (%p)",
1582         getpid (), s->vpp_handle, session_handle, n_read, rx_fifo);
1583
1584   return n_read;
1585 }
1586
1587 int
1588 vppcom_session_read (uint32_t session_handle, void *buf, size_t n)
1589 {
1590   return (vppcom_session_read_internal (session_handle, buf, n, 0));
1591 }
1592
1593 static int
1594 vppcom_session_peek (uint32_t session_handle, void *buf, int n)
1595 {
1596   return (vppcom_session_read_internal (session_handle, buf, n, 1));
1597 }
1598
1599 int
1600 vppcom_session_read_segments (uint32_t session_handle,
1601                               vppcom_data_segments_t ds)
1602 {
1603   vcl_worker_t *wrk = vcl_worker_get_current ();
1604   int n_read = 0, rv, is_nonblocking;
1605   vcl_session_t *s = 0;
1606   svm_fifo_t *rx_fifo;
1607   svm_msg_q_msg_t msg;
1608   session_event_t *e;
1609   svm_msg_q_t *mq;
1610   u8 is_ct;
1611
1612   s = vcl_session_get_w_handle (wrk, session_handle);
1613   if (PREDICT_FALSE (!s || s->is_vep))
1614     return VPPCOM_EBADFD;
1615
1616   if (PREDICT_FALSE (!vcl_session_is_readable (s)))
1617     {
1618       session_state_t state = s->session_state;
1619       rv = ((state & STATE_DISCONNECT) ? VPPCOM_ECONNRESET : VPPCOM_ENOTCONN);
1620       return rv;
1621     }
1622
1623   is_nonblocking = VCL_SESS_ATTR_TEST (s->attr, VCL_SESS_ATTR_NONBLOCK);
1624   is_ct = vcl_session_is_ct (s);
1625   mq = is_ct ? s->our_evt_q : wrk->app_event_queue;
1626   rx_fifo = s->rx_fifo;
1627   s->has_rx_evt = 0;
1628
1629   if (svm_fifo_is_empty (rx_fifo))
1630     {
1631       if (is_nonblocking)
1632         {
1633           svm_fifo_unset_event (rx_fifo);
1634           return VPPCOM_EWOULDBLOCK;
1635         }
1636       while (svm_fifo_is_empty (rx_fifo))
1637         {
1638           svm_fifo_unset_event (rx_fifo);
1639           svm_msg_q_lock (mq);
1640           if (svm_msg_q_is_empty (mq))
1641             svm_msg_q_wait (mq);
1642
1643           svm_msg_q_sub_w_lock (mq, &msg);
1644           e = svm_msg_q_msg_data (mq, &msg);
1645           svm_msg_q_unlock (mq);
1646           if (!vcl_is_rx_evt_for_session (e, s->session_index, is_ct))
1647             {
1648               vcl_handle_mq_event (wrk, e);
1649               svm_msg_q_free_msg (mq, &msg);
1650               continue;
1651             }
1652           svm_msg_q_free_msg (mq, &msg);
1653
1654           if (PREDICT_FALSE (s->session_state == STATE_VPP_CLOSING))
1655             return 0;
1656         }
1657     }
1658
1659   n_read = svm_fifo_segments (rx_fifo, (svm_fifo_segment_t *) ds);
1660   svm_fifo_unset_event (rx_fifo);
1661
1662   if (is_ct && n_read + svm_fifo_max_dequeue (rx_fifo) == rx_fifo->nitems)
1663     {
1664       /* If the peer is not polling send notification */
1665       if (!svm_fifo_has_event (s->rx_fifo))
1666         app_send_io_evt_to_vpp (s->vpp_evt_q, s->rx_fifo,
1667                                 SESSION_IO_EVT_CT_RX, SVM_Q_WAIT);
1668     }
1669
1670   return n_read;
1671 }
1672
1673 void
1674 vppcom_session_free_segments (uint32_t session_handle,
1675                               vppcom_data_segments_t ds)
1676 {
1677   vcl_worker_t *wrk = vcl_worker_get_current ();
1678   vcl_session_t *s;
1679
1680   s = vcl_session_get_w_handle (wrk, session_handle);
1681   if (PREDICT_FALSE (!s || s->is_vep))
1682     return;
1683
1684   svm_fifo_segments_free (s->rx_fifo, (svm_fifo_segment_t *) ds);
1685 }
1686
1687 static inline int
1688 vppcom_session_read_ready (vcl_session_t * session)
1689 {
1690   /* Assumes caller has acquired spinlock: vcm->sessions_lockp */
1691   if (PREDICT_FALSE (session->is_vep))
1692     {
1693       clib_warning ("VCL<%d>: ERROR: sid %u: cannot read from an "
1694                     "epoll session!", getpid (), session->session_index);
1695       return VPPCOM_EBADFD;
1696     }
1697
1698   if (PREDICT_FALSE (!(session->session_state & (STATE_OPEN | STATE_LISTEN))))
1699     {
1700       session_state_t state = session->session_state;
1701       int rv;
1702
1703       rv = ((state & STATE_DISCONNECT) ? VPPCOM_ECONNRESET : VPPCOM_ENOTCONN);
1704
1705       VDBG (1, "VCL<%d>: vpp handle 0x%llx, sid %u: session is not open!"
1706             " state 0x%x (%s), returning %d (%s)", getpid (),
1707             session->vpp_handle, session->session_index, state,
1708             vppcom_session_state_str (state), rv, vppcom_retval_str (rv));
1709       return rv;
1710     }
1711
1712   if (session->session_state & STATE_LISTEN)
1713     return clib_fifo_elts (session->accept_evts_fifo);
1714
1715   return svm_fifo_max_dequeue (session->rx_fifo);
1716 }
1717
1718 int
1719 vppcom_data_segment_copy (void *buf, vppcom_data_segments_t ds, u32 max_bytes)
1720 {
1721   u32 first_copy = clib_min (ds[0].len, max_bytes);
1722   clib_memcpy_fast (buf, ds[0].data, first_copy);
1723   if (first_copy < max_bytes)
1724     {
1725       clib_memcpy_fast (buf + first_copy, ds[1].data,
1726                         clib_min (ds[1].len, max_bytes - first_copy));
1727     }
1728   return 0;
1729 }
1730
1731 static u8
1732 vcl_is_tx_evt_for_session (session_event_t * e, u32 sid, u8 is_ct)
1733 {
1734   if (!is_ct)
1735     return (e->event_type == FIFO_EVENT_APP_TX
1736             && e->fifo->client_session_index == sid);
1737   else
1738     return (e->event_type == SESSION_IO_EVT_CT_RX);
1739 }
1740
1741 static inline int
1742 vppcom_session_write_inline (uint32_t session_handle, void *buf, size_t n,
1743                              u8 is_flush)
1744 {
1745   vcl_worker_t *wrk = vcl_worker_get_current ();
1746   int rv, n_write, is_nonblocking;
1747   vcl_session_t *s = 0;
1748   svm_fifo_t *tx_fifo = 0;
1749   session_evt_type_t et;
1750   svm_msg_q_msg_t msg;
1751   session_event_t *e;
1752   svm_msg_q_t *mq;
1753   u8 is_ct;
1754
1755   if (PREDICT_FALSE (!buf))
1756     return VPPCOM_EINVAL;
1757
1758   s = vcl_session_get_w_handle (wrk, session_handle);
1759   if (PREDICT_FALSE (!s))
1760     return VPPCOM_EBADFD;
1761
1762   if (PREDICT_FALSE (s->is_vep))
1763     {
1764       clib_warning ("VCL<%d>: ERROR: vpp handle 0x%llx, sid %u: "
1765                     "cannot write to an epoll session!",
1766                     getpid (), s->vpp_handle, session_handle);
1767
1768       return VPPCOM_EBADFD;
1769     }
1770
1771   if (PREDICT_FALSE (!(s->session_state & STATE_OPEN)))
1772     {
1773       session_state_t state = s->session_state;
1774       rv = ((state & STATE_DISCONNECT) ? VPPCOM_ECONNRESET : VPPCOM_ENOTCONN);
1775       VDBG (1, "VCL<%d>: vpp handle 0x%llx, sid %u: session is not open! "
1776             "state 0x%x (%s)", getpid (), s->vpp_handle, session_handle,
1777             state, vppcom_session_state_str (state));
1778       return rv;
1779     }
1780
1781   tx_fifo = s->tx_fifo;
1782   is_ct = vcl_session_is_ct (s);
1783   is_nonblocking = VCL_SESS_ATTR_TEST (s->attr, VCL_SESS_ATTR_NONBLOCK);
1784   mq = is_ct ? s->our_evt_q : wrk->app_event_queue;
1785   if (svm_fifo_is_full (tx_fifo))
1786     {
1787       if (is_nonblocking)
1788         {
1789           return VPPCOM_EWOULDBLOCK;
1790         }
1791       while (svm_fifo_is_full (tx_fifo))
1792         {
1793           svm_fifo_set_want_tx_evt (tx_fifo, 1);
1794           svm_msg_q_lock (mq);
1795           if (svm_msg_q_is_empty (mq))
1796             svm_msg_q_wait (mq);
1797
1798           svm_msg_q_sub_w_lock (mq, &msg);
1799           e = svm_msg_q_msg_data (mq, &msg);
1800           svm_msg_q_unlock (mq);
1801
1802           if (!vcl_is_tx_evt_for_session (e, s->session_index, is_ct))
1803             vcl_handle_mq_event (wrk, e);
1804           svm_msg_q_free_msg (mq, &msg);
1805         }
1806     }
1807
1808   if (PREDICT_FALSE (!(s->session_state & STATE_OPEN)))
1809     return VPPCOM_ECONNRESET;
1810
1811   ASSERT (FIFO_EVENT_APP_TX + 1 == SESSION_IO_EVT_CT_TX);
1812   et = FIFO_EVENT_APP_TX + vcl_session_is_ct (s);
1813   if (is_flush && !vcl_session_is_ct (s))
1814     et = SESSION_IO_EVT_TX_FLUSH;
1815
1816   if (s->is_dgram)
1817     n_write = app_send_dgram_raw (tx_fifo, &s->transport,
1818                                   s->vpp_evt_q, buf, n, et, SVM_Q_WAIT);
1819   else
1820     n_write = app_send_stream_raw (tx_fifo, s->vpp_evt_q, buf, n, et,
1821                                    SVM_Q_WAIT);
1822
1823   ASSERT (n_write > 0);
1824
1825   VDBG (2, "VCL<%d>: vpp handle 0x%llx, sid %u: wrote %d bytes", getpid (),
1826         s->vpp_handle, session_handle, n_write);
1827
1828   return n_write;
1829 }
1830
1831 int
1832 vppcom_session_write (uint32_t session_handle, void *buf, size_t n)
1833 {
1834   return vppcom_session_write_inline (session_handle, buf, n,
1835                                       0 /* is_flush */ );
1836 }
1837
1838 static vcl_session_t *
1839 vcl_ct_session_get_from_fifo (vcl_worker_t * wrk, svm_fifo_t * f, u8 type)
1840 {
1841   vcl_session_t *s;
1842   s = vcl_session_get (wrk, f->client_session_index);
1843   if (s)
1844     {
1845       /* rx fifo */
1846       if (type == 0 && s->rx_fifo == f)
1847         return s;
1848       /* tx fifo */
1849       if (type == 1 && s->tx_fifo == f)
1850         return s;
1851     }
1852   s = vcl_session_get (wrk, f->master_session_index);
1853   if (s)
1854     {
1855       if (type == 0 && s->rx_fifo == f)
1856         return s;
1857       if (type == 1 && s->tx_fifo == f)
1858         return s;
1859     }
1860   return 0;
1861 }
1862
1863 static inline int
1864 vppcom_session_write_ready (vcl_session_t * session)
1865 {
1866   /* Assumes caller has acquired spinlock: vcm->sessions_lockp */
1867   if (PREDICT_FALSE (session->is_vep))
1868     {
1869       clib_warning ("VCL<%d>: ERROR: vpp handle 0x%llx, sid %u: "
1870                     "cannot write to an epoll session!",
1871                     getpid (), session->vpp_handle, session->session_index);
1872       return VPPCOM_EBADFD;
1873     }
1874
1875   if (PREDICT_FALSE (session->session_state & STATE_LISTEN))
1876     {
1877       clib_warning ("VCL<%d>: ERROR: vpp handle 0x%llx, sid %u: "
1878                     "cannot write to a listen session!",
1879                     getpid (), session->vpp_handle, session->session_index);
1880       return VPPCOM_EBADFD;
1881     }
1882
1883   if (PREDICT_FALSE (!(session->session_state & STATE_OPEN)))
1884     {
1885       session_state_t state = session->session_state;
1886       int rv;
1887
1888       rv = ((state & STATE_DISCONNECT) ? VPPCOM_ECONNRESET : VPPCOM_ENOTCONN);
1889       clib_warning ("VCL<%d>: ERROR: vpp handle 0x%llx, sid %u: "
1890                     "session is not open! state 0x%x (%s), "
1891                     "returning %d (%s)", getpid (), session->vpp_handle,
1892                     session->session_index,
1893                     state, vppcom_session_state_str (state),
1894                     rv, vppcom_retval_str (rv));
1895       return rv;
1896     }
1897
1898   VDBG (3, "VCL<%d>: vpp handle 0x%llx, sid %u: peek %s (%p), ready = %d",
1899         getpid (), session->vpp_handle, session->session_index,
1900         session->tx_fifo, svm_fifo_max_enqueue (session->tx_fifo));
1901
1902   return svm_fifo_max_enqueue (session->tx_fifo);
1903 }
1904
1905 static inline int
1906 vcl_mq_dequeue_batch (vcl_worker_t * wrk, svm_msg_q_t * mq)
1907 {
1908   svm_msg_q_msg_t *msg;
1909   u32 n_msgs;
1910   int i;
1911
1912   n_msgs = svm_msg_q_size (mq);
1913   for (i = 0; i < n_msgs; i++)
1914     {
1915       vec_add2 (wrk->mq_msg_vector, msg, 1);
1916       svm_msg_q_sub_w_lock (mq, msg);
1917     }
1918   return n_msgs;
1919 }
1920
1921 #define vcl_fifo_rx_evt_valid_or_break(_fifo)                   \
1922 if (PREDICT_FALSE (svm_fifo_is_empty (_fifo)))                  \
1923   {                                                             \
1924     svm_fifo_unset_event (_fifo);                               \
1925     if (svm_fifo_is_empty (_fifo))                              \
1926       break;                                                    \
1927   }                                                             \
1928
1929 static void
1930 vcl_select_handle_mq_event (vcl_worker_t * wrk, session_event_t * e,
1931                             unsigned long n_bits, unsigned long *read_map,
1932                             unsigned long *write_map,
1933                             unsigned long *except_map, u32 * bits_set)
1934 {
1935   session_disconnected_msg_t *disconnected_msg;
1936   session_connected_msg_t *connected_msg;
1937   vcl_session_t *session;
1938   u32 sid;
1939
1940   switch (e->event_type)
1941     {
1942     case FIFO_EVENT_APP_RX:
1943       vcl_fifo_rx_evt_valid_or_break (e->fifo);
1944       sid = e->fifo->client_session_index;
1945       session = vcl_session_get (wrk, sid);
1946       if (!session)
1947         break;
1948       if (sid < n_bits && read_map)
1949         {
1950           clib_bitmap_set_no_check (read_map, sid, 1);
1951           *bits_set += 1;
1952         }
1953       break;
1954     case FIFO_EVENT_APP_TX:
1955       sid = e->fifo->client_session_index;
1956       session = vcl_session_get (wrk, sid);
1957       if (!session)
1958         break;
1959       if (sid < n_bits && write_map)
1960         {
1961           clib_bitmap_set_no_check (write_map, sid, 1);
1962           *bits_set += 1;
1963         }
1964       break;
1965     case SESSION_IO_EVT_CT_TX:
1966       vcl_fifo_rx_evt_valid_or_break (e->fifo);
1967       session = vcl_ct_session_get_from_fifo (wrk, e->fifo, 0);
1968       if (!session)
1969         break;
1970       sid = session->session_index;
1971       if (sid < n_bits && read_map)
1972         {
1973           clib_bitmap_set_no_check (read_map, sid, 1);
1974           *bits_set += 1;
1975         }
1976       break;
1977     case SESSION_IO_EVT_CT_RX:
1978       session = vcl_ct_session_get_from_fifo (wrk, e->fifo, 1);
1979       if (!session)
1980         break;
1981       sid = session->session_index;
1982       if (sid < n_bits && write_map)
1983         {
1984           clib_bitmap_set_no_check (write_map, sid, 1);
1985           *bits_set += 1;
1986         }
1987       break;
1988     case SESSION_CTRL_EVT_ACCEPTED:
1989       session = vcl_session_accepted (wrk,
1990                                       (session_accepted_msg_t *) e->data);
1991       if (!session)
1992         break;
1993       sid = session->session_index;
1994       if (sid < n_bits && read_map)
1995         {
1996           clib_bitmap_set_no_check (read_map, sid, 1);
1997           *bits_set += 1;
1998         }
1999       break;
2000     case SESSION_CTRL_EVT_CONNECTED:
2001       connected_msg = (session_connected_msg_t *) e->data;
2002       vcl_session_connected_handler (wrk, connected_msg);
2003       break;
2004     case SESSION_CTRL_EVT_DISCONNECTED:
2005       disconnected_msg = (session_disconnected_msg_t *) e->data;
2006       session = vcl_session_disconnected_handler (wrk, disconnected_msg);
2007       if (!session)
2008         break;
2009       sid = session->session_index;
2010       if (sid < n_bits && except_map)
2011         {
2012           clib_bitmap_set_no_check (except_map, sid, 1);
2013           *bits_set += 1;
2014         }
2015       break;
2016     case SESSION_CTRL_EVT_RESET:
2017       sid = vcl_session_reset_handler (wrk, (session_reset_msg_t *) e->data);
2018       if (sid < n_bits && except_map)
2019         {
2020           clib_bitmap_set_no_check (except_map, sid, 1);
2021           *bits_set += 1;
2022         }
2023       break;
2024     default:
2025       clib_warning ("unhandled: %u", e->event_type);
2026       break;
2027     }
2028 }
2029
2030 static int
2031 vcl_select_handle_mq (vcl_worker_t * wrk, svm_msg_q_t * mq,
2032                       unsigned long n_bits, unsigned long *read_map,
2033                       unsigned long *write_map, unsigned long *except_map,
2034                       double time_to_wait, u32 * bits_set)
2035 {
2036   svm_msg_q_msg_t *msg;
2037   session_event_t *e;
2038   u32 i;
2039
2040   svm_msg_q_lock (mq);
2041   if (svm_msg_q_is_empty (mq))
2042     {
2043       if (*bits_set)
2044         {
2045           svm_msg_q_unlock (mq);
2046           return 0;
2047         }
2048
2049       if (!time_to_wait)
2050         {
2051           svm_msg_q_unlock (mq);
2052           return 0;
2053         }
2054       else if (time_to_wait < 0)
2055         {
2056           svm_msg_q_wait (mq);
2057         }
2058       else
2059         {
2060           if (svm_msg_q_timedwait (mq, time_to_wait))
2061             {
2062               svm_msg_q_unlock (mq);
2063               return 0;
2064             }
2065         }
2066     }
2067   vcl_mq_dequeue_batch (wrk, mq);
2068   svm_msg_q_unlock (mq);
2069
2070   for (i = 0; i < vec_len (wrk->mq_msg_vector); i++)
2071     {
2072       msg = vec_elt_at_index (wrk->mq_msg_vector, i);
2073       e = svm_msg_q_msg_data (mq, msg);
2074       vcl_select_handle_mq_event (wrk, e, n_bits, read_map, write_map,
2075                                   except_map, bits_set);
2076       svm_msg_q_free_msg (mq, msg);
2077     }
2078   vec_reset_length (wrk->mq_msg_vector);
2079   return *bits_set;
2080 }
2081
2082 static int
2083 vppcom_select_condvar (vcl_worker_t * wrk, unsigned long n_bits,
2084                        unsigned long *read_map, unsigned long *write_map,
2085                        unsigned long *except_map, double time_to_wait,
2086                        u32 * bits_set)
2087 {
2088   double total_wait = 0, wait_slice;
2089   vcl_cut_through_registration_t *cr;
2090
2091   time_to_wait = (time_to_wait == -1) ? 10e9 : time_to_wait;
2092   wait_slice = wrk->cut_through_registrations ? 10e-6 : time_to_wait;
2093   do
2094     {
2095       vcl_ct_registration_lock (wrk);
2096       /* *INDENT-OFF* */
2097       pool_foreach (cr, wrk->cut_through_registrations, ({
2098         vcl_select_handle_mq (wrk, cr->mq, n_bits, read_map, write_map, except_map,
2099                               0, bits_set);
2100       }));
2101       /* *INDENT-ON* */
2102       vcl_ct_registration_unlock (wrk);
2103
2104       vcl_select_handle_mq (wrk, wrk->app_event_queue, n_bits, read_map,
2105                             write_map, except_map, time_to_wait, bits_set);
2106       total_wait += wait_slice;
2107       if (*bits_set)
2108         return *bits_set;
2109     }
2110   while (total_wait < time_to_wait);
2111
2112   return 0;
2113 }
2114
2115 static int
2116 vppcom_select_eventfd (vcl_worker_t * wrk, unsigned long n_bits,
2117                        unsigned long *read_map, unsigned long *write_map,
2118                        unsigned long *except_map, double time_to_wait,
2119                        u32 * bits_set)
2120 {
2121   vcl_mq_evt_conn_t *mqc;
2122   int __clib_unused n_read;
2123   int n_mq_evts, i;
2124   u64 buf;
2125
2126   vec_validate (wrk->mq_events, pool_elts (wrk->mq_evt_conns));
2127   n_mq_evts = epoll_wait (wrk->mqs_epfd, wrk->mq_events,
2128                           vec_len (wrk->mq_events), time_to_wait);
2129   for (i = 0; i < n_mq_evts; i++)
2130     {
2131       mqc = vcl_mq_evt_conn_get (wrk, wrk->mq_events[i].data.u32);
2132       n_read = read (mqc->mq_fd, &buf, sizeof (buf));
2133       vcl_select_handle_mq (wrk, mqc->mq, n_bits, read_map, write_map,
2134                             except_map, 0, bits_set);
2135     }
2136
2137   return (n_mq_evts > 0 ? (int) *bits_set : 0);
2138 }
2139
2140 int
2141 vppcom_select (unsigned long n_bits, unsigned long *read_map,
2142                unsigned long *write_map, unsigned long *except_map,
2143                double time_to_wait)
2144 {
2145   u32 sid, minbits = clib_max (n_bits, BITS (uword)), bits_set = 0;
2146   vcl_worker_t *wrk = vcl_worker_get_current ();
2147   vcl_session_t *session = 0;
2148   int rv, i;
2149
2150   ASSERT (sizeof (clib_bitmap_t) == sizeof (long int));
2151
2152   if (n_bits && read_map)
2153     {
2154       clib_bitmap_validate (wrk->rd_bitmap, minbits);
2155       clib_memcpy_fast (wrk->rd_bitmap, read_map,
2156                         vec_len (wrk->rd_bitmap) * sizeof (clib_bitmap_t));
2157       memset (read_map, 0, vec_len (wrk->rd_bitmap) * sizeof (clib_bitmap_t));
2158     }
2159   if (n_bits && write_map)
2160     {
2161       clib_bitmap_validate (wrk->wr_bitmap, minbits);
2162       clib_memcpy_fast (wrk->wr_bitmap, write_map,
2163                         vec_len (wrk->wr_bitmap) * sizeof (clib_bitmap_t));
2164       memset (write_map, 0,
2165               vec_len (wrk->wr_bitmap) * sizeof (clib_bitmap_t));
2166     }
2167   if (n_bits && except_map)
2168     {
2169       clib_bitmap_validate (wrk->ex_bitmap, minbits);
2170       clib_memcpy_fast (wrk->ex_bitmap, except_map,
2171                         vec_len (wrk->ex_bitmap) * sizeof (clib_bitmap_t));
2172       memset (except_map, 0,
2173               vec_len (wrk->ex_bitmap) * sizeof (clib_bitmap_t));
2174     }
2175
2176   if (!n_bits)
2177     return 0;
2178
2179   if (!write_map)
2180     goto check_rd;
2181
2182   /* *INDENT-OFF* */
2183   clib_bitmap_foreach (sid, wrk->wr_bitmap, ({
2184     if (!(session = vcl_session_get (wrk, sid)))
2185       {
2186         if (except_map && sid < minbits)
2187           clib_bitmap_set_no_check (except_map, sid, 1);
2188         continue;
2189       }
2190
2191     rv = svm_fifo_is_full (session->tx_fifo);
2192     if (!rv)
2193       {
2194         clib_bitmap_set_no_check (write_map, sid, 1);
2195         bits_set++;
2196       }
2197   }));
2198
2199 check_rd:
2200   if (!read_map)
2201     goto check_mq;
2202
2203   clib_bitmap_foreach (sid, wrk->rd_bitmap, ({
2204     if (!(session = vcl_session_get (wrk, sid)))
2205       {
2206         if (except_map && sid < minbits)
2207           clib_bitmap_set_no_check (except_map, sid, 1);
2208         continue;
2209       }
2210
2211     rv = vppcom_session_read_ready (session);
2212     if (rv)
2213       {
2214         clib_bitmap_set_no_check (read_map, sid, 1);
2215         bits_set++;
2216       }
2217   }));
2218   /* *INDENT-ON* */
2219
2220 check_mq:
2221
2222   for (i = 0; i < vec_len (wrk->unhandled_evts_vector); i++)
2223     {
2224       vcl_select_handle_mq_event (wrk, &wrk->unhandled_evts_vector[i], n_bits,
2225                                   read_map, write_map, except_map, &bits_set);
2226     }
2227   vec_reset_length (wrk->unhandled_evts_vector);
2228
2229   if (vcm->cfg.use_mq_eventfd)
2230     vppcom_select_eventfd (wrk, n_bits, read_map, write_map, except_map,
2231                            time_to_wait, &bits_set);
2232   else
2233     vppcom_select_condvar (wrk, n_bits, read_map, write_map, except_map,
2234                            time_to_wait, &bits_set);
2235
2236   return (bits_set);
2237 }
2238
2239 static inline void
2240 vep_verify_epoll_chain (vcl_worker_t * wrk, u32 vep_idx)
2241 {
2242   vcl_session_t *session;
2243   vppcom_epoll_t *vep;
2244   u32 sid = vep_idx;
2245
2246   if (VPPCOM_DEBUG <= 1)
2247     return;
2248
2249   /* Assumes caller has acquired spinlock: vcm->sessions_lockp */
2250   session = vcl_session_get (wrk, vep_idx);
2251   if (PREDICT_FALSE (!session))
2252     {
2253       clib_warning ("VCL<%d>: ERROR: Invalid vep_idx (%u)!",
2254                     getpid (), vep_idx);
2255       goto done;
2256     }
2257   if (PREDICT_FALSE (!session->is_vep))
2258     {
2259       clib_warning ("VCL<%d>: ERROR: vep_idx (%u) is not a vep!",
2260                     getpid (), vep_idx);
2261       goto done;
2262     }
2263   vep = &session->vep;
2264   clib_warning ("VCL<%d>: vep_idx (%u): Dumping epoll chain\n"
2265                 "{\n"
2266                 "   is_vep         = %u\n"
2267                 "   is_vep_session = %u\n"
2268                 "   next_sid       = 0x%x (%u)\n"
2269                 "   wait_cont_idx  = 0x%x (%u)\n"
2270                 "}\n", getpid (), vep_idx,
2271                 session->is_vep, session->is_vep_session,
2272                 vep->next_sh, vep->next_sh,
2273                 session->wait_cont_idx, session->wait_cont_idx);
2274
2275   for (sid = vep->next_sh; sid != ~0; sid = vep->next_sh)
2276     {
2277       session = vcl_session_get (wrk, sid);
2278       if (PREDICT_FALSE (!session))
2279         {
2280           clib_warning ("VCL<%d>: ERROR: Invalid sid (%u)!", getpid (), sid);
2281           goto done;
2282         }
2283       if (PREDICT_FALSE (session->is_vep))
2284         clib_warning ("VCL<%d>: ERROR: sid (%u) is a vep!",
2285                       getpid (), vep_idx);
2286       else if (PREDICT_FALSE (!session->is_vep_session))
2287         {
2288           clib_warning ("VCL<%d>: ERROR: session (%u) "
2289                         "is not a vep session!", getpid (), sid);
2290           goto done;
2291         }
2292       vep = &session->vep;
2293       if (PREDICT_FALSE (vep->vep_sh != vep_idx))
2294         clib_warning ("VCL<%d>: ERROR: session (%u) vep_idx (%u) != "
2295                       "vep_idx (%u)!", getpid (),
2296                       sid, session->vep.vep_sh, vep_idx);
2297       if (session->is_vep_session)
2298         {
2299           clib_warning ("vep_idx[%u]: sid 0x%x (%u)\n"
2300                         "{\n"
2301                         "   next_sid       = 0x%x (%u)\n"
2302                         "   prev_sid       = 0x%x (%u)\n"
2303                         "   vep_idx        = 0x%x (%u)\n"
2304                         "   ev.events      = 0x%x\n"
2305                         "   ev.data.u64    = 0x%llx\n"
2306                         "   et_mask        = 0x%x\n"
2307                         "}\n",
2308                         vep_idx, sid, sid,
2309                         vep->next_sh, vep->next_sh,
2310                         vep->prev_sh, vep->prev_sh,
2311                         vep->vep_sh, vep->vep_sh,
2312                         vep->ev.events, vep->ev.data.u64, vep->et_mask);
2313         }
2314     }
2315
2316 done:
2317   clib_warning ("VCL<%d>: vep_idx (%u): Dump complete!\n",
2318                 getpid (), vep_idx);
2319 }
2320
2321 int
2322 vppcom_epoll_create (void)
2323 {
2324   vcl_worker_t *wrk = vcl_worker_get_current ();
2325   vcl_session_t *vep_session;
2326
2327   vep_session = vcl_session_alloc (wrk);
2328
2329   vep_session->is_vep = 1;
2330   vep_session->vep.vep_sh = ~0;
2331   vep_session->vep.next_sh = ~0;
2332   vep_session->vep.prev_sh = ~0;
2333   vep_session->wait_cont_idx = ~0;
2334   vep_session->vpp_handle = ~0;
2335
2336   vcl_evt (VCL_EVT_EPOLL_CREATE, vep_session, vep_sh);
2337   VDBG (0, "VCL<%d>: Created vep_idx %u / sid %u!",
2338         getpid (), vep_session->session_index, vep_session->session_index);
2339
2340   return vcl_session_handle (vep_session);
2341 }
2342
2343 int
2344 vppcom_epoll_ctl (uint32_t vep_handle, int op, uint32_t session_handle,
2345                   struct epoll_event *event)
2346 {
2347   vcl_worker_t *wrk = vcl_worker_get_current ();
2348   vcl_session_t *vep_session;
2349   vcl_session_t *session;
2350   int rv = VPPCOM_OK;
2351
2352   if (vep_handle == session_handle)
2353     {
2354       clib_warning ("VCL<%d>: ERROR: vep_idx == session_index (%u)!",
2355                     getpid (), vep_handle);
2356       return VPPCOM_EINVAL;
2357     }
2358
2359   vep_session = vcl_session_get_w_handle (wrk, vep_handle);
2360   if (PREDICT_FALSE (!vep_session))
2361     {
2362       clib_warning ("VCL<%d>: ERROR: Invalid vep_idx (%u)!", vep_handle);
2363       return VPPCOM_EBADFD;
2364     }
2365   if (PREDICT_FALSE (!vep_session->is_vep))
2366     {
2367       clib_warning ("VCL<%d>: ERROR: vep_idx (%u) is not a vep!",
2368                     getpid (), vep_handle);
2369       return VPPCOM_EINVAL;
2370     }
2371
2372   ASSERT (vep_session->vep.vep_sh == ~0);
2373   ASSERT (vep_session->vep.prev_sh == ~0);
2374
2375   session = vcl_session_get_w_handle (wrk, session_handle);
2376   if (PREDICT_FALSE (!session))
2377     {
2378       VDBG (0, "VCL<%d>: ERROR: Invalid session_handle (%u)!",
2379             getpid (), session_handle);
2380       return VPPCOM_EBADFD;
2381     }
2382   if (PREDICT_FALSE (session->is_vep))
2383     {
2384       clib_warning ("ERROR: session_handle (%u) is a vep!", vep_handle);
2385       return VPPCOM_EINVAL;
2386     }
2387
2388   switch (op)
2389     {
2390     case EPOLL_CTL_ADD:
2391       if (PREDICT_FALSE (!event))
2392         {
2393           clib_warning ("VCL<%d>: ERROR: EPOLL_CTL_ADD: NULL pointer to "
2394                         "epoll_event structure!", getpid ());
2395           return VPPCOM_EINVAL;
2396         }
2397       if (vep_session->vep.next_sh != ~0)
2398         {
2399           vcl_session_t *next_session;
2400           next_session = vcl_session_get_w_handle (wrk,
2401                                                    vep_session->vep.next_sh);
2402           if (PREDICT_FALSE (!next_session))
2403             {
2404               clib_warning ("VCL<%d>: ERROR: EPOLL_CTL_ADD: Invalid "
2405                             "vep.next_sid (%u) on vep_idx (%u)!",
2406                             getpid (), vep_session->vep.next_sh, vep_handle);
2407               return VPPCOM_EBADFD;
2408             }
2409           ASSERT (next_session->vep.prev_sh == vep_handle);
2410           next_session->vep.prev_sh = session_handle;
2411         }
2412       session->vep.next_sh = vep_session->vep.next_sh;
2413       session->vep.prev_sh = vep_handle;
2414       session->vep.vep_sh = vep_handle;
2415       session->vep.et_mask = VEP_DEFAULT_ET_MASK;
2416       session->vep.ev = *event;
2417       session->is_vep = 0;
2418       session->is_vep_session = 1;
2419       vep_session->vep.next_sh = session_handle;
2420
2421       VDBG (1, "VCL<%d>: EPOLL_CTL_ADD: vep_idx %u, sid %u, events 0x%x, "
2422             "data 0x%llx!", getpid (), vep_handle, session_handle,
2423             event->events, event->data.u64);
2424       vcl_evt (VCL_EVT_EPOLL_CTLADD, session, event->events, event->data.u64);
2425       break;
2426
2427     case EPOLL_CTL_MOD:
2428       if (PREDICT_FALSE (!event))
2429         {
2430           clib_warning ("VCL<%d>: ERROR: EPOLL_CTL_MOD: NULL pointer to "
2431                         "epoll_event structure!", getpid ());
2432           rv = VPPCOM_EINVAL;
2433           goto done;
2434         }
2435       else if (PREDICT_FALSE (!session->is_vep_session))
2436         {
2437           clib_warning ("VCL<%d>: ERROR: sid %u EPOLL_CTL_MOD: "
2438                         "not a vep session!", getpid (), session_handle);
2439           rv = VPPCOM_EINVAL;
2440           goto done;
2441         }
2442       else if (PREDICT_FALSE (session->vep.vep_sh != vep_handle))
2443         {
2444           clib_warning ("VCL<%d>: ERROR: sid %u EPOLL_CTL_MOD: "
2445                         "vep_idx (%u) != vep_idx (%u)!",
2446                         getpid (), session_handle,
2447                         session->vep.vep_sh, vep_handle);
2448           rv = VPPCOM_EINVAL;
2449           goto done;
2450         }
2451       session->vep.et_mask = VEP_DEFAULT_ET_MASK;
2452       session->vep.ev = *event;
2453       VDBG (1, "VCL<%d>: EPOLL_CTL_MOD: vep_idx %u, sid %u, events 0x%x,"
2454             " data 0x%llx!", getpid (), vep_handle, session_handle,
2455             event->events, event->data.u64);
2456       break;
2457
2458     case EPOLL_CTL_DEL:
2459       if (PREDICT_FALSE (!session->is_vep_session))
2460         {
2461           clib_warning ("VCL<%d>: ERROR: sid %u EPOLL_CTL_DEL: "
2462                         "not a vep session!", getpid (), session_handle);
2463           rv = VPPCOM_EINVAL;
2464           goto done;
2465         }
2466       else if (PREDICT_FALSE (session->vep.vep_sh != vep_handle))
2467         {
2468           clib_warning ("VCL<%d>: ERROR: sid %u EPOLL_CTL_DEL: "
2469                         "vep_idx (%u) != vep_idx (%u)!",
2470                         getpid (), session_handle,
2471                         session->vep.vep_sh, vep_handle);
2472           rv = VPPCOM_EINVAL;
2473           goto done;
2474         }
2475
2476       vep_session->wait_cont_idx =
2477         (vep_session->wait_cont_idx == session_handle) ?
2478         session->vep.next_sh : vep_session->wait_cont_idx;
2479
2480       if (session->vep.prev_sh == vep_handle)
2481         vep_session->vep.next_sh = session->vep.next_sh;
2482       else
2483         {
2484           vcl_session_t *prev_session;
2485           prev_session = vcl_session_get_w_handle (wrk, session->vep.prev_sh);
2486           if (PREDICT_FALSE (!prev_session))
2487             {
2488               clib_warning ("VCL<%d>: ERROR: EPOLL_CTL_DEL: Invalid "
2489                             "vep.prev_sid (%u) on sid (%u)!",
2490                             getpid (), session->vep.prev_sh, session_handle);
2491               return VPPCOM_EBADFD;
2492             }
2493           ASSERT (prev_session->vep.next_sh == session_handle);
2494           prev_session->vep.next_sh = session->vep.next_sh;
2495         }
2496       if (session->vep.next_sh != ~0)
2497         {
2498           vcl_session_t *next_session;
2499           next_session = vcl_session_get_w_handle (wrk, session->vep.next_sh);
2500           if (PREDICT_FALSE (!next_session))
2501             {
2502               clib_warning ("VCL<%d>: ERROR: EPOLL_CTL_DEL: Invalid "
2503                             "vep.next_sid (%u) on sid (%u)!",
2504                             getpid (), session->vep.next_sh, session_handle);
2505               return VPPCOM_EBADFD;
2506             }
2507           ASSERT (next_session->vep.prev_sh == session_handle);
2508           next_session->vep.prev_sh = session->vep.prev_sh;
2509         }
2510
2511       memset (&session->vep, 0, sizeof (session->vep));
2512       session->vep.next_sh = ~0;
2513       session->vep.prev_sh = ~0;
2514       session->vep.vep_sh = ~0;
2515       session->is_vep_session = 0;
2516       VDBG (1, "VCL<%d>: EPOLL_CTL_DEL: vep_idx %u, sid %u!",
2517             getpid (), vep_handle, session_handle);
2518       vcl_evt (VCL_EVT_EPOLL_CTLDEL, session, vep_sh);
2519       break;
2520
2521     default:
2522       clib_warning ("VCL<%d>: ERROR: Invalid operation (%d)!", getpid (), op);
2523       rv = VPPCOM_EINVAL;
2524     }
2525
2526   vep_verify_epoll_chain (wrk, vep_handle);
2527
2528 done:
2529   return rv;
2530 }
2531
2532 static inline void
2533 vcl_epoll_wait_handle_mq_event (vcl_worker_t * wrk, session_event_t * e,
2534                                 struct epoll_event *events, u32 * num_ev)
2535 {
2536   session_disconnected_msg_t *disconnected_msg;
2537   session_connected_msg_t *connected_msg;
2538   u32 sid = ~0, session_events;
2539   u64 session_evt_data = ~0;
2540   vcl_session_t *session;
2541   u8 add_event = 0;
2542
2543   switch (e->event_type)
2544     {
2545     case FIFO_EVENT_APP_RX:
2546       ASSERT (e->fifo->client_thread_index == vcl_get_worker_index ());
2547       vcl_fifo_rx_evt_valid_or_break (e->fifo);
2548       sid = e->fifo->client_session_index;
2549       session = vcl_session_get (wrk, sid);
2550       session_events = session->vep.ev.events;
2551       if (!(EPOLLIN & session->vep.ev.events) || session->has_rx_evt)
2552         break;
2553       add_event = 1;
2554       events[*num_ev].events |= EPOLLIN;
2555       session_evt_data = session->vep.ev.data.u64;
2556       session->has_rx_evt = 1;
2557       break;
2558     case FIFO_EVENT_APP_TX:
2559       sid = e->fifo->client_session_index;
2560       session = vcl_session_get (wrk, sid);
2561       session_events = session->vep.ev.events;
2562       if (!(EPOLLOUT & session_events))
2563         break;
2564       add_event = 1;
2565       events[*num_ev].events |= EPOLLOUT;
2566       session_evt_data = session->vep.ev.data.u64;
2567       break;
2568     case SESSION_IO_EVT_CT_TX:
2569       vcl_fifo_rx_evt_valid_or_break (e->fifo);
2570       session = vcl_ct_session_get_from_fifo (wrk, e->fifo, 0);
2571       sid = session->session_index;
2572       session_events = session->vep.ev.events;
2573       if (!(EPOLLIN & session->vep.ev.events) || session->has_rx_evt)
2574         break;
2575       add_event = 1;
2576       events[*num_ev].events |= EPOLLIN;
2577       session_evt_data = session->vep.ev.data.u64;
2578       session->has_rx_evt = 1;
2579       break;
2580     case SESSION_IO_EVT_CT_RX:
2581       session = vcl_ct_session_get_from_fifo (wrk, e->fifo, 1);
2582       sid = session->session_index;
2583       session_events = session->vep.ev.events;
2584       if (!(EPOLLOUT & session_events))
2585         break;
2586       add_event = 1;
2587       events[*num_ev].events |= EPOLLOUT;
2588       session_evt_data = session->vep.ev.data.u64;
2589       break;
2590     case SESSION_CTRL_EVT_ACCEPTED:
2591       session = vcl_session_accepted (wrk,
2592                                       (session_accepted_msg_t *) e->data);
2593       if (!session)
2594         break;
2595
2596       session_events = session->vep.ev.events;
2597       if (!(EPOLLIN & session_events))
2598         break;
2599
2600       add_event = 1;
2601       events[*num_ev].events |= EPOLLIN;
2602       session_evt_data = session->vep.ev.data.u64;
2603       break;
2604     case SESSION_CTRL_EVT_CONNECTED:
2605       connected_msg = (session_connected_msg_t *) e->data;
2606       vcl_session_connected_handler (wrk, connected_msg);
2607       /* Generate EPOLLOUT because there's no connected event */
2608       sid = vcl_session_index_from_vpp_handle (wrk, connected_msg->handle);
2609       session = vcl_session_get (wrk, sid);
2610       session_events = session->vep.ev.events;
2611       if (EPOLLOUT & session_events)
2612         {
2613           add_event = 1;
2614           events[*num_ev].events |= EPOLLOUT;
2615           session_evt_data = session->vep.ev.data.u64;
2616         }
2617       break;
2618     case SESSION_CTRL_EVT_DISCONNECTED:
2619       disconnected_msg = (session_disconnected_msg_t *) e->data;
2620       session = vcl_session_disconnected_handler (wrk, disconnected_msg);
2621       if (!session)
2622         break;
2623       add_event = 1;
2624       events[*num_ev].events |= EPOLLHUP | EPOLLRDHUP;
2625       session_evt_data = session->vep.ev.data.u64;
2626       session_events = session->vep.ev.events;
2627       break;
2628     case SESSION_CTRL_EVT_RESET:
2629       sid = vcl_session_reset_handler (wrk, (session_reset_msg_t *) e->data);
2630       if (!(session = vcl_session_get (wrk, sid)))
2631         break;
2632       add_event = 1;
2633       events[*num_ev].events |= EPOLLHUP | EPOLLRDHUP;
2634       session_evt_data = session->vep.ev.data.u64;
2635       session_events = session->vep.ev.events;
2636       break;
2637     default:
2638       VDBG (0, "unhandled: %u", e->event_type);
2639       break;
2640     }
2641
2642   if (add_event)
2643     {
2644       events[*num_ev].data.u64 = session_evt_data;
2645       if (EPOLLONESHOT & session_events)
2646         {
2647           session = vcl_session_get (wrk, sid);
2648           session->vep.ev.events = 0;
2649         }
2650       *num_ev += 1;
2651     }
2652 }
2653
2654 static int
2655 vcl_epoll_wait_handle_mq (vcl_worker_t * wrk, svm_msg_q_t * mq,
2656                           struct epoll_event *events, u32 maxevents,
2657                           double wait_for_time, u32 * num_ev)
2658 {
2659   svm_msg_q_msg_t *msg;
2660   session_event_t *e;
2661   int i;
2662
2663   if (vec_len (wrk->mq_msg_vector) && svm_msg_q_is_empty (mq))
2664     goto handle_dequeued;
2665
2666   svm_msg_q_lock (mq);
2667   if (svm_msg_q_is_empty (mq))
2668     {
2669       if (!wait_for_time)
2670         {
2671           svm_msg_q_unlock (mq);
2672           return 0;
2673         }
2674       else if (wait_for_time < 0)
2675         {
2676           svm_msg_q_wait (mq);
2677         }
2678       else
2679         {
2680           if (svm_msg_q_timedwait (mq, wait_for_time / 1e3))
2681             {
2682               svm_msg_q_unlock (mq);
2683               return 0;
2684             }
2685         }
2686     }
2687   vcl_mq_dequeue_batch (wrk, mq);
2688   svm_msg_q_unlock (mq);
2689
2690 handle_dequeued:
2691   for (i = 0; i < vec_len (wrk->mq_msg_vector); i++)
2692     {
2693       msg = vec_elt_at_index (wrk->mq_msg_vector, i);
2694       e = svm_msg_q_msg_data (mq, msg);
2695       if (*num_ev < maxevents)
2696         vcl_epoll_wait_handle_mq_event (wrk, e, events, num_ev);
2697       else
2698         vec_add1 (wrk->unhandled_evts_vector, *e);
2699       svm_msg_q_free_msg (mq, msg);
2700     }
2701   vec_reset_length (wrk->mq_msg_vector);
2702
2703   return *num_ev;
2704 }
2705
2706 static int
2707 vppcom_epoll_wait_condvar (vcl_worker_t * wrk, struct epoll_event *events,
2708                            int maxevents, u32 n_evts, double wait_for_time)
2709 {
2710   vcl_cut_through_registration_t *cr;
2711   double total_wait = 0, wait_slice;
2712   int rv;
2713
2714   wait_for_time = (wait_for_time == -1) ? (double) 10e9 : wait_for_time;
2715   wait_slice = wrk->cut_through_registrations ? 10e-6 : wait_for_time;
2716
2717   do
2718     {
2719       vcl_ct_registration_lock (wrk);
2720       /* *INDENT-OFF* */
2721       pool_foreach (cr, wrk->cut_through_registrations, ({
2722         vcl_epoll_wait_handle_mq (wrk, cr->mq, events, maxevents, 0, &n_evts);
2723       }));
2724       /* *INDENT-ON* */
2725       vcl_ct_registration_unlock (wrk);
2726
2727       rv = vcl_epoll_wait_handle_mq (wrk, wrk->app_event_queue, events,
2728                                      maxevents, n_evts ? 0 : wait_slice,
2729                                      &n_evts);
2730       if (rv)
2731         total_wait += wait_slice;
2732       if (n_evts)
2733         return n_evts;
2734     }
2735   while (total_wait < wait_for_time);
2736   return n_evts;
2737 }
2738
2739 static int
2740 vppcom_epoll_wait_eventfd (vcl_worker_t * wrk, struct epoll_event *events,
2741                            int maxevents, u32 n_evts, double wait_for_time)
2742 {
2743   vcl_mq_evt_conn_t *mqc;
2744   int __clib_unused n_read;
2745   int n_mq_evts, i;
2746   u64 buf;
2747
2748   vec_validate (wrk->mq_events, pool_elts (wrk->mq_evt_conns));
2749 again:
2750   n_mq_evts = epoll_wait (wrk->mqs_epfd, wrk->mq_events,
2751                           vec_len (wrk->mq_events), wait_for_time);
2752   for (i = 0; i < n_mq_evts; i++)
2753     {
2754       mqc = vcl_mq_evt_conn_get (wrk, wrk->mq_events[i].data.u32);
2755       n_read = read (mqc->mq_fd, &buf, sizeof (buf));
2756       vcl_epoll_wait_handle_mq (wrk, mqc->mq, events, maxevents, 0, &n_evts);
2757     }
2758   if (!n_evts && n_mq_evts > 0)
2759     goto again;
2760
2761   return (int) n_evts;
2762 }
2763
2764 int
2765 vppcom_epoll_wait (uint32_t vep_handle, struct epoll_event *events,
2766                    int maxevents, double wait_for_time)
2767 {
2768   vcl_worker_t *wrk = vcl_worker_get_current ();
2769   vcl_session_t *vep_session;
2770   u32 n_evts = 0;
2771   int i;
2772
2773   if (PREDICT_FALSE (maxevents <= 0))
2774     {
2775       clib_warning ("VCL<%d>: ERROR: Invalid maxevents (%d)!",
2776                     getpid (), maxevents);
2777       return VPPCOM_EINVAL;
2778     }
2779
2780   vep_session = vcl_session_get_w_handle (wrk, vep_handle);
2781   if (!vep_session)
2782     return VPPCOM_EBADFD;
2783
2784   if (PREDICT_FALSE (!vep_session->is_vep))
2785     {
2786       clib_warning ("VCL<%d>: ERROR: vep_idx (%u) is not a vep!",
2787                     getpid (), vep_handle);
2788       return VPPCOM_EINVAL;
2789     }
2790
2791   memset (events, 0, sizeof (*events) * maxevents);
2792
2793   if (vec_len (wrk->unhandled_evts_vector))
2794     {
2795       for (i = 0; i < vec_len (wrk->unhandled_evts_vector); i++)
2796         {
2797           vcl_epoll_wait_handle_mq_event (wrk, &wrk->unhandled_evts_vector[i],
2798                                           events, &n_evts);
2799           if (n_evts == maxevents)
2800             {
2801               i += 1;
2802               break;
2803             }
2804         }
2805
2806       vec_delete (wrk->unhandled_evts_vector, i, 0);
2807     }
2808
2809   if (vcm->cfg.use_mq_eventfd)
2810     return vppcom_epoll_wait_eventfd (wrk, events, maxevents, n_evts,
2811                                       wait_for_time);
2812
2813   return vppcom_epoll_wait_condvar (wrk, events, maxevents, n_evts,
2814                                     wait_for_time);
2815 }
2816
2817 int
2818 vppcom_session_attr (uint32_t session_handle, uint32_t op,
2819                      void *buffer, uint32_t * buflen)
2820 {
2821   vcl_worker_t *wrk = vcl_worker_get_current ();
2822   vcl_session_t *session;
2823   int rv = VPPCOM_OK;
2824   u32 *flags = buffer;
2825   vppcom_endpt_t *ep = buffer;
2826
2827   session = vcl_session_get_w_handle (wrk, session_handle);
2828   if (!session)
2829     return VPPCOM_EBADFD;
2830
2831   switch (op)
2832     {
2833     case VPPCOM_ATTR_GET_NREAD:
2834       rv = vppcom_session_read_ready (session);
2835       VDBG (2, "VCL<%d>: VPPCOM_ATTR_GET_NREAD: sid %u, nread = %d",
2836             getpid (), rv);
2837       break;
2838
2839     case VPPCOM_ATTR_GET_NWRITE:
2840       rv = vppcom_session_write_ready (session);
2841       VDBG (2, "VCL<%d>: VPPCOM_ATTR_GET_NWRITE: sid %u, nwrite = %d",
2842             getpid (), session_handle, rv);
2843       break;
2844
2845     case VPPCOM_ATTR_GET_FLAGS:
2846       if (PREDICT_TRUE (buffer && buflen && (*buflen >= sizeof (*flags))))
2847         {
2848           *flags = O_RDWR | (VCL_SESS_ATTR_TEST (session->attr,
2849                                                  VCL_SESS_ATTR_NONBLOCK));
2850           *buflen = sizeof (*flags);
2851           VDBG (2, "VCL<%d>: VPPCOM_ATTR_GET_FLAGS: sid %u, flags = 0x%08x, "
2852                 "is_nonblocking = %u", getpid (),
2853                 session_handle, *flags,
2854                 VCL_SESS_ATTR_TEST (session->attr, VCL_SESS_ATTR_NONBLOCK));
2855         }
2856       else
2857         rv = VPPCOM_EINVAL;
2858       break;
2859
2860     case VPPCOM_ATTR_SET_FLAGS:
2861       if (PREDICT_TRUE (buffer && buflen && (*buflen == sizeof (*flags))))
2862         {
2863           if (*flags & O_NONBLOCK)
2864             VCL_SESS_ATTR_SET (session->attr, VCL_SESS_ATTR_NONBLOCK);
2865           else
2866             VCL_SESS_ATTR_CLR (session->attr, VCL_SESS_ATTR_NONBLOCK);
2867
2868           VDBG (2, "VCL<%d>: VPPCOM_ATTR_SET_FLAGS: sid %u, flags = 0x%08x,"
2869                 " is_nonblocking = %u",
2870                 getpid (), session_handle, *flags,
2871                 VCL_SESS_ATTR_TEST (session->attr, VCL_SESS_ATTR_NONBLOCK));
2872         }
2873       else
2874         rv = VPPCOM_EINVAL;
2875       break;
2876
2877     case VPPCOM_ATTR_GET_PEER_ADDR:
2878       if (PREDICT_TRUE (buffer && buflen &&
2879                         (*buflen >= sizeof (*ep)) && ep->ip))
2880         {
2881           ep->is_ip4 = session->transport.is_ip4;
2882           ep->port = session->transport.rmt_port;
2883           if (session->transport.is_ip4)
2884             clib_memcpy_fast (ep->ip, &session->transport.rmt_ip.ip4,
2885                               sizeof (ip4_address_t));
2886           else
2887             clib_memcpy_fast (ep->ip, &session->transport.rmt_ip.ip6,
2888                               sizeof (ip6_address_t));
2889           *buflen = sizeof (*ep);
2890           VDBG (1, "VCL<%d>: VPPCOM_ATTR_GET_PEER_ADDR: sid %u, is_ip4 = %u, "
2891                 "addr = %U, port %u", getpid (),
2892                 session_handle, ep->is_ip4, format_ip46_address,
2893                 &session->transport.rmt_ip,
2894                 ep->is_ip4 ? IP46_TYPE_IP4 : IP46_TYPE_IP6,
2895                 clib_net_to_host_u16 (ep->port));
2896         }
2897       else
2898         rv = VPPCOM_EINVAL;
2899       break;
2900
2901     case VPPCOM_ATTR_GET_LCL_ADDR:
2902       if (PREDICT_TRUE (buffer && buflen &&
2903                         (*buflen >= sizeof (*ep)) && ep->ip))
2904         {
2905           ep->is_ip4 = session->transport.is_ip4;
2906           ep->port = session->transport.lcl_port;
2907           if (session->transport.is_ip4)
2908             clib_memcpy_fast (ep->ip, &session->transport.lcl_ip.ip4,
2909                               sizeof (ip4_address_t));
2910           else
2911             clib_memcpy_fast (ep->ip, &session->transport.lcl_ip.ip6,
2912                               sizeof (ip6_address_t));
2913           *buflen = sizeof (*ep);
2914           VDBG (1, "VCL<%d>: VPPCOM_ATTR_GET_LCL_ADDR: sid %u, is_ip4 = %u,"
2915                 " addr = %U port %d", getpid (),
2916                 session_handle, ep->is_ip4, format_ip46_address,
2917                 &session->transport.lcl_ip,
2918                 ep->is_ip4 ? IP46_TYPE_IP4 : IP46_TYPE_IP6,
2919                 clib_net_to_host_u16 (ep->port));
2920         }
2921       else
2922         rv = VPPCOM_EINVAL;
2923       break;
2924
2925     case VPPCOM_ATTR_GET_LIBC_EPFD:
2926       rv = session->libc_epfd;
2927       VDBG (2, "VCL<%d>: VPPCOM_ATTR_GET_LIBC_EPFD: libc_epfd %d",
2928             getpid (), rv);
2929       break;
2930
2931     case VPPCOM_ATTR_SET_LIBC_EPFD:
2932       if (PREDICT_TRUE (buffer && buflen &&
2933                         (*buflen == sizeof (session->libc_epfd))))
2934         {
2935           session->libc_epfd = *(int *) buffer;
2936           *buflen = sizeof (session->libc_epfd);
2937
2938           VDBG (2, "VCL<%d>: VPPCOM_ATTR_SET_LIBC_EPFD: libc_epfd %d, "
2939                 "buflen %d", getpid (), session->libc_epfd, *buflen);
2940         }
2941       else
2942         rv = VPPCOM_EINVAL;
2943       break;
2944
2945     case VPPCOM_ATTR_GET_PROTOCOL:
2946       if (buffer && buflen && (*buflen >= sizeof (int)))
2947         {
2948           *(int *) buffer = session->session_type;
2949           *buflen = sizeof (int);
2950
2951           VDBG (2, "VCL<%d>: VPPCOM_ATTR_GET_PROTOCOL: %d (%s), buflen %d",
2952                 getpid (), *(int *) buffer, *(int *) buffer ? "UDP" : "TCP",
2953                 *buflen);
2954         }
2955       else
2956         rv = VPPCOM_EINVAL;
2957       break;
2958
2959     case VPPCOM_ATTR_GET_LISTEN:
2960       if (buffer && buflen && (*buflen >= sizeof (int)))
2961         {
2962           *(int *) buffer = VCL_SESS_ATTR_TEST (session->attr,
2963                                                 VCL_SESS_ATTR_LISTEN);
2964           *buflen = sizeof (int);
2965
2966           VDBG (2, "VCL<%d>: VPPCOM_ATTR_GET_LISTEN: %d, buflen %d",
2967                 getpid (), *(int *) buffer, *buflen);
2968         }
2969       else
2970         rv = VPPCOM_EINVAL;
2971       break;
2972
2973     case VPPCOM_ATTR_GET_ERROR:
2974       if (buffer && buflen && (*buflen >= sizeof (int)))
2975         {
2976           *(int *) buffer = 0;
2977           *buflen = sizeof (int);
2978
2979           VDBG (2, "VCL<%d>: VPPCOM_ATTR_GET_ERROR: %d, buflen %d, #VPP-TBD#",
2980                 getpid (), *(int *) buffer, *buflen);
2981         }
2982       else
2983         rv = VPPCOM_EINVAL;
2984       break;
2985
2986     case VPPCOM_ATTR_GET_TX_FIFO_LEN:
2987       if (buffer && buflen && (*buflen >= sizeof (u32)))
2988         {
2989
2990           /* VPP-TBD */
2991           *(size_t *) buffer = (session->sndbuf_size ? session->sndbuf_size :
2992                                 session->tx_fifo ? session->tx_fifo->nitems :
2993                                 vcm->cfg.tx_fifo_size);
2994           *buflen = sizeof (u32);
2995
2996           VDBG (2, "VCL<%d>: VPPCOM_ATTR_GET_TX_FIFO_LEN: %u (0x%x), "
2997                 "buflen %d, #VPP-TBD#", getpid (),
2998                 *(size_t *) buffer, *(size_t *) buffer, *buflen);
2999         }
3000       else
3001         rv = VPPCOM_EINVAL;
3002       break;
3003
3004     case VPPCOM_ATTR_SET_TX_FIFO_LEN:
3005       if (buffer && buflen && (*buflen == sizeof (u32)))
3006         {
3007           /* VPP-TBD */
3008           session->sndbuf_size = *(u32 *) buffer;
3009           VDBG (2, "VCL<%d>: VPPCOM_ATTR_SET_TX_FIFO_LEN: %u (0x%x), "
3010                 "buflen %d, #VPP-TBD#", getpid (),
3011                 session->sndbuf_size, session->sndbuf_size, *buflen);
3012         }
3013       else
3014         rv = VPPCOM_EINVAL;
3015       break;
3016
3017     case VPPCOM_ATTR_GET_RX_FIFO_LEN:
3018       if (buffer && buflen && (*buflen >= sizeof (u32)))
3019         {
3020
3021           /* VPP-TBD */
3022           *(size_t *) buffer = (session->rcvbuf_size ? session->rcvbuf_size :
3023                                 session->rx_fifo ? session->rx_fifo->nitems :
3024                                 vcm->cfg.rx_fifo_size);
3025           *buflen = sizeof (u32);
3026
3027           VDBG (2, "VCL<%d>: VPPCOM_ATTR_GET_RX_FIFO_LEN: %u (0x%x), "
3028                 "buflen %d, #VPP-TBD#", getpid (),
3029                 *(size_t *) buffer, *(size_t *) buffer, *buflen);
3030         }
3031       else
3032         rv = VPPCOM_EINVAL;
3033       break;
3034
3035     case VPPCOM_ATTR_SET_RX_FIFO_LEN:
3036       if (buffer && buflen && (*buflen == sizeof (u32)))
3037         {
3038           /* VPP-TBD */
3039           session->rcvbuf_size = *(u32 *) buffer;
3040           VDBG (2, "VCL<%d>: VPPCOM_ATTR_SET_RX_FIFO_LEN: %u (0x%x), "
3041                 "buflen %d, #VPP-TBD#", getpid (),
3042                 session->sndbuf_size, session->sndbuf_size, *buflen);
3043         }
3044       else
3045         rv = VPPCOM_EINVAL;
3046       break;
3047
3048     case VPPCOM_ATTR_GET_REUSEADDR:
3049       if (buffer && buflen && (*buflen >= sizeof (int)))
3050         {
3051           /* VPP-TBD */
3052           *(int *) buffer = VCL_SESS_ATTR_TEST (session->attr,
3053                                                 VCL_SESS_ATTR_REUSEADDR);
3054           *buflen = sizeof (int);
3055
3056           VDBG (2, "VCL<%d>: VPPCOM_ATTR_GET_REUSEADDR: %d, "
3057                 "buflen %d, #VPP-TBD#", getpid (), *(int *) buffer, *buflen);
3058         }
3059       else
3060         rv = VPPCOM_EINVAL;
3061       break;
3062
3063     case VPPCOM_ATTR_SET_REUSEADDR:
3064       if (buffer && buflen && (*buflen == sizeof (int)) &&
3065           !VCL_SESS_ATTR_TEST (session->attr, VCL_SESS_ATTR_LISTEN))
3066         {
3067           /* VPP-TBD */
3068           if (*(int *) buffer)
3069             VCL_SESS_ATTR_SET (session->attr, VCL_SESS_ATTR_REUSEADDR);
3070           else
3071             VCL_SESS_ATTR_CLR (session->attr, VCL_SESS_ATTR_REUSEADDR);
3072
3073           VDBG (2, "VCL<%d>: VPPCOM_ATTR_SET_REUSEADDR: %d, buflen %d,"
3074                 " #VPP-TBD#", getpid (),
3075                 VCL_SESS_ATTR_TEST (session->attr,
3076                                     VCL_SESS_ATTR_REUSEADDR), *buflen);
3077         }
3078       else
3079         rv = VPPCOM_EINVAL;
3080       break;
3081
3082     case VPPCOM_ATTR_GET_REUSEPORT:
3083       if (buffer && buflen && (*buflen >= sizeof (int)))
3084         {
3085           /* VPP-TBD */
3086           *(int *) buffer = VCL_SESS_ATTR_TEST (session->attr,
3087                                                 VCL_SESS_ATTR_REUSEPORT);
3088           *buflen = sizeof (int);
3089
3090           VDBG (2, "VCL<%d>: VPPCOM_ATTR_GET_REUSEPORT: %d, buflen %d,"
3091                 " #VPP-TBD#", getpid (), *(int *) buffer, *buflen);
3092         }
3093       else
3094         rv = VPPCOM_EINVAL;
3095       break;
3096
3097     case VPPCOM_ATTR_SET_REUSEPORT:
3098       if (buffer && buflen && (*buflen == sizeof (int)) &&
3099           !VCL_SESS_ATTR_TEST (session->attr, VCL_SESS_ATTR_LISTEN))
3100         {
3101           /* VPP-TBD */
3102           if (*(int *) buffer)
3103             VCL_SESS_ATTR_SET (session->attr, VCL_SESS_ATTR_REUSEPORT);
3104           else
3105             VCL_SESS_ATTR_CLR (session->attr, VCL_SESS_ATTR_REUSEPORT);
3106
3107           VDBG (2, "VCL<%d>: VPPCOM_ATTR_SET_REUSEPORT: %d, buflen %d,"
3108                 " #VPP-TBD#", getpid (),
3109                 VCL_SESS_ATTR_TEST (session->attr,
3110                                     VCL_SESS_ATTR_REUSEPORT), *buflen);
3111         }
3112       else
3113         rv = VPPCOM_EINVAL;
3114       break;
3115
3116     case VPPCOM_ATTR_GET_BROADCAST:
3117       if (buffer && buflen && (*buflen >= sizeof (int)))
3118         {
3119           /* VPP-TBD */
3120           *(int *) buffer = VCL_SESS_ATTR_TEST (session->attr,
3121                                                 VCL_SESS_ATTR_BROADCAST);
3122           *buflen = sizeof (int);
3123
3124           VDBG (2, "VCL<%d>: VPPCOM_ATTR_GET_BROADCAST: %d, buflen %d,"
3125                 " #VPP-TBD#", getpid (), *(int *) buffer, *buflen);
3126         }
3127       else
3128         rv = VPPCOM_EINVAL;
3129       break;
3130
3131     case VPPCOM_ATTR_SET_BROADCAST:
3132       if (buffer && buflen && (*buflen == sizeof (int)))
3133         {
3134           /* VPP-TBD */
3135           if (*(int *) buffer)
3136             VCL_SESS_ATTR_SET (session->attr, VCL_SESS_ATTR_BROADCAST);
3137           else
3138             VCL_SESS_ATTR_CLR (session->attr, VCL_SESS_ATTR_BROADCAST);
3139
3140           VDBG (2, "VCL<%d>: VPPCOM_ATTR_SET_BROADCAST: %d, buflen %d, "
3141                 "#VPP-TBD#", getpid (),
3142                 VCL_SESS_ATTR_TEST (session->attr,
3143                                     VCL_SESS_ATTR_BROADCAST), *buflen);
3144         }
3145       else
3146         rv = VPPCOM_EINVAL;
3147       break;
3148
3149     case VPPCOM_ATTR_GET_V6ONLY:
3150       if (buffer && buflen && (*buflen >= sizeof (int)))
3151         {
3152           /* VPP-TBD */
3153           *(int *) buffer = VCL_SESS_ATTR_TEST (session->attr,
3154                                                 VCL_SESS_ATTR_V6ONLY);
3155           *buflen = sizeof (int);
3156
3157           VDBG (2, "VCL<%d>: VPPCOM_ATTR_GET_V6ONLY: %d, buflen %d, "
3158                 "#VPP-TBD#", getpid (), *(int *) buffer, *buflen);
3159         }
3160       else
3161         rv = VPPCOM_EINVAL;
3162       break;
3163
3164     case VPPCOM_ATTR_SET_V6ONLY:
3165       if (buffer && buflen && (*buflen == sizeof (int)))
3166         {
3167           /* VPP-TBD */
3168           if (*(int *) buffer)
3169             VCL_SESS_ATTR_SET (session->attr, VCL_SESS_ATTR_V6ONLY);
3170           else
3171             VCL_SESS_ATTR_CLR (session->attr, VCL_SESS_ATTR_V6ONLY);
3172
3173           VDBG (2, "VCL<%d>: VPPCOM_ATTR_SET_V6ONLY: %d, buflen %d, "
3174                 "#VPP-TBD#", getpid (),
3175                 VCL_SESS_ATTR_TEST (session->attr,
3176                                     VCL_SESS_ATTR_V6ONLY), *buflen);
3177         }
3178       else
3179         rv = VPPCOM_EINVAL;
3180       break;
3181
3182     case VPPCOM_ATTR_GET_KEEPALIVE:
3183       if (buffer && buflen && (*buflen >= sizeof (int)))
3184         {
3185           /* VPP-TBD */
3186           *(int *) buffer = VCL_SESS_ATTR_TEST (session->attr,
3187                                                 VCL_SESS_ATTR_KEEPALIVE);
3188           *buflen = sizeof (int);
3189
3190           VDBG (2, "VCL<%d>: VPPCOM_ATTR_GET_KEEPALIVE: %d, buflen %d, "
3191                 "#VPP-TBD#", getpid (), *(int *) buffer, *buflen);
3192         }
3193       else
3194         rv = VPPCOM_EINVAL;
3195       break;
3196
3197     case VPPCOM_ATTR_SET_KEEPALIVE:
3198       if (buffer && buflen && (*buflen == sizeof (int)))
3199         {
3200           /* VPP-TBD */
3201           if (*(int *) buffer)
3202             VCL_SESS_ATTR_SET (session->attr, VCL_SESS_ATTR_KEEPALIVE);
3203           else
3204             VCL_SESS_ATTR_CLR (session->attr, VCL_SESS_ATTR_KEEPALIVE);
3205
3206           VDBG (2, "VCL<%d>: VPPCOM_ATTR_SET_KEEPALIVE: %d, buflen %d, "
3207                 "#VPP-TBD#", getpid (),
3208                 VCL_SESS_ATTR_TEST (session->attr,
3209                                     VCL_SESS_ATTR_KEEPALIVE), *buflen);
3210         }
3211       else
3212         rv = VPPCOM_EINVAL;
3213       break;
3214
3215     case VPPCOM_ATTR_GET_TCP_NODELAY:
3216       if (buffer && buflen && (*buflen >= sizeof (int)))
3217         {
3218           /* VPP-TBD */
3219           *(int *) buffer = VCL_SESS_ATTR_TEST (session->attr,
3220                                                 VCL_SESS_ATTR_TCP_NODELAY);
3221           *buflen = sizeof (int);
3222
3223           VDBG (2, "VCL<%d>: VPPCOM_ATTR_GET_TCP_NODELAY: %d, buflen %d, "
3224                 "#VPP-TBD#", getpid (), *(int *) buffer, *buflen);
3225         }
3226       else
3227         rv = VPPCOM_EINVAL;
3228       break;
3229
3230     case VPPCOM_ATTR_SET_TCP_NODELAY:
3231       if (buffer && buflen && (*buflen == sizeof (int)))
3232         {
3233           /* VPP-TBD */
3234           if (*(int *) buffer)
3235             VCL_SESS_ATTR_SET (session->attr, VCL_SESS_ATTR_TCP_NODELAY);
3236           else
3237             VCL_SESS_ATTR_CLR (session->attr, VCL_SESS_ATTR_TCP_NODELAY);
3238
3239           VDBG (2, "VCL<%d>: VPPCOM_ATTR_SET_TCP_NODELAY: %d, buflen %d, "
3240                 "#VPP-TBD#", getpid (),
3241                 VCL_SESS_ATTR_TEST (session->attr,
3242                                     VCL_SESS_ATTR_TCP_NODELAY), *buflen);
3243         }
3244       else
3245         rv = VPPCOM_EINVAL;
3246       break;
3247
3248     case VPPCOM_ATTR_GET_TCP_KEEPIDLE:
3249       if (buffer && buflen && (*buflen >= sizeof (int)))
3250         {
3251           /* VPP-TBD */
3252           *(int *) buffer = VCL_SESS_ATTR_TEST (session->attr,
3253                                                 VCL_SESS_ATTR_TCP_KEEPIDLE);
3254           *buflen = sizeof (int);
3255
3256           VDBG (2, "VCL<%d>: VPPCOM_ATTR_GET_TCP_KEEPIDLE: %d, buflen %d, "
3257                 "#VPP-TBD#", getpid (), *(int *) buffer, *buflen);
3258         }
3259       else
3260         rv = VPPCOM_EINVAL;
3261       break;
3262
3263     case VPPCOM_ATTR_SET_TCP_KEEPIDLE:
3264       if (buffer && buflen && (*buflen == sizeof (int)))
3265         {
3266           /* VPP-TBD */
3267           if (*(int *) buffer)
3268             VCL_SESS_ATTR_SET (session->attr, VCL_SESS_ATTR_TCP_KEEPIDLE);
3269           else
3270             VCL_SESS_ATTR_CLR (session->attr, VCL_SESS_ATTR_TCP_KEEPIDLE);
3271
3272           VDBG (2, "VCL<%d>: VPPCOM_ATTR_SET_TCP_KEEPIDLE: %d, buflen %d, "
3273                 "#VPP-TBD#", getpid (),
3274                 VCL_SESS_ATTR_TEST (session->attr,
3275                                     VCL_SESS_ATTR_TCP_KEEPIDLE), *buflen);
3276         }
3277       else
3278         rv = VPPCOM_EINVAL;
3279       break;
3280
3281     case VPPCOM_ATTR_GET_TCP_KEEPINTVL:
3282       if (buffer && buflen && (*buflen >= sizeof (int)))
3283         {
3284           /* VPP-TBD */
3285           *(int *) buffer = VCL_SESS_ATTR_TEST (session->attr,
3286                                                 VCL_SESS_ATTR_TCP_KEEPINTVL);
3287           *buflen = sizeof (int);
3288
3289           VDBG (2, "VCL<%d>: VPPCOM_ATTR_GET_TCP_KEEPINTVL: %d, buflen %d, "
3290                 "#VPP-TBD#", getpid (), *(int *) buffer, *buflen);
3291         }
3292       else
3293         rv = VPPCOM_EINVAL;
3294       break;
3295
3296     case VPPCOM_ATTR_SET_TCP_KEEPINTVL:
3297       if (buffer && buflen && (*buflen == sizeof (int)))
3298         {
3299           /* VPP-TBD */
3300           if (*(int *) buffer)
3301             VCL_SESS_ATTR_SET (session->attr, VCL_SESS_ATTR_TCP_KEEPINTVL);
3302           else
3303             VCL_SESS_ATTR_CLR (session->attr, VCL_SESS_ATTR_TCP_KEEPINTVL);
3304
3305           VDBG (2, "VCL<%d>: VPPCOM_ATTR_SET_TCP_KEEPINTVL: %d, buflen %d, "
3306                 "#VPP-TBD#", getpid (),
3307                 VCL_SESS_ATTR_TEST (session->attr,
3308                                     VCL_SESS_ATTR_TCP_KEEPINTVL), *buflen);
3309         }
3310       else
3311         rv = VPPCOM_EINVAL;
3312       break;
3313
3314     case VPPCOM_ATTR_GET_TCP_USER_MSS:
3315       if (buffer && buflen && (*buflen >= sizeof (u32)))
3316         {
3317           /* VPP-TBD */
3318           *(u32 *) buffer = session->user_mss;
3319           *buflen = sizeof (int);
3320
3321           VDBG (2, "VCL<%d>: VPPCOM_ATTR_GET_TCP_USER_MSS: %d, buflen %d,"
3322                 " #VPP-TBD#", getpid (), *(int *) buffer, *buflen);
3323         }
3324       else
3325         rv = VPPCOM_EINVAL;
3326       break;
3327
3328     case VPPCOM_ATTR_SET_TCP_USER_MSS:
3329       if (buffer && buflen && (*buflen == sizeof (u32)))
3330         {
3331           /* VPP-TBD */
3332           session->user_mss = *(u32 *) buffer;
3333
3334           VDBG (2, "VCL<%d>: VPPCOM_ATTR_SET_TCP_USER_MSS: %u, buflen %d, "
3335                 "#VPP-TBD#", getpid (), session->user_mss, *buflen);
3336         }
3337       else
3338         rv = VPPCOM_EINVAL;
3339       break;
3340
3341     case VPPCOM_ATTR_GET_REFCNT:
3342       rv = vcl_session_get_refcnt (session);
3343       break;
3344
3345     default:
3346       rv = VPPCOM_EINVAL;
3347       break;
3348     }
3349
3350   return rv;
3351 }
3352
3353 int
3354 vppcom_session_recvfrom (uint32_t session_handle, void *buffer,
3355                          uint32_t buflen, int flags, vppcom_endpt_t * ep)
3356 {
3357   vcl_worker_t *wrk = vcl_worker_get_current ();
3358   int rv = VPPCOM_OK;
3359   vcl_session_t *session = 0;
3360
3361   if (ep)
3362     {
3363       session = vcl_session_get_w_handle (wrk, session_handle);
3364       if (PREDICT_FALSE (!session))
3365         {
3366           VDBG (0, "VCL<%d>: invalid session, sid (%u) has been closed!",
3367                 getpid (), session_handle);
3368           return VPPCOM_EBADFD;
3369         }
3370       ep->is_ip4 = session->transport.is_ip4;
3371       ep->port = session->transport.rmt_port;
3372     }
3373
3374   if (flags == 0)
3375     rv = vppcom_session_read (session_handle, buffer, buflen);
3376   else if (flags & MSG_PEEK)
3377     rv = vppcom_session_peek (session_handle, buffer, buflen);
3378   else
3379     {
3380       clib_warning ("VCL<%d>: Unsupport flags for recvfrom %d",
3381                     getpid (), flags);
3382       return VPPCOM_EAFNOSUPPORT;
3383     }
3384
3385   if (ep)
3386     {
3387       if (session->transport.is_ip4)
3388         clib_memcpy_fast (ep->ip, &session->transport.rmt_ip.ip4,
3389                           sizeof (ip4_address_t));
3390       else
3391         clib_memcpy_fast (ep->ip, &session->transport.rmt_ip.ip6,
3392                           sizeof (ip6_address_t));
3393     }
3394
3395   return rv;
3396 }
3397
3398 int
3399 vppcom_session_sendto (uint32_t session_handle, void *buffer,
3400                        uint32_t buflen, int flags, vppcom_endpt_t * ep)
3401 {
3402   if (!buffer)
3403     return VPPCOM_EINVAL;
3404
3405   if (ep)
3406     {
3407       // TBD
3408       return VPPCOM_EINVAL;
3409     }
3410
3411   if (flags)
3412     {
3413       // TBD check the flags and do the right thing
3414       VDBG (2, "VCL<%d>: handling flags 0x%u (%d) not implemented yet.",
3415             getpid (), flags, flags);
3416     }
3417
3418   return (vppcom_session_write_inline (session_handle, buffer, buflen, 1));
3419 }
3420
3421 int
3422 vppcom_poll (vcl_poll_t * vp, uint32_t n_sids, double wait_for_time)
3423 {
3424   vcl_worker_t *wrk = vcl_worker_get_current ();
3425   f64 timeout = clib_time_now (&wrk->clib_time) + wait_for_time;
3426   u32 i, keep_trying = 1;
3427   svm_msg_q_msg_t msg;
3428   session_event_t *e;
3429   int rv, num_ev = 0;
3430
3431   VDBG (3, "VCL<%d>: vp %p, nsids %u, wait_for_time %f",
3432         getpid (), vp, n_sids, wait_for_time);
3433
3434   if (!vp)
3435     return VPPCOM_EFAULT;
3436
3437   do
3438     {
3439       vcl_session_t *session;
3440
3441       /* Dequeue all events and drop all unhandled io events */
3442       while (svm_msg_q_sub (wrk->app_event_queue, &msg, SVM_Q_NOWAIT, 0) == 0)
3443         {
3444           e = svm_msg_q_msg_data (wrk->app_event_queue, &msg);
3445           vcl_handle_mq_event (wrk, e);
3446           svm_msg_q_free_msg (wrk->app_event_queue, &msg);
3447         }
3448       vec_reset_length (wrk->unhandled_evts_vector);
3449
3450       for (i = 0; i < n_sids; i++)
3451         {
3452           session = vcl_session_get (wrk, vp[i].sid);
3453           if (!session)
3454             {
3455               vp[i].revents = POLLHUP;
3456               num_ev++;
3457               continue;
3458             }
3459
3460           vp[i].revents = 0;
3461
3462           if (POLLIN & vp[i].events)
3463             {
3464               rv = vppcom_session_read_ready (session);
3465               if (rv > 0)
3466                 {
3467                   vp[i].revents |= POLLIN;
3468                   num_ev++;
3469                 }
3470               else if (rv < 0)
3471                 {
3472                   switch (rv)
3473                     {
3474                     case VPPCOM_ECONNRESET:
3475                       vp[i].revents = POLLHUP;
3476                       break;
3477
3478                     default:
3479                       vp[i].revents = POLLERR;
3480                       break;
3481                     }
3482                   num_ev++;
3483                 }
3484             }
3485
3486           if (POLLOUT & vp[i].events)
3487             {
3488               rv = vppcom_session_write_ready (session);
3489               if (rv > 0)
3490                 {
3491                   vp[i].revents |= POLLOUT;
3492                   num_ev++;
3493                 }
3494               else if (rv < 0)
3495                 {
3496                   switch (rv)
3497                     {
3498                     case VPPCOM_ECONNRESET:
3499                       vp[i].revents = POLLHUP;
3500                       break;
3501
3502                     default:
3503                       vp[i].revents = POLLERR;
3504                       break;
3505                     }
3506                   num_ev++;
3507                 }
3508             }
3509
3510           if (0)                // Note "done:" label used by VCL_SESSION_LOCK_AND_GET()
3511             {
3512               vp[i].revents = POLLNVAL;
3513               num_ev++;
3514             }
3515         }
3516       if (wait_for_time != -1)
3517         keep_trying = (clib_time_now (&wrk->clib_time) <= timeout) ? 1 : 0;
3518     }
3519   while ((num_ev == 0) && keep_trying);
3520
3521   if (VPPCOM_DEBUG > 3)
3522     {
3523       clib_warning ("VCL<%d>: returning %d", getpid (), num_ev);
3524       for (i = 0; i < n_sids; i++)
3525         {
3526           clib_warning ("VCL<%d>: vp[%d].sid %d (0x%x), .events 0x%x, "
3527                         ".revents 0x%x", getpid (), i, vp[i].sid, vp[i].sid,
3528                         vp[i].events, vp[i].revents);
3529         }
3530     }
3531   return num_ev;
3532 }
3533
3534 int
3535 vppcom_mq_epoll_fd (void)
3536 {
3537   vcl_worker_t *wrk = vcl_worker_get_current ();
3538   return wrk->mqs_epfd;
3539 }
3540
3541 int
3542 vppcom_session_index (uint32_t session_handle)
3543 {
3544   return session_handle & 0xFFFFFF;
3545 }
3546
3547 int
3548 vppcom_session_handle (uint32_t session_index)
3549 {
3550   return (vcl_get_worker_index () << 24) | session_index;
3551 }
3552
3553 int
3554 vppcom_worker_register (void)
3555 {
3556   if (!vcl_worker_alloc_and_init ())
3557     return VPPCOM_EEXIST;
3558
3559   if (vcl_worker_set_bapi ())
3560     return VPPCOM_EEXIST;
3561
3562   if (vcl_worker_register_with_vpp ())
3563     return VPPCOM_EEXIST;
3564
3565   return VPPCOM_OK;
3566 }
3567
3568 int
3569 vppcom_worker_index (void)
3570 {
3571   return vcl_get_worker_index ();
3572 }
3573
3574 /*
3575  * fd.io coding-style-patch-verification: ON
3576  *
3577  * Local Variables:
3578  * eval: (c-set-style "gnu")
3579  * End:
3580  */