vcl: handle worker process exit
[vpp.git] / src / vcl / vppcom.c
1 /*
2  * Copyright (c) 2017 Cisco and/or its affiliates.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this
5  * You may obtain a copy of the License at:
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15
16 #include <stdio.h>
17 #include <stdlib.h>
18 #include <svm/svm_fifo_segment.h>
19 #include <vcl/vppcom.h>
20 #include <vcl/vcl_debug.h>
21 #include <vcl/vcl_private.h>
22
23 __thread uword __vcl_worker_index = ~0;
24
25
26 static int
27 vcl_wait_for_segment (u64 segment_handle)
28 {
29   vcl_worker_t *wrk = vcl_worker_get_current ();
30   u32 wait_for_seconds = 10, segment_index;
31   f64 timeout;
32
33   if (segment_handle == VCL_INVALID_SEGMENT_HANDLE)
34     return 1;
35
36   timeout = clib_time_now (&wrk->clib_time) + wait_for_seconds;
37   while (clib_time_now (&wrk->clib_time) < timeout)
38     {
39       segment_index = vcl_segment_table_lookup (segment_handle);
40       if (segment_index != VCL_INVALID_SEGMENT_INDEX)
41         return 0;
42       usleep (10);
43     }
44   return 1;
45 }
46
47 const char *
48 vppcom_session_state_str (session_state_t state)
49 {
50   char *st;
51
52   switch (state)
53     {
54     case STATE_START:
55       st = "STATE_START";
56       break;
57
58     case STATE_CONNECT:
59       st = "STATE_CONNECT";
60       break;
61
62     case STATE_LISTEN:
63       st = "STATE_LISTEN";
64       break;
65
66     case STATE_ACCEPT:
67       st = "STATE_ACCEPT";
68       break;
69
70     case STATE_CLOSE_ON_EMPTY:
71       st = "STATE_CLOSE_ON_EMPTY";
72       break;
73
74     case STATE_DISCONNECT:
75       st = "STATE_DISCONNECT";
76       break;
77
78     case STATE_FAILED:
79       st = "STATE_FAILED";
80       break;
81
82     default:
83       st = "UNKNOWN_STATE";
84       break;
85     }
86
87   return st;
88 }
89
90 u8 *
91 format_ip4_address (u8 * s, va_list * args)
92 {
93   u8 *a = va_arg (*args, u8 *);
94   return format (s, "%d.%d.%d.%d", a[0], a[1], a[2], a[3]);
95 }
96
97 u8 *
98 format_ip6_address (u8 * s, va_list * args)
99 {
100   ip6_address_t *a = va_arg (*args, ip6_address_t *);
101   u32 i, i_max_n_zero, max_n_zeros, i_first_zero, n_zeros, last_double_colon;
102
103   i_max_n_zero = ARRAY_LEN (a->as_u16);
104   max_n_zeros = 0;
105   i_first_zero = i_max_n_zero;
106   n_zeros = 0;
107   for (i = 0; i < ARRAY_LEN (a->as_u16); i++)
108     {
109       u32 is_zero = a->as_u16[i] == 0;
110       if (is_zero && i_first_zero >= ARRAY_LEN (a->as_u16))
111         {
112           i_first_zero = i;
113           n_zeros = 0;
114         }
115       n_zeros += is_zero;
116       if ((!is_zero && n_zeros > max_n_zeros)
117           || (i + 1 >= ARRAY_LEN (a->as_u16) && n_zeros > max_n_zeros))
118         {
119           i_max_n_zero = i_first_zero;
120           max_n_zeros = n_zeros;
121           i_first_zero = ARRAY_LEN (a->as_u16);
122           n_zeros = 0;
123         }
124     }
125
126   last_double_colon = 0;
127   for (i = 0; i < ARRAY_LEN (a->as_u16); i++)
128     {
129       if (i == i_max_n_zero && max_n_zeros > 1)
130         {
131           s = format (s, "::");
132           i += max_n_zeros - 1;
133           last_double_colon = 1;
134         }
135       else
136         {
137           s = format (s, "%s%x",
138                       (last_double_colon || i == 0) ? "" : ":",
139                       clib_net_to_host_u16 (a->as_u16[i]));
140           last_double_colon = 0;
141         }
142     }
143
144   return s;
145 }
146
147 /* Format an IP46 address. */
148 u8 *
149 format_ip46_address (u8 * s, va_list * args)
150 {
151   ip46_address_t *ip46 = va_arg (*args, ip46_address_t *);
152   ip46_type_t type = va_arg (*args, ip46_type_t);
153   int is_ip4 = 1;
154
155   switch (type)
156     {
157     case IP46_TYPE_ANY:
158       is_ip4 = ip46_address_is_ip4 (ip46);
159       break;
160     case IP46_TYPE_IP4:
161       is_ip4 = 1;
162       break;
163     case IP46_TYPE_IP6:
164       is_ip4 = 0;
165       break;
166     }
167
168   return is_ip4 ?
169     format (s, "%U", format_ip4_address, &ip46->ip4) :
170     format (s, "%U", format_ip6_address, &ip46->ip6);
171 }
172
173 /*
174  * VPPCOM Utility Functions
175  */
176
177
178 static svm_msg_q_t *
179 vcl_session_vpp_evt_q (vcl_worker_t * wrk, vcl_session_t * s)
180 {
181   if (vcl_session_is_ct (s))
182     return wrk->vpp_event_queues[0];
183   else
184     return wrk->vpp_event_queues[s->tx_fifo->master_thread_index];
185 }
186
187 static void
188 vcl_send_session_accepted_reply (svm_msg_q_t * mq, u32 context,
189                                  session_handle_t handle, int retval)
190 {
191   app_session_evt_t _app_evt, *app_evt = &_app_evt;
192   session_accepted_reply_msg_t *rmp;
193   app_alloc_ctrl_evt_to_vpp (mq, app_evt, SESSION_CTRL_EVT_ACCEPTED_REPLY);
194   rmp = (session_accepted_reply_msg_t *) app_evt->evt->data;
195   rmp->handle = handle;
196   rmp->context = context;
197   rmp->retval = retval;
198   app_send_ctrl_evt_to_vpp (mq, app_evt);
199 }
200
201 static void
202 vcl_send_session_disconnected_reply (svm_msg_q_t * mq, u32 context,
203                                      session_handle_t handle, int retval)
204 {
205   app_session_evt_t _app_evt, *app_evt = &_app_evt;
206   session_disconnected_reply_msg_t *rmp;
207   app_alloc_ctrl_evt_to_vpp (mq, app_evt,
208                              SESSION_CTRL_EVT_DISCONNECTED_REPLY);
209   rmp = (session_disconnected_reply_msg_t *) app_evt->evt->data;
210   rmp->handle = handle;
211   rmp->context = context;
212   rmp->retval = retval;
213   app_send_ctrl_evt_to_vpp (mq, app_evt);
214 }
215
216 static void
217 vcl_send_session_reset_reply (svm_msg_q_t * mq, u32 context,
218                               session_handle_t handle, int retval)
219 {
220   app_session_evt_t _app_evt, *app_evt = &_app_evt;
221   session_reset_reply_msg_t *rmp;
222   app_alloc_ctrl_evt_to_vpp (mq, app_evt, SESSION_CTRL_EVT_RESET_REPLY);
223   rmp = (session_reset_reply_msg_t *) app_evt->evt->data;
224   rmp->handle = handle;
225   rmp->context = context;
226   rmp->retval = retval;
227   app_send_ctrl_evt_to_vpp (mq, app_evt);
228 }
229
230 static u32
231 vcl_session_accepted_handler (vcl_worker_t * wrk, session_accepted_msg_t * mp)
232 {
233   vcl_session_t *session, *listen_session;
234   svm_fifo_t *rx_fifo, *tx_fifo;
235   u32 vpp_wrk_index;
236   svm_msg_q_t *evt_q;
237
238   session = vcl_session_alloc (wrk);
239
240   listen_session = vcl_session_table_lookup_listener (wrk,
241                                                       mp->listener_handle);
242   if (!listen_session)
243     {
244       svm_msg_q_t *evt_q;
245       evt_q = uword_to_pointer (mp->vpp_event_queue_address, svm_msg_q_t *);
246       clib_warning ("VCL<%d>: ERROR: couldn't find listen session: "
247                     "unknown vpp listener handle %llx",
248                     getpid (), mp->listener_handle);
249       vcl_send_session_accepted_reply (evt_q, mp->context, mp->handle,
250                                        VNET_API_ERROR_INVALID_ARGUMENT);
251       vcl_session_free (wrk, session);
252       return VCL_INVALID_SESSION_INDEX;
253     }
254
255   rx_fifo = uword_to_pointer (mp->server_rx_fifo, svm_fifo_t *);
256   tx_fifo = uword_to_pointer (mp->server_tx_fifo, svm_fifo_t *);
257
258   if (mp->server_event_queue_address)
259     {
260       session->vpp_evt_q = uword_to_pointer (mp->client_event_queue_address,
261                                              svm_msg_q_t *);
262       session->our_evt_q = uword_to_pointer (mp->server_event_queue_address,
263                                              svm_msg_q_t *);
264       if (vcl_wait_for_segment (mp->segment_handle))
265         {
266           clib_warning ("segment for session %u couldn't be mounted!",
267                         session->session_index);
268           return VCL_INVALID_SESSION_INDEX;
269         }
270       rx_fifo->master_session_index = session->session_index;
271       tx_fifo->master_session_index = session->session_index;
272       rx_fifo->master_thread_index = vcl_get_worker_index ();
273       tx_fifo->master_thread_index = vcl_get_worker_index ();
274       vec_validate (wrk->vpp_event_queues, 0);
275       evt_q = uword_to_pointer (mp->vpp_event_queue_address, svm_msg_q_t *);
276       wrk->vpp_event_queues[0] = evt_q;
277     }
278   else
279     {
280       session->vpp_evt_q = uword_to_pointer (mp->vpp_event_queue_address,
281                                              svm_msg_q_t *);
282       rx_fifo->client_session_index = session->session_index;
283       tx_fifo->client_session_index = session->session_index;
284       rx_fifo->client_thread_index = vcl_get_worker_index ();
285       tx_fifo->client_thread_index = vcl_get_worker_index ();
286       vpp_wrk_index = tx_fifo->master_thread_index;
287       vec_validate (wrk->vpp_event_queues, vpp_wrk_index);
288       wrk->vpp_event_queues[vpp_wrk_index] = session->vpp_evt_q;
289     }
290
291   session->vpp_handle = mp->handle;
292   session->client_context = mp->context;
293   session->rx_fifo = rx_fifo;
294   session->tx_fifo = tx_fifo;
295
296   session->session_state = STATE_ACCEPT;
297   session->transport.rmt_port = mp->port;
298   session->transport.is_ip4 = mp->is_ip4;
299   clib_memcpy_fast (&session->transport.rmt_ip, mp->ip,
300                     sizeof (ip46_address_t));
301
302   vcl_session_table_add_vpp_handle (wrk, mp->handle, session->session_index);
303   session->transport.lcl_port = listen_session->transport.lcl_port;
304   session->transport.lcl_ip = listen_session->transport.lcl_ip;
305   session->session_type = listen_session->session_type;
306   session->is_dgram = session->session_type == VPPCOM_PROTO_UDP;
307
308   VDBG (1, "VCL<%d>: vpp handle 0x%llx, sid %u: client accept request from %s"
309         " address %U port %d queue %p!", getpid (), mp->handle,
310         session->session_index,
311         mp->is_ip4 ? "IPv4" : "IPv6", format_ip46_address, &mp->ip,
312         mp->is_ip4 ? IP46_TYPE_IP4 : IP46_TYPE_IP6,
313         clib_net_to_host_u16 (mp->port), session->vpp_evt_q);
314   vcl_evt (VCL_EVT_ACCEPT, session, listen_session, session_index);
315
316   return session->session_index;
317 }
318
319 static u32
320 vcl_session_connected_handler (vcl_worker_t * wrk,
321                                session_connected_msg_t * mp)
322 {
323   u32 session_index, vpp_wrk_index;
324   svm_fifo_t *rx_fifo, *tx_fifo;
325   vcl_session_t *session = 0;
326   svm_msg_q_t *evt_q;
327
328   session_index = mp->context;
329   session = vcl_session_get (wrk, session_index);
330   if (!session)
331     {
332       clib_warning ("[%s] ERROR: vpp handle 0x%llx, sid %u: "
333                     "Invalid session index (%u)!",
334                     getpid (), mp->handle, session_index);
335       return VCL_INVALID_SESSION_INDEX;
336     }
337   if (mp->retval)
338     {
339       clib_warning ("VCL<%d>: ERROR: sid %u: connect failed! %U", getpid (),
340                     session_index, format_api_error, ntohl (mp->retval));
341       session->session_state = STATE_FAILED;
342       session->vpp_handle = mp->handle;
343       return session_index;
344     }
345
346   rx_fifo = uword_to_pointer (mp->server_rx_fifo, svm_fifo_t *);
347   tx_fifo = uword_to_pointer (mp->server_tx_fifo, svm_fifo_t *);
348   if (vcl_wait_for_segment (mp->segment_handle))
349     {
350       clib_warning ("segment for session %u couldn't be mounted!",
351                     session->session_index);
352       return VCL_INVALID_SESSION_INDEX;
353     }
354
355   rx_fifo->client_session_index = session_index;
356   tx_fifo->client_session_index = session_index;
357   rx_fifo->client_thread_index = vcl_get_worker_index ();
358   tx_fifo->client_thread_index = vcl_get_worker_index ();
359
360   if (mp->client_event_queue_address)
361     {
362       session->vpp_evt_q = uword_to_pointer (mp->server_event_queue_address,
363                                              svm_msg_q_t *);
364       session->our_evt_q = uword_to_pointer (mp->client_event_queue_address,
365                                              svm_msg_q_t *);
366
367       vec_validate (wrk->vpp_event_queues, 0);
368       evt_q = uword_to_pointer (mp->vpp_event_queue_address, svm_msg_q_t *);
369       wrk->vpp_event_queues[0] = evt_q;
370     }
371   else
372     {
373       session->vpp_evt_q = uword_to_pointer (mp->vpp_event_queue_address,
374                                              svm_msg_q_t *);
375       vpp_wrk_index = tx_fifo->master_thread_index;
376       vec_validate (wrk->vpp_event_queues, vpp_wrk_index);
377       wrk->vpp_event_queues[vpp_wrk_index] = session->vpp_evt_q;
378     }
379
380   session->rx_fifo = rx_fifo;
381   session->tx_fifo = tx_fifo;
382   session->vpp_handle = mp->handle;
383   session->transport.is_ip4 = mp->is_ip4;
384   clib_memcpy_fast (&session->transport.lcl_ip, mp->lcl_ip,
385                     sizeof (session->transport.lcl_ip));
386   session->transport.lcl_port = mp->lcl_port;
387   session->session_state = STATE_CONNECT;
388
389   /* Add it to lookup table */
390   hash_set (wrk->session_index_by_vpp_handles, mp->handle, session_index);
391
392   VDBG (1, "VCL<%d>: vpp handle 0x%llx, sid %u: connect succeeded! "
393         "session_rx_fifo %p, refcnt %d, session_tx_fifo %p, refcnt %d",
394         getpid (), mp->handle, session_index, session->rx_fifo,
395         session->rx_fifo->refcnt, session->tx_fifo, session->tx_fifo->refcnt);
396
397   return session_index;
398 }
399
400 static u32
401 vcl_session_reset_handler (vcl_worker_t * wrk,
402                            session_reset_msg_t * reset_msg)
403 {
404   vcl_session_t *session;
405   u32 sid;
406
407   sid = vcl_session_index_from_vpp_handle (wrk, reset_msg->handle);
408   session = vcl_session_get (wrk, sid);
409   if (!session)
410     {
411       VDBG (0, "request to reset unknown handle 0x%llx", reset_msg->handle);
412       return VCL_INVALID_SESSION_INDEX;
413     }
414   session->session_state = STATE_CLOSE_ON_EMPTY;
415   VDBG (0, "reset handle 0x%llx, sid %u ", reset_msg->handle, sid);
416   vcl_send_session_reset_reply (vcl_session_vpp_evt_q (wrk, session),
417                                 wrk->my_client_index, reset_msg->handle, 0);
418   return sid;
419 }
420
421 static u32
422 vcl_session_bound_handler (vcl_worker_t * wrk, session_bound_msg_t * mp)
423 {
424   vcl_session_t *session;
425   u32 sid = mp->context;
426
427   session = vcl_session_get (wrk, sid);
428   if (mp->retval)
429     {
430       VERR ("vpp handle 0x%llx, sid %u: bind failed: %U", mp->handle, sid,
431             format_api_error, mp->retval);
432       if (session)
433         {
434           session->session_state = STATE_FAILED;
435           session->vpp_handle = mp->handle;
436           return sid;
437         }
438       else
439         {
440           clib_warning ("[%s] ERROR: vpp handle 0x%llx, sid %u: "
441                         "Invalid session index (%u)!",
442                         getpid (), mp->handle, sid);
443           return VCL_INVALID_SESSION_INDEX;
444         }
445     }
446
447   session->vpp_handle = mp->handle;
448   session->transport.is_ip4 = mp->lcl_is_ip4;
449   clib_memcpy_fast (&session->transport.lcl_ip, mp->lcl_ip,
450                     sizeof (ip46_address_t));
451   session->transport.lcl_port = mp->lcl_port;
452   vcl_session_table_add_listener (wrk, mp->handle, sid);
453   session->session_state = STATE_LISTEN;
454
455   if (session->is_dgram)
456     {
457       svm_fifo_t *rx_fifo, *tx_fifo;
458       session->vpp_evt_q = uword_to_pointer (mp->vpp_evt_q, svm_msg_q_t *);
459       rx_fifo = uword_to_pointer (mp->rx_fifo, svm_fifo_t *);
460       rx_fifo->client_session_index = sid;
461       tx_fifo = uword_to_pointer (mp->tx_fifo, svm_fifo_t *);
462       tx_fifo->client_session_index = sid;
463       session->rx_fifo = rx_fifo;
464       session->tx_fifo = tx_fifo;
465     }
466
467   VDBG (1, "VCL<%d>: vpp handle 0x%llx, sid %u: bind succeeded!",
468         getpid (), mp->handle, sid);
469   return sid;
470 }
471
472 static int
473 vcl_handle_mq_event (vcl_worker_t * wrk, session_event_t * e)
474 {
475   session_accepted_msg_t *accepted_msg;
476   session_disconnected_msg_t *disconnected_msg;
477   vcl_session_msg_t *vcl_msg;
478   vcl_session_t *session;
479   u64 handle;
480   u32 sid;
481
482   switch (e->event_type)
483     {
484     case FIFO_EVENT_APP_RX:
485     case FIFO_EVENT_APP_TX:
486     case SESSION_IO_EVT_CT_RX:
487     case SESSION_IO_EVT_CT_TX:
488       vec_add1 (wrk->unhandled_evts_vector, *e);
489       break;
490     case SESSION_CTRL_EVT_ACCEPTED:
491       accepted_msg = (session_accepted_msg_t *) e->data;
492       handle = accepted_msg->listener_handle;
493       session = vcl_session_table_lookup_listener (wrk, handle);
494       if (!session)
495         {
496           clib_warning ("VCL<%d>: ERROR: couldn't find listen session:"
497                         "listener handle %llx", getpid (), handle);
498           break;
499         }
500
501       clib_fifo_add2 (session->accept_evts_fifo, vcl_msg);
502       vcl_msg->accepted_msg = *accepted_msg;
503       break;
504     case SESSION_CTRL_EVT_CONNECTED:
505       vcl_session_connected_handler (wrk,
506                                      (session_connected_msg_t *) e->data);
507       break;
508     case SESSION_CTRL_EVT_DISCONNECTED:
509       disconnected_msg = (session_disconnected_msg_t *) e->data;
510       sid = vcl_session_index_from_vpp_handle (wrk, disconnected_msg->handle);
511       session = vcl_session_get (wrk, sid);
512       if (!session)
513         {
514           VDBG (0, "request to disconnect unknown handle 0x%llx",
515                 disconnected_msg->handle);
516           break;
517         }
518       session->session_state = STATE_DISCONNECT;
519       VDBG (0, "disconnected handle 0x%llx, sid %u", disconnected_msg->handle,
520             sid);
521       break;
522     case SESSION_CTRL_EVT_RESET:
523       vcl_session_reset_handler (wrk, (session_reset_msg_t *) e->data);
524       break;
525     case SESSION_CTRL_EVT_BOUND:
526       vcl_session_bound_handler (wrk, (session_bound_msg_t *) e->data);
527       break;
528     default:
529       clib_warning ("unhandled %u", e->event_type);
530     }
531   return VPPCOM_OK;
532 }
533
534 static inline int
535 vppcom_wait_for_session_state_change (u32 session_index,
536                                       session_state_t state,
537                                       f64 wait_for_time)
538 {
539   vcl_worker_t *wrk = vcl_worker_get_current ();
540   f64 timeout = clib_time_now (&wrk->clib_time) + wait_for_time;
541   vcl_session_t *volatile session;
542   svm_msg_q_msg_t msg;
543   session_event_t *e;
544
545   do
546     {
547       session = vcl_session_get (wrk, session_index);
548       if (PREDICT_FALSE (!session))
549         {
550           return VPPCOM_EBADFD;
551         }
552       if (session->session_state & state)
553         {
554           return VPPCOM_OK;
555         }
556       if (session->session_state & STATE_FAILED)
557         {
558           return VPPCOM_ECONNREFUSED;
559         }
560
561       if (svm_msg_q_sub (wrk->app_event_queue, &msg, SVM_Q_NOWAIT, 0))
562         continue;
563       e = svm_msg_q_msg_data (wrk->app_event_queue, &msg);
564       vcl_handle_mq_event (wrk, e);
565       svm_msg_q_free_msg (wrk->app_event_queue, &msg);
566     }
567   while (clib_time_now (&wrk->clib_time) < timeout);
568
569   VDBG (0, "VCL<%d>: timeout waiting for state 0x%x (%s)", getpid (), state,
570         vppcom_session_state_str (state));
571   vcl_evt (VCL_EVT_SESSION_TIMEOUT, session, session_state);
572
573   return VPPCOM_ETIMEDOUT;
574 }
575
576 static int
577 vppcom_app_session_enable (void)
578 {
579   int rv;
580
581   if (vcm->app_state != STATE_APP_ENABLED)
582     {
583       vppcom_send_session_enable_disable (1 /* is_enabled == TRUE */ );
584       rv = vcl_wait_for_app_state_change (STATE_APP_ENABLED);
585       if (PREDICT_FALSE (rv))
586         {
587           VDBG (0, "VCL<%d>: application session enable timed out! "
588                 "returning %d (%s)", getpid (), rv, vppcom_retval_str (rv));
589           return rv;
590         }
591     }
592   return VPPCOM_OK;
593 }
594
595 static int
596 vppcom_app_attach (void)
597 {
598   int rv;
599
600   vppcom_app_send_attach ();
601   rv = vcl_wait_for_app_state_change (STATE_APP_ATTACHED);
602   if (PREDICT_FALSE (rv))
603     {
604       VDBG (0, "VCL<%d>: application attach timed out! returning %d (%s)",
605             getpid (), rv, vppcom_retval_str (rv));
606       return rv;
607     }
608
609   return VPPCOM_OK;
610 }
611
612 static int
613 vppcom_session_unbind (u32 session_handle)
614 {
615   vcl_worker_t *wrk = vcl_worker_get_current ();
616   vcl_session_t *session = 0;
617   u64 vpp_handle;
618
619   session = vcl_session_get_w_handle (wrk, session_handle);
620   if (!session)
621     return VPPCOM_EBADFD;
622
623   vpp_handle = session->vpp_handle;
624   vcl_session_table_del_listener (wrk, vpp_handle);
625   session->vpp_handle = ~0;
626   session->session_state = STATE_DISCONNECT;
627
628   VDBG (1, "VCL<%d>: vpp handle 0x%llx, sid %u: sending unbind msg! new state"
629         " 0x%x (%s)", getpid (), vpp_handle, session_handle, STATE_DISCONNECT,
630         vppcom_session_state_str (STATE_DISCONNECT));
631   vcl_evt (VCL_EVT_UNBIND, session);
632   vppcom_send_unbind_sock (vpp_handle);
633
634   return VPPCOM_OK;
635 }
636
637 static int
638 vppcom_session_disconnect (u32 session_handle)
639 {
640   vcl_worker_t *wrk = vcl_worker_get_current ();
641   svm_msg_q_t *vpp_evt_q;
642   vcl_session_t *session;
643   session_state_t state;
644   u64 vpp_handle;
645
646   session = vcl_session_get_w_handle (wrk, session_handle);
647   if (!session)
648     return VPPCOM_EBADFD;
649
650   vpp_handle = session->vpp_handle;
651   state = session->session_state;
652
653   VDBG (1, "VCL<%d>: vpp handle 0x%llx, sid %u state 0x%x (%s)", getpid (),
654         vpp_handle, session_handle, state, vppcom_session_state_str (state));
655
656   if (PREDICT_FALSE (state & STATE_LISTEN))
657     {
658       clib_warning ("VCL<%d>: ERROR: vpp handle 0x%llx, sid %u: "
659                     "Cannot disconnect a listen socket!",
660                     getpid (), vpp_handle, session_handle);
661       return VPPCOM_EBADFD;
662     }
663
664   if (state & STATE_CLOSE_ON_EMPTY)
665     {
666       vpp_evt_q = vcl_session_vpp_evt_q (wrk, session);
667       vcl_send_session_disconnected_reply (vpp_evt_q, wrk->my_client_index,
668                                            vpp_handle, 0);
669       VDBG (1, "VCL<%d>: vpp handle 0x%llx, sid %u: sending disconnect "
670             "REPLY...", getpid (), vpp_handle, session_handle);
671     }
672   else
673     {
674       VDBG (1, "VCL<%d>: vpp handle 0x%llx, sid %u: sending disconnect...",
675             getpid (), vpp_handle, session_handle);
676       vppcom_send_disconnect_session (vpp_handle);
677     }
678
679   return VPPCOM_OK;
680 }
681
682 static void
683 vcl_cleanup_bapi (void)
684 {
685   socket_client_main_t *scm = &socket_client_main;
686   api_main_t *am = &api_main;
687
688   am->my_client_index = ~0;
689   am->my_registration = 0;
690   am->vl_input_queue = 0;
691   am->msg_index_by_name_and_crc = 0;
692   scm->socket_fd = 0;
693
694   vl_client_api_unmap ();
695 }
696
697 void
698 vcl_app_fork_child_handler (void)
699 {
700   u8 *child_name;
701   int rv, parent_wrk;
702
703   parent_wrk = vcl_get_worker_index ();
704   VDBG (0, "initializing forked child with parent wrk %u", parent_wrk);
705
706   /*
707    * Allocate worker
708    */
709   vcl_set_worker_index (~0);
710   if (!vcl_worker_alloc_and_init ())
711     VERR ("couldn't allocate new worker");
712
713   /*
714    * Attach to binary api
715    */
716   child_name = format (0, "%v-child-%u%c", vcm->app_name, getpid (), 0);
717   vcl_cleanup_bapi ();
718   vppcom_api_hookup ();
719   vcm->app_state = STATE_APP_START;
720   rv = vppcom_connect_to_vpp ((char *) child_name);
721   vec_free (child_name);
722   if (rv)
723     {
724       VERR ("couldn't connect to VPP!");
725       return;
726     }
727
728   /*
729    * Register worker with vpp and share sessions
730    */
731   vcl_worker_register_with_vpp ();
732   vcl_worker_share_sessions (parent_wrk);
733
734   VDBG (0, "forked child main worker initialized");
735   vcm->forking = 0;
736 }
737
738 void
739 vcl_app_fork_parent_handler (void)
740 {
741   vcm->forking = 1;
742
743   while (vcm->forking)
744     ;
745 }
746
747 /**
748  * Handle app exit
749  *
750  * Notify vpp of the disconnect and mark the worker as free. If we're the
751  * last worker, do a full cleanup otherwise, since we're probably a forked
752  * child, avoid syscalls as much as possible. We might've lost privileges.
753  */
754 void
755 vppcom_app_exit (void)
756 {
757   if (!pool_elts (vcm->workers))
758     return;
759
760   vcl_worker_cleanup (1 /* notify vpp */ );
761   vcl_elog_stop (vcm);
762   if (vec_len (vcm->workers) == 1)
763     vl_client_disconnect_from_vlib ();
764   else
765     vl_client_send_disconnect ();
766 }
767
768 /*
769  * VPPCOM Public API functions
770  */
771 int
772 vppcom_app_create (char *app_name)
773 {
774   vppcom_cfg_t *vcl_cfg = &vcm->cfg;
775   int rv;
776
777   if (vcm->is_init)
778     {
779       clib_warning ("already initialized");
780       return -1;
781     }
782
783   vcm->is_init = 1;
784   vppcom_cfg (&vcm->cfg);
785   vcl_cfg = &vcm->cfg;
786
787   vcm->main_cpu = pthread_self ();
788   vcm->main_pid = getpid ();
789   vcm->app_name = format (0, "%s", app_name);
790   vppcom_init_error_string_table ();
791   svm_fifo_segment_main_init (&vcm->segment_main, vcl_cfg->segment_baseva,
792                               20 /* timeout in secs */ );
793   pool_alloc (vcm->workers, vcl_cfg->max_workers);
794   clib_spinlock_init (&vcm->workers_lock);
795   clib_rwlock_init (&vcm->segment_table_lock);
796   pthread_atfork (NULL, vcl_app_fork_parent_handler,
797                   vcl_app_fork_child_handler);
798   atexit (vppcom_app_exit);
799
800   /* Allocate default worker */
801   vcl_worker_alloc_and_init ();
802
803   /* API hookup and connect to VPP */
804   vppcom_api_hookup ();
805   vcl_elog_init (vcm);
806   vcm->app_state = STATE_APP_START;
807   rv = vppcom_connect_to_vpp (app_name);
808   if (rv)
809     {
810       VERR ("couldn't connect to VPP!");
811       return rv;
812     }
813   VDBG (0, "sending session enable");
814   rv = vppcom_app_session_enable ();
815   if (rv)
816     {
817       VERR ("vppcom_app_session_enable() failed!");
818       return rv;
819     }
820
821   VDBG (0, "sending app attach");
822   rv = vppcom_app_attach ();
823   if (rv)
824     {
825       VERR ("vppcom_app_attach() failed!");
826       return rv;
827     }
828
829   VDBG (0, "app_name '%s', my_client_index %d (0x%x)", app_name,
830         vcm->workers[0].my_client_index, vcm->workers[0].my_client_index);
831
832   return VPPCOM_OK;
833 }
834
835 void
836 vppcom_app_destroy (void)
837 {
838   int rv;
839   f64 orig_app_timeout;
840
841   if (!pool_elts (vcm->workers))
842     return;
843
844   vcl_evt (VCL_EVT_DETACH, vcm);
845
846   if (pool_elts (vcm->workers) == 1)
847     {
848       vppcom_app_send_detach ();
849       orig_app_timeout = vcm->cfg.app_timeout;
850       vcm->cfg.app_timeout = 2.0;
851       rv = vcl_wait_for_app_state_change (STATE_APP_ENABLED);
852       vcm->cfg.app_timeout = orig_app_timeout;
853       if (PREDICT_FALSE (rv))
854         VDBG (0, "application detach timed out! returning %d (%s)", rv,
855               vppcom_retval_str (rv));
856       vec_free (vcm->app_name);
857       vcl_worker_cleanup (0 /* notify vpp */ );
858     }
859   else
860     {
861       vcl_worker_cleanup (1 /* notify vpp */ );
862     }
863
864   vcl_elog_stop (vcm);
865   vl_client_disconnect_from_vlib ();
866 }
867
868 int
869 vppcom_session_create (u8 proto, u8 is_nonblocking)
870 {
871   vcl_worker_t *wrk = vcl_worker_get_current ();
872   vcl_session_t *session;
873
874   session = vcl_session_alloc (wrk);
875
876   session->session_type = proto;
877   session->session_state = STATE_START;
878   session->vpp_handle = ~0;
879   session->is_dgram = proto == VPPCOM_PROTO_UDP;
880
881   if (is_nonblocking)
882     VCL_SESS_ATTR_SET (session->attr, VCL_SESS_ATTR_NONBLOCK);
883
884   vcl_evt (VCL_EVT_CREATE, session, session_type, session->session_state,
885            is_nonblocking, session_index);
886
887   VDBG (0, "created sid %u", session->session_index);
888
889   return vcl_session_handle (session);
890 }
891
892 int
893 vppcom_session_close (uint32_t session_handle)
894 {
895   vcl_worker_t *wrk = vcl_worker_get_current ();
896   u8 is_vep, do_disconnect = 1;
897   vcl_session_t *session = 0;
898   session_state_t state;
899   u32 next_sh, vep_sh;
900   int rv = VPPCOM_OK;
901   u64 vpp_handle;
902
903   session = vcl_session_get_w_handle (wrk, session_handle);
904   if (!session)
905     return VPPCOM_EBADFD;
906
907   if (session->shared_index != ~0)
908     do_disconnect = vcl_worker_unshare_session (wrk, session);
909
910   is_vep = session->is_vep;
911   next_sh = session->vep.next_sh;
912   vep_sh = session->vep.vep_sh;
913   state = session->session_state;
914   vpp_handle = session->vpp_handle;
915
916   VDBG (0, "Closing session handle %u vpp handle %u", session_handle,
917         vpp_handle);
918
919   if (is_vep)
920     {
921       while (next_sh != ~0)
922         {
923           rv = vppcom_epoll_ctl (session_handle, EPOLL_CTL_DEL, next_sh, 0);
924           if (PREDICT_FALSE (rv < 0))
925             VDBG (0, "vpp handle 0x%llx, sid %u: EPOLL_CTL_DEL vep_idx %u"
926                   " failed! rv %d (%s)", vpp_handle, next_sh, vep_sh, rv,
927                   vppcom_retval_str (rv));
928
929           next_sh = session->vep.next_sh;
930         }
931     }
932   else
933     {
934       if (session->is_vep_session)
935         {
936           rv = vppcom_epoll_ctl (vep_sh, EPOLL_CTL_DEL, session_handle, 0);
937           if (rv < 0)
938             VDBG (0, "vpp handle 0x%llx, sid %u: EPOLL_CTL_DEL vep_idx %u "
939                   "failed! rv %d (%s)", vpp_handle, session_handle, vep_sh,
940                   rv, vppcom_retval_str (rv));
941         }
942
943       if (!do_disconnect)
944         goto cleanup;
945
946       if (state & STATE_LISTEN)
947         {
948           rv = vppcom_session_unbind (session_handle);
949           if (PREDICT_FALSE (rv < 0))
950             VDBG (0, "vpp handle 0x%llx, sid %u: listener unbind failed! "
951                   "rv %d (%s)", vpp_handle, session_handle, rv,
952                   vppcom_retval_str (rv));
953         }
954       else if (state & STATE_OPEN)
955         {
956           rv = vppcom_session_disconnect (session_handle);
957           if (PREDICT_FALSE (rv < 0))
958             clib_warning ("VCL<%d>: ERROR: vpp handle 0x%llx, sid %u: "
959                           "session disconnect failed! rv %d (%s)",
960                           getpid (), vpp_handle, session_handle,
961                           rv, vppcom_retval_str (rv));
962         }
963     }
964
965 cleanup:
966
967   if (vcl_session_is_ct (session))
968     {
969       vcl_cut_through_registration_t *ctr;
970       uword mq_addr;
971
972       mq_addr = pointer_to_uword (session->our_evt_q);
973       ctr = vcl_ct_registration_lock_and_lookup (wrk, mq_addr);
974       ASSERT (ctr);
975       if (ctr->epoll_evt_conn_index != ~0)
976         vcl_mq_epoll_del_evfd (wrk, ctr->epoll_evt_conn_index);
977       VDBG (0, "Removing ct registration %u",
978             vcl_ct_registration_index (wrk, ctr));
979       vcl_ct_registration_del (wrk, ctr);
980       vcl_ct_registration_lookup_del (wrk, mq_addr);
981       vcl_ct_registration_unlock (wrk);
982     }
983
984   if (vpp_handle != ~0)
985     {
986       vcl_session_table_del_vpp_handle (wrk, vpp_handle);
987     }
988   vcl_session_free (wrk, session);
989
990   VDBG (0, "session handle %u vpp handle %u removed", session_handle,
991         vpp_handle);
992
993   vcl_evt (VCL_EVT_CLOSE, session, rv);
994
995   return rv;
996 }
997
998 int
999 vppcom_session_bind (uint32_t session_handle, vppcom_endpt_t * ep)
1000 {
1001   vcl_worker_t *wrk = vcl_worker_get_current ();
1002   vcl_session_t *session = 0;
1003
1004   if (!ep || !ep->ip)
1005     return VPPCOM_EINVAL;
1006
1007   session = vcl_session_get_w_handle (wrk, session_handle);
1008   if (!session)
1009     return VPPCOM_EBADFD;
1010
1011   if (session->is_vep)
1012     {
1013       clib_warning ("VCL<%d>: ERROR: sid %u: cannot "
1014                     "bind to an epoll session!", getpid (), session_handle);
1015       return VPPCOM_EBADFD;
1016     }
1017
1018   session->transport.is_ip4 = ep->is_ip4;
1019   if (ep->is_ip4)
1020     clib_memcpy_fast (&session->transport.lcl_ip.ip4, ep->ip,
1021                       sizeof (ip4_address_t));
1022   else
1023     clib_memcpy_fast (&session->transport.lcl_ip.ip6, ep->ip,
1024                       sizeof (ip6_address_t));
1025   session->transport.lcl_port = ep->port;
1026
1027   VDBG (0, "VCL<%d>: sid %u: binding to local %s address %U port %u, "
1028         "proto %s", getpid (), session_handle,
1029         session->transport.is_ip4 ? "IPv4" : "IPv6",
1030         format_ip46_address, &session->transport.lcl_ip,
1031         session->transport.is_ip4 ? IP46_TYPE_IP4 : IP46_TYPE_IP6,
1032         clib_net_to_host_u16 (session->transport.lcl_port),
1033         session->session_type ? "UDP" : "TCP");
1034   vcl_evt (VCL_EVT_BIND, session);
1035
1036   if (session->session_type == VPPCOM_PROTO_UDP)
1037     vppcom_session_listen (session_handle, 10);
1038
1039   return VPPCOM_OK;
1040 }
1041
1042 int
1043 vppcom_session_listen (uint32_t listen_sh, uint32_t q_len)
1044 {
1045   vcl_worker_t *wrk = vcl_worker_get_current ();
1046   vcl_session_t *listen_session = 0;
1047   u64 listen_vpp_handle;
1048   int rv;
1049
1050   listen_session = vcl_session_get_w_handle (wrk, listen_sh);
1051   if (!listen_session)
1052     return VPPCOM_EBADFD;
1053
1054   if (q_len == 0 || q_len == ~0)
1055     q_len = vcm->cfg.listen_queue_size;
1056
1057   if (listen_session->is_vep)
1058     {
1059       clib_warning ("VCL<%d>: ERROR: sid %u: cannot listen on an "
1060                     "epoll session!", getpid (), listen_sh);
1061       return VPPCOM_EBADFD;
1062     }
1063
1064   listen_vpp_handle = listen_session->vpp_handle;
1065   if (listen_session->session_state & STATE_LISTEN)
1066     {
1067       VDBG (0, "VCL<%d>: vpp handle 0x%llx, sid %u: already in listen state!",
1068             getpid (), listen_vpp_handle, listen_sh);
1069       return VPPCOM_OK;
1070     }
1071
1072   VDBG (0, "VCL<%d>: vpp handle 0x%llx, sid %u: sending VPP bind+listen "
1073         "request...", getpid (), listen_vpp_handle, listen_sh);
1074
1075   /*
1076    * Send listen request to vpp and wait for reply
1077    */
1078   vppcom_send_bind_sock (listen_session);
1079   rv = vppcom_wait_for_session_state_change (listen_session->session_index,
1080                                              STATE_LISTEN,
1081                                              vcm->cfg.session_timeout);
1082
1083   if (PREDICT_FALSE (rv))
1084     {
1085       listen_session = vcl_session_get_w_handle (wrk, listen_sh);
1086       VDBG (0, "VCL<%d>: vpp handle 0x%llx, sid %u: bind+listen failed! "
1087             "returning %d (%s)", getpid (), listen_session->vpp_handle,
1088             listen_sh, rv, vppcom_retval_str (rv));
1089       return rv;
1090     }
1091
1092   return VPPCOM_OK;
1093 }
1094
1095 static int
1096 validate_args_session_accept_ (vcl_worker_t * wrk,
1097                                vcl_session_t * listen_session)
1098 {
1099   /* Input validation - expects spinlock on sessions_lockp */
1100   if (listen_session->is_vep)
1101     {
1102       clib_warning ("VCL<%d>: ERROR: sid %u: cannot accept on an "
1103                     "epoll session!", getpid (),
1104                     listen_session->session_index);
1105       return VPPCOM_EBADFD;
1106     }
1107
1108   if (listen_session->session_state != STATE_LISTEN)
1109     {
1110       clib_warning ("VCL<%d>: ERROR: vpp handle 0x%llx, sid %u: "
1111                     "not in listen state! state 0x%x (%s)", getpid (),
1112                     listen_session->vpp_handle, listen_session->session_index,
1113                     listen_session->session_state,
1114                     vppcom_session_state_str (listen_session->session_state));
1115       return VPPCOM_EBADFD;
1116     }
1117   return VPPCOM_OK;
1118 }
1119
1120 int
1121 vppcom_session_accept (uint32_t listen_session_handle, vppcom_endpt_t * ep,
1122                        uint32_t flags)
1123 {
1124   u32 client_session_index = ~0, listen_session_index;
1125   vcl_worker_t *wrk = vcl_worker_get_current ();
1126   session_accepted_msg_t accepted_msg;
1127   vcl_session_t *listen_session = 0;
1128   vcl_session_t *client_session = 0;
1129   svm_msg_q_t *vpp_evt_q;
1130   vcl_session_msg_t *evt;
1131   u64 listen_vpp_handle;
1132   svm_msg_q_msg_t msg;
1133   session_event_t *e;
1134   u8 is_nonblocking;
1135   int rv;
1136
1137   listen_session = vcl_session_get_w_handle (wrk, listen_session_handle);
1138   if (!listen_session)
1139     return VPPCOM_EBADFD;
1140
1141   listen_session_index = listen_session->session_index;
1142   if ((rv = validate_args_session_accept_ (wrk, listen_session)))
1143     return rv;
1144
1145   if (clib_fifo_elts (listen_session->accept_evts_fifo))
1146     {
1147       clib_fifo_sub2 (listen_session->accept_evts_fifo, evt);
1148       accepted_msg = evt->accepted_msg;
1149       goto handle;
1150     }
1151
1152   is_nonblocking = VCL_SESS_ATTR_TEST (listen_session->attr,
1153                                        VCL_SESS_ATTR_NONBLOCK);
1154   if (svm_msg_q_is_empty (wrk->app_event_queue) && is_nonblocking)
1155     return VPPCOM_EAGAIN;
1156
1157   while (1)
1158     {
1159       if (svm_msg_q_sub (wrk->app_event_queue, &msg, SVM_Q_WAIT, 0))
1160         return VPPCOM_EAGAIN;
1161
1162       e = svm_msg_q_msg_data (wrk->app_event_queue, &msg);
1163       if (e->event_type != SESSION_CTRL_EVT_ACCEPTED)
1164         {
1165           clib_warning ("discarded event: %u", e->event_type);
1166           svm_msg_q_free_msg (wrk->app_event_queue, &msg);
1167           continue;
1168         }
1169       clib_memcpy_fast (&accepted_msg, e->data, sizeof (accepted_msg));
1170       svm_msg_q_free_msg (wrk->app_event_queue, &msg);
1171       break;
1172     }
1173
1174 handle:
1175
1176   client_session_index = vcl_session_accepted_handler (wrk, &accepted_msg);
1177   listen_session = vcl_session_get (wrk, listen_session_index);
1178   client_session = vcl_session_get (wrk, client_session_index);
1179
1180   if (flags & O_NONBLOCK)
1181     VCL_SESS_ATTR_SET (client_session->attr, VCL_SESS_ATTR_NONBLOCK);
1182
1183   listen_vpp_handle = listen_session->vpp_handle;
1184   VDBG (0, "VCL<%d>: vpp handle 0x%llx, sid %u: Got a client request! "
1185         "vpp handle 0x%llx, sid %u, flags %d, is_nonblocking %u",
1186         getpid (), listen_vpp_handle, listen_session_handle,
1187         client_session->vpp_handle, client_session_index,
1188         flags, VCL_SESS_ATTR_TEST (client_session->attr,
1189                                    VCL_SESS_ATTR_NONBLOCK));
1190
1191   if (ep)
1192     {
1193       ep->is_ip4 = client_session->transport.is_ip4;
1194       ep->port = client_session->transport.rmt_port;
1195       if (client_session->transport.is_ip4)
1196         clib_memcpy_fast (ep->ip, &client_session->transport.rmt_ip.ip4,
1197                           sizeof (ip4_address_t));
1198       else
1199         clib_memcpy_fast (ep->ip, &client_session->transport.rmt_ip.ip6,
1200                           sizeof (ip6_address_t));
1201     }
1202
1203   if (accepted_msg.server_event_queue_address)
1204     vpp_evt_q = uword_to_pointer (accepted_msg.vpp_event_queue_address,
1205                                   svm_msg_q_t *);
1206   else
1207     vpp_evt_q = client_session->vpp_evt_q;
1208
1209   vcl_send_session_accepted_reply (vpp_evt_q, client_session->client_context,
1210                                    client_session->vpp_handle, 0);
1211
1212   VDBG (0, "VCL<%d>: vpp handle 0x%llx, sid %u: accepted vpp handle 0x%llx, "
1213         "sid %u connection from peer %s address %U port %u to local %s "
1214         "address %U port %u", getpid (), listen_vpp_handle,
1215         listen_session_handle, client_session->vpp_handle,
1216         client_session_index,
1217         client_session->transport.is_ip4 ? "IPv4" : "IPv6",
1218         format_ip46_address, &client_session->transport.rmt_ip,
1219         client_session->transport.is_ip4 ? IP46_TYPE_IP4 : IP46_TYPE_IP6,
1220         clib_net_to_host_u16 (client_session->transport.rmt_port),
1221         client_session->transport.is_ip4 ? "IPv4" : "IPv6",
1222         format_ip46_address, &client_session->transport.lcl_ip,
1223         client_session->transport.is_ip4 ? IP46_TYPE_IP4 : IP46_TYPE_IP6,
1224         clib_net_to_host_u16 (client_session->transport.lcl_port));
1225   vcl_evt (VCL_EVT_ACCEPT, client_session, listen_session,
1226            client_session_index);
1227
1228   return vcl_session_handle (client_session);
1229 }
1230
1231 int
1232 vppcom_session_connect (uint32_t session_handle, vppcom_endpt_t * server_ep)
1233 {
1234   vcl_worker_t *wrk = vcl_worker_get_current ();
1235   vcl_session_t *session = 0;
1236   u32 session_index;
1237   int rv;
1238
1239   session = vcl_session_get_w_handle (wrk, session_handle);
1240   if (!session)
1241     return VPPCOM_EBADFD;
1242   session_index = session->session_index;
1243
1244   if (PREDICT_FALSE (session->is_vep))
1245     {
1246       clib_warning ("VCL<%d>: ERROR: sid %u: cannot "
1247                     "connect on an epoll session!", getpid (),
1248                     session_handle);
1249       return VPPCOM_EBADFD;
1250     }
1251
1252   if (PREDICT_FALSE (session->session_state & CLIENT_STATE_OPEN))
1253     {
1254       VDBG (0, "VCL<%d>: vpp handle 0x%llx, sid %u: session already "
1255             "connected to %s %U port %d proto %s, state 0x%x (%s)",
1256             getpid (), session->vpp_handle, session_handle,
1257             session->transport.is_ip4 ? "IPv4" : "IPv6",
1258             format_ip46_address,
1259             &session->transport.rmt_ip, session->transport.is_ip4 ?
1260             IP46_TYPE_IP4 : IP46_TYPE_IP6,
1261             clib_net_to_host_u16 (session->transport.rmt_port),
1262             session->session_type ? "UDP" : "TCP", session->session_state,
1263             vppcom_session_state_str (session->session_state));
1264       return VPPCOM_OK;
1265     }
1266
1267   session->transport.is_ip4 = server_ep->is_ip4;
1268   if (session->transport.is_ip4)
1269     clib_memcpy_fast (&session->transport.rmt_ip.ip4, server_ep->ip,
1270                       sizeof (ip4_address_t));
1271   else
1272     clib_memcpy_fast (&session->transport.rmt_ip.ip6, server_ep->ip,
1273                       sizeof (ip6_address_t));
1274   session->transport.rmt_port = server_ep->port;
1275
1276   VDBG (0, "VCL<%d>: vpp handle 0x%llx, sid %u: connecting to server %s %U "
1277         "port %d proto %s",
1278         getpid (), session->vpp_handle, session_handle,
1279         session->transport.is_ip4 ? "IPv4" : "IPv6",
1280         format_ip46_address,
1281         &session->transport.rmt_ip, session->transport.is_ip4 ?
1282         IP46_TYPE_IP4 : IP46_TYPE_IP6,
1283         clib_net_to_host_u16 (session->transport.rmt_port),
1284         session->session_type ? "UDP" : "TCP");
1285
1286   /*
1287    * Send connect request and wait for reply from vpp
1288    */
1289   vppcom_send_connect_sock (session);
1290   rv = vppcom_wait_for_session_state_change (session_index, STATE_CONNECT,
1291                                              vcm->cfg.session_timeout);
1292
1293   session = vcl_session_get (wrk, session_index);
1294
1295   if (PREDICT_FALSE (rv))
1296     {
1297       if (VPPCOM_DEBUG > 0)
1298         {
1299           if (session)
1300             clib_warning ("VCL<%d>: vpp handle 0x%llx, sid %u: connect "
1301                           "failed! returning %d (%s)", getpid (),
1302                           session->vpp_handle, session_handle, rv,
1303                           vppcom_retval_str (rv));
1304           else
1305             clib_warning ("VCL<%d>: no session for sid %u: connect failed! "
1306                           "returning %d (%s)", getpid (),
1307                           session_handle, rv, vppcom_retval_str (rv));
1308         }
1309     }
1310   else
1311     VDBG (0, "VCL<%d>: vpp handle 0x%llx, sid %u: connected!",
1312           getpid (), session->vpp_handle, session_handle);
1313
1314   return rv;
1315 }
1316
1317 static u8
1318 vcl_is_rx_evt_for_session (session_event_t * e, u32 sid, u8 is_ct)
1319 {
1320   if (!is_ct)
1321     return (e->event_type == FIFO_EVENT_APP_RX
1322             && e->fifo->client_session_index == sid);
1323   else
1324     return (e->event_type == SESSION_IO_EVT_CT_TX);
1325 }
1326
1327 static inline u8
1328 vcl_session_is_readable (vcl_session_t * s)
1329 {
1330   return ((s->session_state & STATE_OPEN)
1331           || (s->session_state == STATE_LISTEN
1332               && s->session_type == VPPCOM_PROTO_UDP));
1333 }
1334
1335 static inline int
1336 vppcom_session_read_internal (uint32_t session_handle, void *buf, int n,
1337                               u8 peek)
1338 {
1339   vcl_worker_t *wrk = vcl_worker_get_current ();
1340   int n_read = 0, rv, is_nonblocking;
1341   vcl_session_t *s = 0;
1342   svm_fifo_t *rx_fifo;
1343   svm_msg_q_msg_t msg;
1344   session_event_t *e;
1345   svm_msg_q_t *mq;
1346   u8 is_ct;
1347
1348   if (PREDICT_FALSE (!buf))
1349     return VPPCOM_EINVAL;
1350
1351   s = vcl_session_get_w_handle (wrk, session_handle);
1352   if (PREDICT_FALSE (!s || s->is_vep))
1353     return VPPCOM_EBADFD;
1354
1355   if (PREDICT_FALSE (!vcl_session_is_readable (s)))
1356     {
1357       session_state_t state = s->session_state;
1358       rv = ((state & STATE_DISCONNECT) ? VPPCOM_ECONNRESET : VPPCOM_ENOTCONN);
1359
1360       VDBG (0, "VCL<%d>: vpp handle 0x%llx, sid %u: %s session is not open! "
1361             "state 0x%x (%s), returning %d (%s)",
1362             getpid (), s->vpp_handle, session_handle, state,
1363             vppcom_session_state_str (state), rv, vppcom_retval_str (rv));
1364       return rv;
1365     }
1366
1367   is_nonblocking = VCL_SESS_ATTR_TEST (s->attr, VCL_SESS_ATTR_NONBLOCK);
1368   is_ct = vcl_session_is_ct (s);
1369   mq = is_ct ? s->our_evt_q : wrk->app_event_queue;
1370   rx_fifo = s->rx_fifo;
1371   s->has_rx_evt = 0;
1372
1373   if (svm_fifo_is_empty (rx_fifo))
1374     {
1375       if (is_nonblocking)
1376         {
1377           svm_fifo_unset_event (rx_fifo);
1378           return VPPCOM_EWOULDBLOCK;
1379         }
1380       while (svm_fifo_is_empty (rx_fifo))
1381         {
1382           svm_fifo_unset_event (rx_fifo);
1383           svm_msg_q_lock (mq);
1384           if (svm_msg_q_is_empty (mq))
1385             svm_msg_q_wait (mq);
1386
1387           svm_msg_q_sub_w_lock (mq, &msg);
1388           e = svm_msg_q_msg_data (mq, &msg);
1389           svm_msg_q_unlock (mq);
1390           if (!vcl_is_rx_evt_for_session (e, s->session_index, is_ct))
1391             {
1392               vcl_handle_mq_event (wrk, e);
1393               svm_msg_q_free_msg (mq, &msg);
1394               continue;
1395             }
1396           svm_msg_q_free_msg (mq, &msg);
1397
1398           if (PREDICT_FALSE (s->session_state == STATE_CLOSE_ON_EMPTY))
1399             return 0;
1400         }
1401     }
1402
1403   if (s->is_dgram)
1404     n_read = app_recv_dgram_raw (rx_fifo, buf, n, &s->transport, 0, peek);
1405   else
1406     n_read = app_recv_stream_raw (rx_fifo, buf, n, 0, peek);
1407
1408   if (svm_fifo_is_empty (rx_fifo))
1409     svm_fifo_unset_event (rx_fifo);
1410
1411   if (is_ct && svm_fifo_want_tx_evt (rx_fifo))
1412     {
1413       svm_fifo_set_want_tx_evt (s->rx_fifo, 0);
1414       app_send_io_evt_to_vpp (s->vpp_evt_q, s->rx_fifo, SESSION_IO_EVT_CT_RX,
1415                               SVM_Q_WAIT);
1416     }
1417
1418   VDBG (2, "VCL<%d>: vpp handle 0x%llx, sid %u: read %d bytes from (%p)",
1419         getpid (), s->vpp_handle, session_handle, n_read, rx_fifo);
1420
1421   return n_read;
1422 }
1423
1424 int
1425 vppcom_session_read (uint32_t session_handle, void *buf, size_t n)
1426 {
1427   return (vppcom_session_read_internal (session_handle, buf, n, 0));
1428 }
1429
1430 static int
1431 vppcom_session_peek (uint32_t session_handle, void *buf, int n)
1432 {
1433   return (vppcom_session_read_internal (session_handle, buf, n, 1));
1434 }
1435
1436 int
1437 vppcom_session_read_segments (uint32_t session_handle,
1438                               vppcom_data_segments_t ds)
1439 {
1440   vcl_worker_t *wrk = vcl_worker_get_current ();
1441   int n_read = 0, rv, is_nonblocking;
1442   vcl_session_t *s = 0;
1443   svm_fifo_t *rx_fifo;
1444   svm_msg_q_msg_t msg;
1445   session_event_t *e;
1446   svm_msg_q_t *mq;
1447   u8 is_ct;
1448
1449   s = vcl_session_get_w_handle (wrk, session_handle);
1450   if (PREDICT_FALSE (!s || s->is_vep))
1451     return VPPCOM_EBADFD;
1452
1453   if (PREDICT_FALSE (!vcl_session_is_readable (s)))
1454     {
1455       session_state_t state = s->session_state;
1456       rv = ((state & STATE_DISCONNECT) ? VPPCOM_ECONNRESET : VPPCOM_ENOTCONN);
1457       return rv;
1458     }
1459
1460   is_nonblocking = VCL_SESS_ATTR_TEST (s->attr, VCL_SESS_ATTR_NONBLOCK);
1461   is_ct = vcl_session_is_ct (s);
1462   mq = is_ct ? s->our_evt_q : wrk->app_event_queue;
1463   rx_fifo = s->rx_fifo;
1464   s->has_rx_evt = 0;
1465
1466   if (svm_fifo_is_empty (rx_fifo))
1467     {
1468       if (is_nonblocking)
1469         {
1470           svm_fifo_unset_event (rx_fifo);
1471           return VPPCOM_EWOULDBLOCK;
1472         }
1473       while (svm_fifo_is_empty (rx_fifo))
1474         {
1475           svm_fifo_unset_event (rx_fifo);
1476           svm_msg_q_lock (mq);
1477           if (svm_msg_q_is_empty (mq))
1478             svm_msg_q_wait (mq);
1479
1480           svm_msg_q_sub_w_lock (mq, &msg);
1481           e = svm_msg_q_msg_data (mq, &msg);
1482           svm_msg_q_unlock (mq);
1483           if (!vcl_is_rx_evt_for_session (e, s->session_index, is_ct))
1484             {
1485               vcl_handle_mq_event (wrk, e);
1486               svm_msg_q_free_msg (mq, &msg);
1487               continue;
1488             }
1489           svm_msg_q_free_msg (mq, &msg);
1490
1491           if (PREDICT_FALSE (s->session_state == STATE_CLOSE_ON_EMPTY))
1492             return 0;
1493         }
1494     }
1495
1496   n_read = svm_fifo_segments (rx_fifo, (svm_fifo_segment_t *) ds);
1497   svm_fifo_unset_event (rx_fifo);
1498
1499   if (is_ct && n_read + svm_fifo_max_dequeue (rx_fifo) == rx_fifo->nitems)
1500     {
1501       /* If the peer is not polling send notification */
1502       if (!svm_fifo_has_event (s->rx_fifo))
1503         app_send_io_evt_to_vpp (s->vpp_evt_q, s->rx_fifo,
1504                                 SESSION_IO_EVT_CT_RX, SVM_Q_WAIT);
1505     }
1506
1507   return n_read;
1508 }
1509
1510 void
1511 vppcom_session_free_segments (uint32_t session_handle,
1512                               vppcom_data_segments_t ds)
1513 {
1514   vcl_worker_t *wrk = vcl_worker_get_current ();
1515   vcl_session_t *s;
1516
1517   s = vcl_session_get_w_handle (wrk, session_handle);
1518   if (PREDICT_FALSE (!s || s->is_vep))
1519     return;
1520
1521   svm_fifo_segments_free (s->rx_fifo, (svm_fifo_segment_t *) ds);
1522 }
1523
1524 static inline int
1525 vppcom_session_read_ready (vcl_session_t * session)
1526 {
1527   /* Assumes caller has acquired spinlock: vcm->sessions_lockp */
1528   if (PREDICT_FALSE (session->is_vep))
1529     {
1530       clib_warning ("VCL<%d>: ERROR: sid %u: cannot read from an "
1531                     "epoll session!", getpid (), session->session_index);
1532       return VPPCOM_EBADFD;
1533     }
1534
1535   if (PREDICT_FALSE (!(session->session_state & (STATE_OPEN | STATE_LISTEN))))
1536     {
1537       session_state_t state = session->session_state;
1538       int rv;
1539
1540       rv = ((state & STATE_DISCONNECT) ? VPPCOM_ECONNRESET : VPPCOM_ENOTCONN);
1541
1542       VDBG (1, "VCL<%d>: vpp handle 0x%llx, sid %u: session is not open!"
1543             " state 0x%x (%s), returning %d (%s)", getpid (),
1544             session->vpp_handle, session->session_index, state,
1545             vppcom_session_state_str (state), rv, vppcom_retval_str (rv));
1546       return rv;
1547     }
1548
1549   if (session->session_state & STATE_LISTEN)
1550     return clib_fifo_elts (session->accept_evts_fifo);
1551
1552   return svm_fifo_max_dequeue (session->rx_fifo);
1553 }
1554
1555 int
1556 vppcom_data_segment_copy (void *buf, vppcom_data_segments_t ds, u32 max_bytes)
1557 {
1558   u32 first_copy = clib_min (ds[0].len, max_bytes);
1559   clib_memcpy_fast (buf, ds[0].data, first_copy);
1560   if (first_copy < max_bytes)
1561     {
1562       clib_memcpy_fast (buf + first_copy, ds[1].data,
1563                         clib_min (ds[1].len, max_bytes - first_copy));
1564     }
1565   return 0;
1566 }
1567
1568 static u8
1569 vcl_is_tx_evt_for_session (session_event_t * e, u32 sid, u8 is_ct)
1570 {
1571   if (!is_ct)
1572     return (e->event_type == FIFO_EVENT_APP_TX
1573             && e->fifo->client_session_index == sid);
1574   else
1575     return (e->event_type == SESSION_IO_EVT_CT_RX);
1576 }
1577
1578 int
1579 vppcom_session_write (uint32_t session_handle, void *buf, size_t n)
1580 {
1581   vcl_worker_t *wrk = vcl_worker_get_current ();
1582   int rv, n_write, is_nonblocking;
1583   vcl_session_t *s = 0;
1584   svm_fifo_t *tx_fifo = 0;
1585   session_evt_type_t et;
1586   svm_msg_q_msg_t msg;
1587   session_event_t *e;
1588   svm_msg_q_t *mq;
1589   u8 is_ct;
1590
1591   if (PREDICT_FALSE (!buf))
1592     return VPPCOM_EINVAL;
1593
1594   s = vcl_session_get_w_handle (wrk, session_handle);
1595   if (PREDICT_FALSE (!s))
1596     return VPPCOM_EBADFD;
1597
1598   if (PREDICT_FALSE (s->is_vep))
1599     {
1600       clib_warning ("VCL<%d>: ERROR: vpp handle 0x%llx, sid %u: "
1601                     "cannot write to an epoll session!",
1602                     getpid (), s->vpp_handle, session_handle);
1603
1604       return VPPCOM_EBADFD;
1605     }
1606
1607   if (PREDICT_FALSE (!(s->session_state & STATE_OPEN)))
1608     {
1609       session_state_t state = s->session_state;
1610       rv = ((state & STATE_DISCONNECT) ? VPPCOM_ECONNRESET : VPPCOM_ENOTCONN);
1611       VDBG (1, "VCL<%d>: vpp handle 0x%llx, sid %u: session is not open! "
1612             "state 0x%x (%s)", getpid (), s->vpp_handle, session_handle,
1613             state, vppcom_session_state_str (state));
1614       return rv;
1615     }
1616
1617   tx_fifo = s->tx_fifo;
1618   is_ct = vcl_session_is_ct (s);
1619   is_nonblocking = VCL_SESS_ATTR_TEST (s->attr, VCL_SESS_ATTR_NONBLOCK);
1620   mq = is_ct ? s->our_evt_q : wrk->app_event_queue;
1621   if (svm_fifo_is_full (tx_fifo))
1622     {
1623       if (is_nonblocking)
1624         {
1625           return VPPCOM_EWOULDBLOCK;
1626         }
1627       while (svm_fifo_is_full (tx_fifo))
1628         {
1629           svm_fifo_set_want_tx_evt (tx_fifo, 1);
1630           svm_msg_q_lock (mq);
1631           if (svm_msg_q_is_empty (mq))
1632             svm_msg_q_wait (mq);
1633
1634           svm_msg_q_sub_w_lock (mq, &msg);
1635           e = svm_msg_q_msg_data (mq, &msg);
1636           svm_msg_q_unlock (mq);
1637
1638           if (!vcl_is_tx_evt_for_session (e, s->session_index, is_ct))
1639             vcl_handle_mq_event (wrk, e);
1640           svm_msg_q_free_msg (mq, &msg);
1641         }
1642     }
1643
1644   ASSERT (FIFO_EVENT_APP_TX + 1 == SESSION_IO_EVT_CT_TX);
1645   et = FIFO_EVENT_APP_TX + vcl_session_is_ct (s);
1646   if (s->is_dgram)
1647     n_write = app_send_dgram_raw (tx_fifo, &s->transport,
1648                                   s->vpp_evt_q, buf, n, et, SVM_Q_WAIT);
1649   else
1650     n_write = app_send_stream_raw (tx_fifo, s->vpp_evt_q, buf, n, et,
1651                                    SVM_Q_WAIT);
1652
1653   ASSERT (n_write > 0);
1654
1655   VDBG (2, "VCL<%d>: vpp handle 0x%llx, sid %u: wrote %d bytes", getpid (),
1656         s->vpp_handle, session_handle, n_write);
1657
1658   return n_write;
1659 }
1660
1661 static vcl_session_t *
1662 vcl_ct_session_get_from_fifo (vcl_worker_t * wrk, svm_fifo_t * f, u8 type)
1663 {
1664   vcl_session_t *s;
1665   s = vcl_session_get (wrk, f->client_session_index);
1666   if (s)
1667     {
1668       /* rx fifo */
1669       if (type == 0 && s->rx_fifo == f)
1670         return s;
1671       /* tx fifo */
1672       if (type == 1 && s->tx_fifo == f)
1673         return s;
1674     }
1675   s = vcl_session_get (wrk, f->master_session_index);
1676   if (s)
1677     {
1678       if (type == 0 && s->rx_fifo == f)
1679         return s;
1680       if (type == 1 && s->tx_fifo == f)
1681         return s;
1682     }
1683   return 0;
1684 }
1685
1686 static inline int
1687 vppcom_session_write_ready (vcl_session_t * session)
1688 {
1689   /* Assumes caller has acquired spinlock: vcm->sessions_lockp */
1690   if (PREDICT_FALSE (session->is_vep))
1691     {
1692       clib_warning ("VCL<%d>: ERROR: vpp handle 0x%llx, sid %u: "
1693                     "cannot write to an epoll session!",
1694                     getpid (), session->vpp_handle, session->session_index);
1695       return VPPCOM_EBADFD;
1696     }
1697
1698   if (PREDICT_FALSE (session->session_state & STATE_LISTEN))
1699     {
1700       clib_warning ("VCL<%d>: ERROR: vpp handle 0x%llx, sid %u: "
1701                     "cannot write to a listen session!",
1702                     getpid (), session->vpp_handle, session->session_index);
1703       return VPPCOM_EBADFD;
1704     }
1705
1706   if (PREDICT_FALSE (!(session->session_state & STATE_OPEN)))
1707     {
1708       session_state_t state = session->session_state;
1709       int rv;
1710
1711       rv = ((state & STATE_DISCONNECT) ? VPPCOM_ECONNRESET : VPPCOM_ENOTCONN);
1712       clib_warning ("VCL<%d>: ERROR: vpp handle 0x%llx, sid %u: "
1713                     "session is not open! state 0x%x (%s), "
1714                     "returning %d (%s)", getpid (), session->vpp_handle,
1715                     session->session_index,
1716                     state, vppcom_session_state_str (state),
1717                     rv, vppcom_retval_str (rv));
1718       return rv;
1719     }
1720
1721   VDBG (3, "VCL<%d>: vpp handle 0x%llx, sid %u: peek %s (%p), ready = %d",
1722         getpid (), session->vpp_handle, session->session_index,
1723         session->tx_fifo, svm_fifo_max_enqueue (session->tx_fifo));
1724
1725   return svm_fifo_max_enqueue (session->tx_fifo);
1726 }
1727
1728 static inline int
1729 vcl_mq_dequeue_batch (vcl_worker_t * wrk, svm_msg_q_t * mq)
1730 {
1731   svm_msg_q_msg_t *msg;
1732   u32 n_msgs;
1733   int i;
1734
1735   n_msgs = svm_msg_q_size (mq);
1736   for (i = 0; i < n_msgs; i++)
1737     {
1738       vec_add2 (wrk->mq_msg_vector, msg, 1);
1739       svm_msg_q_sub_w_lock (mq, msg);
1740     }
1741   return n_msgs;
1742 }
1743
1744 #define vcl_fifo_rx_evt_valid_or_break(_fifo)                   \
1745 if (PREDICT_FALSE (svm_fifo_is_empty (_fifo)))                  \
1746   {                                                             \
1747     svm_fifo_unset_event (_fifo);                               \
1748     if (svm_fifo_is_empty (_fifo))                              \
1749       break;                                                    \
1750   }                                                             \
1751
1752 static void
1753 vcl_select_handle_mq_event (vcl_worker_t * wrk, session_event_t * e,
1754                             unsigned long n_bits, unsigned long *read_map,
1755                             unsigned long *write_map,
1756                             unsigned long *except_map, u32 * bits_set)
1757 {
1758   session_disconnected_msg_t *disconnected_msg;
1759   session_connected_msg_t *connected_msg;
1760   session_accepted_msg_t *accepted_msg;
1761   vcl_session_msg_t *vcl_msg;
1762   vcl_session_t *session;
1763   u64 handle;
1764   u32 sid;
1765
1766   switch (e->event_type)
1767     {
1768     case FIFO_EVENT_APP_RX:
1769       vcl_fifo_rx_evt_valid_or_break (e->fifo);
1770       sid = e->fifo->client_session_index;
1771       session = vcl_session_get (wrk, sid);
1772       if (!session)
1773         break;
1774       if (sid < n_bits && read_map)
1775         {
1776           clib_bitmap_set_no_check (read_map, sid, 1);
1777           *bits_set += 1;
1778         }
1779       break;
1780     case FIFO_EVENT_APP_TX:
1781       sid = e->fifo->client_session_index;
1782       session = vcl_session_get (wrk, sid);
1783       if (!session)
1784         break;
1785       if (sid < n_bits && write_map)
1786         {
1787           clib_bitmap_set_no_check (write_map, sid, 1);
1788           *bits_set += 1;
1789         }
1790       break;
1791     case SESSION_IO_EVT_CT_TX:
1792       vcl_fifo_rx_evt_valid_or_break (e->fifo);
1793       session = vcl_ct_session_get_from_fifo (wrk, e->fifo, 0);
1794       if (!session)
1795         break;
1796       sid = session->session_index;
1797       if (sid < n_bits && read_map)
1798         {
1799           clib_bitmap_set_no_check (read_map, sid, 1);
1800           *bits_set += 1;
1801         }
1802       break;
1803     case SESSION_IO_EVT_CT_RX:
1804       session = vcl_ct_session_get_from_fifo (wrk, e->fifo, 1);
1805       if (!session)
1806         break;
1807       sid = session->session_index;
1808       if (sid < n_bits && write_map)
1809         {
1810           clib_bitmap_set_no_check (write_map, sid, 1);
1811           *bits_set += 1;
1812         }
1813       break;
1814     case SESSION_CTRL_EVT_ACCEPTED:
1815       accepted_msg = (session_accepted_msg_t *) e->data;
1816       handle = accepted_msg->listener_handle;
1817       session = vcl_session_table_lookup_listener (wrk, handle);
1818       if (!session)
1819         {
1820           clib_warning ("VCL<%d>: ERROR: couldn't find listen session:"
1821                         "listener handle %llx", getpid (), handle);
1822           break;
1823         }
1824
1825       clib_fifo_add2 (session->accept_evts_fifo, vcl_msg);
1826       vcl_msg->accepted_msg = *accepted_msg;
1827       sid = session->session_index;
1828       if (sid < n_bits && read_map)
1829         {
1830           clib_bitmap_set_no_check (read_map, sid, 1);
1831           *bits_set += 1;
1832         }
1833       break;
1834     case SESSION_CTRL_EVT_CONNECTED:
1835       connected_msg = (session_connected_msg_t *) e->data;
1836       vcl_session_connected_handler (wrk, connected_msg);
1837       break;
1838     case SESSION_CTRL_EVT_DISCONNECTED:
1839       disconnected_msg = (session_disconnected_msg_t *) e->data;
1840       sid = vcl_session_index_from_vpp_handle (wrk, disconnected_msg->handle);
1841       if (sid < n_bits && except_map)
1842         {
1843           clib_bitmap_set_no_check (except_map, sid, 1);
1844           *bits_set += 1;
1845         }
1846       break;
1847     case SESSION_CTRL_EVT_RESET:
1848       sid = vcl_session_reset_handler (wrk, (session_reset_msg_t *) e->data);
1849       if (sid < n_bits && except_map)
1850         {
1851           clib_bitmap_set_no_check (except_map, sid, 1);
1852           *bits_set += 1;
1853         }
1854       break;
1855     default:
1856       clib_warning ("unhandled: %u", e->event_type);
1857       break;
1858     }
1859 }
1860
1861 static int
1862 vcl_select_handle_mq (vcl_worker_t * wrk, svm_msg_q_t * mq,
1863                       unsigned long n_bits, unsigned long *read_map,
1864                       unsigned long *write_map, unsigned long *except_map,
1865                       double time_to_wait, u32 * bits_set)
1866 {
1867   svm_msg_q_msg_t *msg;
1868   session_event_t *e;
1869   u32 i;
1870
1871   svm_msg_q_lock (mq);
1872   if (svm_msg_q_is_empty (mq))
1873     {
1874       if (*bits_set)
1875         {
1876           svm_msg_q_unlock (mq);
1877           return 0;
1878         }
1879
1880       if (!time_to_wait)
1881         {
1882           svm_msg_q_unlock (mq);
1883           return 0;
1884         }
1885       else if (time_to_wait < 0)
1886         {
1887           svm_msg_q_wait (mq);
1888         }
1889       else
1890         {
1891           if (svm_msg_q_timedwait (mq, time_to_wait))
1892             {
1893               svm_msg_q_unlock (mq);
1894               return 0;
1895             }
1896         }
1897     }
1898   vcl_mq_dequeue_batch (wrk, mq);
1899   svm_msg_q_unlock (mq);
1900
1901   for (i = 0; i < vec_len (wrk->mq_msg_vector); i++)
1902     {
1903       msg = vec_elt_at_index (wrk->mq_msg_vector, i);
1904       e = svm_msg_q_msg_data (mq, msg);
1905       vcl_select_handle_mq_event (wrk, e, n_bits, read_map, write_map,
1906                                   except_map, bits_set);
1907       svm_msg_q_free_msg (mq, msg);
1908     }
1909   vec_reset_length (wrk->mq_msg_vector);
1910   return *bits_set;
1911 }
1912
1913 static int
1914 vppcom_select_condvar (vcl_worker_t * wrk, unsigned long n_bits,
1915                        unsigned long *read_map, unsigned long *write_map,
1916                        unsigned long *except_map, double time_to_wait,
1917                        u32 * bits_set)
1918 {
1919   double total_wait = 0, wait_slice;
1920   vcl_cut_through_registration_t *cr;
1921
1922   time_to_wait = (time_to_wait == -1) ? 10e9 : time_to_wait;
1923   wait_slice = wrk->cut_through_registrations ? 10e-6 : time_to_wait;
1924   do
1925     {
1926       vcl_ct_registration_lock (wrk);
1927       /* *INDENT-OFF* */
1928       pool_foreach (cr, wrk->cut_through_registrations, ({
1929         vcl_select_handle_mq (wrk, cr->mq, n_bits, read_map, write_map, except_map,
1930                               0, bits_set);
1931       }));
1932       /* *INDENT-ON* */
1933       vcl_ct_registration_unlock (wrk);
1934
1935       vcl_select_handle_mq (wrk, wrk->app_event_queue, n_bits, read_map,
1936                             write_map, except_map, time_to_wait, bits_set);
1937       total_wait += wait_slice;
1938       if (*bits_set)
1939         return *bits_set;
1940     }
1941   while (total_wait < time_to_wait);
1942
1943   return 0;
1944 }
1945
1946 static int
1947 vppcom_select_eventfd (vcl_worker_t * wrk, unsigned long n_bits,
1948                        unsigned long *read_map, unsigned long *write_map,
1949                        unsigned long *except_map, double time_to_wait,
1950                        u32 * bits_set)
1951 {
1952   vcl_mq_evt_conn_t *mqc;
1953   int __clib_unused n_read;
1954   int n_mq_evts, i;
1955   u64 buf;
1956
1957   vec_validate (wrk->mq_events, pool_elts (wrk->mq_evt_conns));
1958   n_mq_evts = epoll_wait (wrk->mqs_epfd, wrk->mq_events,
1959                           vec_len (wrk->mq_events), time_to_wait);
1960   for (i = 0; i < n_mq_evts; i++)
1961     {
1962       mqc = vcl_mq_evt_conn_get (wrk, wrk->mq_events[i].data.u32);
1963       n_read = read (mqc->mq_fd, &buf, sizeof (buf));
1964       vcl_select_handle_mq (wrk, mqc->mq, n_bits, read_map, write_map,
1965                             except_map, 0, bits_set);
1966     }
1967
1968   return (n_mq_evts > 0 ? (int) *bits_set : 0);
1969 }
1970
1971 int
1972 vppcom_select (unsigned long n_bits, unsigned long *read_map,
1973                unsigned long *write_map, unsigned long *except_map,
1974                double time_to_wait)
1975 {
1976   u32 sid, minbits = clib_max (n_bits, BITS (uword)), bits_set = 0;
1977   vcl_worker_t *wrk = vcl_worker_get_current ();
1978   vcl_session_t *session = 0;
1979   int rv, i;
1980
1981   ASSERT (sizeof (clib_bitmap_t) == sizeof (long int));
1982
1983   if (n_bits && read_map)
1984     {
1985       clib_bitmap_validate (wrk->rd_bitmap, minbits);
1986       clib_memcpy_fast (wrk->rd_bitmap, read_map,
1987                         vec_len (wrk->rd_bitmap) * sizeof (clib_bitmap_t));
1988       memset (read_map, 0, vec_len (wrk->rd_bitmap) * sizeof (clib_bitmap_t));
1989     }
1990   if (n_bits && write_map)
1991     {
1992       clib_bitmap_validate (wrk->wr_bitmap, minbits);
1993       clib_memcpy_fast (wrk->wr_bitmap, write_map,
1994                         vec_len (wrk->wr_bitmap) * sizeof (clib_bitmap_t));
1995       memset (write_map, 0,
1996               vec_len (wrk->wr_bitmap) * sizeof (clib_bitmap_t));
1997     }
1998   if (n_bits && except_map)
1999     {
2000       clib_bitmap_validate (wrk->ex_bitmap, minbits);
2001       clib_memcpy_fast (wrk->ex_bitmap, except_map,
2002                         vec_len (wrk->ex_bitmap) * sizeof (clib_bitmap_t));
2003       memset (except_map, 0,
2004               vec_len (wrk->ex_bitmap) * sizeof (clib_bitmap_t));
2005     }
2006
2007   if (!n_bits)
2008     return 0;
2009
2010   if (!write_map)
2011     goto check_rd;
2012
2013   /* *INDENT-OFF* */
2014   clib_bitmap_foreach (sid, wrk->wr_bitmap, ({
2015     if (!(session = vcl_session_get (wrk, sid)))
2016       {
2017         if (except_map && sid < minbits)
2018           clib_bitmap_set_no_check (except_map, sid, 1);
2019         continue;
2020       }
2021
2022     rv = svm_fifo_is_full (session->tx_fifo);
2023     if (!rv)
2024       {
2025         clib_bitmap_set_no_check (write_map, sid, 1);
2026         bits_set++;
2027       }
2028   }));
2029
2030 check_rd:
2031   if (!read_map)
2032     goto check_mq;
2033
2034   clib_bitmap_foreach (sid, wrk->rd_bitmap, ({
2035     if (!(session = vcl_session_get (wrk, sid)))
2036       {
2037         if (except_map && sid < minbits)
2038           clib_bitmap_set_no_check (except_map, sid, 1);
2039         continue;
2040       }
2041
2042     rv = vppcom_session_read_ready (session);
2043     if (rv)
2044       {
2045         clib_bitmap_set_no_check (read_map, sid, 1);
2046         bits_set++;
2047       }
2048   }));
2049   /* *INDENT-ON* */
2050
2051 check_mq:
2052
2053   for (i = 0; i < vec_len (wrk->unhandled_evts_vector); i++)
2054     {
2055       vcl_select_handle_mq_event (wrk, &wrk->unhandled_evts_vector[i], n_bits,
2056                                   read_map, write_map, except_map, &bits_set);
2057     }
2058   vec_reset_length (wrk->unhandled_evts_vector);
2059
2060   if (vcm->cfg.use_mq_eventfd)
2061     vppcom_select_eventfd (wrk, n_bits, read_map, write_map, except_map,
2062                            time_to_wait, &bits_set);
2063   else
2064     vppcom_select_condvar (wrk, n_bits, read_map, write_map, except_map,
2065                            time_to_wait, &bits_set);
2066
2067   return (bits_set);
2068 }
2069
2070 static inline void
2071 vep_verify_epoll_chain (vcl_worker_t * wrk, u32 vep_idx)
2072 {
2073   vcl_session_t *session;
2074   vppcom_epoll_t *vep;
2075   u32 sid = vep_idx;
2076
2077   if (VPPCOM_DEBUG <= 1)
2078     return;
2079
2080   /* Assumes caller has acquired spinlock: vcm->sessions_lockp */
2081   session = vcl_session_get (wrk, vep_idx);
2082   if (PREDICT_FALSE (!session))
2083     {
2084       clib_warning ("VCL<%d>: ERROR: Invalid vep_idx (%u)!",
2085                     getpid (), vep_idx);
2086       goto done;
2087     }
2088   if (PREDICT_FALSE (!session->is_vep))
2089     {
2090       clib_warning ("VCL<%d>: ERROR: vep_idx (%u) is not a vep!",
2091                     getpid (), vep_idx);
2092       goto done;
2093     }
2094   vep = &session->vep;
2095   clib_warning ("VCL<%d>: vep_idx (%u): Dumping epoll chain\n"
2096                 "{\n"
2097                 "   is_vep         = %u\n"
2098                 "   is_vep_session = %u\n"
2099                 "   next_sid       = 0x%x (%u)\n"
2100                 "   wait_cont_idx  = 0x%x (%u)\n"
2101                 "}\n", getpid (), vep_idx,
2102                 session->is_vep, session->is_vep_session,
2103                 vep->next_sh, vep->next_sh,
2104                 session->wait_cont_idx, session->wait_cont_idx);
2105
2106   for (sid = vep->next_sh; sid != ~0; sid = vep->next_sh)
2107     {
2108       session = vcl_session_get (wrk, sid);
2109       if (PREDICT_FALSE (!session))
2110         {
2111           clib_warning ("VCL<%d>: ERROR: Invalid sid (%u)!", getpid (), sid);
2112           goto done;
2113         }
2114       if (PREDICT_FALSE (session->is_vep))
2115         clib_warning ("VCL<%d>: ERROR: sid (%u) is a vep!",
2116                       getpid (), vep_idx);
2117       else if (PREDICT_FALSE (!session->is_vep_session))
2118         {
2119           clib_warning ("VCL<%d>: ERROR: session (%u) "
2120                         "is not a vep session!", getpid (), sid);
2121           goto done;
2122         }
2123       vep = &session->vep;
2124       if (PREDICT_FALSE (vep->vep_sh != vep_idx))
2125         clib_warning ("VCL<%d>: ERROR: session (%u) vep_idx (%u) != "
2126                       "vep_idx (%u)!", getpid (),
2127                       sid, session->vep.vep_sh, vep_idx);
2128       if (session->is_vep_session)
2129         {
2130           clib_warning ("vep_idx[%u]: sid 0x%x (%u)\n"
2131                         "{\n"
2132                         "   next_sid       = 0x%x (%u)\n"
2133                         "   prev_sid       = 0x%x (%u)\n"
2134                         "   vep_idx        = 0x%x (%u)\n"
2135                         "   ev.events      = 0x%x\n"
2136                         "   ev.data.u64    = 0x%llx\n"
2137                         "   et_mask        = 0x%x\n"
2138                         "}\n",
2139                         vep_idx, sid, sid,
2140                         vep->next_sh, vep->next_sh,
2141                         vep->prev_sh, vep->prev_sh,
2142                         vep->vep_sh, vep->vep_sh,
2143                         vep->ev.events, vep->ev.data.u64, vep->et_mask);
2144         }
2145     }
2146
2147 done:
2148   clib_warning ("VCL<%d>: vep_idx (%u): Dump complete!\n",
2149                 getpid (), vep_idx);
2150 }
2151
2152 int
2153 vppcom_epoll_create (void)
2154 {
2155   vcl_worker_t *wrk = vcl_worker_get_current ();
2156   vcl_session_t *vep_session;
2157
2158   vep_session = vcl_session_alloc (wrk);
2159
2160   vep_session->is_vep = 1;
2161   vep_session->vep.vep_sh = ~0;
2162   vep_session->vep.next_sh = ~0;
2163   vep_session->vep.prev_sh = ~0;
2164   vep_session->wait_cont_idx = ~0;
2165   vep_session->vpp_handle = ~0;
2166
2167   vcl_evt (VCL_EVT_EPOLL_CREATE, vep_session, vep_sh);
2168   VDBG (0, "VCL<%d>: Created vep_idx %u / sid %u!",
2169         getpid (), vep_session->session_index, vep_session->session_index);
2170
2171   return vcl_session_handle (vep_session);
2172 }
2173
2174 int
2175 vppcom_epoll_ctl (uint32_t vep_handle, int op, uint32_t session_handle,
2176                   struct epoll_event *event)
2177 {
2178   vcl_worker_t *wrk = vcl_worker_get_current ();
2179   vcl_session_t *vep_session;
2180   vcl_session_t *session;
2181   int rv = VPPCOM_OK;
2182
2183   if (vep_handle == session_handle)
2184     {
2185       clib_warning ("VCL<%d>: ERROR: vep_idx == session_index (%u)!",
2186                     getpid (), vep_handle);
2187       return VPPCOM_EINVAL;
2188     }
2189
2190   vep_session = vcl_session_get_w_handle (wrk, vep_handle);
2191   if (PREDICT_FALSE (!vep_session))
2192     {
2193       clib_warning ("VCL<%d>: ERROR: Invalid vep_idx (%u)!", vep_handle);
2194       return VPPCOM_EBADFD;
2195     }
2196   if (PREDICT_FALSE (!vep_session->is_vep))
2197     {
2198       clib_warning ("VCL<%d>: ERROR: vep_idx (%u) is not a vep!",
2199                     getpid (), vep_handle);
2200       return VPPCOM_EINVAL;
2201     }
2202
2203   ASSERT (vep_session->vep.vep_sh == ~0);
2204   ASSERT (vep_session->vep.prev_sh == ~0);
2205
2206   session = vcl_session_get_w_handle (wrk, session_handle);
2207   if (PREDICT_FALSE (!session))
2208     {
2209       VDBG (0, "VCL<%d>: ERROR: Invalid session_handle (%u)!",
2210             getpid (), session_handle);
2211       return VPPCOM_EBADFD;
2212     }
2213   if (PREDICT_FALSE (session->is_vep))
2214     {
2215       clib_warning ("ERROR: session_handle (%u) is a vep!", vep_handle);
2216       return VPPCOM_EINVAL;
2217     }
2218
2219   switch (op)
2220     {
2221     case EPOLL_CTL_ADD:
2222       if (PREDICT_FALSE (!event))
2223         {
2224           clib_warning ("VCL<%d>: ERROR: EPOLL_CTL_ADD: NULL pointer to "
2225                         "epoll_event structure!", getpid ());
2226           return VPPCOM_EINVAL;
2227         }
2228       if (vep_session->vep.next_sh != ~0)
2229         {
2230           vcl_session_t *next_session;
2231           next_session = vcl_session_get_w_handle (wrk,
2232                                                    vep_session->vep.next_sh);
2233           if (PREDICT_FALSE (!next_session))
2234             {
2235               clib_warning ("VCL<%d>: ERROR: EPOLL_CTL_ADD: Invalid "
2236                             "vep.next_sid (%u) on vep_idx (%u)!",
2237                             getpid (), vep_session->vep.next_sh, vep_handle);
2238               return VPPCOM_EBADFD;
2239             }
2240           ASSERT (next_session->vep.prev_sh == vep_handle);
2241           next_session->vep.prev_sh = session_handle;
2242         }
2243       session->vep.next_sh = vep_session->vep.next_sh;
2244       session->vep.prev_sh = vep_handle;
2245       session->vep.vep_sh = vep_handle;
2246       session->vep.et_mask = VEP_DEFAULT_ET_MASK;
2247       session->vep.ev = *event;
2248       session->is_vep = 0;
2249       session->is_vep_session = 1;
2250       vep_session->vep.next_sh = session_handle;
2251
2252       VDBG (1, "VCL<%d>: EPOLL_CTL_ADD: vep_idx %u, sid %u, events 0x%x, "
2253             "data 0x%llx!", getpid (), vep_handle, session_handle,
2254             event->events, event->data.u64);
2255       vcl_evt (VCL_EVT_EPOLL_CTLADD, session, event->events, event->data.u64);
2256       break;
2257
2258     case EPOLL_CTL_MOD:
2259       if (PREDICT_FALSE (!event))
2260         {
2261           clib_warning ("VCL<%d>: ERROR: EPOLL_CTL_MOD: NULL pointer to "
2262                         "epoll_event structure!", getpid ());
2263           rv = VPPCOM_EINVAL;
2264           goto done;
2265         }
2266       else if (PREDICT_FALSE (!session->is_vep_session))
2267         {
2268           clib_warning ("VCL<%d>: ERROR: sid %u EPOLL_CTL_MOD: "
2269                         "not a vep session!", getpid (), session_handle);
2270           rv = VPPCOM_EINVAL;
2271           goto done;
2272         }
2273       else if (PREDICT_FALSE (session->vep.vep_sh != vep_handle))
2274         {
2275           clib_warning ("VCL<%d>: ERROR: sid %u EPOLL_CTL_MOD: "
2276                         "vep_idx (%u) != vep_idx (%u)!",
2277                         getpid (), session_handle,
2278                         session->vep.vep_sh, vep_handle);
2279           rv = VPPCOM_EINVAL;
2280           goto done;
2281         }
2282       session->vep.et_mask = VEP_DEFAULT_ET_MASK;
2283       session->vep.ev = *event;
2284       VDBG (1, "VCL<%d>: EPOLL_CTL_MOD: vep_idx %u, sid %u, events 0x%x,"
2285             " data 0x%llx!", getpid (), vep_handle, session_handle,
2286             event->events, event->data.u64);
2287       break;
2288
2289     case EPOLL_CTL_DEL:
2290       if (PREDICT_FALSE (!session->is_vep_session))
2291         {
2292           clib_warning ("VCL<%d>: ERROR: sid %u EPOLL_CTL_DEL: "
2293                         "not a vep session!", getpid (), session_handle);
2294           rv = VPPCOM_EINVAL;
2295           goto done;
2296         }
2297       else if (PREDICT_FALSE (session->vep.vep_sh != vep_handle))
2298         {
2299           clib_warning ("VCL<%d>: ERROR: sid %u EPOLL_CTL_DEL: "
2300                         "vep_idx (%u) != vep_idx (%u)!",
2301                         getpid (), session_handle,
2302                         session->vep.vep_sh, vep_handle);
2303           rv = VPPCOM_EINVAL;
2304           goto done;
2305         }
2306
2307       vep_session->wait_cont_idx =
2308         (vep_session->wait_cont_idx == session_handle) ?
2309         session->vep.next_sh : vep_session->wait_cont_idx;
2310
2311       if (session->vep.prev_sh == vep_handle)
2312         vep_session->vep.next_sh = session->vep.next_sh;
2313       else
2314         {
2315           vcl_session_t *prev_session;
2316           prev_session = vcl_session_get_w_handle (wrk, session->vep.prev_sh);
2317           if (PREDICT_FALSE (!prev_session))
2318             {
2319               clib_warning ("VCL<%d>: ERROR: EPOLL_CTL_DEL: Invalid "
2320                             "vep.prev_sid (%u) on sid (%u)!",
2321                             getpid (), session->vep.prev_sh, session_handle);
2322               return VPPCOM_EBADFD;
2323             }
2324           ASSERT (prev_session->vep.next_sh == session_handle);
2325           prev_session->vep.next_sh = session->vep.next_sh;
2326         }
2327       if (session->vep.next_sh != ~0)
2328         {
2329           vcl_session_t *next_session;
2330           next_session = vcl_session_get_w_handle (wrk, session->vep.next_sh);
2331           if (PREDICT_FALSE (!next_session))
2332             {
2333               clib_warning ("VCL<%d>: ERROR: EPOLL_CTL_DEL: Invalid "
2334                             "vep.next_sid (%u) on sid (%u)!",
2335                             getpid (), session->vep.next_sh, session_handle);
2336               return VPPCOM_EBADFD;
2337             }
2338           ASSERT (next_session->vep.prev_sh == session_handle);
2339           next_session->vep.prev_sh = session->vep.prev_sh;
2340         }
2341
2342       memset (&session->vep, 0, sizeof (session->vep));
2343       session->vep.next_sh = ~0;
2344       session->vep.prev_sh = ~0;
2345       session->vep.vep_sh = ~0;
2346       session->is_vep_session = 0;
2347       VDBG (1, "VCL<%d>: EPOLL_CTL_DEL: vep_idx %u, sid %u!",
2348             getpid (), vep_handle, session_handle);
2349       vcl_evt (VCL_EVT_EPOLL_CTLDEL, session, vep_sh);
2350       break;
2351
2352     default:
2353       clib_warning ("VCL<%d>: ERROR: Invalid operation (%d)!", getpid (), op);
2354       rv = VPPCOM_EINVAL;
2355     }
2356
2357   vep_verify_epoll_chain (wrk, vep_handle);
2358
2359 done:
2360   return rv;
2361 }
2362
2363 static inline void
2364 vcl_epoll_wait_handle_mq_event (vcl_worker_t * wrk, session_event_t * e,
2365                                 struct epoll_event *events, u32 * num_ev)
2366 {
2367   session_disconnected_msg_t *disconnected_msg;
2368   session_connected_msg_t *connected_msg;
2369   session_accepted_msg_t *accepted_msg;
2370   u64 session_evt_data = ~0, handle;
2371   u32 sid = ~0, session_events;
2372   vcl_session_msg_t *vcl_msg;
2373   vcl_session_t *session;
2374   u8 add_event = 0;
2375
2376   switch (e->event_type)
2377     {
2378     case FIFO_EVENT_APP_RX:
2379       ASSERT (e->fifo->client_thread_index == vcl_get_worker_index ());
2380       vcl_fifo_rx_evt_valid_or_break (e->fifo);
2381       sid = e->fifo->client_session_index;
2382       session = vcl_session_get (wrk, sid);
2383       session_events = session->vep.ev.events;
2384       if (!(EPOLLIN & session->vep.ev.events) || session->has_rx_evt)
2385         break;
2386       add_event = 1;
2387       events[*num_ev].events |= EPOLLIN;
2388       session_evt_data = session->vep.ev.data.u64;
2389       session->has_rx_evt = 1;
2390       break;
2391     case FIFO_EVENT_APP_TX:
2392       sid = e->fifo->client_session_index;
2393       session = vcl_session_get (wrk, sid);
2394       session_events = session->vep.ev.events;
2395       if (!(EPOLLOUT & session_events))
2396         break;
2397       add_event = 1;
2398       events[*num_ev].events |= EPOLLOUT;
2399       session_evt_data = session->vep.ev.data.u64;
2400       break;
2401     case SESSION_IO_EVT_CT_TX:
2402       vcl_fifo_rx_evt_valid_or_break (e->fifo);
2403       session = vcl_ct_session_get_from_fifo (wrk, e->fifo, 0);
2404       sid = session->session_index;
2405       session_events = session->vep.ev.events;
2406       if (!(EPOLLIN & session->vep.ev.events) || session->has_rx_evt)
2407         break;
2408       add_event = 1;
2409       events[*num_ev].events |= EPOLLIN;
2410       session_evt_data = session->vep.ev.data.u64;
2411       session->has_rx_evt = 1;
2412       break;
2413     case SESSION_IO_EVT_CT_RX:
2414       session = vcl_ct_session_get_from_fifo (wrk, e->fifo, 1);
2415       sid = session->session_index;
2416       session_events = session->vep.ev.events;
2417       if (!(EPOLLOUT & session_events))
2418         break;
2419       add_event = 1;
2420       events[*num_ev].events |= EPOLLOUT;
2421       session_evt_data = session->vep.ev.data.u64;
2422       break;
2423     case SESSION_CTRL_EVT_ACCEPTED:
2424       accepted_msg = (session_accepted_msg_t *) e->data;
2425       handle = accepted_msg->listener_handle;
2426       session = vcl_session_table_lookup_listener (wrk, handle);
2427       if (!session)
2428         {
2429           clib_warning ("VCL<%d>: ERROR: couldn't find listen session:"
2430                         "listener handle %llx", getpid (), handle);
2431           break;
2432         }
2433
2434       clib_fifo_add2 (session->accept_evts_fifo, vcl_msg);
2435       vcl_msg->accepted_msg = *accepted_msg;
2436       session_events = session->vep.ev.events;
2437       if (!(EPOLLIN & session_events))
2438         break;
2439
2440       add_event = 1;
2441       events[*num_ev].events |= EPOLLIN;
2442       session_evt_data = session->vep.ev.data.u64;
2443       break;
2444     case SESSION_CTRL_EVT_CONNECTED:
2445       connected_msg = (session_connected_msg_t *) e->data;
2446       vcl_session_connected_handler (wrk, connected_msg);
2447       /* Generate EPOLLOUT because there's no connected event */
2448       sid = vcl_session_index_from_vpp_handle (wrk, connected_msg->handle);
2449       session = vcl_session_get (wrk, sid);
2450       session_events = session->vep.ev.events;
2451       if (EPOLLOUT & session_events)
2452         {
2453           add_event = 1;
2454           events[*num_ev].events |= EPOLLOUT;
2455           session_evt_data = session->vep.ev.data.u64;
2456         }
2457       break;
2458     case SESSION_CTRL_EVT_DISCONNECTED:
2459       disconnected_msg = (session_disconnected_msg_t *) e->data;
2460       sid = vcl_session_index_from_vpp_handle (wrk, disconnected_msg->handle);
2461       if (!(session = vcl_session_get (wrk, sid)))
2462         break;
2463       add_event = 1;
2464       events[*num_ev].events |= EPOLLHUP | EPOLLRDHUP;
2465       session_evt_data = session->vep.ev.data.u64;
2466       session_events = session->vep.ev.events;
2467       break;
2468     case SESSION_CTRL_EVT_RESET:
2469       sid = vcl_session_reset_handler (wrk, (session_reset_msg_t *) e->data);
2470       if (!(session = vcl_session_get (wrk, sid)))
2471         break;
2472       add_event = 1;
2473       events[*num_ev].events |= EPOLLHUP | EPOLLRDHUP;
2474       session_evt_data = session->vep.ev.data.u64;
2475       session_events = session->vep.ev.events;
2476       break;
2477     default:
2478       VDBG (0, "unhandled: %u", e->event_type);
2479       break;
2480     }
2481
2482   if (add_event)
2483     {
2484       events[*num_ev].data.u64 = session_evt_data;
2485       if (EPOLLONESHOT & session_events)
2486         {
2487           session = vcl_session_get (wrk, sid);
2488           session->vep.ev.events = 0;
2489         }
2490       *num_ev += 1;
2491     }
2492 }
2493
2494 static int
2495 vcl_epoll_wait_handle_mq (vcl_worker_t * wrk, svm_msg_q_t * mq,
2496                           struct epoll_event *events, u32 maxevents,
2497                           double wait_for_time, u32 * num_ev)
2498 {
2499   svm_msg_q_msg_t *msg;
2500   session_event_t *e;
2501   int i;
2502
2503   if (vec_len (wrk->mq_msg_vector) && svm_msg_q_is_empty (mq))
2504     goto handle_dequeued;
2505
2506   svm_msg_q_lock (mq);
2507   if (svm_msg_q_is_empty (mq))
2508     {
2509       if (!wait_for_time)
2510         {
2511           svm_msg_q_unlock (mq);
2512           return 0;
2513         }
2514       else if (wait_for_time < 0)
2515         {
2516           svm_msg_q_wait (mq);
2517         }
2518       else
2519         {
2520           if (svm_msg_q_timedwait (mq, wait_for_time / 1e3))
2521             {
2522               svm_msg_q_unlock (mq);
2523               return 0;
2524             }
2525         }
2526     }
2527   vcl_mq_dequeue_batch (wrk, mq);
2528   svm_msg_q_unlock (mq);
2529
2530 handle_dequeued:
2531   for (i = 0; i < vec_len (wrk->mq_msg_vector); i++)
2532     {
2533       msg = vec_elt_at_index (wrk->mq_msg_vector, i);
2534       e = svm_msg_q_msg_data (mq, msg);
2535       if (*num_ev < maxevents)
2536         vcl_epoll_wait_handle_mq_event (wrk, e, events, num_ev);
2537       else
2538         vec_add1 (wrk->unhandled_evts_vector, *e);
2539       svm_msg_q_free_msg (mq, msg);
2540     }
2541   vec_reset_length (wrk->mq_msg_vector);
2542
2543   return *num_ev;
2544 }
2545
2546 static int
2547 vppcom_epoll_wait_condvar (vcl_worker_t * wrk, struct epoll_event *events,
2548                            int maxevents, u32 n_evts, double wait_for_time)
2549 {
2550   vcl_cut_through_registration_t *cr;
2551   double total_wait = 0, wait_slice;
2552   int rv;
2553
2554   wait_for_time = (wait_for_time == -1) ? (double) 10e9 : wait_for_time;
2555   wait_slice = wrk->cut_through_registrations ? 10e-6 : wait_for_time;
2556
2557   do
2558     {
2559       vcl_ct_registration_lock (wrk);
2560       /* *INDENT-OFF* */
2561       pool_foreach (cr, wrk->cut_through_registrations, ({
2562         vcl_epoll_wait_handle_mq (wrk, cr->mq, events, maxevents, 0, &n_evts);
2563       }));
2564       /* *INDENT-ON* */
2565       vcl_ct_registration_unlock (wrk);
2566
2567       rv = vcl_epoll_wait_handle_mq (wrk, wrk->app_event_queue, events,
2568                                      maxevents, n_evts ? 0 : wait_slice,
2569                                      &n_evts);
2570       if (rv)
2571         total_wait += wait_slice;
2572       if (n_evts)
2573         return n_evts;
2574     }
2575   while (total_wait < wait_for_time);
2576   return n_evts;
2577 }
2578
2579 static int
2580 vppcom_epoll_wait_eventfd (vcl_worker_t * wrk, struct epoll_event *events,
2581                            int maxevents, u32 n_evts, double wait_for_time)
2582 {
2583   vcl_mq_evt_conn_t *mqc;
2584   int __clib_unused n_read;
2585   int n_mq_evts, i;
2586   u64 buf;
2587
2588   vec_validate (wrk->mq_events, pool_elts (wrk->mq_evt_conns));
2589 again:
2590   n_mq_evts = epoll_wait (wrk->mqs_epfd, wrk->mq_events,
2591                           vec_len (wrk->mq_events), wait_for_time);
2592   for (i = 0; i < n_mq_evts; i++)
2593     {
2594       mqc = vcl_mq_evt_conn_get (wrk, wrk->mq_events[i].data.u32);
2595       n_read = read (mqc->mq_fd, &buf, sizeof (buf));
2596       vcl_epoll_wait_handle_mq (wrk, mqc->mq, events, maxevents, 0, &n_evts);
2597     }
2598   if (!n_evts && n_mq_evts > 0)
2599     goto again;
2600
2601   return (int) n_evts;
2602 }
2603
2604 int
2605 vppcom_epoll_wait (uint32_t vep_handle, struct epoll_event *events,
2606                    int maxevents, double wait_for_time)
2607 {
2608   vcl_worker_t *wrk = vcl_worker_get_current ();
2609   vcl_session_t *vep_session;
2610   u32 n_evts = 0;
2611   int i;
2612
2613   if (PREDICT_FALSE (maxevents <= 0))
2614     {
2615       clib_warning ("VCL<%d>: ERROR: Invalid maxevents (%d)!",
2616                     getpid (), maxevents);
2617       return VPPCOM_EINVAL;
2618     }
2619
2620   vep_session = vcl_session_get_w_handle (wrk, vep_handle);
2621   if (!vep_session)
2622     return VPPCOM_EBADFD;
2623
2624   if (PREDICT_FALSE (!vep_session->is_vep))
2625     {
2626       clib_warning ("VCL<%d>: ERROR: vep_idx (%u) is not a vep!",
2627                     getpid (), vep_handle);
2628       return VPPCOM_EINVAL;
2629     }
2630
2631   memset (events, 0, sizeof (*events) * maxevents);
2632
2633   if (vec_len (wrk->unhandled_evts_vector))
2634     {
2635       for (i = 0; i < vec_len (wrk->unhandled_evts_vector); i++)
2636         {
2637           vcl_epoll_wait_handle_mq_event (wrk, &wrk->unhandled_evts_vector[i],
2638                                           events, &n_evts);
2639           if (n_evts == maxevents)
2640             {
2641               i += 1;
2642               break;
2643             }
2644         }
2645
2646       vec_delete (wrk->unhandled_evts_vector, i, 0);
2647     }
2648
2649   if (vcm->cfg.use_mq_eventfd)
2650     return vppcom_epoll_wait_eventfd (wrk, events, maxevents, n_evts,
2651                                       wait_for_time);
2652
2653   return vppcom_epoll_wait_condvar (wrk, events, maxevents, n_evts,
2654                                     wait_for_time);
2655 }
2656
2657 int
2658 vppcom_session_attr (uint32_t session_handle, uint32_t op,
2659                      void *buffer, uint32_t * buflen)
2660 {
2661   vcl_worker_t *wrk = vcl_worker_get_current ();
2662   vcl_session_t *session;
2663   int rv = VPPCOM_OK;
2664   u32 *flags = buffer;
2665   vppcom_endpt_t *ep = buffer;
2666
2667   session = vcl_session_get_w_handle (wrk, session_handle);
2668   if (!session)
2669     return VPPCOM_EBADFD;
2670
2671   switch (op)
2672     {
2673     case VPPCOM_ATTR_GET_NREAD:
2674       rv = vppcom_session_read_ready (session);
2675       VDBG (2, "VCL<%d>: VPPCOM_ATTR_GET_NREAD: sid %u, nread = %d",
2676             getpid (), rv);
2677       break;
2678
2679     case VPPCOM_ATTR_GET_NWRITE:
2680       rv = vppcom_session_write_ready (session);
2681       VDBG (2, "VCL<%d>: VPPCOM_ATTR_GET_NWRITE: sid %u, nwrite = %d",
2682             getpid (), session_handle, rv);
2683       break;
2684
2685     case VPPCOM_ATTR_GET_FLAGS:
2686       if (PREDICT_TRUE (buffer && buflen && (*buflen >= sizeof (*flags))))
2687         {
2688           *flags = O_RDWR | (VCL_SESS_ATTR_TEST (session->attr,
2689                                                  VCL_SESS_ATTR_NONBLOCK));
2690           *buflen = sizeof (*flags);
2691           VDBG (2, "VCL<%d>: VPPCOM_ATTR_GET_FLAGS: sid %u, flags = 0x%08x, "
2692                 "is_nonblocking = %u", getpid (),
2693                 session_handle, *flags,
2694                 VCL_SESS_ATTR_TEST (session->attr, VCL_SESS_ATTR_NONBLOCK));
2695         }
2696       else
2697         rv = VPPCOM_EINVAL;
2698       break;
2699
2700     case VPPCOM_ATTR_SET_FLAGS:
2701       if (PREDICT_TRUE (buffer && buflen && (*buflen == sizeof (*flags))))
2702         {
2703           if (*flags & O_NONBLOCK)
2704             VCL_SESS_ATTR_SET (session->attr, VCL_SESS_ATTR_NONBLOCK);
2705           else
2706             VCL_SESS_ATTR_CLR (session->attr, VCL_SESS_ATTR_NONBLOCK);
2707
2708           VDBG (2, "VCL<%d>: VPPCOM_ATTR_SET_FLAGS: sid %u, flags = 0x%08x,"
2709                 " is_nonblocking = %u",
2710                 getpid (), session_handle, *flags,
2711                 VCL_SESS_ATTR_TEST (session->attr, VCL_SESS_ATTR_NONBLOCK));
2712         }
2713       else
2714         rv = VPPCOM_EINVAL;
2715       break;
2716
2717     case VPPCOM_ATTR_GET_PEER_ADDR:
2718       if (PREDICT_TRUE (buffer && buflen &&
2719                         (*buflen >= sizeof (*ep)) && ep->ip))
2720         {
2721           ep->is_ip4 = session->transport.is_ip4;
2722           ep->port = session->transport.rmt_port;
2723           if (session->transport.is_ip4)
2724             clib_memcpy_fast (ep->ip, &session->transport.rmt_ip.ip4,
2725                               sizeof (ip4_address_t));
2726           else
2727             clib_memcpy_fast (ep->ip, &session->transport.rmt_ip.ip6,
2728                               sizeof (ip6_address_t));
2729           *buflen = sizeof (*ep);
2730           VDBG (1, "VCL<%d>: VPPCOM_ATTR_GET_PEER_ADDR: sid %u, is_ip4 = %u, "
2731                 "addr = %U, port %u", getpid (),
2732                 session_handle, ep->is_ip4, format_ip46_address,
2733                 &session->transport.rmt_ip,
2734                 ep->is_ip4 ? IP46_TYPE_IP4 : IP46_TYPE_IP6,
2735                 clib_net_to_host_u16 (ep->port));
2736         }
2737       else
2738         rv = VPPCOM_EINVAL;
2739       break;
2740
2741     case VPPCOM_ATTR_GET_LCL_ADDR:
2742       if (PREDICT_TRUE (buffer && buflen &&
2743                         (*buflen >= sizeof (*ep)) && ep->ip))
2744         {
2745           ep->is_ip4 = session->transport.is_ip4;
2746           ep->port = session->transport.lcl_port;
2747           if (session->transport.is_ip4)
2748             clib_memcpy_fast (ep->ip, &session->transport.lcl_ip.ip4,
2749                               sizeof (ip4_address_t));
2750           else
2751             clib_memcpy_fast (ep->ip, &session->transport.lcl_ip.ip6,
2752                               sizeof (ip6_address_t));
2753           *buflen = sizeof (*ep);
2754           VDBG (1, "VCL<%d>: VPPCOM_ATTR_GET_LCL_ADDR: sid %u, is_ip4 = %u,"
2755                 " addr = %U port %d", getpid (),
2756                 session_handle, ep->is_ip4, format_ip46_address,
2757                 &session->transport.lcl_ip,
2758                 ep->is_ip4 ? IP46_TYPE_IP4 : IP46_TYPE_IP6,
2759                 clib_net_to_host_u16 (ep->port));
2760         }
2761       else
2762         rv = VPPCOM_EINVAL;
2763       break;
2764
2765     case VPPCOM_ATTR_GET_LIBC_EPFD:
2766       rv = session->libc_epfd;
2767       VDBG (2, "VCL<%d>: VPPCOM_ATTR_GET_LIBC_EPFD: libc_epfd %d",
2768             getpid (), rv);
2769       break;
2770
2771     case VPPCOM_ATTR_SET_LIBC_EPFD:
2772       if (PREDICT_TRUE (buffer && buflen &&
2773                         (*buflen == sizeof (session->libc_epfd))))
2774         {
2775           session->libc_epfd = *(int *) buffer;
2776           *buflen = sizeof (session->libc_epfd);
2777
2778           VDBG (2, "VCL<%d>: VPPCOM_ATTR_SET_LIBC_EPFD: libc_epfd %d, "
2779                 "buflen %d", getpid (), session->libc_epfd, *buflen);
2780         }
2781       else
2782         rv = VPPCOM_EINVAL;
2783       break;
2784
2785     case VPPCOM_ATTR_GET_PROTOCOL:
2786       if (buffer && buflen && (*buflen >= sizeof (int)))
2787         {
2788           *(int *) buffer = session->session_type;
2789           *buflen = sizeof (int);
2790
2791           VDBG (2, "VCL<%d>: VPPCOM_ATTR_GET_PROTOCOL: %d (%s), buflen %d",
2792                 getpid (), *(int *) buffer, *(int *) buffer ? "UDP" : "TCP",
2793                 *buflen);
2794         }
2795       else
2796         rv = VPPCOM_EINVAL;
2797       break;
2798
2799     case VPPCOM_ATTR_GET_LISTEN:
2800       if (buffer && buflen && (*buflen >= sizeof (int)))
2801         {
2802           *(int *) buffer = VCL_SESS_ATTR_TEST (session->attr,
2803                                                 VCL_SESS_ATTR_LISTEN);
2804           *buflen = sizeof (int);
2805
2806           VDBG (2, "VCL<%d>: VPPCOM_ATTR_GET_LISTEN: %d, buflen %d",
2807                 getpid (), *(int *) buffer, *buflen);
2808         }
2809       else
2810         rv = VPPCOM_EINVAL;
2811       break;
2812
2813     case VPPCOM_ATTR_GET_ERROR:
2814       if (buffer && buflen && (*buflen >= sizeof (int)))
2815         {
2816           *(int *) buffer = 0;
2817           *buflen = sizeof (int);
2818
2819           VDBG (2, "VCL<%d>: VPPCOM_ATTR_GET_ERROR: %d, buflen %d, #VPP-TBD#",
2820                 getpid (), *(int *) buffer, *buflen);
2821         }
2822       else
2823         rv = VPPCOM_EINVAL;
2824       break;
2825
2826     case VPPCOM_ATTR_GET_TX_FIFO_LEN:
2827       if (buffer && buflen && (*buflen >= sizeof (u32)))
2828         {
2829
2830           /* VPP-TBD */
2831           *(size_t *) buffer = (session->sndbuf_size ? session->sndbuf_size :
2832                                 session->tx_fifo ? session->tx_fifo->nitems :
2833                                 vcm->cfg.tx_fifo_size);
2834           *buflen = sizeof (u32);
2835
2836           VDBG (2, "VCL<%d>: VPPCOM_ATTR_GET_TX_FIFO_LEN: %u (0x%x), "
2837                 "buflen %d, #VPP-TBD#", getpid (),
2838                 *(size_t *) buffer, *(size_t *) buffer, *buflen);
2839         }
2840       else
2841         rv = VPPCOM_EINVAL;
2842       break;
2843
2844     case VPPCOM_ATTR_SET_TX_FIFO_LEN:
2845       if (buffer && buflen && (*buflen == sizeof (u32)))
2846         {
2847           /* VPP-TBD */
2848           session->sndbuf_size = *(u32 *) buffer;
2849           VDBG (2, "VCL<%d>: VPPCOM_ATTR_SET_TX_FIFO_LEN: %u (0x%x), "
2850                 "buflen %d, #VPP-TBD#", getpid (),
2851                 session->sndbuf_size, session->sndbuf_size, *buflen);
2852         }
2853       else
2854         rv = VPPCOM_EINVAL;
2855       break;
2856
2857     case VPPCOM_ATTR_GET_RX_FIFO_LEN:
2858       if (buffer && buflen && (*buflen >= sizeof (u32)))
2859         {
2860
2861           /* VPP-TBD */
2862           *(size_t *) buffer = (session->rcvbuf_size ? session->rcvbuf_size :
2863                                 session->rx_fifo ? session->rx_fifo->nitems :
2864                                 vcm->cfg.rx_fifo_size);
2865           *buflen = sizeof (u32);
2866
2867           VDBG (2, "VCL<%d>: VPPCOM_ATTR_GET_RX_FIFO_LEN: %u (0x%x), "
2868                 "buflen %d, #VPP-TBD#", getpid (),
2869                 *(size_t *) buffer, *(size_t *) buffer, *buflen);
2870         }
2871       else
2872         rv = VPPCOM_EINVAL;
2873       break;
2874
2875     case VPPCOM_ATTR_SET_RX_FIFO_LEN:
2876       if (buffer && buflen && (*buflen == sizeof (u32)))
2877         {
2878           /* VPP-TBD */
2879           session->rcvbuf_size = *(u32 *) buffer;
2880           VDBG (2, "VCL<%d>: VPPCOM_ATTR_SET_RX_FIFO_LEN: %u (0x%x), "
2881                 "buflen %d, #VPP-TBD#", getpid (),
2882                 session->sndbuf_size, session->sndbuf_size, *buflen);
2883         }
2884       else
2885         rv = VPPCOM_EINVAL;
2886       break;
2887
2888     case VPPCOM_ATTR_GET_REUSEADDR:
2889       if (buffer && buflen && (*buflen >= sizeof (int)))
2890         {
2891           /* VPP-TBD */
2892           *(int *) buffer = VCL_SESS_ATTR_TEST (session->attr,
2893                                                 VCL_SESS_ATTR_REUSEADDR);
2894           *buflen = sizeof (int);
2895
2896           VDBG (2, "VCL<%d>: VPPCOM_ATTR_GET_REUSEADDR: %d, "
2897                 "buflen %d, #VPP-TBD#", getpid (), *(int *) buffer, *buflen);
2898         }
2899       else
2900         rv = VPPCOM_EINVAL;
2901       break;
2902
2903     case VPPCOM_ATTR_SET_REUSEADDR:
2904       if (buffer && buflen && (*buflen == sizeof (int)) &&
2905           !VCL_SESS_ATTR_TEST (session->attr, VCL_SESS_ATTR_LISTEN))
2906         {
2907           /* VPP-TBD */
2908           if (*(int *) buffer)
2909             VCL_SESS_ATTR_SET (session->attr, VCL_SESS_ATTR_REUSEADDR);
2910           else
2911             VCL_SESS_ATTR_CLR (session->attr, VCL_SESS_ATTR_REUSEADDR);
2912
2913           VDBG (2, "VCL<%d>: VPPCOM_ATTR_SET_REUSEADDR: %d, buflen %d,"
2914                 " #VPP-TBD#", getpid (),
2915                 VCL_SESS_ATTR_TEST (session->attr,
2916                                     VCL_SESS_ATTR_REUSEADDR), *buflen);
2917         }
2918       else
2919         rv = VPPCOM_EINVAL;
2920       break;
2921
2922     case VPPCOM_ATTR_GET_REUSEPORT:
2923       if (buffer && buflen && (*buflen >= sizeof (int)))
2924         {
2925           /* VPP-TBD */
2926           *(int *) buffer = VCL_SESS_ATTR_TEST (session->attr,
2927                                                 VCL_SESS_ATTR_REUSEPORT);
2928           *buflen = sizeof (int);
2929
2930           VDBG (2, "VCL<%d>: VPPCOM_ATTR_GET_REUSEPORT: %d, buflen %d,"
2931                 " #VPP-TBD#", getpid (), *(int *) buffer, *buflen);
2932         }
2933       else
2934         rv = VPPCOM_EINVAL;
2935       break;
2936
2937     case VPPCOM_ATTR_SET_REUSEPORT:
2938       if (buffer && buflen && (*buflen == sizeof (int)) &&
2939           !VCL_SESS_ATTR_TEST (session->attr, VCL_SESS_ATTR_LISTEN))
2940         {
2941           /* VPP-TBD */
2942           if (*(int *) buffer)
2943             VCL_SESS_ATTR_SET (session->attr, VCL_SESS_ATTR_REUSEPORT);
2944           else
2945             VCL_SESS_ATTR_CLR (session->attr, VCL_SESS_ATTR_REUSEPORT);
2946
2947           VDBG (2, "VCL<%d>: VPPCOM_ATTR_SET_REUSEPORT: %d, buflen %d,"
2948                 " #VPP-TBD#", getpid (),
2949                 VCL_SESS_ATTR_TEST (session->attr,
2950                                     VCL_SESS_ATTR_REUSEPORT), *buflen);
2951         }
2952       else
2953         rv = VPPCOM_EINVAL;
2954       break;
2955
2956     case VPPCOM_ATTR_GET_BROADCAST:
2957       if (buffer && buflen && (*buflen >= sizeof (int)))
2958         {
2959           /* VPP-TBD */
2960           *(int *) buffer = VCL_SESS_ATTR_TEST (session->attr,
2961                                                 VCL_SESS_ATTR_BROADCAST);
2962           *buflen = sizeof (int);
2963
2964           VDBG (2, "VCL<%d>: VPPCOM_ATTR_GET_BROADCAST: %d, buflen %d,"
2965                 " #VPP-TBD#", getpid (), *(int *) buffer, *buflen);
2966         }
2967       else
2968         rv = VPPCOM_EINVAL;
2969       break;
2970
2971     case VPPCOM_ATTR_SET_BROADCAST:
2972       if (buffer && buflen && (*buflen == sizeof (int)))
2973         {
2974           /* VPP-TBD */
2975           if (*(int *) buffer)
2976             VCL_SESS_ATTR_SET (session->attr, VCL_SESS_ATTR_BROADCAST);
2977           else
2978             VCL_SESS_ATTR_CLR (session->attr, VCL_SESS_ATTR_BROADCAST);
2979
2980           VDBG (2, "VCL<%d>: VPPCOM_ATTR_SET_BROADCAST: %d, buflen %d, "
2981                 "#VPP-TBD#", getpid (),
2982                 VCL_SESS_ATTR_TEST (session->attr,
2983                                     VCL_SESS_ATTR_BROADCAST), *buflen);
2984         }
2985       else
2986         rv = VPPCOM_EINVAL;
2987       break;
2988
2989     case VPPCOM_ATTR_GET_V6ONLY:
2990       if (buffer && buflen && (*buflen >= sizeof (int)))
2991         {
2992           /* VPP-TBD */
2993           *(int *) buffer = VCL_SESS_ATTR_TEST (session->attr,
2994                                                 VCL_SESS_ATTR_V6ONLY);
2995           *buflen = sizeof (int);
2996
2997           VDBG (2, "VCL<%d>: VPPCOM_ATTR_GET_V6ONLY: %d, buflen %d, "
2998                 "#VPP-TBD#", getpid (), *(int *) buffer, *buflen);
2999         }
3000       else
3001         rv = VPPCOM_EINVAL;
3002       break;
3003
3004     case VPPCOM_ATTR_SET_V6ONLY:
3005       if (buffer && buflen && (*buflen == sizeof (int)))
3006         {
3007           /* VPP-TBD */
3008           if (*(int *) buffer)
3009             VCL_SESS_ATTR_SET (session->attr, VCL_SESS_ATTR_V6ONLY);
3010           else
3011             VCL_SESS_ATTR_CLR (session->attr, VCL_SESS_ATTR_V6ONLY);
3012
3013           VDBG (2, "VCL<%d>: VPPCOM_ATTR_SET_V6ONLY: %d, buflen %d, "
3014                 "#VPP-TBD#", getpid (),
3015                 VCL_SESS_ATTR_TEST (session->attr,
3016                                     VCL_SESS_ATTR_V6ONLY), *buflen);
3017         }
3018       else
3019         rv = VPPCOM_EINVAL;
3020       break;
3021
3022     case VPPCOM_ATTR_GET_KEEPALIVE:
3023       if (buffer && buflen && (*buflen >= sizeof (int)))
3024         {
3025           /* VPP-TBD */
3026           *(int *) buffer = VCL_SESS_ATTR_TEST (session->attr,
3027                                                 VCL_SESS_ATTR_KEEPALIVE);
3028           *buflen = sizeof (int);
3029
3030           VDBG (2, "VCL<%d>: VPPCOM_ATTR_GET_KEEPALIVE: %d, buflen %d, "
3031                 "#VPP-TBD#", getpid (), *(int *) buffer, *buflen);
3032         }
3033       else
3034         rv = VPPCOM_EINVAL;
3035       break;
3036
3037     case VPPCOM_ATTR_SET_KEEPALIVE:
3038       if (buffer && buflen && (*buflen == sizeof (int)))
3039         {
3040           /* VPP-TBD */
3041           if (*(int *) buffer)
3042             VCL_SESS_ATTR_SET (session->attr, VCL_SESS_ATTR_KEEPALIVE);
3043           else
3044             VCL_SESS_ATTR_CLR (session->attr, VCL_SESS_ATTR_KEEPALIVE);
3045
3046           VDBG (2, "VCL<%d>: VPPCOM_ATTR_SET_KEEPALIVE: %d, buflen %d, "
3047                 "#VPP-TBD#", getpid (),
3048                 VCL_SESS_ATTR_TEST (session->attr,
3049                                     VCL_SESS_ATTR_KEEPALIVE), *buflen);
3050         }
3051       else
3052         rv = VPPCOM_EINVAL;
3053       break;
3054
3055     case VPPCOM_ATTR_GET_TCP_NODELAY:
3056       if (buffer && buflen && (*buflen >= sizeof (int)))
3057         {
3058           /* VPP-TBD */
3059           *(int *) buffer = VCL_SESS_ATTR_TEST (session->attr,
3060                                                 VCL_SESS_ATTR_TCP_NODELAY);
3061           *buflen = sizeof (int);
3062
3063           VDBG (2, "VCL<%d>: VPPCOM_ATTR_GET_TCP_NODELAY: %d, buflen %d, "
3064                 "#VPP-TBD#", getpid (), *(int *) buffer, *buflen);
3065         }
3066       else
3067         rv = VPPCOM_EINVAL;
3068       break;
3069
3070     case VPPCOM_ATTR_SET_TCP_NODELAY:
3071       if (buffer && buflen && (*buflen == sizeof (int)))
3072         {
3073           /* VPP-TBD */
3074           if (*(int *) buffer)
3075             VCL_SESS_ATTR_SET (session->attr, VCL_SESS_ATTR_TCP_NODELAY);
3076           else
3077             VCL_SESS_ATTR_CLR (session->attr, VCL_SESS_ATTR_TCP_NODELAY);
3078
3079           VDBG (2, "VCL<%d>: VPPCOM_ATTR_SET_TCP_NODELAY: %d, buflen %d, "
3080                 "#VPP-TBD#", getpid (),
3081                 VCL_SESS_ATTR_TEST (session->attr,
3082                                     VCL_SESS_ATTR_TCP_NODELAY), *buflen);
3083         }
3084       else
3085         rv = VPPCOM_EINVAL;
3086       break;
3087
3088     case VPPCOM_ATTR_GET_TCP_KEEPIDLE:
3089       if (buffer && buflen && (*buflen >= sizeof (int)))
3090         {
3091           /* VPP-TBD */
3092           *(int *) buffer = VCL_SESS_ATTR_TEST (session->attr,
3093                                                 VCL_SESS_ATTR_TCP_KEEPIDLE);
3094           *buflen = sizeof (int);
3095
3096           VDBG (2, "VCL<%d>: VPPCOM_ATTR_GET_TCP_KEEPIDLE: %d, buflen %d, "
3097                 "#VPP-TBD#", getpid (), *(int *) buffer, *buflen);
3098         }
3099       else
3100         rv = VPPCOM_EINVAL;
3101       break;
3102
3103     case VPPCOM_ATTR_SET_TCP_KEEPIDLE:
3104       if (buffer && buflen && (*buflen == sizeof (int)))
3105         {
3106           /* VPP-TBD */
3107           if (*(int *) buffer)
3108             VCL_SESS_ATTR_SET (session->attr, VCL_SESS_ATTR_TCP_KEEPIDLE);
3109           else
3110             VCL_SESS_ATTR_CLR (session->attr, VCL_SESS_ATTR_TCP_KEEPIDLE);
3111
3112           VDBG (2, "VCL<%d>: VPPCOM_ATTR_SET_TCP_KEEPIDLE: %d, buflen %d, "
3113                 "#VPP-TBD#", getpid (),
3114                 VCL_SESS_ATTR_TEST (session->attr,
3115                                     VCL_SESS_ATTR_TCP_KEEPIDLE), *buflen);
3116         }
3117       else
3118         rv = VPPCOM_EINVAL;
3119       break;
3120
3121     case VPPCOM_ATTR_GET_TCP_KEEPINTVL:
3122       if (buffer && buflen && (*buflen >= sizeof (int)))
3123         {
3124           /* VPP-TBD */
3125           *(int *) buffer = VCL_SESS_ATTR_TEST (session->attr,
3126                                                 VCL_SESS_ATTR_TCP_KEEPINTVL);
3127           *buflen = sizeof (int);
3128
3129           VDBG (2, "VCL<%d>: VPPCOM_ATTR_GET_TCP_KEEPINTVL: %d, buflen %d, "
3130                 "#VPP-TBD#", getpid (), *(int *) buffer, *buflen);
3131         }
3132       else
3133         rv = VPPCOM_EINVAL;
3134       break;
3135
3136     case VPPCOM_ATTR_SET_TCP_KEEPINTVL:
3137       if (buffer && buflen && (*buflen == sizeof (int)))
3138         {
3139           /* VPP-TBD */
3140           if (*(int *) buffer)
3141             VCL_SESS_ATTR_SET (session->attr, VCL_SESS_ATTR_TCP_KEEPINTVL);
3142           else
3143             VCL_SESS_ATTR_CLR (session->attr, VCL_SESS_ATTR_TCP_KEEPINTVL);
3144
3145           VDBG (2, "VCL<%d>: VPPCOM_ATTR_SET_TCP_KEEPINTVL: %d, buflen %d, "
3146                 "#VPP-TBD#", getpid (),
3147                 VCL_SESS_ATTR_TEST (session->attr,
3148                                     VCL_SESS_ATTR_TCP_KEEPINTVL), *buflen);
3149         }
3150       else
3151         rv = VPPCOM_EINVAL;
3152       break;
3153
3154     case VPPCOM_ATTR_GET_TCP_USER_MSS:
3155       if (buffer && buflen && (*buflen >= sizeof (u32)))
3156         {
3157           /* VPP-TBD */
3158           *(u32 *) buffer = session->user_mss;
3159           *buflen = sizeof (int);
3160
3161           VDBG (2, "VCL<%d>: VPPCOM_ATTR_GET_TCP_USER_MSS: %d, buflen %d,"
3162                 " #VPP-TBD#", getpid (), *(int *) buffer, *buflen);
3163         }
3164       else
3165         rv = VPPCOM_EINVAL;
3166       break;
3167
3168     case VPPCOM_ATTR_SET_TCP_USER_MSS:
3169       if (buffer && buflen && (*buflen == sizeof (u32)))
3170         {
3171           /* VPP-TBD */
3172           session->user_mss = *(u32 *) buffer;
3173
3174           VDBG (2, "VCL<%d>: VPPCOM_ATTR_SET_TCP_USER_MSS: %u, buflen %d, "
3175                 "#VPP-TBD#", getpid (), session->user_mss, *buflen);
3176         }
3177       else
3178         rv = VPPCOM_EINVAL;
3179       break;
3180
3181     case VPPCOM_ATTR_GET_REFCNT:
3182       rv = vcl_session_get_refcnt (session);
3183       break;
3184
3185     default:
3186       rv = VPPCOM_EINVAL;
3187       break;
3188     }
3189
3190   return rv;
3191 }
3192
3193 int
3194 vppcom_session_recvfrom (uint32_t session_handle, void *buffer,
3195                          uint32_t buflen, int flags, vppcom_endpt_t * ep)
3196 {
3197   vcl_worker_t *wrk = vcl_worker_get_current ();
3198   int rv = VPPCOM_OK;
3199   vcl_session_t *session = 0;
3200
3201   if (ep)
3202     {
3203       session = vcl_session_get_w_handle (wrk, session_handle);
3204       if (PREDICT_FALSE (!session))
3205         {
3206           VDBG (0, "VCL<%d>: invalid session, sid (%u) has been closed!",
3207                 getpid (), session_handle);
3208           return VPPCOM_EBADFD;
3209         }
3210       ep->is_ip4 = session->transport.is_ip4;
3211       ep->port = session->transport.rmt_port;
3212     }
3213
3214   if (flags == 0)
3215     rv = vppcom_session_read (session_handle, buffer, buflen);
3216   else if (flags & MSG_PEEK)
3217     rv = vppcom_session_peek (session_handle, buffer, buflen);
3218   else
3219     {
3220       clib_warning ("VCL<%d>: Unsupport flags for recvfrom %d",
3221                     getpid (), flags);
3222       return VPPCOM_EAFNOSUPPORT;
3223     }
3224
3225   if (ep)
3226     {
3227       if (session->transport.is_ip4)
3228         clib_memcpy_fast (ep->ip, &session->transport.rmt_ip.ip4,
3229                           sizeof (ip4_address_t));
3230       else
3231         clib_memcpy_fast (ep->ip, &session->transport.rmt_ip.ip6,
3232                           sizeof (ip6_address_t));
3233     }
3234
3235   return rv;
3236 }
3237
3238 int
3239 vppcom_session_sendto (uint32_t session_handle, void *buffer,
3240                        uint32_t buflen, int flags, vppcom_endpt_t * ep)
3241 {
3242   if (!buffer)
3243     return VPPCOM_EINVAL;
3244
3245   if (ep)
3246     {
3247       // TBD
3248       return VPPCOM_EINVAL;
3249     }
3250
3251   if (flags)
3252     {
3253       // TBD check the flags and do the right thing
3254       VDBG (2, "VCL<%d>: handling flags 0x%u (%d) not implemented yet.",
3255             getpid (), flags, flags);
3256     }
3257
3258   return (vppcom_session_write (session_handle, buffer, buflen));
3259 }
3260
3261 int
3262 vppcom_poll (vcl_poll_t * vp, uint32_t n_sids, double wait_for_time)
3263 {
3264   vcl_worker_t *wrk = vcl_worker_get_current ();
3265   f64 timeout = clib_time_now (&wrk->clib_time) + wait_for_time;
3266   u32 i, keep_trying = 1;
3267   svm_msg_q_msg_t msg;
3268   session_event_t *e;
3269   int rv, num_ev = 0;
3270
3271   VDBG (3, "VCL<%d>: vp %p, nsids %u, wait_for_time %f",
3272         getpid (), vp, n_sids, wait_for_time);
3273
3274   if (!vp)
3275     return VPPCOM_EFAULT;
3276
3277   do
3278     {
3279       vcl_session_t *session;
3280
3281       /* Dequeue all events and drop all unhandled io events */
3282       while (svm_msg_q_sub (wrk->app_event_queue, &msg, SVM_Q_NOWAIT, 0) == 0)
3283         {
3284           e = svm_msg_q_msg_data (wrk->app_event_queue, &msg);
3285           vcl_handle_mq_event (wrk, e);
3286           svm_msg_q_free_msg (wrk->app_event_queue, &msg);
3287         }
3288       vec_reset_length (wrk->unhandled_evts_vector);
3289
3290       for (i = 0; i < n_sids; i++)
3291         {
3292           session = vcl_session_get (wrk, vp[i].sid);
3293           if (!session)
3294             {
3295               vp[i].revents = POLLHUP;
3296               num_ev++;
3297               continue;
3298             }
3299
3300           vp[i].revents = 0;
3301
3302           if (POLLIN & vp[i].events)
3303             {
3304               rv = vppcom_session_read_ready (session);
3305               if (rv > 0)
3306                 {
3307                   vp[i].revents |= POLLIN;
3308                   num_ev++;
3309                 }
3310               else if (rv < 0)
3311                 {
3312                   switch (rv)
3313                     {
3314                     case VPPCOM_ECONNRESET:
3315                       vp[i].revents = POLLHUP;
3316                       break;
3317
3318                     default:
3319                       vp[i].revents = POLLERR;
3320                       break;
3321                     }
3322                   num_ev++;
3323                 }
3324             }
3325
3326           if (POLLOUT & vp[i].events)
3327             {
3328               rv = vppcom_session_write_ready (session);
3329               if (rv > 0)
3330                 {
3331                   vp[i].revents |= POLLOUT;
3332                   num_ev++;
3333                 }
3334               else if (rv < 0)
3335                 {
3336                   switch (rv)
3337                     {
3338                     case VPPCOM_ECONNRESET:
3339                       vp[i].revents = POLLHUP;
3340                       break;
3341
3342                     default:
3343                       vp[i].revents = POLLERR;
3344                       break;
3345                     }
3346                   num_ev++;
3347                 }
3348             }
3349
3350           if (0)                // Note "done:" label used by VCL_SESSION_LOCK_AND_GET()
3351             {
3352               vp[i].revents = POLLNVAL;
3353               num_ev++;
3354             }
3355         }
3356       if (wait_for_time != -1)
3357         keep_trying = (clib_time_now (&wrk->clib_time) <= timeout) ? 1 : 0;
3358     }
3359   while ((num_ev == 0) && keep_trying);
3360
3361   if (VPPCOM_DEBUG > 3)
3362     {
3363       clib_warning ("VCL<%d>: returning %d", getpid (), num_ev);
3364       for (i = 0; i < n_sids; i++)
3365         {
3366           clib_warning ("VCL<%d>: vp[%d].sid %d (0x%x), .events 0x%x, "
3367                         ".revents 0x%x", getpid (), i, vp[i].sid, vp[i].sid,
3368                         vp[i].events, vp[i].revents);
3369         }
3370     }
3371   return num_ev;
3372 }
3373
3374 int
3375 vppcom_mq_epoll_fd (void)
3376 {
3377   vcl_worker_t *wrk = vcl_worker_get_current ();
3378   return wrk->mqs_epfd;
3379 }
3380
3381 int
3382 vppcom_session_index (uint32_t session_handle)
3383 {
3384   return session_handle & 0xFFFFFF;
3385 }
3386
3387 int
3388 vppcom_session_handle (uint32_t session_index)
3389 {
3390   return (vcl_get_worker_index () << 24) | session_index;
3391 }
3392
3393 int
3394 vppcom_worker_register (void)
3395 {
3396   if (!vcl_worker_alloc_and_init ())
3397     return VPPCOM_EEXIST;
3398
3399   if (vcl_worker_set_bapi ())
3400     return VPPCOM_EEXIST;
3401
3402   if (vcl_worker_register_with_vpp ())
3403     return VPPCOM_EEXIST;
3404
3405   return VPPCOM_OK;
3406 }
3407
3408 int
3409 vppcom_worker_index (void)
3410 {
3411   return vcl_get_worker_index ();
3412 }
3413
3414 /*
3415  * fd.io coding-style-patch-verification: ON
3416  *
3417  * Local Variables:
3418  * eval: (c-set-style "gnu")
3419  * End:
3420  */