vcl/ldp: add write msg function and fine tuning
[vpp.git] / src / vcl / vppcom.c
1 /*
2  * Copyright (c) 2017 Cisco and/or its affiliates.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this
5  * You may obtain a copy of the License at:
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15
16 #include <stdio.h>
17 #include <stdlib.h>
18 #include <svm/svm_fifo_segment.h>
19 #include <vcl/vppcom.h>
20 #include <vcl/vcl_debug.h>
21 #include <vcl/vcl_private.h>
22
23 __thread uword __vcl_worker_index = ~0;
24
25
26 static int
27 vcl_wait_for_segment (u64 segment_handle)
28 {
29   vcl_worker_t *wrk = vcl_worker_get_current ();
30   u32 wait_for_seconds = 10, segment_index;
31   f64 timeout;
32
33   if (segment_handle == VCL_INVALID_SEGMENT_HANDLE)
34     return 1;
35
36   timeout = clib_time_now (&wrk->clib_time) + wait_for_seconds;
37   while (clib_time_now (&wrk->clib_time) < timeout)
38     {
39       segment_index = vcl_segment_table_lookup (segment_handle);
40       if (segment_index != VCL_INVALID_SEGMENT_INDEX)
41         return 0;
42       usleep (10);
43     }
44   return 1;
45 }
46
47 const char *
48 vppcom_session_state_str (session_state_t state)
49 {
50   char *st;
51
52   switch (state)
53     {
54     case STATE_START:
55       st = "STATE_START";
56       break;
57
58     case STATE_CONNECT:
59       st = "STATE_CONNECT";
60       break;
61
62     case STATE_LISTEN:
63       st = "STATE_LISTEN";
64       break;
65
66     case STATE_ACCEPT:
67       st = "STATE_ACCEPT";
68       break;
69
70     case STATE_VPP_CLOSING:
71       st = "STATE_VPP_CLOSING";
72       break;
73
74     case STATE_DISCONNECT:
75       st = "STATE_DISCONNECT";
76       break;
77
78     case STATE_FAILED:
79       st = "STATE_FAILED";
80       break;
81
82     default:
83       st = "UNKNOWN_STATE";
84       break;
85     }
86
87   return st;
88 }
89
90 u8 *
91 format_ip4_address (u8 * s, va_list * args)
92 {
93   u8 *a = va_arg (*args, u8 *);
94   return format (s, "%d.%d.%d.%d", a[0], a[1], a[2], a[3]);
95 }
96
97 u8 *
98 format_ip6_address (u8 * s, va_list * args)
99 {
100   ip6_address_t *a = va_arg (*args, ip6_address_t *);
101   u32 i, i_max_n_zero, max_n_zeros, i_first_zero, n_zeros, last_double_colon;
102
103   i_max_n_zero = ARRAY_LEN (a->as_u16);
104   max_n_zeros = 0;
105   i_first_zero = i_max_n_zero;
106   n_zeros = 0;
107   for (i = 0; i < ARRAY_LEN (a->as_u16); i++)
108     {
109       u32 is_zero = a->as_u16[i] == 0;
110       if (is_zero && i_first_zero >= ARRAY_LEN (a->as_u16))
111         {
112           i_first_zero = i;
113           n_zeros = 0;
114         }
115       n_zeros += is_zero;
116       if ((!is_zero && n_zeros > max_n_zeros)
117           || (i + 1 >= ARRAY_LEN (a->as_u16) && n_zeros > max_n_zeros))
118         {
119           i_max_n_zero = i_first_zero;
120           max_n_zeros = n_zeros;
121           i_first_zero = ARRAY_LEN (a->as_u16);
122           n_zeros = 0;
123         }
124     }
125
126   last_double_colon = 0;
127   for (i = 0; i < ARRAY_LEN (a->as_u16); i++)
128     {
129       if (i == i_max_n_zero && max_n_zeros > 1)
130         {
131           s = format (s, "::");
132           i += max_n_zeros - 1;
133           last_double_colon = 1;
134         }
135       else
136         {
137           s = format (s, "%s%x",
138                       (last_double_colon || i == 0) ? "" : ":",
139                       clib_net_to_host_u16 (a->as_u16[i]));
140           last_double_colon = 0;
141         }
142     }
143
144   return s;
145 }
146
147 /* Format an IP46 address. */
148 u8 *
149 format_ip46_address (u8 * s, va_list * args)
150 {
151   ip46_address_t *ip46 = va_arg (*args, ip46_address_t *);
152   ip46_type_t type = va_arg (*args, ip46_type_t);
153   int is_ip4 = 1;
154
155   switch (type)
156     {
157     case IP46_TYPE_ANY:
158       is_ip4 = ip46_address_is_ip4 (ip46);
159       break;
160     case IP46_TYPE_IP4:
161       is_ip4 = 1;
162       break;
163     case IP46_TYPE_IP6:
164       is_ip4 = 0;
165       break;
166     }
167
168   return is_ip4 ?
169     format (s, "%U", format_ip4_address, &ip46->ip4) :
170     format (s, "%U", format_ip6_address, &ip46->ip6);
171 }
172
173 /*
174  * VPPCOM Utility Functions
175  */
176
177
178 static svm_msg_q_t *
179 vcl_session_vpp_evt_q (vcl_worker_t * wrk, vcl_session_t * s)
180 {
181   if (vcl_session_is_ct (s))
182     return wrk->vpp_event_queues[0];
183   else
184     return wrk->vpp_event_queues[s->vpp_thread_index];
185 }
186
187 static void
188 vcl_send_session_accepted_reply (svm_msg_q_t * mq, u32 context,
189                                  session_handle_t handle, int retval)
190 {
191   app_session_evt_t _app_evt, *app_evt = &_app_evt;
192   session_accepted_reply_msg_t *rmp;
193   app_alloc_ctrl_evt_to_vpp (mq, app_evt, SESSION_CTRL_EVT_ACCEPTED_REPLY);
194   rmp = (session_accepted_reply_msg_t *) app_evt->evt->data;
195   rmp->handle = handle;
196   rmp->context = context;
197   rmp->retval = retval;
198   app_send_ctrl_evt_to_vpp (mq, app_evt);
199 }
200
201 static void
202 vcl_send_session_disconnected_reply (svm_msg_q_t * mq, u32 context,
203                                      session_handle_t handle, int retval)
204 {
205   app_session_evt_t _app_evt, *app_evt = &_app_evt;
206   session_disconnected_reply_msg_t *rmp;
207   app_alloc_ctrl_evt_to_vpp (mq, app_evt,
208                              SESSION_CTRL_EVT_DISCONNECTED_REPLY);
209   rmp = (session_disconnected_reply_msg_t *) app_evt->evt->data;
210   rmp->handle = handle;
211   rmp->context = context;
212   rmp->retval = retval;
213   app_send_ctrl_evt_to_vpp (mq, app_evt);
214 }
215
216 static void
217 vcl_send_session_reset_reply (svm_msg_q_t * mq, u32 context,
218                               session_handle_t handle, int retval)
219 {
220   app_session_evt_t _app_evt, *app_evt = &_app_evt;
221   session_reset_reply_msg_t *rmp;
222   app_alloc_ctrl_evt_to_vpp (mq, app_evt, SESSION_CTRL_EVT_RESET_REPLY);
223   rmp = (session_reset_reply_msg_t *) app_evt->evt->data;
224   rmp->handle = handle;
225   rmp->context = context;
226   rmp->retval = retval;
227   app_send_ctrl_evt_to_vpp (mq, app_evt);
228 }
229
230 static u32
231 vcl_session_accepted_handler (vcl_worker_t * wrk, session_accepted_msg_t * mp)
232 {
233   vcl_session_t *session, *listen_session;
234   svm_fifo_t *rx_fifo, *tx_fifo;
235   u32 vpp_wrk_index;
236   svm_msg_q_t *evt_q;
237
238   session = vcl_session_alloc (wrk);
239
240   listen_session = vcl_session_table_lookup_listener (wrk,
241                                                       mp->listener_handle);
242   if (!listen_session)
243     {
244       svm_msg_q_t *evt_q;
245       evt_q = uword_to_pointer (mp->vpp_event_queue_address, svm_msg_q_t *);
246       clib_warning ("VCL<%d>: ERROR: couldn't find listen session: "
247                     "unknown vpp listener handle %llx",
248                     getpid (), mp->listener_handle);
249       vcl_send_session_accepted_reply (evt_q, mp->context, mp->handle,
250                                        VNET_API_ERROR_INVALID_ARGUMENT);
251       vcl_session_free (wrk, session);
252       return VCL_INVALID_SESSION_INDEX;
253     }
254
255   rx_fifo = uword_to_pointer (mp->server_rx_fifo, svm_fifo_t *);
256   tx_fifo = uword_to_pointer (mp->server_tx_fifo, svm_fifo_t *);
257
258   if (mp->server_event_queue_address)
259     {
260       session->vpp_evt_q = uword_to_pointer (mp->client_event_queue_address,
261                                              svm_msg_q_t *);
262       session->our_evt_q = uword_to_pointer (mp->server_event_queue_address,
263                                              svm_msg_q_t *);
264       if (vcl_wait_for_segment (mp->segment_handle))
265         {
266           clib_warning ("segment for session %u couldn't be mounted!",
267                         session->session_index);
268           return VCL_INVALID_SESSION_INDEX;
269         }
270       rx_fifo->master_session_index = session->session_index;
271       tx_fifo->master_session_index = session->session_index;
272       rx_fifo->master_thread_index = vcl_get_worker_index ();
273       tx_fifo->master_thread_index = vcl_get_worker_index ();
274       vec_validate (wrk->vpp_event_queues, 0);
275       evt_q = uword_to_pointer (mp->vpp_event_queue_address, svm_msg_q_t *);
276       wrk->vpp_event_queues[0] = evt_q;
277     }
278   else
279     {
280       session->vpp_evt_q = uword_to_pointer (mp->vpp_event_queue_address,
281                                              svm_msg_q_t *);
282       rx_fifo->client_session_index = session->session_index;
283       tx_fifo->client_session_index = session->session_index;
284       rx_fifo->client_thread_index = vcl_get_worker_index ();
285       tx_fifo->client_thread_index = vcl_get_worker_index ();
286       vpp_wrk_index = tx_fifo->master_thread_index;
287       vec_validate (wrk->vpp_event_queues, vpp_wrk_index);
288       wrk->vpp_event_queues[vpp_wrk_index] = session->vpp_evt_q;
289     }
290
291   session->vpp_handle = mp->handle;
292   session->vpp_thread_index = rx_fifo->master_thread_index;
293   session->client_context = mp->context;
294   session->rx_fifo = rx_fifo;
295   session->tx_fifo = tx_fifo;
296
297   session->session_state = STATE_ACCEPT;
298   session->transport.rmt_port = mp->port;
299   session->transport.is_ip4 = mp->is_ip4;
300   clib_memcpy_fast (&session->transport.rmt_ip, mp->ip,
301                     sizeof (ip46_address_t));
302
303   vcl_session_table_add_vpp_handle (wrk, mp->handle, session->session_index);
304   session->transport.lcl_port = listen_session->transport.lcl_port;
305   session->transport.lcl_ip = listen_session->transport.lcl_ip;
306   session->session_type = listen_session->session_type;
307   session->is_dgram = session->session_type == VPPCOM_PROTO_UDP;
308
309   VDBG (1, "VCL<%d>: vpp handle 0x%llx, sid %u: client accept request from %s"
310         " address %U port %d queue %p!", getpid (), mp->handle,
311         session->session_index,
312         mp->is_ip4 ? "IPv4" : "IPv6", format_ip46_address, &mp->ip,
313         mp->is_ip4 ? IP46_TYPE_IP4 : IP46_TYPE_IP6,
314         clib_net_to_host_u16 (mp->port), session->vpp_evt_q);
315   vcl_evt (VCL_EVT_ACCEPT, session, listen_session, session_index);
316
317   return session->session_index;
318 }
319
320 static u32
321 vcl_session_connected_handler (vcl_worker_t * wrk,
322                                session_connected_msg_t * mp)
323 {
324   u32 session_index, vpp_wrk_index;
325   svm_fifo_t *rx_fifo, *tx_fifo;
326   vcl_session_t *session = 0;
327   svm_msg_q_t *evt_q;
328
329   session_index = mp->context;
330   session = vcl_session_get (wrk, session_index);
331   if (!session)
332     {
333       clib_warning ("[%s] ERROR: vpp handle 0x%llx, sid %u: "
334                     "Invalid session index (%u)!",
335                     getpid (), mp->handle, session_index);
336       return VCL_INVALID_SESSION_INDEX;
337     }
338   if (mp->retval)
339     {
340       clib_warning ("VCL<%d>: ERROR: sid %u: connect failed! %U", getpid (),
341                     session_index, format_api_error, ntohl (mp->retval));
342       session->session_state = STATE_FAILED;
343       session->vpp_handle = mp->handle;
344       return session_index;
345     }
346
347   rx_fifo = uword_to_pointer (mp->server_rx_fifo, svm_fifo_t *);
348   tx_fifo = uword_to_pointer (mp->server_tx_fifo, svm_fifo_t *);
349   if (vcl_wait_for_segment (mp->segment_handle))
350     {
351       clib_warning ("segment for session %u couldn't be mounted!",
352                     session->session_index);
353       return VCL_INVALID_SESSION_INDEX;
354     }
355
356   rx_fifo->client_session_index = session_index;
357   tx_fifo->client_session_index = session_index;
358   rx_fifo->client_thread_index = vcl_get_worker_index ();
359   tx_fifo->client_thread_index = vcl_get_worker_index ();
360
361   if (mp->client_event_queue_address)
362     {
363       session->vpp_evt_q = uword_to_pointer (mp->server_event_queue_address,
364                                              svm_msg_q_t *);
365       session->our_evt_q = uword_to_pointer (mp->client_event_queue_address,
366                                              svm_msg_q_t *);
367
368       vec_validate (wrk->vpp_event_queues, 0);
369       evt_q = uword_to_pointer (mp->vpp_event_queue_address, svm_msg_q_t *);
370       wrk->vpp_event_queues[0] = evt_q;
371     }
372   else
373     {
374       session->vpp_evt_q = uword_to_pointer (mp->vpp_event_queue_address,
375                                              svm_msg_q_t *);
376       vpp_wrk_index = tx_fifo->master_thread_index;
377       vec_validate (wrk->vpp_event_queues, vpp_wrk_index);
378       wrk->vpp_event_queues[vpp_wrk_index] = session->vpp_evt_q;
379     }
380
381   session->rx_fifo = rx_fifo;
382   session->tx_fifo = tx_fifo;
383   session->vpp_handle = mp->handle;
384   session->vpp_thread_index = rx_fifo->master_thread_index;
385   session->transport.is_ip4 = mp->is_ip4;
386   clib_memcpy_fast (&session->transport.lcl_ip, mp->lcl_ip,
387                     sizeof (session->transport.lcl_ip));
388   session->transport.lcl_port = mp->lcl_port;
389   session->session_state = STATE_CONNECT;
390
391   /* Add it to lookup table */
392   vcl_session_table_add_vpp_handle (wrk, mp->handle, session_index);
393
394   VDBG (1, "VCL<%d>: vpp handle 0x%llx, sid %u: connect succeeded! "
395         "session_rx_fifo %p, refcnt %d, session_tx_fifo %p, refcnt %d",
396         getpid (), mp->handle, session_index, session->rx_fifo,
397         session->rx_fifo->refcnt, session->tx_fifo, session->tx_fifo->refcnt);
398
399   return session_index;
400 }
401
402 static int
403 vcl_flag_accepted_session (vcl_session_t * session, u64 handle, u32 flags)
404 {
405   vcl_session_msg_t *accepted_msg;
406   int i;
407
408   for (i = 0; i < vec_len (session->accept_evts_fifo); i++)
409     {
410       accepted_msg = &session->accept_evts_fifo[i];
411       if (accepted_msg->accepted_msg.handle == handle)
412         {
413           accepted_msg->flags |= flags;
414           return 1;
415         }
416     }
417   return 0;
418 }
419
420 static u32
421 vcl_session_reset_handler (vcl_worker_t * wrk,
422                            session_reset_msg_t * reset_msg)
423 {
424   vcl_session_t *session;
425   u32 sid;
426
427   sid = vcl_session_index_from_vpp_handle (wrk, reset_msg->handle);
428   session = vcl_session_get (wrk, sid);
429   if (!session)
430     {
431       VDBG (0, "request to reset unknown handle 0x%llx", reset_msg->handle);
432       return VCL_INVALID_SESSION_INDEX;
433     }
434
435   /* Caught a reset before actually accepting the session */
436   if (session->session_state == STATE_LISTEN)
437     {
438
439       if (!vcl_flag_accepted_session (session, reset_msg->handle,
440                                       VCL_ACCEPTED_F_RESET))
441         VDBG (0, "session was not accepted!");
442       return VCL_INVALID_SESSION_INDEX;
443     }
444
445   session->session_state = STATE_DISCONNECT;
446   VDBG (0, "reset session %u [0x%llx]", sid, reset_msg->handle);
447   return sid;
448 }
449
450 static u32
451 vcl_session_bound_handler (vcl_worker_t * wrk, session_bound_msg_t * mp)
452 {
453   vcl_session_t *session;
454   u32 sid = mp->context;
455
456   session = vcl_session_get (wrk, sid);
457   if (mp->retval)
458     {
459       VERR ("vpp handle 0x%llx, sid %u: bind failed: %U", mp->handle, sid,
460             format_api_error, mp->retval);
461       if (session)
462         {
463           session->session_state = STATE_FAILED;
464           session->vpp_handle = mp->handle;
465           return sid;
466         }
467       else
468         {
469           clib_warning ("[%s] ERROR: vpp handle 0x%llx, sid %u: "
470                         "Invalid session index (%u)!",
471                         getpid (), mp->handle, sid);
472           return VCL_INVALID_SESSION_INDEX;
473         }
474     }
475
476   session->vpp_handle = mp->handle;
477   session->transport.is_ip4 = mp->lcl_is_ip4;
478   clib_memcpy_fast (&session->transport.lcl_ip, mp->lcl_ip,
479                     sizeof (ip46_address_t));
480   session->transport.lcl_port = mp->lcl_port;
481   vcl_session_table_add_listener (wrk, mp->handle, sid);
482   session->session_state = STATE_LISTEN;
483
484   if (session->is_dgram)
485     {
486       svm_fifo_t *rx_fifo, *tx_fifo;
487       session->vpp_evt_q = uword_to_pointer (mp->vpp_evt_q, svm_msg_q_t *);
488       rx_fifo = uword_to_pointer (mp->rx_fifo, svm_fifo_t *);
489       rx_fifo->client_session_index = sid;
490       tx_fifo = uword_to_pointer (mp->tx_fifo, svm_fifo_t *);
491       tx_fifo->client_session_index = sid;
492       session->rx_fifo = rx_fifo;
493       session->tx_fifo = tx_fifo;
494     }
495
496   VDBG (0, "session %u [0x%llx]: listen succeeded!", sid, mp->handle);
497   return sid;
498 }
499
500 static vcl_session_t *
501 vcl_session_accepted (vcl_worker_t * wrk, session_accepted_msg_t * msg)
502 {
503   vcl_session_msg_t *vcl_msg;
504   vcl_session_t *session;
505
506   session = vcl_session_get_w_vpp_handle (wrk, msg->handle);
507   if (PREDICT_FALSE (session != 0))
508     VWRN ("session overlap handle %lu state %u!", msg->handle,
509           session->session_state);
510
511   session = vcl_session_table_lookup_listener (wrk, msg->listener_handle);
512   if (!session)
513     {
514       VERR ("couldn't find listen session: listener handle %llx",
515             msg->listener_handle);
516       return 0;
517     }
518
519   clib_fifo_add2 (session->accept_evts_fifo, vcl_msg);
520   vcl_msg->accepted_msg = *msg;
521   /* Session handle points to listener until fully accepted by app */
522   vcl_session_table_add_vpp_handle (wrk, msg->handle, session->session_index);
523
524   return session;
525 }
526
527 static vcl_session_t *
528 vcl_session_disconnected_handler (vcl_worker_t * wrk,
529                                   session_disconnected_msg_t * msg)
530 {
531   vcl_session_t *session;
532
533   session = vcl_session_get_w_vpp_handle (wrk, msg->handle);
534   if (!session)
535     {
536       VDBG (0, "request to disconnect unknown handle 0x%llx", msg->handle);
537       return 0;
538     }
539
540   /* Caught a disconnect before actually accepting the session */
541   if (session->session_state == STATE_LISTEN)
542     {
543
544       if (!vcl_flag_accepted_session (session, msg->handle,
545                                       VCL_ACCEPTED_F_CLOSED))
546         VDBG (0, "session was not accepted!");
547       return 0;
548     }
549
550   session->session_state = STATE_VPP_CLOSING;
551   return session;
552 }
553
554 static int
555 vcl_handle_mq_event (vcl_worker_t * wrk, session_event_t * e)
556 {
557   session_disconnected_msg_t *disconnected_msg;
558   vcl_session_t *session;
559
560   switch (e->event_type)
561     {
562     case FIFO_EVENT_APP_RX:
563     case FIFO_EVENT_APP_TX:
564     case SESSION_IO_EVT_CT_RX:
565     case SESSION_IO_EVT_CT_TX:
566       vec_add1 (wrk->unhandled_evts_vector, *e);
567       break;
568     case SESSION_CTRL_EVT_ACCEPTED:
569       vcl_session_accepted (wrk, (session_accepted_msg_t *) e->data);
570       break;
571     case SESSION_CTRL_EVT_CONNECTED:
572       vcl_session_connected_handler (wrk,
573                                      (session_connected_msg_t *) e->data);
574       break;
575     case SESSION_CTRL_EVT_DISCONNECTED:
576       disconnected_msg = (session_disconnected_msg_t *) e->data;
577       session = vcl_session_disconnected_handler (wrk, disconnected_msg);
578       if (!session)
579         break;
580       session->session_state = STATE_DISCONNECT;
581       VDBG (0, "disconnected session %u [0x%llx]", session->session_index,
582             session->vpp_handle);
583       break;
584     case SESSION_CTRL_EVT_RESET:
585       vcl_session_reset_handler (wrk, (session_reset_msg_t *) e->data);
586       break;
587     case SESSION_CTRL_EVT_BOUND:
588       vcl_session_bound_handler (wrk, (session_bound_msg_t *) e->data);
589       break;
590     default:
591       clib_warning ("unhandled %u", e->event_type);
592     }
593   return VPPCOM_OK;
594 }
595
596 static inline int
597 vppcom_wait_for_session_state_change (u32 session_index,
598                                       session_state_t state,
599                                       f64 wait_for_time)
600 {
601   vcl_worker_t *wrk = vcl_worker_get_current ();
602   f64 timeout = clib_time_now (&wrk->clib_time) + wait_for_time;
603   vcl_session_t *volatile session;
604   svm_msg_q_msg_t msg;
605   session_event_t *e;
606
607   do
608     {
609       session = vcl_session_get (wrk, session_index);
610       if (PREDICT_FALSE (!session))
611         {
612           return VPPCOM_EBADFD;
613         }
614       if (session->session_state & state)
615         {
616           return VPPCOM_OK;
617         }
618       if (session->session_state & STATE_FAILED)
619         {
620           return VPPCOM_ECONNREFUSED;
621         }
622
623       if (svm_msg_q_sub (wrk->app_event_queue, &msg, SVM_Q_NOWAIT, 0))
624         {
625           usleep (100);
626           continue;
627         }
628       e = svm_msg_q_msg_data (wrk->app_event_queue, &msg);
629       vcl_handle_mq_event (wrk, e);
630       svm_msg_q_free_msg (wrk->app_event_queue, &msg);
631     }
632   while (clib_time_now (&wrk->clib_time) < timeout);
633
634   VDBG (0, "timeout waiting for state 0x%x (%s)", state,
635         vppcom_session_state_str (state));
636   vcl_evt (VCL_EVT_SESSION_TIMEOUT, session, session_state);
637
638   return VPPCOM_ETIMEDOUT;
639 }
640
641 static int
642 vppcom_app_session_enable (void)
643 {
644   int rv;
645
646   if (vcm->app_state != STATE_APP_ENABLED)
647     {
648       vppcom_send_session_enable_disable (1 /* is_enabled == TRUE */ );
649       rv = vcl_wait_for_app_state_change (STATE_APP_ENABLED);
650       if (PREDICT_FALSE (rv))
651         {
652           VDBG (0, "VCL<%d>: application session enable timed out! "
653                 "returning %d (%s)", getpid (), rv, vppcom_retval_str (rv));
654           return rv;
655         }
656     }
657   return VPPCOM_OK;
658 }
659
660 static int
661 vppcom_app_attach (void)
662 {
663   int rv;
664
665   vppcom_app_send_attach ();
666   rv = vcl_wait_for_app_state_change (STATE_APP_ATTACHED);
667   if (PREDICT_FALSE (rv))
668     {
669       VDBG (0, "VCL<%d>: application attach timed out! returning %d (%s)",
670             getpid (), rv, vppcom_retval_str (rv));
671       return rv;
672     }
673
674   return VPPCOM_OK;
675 }
676
677 static int
678 vppcom_session_unbind (u32 session_handle)
679 {
680   vcl_worker_t *wrk = vcl_worker_get_current ();
681   vcl_session_t *session = 0;
682   u64 vpp_handle;
683
684   session = vcl_session_get_w_handle (wrk, session_handle);
685   if (!session)
686     return VPPCOM_EBADFD;
687
688   vpp_handle = session->vpp_handle;
689   vcl_session_table_del_listener (wrk, vpp_handle);
690   session->vpp_handle = ~0;
691   session->session_state = STATE_DISCONNECT;
692
693   VDBG (1, "VCL<%d>: vpp handle 0x%llx, sid %u: sending unbind msg! new state"
694         " 0x%x (%s)", getpid (), vpp_handle, session_handle, STATE_DISCONNECT,
695         vppcom_session_state_str (STATE_DISCONNECT));
696   vcl_evt (VCL_EVT_UNBIND, session);
697   vppcom_send_unbind_sock (vpp_handle);
698
699   return VPPCOM_OK;
700 }
701
702 static int
703 vppcom_session_disconnect (u32 session_handle)
704 {
705   vcl_worker_t *wrk = vcl_worker_get_current ();
706   svm_msg_q_t *vpp_evt_q;
707   vcl_session_t *session;
708   session_state_t state;
709   u64 vpp_handle;
710
711   session = vcl_session_get_w_handle (wrk, session_handle);
712   if (!session)
713     return VPPCOM_EBADFD;
714
715   vpp_handle = session->vpp_handle;
716   state = session->session_state;
717
718   VDBG (1, "VCL<%d>: vpp handle 0x%llx, sid %u state 0x%x (%s)", getpid (),
719         vpp_handle, session_handle, state, vppcom_session_state_str (state));
720
721   if (PREDICT_FALSE (state & STATE_LISTEN))
722     {
723       clib_warning ("VCL<%d>: ERROR: vpp handle 0x%llx, sid %u: "
724                     "Cannot disconnect a listen socket!",
725                     getpid (), vpp_handle, session_handle);
726       return VPPCOM_EBADFD;
727     }
728
729   if (state & STATE_VPP_CLOSING)
730     {
731       vpp_evt_q = vcl_session_vpp_evt_q (wrk, session);
732       vcl_send_session_disconnected_reply (vpp_evt_q, wrk->my_client_index,
733                                            vpp_handle, 0);
734       VDBG (1, "VCL<%d>: vpp handle 0x%llx, sid %u: sending disconnect "
735             "REPLY...", getpid (), vpp_handle, session_handle);
736     }
737   else
738     {
739       VDBG (1, "VCL<%d>: vpp handle 0x%llx, sid %u: sending disconnect...",
740             getpid (), vpp_handle, session_handle);
741       vppcom_send_disconnect_session (vpp_handle);
742     }
743
744   return VPPCOM_OK;
745 }
746
747 static void
748 vcl_cleanup_bapi (void)
749 {
750   socket_client_main_t *scm = &socket_client_main;
751   api_main_t *am = &api_main;
752
753   am->my_client_index = ~0;
754   am->my_registration = 0;
755   am->vl_input_queue = 0;
756   am->msg_index_by_name_and_crc = 0;
757   scm->socket_fd = 0;
758
759   vl_client_api_unmap ();
760 }
761
762 static void
763 vcl_cleanup_forked_child (vcl_worker_t * wrk, vcl_worker_t * child_wrk)
764 {
765   vcl_worker_t *sub_child;
766   int tries = 0;
767
768   if (child_wrk->forked_child != ~0)
769     {
770       sub_child = vcl_worker_get_if_valid (child_wrk->forked_child);
771       if (sub_child)
772         {
773           /* Wait a bit, maybe the process is going away */
774           while (kill (sub_child->current_pid, 0) >= 0 && tries++ < 50)
775             usleep (1e3);
776           if (kill (sub_child->current_pid, 0) < 0)
777             vcl_cleanup_forked_child (child_wrk, sub_child);
778         }
779     }
780   vcl_worker_cleanup (child_wrk, 1 /* notify vpp */ );
781   VDBG (0, "Cleaned up wrk %u", child_wrk->wrk_index);
782   wrk->forked_child = ~0;
783 }
784
785 static struct sigaction old_sa;
786
787 static void
788 vcl_intercept_sigchld_handler (int signum, siginfo_t * si, void *uc)
789 {
790   vcl_worker_t *wrk, *child_wrk;
791
792   if (vcl_get_worker_index () == ~0)
793     return;
794
795   if (sigaction (SIGCHLD, &old_sa, 0))
796     {
797       VERR ("couldn't restore sigchld");
798       exit (-1);
799     }
800
801   wrk = vcl_worker_get_current ();
802   if (wrk->forked_child == ~0)
803     return;
804
805   child_wrk = vcl_worker_get_if_valid (wrk->forked_child);
806   if (!child_wrk)
807     goto done;
808
809   if (si && si->si_pid != child_wrk->current_pid)
810     {
811       VDBG (0, "unexpected child pid %u", si->si_pid);
812       goto done;
813     }
814   vcl_cleanup_forked_child (wrk, child_wrk);
815
816 done:
817   if (old_sa.sa_flags & SA_SIGINFO)
818     {
819       void (*fn) (int, siginfo_t *, void *) = old_sa.sa_sigaction;
820       fn (signum, si, uc);
821     }
822   else
823     {
824       void (*fn) (int) = old_sa.sa_handler;
825       if (fn)
826         fn (signum);
827     }
828 }
829
830 static void
831 vcl_incercept_sigchld ()
832 {
833   struct sigaction sa;
834   clib_memset (&sa, 0, sizeof (sa));
835   sa.sa_sigaction = vcl_intercept_sigchld_handler;
836   sa.sa_flags = SA_SIGINFO;
837   if (sigaction (SIGCHLD, &sa, &old_sa))
838     {
839       VERR ("couldn't intercept sigchld");
840       exit (-1);
841     }
842 }
843
844 static void
845 vcl_app_pre_fork (void)
846 {
847   vcl_incercept_sigchld ();
848 }
849
850 static void
851 vcl_app_fork_child_handler (void)
852 {
853   int rv, parent_wrk_index;
854   vcl_worker_t *parent_wrk;
855   u8 *child_name;
856
857   parent_wrk_index = vcl_get_worker_index ();
858   VDBG (0, "initializing forked child with parent wrk %u", parent_wrk_index);
859
860   /*
861    * Allocate worker
862    */
863   vcl_set_worker_index (~0);
864   if (!vcl_worker_alloc_and_init ())
865     VERR ("couldn't allocate new worker");
866
867   /*
868    * Attach to binary api
869    */
870   child_name = format (0, "%v-child-%u%c", vcm->app_name, getpid (), 0);
871   vcl_cleanup_bapi ();
872   vppcom_api_hookup ();
873   vcm->app_state = STATE_APP_START;
874   rv = vppcom_connect_to_vpp ((char *) child_name);
875   vec_free (child_name);
876   if (rv)
877     {
878       VERR ("couldn't connect to VPP!");
879       return;
880     }
881
882   /*
883    * Register worker with vpp and share sessions
884    */
885   vcl_worker_register_with_vpp ();
886   parent_wrk = vcl_worker_get (parent_wrk_index);
887   vcl_worker_share_sessions (parent_wrk);
888   parent_wrk->forked_child = vcl_get_worker_index ();
889
890   VDBG (0, "forked child main worker initialized");
891   vcm->forking = 0;
892 }
893
894 static void
895 vcl_app_fork_parent_handler (void)
896 {
897   vcm->forking = 1;
898   while (vcm->forking)
899     ;
900 }
901
902 /**
903  * Handle app exit
904  *
905  * Notify vpp of the disconnect and mark the worker as free. If we're the
906  * last worker, do a full cleanup otherwise, since we're probably a forked
907  * child, avoid syscalls as much as possible. We might've lost privileges.
908  */
909 void
910 vppcom_app_exit (void)
911 {
912   if (!pool_elts (vcm->workers))
913     return;
914   vcl_worker_cleanup (vcl_worker_get_current (), 1 /* notify vpp */ );
915   vcl_set_worker_index (~0);
916   vcl_elog_stop (vcm);
917   if (vec_len (vcm->workers) == 1)
918     vl_client_disconnect_from_vlib ();
919   else
920     vl_client_send_disconnect (1 /* vpp should cleanup */ );
921 }
922
923 /*
924  * VPPCOM Public API functions
925  */
926 int
927 vppcom_app_create (char *app_name)
928 {
929   vppcom_cfg_t *vcl_cfg = &vcm->cfg;
930   int rv;
931
932   if (vcm->is_init)
933     {
934       VDBG (1, "already initialized");
935       return VPPCOM_EEXIST;
936     }
937
938   vcm->is_init = 1;
939   vppcom_cfg (&vcm->cfg);
940   vcl_cfg = &vcm->cfg;
941
942   vcm->main_cpu = pthread_self ();
943   vcm->main_pid = getpid ();
944   vcm->app_name = format (0, "%s", app_name);
945   vppcom_init_error_string_table ();
946   svm_fifo_segment_main_init (&vcm->segment_main, vcl_cfg->segment_baseva,
947                               20 /* timeout in secs */ );
948   pool_alloc (vcm->workers, vcl_cfg->max_workers);
949   clib_spinlock_init (&vcm->workers_lock);
950   clib_rwlock_init (&vcm->segment_table_lock);
951   pthread_atfork (vcl_app_pre_fork, vcl_app_fork_parent_handler,
952                   vcl_app_fork_child_handler);
953   atexit (vppcom_app_exit);
954
955   /* Allocate default worker */
956   vcl_worker_alloc_and_init ();
957
958   /* API hookup and connect to VPP */
959   vppcom_api_hookup ();
960   vcl_elog_init (vcm);
961   vcm->app_state = STATE_APP_START;
962   rv = vppcom_connect_to_vpp (app_name);
963   if (rv)
964     {
965       VERR ("couldn't connect to VPP!");
966       return rv;
967     }
968   VDBG (0, "sending session enable");
969   rv = vppcom_app_session_enable ();
970   if (rv)
971     {
972       VERR ("vppcom_app_session_enable() failed!");
973       return rv;
974     }
975
976   VDBG (0, "sending app attach");
977   rv = vppcom_app_attach ();
978   if (rv)
979     {
980       VERR ("vppcom_app_attach() failed!");
981       return rv;
982     }
983
984   VDBG (0, "app_name '%s', my_client_index %d (0x%x)", app_name,
985         vcm->workers[0].my_client_index, vcm->workers[0].my_client_index);
986
987   return VPPCOM_OK;
988 }
989
990 void
991 vppcom_app_destroy (void)
992 {
993   int rv;
994   f64 orig_app_timeout;
995
996   if (!pool_elts (vcm->workers))
997     return;
998
999   vcl_evt (VCL_EVT_DETACH, vcm);
1000
1001   if (pool_elts (vcm->workers) == 1)
1002     {
1003       vppcom_app_send_detach ();
1004       orig_app_timeout = vcm->cfg.app_timeout;
1005       vcm->cfg.app_timeout = 2.0;
1006       rv = vcl_wait_for_app_state_change (STATE_APP_ENABLED);
1007       vcm->cfg.app_timeout = orig_app_timeout;
1008       if (PREDICT_FALSE (rv))
1009         VDBG (0, "application detach timed out! returning %d (%s)", rv,
1010               vppcom_retval_str (rv));
1011       vec_free (vcm->app_name);
1012       vcl_worker_cleanup (vcl_worker_get_current (), 0 /* notify vpp */ );
1013     }
1014   else
1015     {
1016       vcl_worker_cleanup (vcl_worker_get_current (), 1 /* notify vpp */ );
1017     }
1018
1019   vcl_set_worker_index (~0);
1020   vcl_elog_stop (vcm);
1021   vl_client_disconnect_from_vlib ();
1022 }
1023
1024 int
1025 vppcom_session_create (u8 proto, u8 is_nonblocking)
1026 {
1027   vcl_worker_t *wrk = vcl_worker_get_current ();
1028   vcl_session_t *session;
1029
1030   session = vcl_session_alloc (wrk);
1031
1032   session->session_type = proto;
1033   session->session_state = STATE_START;
1034   session->vpp_handle = ~0;
1035   session->is_dgram = proto == VPPCOM_PROTO_UDP;
1036
1037   if (is_nonblocking)
1038     VCL_SESS_ATTR_SET (session->attr, VCL_SESS_ATTR_NONBLOCK);
1039
1040   vcl_evt (VCL_EVT_CREATE, session, session_type, session->session_state,
1041            is_nonblocking, session_index);
1042
1043   VDBG (0, "created sid %u", session->session_index);
1044
1045   return vcl_session_handle (session);
1046 }
1047
1048 int
1049 vppcom_session_close (uint32_t session_handle)
1050 {
1051   vcl_worker_t *wrk = vcl_worker_get_current ();
1052   u8 is_vep, do_disconnect = 1;
1053   vcl_session_t *session = 0;
1054   session_state_t state;
1055   u32 next_sh, vep_sh;
1056   int rv = VPPCOM_OK;
1057   u64 vpp_handle;
1058
1059   session = vcl_session_get_w_handle (wrk, session_handle);
1060   if (!session)
1061     return VPPCOM_EBADFD;
1062
1063   if (session->shared_index != ~0)
1064     do_disconnect = vcl_worker_unshare_session (wrk, session);
1065
1066   is_vep = session->is_vep;
1067   next_sh = session->vep.next_sh;
1068   vep_sh = session->vep.vep_sh;
1069   state = session->session_state;
1070   vpp_handle = session->vpp_handle;
1071
1072   VDBG (1, "closing session handle %u vpp handle %u", session_handle,
1073         vpp_handle);
1074
1075   if (is_vep)
1076     {
1077       while (next_sh != ~0)
1078         {
1079           rv = vppcom_epoll_ctl (session_handle, EPOLL_CTL_DEL, next_sh, 0);
1080           if (PREDICT_FALSE (rv < 0))
1081             VDBG (0, "vpp handle 0x%llx, sid %u: EPOLL_CTL_DEL vep_idx %u"
1082                   " failed! rv %d (%s)", vpp_handle, next_sh, vep_sh, rv,
1083                   vppcom_retval_str (rv));
1084
1085           next_sh = session->vep.next_sh;
1086         }
1087     }
1088   else
1089     {
1090       if (session->is_vep_session)
1091         {
1092           rv = vppcom_epoll_ctl (vep_sh, EPOLL_CTL_DEL, session_handle, 0);
1093           if (rv < 0)
1094             VDBG (0, "vpp handle 0x%llx, sid %u: EPOLL_CTL_DEL vep_idx %u "
1095                   "failed! rv %d (%s)", vpp_handle, session_handle, vep_sh,
1096                   rv, vppcom_retval_str (rv));
1097         }
1098
1099       if (!do_disconnect)
1100         goto cleanup;
1101
1102       if (state & STATE_LISTEN)
1103         {
1104           rv = vppcom_session_unbind (session_handle);
1105           if (PREDICT_FALSE (rv < 0))
1106             VDBG (0, "vpp handle 0x%llx, sid %u: listener unbind failed! "
1107                   "rv %d (%s)", vpp_handle, session_handle, rv,
1108                   vppcom_retval_str (rv));
1109         }
1110       else if (state & STATE_OPEN)
1111         {
1112           rv = vppcom_session_disconnect (session_handle);
1113           if (PREDICT_FALSE (rv < 0))
1114             clib_warning ("VCL<%d>: ERROR: vpp handle 0x%llx, sid %u: "
1115                           "session disconnect failed! rv %d (%s)",
1116                           getpid (), vpp_handle, session_handle,
1117                           rv, vppcom_retval_str (rv));
1118         }
1119       else if (state == STATE_DISCONNECT)
1120         {
1121           svm_msg_q_t *mq = vcl_session_vpp_evt_q (wrk, session);
1122           vcl_send_session_reset_reply (mq, wrk->my_client_index,
1123                                         session->vpp_handle, 0);
1124         }
1125     }
1126
1127 cleanup:
1128
1129   if (vcl_session_is_ct (session))
1130     {
1131       vcl_cut_through_registration_t *ctr;
1132       uword mq_addr;
1133
1134       mq_addr = pointer_to_uword (session->our_evt_q);
1135       ctr = vcl_ct_registration_lock_and_lookup (wrk, mq_addr);
1136       ASSERT (ctr);
1137       if (ctr->epoll_evt_conn_index != ~0)
1138         vcl_mq_epoll_del_evfd (wrk, ctr->epoll_evt_conn_index);
1139       VDBG (0, "Removing ct registration %u",
1140             vcl_ct_registration_index (wrk, ctr));
1141       vcl_ct_registration_del (wrk, ctr);
1142       vcl_ct_registration_lookup_del (wrk, mq_addr);
1143       vcl_ct_registration_unlock (wrk);
1144     }
1145
1146   if (vpp_handle != ~0)
1147     {
1148       vcl_session_table_del_vpp_handle (wrk, vpp_handle);
1149     }
1150   vcl_session_free (wrk, session);
1151
1152   VDBG (0, "session handle %u [0x%llx] removed", session_handle, vpp_handle);
1153
1154   vcl_evt (VCL_EVT_CLOSE, session, rv);
1155
1156   return rv;
1157 }
1158
1159 int
1160 vppcom_session_bind (uint32_t session_handle, vppcom_endpt_t * ep)
1161 {
1162   vcl_worker_t *wrk = vcl_worker_get_current ();
1163   vcl_session_t *session = 0;
1164
1165   if (!ep || !ep->ip)
1166     return VPPCOM_EINVAL;
1167
1168   session = vcl_session_get_w_handle (wrk, session_handle);
1169   if (!session)
1170     return VPPCOM_EBADFD;
1171
1172   if (session->is_vep)
1173     {
1174       clib_warning ("VCL<%d>: ERROR: sid %u: cannot "
1175                     "bind to an epoll session!", getpid (), session_handle);
1176       return VPPCOM_EBADFD;
1177     }
1178
1179   session->transport.is_ip4 = ep->is_ip4;
1180   if (ep->is_ip4)
1181     clib_memcpy_fast (&session->transport.lcl_ip.ip4, ep->ip,
1182                       sizeof (ip4_address_t));
1183   else
1184     clib_memcpy_fast (&session->transport.lcl_ip.ip6, ep->ip,
1185                       sizeof (ip6_address_t));
1186   session->transport.lcl_port = ep->port;
1187
1188   VDBG (0, "VCL<%d>: sid %u: binding to local %s address %U port %u, "
1189         "proto %s", getpid (), session_handle,
1190         session->transport.is_ip4 ? "IPv4" : "IPv6",
1191         format_ip46_address, &session->transport.lcl_ip,
1192         session->transport.is_ip4 ? IP46_TYPE_IP4 : IP46_TYPE_IP6,
1193         clib_net_to_host_u16 (session->transport.lcl_port),
1194         session->session_type ? "UDP" : "TCP");
1195   vcl_evt (VCL_EVT_BIND, session);
1196
1197   if (session->session_type == VPPCOM_PROTO_UDP)
1198     vppcom_session_listen (session_handle, 10);
1199
1200   return VPPCOM_OK;
1201 }
1202
1203 int
1204 vppcom_session_listen (uint32_t listen_sh, uint32_t q_len)
1205 {
1206   vcl_worker_t *wrk = vcl_worker_get_current ();
1207   vcl_session_t *listen_session = 0;
1208   u64 listen_vpp_handle;
1209   int rv;
1210
1211   listen_session = vcl_session_get_w_handle (wrk, listen_sh);
1212   if (!listen_session || listen_session->is_vep)
1213     return VPPCOM_EBADFD;
1214
1215   if (q_len == 0 || q_len == ~0)
1216     q_len = vcm->cfg.listen_queue_size;
1217
1218   listen_vpp_handle = listen_session->vpp_handle;
1219   if (listen_session->session_state & STATE_LISTEN)
1220     {
1221       VDBG (0, "session %u [0x%llx]: already in listen state!",
1222             listen_sh, listen_vpp_handle);
1223       return VPPCOM_OK;
1224     }
1225
1226   VDBG (0, "session %u [0x%llx]: sending vpp listen request...",
1227         listen_sh, listen_vpp_handle);
1228
1229   /*
1230    * Send listen request to vpp and wait for reply
1231    */
1232   vppcom_send_bind_sock (listen_session);
1233   rv = vppcom_wait_for_session_state_change (listen_session->session_index,
1234                                              STATE_LISTEN,
1235                                              vcm->cfg.session_timeout);
1236
1237   if (PREDICT_FALSE (rv))
1238     {
1239       listen_session = vcl_session_get_w_handle (wrk, listen_sh);
1240       VDBG (0, "session %u [0x%llx]: listen failed! returning %d (%s)",
1241             listen_sh, listen_session->vpp_handle, rv,
1242             vppcom_retval_str (rv));
1243       return rv;
1244     }
1245
1246   return VPPCOM_OK;
1247 }
1248
1249 static int
1250 validate_args_session_accept_ (vcl_worker_t * wrk,
1251                                vcl_session_t * listen_session)
1252 {
1253   /* Input validation - expects spinlock on sessions_lockp */
1254   if (listen_session->is_vep)
1255     {
1256       clib_warning ("VCL<%d>: ERROR: sid %u: cannot accept on an "
1257                     "epoll session!", getpid (),
1258                     listen_session->session_index);
1259       return VPPCOM_EBADFD;
1260     }
1261
1262   if (listen_session->session_state != STATE_LISTEN)
1263     {
1264       clib_warning ("VCL<%d>: ERROR: vpp handle 0x%llx, sid %u: "
1265                     "not in listen state! state 0x%x (%s)", getpid (),
1266                     listen_session->vpp_handle, listen_session->session_index,
1267                     listen_session->session_state,
1268                     vppcom_session_state_str (listen_session->session_state));
1269       return VPPCOM_EBADFD;
1270     }
1271   return VPPCOM_OK;
1272 }
1273
1274 int
1275 vppcom_session_accept (uint32_t listen_session_handle, vppcom_endpt_t * ep,
1276                        uint32_t flags)
1277 {
1278   u32 client_session_index = ~0, listen_session_index, accept_flags = 0;
1279   vcl_worker_t *wrk = vcl_worker_get_current ();
1280   session_accepted_msg_t accepted_msg;
1281   vcl_session_t *listen_session = 0;
1282   vcl_session_t *client_session = 0;
1283   svm_msg_q_t *vpp_evt_q;
1284   vcl_session_msg_t *evt;
1285   u64 listen_vpp_handle;
1286   svm_msg_q_msg_t msg;
1287   session_event_t *e;
1288   u8 is_nonblocking;
1289   int rv;
1290
1291   listen_session = vcl_session_get_w_handle (wrk, listen_session_handle);
1292   if (!listen_session)
1293     return VPPCOM_EBADFD;
1294
1295   listen_session_index = listen_session->session_index;
1296   if ((rv = validate_args_session_accept_ (wrk, listen_session)))
1297     return rv;
1298
1299   if (clib_fifo_elts (listen_session->accept_evts_fifo))
1300     {
1301       clib_fifo_sub2 (listen_session->accept_evts_fifo, evt);
1302       accept_flags = evt->flags;
1303       accepted_msg = evt->accepted_msg;
1304       goto handle;
1305     }
1306
1307   is_nonblocking = VCL_SESS_ATTR_TEST (listen_session->attr,
1308                                        VCL_SESS_ATTR_NONBLOCK);
1309   if (svm_msg_q_is_empty (wrk->app_event_queue) && is_nonblocking)
1310     return VPPCOM_EAGAIN;
1311
1312   while (1)
1313     {
1314       if (svm_msg_q_sub (wrk->app_event_queue, &msg, SVM_Q_WAIT, 0))
1315         return VPPCOM_EAGAIN;
1316
1317       e = svm_msg_q_msg_data (wrk->app_event_queue, &msg);
1318       if (e->event_type != SESSION_CTRL_EVT_ACCEPTED)
1319         {
1320           clib_warning ("discarded event: %u", e->event_type);
1321           svm_msg_q_free_msg (wrk->app_event_queue, &msg);
1322           continue;
1323         }
1324       clib_memcpy_fast (&accepted_msg, e->data, sizeof (accepted_msg));
1325       svm_msg_q_free_msg (wrk->app_event_queue, &msg);
1326       break;
1327     }
1328
1329 handle:
1330
1331   client_session_index = vcl_session_accepted_handler (wrk, &accepted_msg);
1332   listen_session = vcl_session_get (wrk, listen_session_index);
1333   client_session = vcl_session_get (wrk, client_session_index);
1334
1335   if (flags & O_NONBLOCK)
1336     VCL_SESS_ATTR_SET (client_session->attr, VCL_SESS_ATTR_NONBLOCK);
1337
1338   listen_vpp_handle = listen_session->vpp_handle;
1339   VDBG (1, "vpp handle 0x%llx, sid %u: Got a client request! "
1340         "vpp handle 0x%llx, sid %u, flags %d, is_nonblocking %u",
1341         listen_vpp_handle, listen_session_handle,
1342         client_session->vpp_handle, client_session_index,
1343         flags, VCL_SESS_ATTR_TEST (client_session->attr,
1344                                    VCL_SESS_ATTR_NONBLOCK));
1345
1346   if (ep)
1347     {
1348       ep->is_ip4 = client_session->transport.is_ip4;
1349       ep->port = client_session->transport.rmt_port;
1350       if (client_session->transport.is_ip4)
1351         clib_memcpy_fast (ep->ip, &client_session->transport.rmt_ip.ip4,
1352                           sizeof (ip4_address_t));
1353       else
1354         clib_memcpy_fast (ep->ip, &client_session->transport.rmt_ip.ip6,
1355                           sizeof (ip6_address_t));
1356     }
1357
1358   if (accepted_msg.server_event_queue_address)
1359     vpp_evt_q = uword_to_pointer (accepted_msg.vpp_event_queue_address,
1360                                   svm_msg_q_t *);
1361   else
1362     vpp_evt_q = client_session->vpp_evt_q;
1363
1364   vcl_send_session_accepted_reply (vpp_evt_q, client_session->client_context,
1365                                    client_session->vpp_handle, 0);
1366
1367   VDBG (0, "listener %u [0x%llx] accepted %u [0x%llx] peer: %U:%u "
1368         "local: %U:%u", listen_session_handle, listen_vpp_handle,
1369         client_session_index, client_session->vpp_handle,
1370         format_ip46_address, &client_session->transport.rmt_ip,
1371         client_session->transport.is_ip4 ? IP46_TYPE_IP4 : IP46_TYPE_IP6,
1372         clib_net_to_host_u16 (client_session->transport.rmt_port),
1373         format_ip46_address, &client_session->transport.lcl_ip,
1374         client_session->transport.is_ip4 ? IP46_TYPE_IP4 : IP46_TYPE_IP6,
1375         clib_net_to_host_u16 (client_session->transport.lcl_port));
1376   vcl_evt (VCL_EVT_ACCEPT, client_session, listen_session,
1377            client_session_index);
1378
1379   /*
1380    * Session might have been closed already
1381    */
1382   if (accept_flags)
1383     {
1384       if (accept_flags & VCL_ACCEPTED_F_CLOSED)
1385         client_session->session_state = STATE_VPP_CLOSING;
1386       else if (accept_flags & VCL_ACCEPTED_F_RESET)
1387         client_session->session_state = STATE_DISCONNECT;
1388     }
1389   return vcl_session_handle (client_session);
1390 }
1391
1392 int
1393 vppcom_session_connect (uint32_t session_handle, vppcom_endpt_t * server_ep)
1394 {
1395   vcl_worker_t *wrk = vcl_worker_get_current ();
1396   vcl_session_t *session = 0;
1397   u32 session_index;
1398   int rv;
1399
1400   session = vcl_session_get_w_handle (wrk, session_handle);
1401   if (!session)
1402     return VPPCOM_EBADFD;
1403   session_index = session->session_index;
1404
1405   if (PREDICT_FALSE (session->is_vep))
1406     {
1407       clib_warning ("VCL<%d>: ERROR: sid %u: cannot "
1408                     "connect on an epoll session!", getpid (),
1409                     session_handle);
1410       return VPPCOM_EBADFD;
1411     }
1412
1413   if (PREDICT_FALSE (session->session_state & CLIENT_STATE_OPEN))
1414     {
1415       VDBG (0, "VCL<%d>: vpp handle 0x%llx, sid %u: session already "
1416             "connected to %s %U port %d proto %s, state 0x%x (%s)",
1417             getpid (), session->vpp_handle, session_handle,
1418             session->transport.is_ip4 ? "IPv4" : "IPv6",
1419             format_ip46_address,
1420             &session->transport.rmt_ip, session->transport.is_ip4 ?
1421             IP46_TYPE_IP4 : IP46_TYPE_IP6,
1422             clib_net_to_host_u16 (session->transport.rmt_port),
1423             session->session_type ? "UDP" : "TCP", session->session_state,
1424             vppcom_session_state_str (session->session_state));
1425       return VPPCOM_OK;
1426     }
1427
1428   session->transport.is_ip4 = server_ep->is_ip4;
1429   if (session->transport.is_ip4)
1430     clib_memcpy_fast (&session->transport.rmt_ip.ip4, server_ep->ip,
1431                       sizeof (ip4_address_t));
1432   else
1433     clib_memcpy_fast (&session->transport.rmt_ip.ip6, server_ep->ip,
1434                       sizeof (ip6_address_t));
1435   session->transport.rmt_port = server_ep->port;
1436
1437   VDBG (0, "VCL<%d>: vpp handle 0x%llx, sid %u: connecting to server %s %U "
1438         "port %d proto %s",
1439         getpid (), session->vpp_handle, session_handle,
1440         session->transport.is_ip4 ? "IPv4" : "IPv6",
1441         format_ip46_address,
1442         &session->transport.rmt_ip, session->transport.is_ip4 ?
1443         IP46_TYPE_IP4 : IP46_TYPE_IP6,
1444         clib_net_to_host_u16 (session->transport.rmt_port),
1445         session->session_type ? "UDP" : "TCP");
1446
1447   /*
1448    * Send connect request and wait for reply from vpp
1449    */
1450   vppcom_send_connect_sock (session);
1451   rv = vppcom_wait_for_session_state_change (session_index, STATE_CONNECT,
1452                                              vcm->cfg.session_timeout);
1453
1454   session = vcl_session_get (wrk, session_index);
1455
1456   if (PREDICT_FALSE (rv))
1457     {
1458       if (VPPCOM_DEBUG > 0)
1459         {
1460           if (session)
1461             clib_warning ("VCL<%d>: vpp handle 0x%llx, sid %u: connect "
1462                           "failed! returning %d (%s)", getpid (),
1463                           session->vpp_handle, session_handle, rv,
1464                           vppcom_retval_str (rv));
1465           else
1466             clib_warning ("VCL<%d>: no session for sid %u: connect failed! "
1467                           "returning %d (%s)", getpid (),
1468                           session_handle, rv, vppcom_retval_str (rv));
1469         }
1470     }
1471   else
1472     VDBG (0, "VCL<%d>: vpp handle 0x%llx, sid %u: connected!",
1473           getpid (), session->vpp_handle, session_handle);
1474
1475   return rv;
1476 }
1477
1478 static u8
1479 vcl_is_rx_evt_for_session (session_event_t * e, u32 sid, u8 is_ct)
1480 {
1481   if (!is_ct)
1482     return (e->event_type == FIFO_EVENT_APP_RX
1483             && e->fifo->client_session_index == sid);
1484   else
1485     return (e->event_type == SESSION_IO_EVT_CT_TX);
1486 }
1487
1488 static inline u8
1489 vcl_session_is_readable (vcl_session_t * s)
1490 {
1491   return ((s->session_state & STATE_OPEN)
1492           || (s->session_state == STATE_LISTEN
1493               && s->session_type == VPPCOM_PROTO_UDP));
1494 }
1495
1496 static inline int
1497 vppcom_session_read_internal (uint32_t session_handle, void *buf, int n,
1498                               u8 peek)
1499 {
1500   vcl_worker_t *wrk = vcl_worker_get_current ();
1501   int n_read = 0, rv, is_nonblocking;
1502   vcl_session_t *s = 0;
1503   svm_fifo_t *rx_fifo;
1504   svm_msg_q_msg_t msg;
1505   session_event_t *e;
1506   svm_msg_q_t *mq;
1507   u8 is_ct;
1508
1509   if (PREDICT_FALSE (!buf))
1510     return VPPCOM_EINVAL;
1511
1512   s = vcl_session_get_w_handle (wrk, session_handle);
1513   if (PREDICT_FALSE (!s || s->is_vep))
1514     return VPPCOM_EBADFD;
1515
1516   if (PREDICT_FALSE (!vcl_session_is_readable (s)))
1517     {
1518       session_state_t state = s->session_state;
1519       rv = ((state & STATE_DISCONNECT) ? VPPCOM_ECONNRESET : VPPCOM_ENOTCONN);
1520
1521       VDBG (0, "session handle %u[0x%llx] is not open! state 0x%x (%s),"
1522             " returning %d (%s)", session_handle, s->vpp_handle, state,
1523             vppcom_session_state_str (state), rv, vppcom_retval_str (rv));
1524       return rv;
1525     }
1526
1527   is_nonblocking = VCL_SESS_ATTR_TEST (s->attr, VCL_SESS_ATTR_NONBLOCK);
1528   is_ct = vcl_session_is_ct (s);
1529   mq = is_ct ? s->our_evt_q : wrk->app_event_queue;
1530   rx_fifo = s->rx_fifo;
1531   s->has_rx_evt = 0;
1532
1533   if (svm_fifo_is_empty (rx_fifo))
1534     {
1535       if (is_nonblocking)
1536         {
1537           svm_fifo_unset_event (rx_fifo);
1538           return VPPCOM_EWOULDBLOCK;
1539         }
1540       while (svm_fifo_is_empty (rx_fifo))
1541         {
1542           svm_fifo_unset_event (rx_fifo);
1543           svm_msg_q_lock (mq);
1544           if (svm_msg_q_is_empty (mq))
1545             svm_msg_q_wait (mq);
1546
1547           svm_msg_q_sub_w_lock (mq, &msg);
1548           e = svm_msg_q_msg_data (mq, &msg);
1549           svm_msg_q_unlock (mq);
1550           if (!vcl_is_rx_evt_for_session (e, s->session_index, is_ct))
1551             vcl_handle_mq_event (wrk, e);
1552           svm_msg_q_free_msg (mq, &msg);
1553
1554           if (PREDICT_FALSE (s->session_state == STATE_DISCONNECT))
1555             return VPPCOM_ECONNRESET;
1556         }
1557     }
1558
1559   if (s->is_dgram)
1560     n_read = app_recv_dgram_raw (rx_fifo, buf, n, &s->transport, 0, peek);
1561   else
1562     n_read = app_recv_stream_raw (rx_fifo, buf, n, 0, peek);
1563
1564   if (svm_fifo_is_empty (rx_fifo))
1565     svm_fifo_unset_event (rx_fifo);
1566
1567   if (is_ct && svm_fifo_want_tx_evt (rx_fifo))
1568     {
1569       svm_fifo_set_want_tx_evt (s->rx_fifo, 0);
1570       app_send_io_evt_to_vpp (s->vpp_evt_q, s->rx_fifo, SESSION_IO_EVT_CT_RX,
1571                               SVM_Q_WAIT);
1572     }
1573
1574   VDBG (2, "VCL<%d>: vpp handle 0x%llx, sid %u: read %d bytes from (%p)",
1575         getpid (), s->vpp_handle, session_handle, n_read, rx_fifo);
1576
1577   return n_read;
1578 }
1579
1580 int
1581 vppcom_session_read (uint32_t session_handle, void *buf, size_t n)
1582 {
1583   return (vppcom_session_read_internal (session_handle, buf, n, 0));
1584 }
1585
1586 static int
1587 vppcom_session_peek (uint32_t session_handle, void *buf, int n)
1588 {
1589   return (vppcom_session_read_internal (session_handle, buf, n, 1));
1590 }
1591
1592 int
1593 vppcom_session_read_segments (uint32_t session_handle,
1594                               vppcom_data_segments_t ds)
1595 {
1596   vcl_worker_t *wrk = vcl_worker_get_current ();
1597   int n_read = 0, rv, is_nonblocking;
1598   vcl_session_t *s = 0;
1599   svm_fifo_t *rx_fifo;
1600   svm_msg_q_msg_t msg;
1601   session_event_t *e;
1602   svm_msg_q_t *mq;
1603   u8 is_ct;
1604
1605   s = vcl_session_get_w_handle (wrk, session_handle);
1606   if (PREDICT_FALSE (!s || s->is_vep))
1607     return VPPCOM_EBADFD;
1608
1609   if (PREDICT_FALSE (!vcl_session_is_readable (s)))
1610     {
1611       session_state_t state = s->session_state;
1612       rv = ((state & STATE_DISCONNECT) ? VPPCOM_ECONNRESET : VPPCOM_ENOTCONN);
1613       return rv;
1614     }
1615
1616   is_nonblocking = VCL_SESS_ATTR_TEST (s->attr, VCL_SESS_ATTR_NONBLOCK);
1617   is_ct = vcl_session_is_ct (s);
1618   mq = is_ct ? s->our_evt_q : wrk->app_event_queue;
1619   rx_fifo = s->rx_fifo;
1620   s->has_rx_evt = 0;
1621
1622   if (svm_fifo_is_empty (rx_fifo))
1623     {
1624       if (is_nonblocking)
1625         {
1626           svm_fifo_unset_event (rx_fifo);
1627           return VPPCOM_EWOULDBLOCK;
1628         }
1629       while (svm_fifo_is_empty (rx_fifo))
1630         {
1631           svm_fifo_unset_event (rx_fifo);
1632           svm_msg_q_lock (mq);
1633           if (svm_msg_q_is_empty (mq))
1634             svm_msg_q_wait (mq);
1635
1636           svm_msg_q_sub_w_lock (mq, &msg);
1637           e = svm_msg_q_msg_data (mq, &msg);
1638           svm_msg_q_unlock (mq);
1639           if (!vcl_is_rx_evt_for_session (e, s->session_index, is_ct))
1640             vcl_handle_mq_event (wrk, e);
1641           svm_msg_q_free_msg (mq, &msg);
1642
1643           if (PREDICT_FALSE (s->session_state == STATE_DISCONNECT))
1644             return VPPCOM_ECONNRESET;
1645         }
1646     }
1647
1648   n_read = svm_fifo_segments (rx_fifo, (svm_fifo_segment_t *) ds);
1649   svm_fifo_unset_event (rx_fifo);
1650
1651   if (is_ct && n_read + svm_fifo_max_dequeue (rx_fifo) == rx_fifo->nitems)
1652     {
1653       /* If the peer is not polling send notification */
1654       if (!svm_fifo_has_event (s->rx_fifo))
1655         app_send_io_evt_to_vpp (s->vpp_evt_q, s->rx_fifo,
1656                                 SESSION_IO_EVT_CT_RX, SVM_Q_WAIT);
1657     }
1658
1659   return n_read;
1660 }
1661
1662 void
1663 vppcom_session_free_segments (uint32_t session_handle,
1664                               vppcom_data_segments_t ds)
1665 {
1666   vcl_worker_t *wrk = vcl_worker_get_current ();
1667   vcl_session_t *s;
1668
1669   s = vcl_session_get_w_handle (wrk, session_handle);
1670   if (PREDICT_FALSE (!s || s->is_vep))
1671     return;
1672
1673   svm_fifo_segments_free (s->rx_fifo, (svm_fifo_segment_t *) ds);
1674 }
1675
1676 static inline int
1677 vppcom_session_read_ready (vcl_session_t * session)
1678 {
1679   /* Assumes caller has acquired spinlock: vcm->sessions_lockp */
1680   if (PREDICT_FALSE (session->is_vep))
1681     {
1682       clib_warning ("VCL<%d>: ERROR: sid %u: cannot read from an "
1683                     "epoll session!", getpid (), session->session_index);
1684       return VPPCOM_EBADFD;
1685     }
1686
1687   if (PREDICT_FALSE (!(session->session_state & (STATE_OPEN | STATE_LISTEN))))
1688     {
1689       session_state_t state = session->session_state;
1690       int rv;
1691
1692       rv = ((state & STATE_DISCONNECT) ? VPPCOM_ECONNRESET : VPPCOM_ENOTCONN);
1693
1694       VDBG (1, "VCL<%d>: vpp handle 0x%llx, sid %u: session is not open!"
1695             " state 0x%x (%s), returning %d (%s)", getpid (),
1696             session->vpp_handle, session->session_index, state,
1697             vppcom_session_state_str (state), rv, vppcom_retval_str (rv));
1698       return rv;
1699     }
1700
1701   if (session->session_state & STATE_LISTEN)
1702     return clib_fifo_elts (session->accept_evts_fifo);
1703
1704   return svm_fifo_max_dequeue (session->rx_fifo);
1705 }
1706
1707 int
1708 vppcom_data_segment_copy (void *buf, vppcom_data_segments_t ds, u32 max_bytes)
1709 {
1710   u32 first_copy = clib_min (ds[0].len, max_bytes);
1711   clib_memcpy_fast (buf, ds[0].data, first_copy);
1712   if (first_copy < max_bytes)
1713     {
1714       clib_memcpy_fast (buf + first_copy, ds[1].data,
1715                         clib_min (ds[1].len, max_bytes - first_copy));
1716     }
1717   return 0;
1718 }
1719
1720 static u8
1721 vcl_is_tx_evt_for_session (session_event_t * e, u32 sid, u8 is_ct)
1722 {
1723   if (!is_ct)
1724     return (e->event_type == FIFO_EVENT_APP_TX
1725             && e->fifo->client_session_index == sid);
1726   else
1727     return (e->event_type == SESSION_IO_EVT_CT_RX);
1728 }
1729
1730 static inline int
1731 vppcom_session_write_inline (uint32_t session_handle, void *buf, size_t n,
1732                              u8 is_flush)
1733 {
1734   vcl_worker_t *wrk = vcl_worker_get_current ();
1735   int rv, n_write, is_nonblocking;
1736   vcl_session_t *s = 0;
1737   svm_fifo_t *tx_fifo = 0;
1738   session_evt_type_t et;
1739   svm_msg_q_msg_t msg;
1740   session_event_t *e;
1741   svm_msg_q_t *mq;
1742   u8 is_ct;
1743
1744   if (PREDICT_FALSE (!buf))
1745     return VPPCOM_EINVAL;
1746
1747   s = vcl_session_get_w_handle (wrk, session_handle);
1748   if (PREDICT_FALSE (!s))
1749     return VPPCOM_EBADFD;
1750
1751   if (PREDICT_FALSE (s->is_vep))
1752     {
1753       clib_warning ("VCL<%d>: ERROR: vpp handle 0x%llx, sid %u: "
1754                     "cannot write to an epoll session!",
1755                     getpid (), s->vpp_handle, session_handle);
1756
1757       return VPPCOM_EBADFD;
1758     }
1759
1760   if (PREDICT_FALSE (!(s->session_state & STATE_OPEN)))
1761     {
1762       session_state_t state = s->session_state;
1763       rv = ((state & STATE_DISCONNECT) ? VPPCOM_ECONNRESET : VPPCOM_ENOTCONN);
1764       VDBG (1, "VCL<%d>: vpp handle 0x%llx, sid %u: session is not open! "
1765             "state 0x%x (%s)", getpid (), s->vpp_handle, session_handle,
1766             state, vppcom_session_state_str (state));
1767       return rv;
1768     }
1769
1770   tx_fifo = s->tx_fifo;
1771   is_ct = vcl_session_is_ct (s);
1772   is_nonblocking = VCL_SESS_ATTR_TEST (s->attr, VCL_SESS_ATTR_NONBLOCK);
1773   mq = is_ct ? s->our_evt_q : wrk->app_event_queue;
1774   if (svm_fifo_is_full (tx_fifo))
1775     {
1776       if (is_nonblocking)
1777         {
1778           return VPPCOM_EWOULDBLOCK;
1779         }
1780       while (svm_fifo_is_full (tx_fifo))
1781         {
1782           svm_fifo_set_want_tx_evt (tx_fifo, 1);
1783           svm_msg_q_lock (mq);
1784           if (svm_msg_q_is_empty (mq))
1785             svm_msg_q_wait (mq);
1786
1787           svm_msg_q_sub_w_lock (mq, &msg);
1788           e = svm_msg_q_msg_data (mq, &msg);
1789           svm_msg_q_unlock (mq);
1790
1791           if (!vcl_is_tx_evt_for_session (e, s->session_index, is_ct))
1792             vcl_handle_mq_event (wrk, e);
1793           svm_msg_q_free_msg (mq, &msg);
1794
1795           if (PREDICT_FALSE (!(s->session_state & STATE_OPEN)))
1796             return VPPCOM_ECONNRESET;
1797         }
1798     }
1799
1800   ASSERT (FIFO_EVENT_APP_TX + 1 == SESSION_IO_EVT_CT_TX);
1801   et = FIFO_EVENT_APP_TX + vcl_session_is_ct (s);
1802   if (is_flush && !vcl_session_is_ct (s))
1803     et = SESSION_IO_EVT_TX_FLUSH;
1804
1805   if (s->is_dgram)
1806     n_write = app_send_dgram_raw (tx_fifo, &s->transport,
1807                                   s->vpp_evt_q, buf, n, et, SVM_Q_WAIT);
1808   else
1809     n_write = app_send_stream_raw (tx_fifo, s->vpp_evt_q, buf, n, et,
1810                                    SVM_Q_WAIT);
1811
1812   ASSERT (n_write > 0);
1813
1814   VDBG (2, "VCL<%d>: vpp handle 0x%llx, sid %u: wrote %d bytes", getpid (),
1815         s->vpp_handle, session_handle, n_write);
1816
1817   return n_write;
1818 }
1819
1820 int
1821 vppcom_session_write (uint32_t session_handle, void *buf, size_t n)
1822 {
1823   return vppcom_session_write_inline (session_handle, buf, n,
1824                                       0 /* is_flush */ );
1825 }
1826
1827 int
1828 vppcom_session_write_msg (uint32_t session_handle, void *buf, size_t n)
1829 {
1830   return vppcom_session_write_inline (session_handle, buf, n,
1831                                       1 /* is_flush */ );
1832 }
1833
1834
1835 static vcl_session_t *
1836 vcl_ct_session_get_from_fifo (vcl_worker_t * wrk, svm_fifo_t * f, u8 type)
1837 {
1838   vcl_session_t *s;
1839   s = vcl_session_get (wrk, f->client_session_index);
1840   if (s)
1841     {
1842       /* rx fifo */
1843       if (type == 0 && s->rx_fifo == f)
1844         return s;
1845       /* tx fifo */
1846       if (type == 1 && s->tx_fifo == f)
1847         return s;
1848     }
1849   s = vcl_session_get (wrk, f->master_session_index);
1850   if (s)
1851     {
1852       if (type == 0 && s->rx_fifo == f)
1853         return s;
1854       if (type == 1 && s->tx_fifo == f)
1855         return s;
1856     }
1857   return 0;
1858 }
1859
1860 static inline int
1861 vppcom_session_write_ready (vcl_session_t * session)
1862 {
1863   /* Assumes caller has acquired spinlock: vcm->sessions_lockp */
1864   if (PREDICT_FALSE (session->is_vep))
1865     {
1866       clib_warning ("VCL<%d>: ERROR: vpp handle 0x%llx, sid %u: "
1867                     "cannot write to an epoll session!",
1868                     getpid (), session->vpp_handle, session->session_index);
1869       return VPPCOM_EBADFD;
1870     }
1871
1872   if (PREDICT_FALSE (session->session_state & STATE_LISTEN))
1873     {
1874       clib_warning ("VCL<%d>: ERROR: vpp handle 0x%llx, sid %u: "
1875                     "cannot write to a listen session!",
1876                     getpid (), session->vpp_handle, session->session_index);
1877       return VPPCOM_EBADFD;
1878     }
1879
1880   if (PREDICT_FALSE (!(session->session_state & STATE_OPEN)))
1881     {
1882       session_state_t state = session->session_state;
1883       int rv;
1884
1885       rv = ((state & STATE_DISCONNECT) ? VPPCOM_ECONNRESET : VPPCOM_ENOTCONN);
1886       clib_warning ("VCL<%d>: ERROR: vpp handle 0x%llx, sid %u: "
1887                     "session is not open! state 0x%x (%s), "
1888                     "returning %d (%s)", getpid (), session->vpp_handle,
1889                     session->session_index,
1890                     state, vppcom_session_state_str (state),
1891                     rv, vppcom_retval_str (rv));
1892       return rv;
1893     }
1894
1895   VDBG (3, "VCL<%d>: vpp handle 0x%llx, sid %u: peek %s (%p), ready = %d",
1896         getpid (), session->vpp_handle, session->session_index,
1897         session->tx_fifo, svm_fifo_max_enqueue (session->tx_fifo));
1898
1899   return svm_fifo_max_enqueue (session->tx_fifo);
1900 }
1901
1902 static inline int
1903 vcl_mq_dequeue_batch (vcl_worker_t * wrk, svm_msg_q_t * mq)
1904 {
1905   svm_msg_q_msg_t *msg;
1906   u32 n_msgs;
1907   int i;
1908
1909   n_msgs = svm_msg_q_size (mq);
1910   for (i = 0; i < n_msgs; i++)
1911     {
1912       vec_add2 (wrk->mq_msg_vector, msg, 1);
1913       svm_msg_q_sub_w_lock (mq, msg);
1914     }
1915   return n_msgs;
1916 }
1917
1918 #define vcl_fifo_rx_evt_valid_or_break(_fifo)                   \
1919 if (PREDICT_FALSE (svm_fifo_is_empty (_fifo)))                  \
1920   {                                                             \
1921     svm_fifo_unset_event (_fifo);                               \
1922     if (svm_fifo_is_empty (_fifo))                              \
1923       break;                                                    \
1924   }                                                             \
1925
1926 static void
1927 vcl_select_handle_mq_event (vcl_worker_t * wrk, session_event_t * e,
1928                             unsigned long n_bits, unsigned long *read_map,
1929                             unsigned long *write_map,
1930                             unsigned long *except_map, u32 * bits_set)
1931 {
1932   session_disconnected_msg_t *disconnected_msg;
1933   session_connected_msg_t *connected_msg;
1934   vcl_session_t *session;
1935   u32 sid;
1936
1937   switch (e->event_type)
1938     {
1939     case FIFO_EVENT_APP_RX:
1940       vcl_fifo_rx_evt_valid_or_break (e->fifo);
1941       sid = e->fifo->client_session_index;
1942       session = vcl_session_get (wrk, sid);
1943       if (!session)
1944         break;
1945       if (sid < n_bits && read_map)
1946         {
1947           clib_bitmap_set_no_check (read_map, sid, 1);
1948           *bits_set += 1;
1949         }
1950       break;
1951     case FIFO_EVENT_APP_TX:
1952       sid = e->fifo->client_session_index;
1953       session = vcl_session_get (wrk, sid);
1954       if (!session)
1955         break;
1956       if (sid < n_bits && write_map)
1957         {
1958           clib_bitmap_set_no_check (write_map, sid, 1);
1959           *bits_set += 1;
1960         }
1961       break;
1962     case SESSION_IO_EVT_CT_TX:
1963       vcl_fifo_rx_evt_valid_or_break (e->fifo);
1964       session = vcl_ct_session_get_from_fifo (wrk, e->fifo, 0);
1965       if (!session)
1966         break;
1967       sid = session->session_index;
1968       if (sid < n_bits && read_map)
1969         {
1970           clib_bitmap_set_no_check (read_map, sid, 1);
1971           *bits_set += 1;
1972         }
1973       break;
1974     case SESSION_IO_EVT_CT_RX:
1975       session = vcl_ct_session_get_from_fifo (wrk, e->fifo, 1);
1976       if (!session)
1977         break;
1978       sid = session->session_index;
1979       if (sid < n_bits && write_map)
1980         {
1981           clib_bitmap_set_no_check (write_map, sid, 1);
1982           *bits_set += 1;
1983         }
1984       break;
1985     case SESSION_CTRL_EVT_ACCEPTED:
1986       session = vcl_session_accepted (wrk,
1987                                       (session_accepted_msg_t *) e->data);
1988       if (!session)
1989         break;
1990       sid = session->session_index;
1991       if (sid < n_bits && read_map)
1992         {
1993           clib_bitmap_set_no_check (read_map, sid, 1);
1994           *bits_set += 1;
1995         }
1996       break;
1997     case SESSION_CTRL_EVT_CONNECTED:
1998       connected_msg = (session_connected_msg_t *) e->data;
1999       vcl_session_connected_handler (wrk, connected_msg);
2000       break;
2001     case SESSION_CTRL_EVT_DISCONNECTED:
2002       disconnected_msg = (session_disconnected_msg_t *) e->data;
2003       session = vcl_session_disconnected_handler (wrk, disconnected_msg);
2004       if (!session)
2005         break;
2006       sid = session->session_index;
2007       if (sid < n_bits && except_map)
2008         {
2009           clib_bitmap_set_no_check (except_map, sid, 1);
2010           *bits_set += 1;
2011         }
2012       break;
2013     case SESSION_CTRL_EVT_RESET:
2014       sid = vcl_session_reset_handler (wrk, (session_reset_msg_t *) e->data);
2015       if (sid < n_bits && except_map)
2016         {
2017           clib_bitmap_set_no_check (except_map, sid, 1);
2018           *bits_set += 1;
2019         }
2020       break;
2021     default:
2022       clib_warning ("unhandled: %u", e->event_type);
2023       break;
2024     }
2025 }
2026
2027 static int
2028 vcl_select_handle_mq (vcl_worker_t * wrk, svm_msg_q_t * mq,
2029                       unsigned long n_bits, unsigned long *read_map,
2030                       unsigned long *write_map, unsigned long *except_map,
2031                       double time_to_wait, u32 * bits_set)
2032 {
2033   svm_msg_q_msg_t *msg;
2034   session_event_t *e;
2035   u32 i;
2036
2037   svm_msg_q_lock (mq);
2038   if (svm_msg_q_is_empty (mq))
2039     {
2040       if (*bits_set)
2041         {
2042           svm_msg_q_unlock (mq);
2043           return 0;
2044         }
2045
2046       if (!time_to_wait)
2047         {
2048           svm_msg_q_unlock (mq);
2049           return 0;
2050         }
2051       else if (time_to_wait < 0)
2052         {
2053           svm_msg_q_wait (mq);
2054         }
2055       else
2056         {
2057           if (svm_msg_q_timedwait (mq, time_to_wait))
2058             {
2059               svm_msg_q_unlock (mq);
2060               return 0;
2061             }
2062         }
2063     }
2064   vcl_mq_dequeue_batch (wrk, mq);
2065   svm_msg_q_unlock (mq);
2066
2067   for (i = 0; i < vec_len (wrk->mq_msg_vector); i++)
2068     {
2069       msg = vec_elt_at_index (wrk->mq_msg_vector, i);
2070       e = svm_msg_q_msg_data (mq, msg);
2071       vcl_select_handle_mq_event (wrk, e, n_bits, read_map, write_map,
2072                                   except_map, bits_set);
2073       svm_msg_q_free_msg (mq, msg);
2074     }
2075   vec_reset_length (wrk->mq_msg_vector);
2076   return *bits_set;
2077 }
2078
2079 static int
2080 vppcom_select_condvar (vcl_worker_t * wrk, unsigned long n_bits,
2081                        unsigned long *read_map, unsigned long *write_map,
2082                        unsigned long *except_map, double time_to_wait,
2083                        u32 * bits_set)
2084 {
2085   double total_wait = 0, wait_slice;
2086   vcl_cut_through_registration_t *cr;
2087
2088   time_to_wait = (time_to_wait == -1) ? 10e9 : time_to_wait;
2089   wait_slice = wrk->cut_through_registrations ? 10e-6 : time_to_wait;
2090   do
2091     {
2092       vcl_ct_registration_lock (wrk);
2093       /* *INDENT-OFF* */
2094       pool_foreach (cr, wrk->cut_through_registrations, ({
2095         vcl_select_handle_mq (wrk, cr->mq, n_bits, read_map, write_map, except_map,
2096                               0, bits_set);
2097       }));
2098       /* *INDENT-ON* */
2099       vcl_ct_registration_unlock (wrk);
2100
2101       vcl_select_handle_mq (wrk, wrk->app_event_queue, n_bits, read_map,
2102                             write_map, except_map, time_to_wait, bits_set);
2103       total_wait += wait_slice;
2104       if (*bits_set)
2105         return *bits_set;
2106     }
2107   while (total_wait < time_to_wait);
2108
2109   return 0;
2110 }
2111
2112 static int
2113 vppcom_select_eventfd (vcl_worker_t * wrk, unsigned long n_bits,
2114                        unsigned long *read_map, unsigned long *write_map,
2115                        unsigned long *except_map, double time_to_wait,
2116                        u32 * bits_set)
2117 {
2118   vcl_mq_evt_conn_t *mqc;
2119   int __clib_unused n_read;
2120   int n_mq_evts, i;
2121   u64 buf;
2122
2123   vec_validate (wrk->mq_events, pool_elts (wrk->mq_evt_conns));
2124   n_mq_evts = epoll_wait (wrk->mqs_epfd, wrk->mq_events,
2125                           vec_len (wrk->mq_events), time_to_wait);
2126   for (i = 0; i < n_mq_evts; i++)
2127     {
2128       mqc = vcl_mq_evt_conn_get (wrk, wrk->mq_events[i].data.u32);
2129       n_read = read (mqc->mq_fd, &buf, sizeof (buf));
2130       vcl_select_handle_mq (wrk, mqc->mq, n_bits, read_map, write_map,
2131                             except_map, 0, bits_set);
2132     }
2133
2134   return (n_mq_evts > 0 ? (int) *bits_set : 0);
2135 }
2136
2137 int
2138 vppcom_select (unsigned long n_bits, unsigned long *read_map,
2139                unsigned long *write_map, unsigned long *except_map,
2140                double time_to_wait)
2141 {
2142   u32 sid, minbits = clib_max (n_bits, BITS (uword)), bits_set = 0;
2143   vcl_worker_t *wrk = vcl_worker_get_current ();
2144   vcl_session_t *session = 0;
2145   int rv, i;
2146
2147   ASSERT (sizeof (clib_bitmap_t) == sizeof (long int));
2148
2149   if (n_bits && read_map)
2150     {
2151       clib_bitmap_validate (wrk->rd_bitmap, minbits);
2152       clib_memcpy_fast (wrk->rd_bitmap, read_map,
2153                         vec_len (wrk->rd_bitmap) * sizeof (clib_bitmap_t));
2154       memset (read_map, 0, vec_len (wrk->rd_bitmap) * sizeof (clib_bitmap_t));
2155     }
2156   if (n_bits && write_map)
2157     {
2158       clib_bitmap_validate (wrk->wr_bitmap, minbits);
2159       clib_memcpy_fast (wrk->wr_bitmap, write_map,
2160                         vec_len (wrk->wr_bitmap) * sizeof (clib_bitmap_t));
2161       memset (write_map, 0,
2162               vec_len (wrk->wr_bitmap) * sizeof (clib_bitmap_t));
2163     }
2164   if (n_bits && except_map)
2165     {
2166       clib_bitmap_validate (wrk->ex_bitmap, minbits);
2167       clib_memcpy_fast (wrk->ex_bitmap, except_map,
2168                         vec_len (wrk->ex_bitmap) * sizeof (clib_bitmap_t));
2169       memset (except_map, 0,
2170               vec_len (wrk->ex_bitmap) * sizeof (clib_bitmap_t));
2171     }
2172
2173   if (!n_bits)
2174     return 0;
2175
2176   if (!write_map)
2177     goto check_rd;
2178
2179   /* *INDENT-OFF* */
2180   clib_bitmap_foreach (sid, wrk->wr_bitmap, ({
2181     if (!(session = vcl_session_get (wrk, sid)))
2182       {
2183         if (except_map && sid < minbits)
2184           clib_bitmap_set_no_check (except_map, sid, 1);
2185         continue;
2186       }
2187
2188     rv = svm_fifo_is_full (session->tx_fifo);
2189     if (!rv)
2190       {
2191         clib_bitmap_set_no_check (write_map, sid, 1);
2192         bits_set++;
2193       }
2194   }));
2195
2196 check_rd:
2197   if (!read_map)
2198     goto check_mq;
2199
2200   clib_bitmap_foreach (sid, wrk->rd_bitmap, ({
2201     if (!(session = vcl_session_get (wrk, sid)))
2202       {
2203         if (except_map && sid < minbits)
2204           clib_bitmap_set_no_check (except_map, sid, 1);
2205         continue;
2206       }
2207
2208     rv = vppcom_session_read_ready (session);
2209     if (rv)
2210       {
2211         clib_bitmap_set_no_check (read_map, sid, 1);
2212         bits_set++;
2213       }
2214   }));
2215   /* *INDENT-ON* */
2216
2217 check_mq:
2218
2219   for (i = 0; i < vec_len (wrk->unhandled_evts_vector); i++)
2220     {
2221       vcl_select_handle_mq_event (wrk, &wrk->unhandled_evts_vector[i], n_bits,
2222                                   read_map, write_map, except_map, &bits_set);
2223     }
2224   vec_reset_length (wrk->unhandled_evts_vector);
2225
2226   if (vcm->cfg.use_mq_eventfd)
2227     vppcom_select_eventfd (wrk, n_bits, read_map, write_map, except_map,
2228                            time_to_wait, &bits_set);
2229   else
2230     vppcom_select_condvar (wrk, n_bits, read_map, write_map, except_map,
2231                            time_to_wait, &bits_set);
2232
2233   return (bits_set);
2234 }
2235
2236 static inline void
2237 vep_verify_epoll_chain (vcl_worker_t * wrk, u32 vep_idx)
2238 {
2239   vcl_session_t *session;
2240   vppcom_epoll_t *vep;
2241   u32 sid = vep_idx;
2242
2243   if (VPPCOM_DEBUG <= 1)
2244     return;
2245
2246   /* Assumes caller has acquired spinlock: vcm->sessions_lockp */
2247   session = vcl_session_get (wrk, vep_idx);
2248   if (PREDICT_FALSE (!session))
2249     {
2250       clib_warning ("VCL<%d>: ERROR: Invalid vep_idx (%u)!",
2251                     getpid (), vep_idx);
2252       goto done;
2253     }
2254   if (PREDICT_FALSE (!session->is_vep))
2255     {
2256       clib_warning ("VCL<%d>: ERROR: vep_idx (%u) is not a vep!",
2257                     getpid (), vep_idx);
2258       goto done;
2259     }
2260   vep = &session->vep;
2261   clib_warning ("VCL<%d>: vep_idx (%u): Dumping epoll chain\n"
2262                 "{\n"
2263                 "   is_vep         = %u\n"
2264                 "   is_vep_session = %u\n"
2265                 "   next_sid       = 0x%x (%u)\n"
2266                 "   wait_cont_idx  = 0x%x (%u)\n"
2267                 "}\n", getpid (), vep_idx,
2268                 session->is_vep, session->is_vep_session,
2269                 vep->next_sh, vep->next_sh,
2270                 session->wait_cont_idx, session->wait_cont_idx);
2271
2272   for (sid = vep->next_sh; sid != ~0; sid = vep->next_sh)
2273     {
2274       session = vcl_session_get (wrk, sid);
2275       if (PREDICT_FALSE (!session))
2276         {
2277           clib_warning ("VCL<%d>: ERROR: Invalid sid (%u)!", getpid (), sid);
2278           goto done;
2279         }
2280       if (PREDICT_FALSE (session->is_vep))
2281         clib_warning ("VCL<%d>: ERROR: sid (%u) is a vep!",
2282                       getpid (), vep_idx);
2283       else if (PREDICT_FALSE (!session->is_vep_session))
2284         {
2285           clib_warning ("VCL<%d>: ERROR: session (%u) "
2286                         "is not a vep session!", getpid (), sid);
2287           goto done;
2288         }
2289       vep = &session->vep;
2290       if (PREDICT_FALSE (vep->vep_sh != vep_idx))
2291         clib_warning ("VCL<%d>: ERROR: session (%u) vep_idx (%u) != "
2292                       "vep_idx (%u)!", getpid (),
2293                       sid, session->vep.vep_sh, vep_idx);
2294       if (session->is_vep_session)
2295         {
2296           clib_warning ("vep_idx[%u]: sid 0x%x (%u)\n"
2297                         "{\n"
2298                         "   next_sid       = 0x%x (%u)\n"
2299                         "   prev_sid       = 0x%x (%u)\n"
2300                         "   vep_idx        = 0x%x (%u)\n"
2301                         "   ev.events      = 0x%x\n"
2302                         "   ev.data.u64    = 0x%llx\n"
2303                         "   et_mask        = 0x%x\n"
2304                         "}\n",
2305                         vep_idx, sid, sid,
2306                         vep->next_sh, vep->next_sh,
2307                         vep->prev_sh, vep->prev_sh,
2308                         vep->vep_sh, vep->vep_sh,
2309                         vep->ev.events, vep->ev.data.u64, vep->et_mask);
2310         }
2311     }
2312
2313 done:
2314   clib_warning ("VCL<%d>: vep_idx (%u): Dump complete!\n",
2315                 getpid (), vep_idx);
2316 }
2317
2318 int
2319 vppcom_epoll_create (void)
2320 {
2321   vcl_worker_t *wrk = vcl_worker_get_current ();
2322   vcl_session_t *vep_session;
2323
2324   vep_session = vcl_session_alloc (wrk);
2325
2326   vep_session->is_vep = 1;
2327   vep_session->vep.vep_sh = ~0;
2328   vep_session->vep.next_sh = ~0;
2329   vep_session->vep.prev_sh = ~0;
2330   vep_session->wait_cont_idx = ~0;
2331   vep_session->vpp_handle = ~0;
2332
2333   vcl_evt (VCL_EVT_EPOLL_CREATE, vep_session, vep_sh);
2334   VDBG (0, "VCL<%d>: Created vep_idx %u / sid %u!",
2335         getpid (), vep_session->session_index, vep_session->session_index);
2336
2337   return vcl_session_handle (vep_session);
2338 }
2339
2340 int
2341 vppcom_epoll_ctl (uint32_t vep_handle, int op, uint32_t session_handle,
2342                   struct epoll_event *event)
2343 {
2344   vcl_worker_t *wrk = vcl_worker_get_current ();
2345   vcl_session_t *vep_session;
2346   vcl_session_t *session;
2347   int rv = VPPCOM_OK;
2348
2349   if (vep_handle == session_handle)
2350     {
2351       clib_warning ("VCL<%d>: ERROR: vep_idx == session_index (%u)!",
2352                     getpid (), vep_handle);
2353       return VPPCOM_EINVAL;
2354     }
2355
2356   vep_session = vcl_session_get_w_handle (wrk, vep_handle);
2357   if (PREDICT_FALSE (!vep_session))
2358     {
2359       clib_warning ("VCL<%d>: ERROR: Invalid vep_idx (%u)!", vep_handle);
2360       return VPPCOM_EBADFD;
2361     }
2362   if (PREDICT_FALSE (!vep_session->is_vep))
2363     {
2364       clib_warning ("VCL<%d>: ERROR: vep_idx (%u) is not a vep!",
2365                     getpid (), vep_handle);
2366       return VPPCOM_EINVAL;
2367     }
2368
2369   ASSERT (vep_session->vep.vep_sh == ~0);
2370   ASSERT (vep_session->vep.prev_sh == ~0);
2371
2372   session = vcl_session_get_w_handle (wrk, session_handle);
2373   if (PREDICT_FALSE (!session))
2374     {
2375       VDBG (0, "VCL<%d>: ERROR: Invalid session_handle (%u)!",
2376             getpid (), session_handle);
2377       return VPPCOM_EBADFD;
2378     }
2379   if (PREDICT_FALSE (session->is_vep))
2380     {
2381       clib_warning ("ERROR: session_handle (%u) is a vep!", vep_handle);
2382       return VPPCOM_EINVAL;
2383     }
2384
2385   switch (op)
2386     {
2387     case EPOLL_CTL_ADD:
2388       if (PREDICT_FALSE (!event))
2389         {
2390           clib_warning ("VCL<%d>: ERROR: EPOLL_CTL_ADD: NULL pointer to "
2391                         "epoll_event structure!", getpid ());
2392           return VPPCOM_EINVAL;
2393         }
2394       if (vep_session->vep.next_sh != ~0)
2395         {
2396           vcl_session_t *next_session;
2397           next_session = vcl_session_get_w_handle (wrk,
2398                                                    vep_session->vep.next_sh);
2399           if (PREDICT_FALSE (!next_session))
2400             {
2401               clib_warning ("VCL<%d>: ERROR: EPOLL_CTL_ADD: Invalid "
2402                             "vep.next_sid (%u) on vep_idx (%u)!",
2403                             getpid (), vep_session->vep.next_sh, vep_handle);
2404               return VPPCOM_EBADFD;
2405             }
2406           ASSERT (next_session->vep.prev_sh == vep_handle);
2407           next_session->vep.prev_sh = session_handle;
2408         }
2409       session->vep.next_sh = vep_session->vep.next_sh;
2410       session->vep.prev_sh = vep_handle;
2411       session->vep.vep_sh = vep_handle;
2412       session->vep.et_mask = VEP_DEFAULT_ET_MASK;
2413       session->vep.ev = *event;
2414       session->is_vep = 0;
2415       session->is_vep_session = 1;
2416       vep_session->vep.next_sh = session_handle;
2417
2418       VDBG (1, "VCL<%d>: EPOLL_CTL_ADD: vep_idx %u, sid %u, events 0x%x, "
2419             "data 0x%llx!", getpid (), vep_handle, session_handle,
2420             event->events, event->data.u64);
2421       vcl_evt (VCL_EVT_EPOLL_CTLADD, session, event->events, event->data.u64);
2422       break;
2423
2424     case EPOLL_CTL_MOD:
2425       if (PREDICT_FALSE (!event))
2426         {
2427           clib_warning ("VCL<%d>: ERROR: EPOLL_CTL_MOD: NULL pointer to "
2428                         "epoll_event structure!", getpid ());
2429           rv = VPPCOM_EINVAL;
2430           goto done;
2431         }
2432       else if (PREDICT_FALSE (!session->is_vep_session))
2433         {
2434           clib_warning ("VCL<%d>: ERROR: sid %u EPOLL_CTL_MOD: "
2435                         "not a vep session!", getpid (), session_handle);
2436           rv = VPPCOM_EINVAL;
2437           goto done;
2438         }
2439       else if (PREDICT_FALSE (session->vep.vep_sh != vep_handle))
2440         {
2441           clib_warning ("VCL<%d>: ERROR: sid %u EPOLL_CTL_MOD: "
2442                         "vep_idx (%u) != vep_idx (%u)!",
2443                         getpid (), session_handle,
2444                         session->vep.vep_sh, vep_handle);
2445           rv = VPPCOM_EINVAL;
2446           goto done;
2447         }
2448       session->vep.et_mask = VEP_DEFAULT_ET_MASK;
2449       session->vep.ev = *event;
2450       VDBG (1, "VCL<%d>: EPOLL_CTL_MOD: vep_idx %u, sid %u, events 0x%x,"
2451             " data 0x%llx!", getpid (), vep_handle, session_handle,
2452             event->events, event->data.u64);
2453       break;
2454
2455     case EPOLL_CTL_DEL:
2456       if (PREDICT_FALSE (!session->is_vep_session))
2457         {
2458           clib_warning ("VCL<%d>: ERROR: sid %u EPOLL_CTL_DEL: "
2459                         "not a vep session!", getpid (), session_handle);
2460           rv = VPPCOM_EINVAL;
2461           goto done;
2462         }
2463       else if (PREDICT_FALSE (session->vep.vep_sh != vep_handle))
2464         {
2465           clib_warning ("VCL<%d>: ERROR: sid %u EPOLL_CTL_DEL: "
2466                         "vep_idx (%u) != vep_idx (%u)!",
2467                         getpid (), session_handle,
2468                         session->vep.vep_sh, vep_handle);
2469           rv = VPPCOM_EINVAL;
2470           goto done;
2471         }
2472
2473       vep_session->wait_cont_idx =
2474         (vep_session->wait_cont_idx == session_handle) ?
2475         session->vep.next_sh : vep_session->wait_cont_idx;
2476
2477       if (session->vep.prev_sh == vep_handle)
2478         vep_session->vep.next_sh = session->vep.next_sh;
2479       else
2480         {
2481           vcl_session_t *prev_session;
2482           prev_session = vcl_session_get_w_handle (wrk, session->vep.prev_sh);
2483           if (PREDICT_FALSE (!prev_session))
2484             {
2485               clib_warning ("VCL<%d>: ERROR: EPOLL_CTL_DEL: Invalid "
2486                             "vep.prev_sid (%u) on sid (%u)!",
2487                             getpid (), session->vep.prev_sh, session_handle);
2488               return VPPCOM_EBADFD;
2489             }
2490           ASSERT (prev_session->vep.next_sh == session_handle);
2491           prev_session->vep.next_sh = session->vep.next_sh;
2492         }
2493       if (session->vep.next_sh != ~0)
2494         {
2495           vcl_session_t *next_session;
2496           next_session = vcl_session_get_w_handle (wrk, session->vep.next_sh);
2497           if (PREDICT_FALSE (!next_session))
2498             {
2499               clib_warning ("VCL<%d>: ERROR: EPOLL_CTL_DEL: Invalid "
2500                             "vep.next_sid (%u) on sid (%u)!",
2501                             getpid (), session->vep.next_sh, session_handle);
2502               return VPPCOM_EBADFD;
2503             }
2504           ASSERT (next_session->vep.prev_sh == session_handle);
2505           next_session->vep.prev_sh = session->vep.prev_sh;
2506         }
2507
2508       memset (&session->vep, 0, sizeof (session->vep));
2509       session->vep.next_sh = ~0;
2510       session->vep.prev_sh = ~0;
2511       session->vep.vep_sh = ~0;
2512       session->is_vep_session = 0;
2513       VDBG (1, "VCL<%d>: EPOLL_CTL_DEL: vep_idx %u, sid %u!",
2514             getpid (), vep_handle, session_handle);
2515       vcl_evt (VCL_EVT_EPOLL_CTLDEL, session, vep_sh);
2516       break;
2517
2518     default:
2519       clib_warning ("VCL<%d>: ERROR: Invalid operation (%d)!", getpid (), op);
2520       rv = VPPCOM_EINVAL;
2521     }
2522
2523   vep_verify_epoll_chain (wrk, vep_handle);
2524
2525 done:
2526   return rv;
2527 }
2528
2529 static inline void
2530 vcl_epoll_wait_handle_mq_event (vcl_worker_t * wrk, session_event_t * e,
2531                                 struct epoll_event *events, u32 * num_ev)
2532 {
2533   session_disconnected_msg_t *disconnected_msg;
2534   session_connected_msg_t *connected_msg;
2535   u32 sid = ~0, session_events;
2536   u64 session_evt_data = ~0;
2537   vcl_session_t *session;
2538   u8 add_event = 0;
2539
2540   switch (e->event_type)
2541     {
2542     case FIFO_EVENT_APP_RX:
2543       ASSERT (e->fifo->client_thread_index == vcl_get_worker_index ());
2544       vcl_fifo_rx_evt_valid_or_break (e->fifo);
2545       sid = e->fifo->client_session_index;
2546       if (!(session = vcl_session_get (wrk, sid)))
2547         break;
2548       session_events = session->vep.ev.events;
2549       if (!(EPOLLIN & session->vep.ev.events) || session->has_rx_evt)
2550         break;
2551       add_event = 1;
2552       events[*num_ev].events |= EPOLLIN;
2553       session_evt_data = session->vep.ev.data.u64;
2554       session->has_rx_evt = 1;
2555       break;
2556     case FIFO_EVENT_APP_TX:
2557       sid = e->fifo->client_session_index;
2558       if (!(session = vcl_session_get (wrk, sid)))
2559         break;
2560       session_events = session->vep.ev.events;
2561       if (!(EPOLLOUT & session_events))
2562         break;
2563       add_event = 1;
2564       events[*num_ev].events |= EPOLLOUT;
2565       session_evt_data = session->vep.ev.data.u64;
2566       break;
2567     case SESSION_IO_EVT_CT_TX:
2568       vcl_fifo_rx_evt_valid_or_break (e->fifo);
2569       session = vcl_ct_session_get_from_fifo (wrk, e->fifo, 0);
2570       if (PREDICT_FALSE (!session))
2571         break;
2572       sid = session->session_index;
2573       session_events = session->vep.ev.events;
2574       if (!(EPOLLIN & session->vep.ev.events) || session->has_rx_evt)
2575         break;
2576       add_event = 1;
2577       events[*num_ev].events |= EPOLLIN;
2578       session_evt_data = session->vep.ev.data.u64;
2579       session->has_rx_evt = 1;
2580       break;
2581     case SESSION_IO_EVT_CT_RX:
2582       session = vcl_ct_session_get_from_fifo (wrk, e->fifo, 1);
2583       if (PREDICT_FALSE (!session))
2584         break;
2585       sid = session->session_index;
2586       session_events = session->vep.ev.events;
2587       if (!(EPOLLOUT & session_events))
2588         break;
2589       add_event = 1;
2590       events[*num_ev].events |= EPOLLOUT;
2591       session_evt_data = session->vep.ev.data.u64;
2592       break;
2593     case SESSION_CTRL_EVT_ACCEPTED:
2594       session = vcl_session_accepted (wrk,
2595                                       (session_accepted_msg_t *) e->data);
2596       if (!session)
2597         break;
2598
2599       session_events = session->vep.ev.events;
2600       if (!(EPOLLIN & session_events))
2601         break;
2602
2603       add_event = 1;
2604       events[*num_ev].events |= EPOLLIN;
2605       session_evt_data = session->vep.ev.data.u64;
2606       break;
2607     case SESSION_CTRL_EVT_CONNECTED:
2608       connected_msg = (session_connected_msg_t *) e->data;
2609       vcl_session_connected_handler (wrk, connected_msg);
2610       /* Generate EPOLLOUT because there's no connected event */
2611       sid = vcl_session_index_from_vpp_handle (wrk, connected_msg->handle);
2612       if (!(session = vcl_session_get (wrk, sid)))
2613         break;
2614       session_events = session->vep.ev.events;
2615       if (EPOLLOUT & session_events)
2616         {
2617           add_event = 1;
2618           events[*num_ev].events |= EPOLLOUT;
2619           session_evt_data = session->vep.ev.data.u64;
2620         }
2621       break;
2622     case SESSION_CTRL_EVT_DISCONNECTED:
2623       disconnected_msg = (session_disconnected_msg_t *) e->data;
2624       session = vcl_session_disconnected_handler (wrk, disconnected_msg);
2625       if (!session)
2626         break;
2627       add_event = 1;
2628       events[*num_ev].events |= EPOLLHUP | EPOLLRDHUP;
2629       session_evt_data = session->vep.ev.data.u64;
2630       session_events = session->vep.ev.events;
2631       break;
2632     case SESSION_CTRL_EVT_RESET:
2633       sid = vcl_session_reset_handler (wrk, (session_reset_msg_t *) e->data);
2634       if (!(session = vcl_session_get (wrk, sid)))
2635         break;
2636       add_event = 1;
2637       events[*num_ev].events |= EPOLLHUP | EPOLLRDHUP;
2638       session_evt_data = session->vep.ev.data.u64;
2639       session_events = session->vep.ev.events;
2640       break;
2641     default:
2642       VDBG (0, "unhandled: %u", e->event_type);
2643       break;
2644     }
2645
2646   if (add_event)
2647     {
2648       events[*num_ev].data.u64 = session_evt_data;
2649       if (EPOLLONESHOT & session_events)
2650         {
2651           session = vcl_session_get (wrk, sid);
2652           session->vep.ev.events = 0;
2653         }
2654       *num_ev += 1;
2655     }
2656 }
2657
2658 static int
2659 vcl_epoll_wait_handle_mq (vcl_worker_t * wrk, svm_msg_q_t * mq,
2660                           struct epoll_event *events, u32 maxevents,
2661                           double wait_for_time, u32 * num_ev)
2662 {
2663   svm_msg_q_msg_t *msg;
2664   session_event_t *e;
2665   int i;
2666
2667   if (vec_len (wrk->mq_msg_vector) && svm_msg_q_is_empty (mq))
2668     goto handle_dequeued;
2669
2670   svm_msg_q_lock (mq);
2671   if (svm_msg_q_is_empty (mq))
2672     {
2673       if (!wait_for_time)
2674         {
2675           svm_msg_q_unlock (mq);
2676           return 0;
2677         }
2678       else if (wait_for_time < 0)
2679         {
2680           svm_msg_q_wait (mq);
2681         }
2682       else
2683         {
2684           if (svm_msg_q_timedwait (mq, wait_for_time / 1e3))
2685             {
2686               svm_msg_q_unlock (mq);
2687               return 0;
2688             }
2689         }
2690     }
2691   vcl_mq_dequeue_batch (wrk, mq);
2692   svm_msg_q_unlock (mq);
2693
2694 handle_dequeued:
2695   for (i = 0; i < vec_len (wrk->mq_msg_vector); i++)
2696     {
2697       msg = vec_elt_at_index (wrk->mq_msg_vector, i);
2698       e = svm_msg_q_msg_data (mq, msg);
2699       if (*num_ev < maxevents)
2700         vcl_epoll_wait_handle_mq_event (wrk, e, events, num_ev);
2701       else
2702         vec_add1 (wrk->unhandled_evts_vector, *e);
2703       svm_msg_q_free_msg (mq, msg);
2704     }
2705   vec_reset_length (wrk->mq_msg_vector);
2706
2707   return *num_ev;
2708 }
2709
2710 static int
2711 vppcom_epoll_wait_condvar (vcl_worker_t * wrk, struct epoll_event *events,
2712                            int maxevents, u32 n_evts, double wait_for_time)
2713 {
2714   vcl_cut_through_registration_t *cr;
2715   double total_wait = 0, wait_slice;
2716   int rv;
2717
2718   wait_for_time = (wait_for_time == -1) ? (double) 10e9 : wait_for_time;
2719   wait_slice = wrk->cut_through_registrations ? 10e-6 : wait_for_time;
2720
2721   do
2722     {
2723       vcl_ct_registration_lock (wrk);
2724       /* *INDENT-OFF* */
2725       pool_foreach (cr, wrk->cut_through_registrations, ({
2726         vcl_epoll_wait_handle_mq (wrk, cr->mq, events, maxevents, 0, &n_evts);
2727       }));
2728       /* *INDENT-ON* */
2729       vcl_ct_registration_unlock (wrk);
2730
2731       rv = vcl_epoll_wait_handle_mq (wrk, wrk->app_event_queue, events,
2732                                      maxevents, n_evts ? 0 : wait_slice,
2733                                      &n_evts);
2734       if (rv)
2735         total_wait += wait_slice;
2736       if (n_evts)
2737         return n_evts;
2738     }
2739   while (total_wait < wait_for_time);
2740   return n_evts;
2741 }
2742
2743 static int
2744 vppcom_epoll_wait_eventfd (vcl_worker_t * wrk, struct epoll_event *events,
2745                            int maxevents, u32 n_evts, double wait_for_time)
2746 {
2747   vcl_mq_evt_conn_t *mqc;
2748   int __clib_unused n_read;
2749   int n_mq_evts, i;
2750   u64 buf;
2751
2752   vec_validate (wrk->mq_events, pool_elts (wrk->mq_evt_conns));
2753 again:
2754   n_mq_evts = epoll_wait (wrk->mqs_epfd, wrk->mq_events,
2755                           vec_len (wrk->mq_events), wait_for_time);
2756   for (i = 0; i < n_mq_evts; i++)
2757     {
2758       mqc = vcl_mq_evt_conn_get (wrk, wrk->mq_events[i].data.u32);
2759       n_read = read (mqc->mq_fd, &buf, sizeof (buf));
2760       vcl_epoll_wait_handle_mq (wrk, mqc->mq, events, maxevents, 0, &n_evts);
2761     }
2762   if (!n_evts && n_mq_evts > 0)
2763     goto again;
2764
2765   return (int) n_evts;
2766 }
2767
2768 int
2769 vppcom_epoll_wait (uint32_t vep_handle, struct epoll_event *events,
2770                    int maxevents, double wait_for_time)
2771 {
2772   vcl_worker_t *wrk = vcl_worker_get_current ();
2773   vcl_session_t *vep_session;
2774   u32 n_evts = 0;
2775   int i;
2776
2777   if (PREDICT_FALSE (maxevents <= 0))
2778     {
2779       clib_warning ("VCL<%d>: ERROR: Invalid maxevents (%d)!",
2780                     getpid (), maxevents);
2781       return VPPCOM_EINVAL;
2782     }
2783
2784   vep_session = vcl_session_get_w_handle (wrk, vep_handle);
2785   if (!vep_session)
2786     return VPPCOM_EBADFD;
2787
2788   if (PREDICT_FALSE (!vep_session->is_vep))
2789     {
2790       clib_warning ("VCL<%d>: ERROR: vep_idx (%u) is not a vep!",
2791                     getpid (), vep_handle);
2792       return VPPCOM_EINVAL;
2793     }
2794
2795   memset (events, 0, sizeof (*events) * maxevents);
2796
2797   if (vec_len (wrk->unhandled_evts_vector))
2798     {
2799       for (i = 0; i < vec_len (wrk->unhandled_evts_vector); i++)
2800         {
2801           vcl_epoll_wait_handle_mq_event (wrk, &wrk->unhandled_evts_vector[i],
2802                                           events, &n_evts);
2803           if (n_evts == maxevents)
2804             {
2805               i += 1;
2806               break;
2807             }
2808         }
2809
2810       vec_delete (wrk->unhandled_evts_vector, i, 0);
2811     }
2812
2813   if (vcm->cfg.use_mq_eventfd)
2814     return vppcom_epoll_wait_eventfd (wrk, events, maxevents, n_evts,
2815                                       wait_for_time);
2816
2817   return vppcom_epoll_wait_condvar (wrk, events, maxevents, n_evts,
2818                                     wait_for_time);
2819 }
2820
2821 int
2822 vppcom_session_attr (uint32_t session_handle, uint32_t op,
2823                      void *buffer, uint32_t * buflen)
2824 {
2825   vcl_worker_t *wrk = vcl_worker_get_current ();
2826   vcl_session_t *session;
2827   int rv = VPPCOM_OK;
2828   u32 *flags = buffer;
2829   vppcom_endpt_t *ep = buffer;
2830
2831   session = vcl_session_get_w_handle (wrk, session_handle);
2832   if (!session)
2833     return VPPCOM_EBADFD;
2834
2835   switch (op)
2836     {
2837     case VPPCOM_ATTR_GET_NREAD:
2838       rv = vppcom_session_read_ready (session);
2839       VDBG (2, "VCL<%d>: VPPCOM_ATTR_GET_NREAD: sid %u, nread = %d",
2840             getpid (), rv);
2841       break;
2842
2843     case VPPCOM_ATTR_GET_NWRITE:
2844       rv = vppcom_session_write_ready (session);
2845       VDBG (2, "VCL<%d>: VPPCOM_ATTR_GET_NWRITE: sid %u, nwrite = %d",
2846             getpid (), session_handle, rv);
2847       break;
2848
2849     case VPPCOM_ATTR_GET_FLAGS:
2850       if (PREDICT_TRUE (buffer && buflen && (*buflen >= sizeof (*flags))))
2851         {
2852           *flags = O_RDWR | (VCL_SESS_ATTR_TEST (session->attr,
2853                                                  VCL_SESS_ATTR_NONBLOCK));
2854           *buflen = sizeof (*flags);
2855           VDBG (2, "VCL<%d>: VPPCOM_ATTR_GET_FLAGS: sid %u, flags = 0x%08x, "
2856                 "is_nonblocking = %u", getpid (),
2857                 session_handle, *flags,
2858                 VCL_SESS_ATTR_TEST (session->attr, VCL_SESS_ATTR_NONBLOCK));
2859         }
2860       else
2861         rv = VPPCOM_EINVAL;
2862       break;
2863
2864     case VPPCOM_ATTR_SET_FLAGS:
2865       if (PREDICT_TRUE (buffer && buflen && (*buflen == sizeof (*flags))))
2866         {
2867           if (*flags & O_NONBLOCK)
2868             VCL_SESS_ATTR_SET (session->attr, VCL_SESS_ATTR_NONBLOCK);
2869           else
2870             VCL_SESS_ATTR_CLR (session->attr, VCL_SESS_ATTR_NONBLOCK);
2871
2872           VDBG (2, "VCL<%d>: VPPCOM_ATTR_SET_FLAGS: sid %u, flags = 0x%08x,"
2873                 " is_nonblocking = %u",
2874                 getpid (), session_handle, *flags,
2875                 VCL_SESS_ATTR_TEST (session->attr, VCL_SESS_ATTR_NONBLOCK));
2876         }
2877       else
2878         rv = VPPCOM_EINVAL;
2879       break;
2880
2881     case VPPCOM_ATTR_GET_PEER_ADDR:
2882       if (PREDICT_TRUE (buffer && buflen &&
2883                         (*buflen >= sizeof (*ep)) && ep->ip))
2884         {
2885           ep->is_ip4 = session->transport.is_ip4;
2886           ep->port = session->transport.rmt_port;
2887           if (session->transport.is_ip4)
2888             clib_memcpy_fast (ep->ip, &session->transport.rmt_ip.ip4,
2889                               sizeof (ip4_address_t));
2890           else
2891             clib_memcpy_fast (ep->ip, &session->transport.rmt_ip.ip6,
2892                               sizeof (ip6_address_t));
2893           *buflen = sizeof (*ep);
2894           VDBG (1, "VCL<%d>: VPPCOM_ATTR_GET_PEER_ADDR: sid %u, is_ip4 = %u, "
2895                 "addr = %U, port %u", getpid (),
2896                 session_handle, ep->is_ip4, format_ip46_address,
2897                 &session->transport.rmt_ip,
2898                 ep->is_ip4 ? IP46_TYPE_IP4 : IP46_TYPE_IP6,
2899                 clib_net_to_host_u16 (ep->port));
2900         }
2901       else
2902         rv = VPPCOM_EINVAL;
2903       break;
2904
2905     case VPPCOM_ATTR_GET_LCL_ADDR:
2906       if (PREDICT_TRUE (buffer && buflen &&
2907                         (*buflen >= sizeof (*ep)) && ep->ip))
2908         {
2909           ep->is_ip4 = session->transport.is_ip4;
2910           ep->port = session->transport.lcl_port;
2911           if (session->transport.is_ip4)
2912             clib_memcpy_fast (ep->ip, &session->transport.lcl_ip.ip4,
2913                               sizeof (ip4_address_t));
2914           else
2915             clib_memcpy_fast (ep->ip, &session->transport.lcl_ip.ip6,
2916                               sizeof (ip6_address_t));
2917           *buflen = sizeof (*ep);
2918           VDBG (1, "VCL<%d>: VPPCOM_ATTR_GET_LCL_ADDR: sid %u, is_ip4 = %u,"
2919                 " addr = %U port %d", getpid (),
2920                 session_handle, ep->is_ip4, format_ip46_address,
2921                 &session->transport.lcl_ip,
2922                 ep->is_ip4 ? IP46_TYPE_IP4 : IP46_TYPE_IP6,
2923                 clib_net_to_host_u16 (ep->port));
2924         }
2925       else
2926         rv = VPPCOM_EINVAL;
2927       break;
2928
2929     case VPPCOM_ATTR_GET_LIBC_EPFD:
2930       rv = session->libc_epfd;
2931       VDBG (2, "VCL<%d>: VPPCOM_ATTR_GET_LIBC_EPFD: libc_epfd %d",
2932             getpid (), rv);
2933       break;
2934
2935     case VPPCOM_ATTR_SET_LIBC_EPFD:
2936       if (PREDICT_TRUE (buffer && buflen &&
2937                         (*buflen == sizeof (session->libc_epfd))))
2938         {
2939           session->libc_epfd = *(int *) buffer;
2940           *buflen = sizeof (session->libc_epfd);
2941
2942           VDBG (2, "VCL<%d>: VPPCOM_ATTR_SET_LIBC_EPFD: libc_epfd %d, "
2943                 "buflen %d", getpid (), session->libc_epfd, *buflen);
2944         }
2945       else
2946         rv = VPPCOM_EINVAL;
2947       break;
2948
2949     case VPPCOM_ATTR_GET_PROTOCOL:
2950       if (buffer && buflen && (*buflen >= sizeof (int)))
2951         {
2952           *(int *) buffer = session->session_type;
2953           *buflen = sizeof (int);
2954
2955           VDBG (2, "VCL<%d>: VPPCOM_ATTR_GET_PROTOCOL: %d (%s), buflen %d",
2956                 getpid (), *(int *) buffer, *(int *) buffer ? "UDP" : "TCP",
2957                 *buflen);
2958         }
2959       else
2960         rv = VPPCOM_EINVAL;
2961       break;
2962
2963     case VPPCOM_ATTR_GET_LISTEN:
2964       if (buffer && buflen && (*buflen >= sizeof (int)))
2965         {
2966           *(int *) buffer = VCL_SESS_ATTR_TEST (session->attr,
2967                                                 VCL_SESS_ATTR_LISTEN);
2968           *buflen = sizeof (int);
2969
2970           VDBG (2, "VCL<%d>: VPPCOM_ATTR_GET_LISTEN: %d, buflen %d",
2971                 getpid (), *(int *) buffer, *buflen);
2972         }
2973       else
2974         rv = VPPCOM_EINVAL;
2975       break;
2976
2977     case VPPCOM_ATTR_GET_ERROR:
2978       if (buffer && buflen && (*buflen >= sizeof (int)))
2979         {
2980           *(int *) buffer = 0;
2981           *buflen = sizeof (int);
2982
2983           VDBG (2, "VCL<%d>: VPPCOM_ATTR_GET_ERROR: %d, buflen %d, #VPP-TBD#",
2984                 getpid (), *(int *) buffer, *buflen);
2985         }
2986       else
2987         rv = VPPCOM_EINVAL;
2988       break;
2989
2990     case VPPCOM_ATTR_GET_TX_FIFO_LEN:
2991       if (buffer && buflen && (*buflen >= sizeof (u32)))
2992         {
2993
2994           /* VPP-TBD */
2995           *(size_t *) buffer = (session->sndbuf_size ? session->sndbuf_size :
2996                                 session->tx_fifo ? session->tx_fifo->nitems :
2997                                 vcm->cfg.tx_fifo_size);
2998           *buflen = sizeof (u32);
2999
3000           VDBG (2, "VCL<%d>: VPPCOM_ATTR_GET_TX_FIFO_LEN: %u (0x%x), "
3001                 "buflen %d, #VPP-TBD#", getpid (),
3002                 *(size_t *) buffer, *(size_t *) buffer, *buflen);
3003         }
3004       else
3005         rv = VPPCOM_EINVAL;
3006       break;
3007
3008     case VPPCOM_ATTR_SET_TX_FIFO_LEN:
3009       if (buffer && buflen && (*buflen == sizeof (u32)))
3010         {
3011           /* VPP-TBD */
3012           session->sndbuf_size = *(u32 *) buffer;
3013           VDBG (2, "VCL<%d>: VPPCOM_ATTR_SET_TX_FIFO_LEN: %u (0x%x), "
3014                 "buflen %d, #VPP-TBD#", getpid (),
3015                 session->sndbuf_size, session->sndbuf_size, *buflen);
3016         }
3017       else
3018         rv = VPPCOM_EINVAL;
3019       break;
3020
3021     case VPPCOM_ATTR_GET_RX_FIFO_LEN:
3022       if (buffer && buflen && (*buflen >= sizeof (u32)))
3023         {
3024
3025           /* VPP-TBD */
3026           *(size_t *) buffer = (session->rcvbuf_size ? session->rcvbuf_size :
3027                                 session->rx_fifo ? session->rx_fifo->nitems :
3028                                 vcm->cfg.rx_fifo_size);
3029           *buflen = sizeof (u32);
3030
3031           VDBG (2, "VCL<%d>: VPPCOM_ATTR_GET_RX_FIFO_LEN: %u (0x%x), "
3032                 "buflen %d, #VPP-TBD#", getpid (),
3033                 *(size_t *) buffer, *(size_t *) buffer, *buflen);
3034         }
3035       else
3036         rv = VPPCOM_EINVAL;
3037       break;
3038
3039     case VPPCOM_ATTR_SET_RX_FIFO_LEN:
3040       if (buffer && buflen && (*buflen == sizeof (u32)))
3041         {
3042           /* VPP-TBD */
3043           session->rcvbuf_size = *(u32 *) buffer;
3044           VDBG (2, "VCL<%d>: VPPCOM_ATTR_SET_RX_FIFO_LEN: %u (0x%x), "
3045                 "buflen %d, #VPP-TBD#", getpid (),
3046                 session->sndbuf_size, session->sndbuf_size, *buflen);
3047         }
3048       else
3049         rv = VPPCOM_EINVAL;
3050       break;
3051
3052     case VPPCOM_ATTR_GET_REUSEADDR:
3053       if (buffer && buflen && (*buflen >= sizeof (int)))
3054         {
3055           /* VPP-TBD */
3056           *(int *) buffer = VCL_SESS_ATTR_TEST (session->attr,
3057                                                 VCL_SESS_ATTR_REUSEADDR);
3058           *buflen = sizeof (int);
3059
3060           VDBG (2, "VCL<%d>: VPPCOM_ATTR_GET_REUSEADDR: %d, "
3061                 "buflen %d, #VPP-TBD#", getpid (), *(int *) buffer, *buflen);
3062         }
3063       else
3064         rv = VPPCOM_EINVAL;
3065       break;
3066
3067     case VPPCOM_ATTR_SET_REUSEADDR:
3068       if (buffer && buflen && (*buflen == sizeof (int)) &&
3069           !VCL_SESS_ATTR_TEST (session->attr, VCL_SESS_ATTR_LISTEN))
3070         {
3071           /* VPP-TBD */
3072           if (*(int *) buffer)
3073             VCL_SESS_ATTR_SET (session->attr, VCL_SESS_ATTR_REUSEADDR);
3074           else
3075             VCL_SESS_ATTR_CLR (session->attr, VCL_SESS_ATTR_REUSEADDR);
3076
3077           VDBG (2, "VCL<%d>: VPPCOM_ATTR_SET_REUSEADDR: %d, buflen %d,"
3078                 " #VPP-TBD#", getpid (),
3079                 VCL_SESS_ATTR_TEST (session->attr,
3080                                     VCL_SESS_ATTR_REUSEADDR), *buflen);
3081         }
3082       else
3083         rv = VPPCOM_EINVAL;
3084       break;
3085
3086     case VPPCOM_ATTR_GET_REUSEPORT:
3087       if (buffer && buflen && (*buflen >= sizeof (int)))
3088         {
3089           /* VPP-TBD */
3090           *(int *) buffer = VCL_SESS_ATTR_TEST (session->attr,
3091                                                 VCL_SESS_ATTR_REUSEPORT);
3092           *buflen = sizeof (int);
3093
3094           VDBG (2, "VCL<%d>: VPPCOM_ATTR_GET_REUSEPORT: %d, buflen %d,"
3095                 " #VPP-TBD#", getpid (), *(int *) buffer, *buflen);
3096         }
3097       else
3098         rv = VPPCOM_EINVAL;
3099       break;
3100
3101     case VPPCOM_ATTR_SET_REUSEPORT:
3102       if (buffer && buflen && (*buflen == sizeof (int)) &&
3103           !VCL_SESS_ATTR_TEST (session->attr, VCL_SESS_ATTR_LISTEN))
3104         {
3105           /* VPP-TBD */
3106           if (*(int *) buffer)
3107             VCL_SESS_ATTR_SET (session->attr, VCL_SESS_ATTR_REUSEPORT);
3108           else
3109             VCL_SESS_ATTR_CLR (session->attr, VCL_SESS_ATTR_REUSEPORT);
3110
3111           VDBG (2, "VCL<%d>: VPPCOM_ATTR_SET_REUSEPORT: %d, buflen %d,"
3112                 " #VPP-TBD#", getpid (),
3113                 VCL_SESS_ATTR_TEST (session->attr,
3114                                     VCL_SESS_ATTR_REUSEPORT), *buflen);
3115         }
3116       else
3117         rv = VPPCOM_EINVAL;
3118       break;
3119
3120     case VPPCOM_ATTR_GET_BROADCAST:
3121       if (buffer && buflen && (*buflen >= sizeof (int)))
3122         {
3123           /* VPP-TBD */
3124           *(int *) buffer = VCL_SESS_ATTR_TEST (session->attr,
3125                                                 VCL_SESS_ATTR_BROADCAST);
3126           *buflen = sizeof (int);
3127
3128           VDBG (2, "VCL<%d>: VPPCOM_ATTR_GET_BROADCAST: %d, buflen %d,"
3129                 " #VPP-TBD#", getpid (), *(int *) buffer, *buflen);
3130         }
3131       else
3132         rv = VPPCOM_EINVAL;
3133       break;
3134
3135     case VPPCOM_ATTR_SET_BROADCAST:
3136       if (buffer && buflen && (*buflen == sizeof (int)))
3137         {
3138           /* VPP-TBD */
3139           if (*(int *) buffer)
3140             VCL_SESS_ATTR_SET (session->attr, VCL_SESS_ATTR_BROADCAST);
3141           else
3142             VCL_SESS_ATTR_CLR (session->attr, VCL_SESS_ATTR_BROADCAST);
3143
3144           VDBG (2, "VCL<%d>: VPPCOM_ATTR_SET_BROADCAST: %d, buflen %d, "
3145                 "#VPP-TBD#", getpid (),
3146                 VCL_SESS_ATTR_TEST (session->attr,
3147                                     VCL_SESS_ATTR_BROADCAST), *buflen);
3148         }
3149       else
3150         rv = VPPCOM_EINVAL;
3151       break;
3152
3153     case VPPCOM_ATTR_GET_V6ONLY:
3154       if (buffer && buflen && (*buflen >= sizeof (int)))
3155         {
3156           /* VPP-TBD */
3157           *(int *) buffer = VCL_SESS_ATTR_TEST (session->attr,
3158                                                 VCL_SESS_ATTR_V6ONLY);
3159           *buflen = sizeof (int);
3160
3161           VDBG (2, "VCL<%d>: VPPCOM_ATTR_GET_V6ONLY: %d, buflen %d, "
3162                 "#VPP-TBD#", getpid (), *(int *) buffer, *buflen);
3163         }
3164       else
3165         rv = VPPCOM_EINVAL;
3166       break;
3167
3168     case VPPCOM_ATTR_SET_V6ONLY:
3169       if (buffer && buflen && (*buflen == sizeof (int)))
3170         {
3171           /* VPP-TBD */
3172           if (*(int *) buffer)
3173             VCL_SESS_ATTR_SET (session->attr, VCL_SESS_ATTR_V6ONLY);
3174           else
3175             VCL_SESS_ATTR_CLR (session->attr, VCL_SESS_ATTR_V6ONLY);
3176
3177           VDBG (2, "VCL<%d>: VPPCOM_ATTR_SET_V6ONLY: %d, buflen %d, "
3178                 "#VPP-TBD#", getpid (),
3179                 VCL_SESS_ATTR_TEST (session->attr,
3180                                     VCL_SESS_ATTR_V6ONLY), *buflen);
3181         }
3182       else
3183         rv = VPPCOM_EINVAL;
3184       break;
3185
3186     case VPPCOM_ATTR_GET_KEEPALIVE:
3187       if (buffer && buflen && (*buflen >= sizeof (int)))
3188         {
3189           /* VPP-TBD */
3190           *(int *) buffer = VCL_SESS_ATTR_TEST (session->attr,
3191                                                 VCL_SESS_ATTR_KEEPALIVE);
3192           *buflen = sizeof (int);
3193
3194           VDBG (2, "VCL<%d>: VPPCOM_ATTR_GET_KEEPALIVE: %d, buflen %d, "
3195                 "#VPP-TBD#", getpid (), *(int *) buffer, *buflen);
3196         }
3197       else
3198         rv = VPPCOM_EINVAL;
3199       break;
3200
3201     case VPPCOM_ATTR_SET_KEEPALIVE:
3202       if (buffer && buflen && (*buflen == sizeof (int)))
3203         {
3204           /* VPP-TBD */
3205           if (*(int *) buffer)
3206             VCL_SESS_ATTR_SET (session->attr, VCL_SESS_ATTR_KEEPALIVE);
3207           else
3208             VCL_SESS_ATTR_CLR (session->attr, VCL_SESS_ATTR_KEEPALIVE);
3209
3210           VDBG (2, "VCL<%d>: VPPCOM_ATTR_SET_KEEPALIVE: %d, buflen %d, "
3211                 "#VPP-TBD#", getpid (),
3212                 VCL_SESS_ATTR_TEST (session->attr,
3213                                     VCL_SESS_ATTR_KEEPALIVE), *buflen);
3214         }
3215       else
3216         rv = VPPCOM_EINVAL;
3217       break;
3218
3219     case VPPCOM_ATTR_GET_TCP_NODELAY:
3220       if (buffer && buflen && (*buflen >= sizeof (int)))
3221         {
3222           /* VPP-TBD */
3223           *(int *) buffer = VCL_SESS_ATTR_TEST (session->attr,
3224                                                 VCL_SESS_ATTR_TCP_NODELAY);
3225           *buflen = sizeof (int);
3226
3227           VDBG (2, "VCL<%d>: VPPCOM_ATTR_GET_TCP_NODELAY: %d, buflen %d, "
3228                 "#VPP-TBD#", getpid (), *(int *) buffer, *buflen);
3229         }
3230       else
3231         rv = VPPCOM_EINVAL;
3232       break;
3233
3234     case VPPCOM_ATTR_SET_TCP_NODELAY:
3235       if (buffer && buflen && (*buflen == sizeof (int)))
3236         {
3237           /* VPP-TBD */
3238           if (*(int *) buffer)
3239             VCL_SESS_ATTR_SET (session->attr, VCL_SESS_ATTR_TCP_NODELAY);
3240           else
3241             VCL_SESS_ATTR_CLR (session->attr, VCL_SESS_ATTR_TCP_NODELAY);
3242
3243           VDBG (2, "VCL<%d>: VPPCOM_ATTR_SET_TCP_NODELAY: %d, buflen %d, "
3244                 "#VPP-TBD#", getpid (),
3245                 VCL_SESS_ATTR_TEST (session->attr,
3246                                     VCL_SESS_ATTR_TCP_NODELAY), *buflen);
3247         }
3248       else
3249         rv = VPPCOM_EINVAL;
3250       break;
3251
3252     case VPPCOM_ATTR_GET_TCP_KEEPIDLE:
3253       if (buffer && buflen && (*buflen >= sizeof (int)))
3254         {
3255           /* VPP-TBD */
3256           *(int *) buffer = VCL_SESS_ATTR_TEST (session->attr,
3257                                                 VCL_SESS_ATTR_TCP_KEEPIDLE);
3258           *buflen = sizeof (int);
3259
3260           VDBG (2, "VCL<%d>: VPPCOM_ATTR_GET_TCP_KEEPIDLE: %d, buflen %d, "
3261                 "#VPP-TBD#", getpid (), *(int *) buffer, *buflen);
3262         }
3263       else
3264         rv = VPPCOM_EINVAL;
3265       break;
3266
3267     case VPPCOM_ATTR_SET_TCP_KEEPIDLE:
3268       if (buffer && buflen && (*buflen == sizeof (int)))
3269         {
3270           /* VPP-TBD */
3271           if (*(int *) buffer)
3272             VCL_SESS_ATTR_SET (session->attr, VCL_SESS_ATTR_TCP_KEEPIDLE);
3273           else
3274             VCL_SESS_ATTR_CLR (session->attr, VCL_SESS_ATTR_TCP_KEEPIDLE);
3275
3276           VDBG (2, "VCL<%d>: VPPCOM_ATTR_SET_TCP_KEEPIDLE: %d, buflen %d, "
3277                 "#VPP-TBD#", getpid (),
3278                 VCL_SESS_ATTR_TEST (session->attr,
3279                                     VCL_SESS_ATTR_TCP_KEEPIDLE), *buflen);
3280         }
3281       else
3282         rv = VPPCOM_EINVAL;
3283       break;
3284
3285     case VPPCOM_ATTR_GET_TCP_KEEPINTVL:
3286       if (buffer && buflen && (*buflen >= sizeof (int)))
3287         {
3288           /* VPP-TBD */
3289           *(int *) buffer = VCL_SESS_ATTR_TEST (session->attr,
3290                                                 VCL_SESS_ATTR_TCP_KEEPINTVL);
3291           *buflen = sizeof (int);
3292
3293           VDBG (2, "VCL<%d>: VPPCOM_ATTR_GET_TCP_KEEPINTVL: %d, buflen %d, "
3294                 "#VPP-TBD#", getpid (), *(int *) buffer, *buflen);
3295         }
3296       else
3297         rv = VPPCOM_EINVAL;
3298       break;
3299
3300     case VPPCOM_ATTR_SET_TCP_KEEPINTVL:
3301       if (buffer && buflen && (*buflen == sizeof (int)))
3302         {
3303           /* VPP-TBD */
3304           if (*(int *) buffer)
3305             VCL_SESS_ATTR_SET (session->attr, VCL_SESS_ATTR_TCP_KEEPINTVL);
3306           else
3307             VCL_SESS_ATTR_CLR (session->attr, VCL_SESS_ATTR_TCP_KEEPINTVL);
3308
3309           VDBG (2, "VCL<%d>: VPPCOM_ATTR_SET_TCP_KEEPINTVL: %d, buflen %d, "
3310                 "#VPP-TBD#", getpid (),
3311                 VCL_SESS_ATTR_TEST (session->attr,
3312                                     VCL_SESS_ATTR_TCP_KEEPINTVL), *buflen);
3313         }
3314       else
3315         rv = VPPCOM_EINVAL;
3316       break;
3317
3318     case VPPCOM_ATTR_GET_TCP_USER_MSS:
3319       if (buffer && buflen && (*buflen >= sizeof (u32)))
3320         {
3321           /* VPP-TBD */
3322           *(u32 *) buffer = session->user_mss;
3323           *buflen = sizeof (int);
3324
3325           VDBG (2, "VCL<%d>: VPPCOM_ATTR_GET_TCP_USER_MSS: %d, buflen %d,"
3326                 " #VPP-TBD#", getpid (), *(int *) buffer, *buflen);
3327         }
3328       else
3329         rv = VPPCOM_EINVAL;
3330       break;
3331
3332     case VPPCOM_ATTR_SET_TCP_USER_MSS:
3333       if (buffer && buflen && (*buflen == sizeof (u32)))
3334         {
3335           /* VPP-TBD */
3336           session->user_mss = *(u32 *) buffer;
3337
3338           VDBG (2, "VCL<%d>: VPPCOM_ATTR_SET_TCP_USER_MSS: %u, buflen %d, "
3339                 "#VPP-TBD#", getpid (), session->user_mss, *buflen);
3340         }
3341       else
3342         rv = VPPCOM_EINVAL;
3343       break;
3344
3345     case VPPCOM_ATTR_GET_REFCNT:
3346       rv = vcl_session_get_refcnt (session);
3347       break;
3348
3349     default:
3350       rv = VPPCOM_EINVAL;
3351       break;
3352     }
3353
3354   return rv;
3355 }
3356
3357 int
3358 vppcom_session_recvfrom (uint32_t session_handle, void *buffer,
3359                          uint32_t buflen, int flags, vppcom_endpt_t * ep)
3360 {
3361   vcl_worker_t *wrk = vcl_worker_get_current ();
3362   int rv = VPPCOM_OK;
3363   vcl_session_t *session = 0;
3364
3365   if (ep)
3366     {
3367       session = vcl_session_get_w_handle (wrk, session_handle);
3368       if (PREDICT_FALSE (!session))
3369         {
3370           VDBG (0, "VCL<%d>: invalid session, sid (%u) has been closed!",
3371                 getpid (), session_handle);
3372           return VPPCOM_EBADFD;
3373         }
3374       ep->is_ip4 = session->transport.is_ip4;
3375       ep->port = session->transport.rmt_port;
3376     }
3377
3378   if (flags == 0)
3379     rv = vppcom_session_read (session_handle, buffer, buflen);
3380   else if (flags & MSG_PEEK)
3381     rv = vppcom_session_peek (session_handle, buffer, buflen);
3382   else
3383     {
3384       clib_warning ("VCL<%d>: Unsupport flags for recvfrom %d",
3385                     getpid (), flags);
3386       return VPPCOM_EAFNOSUPPORT;
3387     }
3388
3389   if (ep)
3390     {
3391       if (session->transport.is_ip4)
3392         clib_memcpy_fast (ep->ip, &session->transport.rmt_ip.ip4,
3393                           sizeof (ip4_address_t));
3394       else
3395         clib_memcpy_fast (ep->ip, &session->transport.rmt_ip.ip6,
3396                           sizeof (ip6_address_t));
3397     }
3398
3399   return rv;
3400 }
3401
3402 int
3403 vppcom_session_sendto (uint32_t session_handle, void *buffer,
3404                        uint32_t buflen, int flags, vppcom_endpt_t * ep)
3405 {
3406   if (!buffer)
3407     return VPPCOM_EINVAL;
3408
3409   if (ep)
3410     {
3411       // TBD
3412       return VPPCOM_EINVAL;
3413     }
3414
3415   if (flags)
3416     {
3417       // TBD check the flags and do the right thing
3418       VDBG (2, "VCL<%d>: handling flags 0x%u (%d) not implemented yet.",
3419             getpid (), flags, flags);
3420     }
3421
3422   return (vppcom_session_write_inline (session_handle, buffer, buflen, 1));
3423 }
3424
3425 int
3426 vppcom_poll (vcl_poll_t * vp, uint32_t n_sids, double wait_for_time)
3427 {
3428   vcl_worker_t *wrk = vcl_worker_get_current ();
3429   f64 timeout = clib_time_now (&wrk->clib_time) + wait_for_time;
3430   u32 i, keep_trying = 1;
3431   svm_msg_q_msg_t msg;
3432   session_event_t *e;
3433   int rv, num_ev = 0;
3434
3435   VDBG (3, "VCL<%d>: vp %p, nsids %u, wait_for_time %f",
3436         getpid (), vp, n_sids, wait_for_time);
3437
3438   if (!vp)
3439     return VPPCOM_EFAULT;
3440
3441   do
3442     {
3443       vcl_session_t *session;
3444
3445       /* Dequeue all events and drop all unhandled io events */
3446       while (svm_msg_q_sub (wrk->app_event_queue, &msg, SVM_Q_NOWAIT, 0) == 0)
3447         {
3448           e = svm_msg_q_msg_data (wrk->app_event_queue, &msg);
3449           vcl_handle_mq_event (wrk, e);
3450           svm_msg_q_free_msg (wrk->app_event_queue, &msg);
3451         }
3452       vec_reset_length (wrk->unhandled_evts_vector);
3453
3454       for (i = 0; i < n_sids; i++)
3455         {
3456           session = vcl_session_get (wrk, vp[i].sid);
3457           if (!session)
3458             {
3459               vp[i].revents = POLLHUP;
3460               num_ev++;
3461               continue;
3462             }
3463
3464           vp[i].revents = 0;
3465
3466           if (POLLIN & vp[i].events)
3467             {
3468               rv = vppcom_session_read_ready (session);
3469               if (rv > 0)
3470                 {
3471                   vp[i].revents |= POLLIN;
3472                   num_ev++;
3473                 }
3474               else if (rv < 0)
3475                 {
3476                   switch (rv)
3477                     {
3478                     case VPPCOM_ECONNRESET:
3479                       vp[i].revents = POLLHUP;
3480                       break;
3481
3482                     default:
3483                       vp[i].revents = POLLERR;
3484                       break;
3485                     }
3486                   num_ev++;
3487                 }
3488             }
3489
3490           if (POLLOUT & vp[i].events)
3491             {
3492               rv = vppcom_session_write_ready (session);
3493               if (rv > 0)
3494                 {
3495                   vp[i].revents |= POLLOUT;
3496                   num_ev++;
3497                 }
3498               else if (rv < 0)
3499                 {
3500                   switch (rv)
3501                     {
3502                     case VPPCOM_ECONNRESET:
3503                       vp[i].revents = POLLHUP;
3504                       break;
3505
3506                     default:
3507                       vp[i].revents = POLLERR;
3508                       break;
3509                     }
3510                   num_ev++;
3511                 }
3512             }
3513
3514           if (0)                // Note "done:" label used by VCL_SESSION_LOCK_AND_GET()
3515             {
3516               vp[i].revents = POLLNVAL;
3517               num_ev++;
3518             }
3519         }
3520       if (wait_for_time != -1)
3521         keep_trying = (clib_time_now (&wrk->clib_time) <= timeout) ? 1 : 0;
3522     }
3523   while ((num_ev == 0) && keep_trying);
3524
3525   if (VPPCOM_DEBUG > 3)
3526     {
3527       clib_warning ("VCL<%d>: returning %d", getpid (), num_ev);
3528       for (i = 0; i < n_sids; i++)
3529         {
3530           clib_warning ("VCL<%d>: vp[%d].sid %d (0x%x), .events 0x%x, "
3531                         ".revents 0x%x", getpid (), i, vp[i].sid, vp[i].sid,
3532                         vp[i].events, vp[i].revents);
3533         }
3534     }
3535   return num_ev;
3536 }
3537
3538 int
3539 vppcom_mq_epoll_fd (void)
3540 {
3541   vcl_worker_t *wrk = vcl_worker_get_current ();
3542   return wrk->mqs_epfd;
3543 }
3544
3545 int
3546 vppcom_session_index (uint32_t session_handle)
3547 {
3548   return session_handle & 0xFFFFFF;
3549 }
3550
3551 int
3552 vppcom_session_handle (uint32_t session_index)
3553 {
3554   return (vcl_get_worker_index () << 24) | session_index;
3555 }
3556
3557 int
3558 vppcom_worker_register (void)
3559 {
3560   if (!vcl_worker_alloc_and_init ())
3561     return VPPCOM_EEXIST;
3562
3563   if (vcl_worker_set_bapi ())
3564     return VPPCOM_EEXIST;
3565
3566   if (vcl_worker_register_with_vpp ())
3567     return VPPCOM_EEXIST;
3568
3569   return VPPCOM_OK;
3570 }
3571
3572 int
3573 vppcom_worker_index (void)
3574 {
3575   return vcl_get_worker_index ();
3576 }
3577
3578 /*
3579  * fd.io coding-style-patch-verification: ON
3580  *
3581  * Local Variables:
3582  * eval: (c-set-style "gnu")
3583  * End:
3584  */